diff --git a/orte/mca/oob/tcp/oob_tcp.c b/orte/mca/oob/tcp/oob_tcp.c index b2e6f5ccce..c7213e8cb3 100644 --- a/orte/mca/oob/tcp/oob_tcp.c +++ b/orte/mca/oob/tcp/oob_tcp.c @@ -223,6 +223,16 @@ int mca_oob_tcp_component_open(void) mca_oob_tcp_component.tcp_rcvbuf = mca_oob_tcp_param_register_int("rcvbuf", 128*1024); + /* AWF - may need to increase this for large-scale jobs - + see AWF comment in oob_tcp_peer.c */ + mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base, + "connect_timeout", + "connect() timeout in seconds, before trying next interface", + false, + false, + 10, + &mca_oob_tcp_component.tcp_timeout); + mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base, "listen_mode", "Mode for HNP to accept incoming connections: event, listen_thread", @@ -230,7 +240,7 @@ int mca_oob_tcp_component_open(void) false, "event", &listen_type); - + if ((0 == strcmp(listen_type, "event")) || NULL == getenv("I_AM_MPIRUN")) { mca_oob_tcp_component.tcp_listen_type = OOB_TCP_EVENT; } else if (0 == strcmp(listen_type, "listen_thread")) { diff --git a/orte/mca/oob/tcp/oob_tcp.h b/orte/mca/oob/tcp/oob_tcp.h index 30e3fdfabf..8814e4ce7f 100644 --- a/orte/mca/oob/tcp/oob_tcp.h +++ b/orte/mca/oob/tcp/oob_tcp.h @@ -242,6 +242,7 @@ struct mca_oob_tcp_component_t { int tcp_peer_retries; /**< max number of retries before declaring peer gone */ int tcp_sndbuf; /**< socket send buffer size */ int tcp_rcvbuf; /**< socket recv buffer size */ + int tcp_timeout; /**< socket connect timeout in seconds */ opal_free_list_t tcp_msgs; /**< free list of messages */ opal_event_t tcp_send_event; /**< event structure for sends */ opal_event_t tcp_recv_event; /**< event structure for recvs */ diff --git a/orte/mca/oob/tcp/oob_tcp_peer.c b/orte/mca/oob/tcp/oob_tcp_peer.c index f60ec4cae3..a4b0e5c16d 100644 --- a/orte/mca/oob/tcp/oob_tcp_peer.c +++ b/orte/mca/oob/tcp/oob_tcp_peer.c @@ -253,6 +253,97 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const orte_process_name_t* name) } +/* + * Try connecting to a peer using all the addresses that peer exported. + */ + +static int mca_oob_tcp_peer_try_connect(mca_oob_tcp_peer_t* peer) +{ + struct sockaddr_in inaddr; + int rc; + + do { + /* pick an address in round-robin fashion from the list exported by the peer */ + if((rc = mca_oob_tcp_addr_get_next(peer->peer_addr, &inaddr)) != ORTE_SUCCESS) { + opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: " + "mca_oob_tcp_addr_get_next failed with error=%d", + ORTE_NAME_ARGS(orte_process_info.my_name), + ORTE_NAME_ARGS(&(peer->peer_name)), + rc); + mca_oob_tcp_peer_close(peer); + return ORTE_ERR_UNREACH; + } + + if(mca_oob_tcp_component.tcp_debug > 0) { + opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: " + "connecting port %d to: %s:%d\n", + ORTE_NAME_ARGS(orte_process_info.my_name), + ORTE_NAME_ARGS(&(peer->peer_name)), + ntohs(mca_oob_tcp_component.tcp_listen_port), + inet_ntoa(inaddr.sin_addr), + ntohs(inaddr.sin_port)); + } + + /* start the connect - will likely fail with EINPROGRESS */ + if(connect(peer->peer_sd, + (struct sockaddr*)&inaddr, sizeof(struct sockaddr_in)) < 0) { + /* non-blocking so wait for completion */ + if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) { + /* AWF - the connect_timeout MCA parameter defaults to a low setting (10secs) + to minimize job startup time on IU BigRed. However, on large machines + such a short timeout may not be suitable -- the head node may not be + able to accept connections fast enough. If this is the case, increase + the connect_timeout MCA parameter. + */ + struct timeval tv = {mca_oob_tcp_component.tcp_timeout, 0}; + opal_evtimer_add(&peer->peer_timer_event, &tv); + + return ORTE_SUCCESS; + } + + if(mca_oob_tcp_component.tcp_debug > 0) { + opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: " + "connect to %s:%d failed with errno=%d", + ORTE_NAME_ARGS(orte_process_info.my_name), + ORTE_NAME_ARGS(&(peer->peer_name)), + inet_ntoa(inaddr.sin_addr), + ntohs(inaddr.sin_port), + opal_socket_errno); + } + + continue; + } + + /* send our globally unique process identifier to the peer */ + if((rc = mca_oob_tcp_peer_send_connect_ack(peer)) == ORTE_SUCCESS) { + peer->peer_state = MCA_OOB_TCP_CONNECT_ACK; + opal_event_add(&peer->peer_recv_event, 0); + return ORTE_SUCCESS; + } else { + opal_output(0, + "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: " + "mca_oob_tcp_peer_send_connect_ack to %s:%d failed with errno=%d", + ORTE_NAME_ARGS(orte_process_info.my_name), + ORTE_NAME_ARGS(&(peer->peer_name)), + inet_ntoa(inaddr.sin_addr), + ntohs(inaddr.sin_port), + rc); + } + } while(peer->peer_addr->addr_next != 0); + + /* None of the interfaces worked.. */ + opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: " + "connect to %s:%d failed, connecting over all interfaces failed!", + ORTE_NAME_ARGS(orte_process_info.my_name), + ORTE_NAME_ARGS(&(peer->peer_name)), + inet_ntoa(inaddr.sin_addr), + ntohs(inaddr.sin_port), + opal_socket_errno); + mca_oob_tcp_peer_close(peer); + return ORTE_ERR_UNREACH; +} + + /* * Start a connection to the peer. This will likely not complete, * as the socket is set to non-blocking, so register for event @@ -260,11 +351,10 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const orte_process_name_t* name) * our globally unique process identifier to the peer and wait for * the peers response. */ - + static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer) { - int rc, flags; - struct sockaddr_in inaddr; + int flags; /* create socket */ peer->peer_state = MCA_OOB_TCP_CONNECTING; @@ -311,63 +401,8 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer) /* * We should parse all the IP addresses exported by the peer and try to connect to each of them. */ - do { - /* pick an address in round-robin fashion from the list exported by the peer */ - if((rc = mca_oob_tcp_addr_get_next(peer->peer_addr, &inaddr)) != ORTE_SUCCESS) { - opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: mca_oob_tcp_addr_get_next failed with error=%d", - ORTE_NAME_ARGS(orte_process_info.my_name), - ORTE_NAME_ARGS(&(peer->peer_name)), - rc); - break; - } - if(mca_oob_tcp_component.tcp_debug > 0) { - opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: connecting port %d to: %s:%d\n", - ORTE_NAME_ARGS(orte_process_info.my_name), - ORTE_NAME_ARGS(&(peer->peer_name)), - ntohs(mca_oob_tcp_component.tcp_listen_port), - inet_ntoa(inaddr.sin_addr), - ntohs(inaddr.sin_port)); - } - - /* start the connect - will likely fail with EINPROGRESS */ - if(connect(peer->peer_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) { - /* non-blocking so wait for completion */ - if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) { - opal_event_add(&peer->peer_send_event, 0); - /* Waiting for completion in the middle of the list ?! Let's just hope we try with the - * correct IP address... - */ - return ORTE_SUCCESS; - } - if(mca_oob_tcp_component.tcp_debug > 0) { - opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: connect to %s:%d failed with errno=%d", - ORTE_NAME_ARGS(orte_process_info.my_name), - ORTE_NAME_ARGS(&(peer->peer_name)), - inet_ntoa(inaddr.sin_addr), - ntohs(inaddr.sin_port), - opal_socket_errno); - } - continue; - } - - /* send our globally unique process identifier to the peer */ - if((rc = mca_oob_tcp_peer_send_connect_ack(peer)) == ORTE_SUCCESS) { - peer->peer_state = MCA_OOB_TCP_CONNECT_ACK; - opal_event_add(&peer->peer_recv_event, 0); - return ORTE_SUCCESS; /* successfully connect to the peer */ - } else { - opal_output(0, - "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: " - "mca_oob_tcp_peer_send_connect_ack to %s:%d failed with errno=%d", - ORTE_NAME_ARGS(orte_process_info.my_name), - ORTE_NAME_ARGS(&(peer->peer_name)), - inet_ntoa(inaddr.sin_addr), - ntohs(inaddr.sin_port), - rc); - } - } while( peer->peer_addr->addr_next != 0 ); - mca_oob_tcp_peer_close(peer); - return ORTE_ERR_UNREACH; + + return mca_oob_tcp_peer_try_connect(peer); } @@ -393,6 +428,7 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer) mca_oob_tcp_peer_close(peer); return; } + if(so_error == EINPROGRESS) { opal_event_add(&peer->peer_send_event, 0); return; @@ -407,12 +443,11 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer) opal_evtimer_add(&peer->peer_timer_event, &tv); return; } else if(so_error != 0) { - opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: connect() failed with errno=%d\n", - ORTE_NAME_ARGS(orte_process_info.my_name), - ORTE_NAME_ARGS(&(peer->peer_name)), - so_error); - mca_oob_tcp_peer_close(peer); - return; + /* If no interfaces could connect (the error case below), an error message has + already been printed, so we can just return */ + if(ORTE_ERR_UNREACH == mca_oob_tcp_peer_try_connect(peer)) { + return; + } } if(mca_oob_tcp_peer_send_connect_ack(peer) == ORTE_SUCCESS) { @@ -902,6 +937,8 @@ static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user) OPAL_THREAD_LOCK(&peer->peer_lock); if(peer->peer_state == MCA_OOB_TCP_CLOSED) mca_oob_tcp_peer_start_connect(peer); + else if(peer->peer_state == MCA_OOB_TCP_CONNECTING) + mca_oob_tcp_peer_complete_connect(peer); OPAL_THREAD_UNLOCK(&peer->peer_lock); }