Add orte-level thread support to avoid some of the opal_if_threads protection used solely for ompi.
Use threads to help process multicast messages. This commit was SVN r24009.
Этот коммит содержится в:
родитель
fa919be622
Коммит
a47b33678b
@ -63,6 +63,7 @@ include tools/Makefile.am
|
||||
include orted/Makefile.am
|
||||
include test/mpi/Makefile.include
|
||||
include test/system/Makefile.include
|
||||
include threads/Makefile.am
|
||||
|
||||
# Set the convenience library to be the same as the non-convenience
|
||||
# library, but a) it's marked as "noinst", so LT knows it's a
|
||||
|
@ -21,6 +21,7 @@ if !ORTE_DISABLE_FULL_SUPPORT
|
||||
libmca_rmcast_la_SOURCES += \
|
||||
base/rmcast_base_close.c \
|
||||
base/rmcast_base_select.c \
|
||||
base/rmcast_base_fns.c
|
||||
base/rmcast_base_fns.c \
|
||||
base/rmcast_base_threads.c
|
||||
|
||||
endif
|
||||
|
@ -23,8 +23,8 @@
|
||||
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "opal/mca/event/event.h"
|
||||
#include "opal/threads/threads.h"
|
||||
|
||||
#include "orte/threads/threads.h"
|
||||
#include "orte/mca/rmcast/rmcast.h"
|
||||
#include "orte/mca/rmcast/base/private.h"
|
||||
|
||||
@ -55,6 +55,13 @@ typedef struct {
|
||||
opal_list_t channels;
|
||||
rmcast_base_channel_t *my_output_channel;
|
||||
rmcast_base_channel_t *my_input_channel;
|
||||
int process_ctl_pipe[2];
|
||||
opal_list_t msg_list;
|
||||
opal_event_base_t *event_base;
|
||||
opal_thread_t recv_thread;
|
||||
orte_thread_ctl_t recv_ctl;
|
||||
opal_thread_t recv_process;
|
||||
orte_thread_ctl_t recv_process_ctl;
|
||||
} orte_rmcast_base_t;
|
||||
|
||||
ORTE_DECLSPEC extern orte_rmcast_base_t orte_rmcast_base;
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include "opal/mca/event/event.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "opal/class/opal_ring_buffer.h"
|
||||
#include "opal/util/fd.h"
|
||||
|
||||
#include "orte/mca/rmcast/rmcast.h"
|
||||
|
||||
@ -115,8 +116,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_base_send_t);
|
||||
* event to break out of the recv and process the message later
|
||||
*/
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t *ev;
|
||||
opal_list_item_t super;
|
||||
opal_buffer_t *buf;
|
||||
} orte_mcast_msg_event_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_mcast_msg_event_t);
|
||||
@ -142,21 +142,20 @@ typedef struct {
|
||||
} rmcast_send_log_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
|
||||
|
||||
#define ORTE_MULTICAST_MESSAGE_EVENT(bf, cbfunc) \
|
||||
do { \
|
||||
orte_mcast_msg_event_t *mev; \
|
||||
struct timeval now; \
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
|
||||
"defining mcast msg event: %s %d", \
|
||||
__FILE__, __LINE__)); \
|
||||
mev = OBJ_NEW(orte_mcast_msg_event_t); \
|
||||
mev->buf = (bf); \
|
||||
opal_event_evtimer_set(opal_event_base, \
|
||||
mev->ev, (cbfunc), mev); \
|
||||
now.tv_sec = 0; \
|
||||
now.tv_usec = 0; \
|
||||
opal_event_evtimer_add(mev->ev, &now); \
|
||||
} while(0);
|
||||
#define ORTE_MULTICAST_MESSAGE_EVENT(dt, sz) \
|
||||
do { \
|
||||
orte_mcast_msg_event_t *mev; \
|
||||
char byte='a'; \
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output, \
|
||||
"defining mcast msg event: %s %d", \
|
||||
__FILE__, __LINE__)); \
|
||||
mev = OBJ_NEW(orte_mcast_msg_event_t); \
|
||||
opal_dss.load(mev->buf, (dt), (sz)); \
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); \
|
||||
opal_list_append(&orte_rmcast_base.msg_list, &mev->super); \
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); \
|
||||
opal_fd_write(orte_rmcast_base.process_ctl_pipe[1], 1, &byte); \
|
||||
} while(0);
|
||||
|
||||
|
||||
#define ORTE_MULTICAST_NEXT_SEQUENCE_NUM(seq) \
|
||||
@ -169,10 +168,6 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
|
||||
} while(0);
|
||||
|
||||
/**** FUNCTIONS ****/
|
||||
ORTE_DECLSPEC int orte_rmcast_base_build_msg(rmcast_base_channel_t *ch,
|
||||
opal_buffer_t **buffer,
|
||||
rmcast_base_send_t *snd);
|
||||
|
||||
ORTE_DECLSPEC int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
|
||||
orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
@ -181,7 +176,13 @@ ORTE_DECLSPEC int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
|
||||
orte_rmcast_callback_buffer_fn_t cbfunc_buffer,
|
||||
void *cbdata, bool blocking);
|
||||
|
||||
ORTE_DECLSPEC void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg);
|
||||
ORTE_DECLSPEC int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
|
||||
orte_rmcast_channel_t channel,
|
||||
opal_buffer_t **buffer,
|
||||
rmcast_base_channel_t **chan);
|
||||
|
||||
ORTE_DECLSPEC int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread);
|
||||
ORTE_DECLSPEC void orte_rmcast_base_stop_threads(void);
|
||||
|
||||
ORTE_DECLSPEC void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag);
|
||||
|
@ -28,7 +28,14 @@ int orte_rmcast_base_close(void)
|
||||
if (NULL != orte_rmcast.finalize) {
|
||||
orte_rmcast.finalize();
|
||||
}
|
||||
|
||||
|
||||
/* cleanup thread stuff */
|
||||
OBJ_DESTRUCT(&orte_rmcast_base.recv_thread);
|
||||
OBJ_DESTRUCT(&orte_rmcast_base.recv_ctl);
|
||||
OBJ_DESTRUCT(&orte_rmcast_base.recv_process);
|
||||
OBJ_DESTRUCT(&orte_rmcast_base.recv_process_ctl);
|
||||
opal_event_base_finalize(orte_rmcast_base.event_base);
|
||||
|
||||
/* Close all remaining available components (may be one if this is a
|
||||
Open RTE program, or [possibly] multiple if this is ompi_info) */
|
||||
|
||||
|
@ -14,7 +14,7 @@
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/mca/base/base.h"
|
||||
#include "opal/threads/threads.h"
|
||||
#include "orte/threads/threads.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
@ -23,29 +23,70 @@
|
||||
#include "orte/mca/rmcast/base/base.h"
|
||||
#include "orte/mca/rmcast/base/private.h"
|
||||
|
||||
static int extract_hdr(opal_buffer_t *buf,
|
||||
orte_process_name_t *name,
|
||||
orte_rmcast_channel_t *channel,
|
||||
orte_rmcast_tag_t *tag,
|
||||
orte_rmcast_seq_t *seq_num);
|
||||
|
||||
static int insert_hdr(opal_buffer_t *buf,
|
||||
orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_seq_t seq_num);
|
||||
|
||||
int orte_rmcast_base_build_msg(rmcast_base_channel_t *ch,
|
||||
opal_buffer_t **buffer,
|
||||
rmcast_base_send_t *snd)
|
||||
int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
|
||||
orte_rmcast_channel_t channel,
|
||||
opal_buffer_t **buffer,
|
||||
rmcast_base_channel_t **chan)
|
||||
{
|
||||
rmcast_base_channel_t *chptr, *ch;
|
||||
int32_t sz;
|
||||
opal_buffer_t *buf;
|
||||
int rc;
|
||||
int8_t flag;
|
||||
int32_t tmp32;
|
||||
opal_buffer_t *buf;
|
||||
opal_list_item_t *item;
|
||||
|
||||
/* setup default responses */
|
||||
*buffer = NULL;
|
||||
*chan = NULL;
|
||||
|
||||
/* if we were asked to send this on our group output
|
||||
* channel, substitute it
|
||||
*/
|
||||
if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
|
||||
if (NULL == orte_rmcast_base.my_output_channel) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
ch = orte_rmcast_base.my_output_channel;
|
||||
goto process;
|
||||
} else if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
|
||||
if (NULL == orte_rmcast_base.my_input_channel) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
ch = orte_rmcast_base.my_input_channel;
|
||||
goto process;
|
||||
}
|
||||
|
||||
/* find the channel */
|
||||
ch = NULL;
|
||||
for (item = opal_list_get_first(&orte_rmcast_base.channels);
|
||||
item != opal_list_get_end(&orte_rmcast_base.channels);
|
||||
item = opal_list_get_next(item)) {
|
||||
chptr = (rmcast_base_channel_t*)item;
|
||||
if (channel == chptr->channel) {
|
||||
ch = chptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == ch) {
|
||||
/* didn't find it */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
process:
|
||||
/* return the channel */
|
||||
*chan = ch;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:build_msg of %d %s"
|
||||
"%s rmcast:base:queue_xmit of %d %s"
|
||||
" for multicast on channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == snd->iovec_array) ? (int)snd->buf->bytes_used : (int)snd->iovec_count,
|
||||
@ -54,7 +95,8 @@ int orte_rmcast_base_build_msg(rmcast_base_channel_t *ch,
|
||||
|
||||
/* setup a buffer */
|
||||
buf = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
*buffer = buf;
|
||||
|
||||
/* insert the header */
|
||||
if (ORTE_SUCCESS != (rc = insert_hdr(buf, ch->channel, snd->tag, ch->seq_num))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
@ -107,14 +149,12 @@ int orte_rmcast_base_build_msg(rmcast_base_channel_t *ch,
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
*buffer = buf;
|
||||
return ORTE_SUCCESS;
|
||||
|
||||
cleanup:
|
||||
if (NULL != buf) {
|
||||
OBJ_RELEASE(buf);
|
||||
}
|
||||
*buffer = NULL;
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -135,7 +175,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
|
||||
|
||||
if (!blocking) {
|
||||
/* do we already have a recv for this channel/tag? */
|
||||
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
for (item = opal_list_get_first(&orte_rmcast_base.recvs);
|
||||
item != opal_list_get_end(&orte_rmcast_base.recvs);
|
||||
item = opal_list_get_next(item)) {
|
||||
@ -154,7 +194,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: matching recv already active on multicast channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
|
||||
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
return ORTE_EXISTS;
|
||||
}
|
||||
rptr->cbfunc_iovec = cbfunc_iovec;
|
||||
@ -165,7 +205,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: matching recv already active on multicast channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
|
||||
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
return ORTE_EXISTS;
|
||||
}
|
||||
rptr->cbfunc_buffer = cbfunc_buffer;
|
||||
@ -173,9 +213,10 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
|
||||
if (NULL != recvptr) {
|
||||
*recvptr = rptr;
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
}
|
||||
|
||||
/* if we get here, then we need to add a new recv */
|
||||
@ -183,7 +224,6 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: adding recv on multicast channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
|
||||
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
|
||||
rptr = OBJ_NEW(rmcast_base_recv_t);
|
||||
rptr->channel = channel;
|
||||
rptr->tag = tag;
|
||||
@ -198,229 +238,17 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
|
||||
/* wildcard tag recvs get pushed to the end of the list so
|
||||
* that specific tag recvs take precedence
|
||||
*/
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
if (ORTE_RMCAST_TAG_WILDCARD == tag) {
|
||||
opal_list_append(&orte_rmcast_base.recvs, &rptr->item);
|
||||
} else {
|
||||
opal_list_prepend(&orte_rmcast_base.recvs, &rptr->item);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg)
|
||||
{
|
||||
orte_rmcast_channel_t channel;
|
||||
opal_list_item_t *item;
|
||||
rmcast_base_recv_t *ptr;
|
||||
orte_process_name_t name;
|
||||
orte_rmcast_tag_t tag;
|
||||
int8_t flag;
|
||||
struct iovec *iovec_array=NULL;
|
||||
int32_t iovec_count=0, i, n, isz;
|
||||
opal_buffer_t *recvd_buf=NULL;
|
||||
int rc;
|
||||
orte_rmcast_seq_t recvd_seq_num;
|
||||
|
||||
/* extract the header */
|
||||
if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &recvd_seq_num))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* if this message is from myself, ignore it */
|
||||
if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv sent from myself: %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name)));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* if this message is from a different job family, ignore it unless
|
||||
* it is on the system channel. We ignore these messages to avoid
|
||||
* confusion between different jobs since we all may be sharing
|
||||
* multicast channels. The system channel is left open to support
|
||||
* cross-job communications for detecting multiple conflicting DVMs.
|
||||
*/
|
||||
if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) &&
|
||||
(ORTE_RMCAST_SYS_CHANNEL != channel)) {
|
||||
/* if we are not the HNP or a daemon, then we ignore this */
|
||||
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv from a different job family: %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name)));
|
||||
} else {
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
/* unpack the iovec vs buf flag */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &flag, &n, OPAL_INT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %lu",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name), channel, (int)tag,
|
||||
(0 == flag) ? "iovecs" : "buffer", recvd_seq_num));
|
||||
|
||||
|
||||
/* find the recv for this channel, tag, and type */
|
||||
for (item = opal_list_get_first(&orte_rmcast_base.recvs);
|
||||
item != opal_list_get_end(&orte_rmcast_base.recvs);
|
||||
item = opal_list_get_next(item)) {
|
||||
ptr = (rmcast_base_recv_t*)item;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:recv checking channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(int)ptr->channel, (int)ptr->tag));
|
||||
|
||||
if (channel != ptr->channel) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tag != ptr->tag && ORTE_RMCAST_TAG_WILDCARD != ptr->tag) {
|
||||
continue;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:recv delivering message to channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||
|
||||
/* we have a recv - unpack the data */
|
||||
if (0 == flag) {
|
||||
/* get the number of iovecs in the buffer */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &iovec_count, &n, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
/* malloc the required space */
|
||||
iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec));
|
||||
/* unpack the iovecs */
|
||||
for (i=0; i < iovec_count; i++) {
|
||||
/* unpack the number of bytes in this iovec */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &isz, &n, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
iovec_array[i].iov_base = NULL;
|
||||
iovec_array[i].iov_len = isz;
|
||||
if (0 < isz) {
|
||||
/* allocate the space */
|
||||
iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(isz);
|
||||
/* unpack the data */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, iovec_array[i].iov_base, &isz, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (NULL != ptr->cbfunc_iovec) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:recv delivering iovecs to channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||
|
||||
ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
|
||||
&name, iovec_array, iovec_count, ptr->cbdata);
|
||||
/* if it isn't persistent, remove it */
|
||||
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
||||
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
|
||||
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
|
||||
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
|
||||
OBJ_RELEASE(ptr);
|
||||
}
|
||||
} else {
|
||||
/* if something is already present, then we have a problem */
|
||||
if (NULL != ptr->iovec_array) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:recv blocking recv already fulfilled",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
goto cleanup;
|
||||
}
|
||||
ptr->seq_num = recvd_seq_num;
|
||||
/* copy over the iovec array since it will be released by
|
||||
* the blocking recv
|
||||
*/
|
||||
ptr->iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec));
|
||||
ptr->iovec_count = iovec_count;
|
||||
for (i=0; i < iovec_count; i++) {
|
||||
ptr->iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(iovec_array[i].iov_len);
|
||||
ptr->iovec_array[i].iov_len = iovec_array[i].iov_len;
|
||||
memcpy(ptr->iovec_array[i].iov_base, iovec_array[i].iov_base, iovec_array[i].iov_len);
|
||||
}
|
||||
/* flag it as recvd to release blocking recv */
|
||||
ptr->recvd = true;
|
||||
}
|
||||
} else {
|
||||
/* buffer was included */
|
||||
recvd_buf = OBJ_NEW(opal_buffer_t);
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(recvd_buf, msg->buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
if (NULL != ptr->cbfunc_buffer) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:recv delivering buffer to channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||
|
||||
ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
|
||||
&name, recvd_buf, ptr->cbdata);
|
||||
/* if it isn't persistent, remove it */
|
||||
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
||||
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
|
||||
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
|
||||
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
|
||||
OBJ_RELEASE(ptr);
|
||||
}
|
||||
} else {
|
||||
/* if something is already present, then we have a problem */
|
||||
if (NULL != ptr->buf) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:recv blocking recv already fulfilled",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
goto cleanup;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:recv copying buffer for blocking recv",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
ptr->seq_num = recvd_seq_num;
|
||||
/* copy the buffer across since it will be released
|
||||
* by the blocking recv
|
||||
*/
|
||||
ptr->buf = OBJ_NEW(opal_buffer_t);
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(ptr->buf, recvd_buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
/* flag it as recvd to release blocking recv */
|
||||
ptr->recvd = true;
|
||||
}
|
||||
}
|
||||
/* we are done - only one recv can match */
|
||||
break;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if (NULL != iovec_array) {
|
||||
for (i=0; i < iovec_count; i++) {
|
||||
free(iovec_array[i].iov_base);
|
||||
}
|
||||
free(iovec_array);
|
||||
}
|
||||
if (NULL != recvd_buf) {
|
||||
OBJ_RELEASE(recvd_buf);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag)
|
||||
{
|
||||
@ -444,10 +272,10 @@ void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
|
||||
ptr = (rmcast_base_recv_t*)item;
|
||||
if (ch == ptr->channel &&
|
||||
tag == ptr->tag) {
|
||||
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
|
||||
OBJ_RELEASE(ptr);
|
||||
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
}
|
||||
item = next;
|
||||
}
|
||||
@ -495,40 +323,6 @@ int orte_rmcast_base_query(orte_rmcast_channel_t *output, orte_rmcast_channel_t
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int extract_hdr(opal_buffer_t *buf,
|
||||
orte_process_name_t *name,
|
||||
orte_rmcast_channel_t *channel,
|
||||
orte_rmcast_tag_t *tag,
|
||||
orte_rmcast_seq_t *seq_num)
|
||||
{
|
||||
int rc;
|
||||
int32_t n;
|
||||
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, name, &n, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, channel, &n, ORTE_RMCAST_CHANNEL_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, tag, &n, ORTE_RMCAST_TAG_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, seq_num, &n, ORTE_RMCAST_SEQ_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int insert_hdr(opal_buffer_t *buf,
|
||||
orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
|
@ -28,11 +28,13 @@
|
||||
#include "opal/util/opal_sos.h"
|
||||
#include "opal/class/opal_ring_buffer.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "opal/mca/event/event.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/util/parse_options.h"
|
||||
#include "orte/util/show_help.h"
|
||||
#include "orte/threads/threads.h"
|
||||
|
||||
#include "orte/mca/rmcast/base/private.h"
|
||||
|
||||
@ -111,6 +113,13 @@ int orte_rmcast_base_open(void)
|
||||
orte_rmcast_base.active = false;
|
||||
OBJ_CONSTRUCT(&orte_rmcast_base.recvs, opal_list_t);
|
||||
OBJ_CONSTRUCT(&orte_rmcast_base.channels, opal_list_t);
|
||||
OBJ_CONSTRUCT(&orte_rmcast_base.msg_list, opal_list_t);
|
||||
|
||||
orte_rmcast_base.event_base = opal_event_base_create();
|
||||
OBJ_CONSTRUCT(&orte_rmcast_base.recv_thread, opal_thread_t);
|
||||
OBJ_CONSTRUCT(&orte_rmcast_base.recv_ctl, orte_thread_ctl_t);
|
||||
OBJ_CONSTRUCT(&orte_rmcast_base.recv_process, opal_thread_t);
|
||||
OBJ_CONSTRUCT(&orte_rmcast_base.recv_process_ctl, orte_thread_ctl_t);
|
||||
|
||||
orte_rmcast_base.xmit_network = 0;
|
||||
orte_rmcast_base.my_group_name = NULL;
|
||||
@ -122,6 +131,15 @@ int orte_rmcast_base_open(void)
|
||||
orte_rmcast_base.my_output_channel = NULL;
|
||||
orte_rmcast_base.my_input_channel = NULL;
|
||||
|
||||
/* progress rate */
|
||||
mca_base_param_reg_int_name("rmcast", "base_msg_tick_rate",
|
||||
"Number of microsecs between message event loops (default: 10)",
|
||||
false, false, 10, &(orte_rmcast_base.recv_ctl.rate.tv_usec));
|
||||
|
||||
mca_base_param_reg_int_name("rmcast", "base_msg_process_tick_rate",
|
||||
"Number of microsecs between message event loops (default: 100)",
|
||||
false, false, 100, &(orte_rmcast_base.recv_process_ctl.rate.tv_usec));
|
||||
|
||||
/* public multicast channel for this job */
|
||||
mca_base_param_reg_string_name("rmcast", "base_multicast_network",
|
||||
"Network to use for multicast xmissions [link (default) | site | org | global | tuple-addr]",
|
||||
@ -341,16 +359,16 @@ int orte_rmcast_base_open(void)
|
||||
/**** CLASS INSTANCES ****/
|
||||
static void mcast_event_constructor(orte_mcast_msg_event_t *ev)
|
||||
{
|
||||
ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t));
|
||||
ev->buf = OBJ_NEW(opal_buffer_t);
|
||||
}
|
||||
static void mcast_event_destructor(orte_mcast_msg_event_t *ev)
|
||||
{
|
||||
if (NULL != ev->ev) {
|
||||
free(ev->ev);
|
||||
if (NULL != ev->buf) {
|
||||
OBJ_RELEASE(ev->buf);
|
||||
}
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_mcast_msg_event_t,
|
||||
opal_object_t,
|
||||
opal_list_item_t,
|
||||
mcast_event_constructor,
|
||||
mcast_event_destructor);
|
||||
|
||||
@ -376,6 +394,7 @@ static void recv_construct(rmcast_base_recv_t *ptr)
|
||||
ptr->name.vpid = ORTE_VPID_INVALID;
|
||||
ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
|
||||
ptr->recvd = false;
|
||||
ptr->seq_num = ORTE_RMCAST_SEQ_INVALID;
|
||||
ptr->tag = ORTE_RMCAST_TAG_INVALID;
|
||||
ptr->flags = ORTE_RMCAST_NON_PERSISTENT; /* default */
|
||||
ptr->iovec_array = NULL;
|
||||
|
395
orte/mca/rmcast/base/rmcast_base_threads.c
Обычный файл
395
orte/mca/rmcast/base/rmcast_base_threads.c
Обычный файл
@ -0,0 +1,395 @@
|
||||
/*
|
||||
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/mca/base/base.h"
|
||||
#include "opal/util/fd.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/threads/threads.h"
|
||||
|
||||
#include "orte/mca/rmcast/base/base.h"
|
||||
#include "orte/mca/rmcast/base/private.h"
|
||||
|
||||
static void* rcv_progress_thread(opal_object_t *obj);
|
||||
static void* rcv_processing_thread(opal_object_t *obj);
|
||||
static int extract_hdr(opal_buffer_t *buf,
|
||||
orte_process_name_t *name,
|
||||
orte_rmcast_channel_t *channel,
|
||||
orte_rmcast_tag_t *tag,
|
||||
orte_rmcast_seq_t *seq_num);
|
||||
|
||||
int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
|
||||
{
|
||||
int rc;
|
||||
|
||||
if (rcv_thread) {
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: starting recv thread",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
orte_rmcast_base.recv_thread.t_run = rcv_progress_thread;
|
||||
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
orte_rmcast_base.recv_ctl.running = true;
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: recv thread started",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
}
|
||||
|
||||
if (processing_thread) {
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: starting recv processing thread",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
if (pipe(orte_rmcast_base.process_ctl_pipe) < 0) {
|
||||
opal_output(0, "%s Cannot open processing thread ctl pipe",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
orte_rmcast_base.recv_process.t_run = rcv_processing_thread;
|
||||
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_process))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
orte_rmcast_base.recv_process_ctl.running = true;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: recv processing thread started",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
void orte_rmcast_base_stop_threads(void)
|
||||
{
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: stopping recv thread",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
/* if the thread is active, stop it */
|
||||
if (orte_rmcast_base.recv_ctl.running) {
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
|
||||
orte_rmcast_base.recv_ctl.stop = true;
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
|
||||
opal_thread_kill(&orte_rmcast_base.recv_thread, SIGKILL);
|
||||
opal_thread_join(&orte_rmcast_base.recv_thread, NULL);
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: stopping recv processing thread",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
if (orte_rmcast_base.recv_process_ctl.running) {
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
orte_rmcast_base.recv_process_ctl.stop = true;
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
opal_thread_kill(&orte_rmcast_base.recv_process, SIGKILL);
|
||||
opal_thread_join(&orte_rmcast_base.recv_process, NULL);
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: all threads stopped",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
}
|
||||
|
||||
static void* rcv_processing_thread(opal_object_t *obj)
|
||||
{
|
||||
orte_mcast_msg_event_t *msg;
|
||||
orte_rmcast_channel_t channel;
|
||||
opal_list_item_t *item;
|
||||
rmcast_base_recv_t *ptr;
|
||||
orte_process_name_t name;
|
||||
orte_rmcast_tag_t tag;
|
||||
int8_t flag;
|
||||
struct iovec *iovec_array=NULL;
|
||||
int32_t iovec_count=0, i, n, isz;
|
||||
int rc;
|
||||
orte_rmcast_seq_t recvd_seq_num;
|
||||
char byte;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: recv processing thread operational",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
while (1) {
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
if (orte_rmcast_base.recv_process_ctl.stop) {
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
return OPAL_THREAD_CANCELLED;
|
||||
}
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
|
||||
/* block here until a trigger arrives */
|
||||
if (0 > (rc = opal_fd_read(orte_rmcast_base.process_ctl_pipe[0], 1, &byte))) {
|
||||
/* if something bad happened, punt */
|
||||
opal_output(0, "%s PUNTING THREAD", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
return OPAL_THREAD_CANCELLED;
|
||||
}
|
||||
/* get a message off the list */
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
if (NULL == (msg = (orte_mcast_msg_event_t*)opal_list_remove_first(&orte_rmcast_base.msg_list))) {
|
||||
/* nothing was there - error */
|
||||
opal_output(0, "%s ERROR PROCESSING MULTICAST MESSAGES",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
goto cleanup;
|
||||
}
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
|
||||
/* extract the header */
|
||||
if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &recvd_seq_num))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* if this message is from myself, ignore it */
|
||||
if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv sent from myself: %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name)));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* if this message is from a different job family, ignore it unless
|
||||
* it is on the system channel. We ignore these messages to avoid
|
||||
* confusion between different jobs since we all may be sharing
|
||||
* multicast channels. The system channel is left open to support
|
||||
* cross-job communications for detecting multiple conflicting DVMs.
|
||||
*/
|
||||
if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) &&
|
||||
(ORTE_RMCAST_SYS_CHANNEL != channel)) {
|
||||
/* if we are not the HNP or a daemon, then we ignore this */
|
||||
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv from a different job family: %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name)));
|
||||
} else {
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
/* unpack the iovec vs buf flag */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &flag, &n, OPAL_INT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %lu",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name), channel, (int)tag,
|
||||
(0 == flag) ? "iovecs" : "buffer", recvd_seq_num));
|
||||
|
||||
|
||||
/* find the recv for this channel, tag, and type */
|
||||
for (item = opal_list_get_first(&orte_rmcast_base.recvs);
|
||||
item != opal_list_get_end(&orte_rmcast_base.recvs);
|
||||
item = opal_list_get_next(item)) {
|
||||
ptr = (rmcast_base_recv_t*)item;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv checking channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(int)ptr->channel, (int)ptr->tag));
|
||||
|
||||
if (channel != ptr->channel) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tag != ptr->tag && ORTE_RMCAST_TAG_WILDCARD != ptr->tag) {
|
||||
continue;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv delivering message to channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||
|
||||
/* we have a recv - unpack the data */
|
||||
if (0 == flag) {
|
||||
/* get the number of iovecs in the buffer */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &iovec_count, &n, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
/* malloc the required space */
|
||||
iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec));
|
||||
/* unpack the iovecs */
|
||||
for (i=0; i < iovec_count; i++) {
|
||||
/* unpack the number of bytes in this iovec */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &isz, &n, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
iovec_array[i].iov_base = NULL;
|
||||
iovec_array[i].iov_len = isz;
|
||||
if (0 < isz) {
|
||||
/* allocate the space */
|
||||
iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(isz);
|
||||
/* unpack the data */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, iovec_array[i].iov_base, &isz, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (NULL != ptr->cbfunc_iovec) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv delivering iovecs to channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||
ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
|
||||
&name, iovec_array, iovec_count, ptr->cbdata);
|
||||
/* if it isn't persistent, remove it */
|
||||
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv removing non-persistent recv",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
|
||||
OBJ_RELEASE(ptr);
|
||||
}
|
||||
} else {
|
||||
/* if something is already present, then we have a problem */
|
||||
if (NULL != ptr->iovec_array) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv blocking recv already fulfilled",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
goto cleanup;
|
||||
}
|
||||
ptr->seq_num = recvd_seq_num;
|
||||
/* copy over the iovec array since it will be released by
|
||||
* the blocking recv
|
||||
*/
|
||||
ptr->iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec));
|
||||
ptr->iovec_count = iovec_count;
|
||||
for (i=0; i < iovec_count; i++) {
|
||||
ptr->iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(iovec_array[i].iov_len);
|
||||
ptr->iovec_array[i].iov_len = iovec_array[i].iov_len;
|
||||
memcpy(ptr->iovec_array[i].iov_base, iovec_array[i].iov_base, iovec_array[i].iov_len);
|
||||
}
|
||||
/* flag it as recvd to release blocking recv */
|
||||
ptr->recvd = true;
|
||||
}
|
||||
} else {
|
||||
if (NULL != ptr->cbfunc_buffer) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv delivering buffer to channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||
ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
|
||||
&name, msg->buf, ptr->cbdata);
|
||||
/* if it isn't persistent, remove it */
|
||||
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv removing non-persistent recv",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
|
||||
OBJ_RELEASE(ptr);
|
||||
}
|
||||
} else {
|
||||
/* if something is already present, then we have a problem */
|
||||
if (NULL != ptr->buf) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv blocking recv already fulfilled",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
goto cleanup;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv copying buffer for blocking recv",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
ptr->seq_num = recvd_seq_num;
|
||||
/* copy the buffer across since it will be released
|
||||
* by the blocking recv
|
||||
*/
|
||||
ptr->buf = OBJ_NEW(opal_buffer_t);
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(ptr->buf, msg->buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
/* flag it as recvd to release blocking recv */
|
||||
ptr->recvd = true;
|
||||
}
|
||||
}
|
||||
/* we are done - only one recv can match */
|
||||
break;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if (NULL != iovec_array) {
|
||||
for (i=0; i < iovec_count; i++) {
|
||||
free(iovec_array[i].iov_base);
|
||||
}
|
||||
free(iovec_array);
|
||||
iovec_array = NULL;
|
||||
iovec_count = 0;
|
||||
}
|
||||
OBJ_RELEASE(msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void* rcv_progress_thread(opal_object_t *obj)
|
||||
{
|
||||
int events=0;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: recv thread operational",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
while (1) {
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
|
||||
if (orte_rmcast_base.recv_ctl.stop) {
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
|
||||
return OPAL_THREAD_CANCELLED;
|
||||
}
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
|
||||
events += opal_event_loop(orte_rmcast_base.event_base, OPAL_EVLOOP_ONCE);
|
||||
}
|
||||
}
|
||||
|
||||
static int extract_hdr(opal_buffer_t *buf,
|
||||
orte_process_name_t *name,
|
||||
orte_rmcast_channel_t *channel,
|
||||
orte_rmcast_tag_t *tag,
|
||||
orte_rmcast_seq_t *seq_num)
|
||||
{
|
||||
int rc;
|
||||
int32_t n;
|
||||
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, name, &n, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, channel, &n, ORTE_RMCAST_CHANNEL_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, tag, &n, ORTE_RMCAST_TAG_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, seq_num, &n, ORTE_RMCAST_SEQ_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
@ -55,6 +55,8 @@ static void relay_handler(int status, orte_process_name_t* sender,
|
||||
void* cbdata);
|
||||
static void relay(int fd, short event, void *cbdata);
|
||||
|
||||
static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel);
|
||||
|
||||
/* API FUNCTIONS */
|
||||
static int init(void);
|
||||
|
||||
@ -191,7 +193,7 @@ static int init(void)
|
||||
/* activate a recv to catch relays */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_MULTICAST_RELAY,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
ORTE_RML_PERSISTENT,
|
||||
relay_handler,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
@ -249,10 +251,16 @@ static int init(void)
|
||||
}
|
||||
}
|
||||
|
||||
/* start the processing thread */
|
||||
if (ORTE_SUCCESS != (rc = orte_rmcast_base_start_threads(false, true))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* now activate the non-blocking recv so we catch messages */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_MULTICAST,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
ORTE_RML_PERSISTENT,
|
||||
recv_handler,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
@ -272,6 +280,10 @@ static void finalize(void)
|
||||
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MULTICAST_RELAY);
|
||||
}
|
||||
|
||||
/* stop the processing thread */
|
||||
orte_rmcast_base_stop_threads();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -298,16 +310,16 @@ static void internal_snd_buf_cb(int status,
|
||||
send_buf_complete = true;
|
||||
}
|
||||
|
||||
static int queue_xmit(rmcast_base_send_t *snd,
|
||||
orte_rmcast_channel_t channel)
|
||||
static int send_data(rmcast_base_send_t *snd,
|
||||
orte_rmcast_channel_t channel)
|
||||
{
|
||||
opal_list_item_t *item;
|
||||
rmcast_base_channel_t *ch, *chptr;
|
||||
orte_proc_t *proc;
|
||||
orte_odls_child_t *child;
|
||||
int rc, v;
|
||||
opal_buffer_t *buf;
|
||||
|
||||
rmcast_base_channel_t *ch;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:tcp: send of %d %s"
|
||||
" called on multicast channel %d",
|
||||
@ -316,45 +328,8 @@ static int queue_xmit(rmcast_base_send_t *snd,
|
||||
(NULL == snd->iovec_array) ? "bytes" : "iovecs",
|
||||
(int)channel));
|
||||
|
||||
/* if we were asked to send this on our group output
|
||||
* channel, substitute it
|
||||
*/
|
||||
if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
|
||||
if (NULL == orte_rmcast_base.my_output_channel) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
ch = orte_rmcast_base.my_output_channel;
|
||||
goto process;
|
||||
} else if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
|
||||
if (NULL == orte_rmcast_base.my_input_channel) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
ch = orte_rmcast_base.my_input_channel;
|
||||
goto process;
|
||||
}
|
||||
|
||||
/* find the channel */
|
||||
ch = NULL;
|
||||
for (item = opal_list_get_first(&orte_rmcast_base.channels);
|
||||
item != opal_list_get_end(&orte_rmcast_base.channels);
|
||||
item = opal_list_get_next(item)) {
|
||||
chptr = (rmcast_base_channel_t*)item;
|
||||
if (channel == chptr->channel) {
|
||||
ch = chptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == ch) {
|
||||
/* didn't find it */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
process:
|
||||
/* setup the message for xmission */
|
||||
if (ORTE_SUCCESS != (rc = orte_rmcast_base_build_msg(ch, &buf, snd))) {
|
||||
if (ORTE_SUCCESS != (rc = orte_rmcast_base_queue_xmit(snd, channel, &buf, &ch))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
@ -473,8 +448,8 @@ static int queue_xmit(rmcast_base_send_t *snd,
|
||||
}
|
||||
|
||||
static int tcp_send(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count)
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count)
|
||||
{
|
||||
rmcast_base_send_t snd;
|
||||
int ret;
|
||||
@ -487,7 +462,7 @@ static int tcp_send(orte_rmcast_channel_t channel,
|
||||
snd.cbfunc_iovec = internal_snd_cb;
|
||||
send_complete = false;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel))) {
|
||||
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_DESTRUCT(&snd);
|
||||
return ret;
|
||||
@ -501,10 +476,10 @@ static int tcp_send(orte_rmcast_channel_t channel,
|
||||
}
|
||||
|
||||
static int tcp_send_nb(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count,
|
||||
orte_rmcast_callback_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count,
|
||||
orte_rmcast_callback_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
int ret;
|
||||
rmcast_base_send_t snd;
|
||||
@ -517,7 +492,7 @@ static int tcp_send_nb(orte_rmcast_channel_t channel,
|
||||
snd.cbfunc_iovec = cbfunc;
|
||||
snd.cbdata = cbdata;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel))) {
|
||||
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_DESTRUCT(&snd);
|
||||
return ret;
|
||||
@ -528,8 +503,8 @@ static int tcp_send_nb(orte_rmcast_channel_t channel,
|
||||
}
|
||||
|
||||
static int tcp_send_buffer(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf)
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf)
|
||||
{
|
||||
int ret;
|
||||
rmcast_base_send_t snd;
|
||||
@ -541,7 +516,7 @@ static int tcp_send_buffer(orte_rmcast_channel_t channel,
|
||||
snd.cbfunc_buffer = internal_snd_buf_cb;
|
||||
send_buf_complete = false;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel))) {
|
||||
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_DESTRUCT(&snd);
|
||||
return ret;
|
||||
@ -555,10 +530,10 @@ static int tcp_send_buffer(orte_rmcast_channel_t channel,
|
||||
}
|
||||
|
||||
static int tcp_send_buffer_nb(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf,
|
||||
orte_rmcast_callback_buffer_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf,
|
||||
orte_rmcast_callback_buffer_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
int ret;
|
||||
rmcast_base_send_t snd;
|
||||
@ -570,7 +545,7 @@ static int tcp_send_buffer_nb(orte_rmcast_channel_t channel,
|
||||
snd.cbfunc_buffer = cbfunc;
|
||||
snd.cbdata = cbdata;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel))) {
|
||||
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_DESTRUCT(&snd);
|
||||
return ret;
|
||||
@ -618,9 +593,9 @@ static int tcp_recv(orte_process_name_t *name,
|
||||
*count = recvptr->iovec_count;
|
||||
|
||||
/* remove the recv */
|
||||
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
opal_list_remove_item(&orte_rmcast_base.recvs, &recvptr->item);
|
||||
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
OBJ_RELEASE(recvptr);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
@ -702,9 +677,9 @@ static int tcp_recv_buffer(orte_process_name_t *name,
|
||||
/* release the data */
|
||||
OBJ_RELEASE(recvptr->buf);
|
||||
|
||||
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
opal_list_remove_item(&orte_rmcast_base.recvs, &recvptr->item);
|
||||
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
OBJ_RELEASE(recvptr);
|
||||
|
||||
return ret;
|
||||
@ -805,68 +780,22 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
|
||||
}
|
||||
|
||||
|
||||
/**** LOCAL FUNCTIONS ****/
|
||||
|
||||
static void process_recv(int fd, short event, void *cbdata)
|
||||
{
|
||||
orte_mcast_msg_event_t *mev = (orte_mcast_msg_event_t*)cbdata;
|
||||
opal_list_item_t *item;
|
||||
orte_odls_child_t *child;
|
||||
int rc;
|
||||
|
||||
/* if I am a daemon, I need to relay this to my children first */
|
||||
if (ORTE_PROC_IS_DAEMON) {
|
||||
for (item = opal_list_get_first(&orte_local_children);
|
||||
item != opal_list_get_end(&orte_local_children);
|
||||
item = opal_list_get_next(item)) {
|
||||
child = (orte_odls_child_t*)item;
|
||||
if (NULL == child->rml_uri) {
|
||||
/* race condition */
|
||||
continue;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s relaying multicast to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(child->name)));
|
||||
if (0 > (rc = orte_rml.send_buffer(child->name, mev->buf, ORTE_RML_TAG_MULTICAST, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* process the receive */
|
||||
orte_rmcast_base_process_recv(mev);
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(mev);
|
||||
return;
|
||||
}
|
||||
|
||||
/**** LOCAL FUNCTIONS ****/
|
||||
static void recv_handler(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
{
|
||||
int rc;
|
||||
opal_buffer_t *buf;
|
||||
uint8_t *data;
|
||||
int32_t siz;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:tcp recvd multicast msg",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* clear the way for the next message */
|
||||
buf = OBJ_NEW(opal_buffer_t);
|
||||
opal_dss.copy_payload(buf, buffer);
|
||||
ORTE_MULTICAST_MESSAGE_EVENT(buf, process_recv);
|
||||
opal_dss.unload(buffer, (void**)&data, &siz);
|
||||
ORTE_MULTICAST_MESSAGE_EVENT(data, siz);
|
||||
|
||||
/* reissue the recv */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_MULTICAST,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
recv_handler,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -877,7 +806,9 @@ static void relay(int fd, short event, void *cbdata)
|
||||
opal_list_item_t *item;
|
||||
orte_odls_child_t *child;
|
||||
int rc, v;
|
||||
|
||||
uint8_t *data;
|
||||
int32_t siz;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:tcp relaying multicast msg from %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
@ -933,9 +864,9 @@ static void relay(int fd, short event, void *cbdata)
|
||||
}
|
||||
|
||||
/* now process it myself */
|
||||
ORTE_MULTICAST_MESSAGE_EVENT(msg->buffer, process_recv);
|
||||
opal_dss.unload(msg->buffer, (void**)&data, &siz);
|
||||
ORTE_MULTICAST_MESSAGE_EVENT(data, siz);
|
||||
/* protect the buffer */
|
||||
msg->buffer = NULL;
|
||||
OBJ_RELEASE(msg);
|
||||
}
|
||||
|
||||
@ -943,8 +874,6 @@ static void relay_handler(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* if the message is from myself, ignore it */
|
||||
if (sender->jobid == ORTE_PROC_MY_NAME->jobid &&
|
||||
sender->vpid == ORTE_PROC_MY_NAME->vpid) {
|
||||
@ -959,13 +888,5 @@ static void relay_handler(int status, orte_process_name_t* sender,
|
||||
/* clear the way for the next message */
|
||||
ORTE_MESSAGE_EVENT(sender, buffer, tag, relay);
|
||||
|
||||
/* reissue the recv */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_MULTICAST_RELAY,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
relay_handler,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -26,6 +26,7 @@
|
||||
#include "opal/util/if.h"
|
||||
#include "opal/util/net.h"
|
||||
#include "opal/dss/dss.h"
|
||||
#include "opal/mca/event/event.h"
|
||||
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
@ -48,7 +49,7 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction);
|
||||
|
||||
static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket);
|
||||
|
||||
static int xmit_data(rmcast_base_channel_t *chan, rmcast_base_send_t *snd);
|
||||
static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel);
|
||||
|
||||
/* API FUNCTIONS */
|
||||
static int init(void);
|
||||
@ -56,48 +57,48 @@ static int init(void);
|
||||
static void finalize(void);
|
||||
|
||||
static int udp_send_buffer(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf);
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf);
|
||||
|
||||
static int udp_send_buffer_nb(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf,
|
||||
orte_rmcast_callback_buffer_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf,
|
||||
orte_rmcast_callback_buffer_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
|
||||
static int udp_send(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count);
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count);
|
||||
|
||||
static int udp_send_nb(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count,
|
||||
orte_rmcast_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count,
|
||||
orte_rmcast_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
|
||||
static int udp_recv_buffer(orte_process_name_t *sender,
|
||||
orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_seq_t *seq_num,
|
||||
opal_buffer_t *buf);
|
||||
opal_buffer_t *buf);
|
||||
|
||||
static int udp_recv_buffer_nb(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_flag_t flags,
|
||||
orte_rmcast_callback_buffer_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_flag_t flags,
|
||||
orte_rmcast_callback_buffer_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
|
||||
static int udp_recv(orte_process_name_t *sender,
|
||||
orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_seq_t *seq_num,
|
||||
struct iovec **msg, int *count);
|
||||
struct iovec **msg, int *count);
|
||||
|
||||
static int udp_recv_nb(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_flag_t flags,
|
||||
orte_rmcast_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_flag_t flags,
|
||||
orte_rmcast_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
|
||||
static int open_channel(orte_rmcast_channel_t channel, char *name,
|
||||
char *network, int port, char *interface, uint8_t direction);
|
||||
@ -154,7 +155,7 @@ static int init(void)
|
||||
/* setup the globals */
|
||||
OBJ_CONSTRUCT(&msg_log, opal_pointer_array_t);
|
||||
opal_pointer_array_init(&msg_log, 8, INT_MAX, 8);
|
||||
|
||||
|
||||
/* setup the respective public address channel */
|
||||
if (ORTE_PROC_IS_TOOL) {
|
||||
/* tools only open the sys channel */
|
||||
@ -224,6 +225,14 @@ static int init(void)
|
||||
opal_output(0, "rmcast:udp:init - unknown process type");
|
||||
return ORTE_ERR_SILENT;
|
||||
}
|
||||
|
||||
/* start the recv threads */
|
||||
if (ORTE_SUCCESS != (rc = orte_rmcast_base_start_threads(true, true))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&msg_log);
|
||||
return rc;
|
||||
}
|
||||
|
||||
init_completed = true;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
@ -236,7 +245,10 @@ static void finalize(void)
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:udp: finalize called",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
|
||||
/* stop the threads */
|
||||
orte_rmcast_base_stop_threads();
|
||||
|
||||
for (j=0; j < msg_log.size; j++) {
|
||||
if (NULL != (log = opal_pointer_array_get_item(&msg_log, j))) {
|
||||
OBJ_RELEASE(log);
|
||||
@ -269,71 +281,9 @@ static void internal_snd_buf_cb(int status,
|
||||
((rmcast_base_send_t *)cbdata)->send_complete = true;
|
||||
}
|
||||
|
||||
static int queue_xmit(rmcast_base_send_t *snd,
|
||||
orte_rmcast_channel_t channel)
|
||||
{
|
||||
rmcast_base_channel_t *ch, *chptr;
|
||||
opal_list_item_t *item;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:udp: send of %d %s"
|
||||
" called on multicast channel %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == snd->iovec_array) ? (int)snd->buf->bytes_used : (int)snd->iovec_count,
|
||||
(NULL == snd->iovec_array) ? "bytes" : "iovecs",
|
||||
(int)channel));
|
||||
|
||||
/* if we were asked to send this on our group output
|
||||
* channel, substitute it
|
||||
*/
|
||||
if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
|
||||
if (NULL == orte_rmcast_base.my_output_channel) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
ch = orte_rmcast_base.my_output_channel;
|
||||
goto process;
|
||||
} else if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
|
||||
if (NULL == orte_rmcast_base.my_input_channel) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
ch = orte_rmcast_base.my_input_channel;
|
||||
goto process;
|
||||
}
|
||||
|
||||
/* find the channel */
|
||||
ch = NULL;
|
||||
for (item = opal_list_get_first(&orte_rmcast_base.channels);
|
||||
item != opal_list_get_end(&orte_rmcast_base.channels);
|
||||
item = opal_list_get_next(item)) {
|
||||
chptr = (rmcast_base_channel_t*)item;
|
||||
if (channel == chptr->channel) {
|
||||
ch = chptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == ch) {
|
||||
/* didn't find it */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
process:
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:udp: queue xmit of %d %s"
|
||||
" called on multicast channel %03d.%03d.%03d.%03d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == snd->iovec_array) ? (int)snd->buf->bytes_used : (int)snd->iovec_count,
|
||||
(NULL == snd->iovec_array) ? "bytes" : "iovecs",
|
||||
OPAL_IF_FORMAT_ADDR(ch->network)));
|
||||
|
||||
return xmit_data(ch, snd);
|
||||
}
|
||||
|
||||
static int udp_send(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count)
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count)
|
||||
{
|
||||
rmcast_base_send_t *snd;
|
||||
int ret;
|
||||
@ -346,7 +296,7 @@ static int udp_send(orte_rmcast_channel_t channel,
|
||||
snd->cbfunc_iovec = internal_snd_cb;
|
||||
snd->cbdata = snd;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = queue_xmit(snd, channel))) {
|
||||
if (ORTE_SUCCESS != (ret = send_data(snd, channel))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
return ret;
|
||||
}
|
||||
@ -358,10 +308,10 @@ static int udp_send(orte_rmcast_channel_t channel,
|
||||
}
|
||||
|
||||
static int udp_send_nb(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count,
|
||||
orte_rmcast_callback_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
orte_rmcast_tag_t tag,
|
||||
struct iovec *msg, int count,
|
||||
orte_rmcast_callback_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
int ret;
|
||||
rmcast_base_send_t *snd;
|
||||
@ -374,7 +324,7 @@ static int udp_send_nb(orte_rmcast_channel_t channel,
|
||||
snd->cbfunc_iovec = cbfunc;
|
||||
snd->cbdata = cbdata;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = queue_xmit(snd, channel))) {
|
||||
if (ORTE_SUCCESS != (ret = send_data(snd, channel))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
return ret;
|
||||
}
|
||||
@ -383,8 +333,8 @@ static int udp_send_nb(orte_rmcast_channel_t channel,
|
||||
}
|
||||
|
||||
static int udp_send_buffer(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf)
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf)
|
||||
{
|
||||
int ret;
|
||||
rmcast_base_send_t *snd;
|
||||
@ -396,7 +346,7 @@ static int udp_send_buffer(orte_rmcast_channel_t channel,
|
||||
snd->cbfunc_buffer = internal_snd_buf_cb;
|
||||
snd->cbdata = snd;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = queue_xmit(snd, channel))) {
|
||||
if (ORTE_SUCCESS != (ret = send_data(snd, channel))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_RELEASE(snd);
|
||||
return ret;
|
||||
@ -409,10 +359,10 @@ static int udp_send_buffer(orte_rmcast_channel_t channel,
|
||||
}
|
||||
|
||||
static int udp_send_buffer_nb(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf,
|
||||
orte_rmcast_callback_buffer_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
orte_rmcast_tag_t tag,
|
||||
opal_buffer_t *buf,
|
||||
orte_rmcast_callback_buffer_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
int ret;
|
||||
rmcast_base_send_t *snd;
|
||||
@ -424,7 +374,7 @@ static int udp_send_buffer_nb(orte_rmcast_channel_t channel,
|
||||
snd->cbfunc_buffer = cbfunc;
|
||||
snd->cbdata = cbdata;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = queue_xmit(snd, channel))) {
|
||||
if (ORTE_SUCCESS != (ret = send_data(snd, channel))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_RELEASE(snd);
|
||||
return ret;
|
||||
@ -480,9 +430,9 @@ static int udp_recv(orte_process_name_t *name,
|
||||
}
|
||||
|
||||
static int udp_recv_nb(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_flag_t flags,
|
||||
orte_rmcast_callback_fn_t cbfunc, void *cbdata)
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_flag_t flags,
|
||||
orte_rmcast_callback_fn_t cbfunc, void *cbdata)
|
||||
{
|
||||
orte_rmcast_channel_t chan;
|
||||
int ret;
|
||||
@ -511,10 +461,10 @@ static int udp_recv_nb(orte_rmcast_channel_t channel,
|
||||
}
|
||||
|
||||
static int udp_recv_buffer(orte_process_name_t *name,
|
||||
orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag,
|
||||
orte_rmcast_seq_t *seq_num,
|
||||
opal_buffer_t *buf)
|
||||
opal_buffer_t *buf)
|
||||
{
|
||||
rmcast_base_recv_t *recvptr;
|
||||
int ret;
|
||||
@ -715,27 +665,17 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
|
||||
|
||||
/**** LOCAL FUNCTIONS ****/
|
||||
|
||||
static void process_recv(int fd, short event, void *cbdata)
|
||||
{
|
||||
orte_mcast_msg_event_t *msg = (orte_mcast_msg_event_t*)cbdata;
|
||||
|
||||
orte_rmcast_base_process_recv(msg);
|
||||
OBJ_RELEASE(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
static void recv_handler(int sd, short flags, void* cbdata)
|
||||
{
|
||||
uint8_t *data;
|
||||
ssize_t sz;
|
||||
ssize_t siz;
|
||||
rmcast_base_channel_t *chan = (rmcast_base_channel_t*)cbdata;
|
||||
opal_buffer_t *buf;
|
||||
|
||||
|
||||
/* read the data */
|
||||
data = (uint8_t*)malloc(orte_rmcast_udp_sndbuf_size * sizeof(uint8_t));
|
||||
sz = read(sd, data, orte_rmcast_udp_sndbuf_size);
|
||||
siz = read(sd, data, orte_rmcast_udp_sndbuf_size);
|
||||
|
||||
if (sz <= 0) {
|
||||
if (siz <= 0) {
|
||||
/* this shouldn't happen - report the errno */
|
||||
opal_output(0, "%s Error on multicast recv socket event: %s(%d)",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(errno), errno);
|
||||
@ -745,13 +685,10 @@ static void recv_handler(int sd, short flags, void* cbdata)
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:udp recvd %d bytes from channel %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(int)sz, (int)chan->channel));
|
||||
(int)siz, (int)chan->channel));
|
||||
|
||||
/* clear the way for the next message */
|
||||
buf = OBJ_NEW(opal_buffer_t);
|
||||
opal_dss.load(buf, data, sz);
|
||||
ORTE_MULTICAST_MESSAGE_EVENT(buf, process_recv);
|
||||
|
||||
ORTE_MULTICAST_MESSAGE_EVENT(data, siz);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -804,7 +741,7 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
|
||||
"%s setup:channel activating recv event on fd %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),(int)chan->recv));
|
||||
|
||||
opal_event_set(opal_event_base, &chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
|
||||
opal_event_set(orte_rmcast_base.event_base, &chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
|
||||
opal_event_add(&chan->recv_ev, 0);
|
||||
}
|
||||
|
||||
@ -950,22 +887,23 @@ static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket)
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int xmit_data(rmcast_base_channel_t *chan, rmcast_base_send_t *snd)
|
||||
static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel)
|
||||
{
|
||||
char *bytes;
|
||||
int32_t sz;
|
||||
int rc;
|
||||
opal_buffer_t *buf;
|
||||
|
||||
rmcast_base_channel_t *ch;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s transmitting data for channel %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), chan->channel));
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
|
||||
|
||||
/* setup the message for xmission */
|
||||
if (ORTE_SUCCESS != (rc = orte_rmcast_base_build_msg(chan, &buf, snd))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* setup the message for xmission */
|
||||
if (ORTE_SUCCESS != (rc = orte_rmcast_base_queue_xmit(snd, channel, &buf, &ch))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/* store the working buf in the send ring buffer in case we
|
||||
@ -997,13 +935,13 @@ static int xmit_data(rmcast_base_channel_t *chan, rmcast_base_send_t *snd)
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:udp multicasting %d bytes to network %03d.%03d.%03d.%03d port %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sz,
|
||||
OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port, (int)snd->tag));
|
||||
OPAL_IF_FORMAT_ADDR(ch->network), (int)ch->port, (int)snd->tag));
|
||||
|
||||
if (sz != (rc = sendto(chan->xmit, bytes, sz, 0,
|
||||
(struct sockaddr *)&(chan->addr), sizeof(struct sockaddr_in)))) {
|
||||
if (sz != (rc = sendto(ch->xmit, bytes, sz, 0,
|
||||
(struct sockaddr *)&(ch->addr), sizeof(struct sockaddr_in)))) {
|
||||
/* didn't get the message out */
|
||||
opal_output(0, "%s failed to send message to multicast network %03d.%03d.%03d.%03d on\n\terror %s(%d)",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(chan->network),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network),
|
||||
strerror(errno), errno);
|
||||
rc = errno;
|
||||
} else {
|
||||
@ -1013,19 +951,19 @@ static int xmit_data(rmcast_base_channel_t *chan, rmcast_base_send_t *snd)
|
||||
if (NULL != snd->buf) {
|
||||
/* call the cbfunc if required */
|
||||
if (NULL != snd->cbfunc_buffer) {
|
||||
snd->cbfunc_buffer(rc, chan->channel, chan->seq_num, snd->tag,
|
||||
snd->cbfunc_buffer(rc, ch->channel, ch->seq_num, snd->tag,
|
||||
ORTE_PROC_MY_NAME, snd->buf, snd->cbdata);
|
||||
}
|
||||
} else {
|
||||
/* call the cbfunc if required */
|
||||
if (NULL != snd->cbfunc_iovec) {
|
||||
snd->cbfunc_iovec(rc, chan->channel, chan->seq_num, snd->tag, ORTE_PROC_MY_NAME,
|
||||
snd->cbfunc_iovec(rc, ch->channel, ch->seq_num, snd->tag, ORTE_PROC_MY_NAME,
|
||||
snd->iovec_array, snd->iovec_count, snd->cbdata);
|
||||
}
|
||||
}
|
||||
|
||||
/* roll to next message sequence number */
|
||||
ORTE_MULTICAST_NEXT_SEQUENCE_NUM(chan->seq_num);
|
||||
ORTE_MULTICAST_NEXT_SEQUENCE_NUM(ch->seq_num);
|
||||
CLEANUP:
|
||||
return rc;
|
||||
}
|
||||
|
@ -163,7 +163,6 @@ static void cbfunc(int status,
|
||||
}
|
||||
}
|
||||
recvd_seq_num = seq_num;
|
||||
|
||||
if (0 == (recvd_seq_num % 100)) {
|
||||
opal_output(0, "RECVD SEQ_NUM %lu", recvd_seq_num);
|
||||
}
|
||||
@ -184,7 +183,6 @@ static void cbfunc_iovec(int status,
|
||||
}
|
||||
}
|
||||
recvd_seq_num = seq_num;
|
||||
|
||||
if (0 == (recvd_seq_num % 100)) {
|
||||
opal_output(0, "RECVD SEQ_NUM %lu", recvd_seq_num);
|
||||
}
|
||||
|
20
orte/threads/Makefile.am
Обычный файл
20
orte/threads/Makefile.am
Обычный файл
@ -0,0 +1,20 @@
|
||||
# -*- makefile -*-
|
||||
#
|
||||
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
# This makefile.am does not stand on its own - it is included from orte/Makefile.am
|
||||
|
||||
# Source code files
|
||||
headers += \
|
||||
threads/condition.h \
|
||||
threads/mutex.h \
|
||||
threads/threads.h
|
||||
|
||||
libopen_rte_la_SOURCES += \
|
||||
threads/thread.c
|
147
orte/threads/condition.h
Обычный файл
147
orte/threads/condition.h
Обычный файл
@ -0,0 +1,147 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
#ifndef ORTE_CONDITION_SPINLOCK_H
|
||||
#define ORTE_CONDITION_SPINLOCK_H
|
||||
|
||||
#include "orte_config.h"
|
||||
#ifdef HAVE_SYS_TIME_H
|
||||
#include <sys/time.h>
|
||||
#endif
|
||||
#ifdef HAVE_TIME_H
|
||||
#include <time.h>
|
||||
#endif
|
||||
#if OPAL_HAVE_POSIX_THREADS
|
||||
#include <pthread.h>
|
||||
#elif OPAL_HAVE_SOLARIS_THREADS
|
||||
#include <thread.h>
|
||||
#include <synch.h>
|
||||
#endif
|
||||
|
||||
#include "opal/threads/threads.h"
|
||||
#include "opal/runtime/opal_cr.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
static inline int orte_condition_wait(opal_condition_t *c, opal_mutex_t *m)
|
||||
{
|
||||
int rc = 0;
|
||||
c->c_waiting++;
|
||||
|
||||
#if OPAL_HAVE_POSIX_THREADS
|
||||
rc = pthread_cond_wait(&c->c_cond, &m->m_lock_pthread);
|
||||
#elif OPAL_HAVE_SOLARIS_THREADS
|
||||
rc = cond_wait(&c->c_cond, &m->m_lock_solaris);
|
||||
#else
|
||||
if (c->c_signaled) {
|
||||
c->c_waiting--;
|
||||
opal_mutex_unlock(m);
|
||||
opal_progress();
|
||||
OPAL_CR_TEST_CHECKPOINT_READY_STALL();
|
||||
opal_mutex_lock(m);
|
||||
return 0;
|
||||
}
|
||||
while (c->c_signaled == 0) {
|
||||
opal_mutex_unlock(m);
|
||||
opal_progress();
|
||||
OPAL_CR_TEST_CHECKPOINT_READY_STALL();
|
||||
opal_mutex_lock(m);
|
||||
}
|
||||
#endif
|
||||
|
||||
c->c_signaled--;
|
||||
c->c_waiting--;
|
||||
return rc;
|
||||
}
|
||||
|
||||
static inline int orte_condition_timedwait(opal_condition_t *c,
|
||||
opal_mutex_t *m,
|
||||
const struct timespec *abstime)
|
||||
{
|
||||
int rc = 0;
|
||||
|
||||
c->c_waiting++;
|
||||
#if OPAL_HAVE_POSIX_THREADS
|
||||
rc = pthread_cond_timedwait(&c->c_cond, &m->m_lock_pthread, abstime);
|
||||
#elif OPAL_HAVE_SOLARIS_THREADS
|
||||
{
|
||||
/* deal with const-ness */
|
||||
timestruc_t to;
|
||||
to.tv_sec = abstime->tv_sec;
|
||||
to.tv_nsec = abstime->tv_nsec;
|
||||
rc = cond_timedwait(&c->c_cond, &m->m_lock_solaris, &to);
|
||||
}
|
||||
#else
|
||||
{
|
||||
struct timeval tv;
|
||||
struct timeval absolute;
|
||||
absolute.tv_sec = abstime->tv_sec;
|
||||
absolute.tv_usec = abstime->tv_nsec * 1000;
|
||||
gettimeofday(&tv,NULL);
|
||||
if (c->c_signaled == 0) {
|
||||
do {
|
||||
opal_mutex_unlock(m);
|
||||
opal_progress();
|
||||
gettimeofday(&tv,NULL);
|
||||
opal_mutex_lock(m);
|
||||
} while (c->c_signaled == 0 &&
|
||||
(tv.tv_sec <= absolute.tv_sec ||
|
||||
(tv.tv_sec == absolute.tv_sec && tv.tv_usec < absolute.tv_usec)));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (c->c_signaled != 0) c->c_signaled--;
|
||||
c->c_waiting--;
|
||||
return rc;
|
||||
}
|
||||
|
||||
static inline int orte_condition_signal(opal_condition_t *c)
|
||||
{
|
||||
if (c->c_waiting) {
|
||||
c->c_signaled++;
|
||||
#if OPAL_HAVE_POSIX_THREADS
|
||||
pthread_cond_signal(&c->c_cond);
|
||||
#elif OPAL_HAVE_SOLARIS_THREADS
|
||||
cond_signal(&c->c_cond);
|
||||
#endif
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int orte_condition_broadcast(opal_condition_t *c)
|
||||
{
|
||||
c->c_signaled = c->c_waiting;
|
||||
#if OPAL_HAVE_POSIX_THREADS
|
||||
if( 1 == c->c_waiting ) {
|
||||
pthread_cond_signal(&c->c_cond);
|
||||
} else {
|
||||
pthread_cond_broadcast(&c->c_cond);
|
||||
}
|
||||
#elif OPAL_HAVE_SOLARIS_THREADS
|
||||
cond_broadcast(&c->c_cond);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif
|
||||
|
69
orte/threads/mutex.h
Обычный файл
69
orte/threads/mutex.h
Обычный файл
@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
|
||||
*
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef ORTE_MUTEX_H
|
||||
#define ORTE_MUTEX_H
|
||||
|
||||
#include "orte_config.h"
|
||||
|
||||
#include "opal/sys/atomic.h"
|
||||
#include "opal/threads/mutex.h"
|
||||
#if OPAL_ENABLE_DEBUG
|
||||
#include "opal/util/output.h"
|
||||
#endif
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/* Lock a mutex */
|
||||
#define ORTE_THREAD_LOCK(mutex) opal_mutex_lock(mutex)
|
||||
|
||||
/**
|
||||
* Try to lock a mutex
|
||||
* Returns 0 if mutex was locked, non-zero otherwise.
|
||||
*/
|
||||
#define ORTE_THREAD_TRYLOCK(mutex) opal_mutex_trylock(mutex)
|
||||
|
||||
/** Unlock a mutex */
|
||||
#define ORTE_THREAD_UNLOCK(mutex) opal_mutex_unlock(mutex)
|
||||
|
||||
|
||||
/* Lock a mutex */
|
||||
#define ORTE_THREAD_SCOPED_LOCK(mutex, action) \
|
||||
do { \
|
||||
opal_mutex_lock(mutex); \
|
||||
(action); \
|
||||
opal_mutex_unlock(mutex); \
|
||||
} while (0)
|
||||
|
||||
/* Use an atomic operation for increment/decrement */
|
||||
|
||||
#define ORTE_THREAD_ADD32(x,y) opal_atomic_add_32(x,y)
|
||||
|
||||
#define ORTE_THREAD_ADD64(x,y) opal_atomic_add_64(x,y)
|
||||
|
||||
#define ORTE_THREAD_ADD_SIZE_T(x,y) opal_atomic_add_size_t(x,y)
|
||||
|
||||
#define ORTE_CMPSET(x, y, z) ((*(x) == (y)) ? ((*(x) = (z)), 1) : 0)
|
||||
|
||||
#if OPAL_HAVE_ATOMIC_CMPSET_32
|
||||
#define ORTE_ATOMIC_CMPSET_32(x, y, z) opal_atomic_cmpset_32(x, y, z)
|
||||
# endif
|
||||
|
||||
# if OPAL_HAVE_ATOMIC_CMPSET_64
|
||||
#define ORTE_ATOMIC_CMPSET_64(x, y, z) opal_atomic_cmpset_64(x, y, z)
|
||||
#endif
|
||||
|
||||
#if OPAL_HAVE_ATOMIC_CMPSET_32 || OPAL_HAVE_ATOMIC_CMPSET_64
|
||||
#define ORTE_ATOMIC_CMPSET(x, y, z) opal_atomic_cmpset(x, y, z)
|
||||
#endif
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif /* ORTE_MUTEX_H */
|
34
orte/threads/thread.c
Обычный файл
34
orte/threads/thread.c
Обычный файл
@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "orte_config.h"
|
||||
|
||||
#include "orte/threads/threads.h"
|
||||
|
||||
bool orte_debug_threads = false;
|
||||
|
||||
static void constructor(orte_thread_ctl_t *ptr)
|
||||
{
|
||||
OBJ_CONSTRUCT(&ptr->lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&ptr->cond, opal_condition_t);
|
||||
ptr->active = false;
|
||||
ptr->running = false;
|
||||
ptr->stop = false;
|
||||
ptr->rate.tv_sec = 0;
|
||||
ptr->rate.tv_usec = 0;
|
||||
}
|
||||
static void destructor(orte_thread_ctl_t *ptr)
|
||||
{
|
||||
OBJ_DESTRUCT(&ptr->lock);
|
||||
OBJ_DESTRUCT(&ptr->cond);
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_thread_ctl_t,
|
||||
opal_object_t,
|
||||
constructor, destructor);
|
||||
|
107
orte/threads/threads.h
Обычный файл
107
orte/threads/threads.h
Обычный файл
@ -0,0 +1,107 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2006 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef ORTE_THREAD_H
|
||||
#define ORTE_THREAD_H
|
||||
|
||||
#include "orte_config.h"
|
||||
|
||||
#include "opal/class/opal_object.h"
|
||||
#if OPAL_ENABLE_DEBUG
|
||||
#include "opal/util/output.h"
|
||||
#endif
|
||||
|
||||
#include "mutex.h"
|
||||
#include "condition.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_mutex_t lock;
|
||||
opal_condition_t cond;
|
||||
volatile bool active;
|
||||
volatile bool running;
|
||||
volatile bool stop;
|
||||
struct timeval rate;
|
||||
} orte_thread_ctl_t;
|
||||
OBJ_CLASS_DECLARATION(orte_thread_ctl_t);
|
||||
|
||||
ORTE_DECLSPEC extern bool orte_debug_threads;
|
||||
|
||||
#if OPAL_ENABLE_DEBUG
|
||||
#define ORTE_ACQUIRE_THREAD(ctl) \
|
||||
do { \
|
||||
ORTE_THREAD_LOCK(&(ctl)->lock); \
|
||||
if (orte_debug_threads) { \
|
||||
opal_output(0, "Waiting for thread %s:%d", \
|
||||
__FILE__, __LINE__); \
|
||||
} \
|
||||
while ((ctl)->active) { \
|
||||
orte_condition_wait(&(ctl)->cond, &(ctl)->lock); \
|
||||
} \
|
||||
if (orte_debug_threads) { \
|
||||
opal_output(0, "Thread obtained %s:%d", \
|
||||
__FILE__, __LINE__); \
|
||||
} \
|
||||
(ctl)->active = true; \
|
||||
} while(0);
|
||||
#else
|
||||
#define ORTE_ACQUIRE_THREAD(ctl) \
|
||||
do { \
|
||||
ORTE_THREAD_LOCK(&(ctl)->lock); \
|
||||
while ((ctl)->active) { \
|
||||
orte_condition_wait(&(ctl)->cond, &(ctl)->lock); \
|
||||
} \
|
||||
(ctl)->active = true; \
|
||||
} while(0);
|
||||
#endif
|
||||
|
||||
|
||||
#if OPAL_ENABLE_DEBUG
|
||||
#define ORTE_RELEASE_THREAD(ctl) \
|
||||
do { \
|
||||
if (orte_debug_threads) { \
|
||||
opal_output(0, "Releasing thread %s:%d", \
|
||||
__FILE__, __LINE__); \
|
||||
} \
|
||||
(ctl)->active = false; \
|
||||
orte_condition_broadcast(&(ctl)->cond); \
|
||||
ORTE_THREAD_UNLOCK(&(ctl)->lock); \
|
||||
} while(0);
|
||||
#else
|
||||
#define ORTE_RELEASE_THREAD(ctl) \
|
||||
do { \
|
||||
(ctl)->active = false; \
|
||||
orte_condition_broadcast(&(ctl)->cond); \
|
||||
ORTE_THREAD_UNLOCK(&(ctl)->lock); \
|
||||
} while(0);
|
||||
#endif
|
||||
|
||||
|
||||
#define ORTE_WAKEUP_THREAD(ctl) \
|
||||
do { \
|
||||
(ctl)->active = false; \
|
||||
orte_condition_broadcast(&(ctl)->cond); \
|
||||
} while(0);
|
||||
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif /* ORTE_THREAD_H */
|
Загрузка…
Ссылка в новой задаче
Block a user