diff --git a/orte/mca/oob/tcp/oob_tcp.c b/orte/mca/oob/tcp/oob_tcp.c index c129de6242..1c38404fbd 100644 --- a/orte/mca/oob/tcp/oob_tcp.c +++ b/orte/mca/oob/tcp/oob_tcp.c @@ -83,7 +83,6 @@ OBJ_CLASS_INSTANCE( */ static int mca_oob_tcp_create_listen(void); -static int mca_oob_tcp_create_listen_thread(void); static void mca_oob_tcp_recv_handler(int sd, short flags, void* user); static void mca_oob_tcp_accept(void); @@ -170,8 +169,6 @@ static inline char* mca_oob_tcp_param_register_str( */ int mca_oob_tcp_component_open(void) { - char *listen_type; - #ifdef __WINDOWS__ WSADATA win_sock_data; if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) { @@ -210,26 +207,7 @@ int mca_oob_tcp_component_open(void) mca_oob_tcp_component.tcp_rcvbuf = mca_oob_tcp_param_register_int("rcvbuf", 128*1024); - mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base, - "listen_mode", - "Mode for HNP to accept incoming connections: event, listen_thread", - false, - false, - "event", - &listen_type); - - if (0 == strcmp(listen_type, "event")) { - mca_oob_tcp_component.tcp_listen_type = OOB_TCP_EVENT; - } else if (0 == strcmp(listen_type, "listen_thread")) { - mca_oob_tcp_component.tcp_listen_type = OOB_TCP_LISTEN_THREAD; - } else { - opal_output(0, "Invalid value for oob_tcp_listen_mode parameter: %s", - listen_type); - return ORTE_ERROR; - } - /* initialize state */ - mca_oob_tcp_component.tcp_shutdown = false; mca_oob_tcp_component.tcp_listen_sd = -1; mca_oob_tcp_component.tcp_match_count = 0; return ORTE_SUCCESS; @@ -374,131 +352,6 @@ static int mca_oob_tcp_create_listen(void) } -static void* mca_oob_tcp_listen_thread(opal_object_t *obj) -{ - int sd, rc; - ompi_socklen_t addrlen = sizeof(struct sockaddr_in); - struct sockaddr_in addr; - opal_free_list_item_t *fl_item; - mca_oob_tcp_pending_connection_t *item; - - while (false == mca_oob_tcp_component.tcp_shutdown) { - sd = accept(mca_oob_tcp_component.tcp_listen_sd, - (struct sockaddr*)&addr, &addrlen); - if(sd < 0) { - if (mca_oob_tcp_component.tcp_shutdown) return NULL; - - if(ompi_socket_errno == EINTR) - continue; - if(ompi_socket_errno != EAGAIN || ompi_socket_errno != EWOULDBLOCK) - opal_output(0, "mca_oob_tcp_accept: accept() failed with errno %d.", ompi_socket_errno); - close(sd); - /* start a new listen socket */ - mca_oob_tcp_create_listen_thread(); - return NULL; - } - - OPAL_FREE_LIST_WAIT(&mca_oob_tcp_component.tcp_pending_connections_fl, - fl_item, rc); - item = (mca_oob_tcp_pending_connection_t*) fl_item; - item->fd = sd; - opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock); - opal_list_append(&mca_oob_tcp_component.tcp_pending_connections, - (opal_list_item_t*) item); - opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock); - } - - return NULL; -} - -static int mca_oob_tcp_listen_progress(void) -{ - int count = 0; - mca_oob_tcp_pending_connection_t *item; - struct sockaddr_in addr; - mca_oob_tcp_event_t* event; - - if (opal_list_get_size(&mca_oob_tcp_component.tcp_pending_connections)) { - opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock); - while (NULL != (item = (mca_oob_tcp_pending_connection_t*) - opal_list_remove_first(&mca_oob_tcp_component.tcp_pending_connections))) { - - /* setup socket options */ - mca_oob_tcp_set_socket_options(item->fd); - - /* log the accept */ - if(mca_oob_tcp_component.tcp_debug) { - opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_accept: %s:%d\n", - ORTE_NAME_ARGS(orte_process_info.my_name), - inet_ntoa(addr.sin_addr), - addr.sin_port); - } - - /* wait for receipt of peers process identifier to - complete this connection */ - event = OBJ_NEW(mca_oob_tcp_event_t); - opal_event_set(&event->event, item->fd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event); - opal_event_add(&event->event, 0); - OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl, - (opal_free_list_item_t *) item); - - count++; - } - opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock); - } - - return count; -} - - -static int mca_oob_tcp_create_listen_thread(void) -{ - struct sockaddr_in inaddr; - ompi_socklen_t addrlen; - - /* create a listen socket for incoming connections */ - mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0); - if(mca_oob_tcp_component.tcp_listen_sd < 0) { - opal_output(0,"mca_oob_tcp_component_init: socket() failed with errno=%d", ompi_socket_errno); - return ORTE_ERROR; - } - - /* setup socket options */ - mca_oob_tcp_set_socket_options(mca_oob_tcp_component.tcp_listen_sd); - - /* bind address */ - memset(&inaddr, 0, sizeof(inaddr)); - inaddr.sin_family = AF_INET; - inaddr.sin_addr.s_addr = INADDR_ANY; - inaddr.sin_port = 0; - - if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) { - opal_output(0,"mca_oob_tcp_create_listen: bind() failed with errno=%d", ompi_socket_errno); - return ORTE_ERROR; - } - - /* resolve system assigned port */ - addrlen = sizeof(struct sockaddr_in); - if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) { - opal_output(0, "mca_oob_tcp_create_listen: getsockname() failed with errno=%d", ompi_socket_errno); - return ORTE_ERROR; - } - mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port; - - /* setup listen backlog to maximum allowed by kernel */ - if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) { - opal_output(0, "mca_oob_tcp_component_init: listen() failed with errno=%d", ompi_socket_errno); - return ORTE_ERROR; - } - - /* start the listen thread */ - mca_oob_tcp_component.tcp_listen_thread.t_run = mca_oob_tcp_listen_thread; - mca_oob_tcp_component.tcp_listen_thread.t_arg = NULL; - - return opal_thread_start(&mca_oob_tcp_component.tcp_listen_thread); -} - - /* * Handle probe */ @@ -684,17 +537,9 @@ mca_oob_t* mca_oob_tcp_component_init(int* priority) memset(&mca_oob_tcp_component.tcp_send_event, 0, sizeof(opal_event_t)); /* create a listen socket */ - if (OOB_TCP_EVENT == mca_oob_tcp_component.tcp_listen_type) { - if(mca_oob_tcp_create_listen() != ORTE_SUCCESS) { - opal_output(0, "mca_oob_tcp_init: unable to create listen socket"); - return NULL; - } - } else if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) { - if (mca_oob_tcp_create_listen_thread() != ORTE_SUCCESS) { - opal_output(0, "mca_oob_tcp_init: unable to create listen thread"); - return NULL; - } - opal_progress_register(mca_oob_tcp_listen_progress); + if(mca_oob_tcp_create_listen() != ORTE_SUCCESS) { + opal_output(0, "mca_oob_tcp_init: unable to create listen socket\n"); + return NULL; } return &mca_oob_tcp; } @@ -1087,19 +932,10 @@ int mca_oob_tcp_fini(void) /* close listen socket */ if (mca_oob_tcp_component.tcp_listen_sd >= 0) { - if (OOB_TCP_EVENT == mca_oob_tcp_component.tcp_listen_type) { - opal_event_del(&mca_oob_tcp_component.tcp_recv_event); - close(mca_oob_tcp_component.tcp_listen_sd); - } else if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) { - void *data; - close(mca_oob_tcp_component.tcp_listen_sd); - opal_thread_join(&mca_oob_tcp_component.tcp_listen_thread, &data); - opal_progress_unregister(mca_oob_tcp_listen_progress); - } - + opal_event_del(&mca_oob_tcp_component.tcp_recv_event); + close(mca_oob_tcp_component.tcp_listen_sd); mca_oob_tcp_component.tcp_listen_sd = -1; } - opal_progress_unregister(mca_oob_tcp_listen_progress); /* cleanup all peers */ for(item = opal_list_remove_first(&mca_oob_tcp_component.tcp_peer_list); diff --git a/orte/mca/oob/tcp/oob_tcp.h b/orte/mca/oob/tcp/oob_tcp.h index 5ea8510b73..f66baffc5e 100644 --- a/orte/mca/oob/tcp/oob_tcp.h +++ b/orte/mca/oob/tcp/oob_tcp.h @@ -255,12 +255,6 @@ struct mca_oob_tcp_component_t { opal_condition_t tcp_match_cond; /**< condition variable used in finalize */ int tcp_match_count; /**< number of matched recvs in progress */ int tcp_debug; /**< debug level */ - bool tcp_shutdown; - enum { OOB_TCP_EVENT, OOB_TCP_LISTEN_THREAD } tcp_listen_type; - opal_thread_t tcp_listen_thread; - opal_free_list_t tcp_pending_connections_fl; - opal_list_t tcp_pending_connections; - opal_mutex_t tcp_pending_connections_lock; }; /** @@ -270,13 +264,6 @@ typedef struct mca_oob_tcp_component_t mca_oob_tcp_component_t; OMPI_COMP_EXPORT extern mca_oob_tcp_component_t mca_oob_tcp_component; - struct mca_oob_tcp_pending_connection_t { - opal_free_list_item_t super; - int fd; - }; - typedef struct mca_oob_tcp_pending_connection_t mca_oob_tcp_pending_connection_t; - OBJ_CLASS_DECLARATION(mca_oob_tcp_pending_connection_t); - #if defined(c_plusplus) || defined(__cplusplus) }