1
1

Change the event base "wakeup" support to enable the passing of events to the central thread for add/del. Add a macro OPAL_UPDATE_EVBASE for this purpose as it will likely be widely used.

Update the ORTE thread support to utilize this capability. Update the rmcast framework to track the change.

This commit was SVN r24121.
Этот коммит содержится в:
Ralph Castain 2010-12-01 04:26:43 +00:00
родитель 963336ee5a
Коммит c56185887b
12 изменённых файлов: 279 добавлений и 284 удалений

Просмотреть файл

@ -65,13 +65,34 @@ typedef struct event opal_event_t;
* a condition_wait in another base, we need * a condition_wait in another base, we need
* to "wakeup" the event base in the second base * to "wakeup" the event base in the second base
* so the condition_wait can be checked * so the condition_wait can be checked
*
* On a more permanent level, use this to update
* the event base when it is being progressed in
* a separate thread.
*/ */
typedef struct { typedef struct {
struct event_base *base; struct event_base *base;
opal_event_t wakeup_event; opal_event_t update_event;
int wakeup_pipe[2]; int update_pipe[2];
} opal_event_base_t; } opal_event_base_t;
typedef struct {
opal_event_t *ev;
uint8_t op;
} opal_event_update_t;
#define OPAL_EVENT_NOOP 0x00
#define OPAL_EVENT_ADD 0x01
#define OPAL_EVENT_DEL 0x02
#define OPAL_UPDATE_EVBASE(b, evt, ad) \
do { \
opal_event_update_t up; \
up.ev = (evt); \
up.op = (ad); \
opal_fd_write((b)->update_pipe[1], sizeof(opal_event_update_t), &up); \
} while(0);
BEGIN_C_DECLS BEGIN_C_DECLS
/* Temporary global - will be replaced by layer-specific event bases */ /* Temporary global - will be replaced by layer-specific event bases */

Просмотреть файл

@ -100,12 +100,20 @@ static const struct eventop *eventops[] = {
static struct event_config *config=NULL; static struct event_config *config=NULL;
static void wakeup_event(int fd, short flags, void* arg) static void update_event(int fd, short flags, void* arg)
{ {
char byte; opal_event_update_t up;
/* clear the event */ /* read the event */
opal_fd_read(fd, 1, &byte); opal_fd_read(fd, sizeof(opal_event_update_t), &up);
if (NULL == up.ev) {
return;
}
if (OPAL_EVENT_ADD == up.op) {
event_add(up.ev, 0);
} else if (OPAL_EVENT_DEL == up.op) {
event_del(up.ev);
}
return; return;
} }
@ -130,26 +138,26 @@ opal_event_base_t* opal_event_base_create(void)
} }
evbase = (opal_event_base_t*)malloc(sizeof(opal_event_base_t)); evbase = (opal_event_base_t*)malloc(sizeof(opal_event_base_t));
evbase->base = base; evbase->base = base;
if (pipe(evbase->wakeup_pipe) < 0) { if (pipe(evbase->update_pipe) < 0) {
opal_output(0, "Unable to open wakeup pipe"); opal_output(0, "Unable to open update pipe");
free(evbase); free(evbase);
event_base_free(base); event_base_free(base);
return NULL; return NULL;
} }
event_assign(&evbase->wakeup_event, base, event_assign(&evbase->update_event, base,
evbase->wakeup_pipe[0], EV_READ | EV_PERSIST, evbase->update_pipe[0], EV_READ | EV_PERSIST,
wakeup_event, NULL); update_event, NULL);
event_add(&evbase->wakeup_event, 0); event_add(&evbase->update_event, 0);
return evbase; return evbase;
} }
void opal_event_base_finalize(opal_event_base_t *evbase) void opal_event_base_finalize(opal_event_base_t *evbase)
{ {
/* delete the wakeup event */ /* delete the wakeup event */
event_del(&evbase->wakeup_event); event_del(&evbase->update_event);
/* close the pipe */ /* close the pipe */
close(evbase->wakeup_pipe[0]); close(evbase->update_pipe[0]);
close(evbase->wakeup_pipe[1]); close(evbase->update_pipe[1]);
/* release the base */ /* release the base */
event_base_free(evbase->base); event_base_free(evbase->base);
/* free the storage */ /* free the storage */

Просмотреть файл

@ -48,15 +48,11 @@ typedef struct {
uint16_t ports[256]; uint16_t ports[256];
int cache_size; int cache_size;
bool opened; bool opened;
opal_mutex_t lock; orte_thread_ctl_t main_ctl;
opal_condition_t cond;
bool active;
opal_list_t recvs; opal_list_t recvs;
opal_list_t channels; opal_list_t channels;
rmcast_base_channel_t *my_output_channel; rmcast_base_channel_t *my_output_channel;
rmcast_base_channel_t *my_input_channel; rmcast_base_channel_t *my_input_channel;
bool enable_progress_thread;
opal_list_t msg_list;
opal_event_base_t *event_base; opal_event_base_t *event_base;
opal_thread_t recv_thread; opal_thread_t recv_thread;
orte_thread_ctl_t recv_ctl; orte_thread_ctl_t recv_ctl;

Просмотреть файл

@ -29,6 +29,7 @@
#include "opal/class/opal_ring_buffer.h" #include "opal/class/opal_ring_buffer.h"
#include "opal/util/fd.h" #include "opal/util/fd.h"
#include "orte/threads/threads.h"
#include "orte/mca/rmcast/rmcast.h" #include "orte/mca/rmcast/rmcast.h"
BEGIN_C_DECLS BEGIN_C_DECLS
@ -77,7 +78,7 @@ typedef struct {
orte_process_name_t name; orte_process_name_t name;
orte_rmcast_channel_t channel; orte_rmcast_channel_t channel;
orte_rmcast_seq_t seq_num; orte_rmcast_seq_t seq_num;
bool recvd; orte_thread_ctl_t ctl;
orte_rmcast_tag_t tag; orte_rmcast_tag_t tag;
orte_rmcast_flag_t flags; orte_rmcast_flag_t flags;
struct iovec *iovec_array; struct iovec *iovec_array;
@ -103,24 +104,11 @@ typedef struct {
orte_rmcast_callback_fn_t cbfunc_iovec; orte_rmcast_callback_fn_t cbfunc_iovec;
orte_rmcast_callback_buffer_fn_t cbfunc_buffer; orte_rmcast_callback_buffer_fn_t cbfunc_buffer;
void *cbdata; void *cbdata;
bool send_complete; orte_thread_ctl_t ctl;
} rmcast_base_send_t; } rmcast_base_send_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_base_send_t); ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_base_send_t);
/* Setup an event to process a multicast message
*
* Multicast messages can come at any time and rate. To minimize
* the probability of loss, and to avoid conflict when we send
* data when responding to an input message, we use a timer
* event to break out of the recv and process the message later
*/
typedef struct {
opal_list_item_t super;
opal_buffer_t *buf;
} orte_mcast_msg_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_mcast_msg_event_t);
/* Data structure for tracking recvd sequence numbers */ /* Data structure for tracking recvd sequence numbers */
typedef struct { typedef struct {
opal_object_t super; opal_object_t super;
@ -144,20 +132,17 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
#define ORTE_MULTICAST_MESSAGE_EVENT(dt, sz) \ #define ORTE_MULTICAST_MESSAGE_EVENT(dt, sz) \
do { \ do { \
char byte='a'; \ opal_buffer_t *buf; \
orte_mcast_msg_event_t *mev; \
OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output, \ OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output, \
"defining mcast msg event: %s %d", \ "defining mcast msg event: %s %d", \
__FILE__, __LINE__)); \ __FILE__, __LINE__)); \
mev = OBJ_NEW(orte_mcast_msg_event_t); \ buf = OBJ_NEW(opal_buffer_t); \
opal_dss.load(mev->buf, (dt), (sz)); \ opal_dss.load(buf, (dt), (sz)); \
if (orte_rmcast_base.enable_progress_thread) { \ if (orte_progress_threads_enabled) { \
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); \ opal_fd_write(orte_rmcast_base.recv_pipe[1], \
opal_list_append(&orte_rmcast_base.msg_list, &mev->super); \ sizeof(opal_buffer_t*), &buf); \
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); \
opal_fd_write(orte_rmcast_base.recv_pipe[1], 1, &byte); \
} else { \ } else { \
orte_rmcast_base_process_msg(mev); \ orte_rmcast_base_process_msg(buf); \
} \ } \
} while(0); } while(0);
@ -188,7 +173,7 @@ ORTE_DECLSPEC int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
ORTE_DECLSPEC int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread); ORTE_DECLSPEC int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread);
ORTE_DECLSPEC void orte_rmcast_base_stop_threads(void); ORTE_DECLSPEC void orte_rmcast_base_stop_threads(void);
ORTE_DECLSPEC int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg); ORTE_DECLSPEC int orte_rmcast_base_process_msg(opal_buffer_t *msg);
ORTE_DECLSPEC void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel, ORTE_DECLSPEC void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag); orte_rmcast_tag_t tag);

Просмотреть файл

@ -31,9 +31,6 @@ int orte_rmcast_base_close(void)
orte_rmcast.finalize(); orte_rmcast.finalize();
} }
/* cleanup thread stuff */
OBJ_DESTRUCT(&orte_rmcast_base.lock);
OBJ_DESTRUCT(&orte_rmcast_base.cond);
while (NULL != (item = opal_list_remove_first(&orte_rmcast_base.recvs))) { while (NULL != (item = opal_list_remove_first(&orte_rmcast_base.recvs))) {
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
@ -42,15 +39,15 @@ int orte_rmcast_base_close(void)
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
OBJ_DESTRUCT(&orte_rmcast_base.channels); OBJ_DESTRUCT(&orte_rmcast_base.channels);
while (NULL != (item = opal_list_remove_first(&orte_rmcast_base.msg_list))) {
OBJ_RELEASE(item); /* cleanup thread stuff */
} OBJ_DESTRUCT(&orte_rmcast_base.main_ctl);
OBJ_DESTRUCT(&orte_rmcast_base.msg_list);
OBJ_DESTRUCT(&orte_rmcast_base.recv_thread); OBJ_DESTRUCT(&orte_rmcast_base.recv_thread);
OBJ_DESTRUCT(&orte_rmcast_base.recv_ctl); OBJ_DESTRUCT(&orte_rmcast_base.recv_ctl);
OBJ_DESTRUCT(&orte_rmcast_base.recv_process); OBJ_DESTRUCT(&orte_rmcast_base.recv_process);
OBJ_DESTRUCT(&orte_rmcast_base.recv_process_ctl); OBJ_DESTRUCT(&orte_rmcast_base.recv_process_ctl);
if (orte_rmcast_base.enable_progress_thread) {
if (orte_progress_threads_enabled) {
opal_event_base_finalize(orte_rmcast_base.event_base); opal_event_base_finalize(orte_rmcast_base.event_base);
} }

Просмотреть файл

@ -66,6 +66,7 @@ int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
/* find the channel */ /* find the channel */
ch = NULL; ch = NULL;
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.main_ctl);
for (item = opal_list_get_first(&orte_rmcast_base.channels); for (item = opal_list_get_first(&orte_rmcast_base.channels);
item != opal_list_get_end(&orte_rmcast_base.channels); item != opal_list_get_end(&orte_rmcast_base.channels);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
@ -75,6 +76,7 @@ int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
break; break;
} }
} }
ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
if (NULL == ch) { if (NULL == ch) {
/* didn't find it */ /* didn't find it */
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
@ -175,7 +177,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
if (!blocking) { if (!blocking) {
/* do we already have a recv for this channel/tag? */ /* do we already have a recv for this channel/tag? */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); ORTE_ACQUIRE_THREAD(&orte_rmcast_base.main_ctl);
for (item = opal_list_get_first(&orte_rmcast_base.recvs); for (item = opal_list_get_first(&orte_rmcast_base.recvs);
item != opal_list_get_end(&orte_rmcast_base.recvs); item != opal_list_get_end(&orte_rmcast_base.recvs);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
@ -194,7 +196,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base: matching recv already active on multicast channel %d tag %d", "%s rmcast:base: matching recv already active on multicast channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return ORTE_EXISTS; return ORTE_EXISTS;
} }
rptr->cbfunc_iovec = cbfunc_iovec; rptr->cbfunc_iovec = cbfunc_iovec;
@ -205,7 +207,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base: matching recv already active on multicast channel %d tag %d", "%s rmcast:base: matching recv already active on multicast channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return ORTE_EXISTS; return ORTE_EXISTS;
} }
rptr->cbfunc_buffer = cbfunc_buffer; rptr->cbfunc_buffer = cbfunc_buffer;
@ -213,10 +215,10 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
if (NULL != recvptr) { if (NULL != recvptr) {
*recvptr = rptr; *recvptr = rptr;
} }
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
} }
/* if we get here, then we need to add a new recv */ /* if we get here, then we need to add a new recv */
@ -238,13 +240,13 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
/* wildcard tag recvs get pushed to the end of the list so /* wildcard tag recvs get pushed to the end of the list so
* that specific tag recvs take precedence * that specific tag recvs take precedence
*/ */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); ORTE_ACQUIRE_THREAD(&orte_rmcast_base.main_ctl);
if (ORTE_RMCAST_TAG_WILDCARD == tag) { if (ORTE_RMCAST_TAG_WILDCARD == tag) {
opal_list_append(&orte_rmcast_base.recvs, &rptr->item); opal_list_append(&orte_rmcast_base.recvs, &rptr->item);
} else { } else {
opal_list_prepend(&orte_rmcast_base.recvs, &rptr->item); opal_list_prepend(&orte_rmcast_base.recvs, &rptr->item);
} }
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -265,6 +267,7 @@ void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
} }
/* find all recv's for this channel and tag */ /* find all recv's for this channel and tag */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.main_ctl);
item = opal_list_get_first(&orte_rmcast_base.recvs); item = opal_list_get_first(&orte_rmcast_base.recvs);
while (item != opal_list_get_end(&orte_rmcast_base.recvs)) { while (item != opal_list_get_end(&orte_rmcast_base.recvs)) {
next = opal_list_get_next(item); next = opal_list_get_next(item);
@ -272,13 +275,12 @@ void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
ptr = (rmcast_base_recv_t*)item; ptr = (rmcast_base_recv_t*)item;
if (ch == ptr->channel && if (ch == ptr->channel &&
tag == ptr->tag) { tag == ptr->tag) {
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item); opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
OBJ_RELEASE(ptr); OBJ_RELEASE(ptr);
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
} }
item = next; item = next;
} }
ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
} }
int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel) int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel)
@ -286,7 +288,7 @@ int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel)
opal_list_item_t *item; opal_list_item_t *item;
rmcast_base_channel_t *chan; rmcast_base_channel_t *chan;
OPAL_THREAD_LOCK(&orte_rmcast_base.lock); ORTE_ACQUIRE_THREAD(&orte_rmcast_base.main_ctl);
for (item = opal_list_get_first(&orte_rmcast_base.channels); for (item = opal_list_get_first(&orte_rmcast_base.channels);
item != opal_list_get_end(&orte_rmcast_base.channels); item != opal_list_get_end(&orte_rmcast_base.channels);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
@ -295,12 +297,12 @@ int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel)
if (channel == chan->channel) { if (channel == chan->channel) {
opal_list_remove_item(&orte_rmcast_base.channels, item); opal_list_remove_item(&orte_rmcast_base.channels, item);
OBJ_RELEASE(chan); OBJ_RELEASE(chan);
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock); ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
} }
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock); ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return ORTE_ERR_NOT_FOUND; return ORTE_ERR_NOT_FOUND;
} }

Просмотреть файл

@ -107,12 +107,9 @@ int orte_rmcast_base_open(void)
orte_rmcast_base.opened = true; orte_rmcast_base.opened = true;
/* ensure all global values are initialized */ /* ensure all global values are initialized */
OBJ_CONSTRUCT(&orte_rmcast_base.lock, opal_mutex_t); OBJ_CONSTRUCT(&orte_rmcast_base.main_ctl, orte_thread_ctl_t);
OBJ_CONSTRUCT(&orte_rmcast_base.cond, opal_condition_t);
orte_rmcast_base.active = false;
OBJ_CONSTRUCT(&orte_rmcast_base.recvs, opal_list_t); OBJ_CONSTRUCT(&orte_rmcast_base.recvs, opal_list_t);
OBJ_CONSTRUCT(&orte_rmcast_base.channels, opal_list_t); OBJ_CONSTRUCT(&orte_rmcast_base.channels, opal_list_t);
OBJ_CONSTRUCT(&orte_rmcast_base.msg_list, opal_list_t);
OBJ_CONSTRUCT(&orte_rmcast_base.recv_thread, opal_thread_t); OBJ_CONSTRUCT(&orte_rmcast_base.recv_thread, opal_thread_t);
OBJ_CONSTRUCT(&orte_rmcast_base.recv_ctl, orte_thread_ctl_t); OBJ_CONSTRUCT(&orte_rmcast_base.recv_ctl, orte_thread_ctl_t);
@ -129,13 +126,8 @@ int orte_rmcast_base_open(void)
orte_rmcast_base.my_output_channel = NULL; orte_rmcast_base.my_output_channel = NULL;
orte_rmcast_base.my_input_channel = NULL; orte_rmcast_base.my_input_channel = NULL;
/* whether or not to use progress thread */ /* setup the local event base */
mca_base_param_reg_int_name("rmcast", "enable_progress_thread", if (orte_progress_threads_enabled) {
"Whether or not to enable progress thread (default: true)",
false, false, (int)true, &value);
orte_rmcast_base.enable_progress_thread = OPAL_INT_TO_BOOL(value);
if (orte_rmcast_base.enable_progress_thread) {
orte_rmcast_base.event_base = opal_event_base_create(); orte_rmcast_base.event_base = opal_event_base_create();
} else { } else {
orte_rmcast_base.event_base = opal_event_base; orte_rmcast_base.event_base = opal_event_base;
@ -317,21 +309,6 @@ int orte_rmcast_base_open(void)
} }
/**** CLASS INSTANCES ****/ /**** CLASS INSTANCES ****/
static void mcast_event_constructor(orte_mcast_msg_event_t *ev)
{
ev->buf = OBJ_NEW(opal_buffer_t);
}
static void mcast_event_destructor(orte_mcast_msg_event_t *ev)
{
if (NULL != ev->buf) {
OBJ_RELEASE(ev->buf);
}
}
OBJ_CLASS_INSTANCE(orte_mcast_msg_event_t,
opal_list_item_t,
mcast_event_constructor,
mcast_event_destructor);
static void send_construct(rmcast_base_send_t *ptr) static void send_construct(rmcast_base_send_t *ptr)
{ {
ptr->retransmit = false; ptr->retransmit = false;
@ -342,18 +319,23 @@ static void send_construct(rmcast_base_send_t *ptr)
ptr->cbfunc_iovec = NULL; ptr->cbfunc_iovec = NULL;
ptr->cbfunc_buffer = NULL; ptr->cbfunc_buffer = NULL;
ptr->cbdata = NULL; ptr->cbdata = NULL;
OBJ_CONSTRUCT(&ptr->ctl, orte_thread_ctl_t);
}
static void send_destruct(rmcast_base_send_t *ptr)
{
OBJ_DESTRUCT(&ptr->ctl);
} }
OBJ_CLASS_INSTANCE(rmcast_base_send_t, OBJ_CLASS_INSTANCE(rmcast_base_send_t,
opal_list_item_t, opal_list_item_t,
send_construct, send_construct,
NULL); send_destruct);
static void recv_construct(rmcast_base_recv_t *ptr) static void recv_construct(rmcast_base_recv_t *ptr)
{ {
ptr->name.jobid = ORTE_JOBID_INVALID; ptr->name.jobid = ORTE_JOBID_INVALID;
ptr->name.vpid = ORTE_VPID_INVALID; ptr->name.vpid = ORTE_VPID_INVALID;
ptr->channel = ORTE_RMCAST_INVALID_CHANNEL; ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->recvd = false; OBJ_CONSTRUCT(&ptr->ctl, orte_thread_ctl_t);
ptr->seq_num = ORTE_RMCAST_SEQ_INVALID; ptr->seq_num = ORTE_RMCAST_SEQ_INVALID;
ptr->tag = ORTE_RMCAST_TAG_INVALID; ptr->tag = ORTE_RMCAST_TAG_INVALID;
ptr->flags = ORTE_RMCAST_NON_PERSISTENT; /* default */ ptr->flags = ORTE_RMCAST_NON_PERSISTENT; /* default */
@ -364,10 +346,27 @@ static void recv_construct(rmcast_base_recv_t *ptr)
ptr->cbfunc_iovec = NULL; ptr->cbfunc_iovec = NULL;
ptr->cbdata = NULL; ptr->cbdata = NULL;
} }
static void recv_destruct(rmcast_base_recv_t *ptr)
{
int i;
OBJ_DESTRUCT(&ptr->ctl);
if (NULL != ptr->iovec_array) {
for (i=0; i < ptr->iovec_count; i++) {
if (NULL != ptr->iovec_array[i].iov_base) {
free(ptr->iovec_array[i].iov_base);
}
}
free(ptr->iovec_array);
}
if (NULL != ptr->buf) {
OBJ_RELEASE(ptr->buf);
}
}
OBJ_CLASS_INSTANCE(rmcast_base_recv_t, OBJ_CLASS_INSTANCE(rmcast_base_recv_t,
opal_list_item_t, opal_list_item_t,
recv_construct, recv_construct,
NULL); recv_destruct);
static void channel_construct(rmcast_base_channel_t *ptr) static void channel_construct(rmcast_base_channel_t *ptr)
{ {

Просмотреть файл

@ -36,7 +36,7 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
{ {
int rc; int rc;
if (!orte_rmcast_base.enable_progress_thread) { if (!orte_progress_threads_enabled) {
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -44,8 +44,6 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: starting recv thread", "%s rmcast:base: starting recv thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the wakeup pipe to target the rmcast event base */
orte_rmcast_base.recv_ctl.wakeup_pipe = orte_rmcast_base.event_base->wakeup_pipe[1];
/* start the thread */ /* start the thread */
orte_rmcast_base.recv_thread.t_run = rcv_progress_thread; orte_rmcast_base.recv_thread.t_run = rcv_progress_thread;
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) { if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) {
@ -62,8 +60,10 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: starting recv processing thread", "%s rmcast:base: starting recv processing thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the wakeup pipe to target the rmcast event base */ /* set the update to target the rmcast event base since we are
orte_rmcast_base.recv_process_ctl.wakeup_pipe = orte_rmcast_base.event_base->wakeup_pipe[1]; * processing messages that arrive from that source
*/
orte_rmcast_base.recv_process_ctl.evbase = orte_rmcast_base.event_base;
/* setup a pipe that we will use to signal the thread that a message /* setup a pipe that we will use to signal the thread that a message
* is waiting to be processed - don't define an event for it * is waiting to be processed - don't define an event for it
*/ */
@ -72,7 +72,7 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return ORTE_ERR_OUT_OF_RESOURCE; return ORTE_ERR_OUT_OF_RESOURCE;
} }
/* start the thread - we will send it a specific character when /* start the thread - we will send it a NULL msg pointer when
* we want it to stop * we want it to stop
*/ */
orte_rmcast_base.recv_process.t_run = rcv_processing_thread; orte_rmcast_base.recv_process.t_run = rcv_processing_thread;
@ -92,9 +92,9 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
void orte_rmcast_base_stop_threads(void) void orte_rmcast_base_stop_threads(void)
{ {
char byte='s'; opal_buffer_t *msg=NULL;
if (!orte_rmcast_base.enable_progress_thread) { if (!orte_progress_threads_enabled) {
return; return;
} }
@ -112,7 +112,7 @@ void orte_rmcast_base_stop_threads(void)
"%s rmcast:base: stopping recv processing thread", "%s rmcast:base: stopping recv processing thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_rmcast_base.recv_process_ctl.running) { if (orte_rmcast_base.recv_process_ctl.running) {
opal_fd_write(orte_rmcast_base.recv_pipe[1], 1, &byte); opal_fd_write(orte_rmcast_base.recv_pipe[1], sizeof(opal_buffer_t*), &msg);
opal_thread_join(&orte_rmcast_base.recv_process, NULL); opal_thread_join(&orte_rmcast_base.recv_process, NULL);
} }
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
@ -120,10 +120,10 @@ void orte_rmcast_base_stop_threads(void)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
} }
int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg) int orte_rmcast_base_process_msg(opal_buffer_t *msg)
{ {
orte_rmcast_channel_t channel; orte_rmcast_channel_t channel;
rmcast_base_recv_t *ptr; rmcast_base_recv_t *ptr, *recv=NULL;
orte_process_name_t name; orte_process_name_t name;
orte_rmcast_tag_t tag; orte_rmcast_tag_t tag;
int8_t flag; int8_t flag;
@ -134,7 +134,7 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
opal_list_item_t *item; opal_list_item_t *item;
/* extract the header */ /* extract the header */
if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &recvd_seq_num))) { if (ORTE_SUCCESS != (rc = extract_hdr(msg, &name, &channel, &tag, &recvd_seq_num))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
@ -169,7 +169,7 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
/* unpack the iovec vs buf flag */ /* unpack the iovec vs buf flag */
n=1; n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &flag, &n, OPAL_INT8))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg, &flag, &n, OPAL_INT8))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
@ -182,6 +182,7 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
/* find the recv for this channel, tag, and type */ /* find the recv for this channel, tag, and type */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.main_ctl);
for (item = opal_list_get_first(&orte_rmcast_base.recvs); for (item = opal_list_get_first(&orte_rmcast_base.recvs);
item != opal_list_get_end(&orte_rmcast_base.recvs); item != opal_list_get_end(&orte_rmcast_base.recvs);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
@ -200,16 +201,32 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
continue; continue;
} }
ptr->seq_num = recvd_seq_num;
recv = ptr;
break;
}
if (!(ORTE_RMCAST_PERSISTENT & recv->flags)) {
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv removing non-persistent recv",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_list_remove_item(&orte_rmcast_base.recvs, &recv->item);
}
ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
if (NULL == recv) {
/* recv not found - dump msg */
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv delivering message to channel %d tag %d", "%s rmcast:base:process_recv delivering message to channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv->channel, (int)tag));
ptr->seq_num = recvd_seq_num; /* we have a matching recv - unpack the data */
/* we have a recv - unpack the data */
if (0 == flag) { if (0 == flag) {
/* get the number of iovecs in the buffer */ /* get the number of iovecs in the buffer */
n=1; n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &iovec_count, &n, OPAL_INT32))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg, &iovec_count, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
@ -219,7 +236,7 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
for (i=0; i < iovec_count; i++) { for (i=0; i < iovec_count; i++) {
/* unpack the number of bytes in this iovec */ /* unpack the number of bytes in this iovec */
n=1; n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &isz, &n, OPAL_INT32))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg, &isz, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
@ -229,29 +246,21 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
/* allocate the space */ /* allocate the space */
iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(isz); iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(isz);
/* unpack the data */ /* unpack the data */
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, iovec_array[i].iov_base, &isz, OPAL_UINT8))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg, iovec_array[i].iov_base, &isz, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
} }
} }
if (NULL != ptr->cbfunc_iovec) { if (NULL != recv->cbfunc_iovec) {
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv delivering iovecs to channel %d tag %d", "%s rmcast:base:process_recv delivering iovecs to channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv->channel, (int)tag));
ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, ptr->seq_num, tag, recv->cbfunc_iovec(ORTE_SUCCESS, recv->channel, recv->seq_num, tag,
&name, iovec_array, iovec_count, ptr->cbdata); &name, iovec_array, iovec_count, recv->cbdata);
/* if it isn't persistent, remove it */
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv removing non-persistent recv",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
OBJ_RELEASE(ptr);
}
} else { } else {
/* if something is already present, then we have a problem */ /* if something is already present, then we have a problem */
if (NULL != ptr->iovec_array) { if (NULL != recv->iovec_array) {
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv blocking recv already fulfilled", "%s rmcast:base:process_recv blocking recv already fulfilled",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -260,34 +269,27 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
/* copy over the iovec array since it will be released by /* copy over the iovec array since it will be released by
* the blocking recv * the blocking recv
*/ */
ptr->iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec)); recv->iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec));
ptr->iovec_count = iovec_count; recv->iovec_count = iovec_count;
for (i=0; i < iovec_count; i++) { for (i=0; i < iovec_count; i++) {
ptr->iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(iovec_array[i].iov_len); recv->iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(iovec_array[i].iov_len);
ptr->iovec_array[i].iov_len = iovec_array[i].iov_len; recv->iovec_array[i].iov_len = iovec_array[i].iov_len;
memcpy(ptr->iovec_array[i].iov_base, iovec_array[i].iov_base, iovec_array[i].iov_len); memcpy(recv->iovec_array[i].iov_base, iovec_array[i].iov_base, iovec_array[i].iov_len);
} }
/* flag it as recvd to release blocking recv */ /* release blocking recv */
ptr->recvd = true; ORTE_WAKEUP_THREAD(&recv->ctl);
return ORTE_SUCCESS;
} }
} else { } else {
if (NULL != ptr->cbfunc_buffer) { if (NULL != recv->cbfunc_buffer) {
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv delivering buffer to channel %d tag %d", "%s rmcast:base:process_recv delivering buffer to channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv->channel, (int)tag));
ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, ptr->seq_num, tag, recv->cbfunc_buffer(ORTE_SUCCESS, recv->channel, recv->seq_num, tag,
&name, msg->buf, ptr->cbdata); &name, msg, recv->cbdata);
/* if it isn't persistent, remove it */
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv removing non-persistent recv",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
OBJ_RELEASE(ptr);
}
} else { } else {
/* if something is already present, then we have a problem */ /* if something is already present, then we have a problem */
if (NULL != ptr->buf) { if (NULL != recv->buf) {
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv blocking recv already fulfilled", "%s rmcast:base:process_recv blocking recv already fulfilled",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -299,18 +301,16 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
/* copy the buffer across since it will be released /* copy the buffer across since it will be released
* by the blocking recv * by the blocking recv
*/ */
ptr->buf = OBJ_NEW(opal_buffer_t); recv->buf = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(ptr->buf, msg->buf))) { if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(recv->buf, msg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
/* flag it as recvd to release blocking recv */ /* release blocking recv */
ptr->recvd = true; ORTE_WAKEUP_THREAD(&recv->ctl);
return ORTE_SUCCESS;
} }
} }
/* we are done - only one recv can match */
break;
}
cleanup: cleanup:
if (NULL != iovec_array) { if (NULL != iovec_array) {
@ -324,6 +324,9 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
if (NULL != msg) { if (NULL != msg) {
OBJ_RELEASE(msg); OBJ_RELEASE(msg);
} }
if (NULL != recv && !(ORTE_RMCAST_PERSISTENT & recv->flags)) {
OBJ_RELEASE(recv);
}
return rc; return rc;
} }
@ -331,8 +334,7 @@ int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
static void* rcv_processing_thread(opal_object_t *obj) static void* rcv_processing_thread(opal_object_t *obj)
{ {
orte_mcast_msg_event_t *msg; opal_buffer_t *msg;
char byte;
int rc; int rc;
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
@ -341,28 +343,18 @@ static void* rcv_processing_thread(opal_object_t *obj)
while (1) { while (1) {
/* block here until a trigger arrives */ /* block here until a trigger arrives */
if (0 > (rc = opal_fd_read(orte_rmcast_base.recv_pipe[0], 1, &byte))) { if (0 > (rc = opal_fd_read(orte_rmcast_base.recv_pipe[0],
sizeof(opal_buffer_t*), &msg))) {
/* if something bad happened, punt */ /* if something bad happened, punt */
opal_output(0, "%s PUNTING THREAD", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_output(0, "%s PUNTING THREAD", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return OPAL_THREAD_CANCELLED; return OPAL_THREAD_CANCELLED;
} }
/* check to see if we were told to stop */ /* check to see if we were told to stop */
if ('s' == byte) { if (NULL == msg) {
return OPAL_THREAD_CANCELLED; return OPAL_THREAD_CANCELLED;
} }
/* get a message off the list */ /* process it - processing function releases the msg */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
if (NULL == (msg = (orte_mcast_msg_event_t*)opal_list_remove_first(&orte_rmcast_base.msg_list))) {
/* nothing was there - error */
opal_output(0, "%s ERROR PROCESSING MULTICAST MESSAGES",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
continue;
}
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
/* process it - processing function release the msg */
if (ORTE_SUCCESS != (rc = orte_rmcast_base_process_msg(msg))) { if (ORTE_SUCCESS != (rc = orte_rmcast_base_process_msg(msg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
@ -371,8 +363,6 @@ static void* rcv_processing_thread(opal_object_t *obj)
static void* rcv_progress_thread(opal_object_t *obj) static void* rcv_progress_thread(opal_object_t *obj)
{ {
int events=0;
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: recv thread operational", "%s rmcast:base: recv thread operational",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -384,7 +374,8 @@ static void* rcv_progress_thread(opal_object_t *obj)
return OPAL_THREAD_CANCELLED; return OPAL_THREAD_CANCELLED;
} }
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl); ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
events += opal_event_loop(orte_rmcast_base.event_base, OPAL_EVLOOP_ONCE); /* block in the event lib */
opal_event_loop(orte_rmcast_base.event_base, OPAL_EVLOOP_ONCE);
} }
} }

Просмотреть файл

@ -288,8 +288,6 @@ static void finalize(void)
} }
/* internal blocking send support */ /* internal blocking send support */
static bool send_complete, send_buf_complete;
static void internal_snd_cb(int status, static void internal_snd_cb(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_seq_t seq_num, orte_rmcast_seq_t seq_num,
@ -297,7 +295,9 @@ static void internal_snd_cb(int status,
orte_process_name_t *sender, orte_process_name_t *sender,
struct iovec *msg, int count, void *cbdata) struct iovec *msg, int count, void *cbdata)
{ {
send_complete = true; rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata;
ORTE_WAKEUP_THREAD(&snd->ctl);
} }
static void internal_snd_buf_cb(int status, static void internal_snd_buf_cb(int status,
@ -307,7 +307,9 @@ static void internal_snd_buf_cb(int status,
orte_process_name_t *sender, orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata) opal_buffer_t *buf, void *cbdata)
{ {
send_buf_complete = true; rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata;
ORTE_WAKEUP_THREAD(&snd->ctl);
} }
static int send_data(rmcast_base_send_t *snd, static int send_data(rmcast_base_send_t *snd,
@ -334,8 +336,6 @@ static int send_data(rmcast_base_send_t *snd,
return rc; return rc;
} }
OPAL_THREAD_LOCK(&ch->send_lock);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp multicasting %d bytes to channel %d tag %d", "%s rmcast:tcp multicasting %d bytes to channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buf->bytes_used, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buf->bytes_used,
@ -442,8 +442,6 @@ static int send_data(rmcast_base_send_t *snd,
cleanup: cleanup:
OBJ_RELEASE(buf); OBJ_RELEASE(buf);
OPAL_THREAD_UNLOCK(&ch->send_lock);
return rc; return rc;
} }
@ -460,7 +458,7 @@ static int tcp_send(orte_rmcast_channel_t channel,
snd.iovec_count = count; snd.iovec_count = count;
snd.tag = tag; snd.tag = tag;
snd.cbfunc_iovec = internal_snd_cb; snd.cbfunc_iovec = internal_snd_cb;
send_complete = false; snd.cbdata = &snd;
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) { if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
@ -469,7 +467,7 @@ static int tcp_send(orte_rmcast_channel_t channel,
} }
/* now wait for the send to complete */ /* now wait for the send to complete */
ORTE_PROGRESSED_WAIT(send_complete, 0, 1); ORTE_ACQUIRE_THREAD(&snd.ctl);
OBJ_DESTRUCT(&snd); OBJ_DESTRUCT(&snd);
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -514,7 +512,7 @@ static int tcp_send_buffer(orte_rmcast_channel_t channel,
snd.buf = buf; snd.buf = buf;
snd.tag = tag; snd.tag = tag;
snd.cbfunc_buffer = internal_snd_buf_cb; snd.cbfunc_buffer = internal_snd_buf_cb;
send_buf_complete = false; snd.cbdata = &snd;
if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) { if (ORTE_SUCCESS != (ret = send_data(&snd, channel))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
@ -523,7 +521,7 @@ static int tcp_send_buffer(orte_rmcast_channel_t channel,
} }
/* now wait for the send to complete */ /* now wait for the send to complete */
ORTE_PROGRESSED_WAIT(send_buf_complete, 0, 1); ORTE_ACQUIRE_THREAD(&snd.ctl);
OBJ_DESTRUCT(&snd); OBJ_DESTRUCT(&snd);
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -580,7 +578,7 @@ static int tcp_recv(orte_process_name_t *name,
return ret; return ret;
} }
ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1); ORTE_ACQUIRE_THREAD(&recvptr->ctl);
/* xfer the data */ /* xfer the data */
if (NULL != name) { if (NULL != name) {
@ -592,10 +590,9 @@ static int tcp_recv(orte_process_name_t *name,
*msg = recvptr->iovec_array; *msg = recvptr->iovec_array;
*count = recvptr->iovec_count; *count = recvptr->iovec_count;
/* remove the recv */ /* release the recv */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); recvptr->iovec_array = NULL;
opal_list_remove_item(&orte_rmcast_base.recvs, &recvptr->item); recvptr->iovec_count = 0;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
OBJ_RELEASE(recvptr); OBJ_RELEASE(recvptr);
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -662,7 +659,7 @@ static int tcp_recv_buffer(orte_process_name_t *name,
return ret; return ret;
} }
ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1); ORTE_ACQUIRE_THREAD(&recvptr->ctl);
/* xfer the data */ /* xfer the data */
if (NULL != name) { if (NULL != name) {
@ -674,12 +671,7 @@ static int tcp_recv_buffer(orte_process_name_t *name,
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) { if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
} }
/* release the data */ /* release the recv */
OBJ_RELEASE(recvptr->buf);
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
opal_list_remove_item(&orte_rmcast_base.recvs, &recvptr->item);
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
OBJ_RELEASE(recvptr); OBJ_RELEASE(recvptr);
return ret; return ret;
@ -733,6 +725,7 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, name)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, name));
/* see if this name has already been assigned a channel on the specified network */ /* see if this name has already been assigned a channel on the specified network */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.main_ctl);
for (item = opal_list_get_first(&orte_rmcast_base.channels); for (item = opal_list_get_first(&orte_rmcast_base.channels);
item != opal_list_get_end(&orte_rmcast_base.channels); item != opal_list_get_end(&orte_rmcast_base.channels);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
@ -752,6 +745,7 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp using existing channel", "%s rmcast:tcp using existing channel",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
} }
@ -766,9 +760,8 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
chan->name = strdup(name); chan->name = strdup(name);
chan->channel = channel; chan->channel = channel;
/* add to list of known channels */ /* add to list of known channels */
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
opal_list_append(&orte_rmcast_base.channels, &chan->item); opal_list_append(&orte_rmcast_base.channels, &chan->item);
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock); ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp opening new channel for%s%s", "%s rmcast:tcp opening new channel for%s%s",
@ -836,6 +829,7 @@ static void relay(int fd, short event, void *cbdata)
/* race condition */ /* race condition */
continue; continue;
} }
opal_output(0, "SENDING RELAY TO %s", ORTE_NAME_PRINT(&proc->name));
if (0 > (rc = orte_rml.send_buffer(&proc->name, msg->buffer, ORTE_RML_TAG_MULTICAST_RELAY, 0))) { if (0 > (rc = orte_rml.send_buffer(&proc->name, msg->buffer, ORTE_RML_TAG_MULTICAST_RELAY, 0))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }

Просмотреть файл

@ -268,7 +268,8 @@ static void internal_snd_cb(int status,
orte_process_name_t *sender, orte_process_name_t *sender,
struct iovec *msg, int count, void *cbdata) struct iovec *msg, int count, void *cbdata)
{ {
((rmcast_base_send_t *)cbdata)->send_complete = true; rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata;
ORTE_WAKEUP_THREAD(&snd->ctl);
} }
static void internal_snd_buf_cb(int status, static void internal_snd_buf_cb(int status,
@ -278,7 +279,8 @@ static void internal_snd_buf_cb(int status,
orte_process_name_t *sender, orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata) opal_buffer_t *buf, void *cbdata)
{ {
((rmcast_base_send_t *)cbdata)->send_complete = true; rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata;
ORTE_WAKEUP_THREAD(&snd->ctl);
} }
static int udp_send(orte_rmcast_channel_t channel, static int udp_send(orte_rmcast_channel_t channel,
@ -302,7 +304,12 @@ static int udp_send(orte_rmcast_channel_t channel,
} }
/* now wait for the send to complete */ /* now wait for the send to complete */
ORTE_PROGRESSED_WAIT(snd->send_complete, 0, 1); ORTE_ACQUIRE_THREAD(&snd->ctl);
/* carefully release the send */
snd->iovec_array = NULL;
snd->iovec_count = 0;
OBJ_RELEASE(snd);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -353,7 +360,11 @@ static int udp_send_buffer(orte_rmcast_channel_t channel,
} }
/* now wait for the send to complete */ /* now wait for the send to complete */
ORTE_PROGRESSED_WAIT(snd->send_complete, 0, 1); ORTE_ACQUIRE_THREAD(&snd->ctl);
/* carefully release the send */
snd->buf = NULL;
OBJ_RELEASE(snd);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -408,7 +419,7 @@ static int udp_recv(orte_process_name_t *name,
return ret; return ret;
} }
ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1); ORTE_ACQUIRE_THREAD(&recvptr->ctl);
/* xfer the data */ /* xfer the data */
if (NULL != name) { if (NULL != name) {
@ -420,10 +431,9 @@ static int udp_recv(orte_process_name_t *name,
*msg = recvptr->iovec_array; *msg = recvptr->iovec_array;
*count = recvptr->iovec_count; *count = recvptr->iovec_count;
/* remove the recv */ /* carefully release the recv */
OPAL_THREAD_LOCK(&orte_rmcast_base.lock); recvptr->iovec_array = NULL;
opal_list_remove_item(&orte_rmcast_base.recvs, &recvptr->item); recvptr->iovec_count = 0;
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
OBJ_RELEASE(recvptr); OBJ_RELEASE(recvptr);
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -489,7 +499,7 @@ static int udp_recv_buffer(orte_process_name_t *name,
return ret; return ret;
} }
ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1); ORTE_ACQUIRE_THREAD(&recvptr->ctl);
/* xfer the data */ /* xfer the data */
if (NULL != name) { if (NULL != name) {
@ -502,11 +512,6 @@ static int udp_recv_buffer(orte_process_name_t *name,
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
} }
/* release the data */ /* release the data */
OBJ_RELEASE(recvptr->buf);
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
opal_list_remove_item(&orte_rmcast_base.recvs, &recvptr->item);
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
OBJ_RELEASE(recvptr); OBJ_RELEASE(recvptr);
return ret; return ret;
@ -577,6 +582,7 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name, channel)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name, channel));
chan = NULL; chan = NULL;
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.main_ctl);
for (item = opal_list_get_first(&orte_rmcast_base.channels); for (item = opal_list_get_first(&orte_rmcast_base.channels);
item != opal_list_get_end(&orte_rmcast_base.channels); item != opal_list_get_end(&orte_rmcast_base.channels);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
@ -611,9 +617,9 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) { if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc;
} }
return ORTE_SUCCESS; ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
return rc;
} }
/* we didn't find an existing match, so create a new channel */ /* we didn't find an existing match, so create a new channel */
@ -642,9 +648,8 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
} else { } else {
chan->port = port; chan->port = port;
} }
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
opal_list_append(&orte_rmcast_base.channels, &chan->item); opal_list_append(&orte_rmcast_base.channels, &chan->item);
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock); ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:udp opening new channel %s:%d network %03d.%03d.%03d.%03d port %d for%s%s", "%s rmcast:udp opening new channel %s:%d network %03d.%03d.%03d.%03d port %d for%s%s",
@ -743,7 +748,7 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
opal_event_set(orte_rmcast_base.event_base, &chan->recv_ev, opal_event_set(orte_rmcast_base.event_base, &chan->recv_ev,
chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan); chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
opal_event_add(&chan->recv_ev, 0); OPAL_UPDATE_EVBASE(orte_rmcast_base.event_base, &chan->recv_ev, OPAL_EVENT_ADD);
} }
return ORTE_SUCCESS; return ORTE_SUCCESS;

Просмотреть файл

@ -21,8 +21,8 @@ static void constructor(orte_thread_ctl_t *ptr)
ptr->running = false; ptr->running = false;
ptr->stop = false; ptr->stop = false;
ptr->name = NULL; ptr->name = NULL;
/* default to waking up the global base */ /* default to updating the global base */
ptr->wakeup_pipe = opal_event_base->wakeup_pipe[1]; ptr->evbase = opal_event_base;
} }
static void destructor(orte_thread_ctl_t *ptr) static void destructor(orte_thread_ctl_t *ptr)
{ {

Просмотреть файл

@ -41,7 +41,7 @@ typedef struct {
volatile bool active; volatile bool active;
volatile bool running; volatile bool running;
volatile bool stop; volatile bool stop;
int wakeup_pipe; opal_event_base_t *evbase;
char *name; char *name;
} orte_thread_ctl_t; } orte_thread_ctl_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_thread_ctl_t); ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_thread_ctl_t);
@ -81,7 +81,6 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_thread_ctl_t);
#if OPAL_ENABLE_DEBUG #if OPAL_ENABLE_DEBUG
#define ORTE_RELEASE_THREAD(ctl) \ #define ORTE_RELEASE_THREAD(ctl) \
do { \ do { \
char byte='a'; \
if (opal_debug_threads) { \ if (opal_debug_threads) { \
opal_output(0, "Releasing thread %s at %s:%d", \ opal_output(0, "Releasing thread %s at %s:%d", \
(NULL == (ctl)->name) ? "NULL" : (ctl)->name, \ (NULL == (ctl)->name) ? "NULL" : (ctl)->name, \
@ -89,7 +88,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_thread_ctl_t);
} \ } \
(ctl)->active = false; \ (ctl)->active = false; \
ORTE_CONDITION_BROADCAST(&(ctl)->cond); \ ORTE_CONDITION_BROADCAST(&(ctl)->cond); \
opal_fd_write((ctl)->wakeup_pipe, 1, &byte); \ OPAL_UPDATE_EVBASE((ctl)->evbase, NULL, OPAL_EVENT_NOOP); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \ ORTE_THREAD_UNLOCK(&(ctl)->lock); \
} while(0); } while(0);
#else #else
@ -98,7 +97,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_thread_ctl_t);
char byte='a'; \ char byte='a'; \
(ctl)->active = false; \ (ctl)->active = false; \
ORTE_CONDITION_BROADCAST(&(ctl)->cond); \ ORTE_CONDITION_BROADCAST(&(ctl)->cond); \
opal_fd_write((ctl)->wakeup_pipe, 1, &byte); \ OPAL_UPDATE_EVBASE((ctl)->evbase, NULL, OPAL_EVENT_NOOP); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \ ORTE_THREAD_UNLOCK(&(ctl)->lock); \
} while(0); } while(0);
#endif #endif
@ -106,7 +105,6 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_thread_ctl_t);
#if OPAL_ENABLE_DEBUG #if OPAL_ENABLE_DEBUG
#define ORTE_WAKEUP_THREAD(ctl) \ #define ORTE_WAKEUP_THREAD(ctl) \
do { \ do { \
char byte='a'; \
ORTE_THREAD_LOCK(&(ctl)->lock); \ ORTE_THREAD_LOCK(&(ctl)->lock); \
if (opal_debug_threads) { \ if (opal_debug_threads) { \
opal_output(0, "Waking up thread %s at %s:%d", \ opal_output(0, "Waking up thread %s at %s:%d", \
@ -115,17 +113,16 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_thread_ctl_t);
} \ } \
(ctl)->active = false; \ (ctl)->active = false; \
ORTE_CONDITION_BROADCAST(&(ctl)->cond); \ ORTE_CONDITION_BROADCAST(&(ctl)->cond); \
opal_fd_write((ctl)->wakeup_pipe, 1, &byte); \ OPAL_UPDATE_EVBASE((ctl)->evbase, NULL, OPAL_EVENT_NOOP); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \ ORTE_THREAD_UNLOCK(&(ctl)->lock); \
} while(0); } while(0);
#else #else
#define ORTE_WAKEUP_THREAD(ctl) \ #define ORTE_WAKEUP_THREAD(ctl) \
do { \ do { \
char byte='a'; \
ORTE_THREAD_LOCK(&(ctl)->lock); \ ORTE_THREAD_LOCK(&(ctl)->lock); \
(ctl)->active = false; \ (ctl)->active = false; \
ORTE_CONDITION_BROADCAST(&(ctl)->cond); \ ORTE_CONDITION_BROADCAST(&(ctl)->cond); \
opal_fd_write((ctl)->wakeup_pipe, 1, &byte); \ OPAL_UPDATE_EVBASE((ctl)->evbase, NULL, OPAL_EVENT_NOOP); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \ ORTE_THREAD_UNLOCK(&(ctl)->lock); \
} while(0); } while(0);
#endif #endif