1
1

resolved threading issues in tcp oob

This commit was SVN r1842.
Этот коммит содержится в:
Tim Woodall 2004-08-02 22:16:35 +00:00
родитель 105b8e6b5b
Коммит e51a55a05e
4 изменённых файлов: 23 добавлений и 15 удалений

Просмотреть файл

@@ -80,6 +80,9 @@ int mca_oob_tcp_open(void)
mca_oob_tcp_param_register_int("peer_limit", -1);
mca_oob_tcp_component.tcp_peer_retries =
mca_oob_tcp_param_register_int("peer_retries", 60);
/* initialize state */
mca_oob_tcp_component.tcp_listen_sd = -1;
return OMPI_SUCCESS;
}
@@ -90,6 +93,10 @@ int mca_oob_tcp_open(void)
int mca_oob_tcp_close(void)
{
if (mca_oob_tcp_component.tcp_listen_sd >= 0) {
close(mca_oob_tcp_component.tcp_listen_sd);
}
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_tree);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);
@@ -272,7 +279,7 @@ mca_oob_t* mca_oob_tcp_init(bool *allow_multi_user_threads, bool *have_hidden_th
sizeof(mca_oob_tcp_msg_t),
OBJ_CLASS(mca_oob_tcp_msg_t),
8, /* initial number */
-1, /* maximum number */
-1, /* maximum number */
8, /* increment to grow by */
NULL); /* use default allocator */
@@ -300,8 +307,7 @@ mca_oob_t* mca_oob_tcp_init(bool *allow_multi_user_threads, bool *have_hidden_th
*/
int mca_oob_tcp_finalize(void)
{
/* probably want to try to finish all sends and recieves here
* before we return */
/* TODO: need to cleanup all peers - check for pending send/recvs. etc. */
return OMPI_SUCCESS;
}

Просмотреть файл

@@ -502,6 +502,7 @@ static void mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
*/
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
msg = mca_oob_tcp_msg_match_post(&peer->peer_name, hdr.msg_tag, false);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
if(NULL != msg) {
uint32_t posted_size = 0;
int i;
@@ -530,8 +531,6 @@ static void mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
int rc;
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&peer->peer_lock);
return;
}
msg->msg_type = MCA_OOB_TCP_UNEXPECTED;
@@ -555,7 +554,6 @@ static void mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
/* continue processing until complete */
peer->peer_recv_msg = msg;
}
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
}

Просмотреть файл

@@ -40,21 +40,21 @@ int mca_oob_tcp_recv(
rc += msg->msg_rwiov[i].iov_len;
}
if(MCA_OOB_PEEK & flags) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return rc;
}
/* otherwise dequeue the message and return to free list */
ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
MCA_OOB_TCP_MSG_RETURN(msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return rc;
}
/* the message has not already been received. So we add it to the receive queue */
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return rc;
}
@@ -74,7 +74,7 @@ int mca_oob_tcp_recv(
msg->msg_complete = false;
msg->msg_peer = *peer;
ompi_list_append(&mca_oob_tcp_component.tcp_msg_post, (ompi_list_item_t *) msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
/* wait for the receive to complete */
mca_oob_tcp_msg_wait(msg, &rc);
@@ -107,7 +107,7 @@ int mca_oob_tcp_recv_nb(
int i, rc, size = 0;
/* lock the tcp struct */
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
/* check to see if a matching receive is on the list */
msg = mca_oob_tcp_msg_match_recv(peer, tag);
@@ -124,14 +124,14 @@ int mca_oob_tcp_recv_nb(
rc += msg->msg_rwiov[i].iov_len;
}
if(MCA_OOB_PEEK & flags) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
return 0;
}
/* otherwise dequeue the message and return to free list */
ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
MCA_OOB_TCP_MSG_RETURN(msg);
return 0;
@@ -140,7 +140,7 @@ int mca_oob_tcp_recv_nb(
/* the message has not already been received. So we add it to the receive queue */
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return rc;
}
@@ -160,7 +160,7 @@ int mca_oob_tcp_recv_nb(
msg->msg_complete = false;
msg->msg_peer = *peer;
ompi_list_append(&mca_oob_tcp_component.tcp_msg_post, (ompi_list_item_t *) msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return 0;
}

Просмотреть файл

@@ -100,6 +100,7 @@ static inline int mca_ptl_tcp_param_register_int(
int mca_ptl_tcp_component_open(void)
{
/* initialize state */
mca_ptl_tcp_component.tcp_listen_sd = -1;
mca_ptl_tcp_component.tcp_ptl_modules = NULL;
mca_ptl_tcp_component.tcp_num_ptl_modules = 0;
@@ -175,6 +176,9 @@ int mca_ptl_tcp_component_close(void)
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_send_frags);
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_recv_frags);
OBJ_DESTRUCT(&mca_ptl_tcp_component.tcp_lock);
if (mca_ptl_tcp_component.tcp_listen_sd >= 0)
close(mca_ptl_tcp_component.tcp_listen_sd);
return ompi_event_fini();
}