Minor cleanups, mainly to ensure we correctly block on blocking sends
This commit was SVN r22102.
Этот коммит содержится в:
родитель
2f91a4833b
Коммит
99c67183d2
@ -102,7 +102,6 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_base_recv_t);
|
|||||||
*/
|
*/
|
||||||
typedef struct {
|
typedef struct {
|
||||||
opal_list_item_t item;
|
opal_list_item_t item;
|
||||||
bool send_complete;
|
|
||||||
opal_buffer_t *data;
|
opal_buffer_t *data;
|
||||||
orte_rmcast_tag_t tag;
|
orte_rmcast_tag_t tag;
|
||||||
orte_rmcast_callback_fn_t cbfunc;
|
orte_rmcast_callback_fn_t cbfunc;
|
||||||
|
@ -279,7 +279,6 @@ OBJ_CLASS_INSTANCE(orte_mcast_msg_event_t,
|
|||||||
|
|
||||||
static void send_construct(rmcast_base_send_t *ptr)
|
static void send_construct(rmcast_base_send_t *ptr)
|
||||||
{
|
{
|
||||||
ptr->send_complete = false;
|
|
||||||
ptr->data = NULL;
|
ptr->data = NULL;
|
||||||
ptr->tag = ORTE_RMCAST_TAG_INVALID;
|
ptr->tag = ORTE_RMCAST_TAG_INVALID;
|
||||||
ptr->cbfunc = NULL;
|
ptr->cbfunc = NULL;
|
||||||
@ -329,14 +328,14 @@ static void channel_construct(rmcast_base_channel_t *ptr)
|
|||||||
static void channel_destruct(rmcast_base_channel_t *ptr)
|
static void channel_destruct(rmcast_base_channel_t *ptr)
|
||||||
{
|
{
|
||||||
/* cleanup the recv side */
|
/* cleanup the recv side */
|
||||||
opal_event_del(&ptr->recv_ev);
|
|
||||||
if (0 < ptr->recv) {
|
if (0 < ptr->recv) {
|
||||||
|
opal_event_del(&ptr->recv_ev);
|
||||||
CLOSE_THE_SOCKET(ptr->recv);
|
CLOSE_THE_SOCKET(ptr->recv);
|
||||||
}
|
}
|
||||||
/* attempt to xmit any pending sends */
|
/* attempt to xmit any pending sends */
|
||||||
/* cleanup the xmit side */
|
/* cleanup the xmit side */
|
||||||
opal_event_del(&ptr->send_ev);
|
|
||||||
if (0 < ptr->xmit) {
|
if (0 < ptr->xmit) {
|
||||||
|
opal_event_del(&ptr->send_ev);
|
||||||
CLOSE_THE_SOCKET(ptr->xmit);
|
CLOSE_THE_SOCKET(ptr->xmit);
|
||||||
}
|
}
|
||||||
OBJ_DESTRUCT(&ptr->send_lock);
|
OBJ_DESTRUCT(&ptr->send_lock);
|
||||||
|
@ -176,7 +176,7 @@ static int init(void)
|
|||||||
OPAL_THREAD_LOCK(&lock);
|
OPAL_THREAD_LOCK(&lock);
|
||||||
opal_list_append(&channels, &chan->item);
|
opal_list_append(&channels, &chan->item);
|
||||||
OPAL_THREAD_UNLOCK(&lock);
|
OPAL_THREAD_UNLOCK(&lock);
|
||||||
if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) {
|
if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_XMIT))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
@ -211,11 +211,11 @@ static void finalize(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* internal blocking send support */
|
/* internal blocking send support */
|
||||||
|
static bool send_complete;
|
||||||
|
|
||||||
static void internal_snd_cb(orte_rmcast_channel_t channel, opal_buffer_t *buf, void *cbdata)
|
static void internal_snd_cb(orte_rmcast_channel_t channel, opal_buffer_t *buf, void *cbdata)
|
||||||
{
|
{
|
||||||
rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata;
|
send_complete = true;
|
||||||
|
|
||||||
snd->send_complete = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int basic_send(orte_rmcast_channel_t channel,
|
static int basic_send(orte_rmcast_channel_t channel,
|
||||||
@ -278,6 +278,7 @@ process:
|
|||||||
snd->tag = tag;
|
snd->tag = tag;
|
||||||
snd->cbfunc = internal_snd_cb;
|
snd->cbfunc = internal_snd_cb;
|
||||||
snd->cbdata = snd;
|
snd->cbdata = snd;
|
||||||
|
send_complete = false;
|
||||||
|
|
||||||
/* add it to this channel's pending sends */
|
/* add it to this channel's pending sends */
|
||||||
OPAL_THREAD_LOCK(&ch->send_lock);
|
OPAL_THREAD_LOCK(&ch->send_lock);
|
||||||
@ -291,7 +292,7 @@ process:
|
|||||||
OPAL_THREAD_UNLOCK(&ch->send_lock);
|
OPAL_THREAD_UNLOCK(&ch->send_lock);
|
||||||
|
|
||||||
/* now wait for the send to complete */
|
/* now wait for the send to complete */
|
||||||
ORTE_PROGRESSED_WAIT(snd->send_complete, 0, 1);
|
ORTE_PROGRESSED_WAIT(send_complete, 0, 1);
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user