1
1

Update the multicast subsystem - ported from Cisco branch

This commit was SVN r24246.
Этот коммит содержится в:
Ralph Castain 2011-01-13 01:54:05 +00:00
родитель f3aaa885a3
Коммит b09f57b03d
15 изменённых файлов: 941 добавлений и 713 удалений

Просмотреть файл

@ -43,8 +43,8 @@ static void opal_ring_buffer_construct(opal_ring_buffer_t *ring)
OBJ_CONSTRUCT(&ring->lock, opal_mutex_t);
OBJ_CONSTRUCT(&ring->cond, opal_condition_t);
ring->in_use = false;
ring->head = NULL;
ring->tail = NULL;
ring->head = 0;
ring->tail = -1;
ring->size = 0;
ring->addr = NULL;
}
@ -81,8 +81,78 @@ int opal_ring_buffer_init(opal_ring_buffer_t* ring, int size)
return OPAL_ERR_OUT_OF_RESOURCE;
}
ring->size = size;
/* point the head to the first location */
ring->head = &ring->addr[0];
return OPAL_SUCCESS;
}
/* Push an item onto the ring (index-based implementation).
 *
 * If the slot at the head is occupied the ring is full: the oldest
 * entry is overwritten and returned so the caller can release it.
 * Otherwise NULL is returned. The ring's lock/condition pair guards
 * the whole update (OPAL_ACQUIRE/RELEASE_THREAD).
 *
 * Invariant used here: tail < 0 means the ring is empty; head is the
 * index of the next slot to write.
 */
void* opal_ring_buffer_push(opal_ring_buffer_t *ring, void *ptr)
{
char *p=NULL;
OPAL_ACQUIRE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
/* head slot occupied => ring is full: capture the oldest entry for
 * return and move the tail past the slot we are about to reclaim
 */
if (NULL != ring->addr[ring->head]) {
p = (char*)ring->addr[ring->head];
if (ring->tail == ring->size - 1) {
ring->tail = 0;
} else {
ring->tail = ring->head + 1;
}
}
/* store the new entry at the head */
ring->addr[ring->head] = (char*)ptr;
/* negative tail means the ring was empty - the new entry is now
 * also the oldest one
 */
if (ring->tail < 0) {
ring->tail = ring->head;
}
/* advance the head, wrapping at the end of the ring */
if (ring->head == ring->size - 1) {
ring->head = 0;
} else {
ring->head++;
}
OPAL_RELEASE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
return (void*)p;
}
/* Pop the oldest item off the ring (index-based implementation).
 *
 * Returns the entry at the tail, or NULL if the ring is empty
 * (tail == -1). The popped slot is cleared so push can detect
 * occupancy via a NULL test. Thread-safe via the ring's lock.
 */
void* opal_ring_buffer_pop(opal_ring_buffer_t *ring)
{
char *p=NULL;
OPAL_ACQUIRE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
if (-1 == ring->tail) {
/* nothing has been put on the ring yet */
p = NULL;
} else {
p = (char*)ring->addr[ring->tail];
/* clear the slot so it reads as unoccupied */
ring->addr[ring->tail] = NULL;
/* advance the tail, wrapping at the end of the ring */
if (ring->tail == ring->size-1) {
ring->tail = 0;
} else {
ring->tail++;
}
/* see if the ring is empty */
if (ring->tail == ring->head) {
ring->tail = -1;
}
}
OPAL_RELEASE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
return (void*)p;
}
/* Access the i-th entry on the ring, counted from the tail (the
 * oldest entry), without removing it.
 *
 * @param ring  Pointer to the ring (IN)
 * @param i     Offset from the tail; must be in [0, size-1]
 *
 * @return The entry at that position (may be NULL if the slot is
 *         unoccupied), or NULL if i is out of range or the ring is
 *         empty. Thread-safe via the ring's lock/condition pair.
 */
void* opal_ring_buffer_poke(opal_ring_buffer_t *ring, int i)
{
    char *p=NULL;
    int offset;

    OPAL_ACQUIRE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
    /* reject an empty ring and out-of-range indices - including
     * negative ones, which would otherwise make offset negative and
     * read before the start of the addr array (out-of-bounds)
     */
    if (i < 0 || ring->size <= i || -1 == ring->tail) {
        p = NULL;
    } else {
        /* calculate the offset of the tail in the ring */
        offset = ring->tail + i;
        /* correct for wrap-around */
        if (ring->size <= offset) {
            offset -= ring->size;
        }
        p = ring->addr[offset];
    }
    OPAL_RELEASE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
    return (void*)p;
}

Просмотреть файл

@ -43,8 +43,8 @@ struct opal_ring_buffer_t {
opal_condition_t cond;
bool in_use;
/* head/tail indices */
char **head;
char **tail;
int head;
int tail;
/** size of list, i.e. number of elements in addr */
int size;
/** pointer to ring */
@ -79,35 +79,11 @@ OPAL_DECLSPEC int opal_ring_buffer_init(opal_ring_buffer_t* ring, int size);
* @return OPAL_SUCCESS. Returns error if ring cannot take
* another entry
*/
/* NOTE(review): this is the older, pointer-based inline implementation
 * (head/tail are char** cursors into ring->addr). This commit removes
 * it from the header in favor of the index-based version declared via
 * OPAL_DECLSPEC - presumably because pointer cursors made the
 * empty/full states ambiguous; confirm against the commit history.
 *
 * Push an item onto the ring. If the head slot is occupied, the entry
 * being overwritten is returned so the caller can release it.
 */
static inline void* opal_ring_buffer_push(opal_ring_buffer_t *ring, void *ptr)
{
char *p=NULL;
OPAL_ACQUIRE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
/* head slot occupied: capture the overwritten entry for return */
if (NULL != *ring->head) {
p = *ring->head;
if (ring->head == ring->tail) {
/* push the tail ahead of us */
if (ring->tail == &ring->addr[ring->size-1]) {
ring->tail = &ring->addr[0];
} else {
ring->tail++;
}
}
}
/* store the new entry and advance the head, wrapping at the end */
*ring->head = (char *) ptr;
if (ring->head == &ring->addr[ring->size-1]) {
ring->head = &ring->addr[0];
} else {
ring->head++;
}
OPAL_RELEASE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
return (void*)p;
}
OPAL_DECLSPEC void* opal_ring_buffer_push(opal_ring_buffer_t *ring, void *ptr);
/**
* Pop an item off of the ring. The head of the ring will be
* Pop an item off of the ring. The oldest entry on the ring will be
* returned. If nothing on the ring, NULL is returned.
*
* @param ring Pointer to ring (IN)
@ -115,26 +91,13 @@ static inline void* opal_ring_buffer_push(opal_ring_buffer_t *ring, void *ptr)
* @return Error code. NULL indicates an error.
*/
/* NOTE(review): older pointer-based inline pop, removed by this commit
 * in favor of the index-based version declared via OPAL_DECLSPEC.
 *
 * Pop the oldest item off the ring. Returns NULL when the tail is
 * unset or head == tail (treated here as empty); otherwise returns
 * the tail entry, clears its slot, and advances the tail with
 * wrap-around. Thread-safe via the ring's lock.
 */
static inline void* opal_ring_buffer_pop(opal_ring_buffer_t *ring)
{
char *p;
OPAL_ACQUIRE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
if (NULL == ring->tail || ring->head == ring->tail) {
p = NULL;
} else {
p = *ring->tail;
/* clear the slot so push's NULL test sees it as unoccupied */
*ring->tail = NULL;
/* advance the tail, wrapping at the end of the ring */
if (ring->tail == &ring->addr[ring->size-1]) {
ring->tail = &ring->addr[0];
} else {
ring->tail++;
}
}
OPAL_RELEASE_THREAD(&(ring->lock), &(ring->cond), &(ring->in_use));
return (void*)p;
}
OPAL_DECLSPEC void* opal_ring_buffer_pop(opal_ring_buffer_t *ring);
/*
* Access an element of the ring, without removing it, indexed
* starting at the tail
*/
OPAL_DECLSPEC void* opal_ring_buffer_poke(opal_ring_buffer_t *ring, int i);
END_C_DECLS

Просмотреть файл

@ -118,7 +118,9 @@ enum {
ORTE_ERR_MEM_LIMIT_EXCEEDED = (ORTE_ERR_BASE - 36),
ORTE_ERR_HEARTBEAT_LOST = (ORTE_ERR_BASE - 37),
ORTE_ERR_PROC_STALLED = (ORTE_ERR_BASE - 38),
ORTE_ERR_COMM_DISABLED = (ORTE_ERR_BASE - 39)
ORTE_ERR_NO_APP_SPECIFIED = (ORTE_ERR_BASE - 39),
ORTE_ERR_NO_EXE_SPECIFIED = (ORTE_ERR_BASE - 40),
ORTE_ERR_COMM_DISABLED = (ORTE_ERR_BASE - 41)
};
#define ORTE_ERR_MAX (ORTE_ERR_BASE - 100)

Просмотреть файл

@ -25,6 +25,7 @@
#include "opal/mca/event/event.h"
#include "orte/threads/threads.h"
#include "orte/mca/rmcast/rmcast.h"
#include "orte/mca/rmcast/base/private.h"
@ -53,7 +54,8 @@ typedef struct {
opal_list_t channels;
rmcast_base_channel_t *my_output_channel;
rmcast_base_channel_t *my_input_channel;
opal_event_base_t *event_base;
bool unreliable_xport;
opal_list_t msg_logs;
opal_thread_t recv_thread;
orte_thread_ctl_t recv_ctl;
int recv_pipe[2];

Просмотреть файл

@ -27,7 +27,6 @@
#include "opal/mca/event/event.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_ring_buffer.h"
#include "opal/util/fd.h"
#include "orte/threads/threads.h"
#include "orte/mca/rmcast/rmcast.h"
@ -55,14 +54,10 @@ typedef struct {
uint16_t port;
uint32_t interface;
int xmit;
bool restart;
orte_rmcast_seq_t seq_num;
int recv;
struct sockaddr_in addr;
opal_event_t send_ev;
opal_mutex_t send_lock;
bool sends_in_progress;
opal_list_t pending_sends;
uint8_t *send_data;
opal_event_t recv_ev;
/* ring buffer to cache our messages */
opal_ring_buffer_t cache;
@ -111,10 +106,16 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_base_send_t);
/* Data structure for tracking recvd sequence numbers */
typedef struct {
opal_object_t super;
orte_process_name_t name;
opal_list_item_t super;
orte_rmcast_channel_t channel;
orte_rmcast_seq_t seq_num;
} rmcast_seq_tracker_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_seq_tracker_t);
typedef struct {
opal_list_item_t super;
orte_process_name_t name;
opal_list_t last_msg;
} rmcast_recv_log_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_recv_log_t);
@ -130,30 +131,31 @@ typedef struct {
} rmcast_send_log_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
#define ORTE_MULTICAST_MESSAGE_EVENT(dt, sz) \
do { \
opal_buffer_t *buf; \
OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output, \
"defining mcast msg event: %s %d", \
__FILE__, __LINE__)); \
buf = OBJ_NEW(opal_buffer_t); \
opal_dss.load(buf, (dt), (sz)); \
if (orte_progress_threads_enabled) { \
opal_fd_write(orte_rmcast_base.recv_pipe[1], \
sizeof(opal_buffer_t*), &buf); \
} else { \
orte_rmcast_base_process_msg(buf); \
} \
} while(0);
#define ORTE_MULTICAST_NEXT_SEQUENCE_NUM(seq) \
do { \
if ((seq) < ORTE_RMCAST_SEQ_MAX) { \
(seq) += 1; \
} else { \
(seq) = 0; \
} \
#define ORTE_MULTICAST_MESSAGE_EVENT(sndr, bf) \
do { \
orte_rmcast_msg_t *msg; \
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
"defining mcast msg event: %s %d", \
__FILE__, __LINE__)); \
msg = OBJ_NEW(orte_rmcast_msg_t); \
msg->sender.jobid = (sndr)->jobid; \
msg->sender.vpid = (sndr)->vpid; \
opal_dss.copy_payload(msg->buf, (bf)); \
opal_fd_write(orte_rmcast_base.recv_pipe[1], \
sizeof(orte_rmcast_msg_t*), &msg); \
} while(0);
#define ORTE_MULTICAST_NEXT_SEQUENCE_NUM(seq) \
do { \
if ((seq) == ORTE_RMCAST_SEQ_INVALID || \
(seq) == ORTE_RMCAST_SEQ_MAX) { \
(seq) = 0; \
} else { \
(seq) += 1; \
} \
} while(0);
/**** FUNCTIONS ****/
@ -165,15 +167,16 @@ ORTE_DECLSPEC int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
orte_rmcast_callback_buffer_fn_t cbfunc_buffer,
void *cbdata, bool blocking);
ORTE_DECLSPEC rmcast_base_channel_t* orte_rmcast_base_get_channel(orte_rmcast_channel_t channel);
ORTE_DECLSPEC int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
orte_rmcast_channel_t channel,
opal_buffer_t **buffer,
rmcast_base_channel_t **chan);
ORTE_DECLSPEC int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread);
ORTE_DECLSPEC int orte_rmcast_base_start_threads(void);
ORTE_DECLSPEC void orte_rmcast_base_stop_threads(void);
ORTE_DECLSPEC int orte_rmcast_base_process_msg(opal_buffer_t *msg);
ORTE_DECLSPEC void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg);
ORTE_DECLSPEC void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag);

Просмотреть файл

@ -30,6 +30,11 @@ int orte_rmcast_base_close(void)
if (NULL != orte_rmcast.finalize) {
orte_rmcast.finalize();
}
while (NULL != (item = opal_list_remove_first(&orte_rmcast_base.msg_logs))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&orte_rmcast_base.msg_logs);
while (NULL != (item = opal_list_remove_first(&orte_rmcast_base.recvs))) {
OBJ_RELEASE(item);
@ -47,12 +52,8 @@ int orte_rmcast_base_close(void)
OBJ_DESTRUCT(&orte_rmcast_base.recv_process);
OBJ_DESTRUCT(&orte_rmcast_base.recv_process_ctl);
if (orte_progress_threads_enabled) {
opal_event_base_finalize(orte_rmcast_base.event_base);
}
/* Close all remaining available components (may be one if this is a
Open RTE program, or [possibly] multiple if this is ompi_info) */
Open RTE program, or [possibly] multiple if this is ompi_info) */
mca_base_components_close(orte_rmcast_base.rmcast_output,
&orte_rmcast_base.rmcast_opened, NULL);

Просмотреть файл

@ -14,7 +14,7 @@
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "orte/threads/threads.h"
#include "opal/threads/threads.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
@ -26,42 +26,29 @@
static int insert_hdr(opal_buffer_t *buf,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
bool restart,
orte_rmcast_seq_t seq_num);
int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
orte_rmcast_channel_t channel,
opal_buffer_t **buffer,
rmcast_base_channel_t **chan)
rmcast_base_channel_t* orte_rmcast_base_get_channel(orte_rmcast_channel_t channel)
{
rmcast_base_channel_t *chptr, *ch;
int32_t sz;
int rc;
int8_t flag;
int32_t tmp32;
opal_buffer_t *buf;
opal_list_item_t *item;
/* setup default responses */
*buffer = NULL;
*chan = NULL;
/* if we were asked to send this on our group output
* channel, substitute it
*/
if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
if (NULL == orte_rmcast_base.my_output_channel) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
return NULL;
}
ch = orte_rmcast_base.my_output_channel;
goto process;
return orte_rmcast_base.my_output_channel;
} else if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
if (NULL == orte_rmcast_base.my_input_channel) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
return NULL;
}
ch = orte_rmcast_base.my_input_channel;
goto process;
return orte_rmcast_base.my_input_channel;
}
/* find the channel */
@ -79,28 +66,54 @@ int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
if (NULL == ch) {
/* didn't find it */
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return NULL;
}
return ch;
}
int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
orte_rmcast_channel_t channel,
opal_buffer_t **buffer,
rmcast_base_channel_t **chan)
{
rmcast_base_channel_t *ch;
int32_t sz;
int rc;
int8_t flag;
int32_t tmp32;
opal_buffer_t *buf;
/* setup default responses */
*buffer = NULL;
*chan = NULL;
/* get the channel object */
if (NULL == (ch = orte_rmcast_base_get_channel(channel))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
process:
/* return the channel */
*chan = ch;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base:queue_xmit of %d %s"
" for multicast on channel %d tag %d",
" for multicast on channel %d tag %d seq_num %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == snd->iovec_array) ? (int)snd->buf->bytes_used : (int)snd->iovec_count,
(NULL == snd->iovec_array) ? "bytes" : "iovecs",
(int)ch->channel, snd->tag));
(int)ch->channel, snd->tag, ch->seq_num));
/* setup a buffer */
buf = OBJ_NEW(opal_buffer_t);
*buffer = buf;
/* assign a sequence number */
ORTE_MULTICAST_NEXT_SEQUENCE_NUM(ch->seq_num);
/* insert the header */
if (ORTE_SUCCESS != (rc = insert_hdr(buf, ch->channel, snd->tag, ch->seq_num))) {
if (ORTE_SUCCESS != (rc = insert_hdr(buf, ch->channel, snd->tag, ch->restart, ch->seq_num))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -151,6 +164,10 @@ int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
goto cleanup;
}
}
/* flag this channel as no longer in restart mode since
* it will have sent at least one message
*/
ch->restart = false;
return ORTE_SUCCESS;
cleanup:
@ -328,9 +345,11 @@ int orte_rmcast_base_query(orte_rmcast_channel_t *output, orte_rmcast_channel_t
static int insert_hdr(opal_buffer_t *buf,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
bool restart,
orte_rmcast_seq_t seq_num)
{
int rc;
uint8_t flag;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
@ -347,6 +366,16 @@ static int insert_hdr(opal_buffer_t *buf,
return rc;
}
if (restart) {
flag = 1;
} else {
flag = 0;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &flag, 1, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &seq_num, 1, ORTE_RMCAST_SEQ_T))) {
ORTE_ERROR_LOG(rc);
}

Просмотреть файл

@ -28,7 +28,6 @@
#include "opal/util/opal_sos.h"
#include "opal/class/opal_ring_buffer.h"
#include "opal/class/opal_list.h"
#include "opal/mca/event/event.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
@ -79,6 +78,7 @@ orte_rmcast_module_t orte_rmcast = {
NULL,
NULL,
NULL,
NULL,
NULL
};
orte_rmcast_base_t orte_rmcast_base;
@ -116,6 +116,9 @@ int orte_rmcast_base_open(void)
OBJ_CONSTRUCT(&orte_rmcast_base.recv_process, opal_thread_t);
OBJ_CONSTRUCT(&orte_rmcast_base.recv_process_ctl, orte_thread_ctl_t);
OBJ_CONSTRUCT(&orte_rmcast_base.msg_logs, opal_list_t);
orte_rmcast_base.unreliable_xport = false;
orte_rmcast_base.xmit_network = 0;
orte_rmcast_base.my_group_name = NULL;
orte_rmcast_base.my_group_number = 0;
@ -126,13 +129,6 @@ int orte_rmcast_base_open(void)
orte_rmcast_base.my_output_channel = NULL;
orte_rmcast_base.my_input_channel = NULL;
/* setup the local event base */
if (orte_progress_threads_enabled) {
orte_rmcast_base.event_base = opal_event_base_create();
} else {
orte_rmcast_base.event_base = opal_event_base;
}
/* public multicast channel for this job */
mca_base_param_reg_string_name("rmcast", "base_multicast_network",
"Network to use for multicast xmissions [link (default) | site | org | global | tuple-addr]",
@ -151,6 +147,7 @@ int orte_rmcast_base_open(void)
/* must have been given an actual network address */
rc = opal_iftupletoaddr(tmp, &orte_rmcast_base.xmit_network, NULL);
}
free(tmp);
if (ORTE_SUCCESS != rc) {
orte_show_help("help-rmcast-base.txt", "unrecognized-network", true, tmp);
@ -178,6 +175,7 @@ int orte_rmcast_base_open(void)
return ORTE_ERR_SILENT;
}
orte_rmcast_base.my_group_number = value;
free(tmp);
} else {
/* since nothing was given, use our local jobid */
orte_rmcast_base.my_group_name = strdup(ORTE_LOCAL_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
@ -376,14 +374,10 @@ static void channel_construct(rmcast_base_channel_t *ptr)
ptr->port = 0;
ptr->interface = 0;
ptr->xmit = -1;
ptr->seq_num = 0;
ptr->restart = true;
ptr->seq_num = ORTE_RMCAST_SEQ_INVALID;
ptr->recv = -1;
memset(&ptr->addr, 0, sizeof(ptr->addr));
memset(&ptr->send_ev, 0, sizeof(opal_event_t));
OBJ_CONSTRUCT(&ptr->send_lock, opal_mutex_t);
ptr->sends_in_progress = false;
OBJ_CONSTRUCT(&ptr->pending_sends, opal_list_t);
ptr->send_data = NULL;
memset(&ptr->recv_ev, 0, sizeof(opal_event_t));
OBJ_CONSTRUCT(&ptr->cache, opal_ring_buffer_t);
opal_ring_buffer_init(&ptr->cache, orte_rmcast_base.cache_size);
@ -397,20 +391,10 @@ static void channel_destruct(rmcast_base_channel_t *ptr)
opal_event_del(&ptr->recv_ev);
CLOSE_THE_SOCKET(ptr->recv);
}
/* attempt to xmit any pending sends */
/* cleanup the xmit side */
if (0 < ptr->xmit) {
opal_event_del(&ptr->send_ev);
CLOSE_THE_SOCKET(ptr->xmit);
}
OBJ_DESTRUCT(&ptr->send_lock);
/* release the channel name */
if (NULL != ptr->name) {
free(ptr->name);
}
if (NULL != ptr->send_data) {
free(ptr->send_data);
}
/* clear the cache */
while (NULL != (rb = (rmcast_send_log_t*)opal_ring_buffer_pop(&ptr->cache))) {
OBJ_RELEASE(rb);
@ -422,35 +406,45 @@ OBJ_CLASS_INSTANCE(rmcast_base_channel_t,
channel_construct,
channel_destruct);
/* Constructor for rmcast_seq_tracker_t: start with no channel and no
 * sequence number seen, so the first received message is recognized
 * as the start of the stream.
 */
static void trk_construct(rmcast_seq_tracker_t *ptr)
{
ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->seq_num = ORTE_RMCAST_SEQ_INVALID;
}
OBJ_CLASS_INSTANCE(rmcast_seq_tracker_t,
opal_list_item_t,
trk_construct, NULL);
static void recvlog_construct(rmcast_recv_log_t *ptr)
{
ptr->name.jobid = ORTE_JOBID_INVALID;
ptr->name.vpid = ORTE_VPID_INVALID;
ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->seq_num = 0;
OBJ_CONSTRUCT(&ptr->last_msg, opal_list_t);
}
static void recvlog_destruct(rmcast_recv_log_t *ptr)
{
opal_list_item_t *item;
ptr->name.jobid = ORTE_JOBID_INVALID;
ptr->name.vpid = ORTE_VPID_INVALID;
ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->seq_num = 0;
while (NULL != (item = opal_list_remove_first(&ptr->last_msg))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&ptr->last_msg);
}
OBJ_CLASS_INSTANCE(rmcast_recv_log_t,
opal_object_t,
opal_list_item_t,
recvlog_construct,
recvlog_destruct);
static void sendlog_construct(rmcast_send_log_t *ptr)
{
ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->seq_num = 0;
ptr->seq_num = ORTE_RMCAST_SEQ_INVALID;
ptr->buf = OBJ_NEW(opal_buffer_t);
}
static void sendlog_destruct(rmcast_send_log_t *ptr)
{
ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->seq_num = 0;
if (NULL != ptr->buf) {
OBJ_RELEASE(ptr->buf);
}
@ -460,4 +454,19 @@ OBJ_CLASS_INSTANCE(rmcast_send_log_t,
sendlog_construct,
sendlog_destruct);
/* Constructor for orte_rmcast_msg_t: allocate the payload buffer the
 * recv-processing path copies incoming data into.
 */
static void msg_construct(orte_rmcast_msg_t *ptr)
{
ptr->buf = OBJ_NEW(opal_buffer_t);
}
/* Destructor for orte_rmcast_msg_t: drop our reference to the payload
 * buffer, if one was allocated.
 */
static void msg_destruct(orte_rmcast_msg_t *ptr)
{
if (NULL != ptr->buf) {
OBJ_RELEASE(ptr->buf);
}
}
OBJ_CLASS_INSTANCE(orte_rmcast_msg_t,
opal_object_t,
msg_construct,
msg_destruct);
#endif /* ORTE_DISABLE_FULL_SUPPORT */

Просмотреть файл

@ -17,6 +17,7 @@
#include "opal/util/fd.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/threads/threads.h"
@ -24,50 +25,22 @@
#include "orte/mca/rmcast/base/base.h"
#include "orte/mca/rmcast/base/private.h"
static void* rcv_progress_thread(opal_object_t *obj);
static void* rcv_processing_thread(opal_object_t *obj);
static int extract_hdr(opal_buffer_t *buf,
orte_process_name_t *name,
orte_rmcast_channel_t *channel,
orte_rmcast_tag_t *tag,
bool *restart,
orte_rmcast_seq_t *seq_num);
int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
int orte_rmcast_base_start_threads(void)
{
int rc;
if (!orte_progress_threads_enabled) {
return ORTE_SUCCESS;
}
if (rcv_thread && !orte_rmcast_base.recv_ctl.running) {
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: starting recv thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the update to target the rmcast event base since the
* recv thread will be progressing that event base
*/
orte_rmcast_base.recv_ctl.evbase = orte_rmcast_base.event_base;
/* start the thread */
orte_rmcast_base.recv_thread.t_run = rcv_progress_thread;
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) {
ORTE_ERROR_LOG(rc);
orte_rmcast_base.recv_ctl.running = false;
return rc;
}
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: recv thread started",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
if (processing_thread && !orte_rmcast_base.recv_process_ctl.running) {
if (!orte_rmcast_base.recv_process_ctl.running) {
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: starting recv processing thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the update to target the rmcast event base since we are
* processing messages that arrive from that source
*/
orte_rmcast_base.recv_process_ctl.evbase = orte_rmcast_base.event_base;
/* setup a pipe that we will use to signal the thread that a message
* is waiting to be processed - don't define an event for it
*/
@ -98,23 +71,6 @@ void orte_rmcast_base_stop_threads(void)
{
opal_buffer_t *msg=NULL;
if (!orte_progress_threads_enabled) {
return;
}
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: stopping recv thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if the thread is active, stop it */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
if (orte_rmcast_base.recv_ctl.running) {
orte_rmcast_base.recv_ctl.stop = true;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
opal_thread_join(&orte_rmcast_base.recv_thread, NULL);
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
}
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: stopping recv processing thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -132,7 +88,7 @@ void orte_rmcast_base_stop_threads(void)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
int orte_rmcast_base_process_msg(opal_buffer_t *msg)
void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg)
{
orte_rmcast_channel_t channel;
rmcast_base_recv_t *ptr, *recv=NULL;
@ -144,9 +100,13 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
int rc=ORTE_SUCCESS;
orte_rmcast_seq_t recvd_seq_num;
opal_list_item_t *item;
rmcast_seq_tracker_t *trkr, *tptr;
rmcast_recv_log_t *log, *logptr;
bool restart;
opal_buffer_t alert;
/* extract the header */
if (ORTE_SUCCESS != (rc = extract_hdr(msg, &name, &channel, &tag, &recvd_seq_num))) {
if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &restart, &recvd_seq_num))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -160,6 +120,16 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
goto cleanup;
}
/* if this is a heartbeat and I am not a daemon, then ignore it
* to avoid swamping tools
*/
if (!ORTE_PROC_IS_DAEMON && ORTE_RMCAST_TAG_HEARTBEAT == tag) {
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv ignoring heartbeat",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
goto cleanup;
}
/* if this message is from a different job family, ignore it unless
* it is on the system channel. We ignore these messages to avoid
* confusion between different jobs since we all may be sharing
@ -179,15 +149,99 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
}
}
if (orte_rmcast_base.unreliable_xport) {
/* if the message is not on a system-specified channel, then check to see if we
* are missing any messages and need a resend
*/
if (ORTE_RMCAST_DYNAMIC_CHANNELS <= channel) {
log = NULL;
for (item = opal_list_get_first(&orte_rmcast_base.msg_logs);
item != opal_list_get_end(&orte_rmcast_base.msg_logs);
item = opal_list_get_next(item)) {
logptr = (rmcast_recv_log_t*)item;
/* look for this source */
if (name.jobid == logptr->name.jobid &&
name.vpid == logptr->name.vpid) {
log = logptr;
break;
}
}
if (NULL == log) {
/* new source */
log = OBJ_NEW(rmcast_recv_log_t);
log->name.jobid = name.jobid;
log->name.vpid = name.vpid;
opal_list_append(&orte_rmcast_base.msg_logs, &log->super);
}
/* look for the channel */
trkr = NULL;
for (item = opal_list_get_first(&log->last_msg);
item != opal_list_get_end(&log->last_msg);
item = opal_list_get_next(item)) {
tptr = (rmcast_seq_tracker_t*)item;
if (channel == tptr->channel) {
trkr = tptr;
break;
}
}
if (NULL == trkr) {
/* new channel */
trkr = OBJ_NEW(rmcast_seq_tracker_t);
trkr->channel = channel;
opal_list_append(&log->last_msg, &trkr->super);
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
"%s NEW CHANNEL: %d SENDER: %s SEQ %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
channel, ORTE_NAME_PRINT(&log->name), recvd_seq_num));
} else if (ORTE_RMCAST_SEQ_INVALID != trkr->seq_num && !restart) {
/* if this is a repeat msg, ignore it */
if (recvd_seq_num <= trkr->seq_num) {
OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output,
"%s Repeat msg %d on channel %d from source %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recvd_seq_num, channel,
ORTE_NAME_PRINT(&name)));
}
if (1 != (recvd_seq_num - trkr->seq_num) ||
(ORTE_RMCAST_SEQ_MAX == trkr->seq_num && 0 != recvd_seq_num)) {
/* missing a message - request it */
OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output,
"%s Missing msg %d (%d) on channel %d from source %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recvd_seq_num,
trkr->seq_num, channel, ORTE_NAME_PRINT(&name)));
OBJ_CONSTRUCT(&alert, opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &channel, 1, ORTE_RMCAST_CHANNEL_T))) {
ORTE_ERROR_LOG(rc);
exit(1);
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &trkr->seq_num, 1, ORTE_RMCAST_SEQ_T))) {
ORTE_ERROR_LOG(rc);
exit(1);
}
if (0 > (rc = orte_rml.send_buffer(&name, &alert, ORTE_RML_TAG_MISSED_MSG, 0))) {
ORTE_ERROR_LOG(rc);
exit(1);
}
OBJ_DESTRUCT(&alert);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
"%s CHANNEL: %d SENDER: %s SEQ: %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
channel, ORTE_NAME_PRINT(&log->name), recvd_seq_num));
}
trkr->seq_num = recvd_seq_num;
}
}
/* unpack the iovec vs buf flag */
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg, &flag, &n, OPAL_INT8))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &flag, &n, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %lu",
"%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&name), channel, (int)tag,
(0 == flag) ? "iovecs" : "buffer", recvd_seq_num));
@ -240,7 +294,7 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
if (0 == flag) {
/* get the number of iovecs in the buffer */
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg, &iovec_count, &n, OPAL_INT32))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &iovec_count, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -250,7 +304,7 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
for (i=0; i < iovec_count; i++) {
/* unpack the number of bytes in this iovec */
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg, &isz, &n, OPAL_INT32))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &isz, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -260,7 +314,7 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
/* allocate the space */
iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(isz);
/* unpack the data */
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg, iovec_array[i].iov_base, &isz, OPAL_UINT8))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, iovec_array[i].iov_base, &isz, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
@ -292,7 +346,6 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
}
/* release blocking recv */
ORTE_WAKEUP_THREAD(&recv->ctl);
return ORTE_SUCCESS;
}
} else {
if (NULL != recv->cbfunc_buffer) {
@ -300,7 +353,7 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
"%s rmcast:base:process_recv delivering buffer to channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv->channel, (int)tag));
recv->cbfunc_buffer(ORTE_SUCCESS, recv->channel, recv->seq_num, tag,
&name, msg, recv->cbdata);
&name, msg->buf, recv->cbdata);
} else {
/* if something is already present, then we have a problem */
if (NULL != recv->buf) {
@ -316,13 +369,12 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
* by the blocking recv
*/
recv->buf = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(recv->buf, msg))) {
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(recv->buf, msg->buf))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* release blocking recv */
ORTE_WAKEUP_THREAD(&recv->ctl);
return ORTE_SUCCESS;
}
}
@ -342,13 +394,13 @@ int orte_rmcast_base_process_msg(opal_buffer_t *msg)
OBJ_RELEASE(recv);
}
return rc;
return;
}
static void* rcv_processing_thread(opal_object_t *obj)
{
opal_buffer_t *msg;
orte_rmcast_msg_t *msg;
int rc;
struct timespec tp={0, 10};
@ -363,7 +415,7 @@ static void* rcv_processing_thread(opal_object_t *obj)
while (1) {
/* block here until a trigger arrives */
if (0 > (rc = opal_fd_read(orte_rmcast_base.recv_pipe[0],
sizeof(opal_buffer_t*), &msg))) {
sizeof(orte_rmcast_msg_t*), &msg))) {
/* if something bad happened, punt */
opal_output(0, "%s PUNTING THREAD", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
@ -384,38 +436,7 @@ static void* rcv_processing_thread(opal_object_t *obj)
}
/* process it - processing function releases the msg */
if (ORTE_SUCCESS != (rc = orte_rmcast_base_process_msg(msg))) {
ORTE_ERROR_LOG(rc);
}
}
}
static void* rcv_progress_thread(opal_object_t *obj)
{
struct timespec tp={0, 10};
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: recv thread operational",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
orte_rmcast_base.recv_ctl.running = true;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
while (1) {
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
if (orte_rmcast_base.recv_ctl.stop) {
orte_rmcast_base.recv_ctl.running = false;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
/* give a little delay to ensure the main thread gets into
* opal_thread_join before we exit
*/
nanosleep(&tp, NULL);
return OPAL_THREAD_CANCELLED;
}
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
/* block in the event lib */
opal_event_loop(orte_rmcast_base.event_base, OPAL_EVLOOP_ONCE);
orte_rmcast.process_msg(msg);
}
}
@ -423,11 +444,13 @@ static int extract_hdr(opal_buffer_t *buf,
orte_process_name_t *name,
orte_rmcast_channel_t *channel,
orte_rmcast_tag_t *tag,
bool *restart,
orte_rmcast_seq_t *seq_num)
{
int rc;
int32_t n;
uint8_t flag;
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, name, &n, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
@ -446,6 +469,17 @@ static int extract_hdr(opal_buffer_t *buf,
return rc;
}
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &flag, &n, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (flag) {
*restart = true;
} else {
*restart = false;
}
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, seq_num, &n, ORTE_RMCAST_SEQ_T))) {
ORTE_ERROR_LOG(rc);

Просмотреть файл

@ -47,23 +47,6 @@ BEGIN_C_DECLS
/* ******************************************************************** */
/**
* Function prototypes for callback from receiving multicast messages
*/
typedef void (*orte_rmcast_callback_buffer_fn_t)(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
typedef void (*orte_rmcast_callback_fn_t)(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
struct iovec *msg, int count, void* cbdata);
/* initialize the selected module */
typedef int (*orte_rmcast_base_module_init_fn_t)(void);
@ -137,16 +120,8 @@ typedef int (*orte_rmcast_base_module_close_channel_fn_t)(orte_rmcast_channel_t
typedef int (*orte_rmcast_base_module_query_channel_fn_t)(orte_rmcast_channel_t *output,
orte_rmcast_channel_t *input);
/* disable comm - includes terminating all threads. This is
* required for clean shutdown of codes that use this framework
* as otherwise rmcast can segfault if it is executing a cbfunc
* for a recvd message and the receiver goes away!
*/
typedef void (*orte_rmcast_base_module_disable_comm_fn_t)(void);
/* reverses the effect */
typedef void (*orte_rmcast_base_module_enable_comm_fn_t)(void);
/* process a recvd message */
typedef void (*orte_rmcast_base_module_process_msg_fn_t)(orte_rmcast_msg_t *msg);
/*
* rmcast component
@ -162,6 +137,16 @@ typedef struct orte_rmcast_base_component_1_0_0_t orte_rmcast_base_component_1_0
/** Convenience typedef */
typedef orte_rmcast_base_component_1_0_0_t orte_rmcast_base_component_t;
/* disable comm - includes terminating all threads. This is
* required for clean shutdown of codes that use this framework
* as otherwise rmcast can segfault if it is executing a cbfunc
* for a recvd message and the receiver goes away!
*/
typedef void (*orte_rmcast_base_module_disable_comm_fn_t)(void);
/* reverses the effect */
typedef void (*orte_rmcast_base_module_enable_comm_fn_t)(void);
/*
* Component modules Ver 1.0
*/
@ -182,6 +167,7 @@ struct orte_rmcast_base_module_t {
orte_rmcast_base_module_query_channel_fn_t query_channel;
orte_rmcast_base_module_enable_comm_fn_t enable_comm;
orte_rmcast_base_module_disable_comm_fn_t disable_comm;
orte_rmcast_base_module_process_msg_fn_t process_msg;
};
/** Convienence typedef */
typedef struct orte_rmcast_base_module_t orte_rmcast_module_t;

Просмотреть файл

@ -18,24 +18,38 @@
#include "orte/constants.h"
#include "orte/types.h"
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#include "opal/dss/dss_types.h"
BEGIN_C_DECLS
/* Data structure for passing messages to recv processing */
typedef struct {
opal_object_t super;
orte_process_name_t sender;
opal_buffer_t *buf;
} orte_rmcast_msg_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_rmcast_msg_t);
/* channel type */
typedef uint32_t orte_rmcast_channel_t;
#define ORTE_RMCAST_CHANNEL_T OPAL_UINT32
/* ORTE IP multicast channels */
#define ORTE_RMCAST_GROUP_INPUT_CHANNEL 0
#define ORTE_RMCAST_GROUP_OUTPUT_CHANNEL 1
#define ORTE_RMCAST_WILDCARD_CHANNEL 2
#define ORTE_RMCAST_INVALID_CHANNEL 3
#define ORTE_RMCAST_SYS_CHANNEL 4
#define ORTE_RMCAST_APP_PUBLIC_CHANNEL 5
#define ORTE_RMCAST_DATA_SERVER_CHANNEL 6
#define ORTE_RMCAST_ERROR_CHANNEL 7
#define ORTE_RMCAST_DIRECT_CHANNEL 1
#define ORTE_RMCAST_GROUP_OUTPUT_CHANNEL 2
#define ORTE_RMCAST_WILDCARD_CHANNEL 3
#define ORTE_RMCAST_INVALID_CHANNEL 4
#define ORTE_RMCAST_SYS_CHANNEL 5
#define ORTE_RMCAST_APP_PUBLIC_CHANNEL 6
#define ORTE_RMCAST_DATA_SERVER_CHANNEL 7
#define ORTE_RMCAST_ERROR_CHANNEL 8
#define ORTE_RMCAST_DYNAMIC_CHANNELS 8
#define ORTE_RMCAST_DYNAMIC_CHANNELS 9
/* define channel directions */
@ -48,21 +62,21 @@ typedef int32_t orte_rmcast_tag_t;
#define ORTE_RMCAST_TAG_T OPAL_INT32
/* tag values for well-known services */
#define ORTE_RMCAST_TAG_WILDCARD 0
#define ORTE_RMCAST_TAG_INVALID 1
#define ORTE_RMCAST_TAG_BOOTSTRAP 2
#define ORTE_RMCAST_TAG_ANNOUNCE 3
#define ORTE_RMCAST_TAG_OUTPUT 4
#define ORTE_RMCAST_TAG_PS 5
#define ORTE_RMCAST_TAG_MSG 6
#define ORTE_RMCAST_TAG_TOOL 7
#define ORTE_RMCAST_TAG_IOF 8
#define ORTE_RMCAST_TAG_DATA 9
#define ORTE_RMCAST_TAG_CMD_ACK 10
#define ORTE_RMCAST_TAG_HEARTBEAT 11
#define ORTE_RMCAST_TAG_COMMAND 12
#define ORTE_RMCAST_TAG_ERRMGR 13
#define ORTE_RMCAST_TAG_WILDCARD 0
#define ORTE_RMCAST_TAG_INVALID 1
#define ORTE_RMCAST_TAG_BOOTSTRAP 2
#define ORTE_RMCAST_TAG_ANNOUNCE 3
#define ORTE_RMCAST_TAG_OUTPUT 4
#define ORTE_RMCAST_TAG_PS 5
#define ORTE_RMCAST_TAG_MSG 6
#define ORTE_RMCAST_TAG_TOOL 7
#define ORTE_RMCAST_TAG_IOF 8
#define ORTE_RMCAST_TAG_DATA 9
#define ORTE_RMCAST_TAG_CMD_ACK 10
#define ORTE_RMCAST_TAG_HEARTBEAT 11
#define ORTE_RMCAST_TAG_COMMAND 12
#define ORTE_RMCAST_TAG_ERRMGR 13
#define ORTE_RMCAST_TAG_UPDATE_STATE 14
/* starting value for dynamically assignable tags */
#define ORTE_RMCAST_TAG_DYNAMIC 100
@ -74,10 +88,28 @@ typedef uint8_t orte_rmcast_flag_t;
#define ORTE_RMCAST_PERSISTENT 0x01
/* message sequence number */
typedef size_t orte_rmcast_seq_t;
#define ORTE_RMCAST_SEQ_MAX SIZE_MAX-1
#define ORTE_RMCAST_SEQ_INVALID SIZE_MAX
#define ORTE_RMCAST_SEQ_T OPAL_SIZE
typedef int32_t orte_rmcast_seq_t;
#define ORTE_RMCAST_SEQ_MAX INT32_MAX
#define ORTE_RMCAST_SEQ_INVALID -1
#define ORTE_RMCAST_SEQ_T OPAL_INT32
/**
* Function prototypes for callback from receiving multicast messages
*/
typedef void (*orte_rmcast_callback_buffer_fn_t)(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
typedef void (*orte_rmcast_callback_fn_t)(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
struct iovec *msg, int count, void* cbdata);
END_C_DECLS

Просмотреть файл

@ -36,6 +36,7 @@
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/threads/threads.h"
#include "orte/mca/rmcast/base/private.h"
#include "orte/mca/rmcast/base/base.h"
@ -45,17 +46,13 @@
static bool init_completed = false;
static orte_job_t *daemons=NULL;
static bool comm_enabled = false;
static orte_thread_ctl_t ctl;
/* LOCAL FUNCTIONS */
static void recv_handler(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void relay_handler(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void relay(int fd, short event, void *cbdata);
static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel);
/* API FUNCTIONS */
@ -114,6 +111,8 @@ static void enable_comm(void);
static void disable_comm(void);
static void process_msg(orte_rmcast_msg_t *msg);
/* Define the module */
orte_rmcast_module_t orte_rmcast_tcp_module = {
@ -132,7 +131,8 @@ orte_rmcast_module_t orte_rmcast_tcp_module = {
orte_rmcast_base_close_channel,
orte_rmcast_base_query,
enable_comm,
disable_comm
disable_comm,
process_msg
};
/* during init, we setup two channels for both xmit and recv:
@ -167,6 +167,9 @@ static int init(void)
"%s rmcast:tcp: init called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup local ctl */
OBJ_CONSTRUCT(&ctl, orte_thread_ctl_t);
/* setup the respective public address channel */
if (ORTE_PROC_IS_TOOL) {
/* tools only open the sys channel */
@ -197,15 +200,6 @@ static int init(void)
ORTE_ERROR_LOG(rc);
return rc;
}
/* activate a recv to catch relays */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_MULTICAST_RELAY,
ORTE_RML_PERSISTENT,
relay_handler,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else if (ORTE_PROC_IS_APP) {
/* apps open the app public and data server channels */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce",
@ -259,7 +253,7 @@ static int init(void)
}
/* start the processing thread */
if (ORTE_SUCCESS != (rc = orte_rmcast_base_start_threads(false, true))) {
if (ORTE_SUCCESS != (rc = orte_rmcast_base_start_threads())) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -288,52 +282,28 @@ static void finalize(void)
comm_enabled = false;
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MULTICAST);
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MULTICAST_RELAY);
}
/* stop the processing thread */
orte_rmcast_base_stop_threads();
init_completed = false;
OBJ_DESTRUCT(&ctl);
return;
}
static void enable_comm(void)
{
orte_rmcast_base_start_threads(false, true);
ORTE_ACQUIRE_THREAD(&ctl);
orte_rmcast_base_start_threads();
comm_enabled = true;
ORTE_RELEASE_THREAD(&ctl);
}
static void disable_comm(void)
{
ORTE_ACQUIRE_THREAD(&ctl);
comm_enabled = false;
orte_rmcast_base_stop_threads();
}
/* internal blocking send support */
static void internal_snd_cb(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
struct iovec *msg, int count, void *cbdata)
{
rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata;
ORTE_WAKEUP_THREAD(&snd->ctl);
}
static void internal_snd_buf_cb(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata)
{
rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata;
ORTE_WAKEUP_THREAD(&snd->ctl);
ORTE_RELEASE_THREAD(&ctl);
}
static int send_data(rmcast_base_send_t *snd,
@ -361,6 +331,7 @@ static int send_data(rmcast_base_send_t *snd,
/* setup the message for xmission */
if (ORTE_SUCCESS != (rc = orte_rmcast_base_queue_xmit(snd, channel, &buf, &ch))) {
ORTE_ERROR_LOG(rc);
ORTE_RELEASE_THREAD(&ctl);
return rc;
}
@ -369,31 +340,52 @@ static int send_data(rmcast_base_send_t *snd,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buf->bytes_used,
(int)ch->channel, (int)snd->tag));
if (ORTE_PROC_IS_HNP) {
/* if we don't already have it, get the daemon object */
if (NULL == daemons) {
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
}
/* send it to each daemon */
for (v=1; v < daemons->procs->size; v++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, v))) {
continue;
}
if (NULL == proc->rml_uri) {
/* not ready yet - don't know contact info */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp dont have path to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
continue;
}
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
/* if we are a daemon, then we have to send it to the HNP
* for relay to all other daemons - we cannot send it
* ourselves as, at startup, we won't know who else is
* out there until -after- a startup handshake is
* exchanged via multicast
*/
if (ORTE_PROC_IS_DAEMON) {
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp sending to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
if (0 > (rc = orte_rml.send_buffer(&proc->name, buf, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
/* ignore errors */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_MULTICAST, 0))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) {
ORTE_ERROR_LOG(rc);
}
}
} else {
/* if we don't already have it, get the daemon object */
if (NULL == daemons) {
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
}
/* send it to each daemon other than myself */
for (v=1; v < daemons->procs->size; v++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, v))) {
continue;
}
if (NULL == proc->rml_uri) {
/* not ready yet - don't know contact info */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp dont have path to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
continue;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp sending to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
/* ignore errors */
if (0 > (rc = orte_rml.send_buffer(&proc->name, buf, ORTE_RML_TAG_MULTICAST, 0))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) {
ORTE_ERROR_LOG(rc);
}
}
}
}
/* send the message to my children */
@ -401,48 +393,34 @@ static int send_data(rmcast_base_send_t *snd,
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (!child->alive) {
continue;
}
if (NULL == child->rml_uri) {
/* race condition - hasn't reported in yet */
continue;
}
/* ignore errors */
if (0 > (rc = orte_rml.send_buffer(child->name, buf, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) {
ORTE_ERROR_LOG(rc);
}
}
}
rc = ORTE_SUCCESS;
} else {
/* if I am a daemon, I need to relay this to my children first */
if (ORTE_PROC_IS_DAEMON) {
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (NULL == child->rml_uri) {
/* race condition */
continue;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s relaying multicast to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
if (0 > (rc = orte_rml.send_buffer(child->name, buf, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
}
/* send it to the HNP */
/* I am a tool or an app - send it to my HNP for relay */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp sending multicast to HNP %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_MULTICAST_RELAY, 0))) {
ORTE_ERROR_LOG(rc);
/* didn't get the message out */
opal_output(0, "%s failed to send message to multicast channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)ch->channel);
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_MULTICAST, 0))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) {
ORTE_ERROR_LOG(rc);
/* didn't get the message out */
opal_output(0, "%s failed to send message to multicast channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)ch->channel);
}
goto cleanup;
}
rc = ORTE_SUCCESS;
@ -464,9 +442,6 @@ static int send_data(rmcast_base_send_t *snd,
}
}
/* roll to next message sequence number */
ORTE_MULTICAST_NEXT_SEQUENCE_NUM(ch->seq_num);
cleanup:
OBJ_RELEASE(buf);
@ -474,13 +449,16 @@ static int send_data(rmcast_base_send_t *snd,
}
static int tcp_send(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec *msg, int count)
orte_rmcast_tag_t tag,
struct iovec *msg, int count)
{
rmcast_base_send_t snd;
int ret;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
@ -489,32 +467,33 @@ static int tcp_send(orte_rmcast_channel_t channel,
snd.iovec_array = msg;
snd.iovec_count = count;
snd.tag = tag;
snd.cbfunc_iovec = internal_snd_cb;
snd.cbdata = &snd;
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&snd);
return ret;
}
/* now wait for the send to complete */
ORTE_ACQUIRE_THREAD(&snd.ctl);
/* carefully cleanup */
snd.iovec_array = NULL;
snd.iovec_count = 0;
OBJ_DESTRUCT(&snd);
return ORTE_SUCCESS;
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
static int tcp_send_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec *msg, int count,
orte_rmcast_callback_fn_t cbfunc,
void *cbdata)
orte_rmcast_tag_t tag,
struct iovec *msg, int count,
orte_rmcast_callback_fn_t cbfunc,
void *cbdata)
{
int ret;
rmcast_base_send_t snd;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
@ -528,22 +507,28 @@ static int tcp_send_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&snd);
return ret;
}
/* carefully cleanup */
snd.iovec_array = NULL;
snd.iovec_count = 0;
OBJ_DESTRUCT(&snd);
return ORTE_SUCCESS;
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
static int tcp_send_buffer(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
opal_buffer_t *buf)
orte_rmcast_tag_t tag,
opal_buffer_t *buf)
{
int ret;
rmcast_base_send_t snd;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
@ -551,32 +536,32 @@ static int tcp_send_buffer(orte_rmcast_channel_t channel,
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.buf = buf;
snd.tag = tag;
snd.cbfunc_buffer = internal_snd_buf_cb;
snd.cbdata = &snd;
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&snd);
return ret;
}
/* now wait for the send to complete */
ORTE_ACQUIRE_THREAD(&snd.ctl);
/* carefully cleanup */
snd.buf = NULL;
OBJ_DESTRUCT(&snd);
ORTE_RELEASE_THREAD(&ctl);
return ORTE_SUCCESS;
}
static int tcp_send_buffer_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
opal_buffer_t *buf,
orte_rmcast_callback_buffer_fn_t cbfunc,
void *cbdata)
orte_rmcast_tag_t tag,
opal_buffer_t *buf,
orte_rmcast_callback_buffer_fn_t cbfunc,
void *cbdata)
{
int ret;
rmcast_base_send_t snd;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
@ -589,12 +574,14 @@ static int tcp_send_buffer_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&snd);
return ret;
}
/* carefully cleanup */
snd.buf = NULL;
OBJ_DESTRUCT(&snd);
return ORTE_SUCCESS;
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
static int tcp_recv(orte_process_name_t *name,
@ -607,7 +594,10 @@ static int tcp_recv(orte_process_name_t *name,
int ret;
orte_rmcast_channel_t chan;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
@ -625,6 +615,7 @@ static int tcp_recv(orte_process_name_t *name,
ORTE_ERROR_LOG(ret);
return ret;
}
ORTE_RELEASE_THREAD(&ctl);
ORTE_ACQUIRE_THREAD(&recvptr->ctl);
@ -638,7 +629,7 @@ static int tcp_recv(orte_process_name_t *name,
*msg = recvptr->iovec_array;
*count = recvptr->iovec_count;
/* release the recv */
/* remove the recv */
recvptr->iovec_array = NULL;
recvptr->iovec_count = 0;
OBJ_RELEASE(recvptr);
@ -658,6 +649,8 @@ static int tcp_recv_nb(orte_rmcast_channel_t channel,
"%s rmcast:tcp: recv_nb called on channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
ORTE_ACQUIRE_THREAD(&ctl);
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
@ -675,6 +668,7 @@ static int tcp_recv_nb(orte_rmcast_channel_t channel,
}
}
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
@ -688,7 +682,10 @@ static int tcp_recv_buffer(orte_process_name_t *name,
int ret;
orte_rmcast_channel_t chan;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
@ -708,8 +705,10 @@ static int tcp_recv_buffer(orte_process_name_t *name,
ORTE_RMCAST_NON_PERSISTENT,
NULL, NULL, NULL, true))) {
ORTE_ERROR_LOG(ret);
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
ORTE_RELEASE_THREAD(&ctl);
ORTE_ACQUIRE_THREAD(&recvptr->ctl);
@ -741,6 +740,8 @@ static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel,
"%s rmcast:tcp: recv_buffer_nb called on multicast channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
ORTE_ACQUIRE_THREAD(&ctl);
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
@ -751,13 +752,15 @@ static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
NULL, cbfunc, cbdata, false))) {
if (ORTE_EXISTS != ret) {
if (ORTE_EXISTS == ret) {
ret = ORTE_SUCCESS;
} else {
ORTE_ERROR_LOG(ret);
return ret;
}
}
return ORTE_SUCCESS;
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
/* for the tcp module, we will be using the RML to "fake" a
@ -825,14 +828,95 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
}
/**** LOCAL FUNCTIONS ****/
static void process_msg(orte_rmcast_msg_t *msg)
{
int rc;
opal_list_item_t *item;
int v;
orte_proc_t *proc;
orte_odls_child_t *child;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp processing message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->sender)));
if (ORTE_PROC_IS_HNP) {
/* if we don't already have it, get the daemon object */
if (NULL == daemons) {
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
}
/* relay msg to each daemon excluding myself and whomever sent this to me */
for (v=1; v < daemons->procs->size; v++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, v))) {
continue;
}
if (NULL == proc->rml_uri) {
/* not ready yet - don't know contact info */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp dont have path to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
continue;
}
if (msg->sender.jobid == proc->name.jobid &&
msg->sender.vpid == proc->name.vpid) {
continue;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp relaying msg to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
/* ignore errors */
if (0 > (rc = orte_rml.send_buffer(&proc->name, msg->buf, ORTE_RML_TAG_MULTICAST, 0))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) {
ORTE_ERROR_LOG(rc);
}
ORTE_ERROR_LOG(rc);
}
}
}
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
/* need to relay this to my children */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (!child->alive) {
continue;
}
if (NULL == child->rml_uri) {
/* race condition */
continue;
}
if (msg->sender.jobid == child->name->jobid &&
msg->sender.vpid == child->name->vpid) {
continue;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s relaying multicast to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
/* ignore errors */
if (0 > (rc = orte_rml.send_buffer(child->name, msg->buf, ORTE_RML_TAG_MULTICAST, 0))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) {
ORTE_ERROR_LOG(rc);
}
}
}
}
/* now process it myself - this releases the msg */
orte_rmcast_base_process_msg(msg);
}
/**** LOCAL FUNCTIONS ****/
static void recv_handler(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
uint8_t *data;
int32_t siz;
if (!comm_enabled) {
return;
}
@ -842,105 +926,7 @@ static void recv_handler(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* clear the way for the next message */
opal_dss.unload(buffer, (void**)&data, &siz);
ORTE_MULTICAST_MESSAGE_EVENT(data, siz);
return;
}
static void relay(int fd, short event, void *cbdata)
{
orte_message_event_t *msg = (orte_message_event_t*)cbdata;
orte_proc_t *proc;
opal_list_item_t *item;
orte_odls_child_t *child;
int rc, v;
uint8_t *data;
int32_t siz;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp relaying multicast msg from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->sender)));
if (ORTE_PROC_IS_HNP) {
/* if we don't already have it, get the daemon object */
if (NULL == daemons) {
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
}
/* send it to each daemon other than the one that sent it to me */
for (v=1; v < daemons->procs->size; v++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, v))) {
continue;
}
/* if this message came from a daemon, then we don't want
* to send it back to the same one as it will enter an
* infinite loop
*/
if (ORTE_PROC_MY_NAME->jobid == msg->sender.jobid &&
proc->name.vpid == msg->sender.vpid) {
continue;
}
if (NULL == proc->rml_uri) {
/* race condition */
continue;
}
if (0 > (rc = orte_rml.send_buffer(&proc->name, msg->buffer, ORTE_RML_TAG_MULTICAST_RELAY, 0))) {
ORTE_ERROR_LOG(rc);
}
}
}
/* send the message to my children */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (!child->alive) {
continue;
}
if (NULL == child->rml_uri) {
/* race condition */
OPAL_OUTPUT_VERBOSE((7, orte_rmcast_base.rmcast_output,
"%s child %s has not checked in",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
continue;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s relaying multicast msg from %s to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->sender),
ORTE_NAME_PRINT(child->name)));
if (0 > (rc = orte_rml.send_buffer(child->name, msg->buffer, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
}
}
/* now process it myself */
opal_dss.unload(msg->buffer, (void**)&data, &siz);
ORTE_MULTICAST_MESSAGE_EVENT(data, siz);
/* protect the buffer */
OBJ_RELEASE(msg);
}
static void relay_handler(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
/* if the message is from myself, ignore it */
if (sender->jobid == ORTE_PROC_MY_NAME->jobid &&
sender->vpid == ORTE_PROC_MY_NAME->vpid) {
return;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp relay multicast msg from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* clear the way for the next message */
ORTE_MESSAGE_EVENT(sender, buffer, tag, relay);
ORTE_MULTICAST_MESSAGE_EVENT(sender, buffer);
return;
}

Просмотреть файл

@ -20,19 +20,21 @@
#include <fcntl.h>
#include "opal/class/opal_list.h"
#include "opal/class/opal_ring_buffer.h"
#include "opal/opal_socket_errno.h"
#include "opal/util/output.h"
#include "opal/util/argv.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "opal/dss/dss.h"
#include "opal/mca/event/event.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/threads/threads.h"
#include "orte/mca/rmcast/base/private.h"
#include "orte/mca/rmcast/base/base.h"
@ -40,8 +42,8 @@
/* LOCAL DATA */
static bool init_completed = false;
static opal_pointer_array_t msg_log;
static bool comm_enabled = false;
static orte_thread_ctl_t ctl;
/* LOCAL FUNCTIONS */
static void recv_handler(int sd, short flags, void* user);
@ -52,6 +54,14 @@ static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket);
static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel);
static void resend_data(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void missed_msg(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
/* API FUNCTIONS */
static int init(void);
@ -126,7 +136,8 @@ orte_rmcast_module_t orte_rmcast_udp_module = {
orte_rmcast_base_close_channel,
orte_rmcast_base_query,
enable_comm,
disable_comm
disable_comm,
orte_rmcast_base_process_msg
};
/* during init, we setup two channels for both xmit and recv:
@ -159,9 +170,11 @@ static int init(void)
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:udp: init called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup the globals */
OBJ_CONSTRUCT(&msg_log, opal_pointer_array_t);
opal_pointer_array_init(&msg_log, 8, INT_MAX, 8);
/* setup local ctl */
OBJ_CONSTRUCT(&ctl, orte_thread_ctl_t);
/* flag that we are unreliable and need help */
orte_rmcast_base.unreliable_xport = true;
/* setup the respective public address channel */
if (ORTE_PROC_IS_TOOL) {
@ -233,111 +246,103 @@ static int init(void)
return ORTE_ERR_SILENT;
}
/* start the recv threads */
if (ORTE_SUCCESS != (rc = orte_rmcast_base_start_threads(true, true))) {
/* setup the recv for missed message replacement */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_MISSED_MSG,
ORTE_RML_PERSISTENT,
resend_data,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_MULTICAST,
ORTE_RML_PERSISTENT,
missed_msg,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* start the recv threads */
if (ORTE_SUCCESS != (rc = orte_rmcast_base_start_threads())) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&msg_log);
return rc;
}
init_completed = true;
comm_enabled = true;
return ORTE_SUCCESS;
}
static void finalize(void)
{
rmcast_recv_log_t *log;
int j;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:udp: finalize called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* stop the chatter */
comm_enabled = false;
/* stop the missed msg recv */
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MISSED_MSG);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MULTICAST);
/* stop the threads */
orte_rmcast_base_stop_threads();
for (j=0; j < msg_log.size; j++) {
if (NULL != (log = opal_pointer_array_get_item(&msg_log, j))) {
OBJ_RELEASE(log);
}
}
OBJ_DESTRUCT(&msg_log);
OBJ_DESTRUCT(&ctl);
init_completed = false;
return;
}
static void enable_comm(void)
{
orte_rmcast_base_start_threads(true, true);
ORTE_ACQUIRE_THREAD(&ctl);
orte_rmcast_base_start_threads();
comm_enabled = true;
ORTE_RELEASE_THREAD(&ctl);
}
static void disable_comm(void)
{
ORTE_ACQUIRE_THREAD(&ctl);
comm_enabled = false;
orte_rmcast_base_stop_threads();
}
/* internal blocking send support */
static void internal_snd_cb(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
struct iovec *msg, int count, void *cbdata)
{
rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata;
ORTE_WAKEUP_THREAD(&snd->ctl);
}
static void internal_snd_buf_cb(int status,
orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata)
{
rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata;
ORTE_WAKEUP_THREAD(&snd->ctl);
ORTE_RELEASE_THREAD(&ctl);
}
static int udp_send(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec *msg, int count)
{
rmcast_base_send_t *snd;
rmcast_base_send_t snd;
int ret;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
snd = OBJ_NEW(rmcast_base_send_t);
snd->iovec_array = msg;
snd->iovec_count = count;
snd->tag = tag;
snd->cbfunc_iovec = internal_snd_cb;
snd->cbdata = snd;
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.iovec_array = msg;
snd.iovec_count = count;
snd.tag = tag;
if (ORTE_SUCCESS != (ret = send_data(snd, channel))) {
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret);
return ret;
}
/* now wait for the send to complete */
ORTE_ACQUIRE_THREAD(&snd->ctl);
/* carefully release the send */
snd->iovec_array = NULL;
snd->iovec_count = 0;
OBJ_RELEASE(snd);
return ORTE_SUCCESS;
snd.iovec_array = NULL;
snd.iovec_count = 0;
OBJ_DESTRUCT(&snd);
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
static int udp_send_nb(orte_rmcast_channel_t channel,
@ -347,26 +352,34 @@ static int udp_send_nb(orte_rmcast_channel_t channel,
void *cbdata)
{
int ret;
rmcast_base_send_t *snd;
rmcast_base_send_t snd;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
snd = OBJ_NEW(rmcast_base_send_t);
snd->iovec_array = msg;
snd->iovec_count = count;
snd->tag = tag;
snd->cbfunc_iovec = cbfunc;
snd->cbdata = cbdata;
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.iovec_array = msg;
snd.iovec_count = count;
snd.tag = tag;
snd.cbfunc_iovec = cbfunc;
snd.cbdata = cbdata;
if (ORTE_SUCCESS != (ret = send_data(snd, channel))) {
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret);
return ret;
}
return ORTE_SUCCESS;
/* carefully release the send */
snd.iovec_array = NULL;
snd.iovec_count = 0;
OBJ_DESTRUCT(&snd);
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
static int udp_send_buffer(orte_rmcast_channel_t channel,
@ -374,33 +387,30 @@ static int udp_send_buffer(orte_rmcast_channel_t channel,
opal_buffer_t *buf)
{
int ret;
rmcast_base_send_t *snd;
rmcast_base_send_t snd;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
snd = OBJ_NEW(rmcast_base_send_t);
snd->buf = buf;
snd->tag = tag;
snd->cbfunc_buffer = internal_snd_buf_cb;
snd->cbdata = snd;
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.buf = buf;
snd.tag = tag;
if (ORTE_SUCCESS != (ret = send_data(snd, channel))) {
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(snd);
return ret;
}
/* now wait for the send to complete */
ORTE_ACQUIRE_THREAD(&snd->ctl);
/* carefully release the send */
snd->buf = NULL;
OBJ_RELEASE(snd);
return ORTE_SUCCESS;
snd.buf = NULL;
OBJ_DESTRUCT(&snd);
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
static int udp_send_buffer_nb(orte_rmcast_channel_t channel,
@ -410,26 +420,32 @@ static int udp_send_buffer_nb(orte_rmcast_channel_t channel,
void *cbdata)
{
int ret;
rmcast_base_send_t *snd;
rmcast_base_send_t snd;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
snd = OBJ_NEW(rmcast_base_send_t);
snd->buf = buf;
snd->tag = tag;
snd->cbfunc_buffer = cbfunc;
snd->cbdata = cbdata;
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.buf = buf;
snd.tag = tag;
snd.cbfunc_buffer = cbfunc;
snd.cbdata = cbdata;
if (ORTE_SUCCESS != (ret = send_data(snd, channel))) {
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(snd);
return ret;
}
return ORTE_SUCCESS;
/* carefully release the send */
snd.buf = NULL;
OBJ_DESTRUCT(&snd);
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
static int udp_recv(orte_process_name_t *name,
@ -442,7 +458,10 @@ static int udp_recv(orte_process_name_t *name,
int ret;
orte_rmcast_channel_t chan;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
@ -458,8 +477,10 @@ static int udp_recv(orte_process_name_t *name,
ORTE_RMCAST_NON_PERSISTENT,
NULL, NULL, NULL, true))) {
ORTE_ERROR_LOG(ret);
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
ORTE_RELEASE_THREAD(&ctl);
ORTE_ACQUIRE_THREAD(&recvptr->ctl);
@ -493,6 +514,8 @@ static int udp_recv_nb(orte_rmcast_channel_t channel,
"%s rmcast:udp: recv_nb called on channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
ORTE_ACQUIRE_THREAD(&ctl);
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
@ -505,11 +528,11 @@ static int udp_recv_nb(orte_rmcast_channel_t channel,
cbfunc, NULL, cbdata, false))) {
if (ORTE_EXISTS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
}
return ORTE_SUCCESS;
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
static int udp_recv_buffer(orte_process_name_t *name,
@ -522,7 +545,10 @@ static int udp_recv_buffer(orte_process_name_t *name,
int ret;
orte_rmcast_channel_t chan;
ORTE_ACQUIRE_THREAD(&ctl);
if (!comm_enabled) {
ORTE_RELEASE_THREAD(&ctl);
return ORTE_ERR_COMM_DISABLED;
}
@ -542,8 +568,10 @@ static int udp_recv_buffer(orte_process_name_t *name,
ORTE_RMCAST_NON_PERSISTENT,
NULL, NULL, NULL, true))) {
ORTE_ERROR_LOG(ret);
ORTE_RELEASE_THREAD(&ctl);
return ret;
}
ORTE_RELEASE_THREAD(&ctl);
ORTE_ACQUIRE_THREAD(&recvptr->ctl);
@ -575,6 +603,8 @@ static int udp_recv_buffer_nb(orte_rmcast_channel_t channel,
"%s rmcast:udp: recv_buffer_nb called on multicast channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
ORTE_ACQUIRE_THREAD(&ctl);
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
@ -585,13 +615,15 @@ static int udp_recv_buffer_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
NULL, cbfunc, cbdata, false))) {
if (ORTE_EXISTS != ret) {
if (ORTE_EXISTS == ret) {
ret = ORTE_SUCCESS;
} else {
ORTE_ERROR_LOG(ret);
return ret;
}
}
ORTE_RELEASE_THREAD(&ctl);
return ORTE_SUCCESS;
return ret;
}
static int open_channel(orte_rmcast_channel_t channel, char *name,
@ -663,9 +695,11 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) {
ORTE_ERROR_LOG(rc);
ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return rc;
}
ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return rc;
return ORTE_SUCCESS;
}
/* we didn't find an existing match, so create a new channel */
@ -715,23 +749,23 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
}
/**** LOCAL FUNCTIONS ****/
static void recv_handler(int sd, short flags, void* cbdata)
{
uint8_t *data;
ssize_t siz;
ssize_t sz;
rmcast_base_channel_t *chan = (rmcast_base_channel_t*)cbdata;
opal_buffer_t buf;
/* read the data */
data = (uint8_t*)malloc(orte_rmcast_udp_sndbuf_size * sizeof(uint8_t));
siz = read(sd, data, orte_rmcast_udp_sndbuf_size);
sz = read(sd, data, orte_rmcast_udp_sndbuf_size);
if (!comm_enabled) {
free(data);
return;
}
if (siz <= 0) {
if (sz <= 0) {
/* this shouldn't happen - report the errno */
opal_output(0, "%s Error on multicast recv socket event: %s(%d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(errno), errno);
@ -741,13 +775,24 @@ static void recv_handler(int sd, short flags, void* cbdata)
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:udp recvd %d bytes from channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)siz, (int)chan->channel));
(int)sz, (int)chan->channel));
/* clear the way for the next message */
ORTE_MULTICAST_MESSAGE_EVENT(data, siz);
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, (void*)data, sz);
ORTE_MULTICAST_MESSAGE_EVENT(ORTE_NAME_INVALID, &buf);
OBJ_DESTRUCT(&buf);
return;
}
static void missed_msg(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
opal_output(0, "%s RECVD MISSING MESSAGE", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_MULTICAST_MESSAGE_EVENT(sender, buffer);
}
static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
{
int rc;
@ -781,7 +826,6 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
return rc;
}
chan->xmit = xmitsd;
chan->send_data = (uint8_t*)malloc(orte_rmcast_udp_sndbuf_size);
}
if (0 > chan->recv && ORTE_RMCAST_RECV & direction) {
@ -797,9 +841,8 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
"%s setup:channel activating recv event on fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),(int)chan->recv));
opal_event_set(orte_rmcast_base.event_base, &chan->recv_ev,
chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
OPAL_UPDATE_EVBASE(orte_rmcast_base.event_base, &chan->recv_ev, OPAL_EVENT_ADD);
opal_event_set(opal_event_base, &chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
opal_event_add(&chan->recv_ev, 0);
}
return ORTE_SUCCESS;
@ -946,11 +989,12 @@ static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket)
static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel)
{
char *bytes;
char *bytes=NULL;
int32_t sz;
int rc;
opal_buffer_t *buf;
rmcast_base_channel_t *ch;
opal_buffer_t *buf=NULL;
rmcast_base_channel_t *chan;
rmcast_send_log_t *log, *lg;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
@ -961,70 +1005,131 @@ static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
/* setup the message for xmission */
if (ORTE_SUCCESS != (rc = orte_rmcast_base_queue_xmit(snd, channel, &buf, &ch))) {
if (ORTE_SUCCESS != (rc = orte_rmcast_base_queue_xmit(snd, channel, &buf, &chan))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
#if 0
/* store the working buf in the send ring buffer in case we
* need to retransmit it later
*/
log = OBJ_NEW(rmcast_send_log_t);
log->channel = chan->channel;
log->seq_num = chan->seq_num;
opal_dss.copy_payload(log->buf, buf);
if (NULL != (lg = (rmcast_send_log_t*)opal_ring_buffer_push(&chan->cache, log))) {
/* release the old message */
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s releasing message %d channel %d from log",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
lg->seq_num, lg->channel));
OBJ_RELEASE(lg);
}
#endif
/* store the working buf in the send ring buffer in case we
* need to retransmit it later
*/
log = OBJ_NEW(rmcast_send_log_t);
log->channel = chan->channel;
log->seq_num = chan->seq_num;
opal_dss.copy_payload(log->buf, buf);
if (NULL != (lg = (rmcast_send_log_t*)opal_ring_buffer_push(&chan->cache, log))) {
/* release the old message */
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s releasing message %d channel %d from log",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
lg->seq_num, lg->channel));
OBJ_RELEASE(lg);
}
/* unload the working buf to obtain the payload */
if (ORTE_SUCCESS != (rc = opal_dss.unload(buf, (void**)&bytes, &sz))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* unload the working buf to obtain the payload */
if (ORTE_SUCCESS != (rc = opal_dss.unload(buf, (void**)&bytes, &sz))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* done with the working buf */
OBJ_RELEASE(buf);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:udp multicasting %d bytes to network %03d.%03d.%03d.%03d port %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sz,
OPAL_IF_FORMAT_ADDR(ch->network), (int)ch->port, (int)snd->tag));
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:udp multicasting %d bytes to network %03d.%03d.%03d.%03d port %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sz,
OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port, (int)snd->tag));
if (sz != (rc = sendto(ch->xmit, bytes, sz, 0,
(struct sockaddr *)&(ch->addr), sizeof(struct sockaddr_in)))) {
/* didn't get the message out */
opal_output(0, "%s failed to send message to multicast network %03d.%03d.%03d.%03d on\n\terror %s(%d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network),
strerror(errno), errno);
rc = errno;
} else {
rc = ORTE_SUCCESS;
}
if (sz != (rc = sendto(chan->xmit, bytes, sz, 0,
(struct sockaddr *)&(chan->addr), sizeof(struct sockaddr_in)))) {
/* didn't get the message out */
opal_output(0, "%s failed to send message to multicast network %03d.%03d.%03d.%03d on\n\terror %s(%d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(chan->network),
strerror(errno), errno);
rc = ORTE_ERR_COMM_FAILURE;
} else {
rc = ORTE_SUCCESS;
}
if (NULL != snd->buf) {
/* call the cbfunc if required */
if (NULL != snd->cbfunc_buffer) {
snd->cbfunc_buffer(rc, ch->channel, ch->seq_num, snd->tag,
ORTE_PROC_MY_NAME, snd->buf, snd->cbdata);
}
} else {
/* call the cbfunc if required */
if (NULL != snd->cbfunc_iovec) {
snd->cbfunc_iovec(rc, ch->channel, ch->seq_num, snd->tag, ORTE_PROC_MY_NAME,
snd->iovec_array, snd->iovec_count, snd->cbdata);
}
if (NULL != snd->buf) {
/* call the cbfunc if required */
if (NULL != snd->cbfunc_buffer) {
snd->cbfunc_buffer(rc, chan->channel, chan->seq_num, snd->tag,
ORTE_PROC_MY_NAME, snd->buf, snd->cbdata);
}
} else {
/* call the cbfunc if required */
if (NULL != snd->cbfunc_iovec) {
snd->cbfunc_iovec(rc, chan->channel, chan->seq_num, snd->tag, ORTE_PROC_MY_NAME,
snd->iovec_array, snd->iovec_count, snd->cbdata);
}
}
/* roll to next message sequence number */
ORTE_MULTICAST_NEXT_SEQUENCE_NUM(ch->seq_num);
CLEANUP:
return rc;
CLEANUP:
if (NULL != buf) {
OBJ_RELEASE(buf);
}
if (NULL != bytes) {
free(bytes);
}
return rc;
}
static void resend_data(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int n, rc;
orte_rmcast_channel_t channel;
orte_rmcast_seq_t start;
rmcast_base_channel_t *ch;
rmcast_send_log_t *log;
/* block any further ops until we complete the missing
* message repair
*/
ORTE_ACQUIRE_THREAD(&ctl);
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &channel, &n, ORTE_RMCAST_CHANNEL_T))) {
ORTE_ERROR_LOG(rc);
goto release;
}
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &start, &n, ORTE_RMCAST_SEQ_T))) {
ORTE_ERROR_LOG(rc);
goto release;
}
OPAL_OUTPUT_VERBOSE((0, orte_rmcast_base.rmcast_output,
"%s request resend data from %s for channel %d start %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), channel, start));
/* get the referenced channel object */
if (NULL == (ch = orte_rmcast_base_get_channel(channel))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
goto release;
}
/* search its ring buffer for the starting message - function
* automatically starts at the oldest message and works up
* from there
*/
for (n=0; n < ch->cache.size; n++) {
log = (rmcast_send_log_t*)opal_ring_buffer_poke(&ch->cache, n);
if (NULL == log ||
log->seq_num <= start) {
continue;
}
OPAL_OUTPUT_VERBOSE((0, orte_rmcast_base.rmcast_output,
"%s resending msg %d to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
log->seq_num, ORTE_NAME_PRINT(sender)));
if (0 > (rc = orte_rml.send_buffer(sender, log->buf, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
goto release;
}
}
release:
ORTE_RELEASE_THREAD(&ctl);
}

Просмотреть файл

@ -171,8 +171,8 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_msg_packet_t);
#define ORTE_RML_TAG_MULTICAST 35
/* multicast messages sent direct */
#define ORTE_RML_TAG_MULTICAST_DIRECT 36
/* multicast messages to be relayed */
#define ORTE_RML_TAG_MULTICAST_RELAY 37
/* report a missed msg */
#define ORTE_RML_TAG_MISSED_MSG 37
/* tag for receiving ack of abort msg */
#define ORTE_RML_TAG_ABORT 38

Просмотреть файл

@ -138,6 +138,12 @@ const char *orte_err2str(int errnum)
case ORTE_ERR_UNRECOVERABLE:
retval = "Unrecoverable error";
break;
case ORTE_ERR_NO_APP_SPECIFIED:
retval = "No application specified";
break;
case ORTE_ERR_NO_EXE_SPECIFIED:
retval = "No executable specified";
break;
case ORTE_ERR_COMM_DISABLED:
retval = "Communications have been disabled";
break;