1
1

resolved several race conditions

This commit was SVN r2402.
Этот коммит содержится в:
Tim Woodall 2004-08-31 02:57:39 +00:00
родитель 722867c82d
Коммит 7bd0f1f4b3
3 изменённых файлов: 74 добавлений и 24 удалений

Просмотреть файл

@ -284,7 +284,6 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
} }
/* is the peer instance willing to accept this connection */ /* is the peer instance willing to accept this connection */
if(mca_oob_tcp_peer_accept(peer, sd) == false) { if(mca_oob_tcp_peer_accept(peer, sd) == false) {
ompi_output(0, "mca_oob_tcp_recv_handler: peer instance not willing to accept connection.");
close(sd); close(sd);
return; return;
} }

Просмотреть файл

@ -43,8 +43,10 @@ int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc)
OMPI_THREAD_LOCK(&msg->msg_lock); OMPI_THREAD_LOCK(&msg->msg_lock);
while(msg->msg_complete == false) { while(msg->msg_complete == false) {
if(ompi_event_progress_thread()) { if(ompi_event_progress_thread()) {
int rc;
OMPI_THREAD_UNLOCK(&msg->msg_lock); OMPI_THREAD_UNLOCK(&msg->msg_lock);
ompi_event_loop(OMPI_EVLOOP_ONCE); rc = ompi_event_loop(OMPI_EVLOOP_ONCE);
assert(rc == 0);
OMPI_THREAD_LOCK(&msg->msg_lock); OMPI_THREAD_LOCK(&msg->msg_lock);
} else { } else {
ompi_condition_wait(&msg->msg_condition, &msg->msg_lock); ompi_condition_wait(&msg->msg_condition, &msg->msg_lock);

Просмотреть файл

@ -105,8 +105,31 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
* queue the message and start the connection to the peer * queue the message and start the connection to the peer
*/ */
ompi_list_append(&peer->peer_send_queue, (ompi_list_item_t*)msg); ompi_list_append(&peer->peer_send_queue, (ompi_list_item_t*)msg);
if(peer->peer_state == MCA_OOB_TCP_CLOSED) { if(peer->peer_state == MCA_OOB_TCP_CLOSED) {
rc = mca_oob_tcp_peer_start_connect(peer); peer->peer_state = MCA_OOB_TCP_CONNECTING;
OMPI_THREAD_UNLOCK(&peer->peer_lock);
/*
* attempt to resolve peer address
*/
if (mca_oob_tcp_peer_name_lookup(peer) != OMPI_SUCCESS) {
OMPI_THREAD_LOCK(&peer->peer_lock);
if(peer->peer_retries++ < mca_oob_tcp_component.tcp_peer_retries) {
struct timeval tv = { 1, 0 };
ompi_evtimer_add(&peer->peer_timer_event, &tv);
}
OMPI_THREAD_UNLOCK(&peer->peer_lock);
/*
* start connection
*/
} else {
OMPI_THREAD_LOCK(&peer->peer_lock);
rc = mca_oob_tcp_peer_start_connect(peer);
OMPI_THREAD_UNLOCK(&peer->peer_lock);
}
return rc;
} }
break; break;
case MCA_OOB_TCP_FAILED: case MCA_OOB_TCP_FAILED:
@ -241,17 +264,6 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
ompi_output(0, "mca_oob_tcp_peer_connect: fcntl(F_SETFL) failed with errno=%d\n", errno); ompi_output(0, "mca_oob_tcp_peer_connect: fcntl(F_SETFL) failed with errno=%d\n", errno);
} }
/* resolve the peer address */
if ((rc = mca_oob_tcp_peer_name_lookup(peer)) != OMPI_SUCCESS) {
struct timeval tv = { 1, 0 };
mca_oob_tcp_peer_close(peer);
if(peer->peer_retries > mca_oob_tcp_component.tcp_peer_retries) {
return OMPI_ERR_UNREACH;
}
ompi_evtimer_add(&peer->peer_timer_event, &tv);
return OMPI_SUCCESS;
}
/* start the connect - will likely fail with EINPROGRESS */ /* start the connect - will likely fail with EINPROGRESS */
if(connect(peer->peer_sd, (struct sockaddr*)&(peer->peer_addr), sizeof(peer->peer_addr)) < 0) { if(connect(peer->peer_sd, (struct sockaddr*)&(peer->peer_addr), sizeof(peer->peer_addr)) < 0) {
/* non-blocking so wait for completion */ /* non-blocking so wait for completion */
@ -260,7 +272,10 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
ompi_event_add(&peer->peer_send_event, 0); ompi_event_add(&peer->peer_send_event, 0);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
ompi_output(0, "mca_oob_tcp_msg_peer_start_connect: unable to connect to peer. errno=%d", errno); ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_start_connect: connect failed with errno=%d",
mca_oob_name_self.cellid, mca_oob_name_self.jobid, mca_oob_name_self.vpid,
peer->peer_name.cellid, peer->peer_name.jobid,peer->peer_name.vpid,
errno);
mca_oob_tcp_peer_close(peer); mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH; return OMPI_ERR_UNREACH;
} }
@ -270,7 +285,12 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
peer->peer_state = MCA_OOB_TCP_CONNECT_ACK; peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
ompi_event_add(&peer->peer_recv_event, 0); ompi_event_add(&peer->peer_recv_event, 0);
} else { } else {
ompi_output(0, "mca_oob_tcp_peer_start_connect: unable to send connect ack to peer. errno=%d", errno); ompi_output(0,
"[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_start_connect: "
"mca_oob_tcp_peer_send_connect_ack failed with errno=%d",
mca_oob_name_self.cellid, mca_oob_name_self.jobid, mca_oob_name_self.vpid,
peer->peer_name.cellid, peer->peer_name.jobid,peer->peer_name.vpid,
rc);
mca_oob_tcp_peer_close(peer); mca_oob_tcp_peer_close(peer);
} }
return rc; return rc;
@ -317,7 +337,7 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
peer->peer_state = MCA_OOB_TCP_CONNECT_ACK; peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
ompi_event_add(&peer->peer_recv_event, 0); ompi_event_add(&peer->peer_recv_event, 0);
} else { } else {
ompi_output(0, "mca_oob_tcp_peer_complete_connect: unable to send connect ack."); ompi_output(0, "mca_oob_tcp_peer_complete_connect: unable to send connect ack.");
mca_oob_tcp_peer_close(peer); mca_oob_tcp_peer_close(peer);
} }
} }
@ -362,7 +382,8 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
ompi_event_del(&peer->peer_send_event); ompi_event_del(&peer->peer_send_event);
close(peer->peer_sd); close(peer->peer_sd);
peer->peer_sd = -1; peer->peer_sd = -1;
} }
ompi_event_del(&peer->peer_timer_event);
peer->peer_state = MCA_OOB_TCP_CLOSED; peer->peer_state = MCA_OOB_TCP_CLOSED;
} }
@ -441,7 +462,10 @@ static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data,
/* socket is non-blocking so handle errors */ /* socket is non-blocking so handle errors */
if(retval < 0) { if(retval < 0) {
if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
ompi_output(0, "mca_oob_tcp_peer_recv_blocking: recv() failed with errno=%d\n",errno); ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_blocking: recv() failed with errno=%d\n",
mca_oob_name_self.cellid, mca_oob_name_self.jobid, mca_oob_name_self.vpid,
peer->peer_name.cellid, peer->peer_name.jobid, peer->peer_name.vpid,
errno);
mca_oob_tcp_peer_close(peer); mca_oob_tcp_peer_close(peer);
return -1; return -1;
} }
@ -466,7 +490,10 @@ static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data,
int retval = send(peer->peer_sd, ptr+cnt, size-cnt, 0); int retval = send(peer->peer_sd, ptr+cnt, size-cnt, 0);
if(retval < 0) { if(retval < 0) {
if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
ompi_output(0, "mca_oob_tcp_peer_send_blocking: send() failed with errno=%d\n",errno); ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_send_blocking: send() failed with errno=%d\n",
mca_oob_name_self.cellid, mca_oob_name_self.jobid, mca_oob_name_self.vpid,
peer->peer_name.cellid, peer->peer_name.jobid, peer->peer_name.vpid,
errno);
mca_oob_tcp_peer_close(peer); mca_oob_tcp_peer_close(peer);
return -1; return -1;
} }
@ -659,7 +686,10 @@ static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user)
} }
default: default:
{ {
ompi_output(0, "mca_oob_tcp_peer_recv_handler: invalid socket state(%d)", peer->peer_state); ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_handler: invalid socket state(%d)",
mca_oob_name_self.cellid, mca_oob_name_self.jobid, mca_oob_name_self.vpid,
peer->peer_name.cellid, peer->peer_name.jobid, peer->peer_name.vpid,
peer->peer_state);
mca_oob_tcp_peer_close(peer); mca_oob_tcp_peer_close(peer);
break; break;
} }
@ -774,7 +804,10 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
if ((peer->peer_state == MCA_OOB_TCP_CLOSED) || if ((peer->peer_state == MCA_OOB_TCP_CLOSED) ||
(peer->peer_state != MCA_OOB_TCP_CONNECTED && (peer->peer_state != MCA_OOB_TCP_CONNECTED &&
mca_oob_tcp_process_name_compare(&peer->peer_name, MCA_OOB_NAME_SELF) < 0)) { mca_oob_tcp_process_name_compare(&peer->peer_name, MCA_OOB_NAME_SELF) < 0)) {
mca_oob_tcp_peer_close(peer);
if(peer->peer_state != MCA_OOB_TCP_CLOSED) {
mca_oob_tcp_peer_close(peer);
}
peer->peer_sd = sd; peer->peer_sd = sd;
mca_oob_tcp_peer_event_init(peer); mca_oob_tcp_peer_event_init(peer);
@ -783,8 +816,9 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
OMPI_THREAD_UNLOCK(&peer->peer_lock); OMPI_THREAD_UNLOCK(&peer->peer_lock);
return false; return false;
} }
ompi_event_add(&peer->peer_recv_event, 0);
mca_oob_tcp_peer_connected(peer); mca_oob_tcp_peer_connected(peer);
ompi_event_add(&peer->peer_recv_event, 0);
#if OMPI_ENABLE_DEBUG && 0 #if OMPI_ENABLE_DEBUG && 0
mca_oob_tcp_peer_dump(peer, "accepted"); mca_oob_tcp_peer_dump(peer, "accepted");
#endif #endif
@ -842,6 +876,21 @@ int mca_oob_tcp_peer_name_lookup(mca_oob_tcp_peer_t* peer)
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user) static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user)
{ {
mca_oob_tcp_peer_start_connect((mca_oob_tcp_peer_t*)user); /* resolve the peer address */
mca_oob_tcp_peer_t *peer = (mca_oob_tcp_peer_t*)user;
if (mca_oob_tcp_peer_name_lookup(peer) != OMPI_SUCCESS) {
OMPI_THREAD_LOCK(&peer->peer_lock);
if(peer->peer_retries++ < mca_oob_tcp_component.tcp_peer_retries) {
struct timeval tv = { 1, 0 };
ompi_evtimer_add(&peer->peer_timer_event, &tv);
}
OMPI_THREAD_UNLOCK(&peer->peer_lock);
} else {
/* start the connection to the peer */
OMPI_THREAD_LOCK(&peer->peer_lock);
mca_oob_tcp_peer_start_connect(peer);
OMPI_THREAD_UNLOCK(&peer->peer_lock);
}
} }