1
1
- Unify the Windows and the others way of handling callbacks. Thanks to George.
- This will let Windows use the same callbacks as Linux does, which works also.

This commit was SVN r19746.

The following SVN revisions from the original message are invalid or
inconsistent and therefore were not cross-referenced:
  r19742
Этот коммит содержится в:
Shiqing Fan 2008-10-15 08:14:24 +00:00
родитель f6a6c0a74b
Коммит 8b60c755c2
2 изменённых файлов: 57 добавлений и 66 удалений

Просмотреть файл

@ -58,10 +58,6 @@
#include "orte/mca/oob/tcp/oob_tcp.h"
#if defined(__WINDOWS__)
static opal_mutex_t windows_callback;
#endif /* defined(__WINDOWS__) */
/*
* Data structure for accepting connections.
*/
@ -151,40 +147,6 @@ mca_oob_t mca_oob_tcp = {
mca_oob_tcp_ft_event
};
#if defined(__WINDOWS__)
static int oob_tcp_windows_progress_callback( void )
{
opal_list_item_t* item;
mca_oob_tcp_msg_t* msg;
int event_count = 0;
/* Only one thread at the time is allowed to execute callbacks */
if( !opal_mutex_trylock(&windows_callback) )
return 0;
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
while(NULL !=
(item = opal_list_remove_first(&mca_oob_tcp_component.tcp_msg_completed))) {
msg = (mca_oob_tcp_msg_t*)item;
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
msg->msg_cbfunc( msg->msg_rc,
&msg->msg_peer,
msg->msg_uiov,
msg->msg_ucnt,
msg->msg_hdr.msg_tag,
msg->msg_cbdata);
event_count++;
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
MCA_OOB_TCP_MSG_RETURN(msg);
}
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
opal_mutex_unlock(&windows_callback);
return event_count;
}
#endif /* defined(__WINDOWS__) */
/*
* Initialize global variables used w/in this module.
*/
@ -382,13 +344,6 @@ int mca_oob_tcp_component_open(void)
mca_oob_tcp_component.tcp_listen_sd = -1;
mca_oob_tcp_component.tcp_match_count = 0;
#if defined(__WINDOWS__)
/* Register the libevent callback which will trigger the OOB
* completion callbacks. */
OBJ_CONSTRUCT(&windows_callback, opal_mutex_t);
opal_progress_register(oob_tcp_windows_progress_callback);
#endif /* defined(__WINDOWS__) */
return ORTE_SUCCESS;
}
@ -401,8 +356,6 @@ int mca_oob_tcp_component_close(void)
opal_list_item_t *item;
#if defined(__WINDOWS__)
opal_progress_unregister(oob_tcp_windows_progress_callback);
OBJ_DESTRUCT( &windows_callback );
WSACleanup();
#endif /* defined(__WINDOWS__) */

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* Copyright (c) 2004-2008 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -187,22 +187,6 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, orte_process_name_t * peer)
if(NULL != msg->msg_cbfunc) {
OPAL_THREAD_UNLOCK(&msg->msg_lock);
#if defined(__WINDOWS__)
/**
* In order to be able to generate TCP events recursively, Windows need
* to get out of the callback attached to a specific socket. Therefore,
* as our OOB allow to block on a connection from a callback on the same
* connection, we have to trigger the completion callbacks outside of the
* OOB callbacks. We add them to the completed list here, and the progress
* engine will call our progress function later once all socket related
* events have been processed.
*/
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
opal_list_append(&mca_oob_tcp_component.tcp_msg_completed, (opal_list_item_t*)msg);
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return ORTE_SUCCESS;
#else
/* post to a global list of completed messages */
if ((msg->msg_flags & ORTE_RML_FLAG_RECURSIVE_CALLBACK) == 0) {
int size;
@ -243,7 +227,6 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, orte_process_name_t * peer)
} else {
MCA_OOB_TCP_MSG_RETURN(msg);
}
#endif /* defined(__WINDOWS__) */
} else {
opal_condition_broadcast(&msg->msg_condition);
OPAL_THREAD_UNLOCK(&msg->msg_lock);
@ -391,6 +374,7 @@ static bool mca_oob_tcp_msg_recv(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* pee
}
return false;
} else if (rc == 0) {
/* Under Windows have the PROTOCOL explicitly close the socket?? */
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT_FAIL) {
opal_output(0, "%s-%s mca_oob_tcp_msg_recv: peer closed connection",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -479,12 +463,41 @@ static void mca_oob_tcp_msg_ping(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* pee
}
#ifdef __WINDOWS__
/*
* XXX First try to fix orte issues, that readv is polled early and msg
*/
struct envelope_t {
mca_oob_tcp_peer_t* peer;
mca_oob_tcp_msg_t* post;
};
static void peer_event_callback_wrapper (int fd, short flags, void * envelope)
{
mca_oob_tcp_peer_t* peer = ((envelope_t *) envelope)->peer;
mca_oob_tcp_msg_t* post = ((envelope_t *) envelope)->post;
/* XXX We should have a ref_count per post, maybe on peer
* and have the event released
* If the event is not Persistent (like this one),
* it should freed again
*/
post->msg_cbfunc(
post->msg_rc,
&peer->peer_name,
post->msg_uiov,
post->msg_ucnt,
post->msg_hdr.msg_tag,
post->msg_cbdata);
}
#endif
/*
* Progress a completed recv:
* (1) signal a posted recv as complete
* (2) queue an unexpected message in the recv list
*/
static void mca_oob_tcp_msg_data(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* peer)
{
/* attempt to match unexpected message to a posted recv */
@ -544,6 +557,30 @@ static void mca_oob_tcp_msg_data(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* pee
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
if(post->msg_flags & ORTE_RML_PERSISTENT) {
#ifdef __WINDOWS__
/*
* In order to protect against reentry, due to a event in
* a threaded callback (and thereby reading EOF, thereby falsely *closing* the connection),
* just register a wrapper callback with zero timeout to libevent.
*
* This will be called "real soon", but we will be ready to grasp the next event.
*/
{
struct envelope_t * envelope;
opal_event_t * peer_event;
peer_event = (opal_event_t *) malloc (sizeof (opal_event_t));
memset (peer_event, 0, sizeof (opal_event_t));
envelope = (envelope_t *) malloc (sizeof (struct envelope_t));
envelope->peer = peer;
envelope->post = post;
opal_evtimer_set(peer_event, peer_event_callback_wrapper, envelope);
opal_event_add (peer_event, 0);
}
#else
post->msg_cbfunc(
post->msg_rc,
&peer->peer_name,
@ -551,6 +588,7 @@ static void mca_oob_tcp_msg_data(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* pee
post->msg_ucnt,
post->msg_hdr.msg_tag,
post->msg_cbdata);
#endif /* __WINDOWS__ */
} else {
mca_oob_tcp_msg_complete(post, &msg->msg_hdr.msg_origin);
}