resolved several issues in tcp cleanup
This commit was SVN r2048.
Этот коммит содержится в:
родитель
1dfc06736f
Коммит
4ac886f119
@ -54,6 +54,8 @@ int mca_ptl_tcp_add_procs(
|
|||||||
ompi_bitmap_t* reachable)
|
ompi_bitmap_t* reachable)
|
||||||
{
|
{
|
||||||
size_t i;
|
size_t i;
|
||||||
|
mca_ptl_tcp_module_t *ptl_tcp = (mca_ptl_tcp_module_t*)ptl;
|
||||||
|
|
||||||
for(i=0; i<nprocs; i++) {
|
for(i=0; i<nprocs; i++) {
|
||||||
struct ompi_proc_t *ompi_proc = ompi_procs[i];
|
struct ompi_proc_t *ompi_proc = ompi_procs[i];
|
||||||
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_create(ompi_proc);
|
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_create(ompi_proc);
|
||||||
@ -92,25 +94,48 @@ int mca_ptl_tcp_add_procs(
|
|||||||
ompi_bitmap_set_bit(reachable, i);
|
ompi_bitmap_set_bit(reachable, i);
|
||||||
OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock);
|
OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock);
|
||||||
peers[i] = ptl_peer;
|
peers[i] = ptl_peer;
|
||||||
|
ompi_list_append(&ptl_tcp->ptl_peers, (ompi_list_item_t*)ptl_peer);
|
||||||
}
|
}
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
int mca_ptl_tcp_del_procs(struct mca_ptl_base_module_t* ptl, size_t nprocs, struct ompi_proc_t **procs, struct mca_ptl_base_peer_t ** peers)
|
int mca_ptl_tcp_del_procs(struct mca_ptl_base_module_t* ptl, size_t nprocs, struct ompi_proc_t **procs, struct mca_ptl_base_peer_t ** peers)
|
||||||
{
|
{
|
||||||
size_t i;
|
size_t i;
|
||||||
|
mca_ptl_tcp_module_t *ptl_tcp = (mca_ptl_tcp_module_t*)ptl;
|
||||||
|
|
||||||
for(i=0; i<nprocs; i++) {
|
for(i=0; i<nprocs; i++) {
|
||||||
|
ompi_list_remove_item(&ptl_tcp->ptl_peers, (ompi_list_item_t*)peers[i]);
|
||||||
OBJ_RELEASE(peers[i]);
|
OBJ_RELEASE(peers[i]);
|
||||||
}
|
}
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
int mca_ptl_tcp_finalize(struct mca_ptl_base_module_t* ptl)
|
int mca_ptl_tcp_finalize(struct mca_ptl_base_module_t* ptl)
|
||||||
{
|
{
|
||||||
free(ptl);
|
ompi_list_item_t* item;
|
||||||
|
mca_ptl_tcp_module_t *ptl_tcp = (mca_ptl_tcp_module_t*)ptl;
|
||||||
|
|
||||||
|
for( item = ompi_list_remove_first(&ptl_tcp->ptl_peers);
|
||||||
|
item != NULL;
|
||||||
|
item = ompi_list_remove_first(&ptl_tcp->ptl_peers)) {
|
||||||
|
OBJ_RELEASE(item);
|
||||||
|
}
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
int mca_ptl_tcp_request_init(struct mca_ptl_base_module_t* ptl, struct mca_pml_base_send_request_t* request)
|
int mca_ptl_tcp_request_init(struct mca_ptl_base_module_t* ptl, struct mca_pml_base_send_request_t* request)
|
||||||
{
|
{
|
||||||
OBJ_CONSTRUCT(request+1, mca_ptl_tcp_send_frag_t);
|
OBJ_CONSTRUCT(request+1, mca_ptl_tcp_send_frag_t);
|
||||||
@ -118,6 +143,10 @@ int mca_ptl_tcp_request_init(struct mca_ptl_base_module_t* ptl, struct mca_pml_b
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
void mca_ptl_tcp_request_fini(struct mca_ptl_base_module_t* ptl, struct mca_pml_base_send_request_t* request)
|
void mca_ptl_tcp_request_fini(struct mca_ptl_base_module_t* ptl, struct mca_pml_base_send_request_t* request)
|
||||||
{
|
{
|
||||||
OBJ_DESTRUCT(request+1);
|
OBJ_DESTRUCT(request+1);
|
||||||
|
@ -23,23 +23,22 @@
|
|||||||
struct mca_ptl_tcp_component_t {
|
struct mca_ptl_tcp_component_t {
|
||||||
mca_ptl_base_component_1_0_0_t super; /**< base PTL component */
|
mca_ptl_base_component_1_0_0_t super; /**< base PTL component */
|
||||||
struct mca_ptl_tcp_module_t** tcp_ptl_modules; /**< array of available PTL modules */
|
struct mca_ptl_tcp_module_t** tcp_ptl_modules; /**< array of available PTL modules */
|
||||||
size_t tcp_num_ptl_modules; /**< number of ptls actually used */
|
size_t tcp_num_ptl_modules; /**< number of ptls actually used */
|
||||||
size_t tcp_max_ptl_modules; /**< maximum number of ptls - available kernel ifs */
|
size_t tcp_max_ptl_modules; /**< maximum number of ptls - available kernel ifs */
|
||||||
int tcp_listen_sd; /**< listen socket for incoming connection requests */
|
int tcp_listen_sd; /**< listen socket for incoming connection requests */
|
||||||
unsigned short tcp_listen_port; /**< listen port */
|
unsigned short tcp_listen_port; /**< listen port */
|
||||||
char* tcp_if_include; /**< comma-separated list of interfaces to include */
|
char* tcp_if_include; /**< comma-separated list of interfaces to include */
|
||||||
char* tcp_if_exclude; /**< comma-separated list of interfaces to exclude */
|
char* tcp_if_exclude; /**< comma-separated list of interfaces to exclude */
|
||||||
int tcp_free_list_num; /**< initial size of free lists */
|
int tcp_free_list_num; /**< initial size of free lists */
|
||||||
int tcp_free_list_max; /**< maximum size of free lists */
|
int tcp_free_list_max; /**< maximum size of free lists */
|
||||||
int tcp_free_list_inc; /**< number of elements to alloc when growing free lists */
|
int tcp_free_list_inc; /**< number of elements to alloc when growing free lists */
|
||||||
int tcp_sndbuf; /**< socket sndbuf size */
|
int tcp_sndbuf; /**< socket sndbuf size */
|
||||||
int tcp_rcvbuf; /**< socket rcvbuf size */
|
int tcp_rcvbuf; /**< socket rcvbuf size */
|
||||||
ompi_free_list_t tcp_send_requests; /**< free list of tcp send requests -- sendreq + sendfrag */
|
|
||||||
ompi_free_list_t tcp_send_frags; /**< free list of tcp send fragments */
|
ompi_free_list_t tcp_send_frags; /**< free list of tcp send fragments */
|
||||||
ompi_free_list_t tcp_recv_frags; /**< free list of tcp recv fragments */
|
ompi_free_list_t tcp_recv_frags; /**< free list of tcp recv fragments */
|
||||||
ompi_list_t tcp_procs; /**< list of tcp proc structures */
|
ompi_list_t tcp_procs; /**< list of tcp proc structures */
|
||||||
ompi_list_t tcp_pending_acks; /**< list of pending acks - retry as sends complete */
|
ompi_list_t tcp_pending_acks; /**< list of pending acks - retry as sends complete */
|
||||||
struct mca_ptl_tcp_proc_t* tcp_local; /**< the tcp proc instance corresponding to the local process */
|
struct mca_ptl_tcp_proc_t* tcp_local; /**< the tcp proc instance corresponding to the local process */
|
||||||
ompi_event_t tcp_send_event; /**< event structure for sends */
|
ompi_event_t tcp_send_event; /**< event structure for sends */
|
||||||
ompi_event_t tcp_recv_event; /**< event structure for recvs */
|
ompi_event_t tcp_recv_event; /**< event structure for recvs */
|
||||||
ompi_mutex_t tcp_lock; /**< lock for accessing module state */
|
ompi_mutex_t tcp_lock; /**< lock for accessing module state */
|
||||||
@ -99,10 +98,11 @@ extern int mca_ptl_tcp_component_progress(
|
|||||||
* TCP PTL Interface
|
* TCP PTL Interface
|
||||||
*/
|
*/
|
||||||
struct mca_ptl_tcp_module_t {
|
struct mca_ptl_tcp_module_t {
|
||||||
mca_ptl_base_module_t super; /**< base PTL module interface */
|
mca_ptl_base_module_t super; /**< base PTL module interface */
|
||||||
int ptl_ifindex; /**< PTL interface index */
|
int ptl_ifindex; /**< PTL interface index */
|
||||||
struct sockaddr_in ptl_ifaddr; /**< PTL interface address */
|
struct sockaddr_in ptl_ifaddr; /**< PTL interface address */
|
||||||
struct sockaddr_in ptl_ifmask; /**< PTL interface netmask */
|
struct sockaddr_in ptl_ifmask; /**< PTL interface netmask */
|
||||||
|
ompi_list_t ptl_peers; /**< List of all peers for this PTL */
|
||||||
#if MCA_PTL_TCP_STATISTICS
|
#if MCA_PTL_TCP_STATISTICS
|
||||||
size_t ptl_bytes_sent;
|
size_t ptl_bytes_sent;
|
||||||
size_t ptl_bytes_recv;
|
size_t ptl_bytes_recv;
|
||||||
|
@ -108,7 +108,6 @@ int mca_ptl_tcp_component_open(void)
|
|||||||
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_lock, ompi_mutex_t);
|
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_lock, ompi_mutex_t);
|
||||||
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_procs, ompi_list_t);
|
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_procs, ompi_list_t);
|
||||||
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_pending_acks, ompi_list_t);
|
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_pending_acks, ompi_list_t);
|
||||||
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_send_requests, ompi_free_list_t);
|
|
||||||
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_send_frags, ompi_free_list_t);
|
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_send_frags, ompi_free_list_t);
|
||||||
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_recv_frags, ompi_free_list_t);
|
OBJ_CONSTRUCT(&mca_ptl_tcp_component.tcp_recv_frags, ompi_free_list_t);
|
||||||
|
|
||||||
@ -145,12 +144,6 @@ int mca_ptl_tcp_component_open(void)
|
|||||||
|
|
||||||
int mca_ptl_tcp_component_close(void)
|
int mca_ptl_tcp_component_close(void)
|
||||||
{
|
{
|
||||||
if (mca_ptl_tcp_component.tcp_send_requests.fl_num_allocated !=
|
|
||||||
mca_ptl_tcp_component.tcp_send_requests.super.ompi_list_length) {
|
|
||||||
ompi_output(0, "tcp send requests: %d allocated %d returned\n",
|
|
||||||
mca_ptl_tcp_component.tcp_send_requests.fl_num_allocated,
|
|
||||||
mca_ptl_tcp_component.tcp_send_requests.super.ompi_list_length);
|
|
||||||
}
|
|
||||||
if (mca_ptl_tcp_component.tcp_send_frags.fl_num_allocated !=
|
if (mca_ptl_tcp_component.tcp_send_frags.fl_num_allocated !=
|
||||||
mca_ptl_tcp_component.tcp_send_frags.super.ompi_list_length) {
|
mca_ptl_tcp_component.tcp_send_frags.super.ompi_list_length) {
|
||||||
ompi_output(0, "tcp send frags: %d allocated %d returned\n",
|
ompi_output(0, "tcp send frags: %d allocated %d returned\n",
|
||||||
@ -172,13 +165,14 @@ int mca_ptl_tcp_component_close(void)
|
|||||||
free(mca_ptl_tcp_component.tcp_ptl_modules);
|
free(mca_ptl_tcp_component.tcp_ptl_modules);
|
||||||
|
|
||||||
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_procs);
|
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_procs);
|
||||||
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_send_requests);
|
|
||||||
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_send_frags);
|
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_send_frags);
|
||||||
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_recv_frags);
|
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_recv_frags);
|
||||||
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_lock);
|
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_lock);
|
||||||
|
|
||||||
if (mca_ptl_tcp_component.tcp_listen_sd >= 0)
|
if (mca_ptl_tcp_component.tcp_listen_sd >= 0) {
|
||||||
|
ompi_event_del(&mca_ptl_tcp_component.tcp_recv_event);
|
||||||
close(mca_ptl_tcp_component.tcp_listen_sd);
|
close(mca_ptl_tcp_component.tcp_listen_sd);
|
||||||
|
}
|
||||||
return ompi_event_fini();
|
return ompi_event_fini();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,6 +188,7 @@ static int mca_ptl_tcp_create(int if_index, const char* if_name)
|
|||||||
if(NULL == ptl)
|
if(NULL == ptl)
|
||||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
memcpy(ptl, &mca_ptl_tcp_module, sizeof(mca_ptl_tcp_module));
|
memcpy(ptl, &mca_ptl_tcp_module, sizeof(mca_ptl_tcp_module));
|
||||||
|
OBJ_CONSTRUCT(&ptl->ptl_peers, ompi_list_t);
|
||||||
mca_ptl_tcp_component.tcp_ptl_modules[mca_ptl_tcp_component.tcp_num_ptl_modules++] = ptl;
|
mca_ptl_tcp_component.tcp_ptl_modules[mca_ptl_tcp_component.tcp_num_ptl_modules++] = ptl;
|
||||||
|
|
||||||
/* initialize the ptl */
|
/* initialize the ptl */
|
||||||
@ -399,15 +394,6 @@ mca_ptl_base_module_t** mca_ptl_tcp_component_init(int *num_ptl_modules,
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* initialize free lists */
|
|
||||||
ompi_free_list_init(&mca_ptl_tcp_component.tcp_send_requests,
|
|
||||||
sizeof(mca_ptl_tcp_send_request_t),
|
|
||||||
OBJ_CLASS(mca_ptl_tcp_send_request_t),
|
|
||||||
mca_ptl_tcp_component.tcp_free_list_num,
|
|
||||||
mca_ptl_tcp_component.tcp_free_list_max,
|
|
||||||
mca_ptl_tcp_component.tcp_free_list_inc,
|
|
||||||
NULL); /* use default allocator */
|
|
||||||
|
|
||||||
ompi_free_list_init(&mca_ptl_tcp_component.tcp_send_frags,
|
ompi_free_list_init(&mca_ptl_tcp_component.tcp_send_frags,
|
||||||
sizeof(mca_ptl_tcp_send_frag_t),
|
sizeof(mca_ptl_tcp_send_frag_t),
|
||||||
OBJ_CLASS(mca_ptl_tcp_send_frag_t),
|
OBJ_CLASS(mca_ptl_tcp_send_frag_t),
|
||||||
|
@ -58,6 +58,15 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer)
|
|||||||
OBJ_CONSTRUCT(&ptl_peer->peer_recv_lock, ompi_mutex_t);
|
OBJ_CONSTRUCT(&ptl_peer->peer_recv_lock, ompi_mutex_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Cleanup any resources held by the peer.
|
||||||
|
*/
|
||||||
|
|
||||||
|
static void mca_ptl_tcp_peer_destruct(mca_ptl_base_peer_t* ptl_peer)
|
||||||
|
{
|
||||||
|
mca_ptl_tcp_proc_remove(ptl_peer->peer_proc, ptl_peer);
|
||||||
|
mca_ptl_tcp_peer_close(ptl_peer);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* diagnostics
|
* diagnostics
|
||||||
@ -132,16 +141,6 @@ static inline void mca_ptl_tcp_peer_event_init(mca_ptl_base_peer_t* ptl_peer, in
|
|||||||
ptl_peer);
|
ptl_peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Cleanup any resources held by the peer.
|
|
||||||
*/
|
|
||||||
|
|
||||||
static void mca_ptl_tcp_peer_destruct(mca_ptl_base_peer_t* ptl_peer)
|
|
||||||
{
|
|
||||||
mca_ptl_tcp_proc_remove(ptl_peer->peer_proc, ptl_peer);
|
|
||||||
mca_ptl_tcp_peer_close(ptl_peer);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Attempt to send a fragment using a given peer. If the peer is not connected,
|
* Attempt to send a fragment using a given peer. If the peer is not connected,
|
||||||
|
@ -198,12 +198,13 @@ int mca_ptl_tcp_proc_remove(mca_ptl_tcp_proc_t* ptl_proc, mca_ptl_base_peer_t* p
|
|||||||
OMPI_THREAD_LOCK(&ptl_proc->proc_lock);
|
OMPI_THREAD_LOCK(&ptl_proc->proc_lock);
|
||||||
for(i=0; i<ptl_proc->proc_peer_count; i++) {
|
for(i=0; i<ptl_proc->proc_peer_count; i++) {
|
||||||
if(ptl_proc->proc_peers[i] == ptl_peer) {
|
if(ptl_proc->proc_peers[i] == ptl_peer) {
|
||||||
memmove(&ptl_proc->proc_peers+i,ptl_proc->proc_peers+i+1,
|
memmove(ptl_proc->proc_peers+i,ptl_proc->proc_peers+i+1,
|
||||||
(ptl_proc->proc_peer_count-i)*sizeof(mca_ptl_base_peer_t*));
|
(ptl_proc->proc_peer_count-i)*sizeof(mca_ptl_base_peer_t*));
|
||||||
|
ptl_proc->proc_peer_count--;
|
||||||
|
ptl_peer->peer_addr->addr_inuse--;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ptl_proc->proc_peer_count--;
|
|
||||||
ptl_peer->peer_addr->addr_inuse--;
|
|
||||||
OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock);
|
OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock);
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
Загрузка…
Ссылка в новой задаче
Block a user