cleanup cleanup
This commit was SVN r2715.
Этот коммит содержится в:
родитель
cb48562fa5
Коммит
8d0851253a
@ -130,14 +130,24 @@ int mca_ptl_tcp_del_procs(struct mca_ptl_base_module_t* ptl, size_t nprocs, stru
|
||||
|
||||
int mca_ptl_tcp_finalize(struct mca_ptl_base_module_t* ptl)
|
||||
{
|
||||
extern ompi_mutex_t ompi_event_lock;
|
||||
ompi_list_item_t* item;
|
||||
mca_ptl_tcp_module_t *ptl_tcp = (mca_ptl_tcp_module_t*)ptl;
|
||||
|
||||
for( item = ompi_list_get_first(&ptl_tcp->ptl_peers);
|
||||
item != ompi_list_get_end(&ptl_tcp->ptl_peers);
|
||||
item = ompi_list_get_next(item)) {
|
||||
mca_ptl_tcp_peer_t *peer = (mca_ptl_tcp_peer_t*)item;
|
||||
mca_ptl_tcp_peer_shutdown(peer);
|
||||
}
|
||||
|
||||
OMPI_THREAD_LOCK(&ompi_event_lock);
|
||||
for( item = ompi_list_remove_first(&ptl_tcp->ptl_peers);
|
||||
item != NULL;
|
||||
item = ompi_list_remove_first(&ptl_tcp->ptl_peers)) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&ompi_event_lock);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "mca/pml/base/pml_base_sendreq.h"
|
||||
#include "mca/base/mca_base_param.h"
|
||||
#include "mca/base/mca_base_module_exchange.h"
|
||||
#include "mca/oob/base/base.h"
|
||||
#include "ptl_tcp.h"
|
||||
#include "ptl_tcp_addr.h"
|
||||
#include "ptl_tcp_proc.h"
|
||||
@ -172,6 +173,7 @@ int mca_ptl_tcp_component_close(void)
|
||||
if (mca_ptl_tcp_component.tcp_listen_sd >= 0) {
|
||||
ompi_event_del(&mca_ptl_tcp_component.tcp_recv_event);
|
||||
close(mca_ptl_tcp_component.tcp_listen_sd);
|
||||
mca_ptl_tcp_component.tcp_listen_sd = -1;
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -459,6 +461,7 @@ int mca_ptl_tcp_component_progress(mca_ptl_tstamp_t tstamp)
|
||||
* requests and queue for completion of the connection handshake.
|
||||
*/
|
||||
|
||||
|
||||
static void mca_ptl_tcp_component_accept(void)
|
||||
{
|
||||
while(true) {
|
||||
@ -477,7 +480,7 @@ static void mca_ptl_tcp_component_accept(void)
|
||||
|
||||
/* wait for receipt of peers process identifier to complete this connection */
|
||||
event = malloc(sizeof(ompi_event_t));
|
||||
ompi_event_set(event, sd, OMPI_EV_READ|OMPI_EV_PERSIST, mca_ptl_tcp_component_recv_handler, event);
|
||||
ompi_event_set(event, sd, OMPI_EV_READ, mca_ptl_tcp_component_recv_handler, event);
|
||||
ompi_event_add(event, 0);
|
||||
}
|
||||
}
|
||||
@ -500,7 +503,6 @@ static void mca_ptl_tcp_component_recv_handler(int sd, short flags, void* user)
|
||||
mca_ptl_tcp_component_accept();
|
||||
return;
|
||||
}
|
||||
ompi_event_del((ompi_event_t*)user);
|
||||
free(user);
|
||||
|
||||
/* recv the process identifier */
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include "include/types.h"
|
||||
#include "mca/pml/base/pml_base_sendreq.h"
|
||||
#include "mca/ns/base/base.h"
|
||||
#include "mca/oob/base/base.h"
|
||||
#include "ptl_tcp.h"
|
||||
#include "ptl_tcp_addr.h"
|
||||
#include "ptl_tcp_peer.h"
|
||||
@ -64,12 +65,8 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer)
|
||||
|
||||
static void mca_ptl_tcp_peer_destruct(mca_ptl_base_peer_t* ptl_peer)
|
||||
{
|
||||
OMPI_THREAD_LOCK(&ptl_peer->peer_send_lock);
|
||||
OMPI_THREAD_LOCK(&ptl_peer->peer_recv_lock);
|
||||
mca_ptl_tcp_proc_remove(ptl_peer->peer_proc, ptl_peer);
|
||||
mca_ptl_tcp_peer_close(ptl_peer);
|
||||
OMPI_THREAD_UNLOCK(&ptl_peer->peer_send_lock);
|
||||
OMPI_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
|
||||
OBJ_DESTRUCT(&ptl_peer->peer_frags);
|
||||
OBJ_DESTRUCT(&ptl_peer->peer_send_lock);
|
||||
OBJ_DESTRUCT(&ptl_peer->peer_recv_lock);
|
||||
@ -183,6 +180,9 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
|
||||
}
|
||||
}
|
||||
break;
|
||||
case MCA_PTL_TCP_SHUTDOWN:
|
||||
rc = OMPI_ERROR;
|
||||
break;
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&ptl_peer->peer_send_lock);
|
||||
return rc;
|
||||
@ -297,6 +297,17 @@ void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
|
||||
ptl_peer->peer_retries++;
|
||||
}
|
||||
|
||||
void mca_ptl_tcp_peer_shutdown(mca_ptl_base_peer_t* ptl_peer)
|
||||
{
|
||||
OMPI_THREAD_LOCK(&ptl_peer->peer_recv_lock);
|
||||
OMPI_THREAD_LOCK(&ptl_peer->peer_send_lock);
|
||||
mca_ptl_tcp_peer_close(ptl_peer);
|
||||
ptl_peer->peer_state = MCA_PTL_TCP_SHUTDOWN;
|
||||
OMPI_THREAD_UNLOCK(&ptl_peer->peer_send_lock);
|
||||
OMPI_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Setup peer state to reflect that connection has been established,
|
||||
* and start any pending sends.
|
||||
@ -548,6 +559,10 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
|
||||
ptl_peer->peer_recv_frag = 0;
|
||||
break;
|
||||
}
|
||||
case MCA_PTL_TCP_SHUTDOWN:
|
||||
{
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
ompi_output(0, "mca_ptl_tcp_peer_recv_handler: invalid socket state(%d)", ptl_peer->peer_state);
|
||||
|
@ -26,6 +26,7 @@ typedef enum {
|
||||
MCA_PTL_TCP_CONNECTING,
|
||||
MCA_PTL_TCP_CONNECT_ACK,
|
||||
MCA_PTL_TCP_CONNECTED,
|
||||
MCA_PTL_TCP_SHUTDOWN,
|
||||
MCA_PTL_TCP_FAILED
|
||||
} mca_ptl_tcp_state_t;
|
||||
|
||||
@ -62,6 +63,7 @@ void mca_ptl_tcp_set_socket_options(int sd);
|
||||
void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t*);
|
||||
int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t*, mca_ptl_tcp_send_frag_t*);
|
||||
bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t*, struct sockaddr_in*, int);
|
||||
void mca_ptl_tcp_peer_shutdown(mca_ptl_base_peer_t*);
|
||||
|
||||
#endif
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user