
Some code cleanups from Brian to clarify port selection and opening logic

This commit was SVN r18055.
This commit is contained in:
Ralph Castain 2008-04-01 12:39:02 +00:00
parent 9c416a9845
commit 3e8846d685
2 changed files with 484 additions and 375 deletions

View file

@@ -189,9 +189,8 @@ static int oob_tcp_windows_progress_callback( void )
*/
int mca_oob_tcp_component_open(void)
{
int value = 0;
int tmp, value = 0;
char *listen_type, *str = NULL;
int tmp;
#ifdef __WINDOWS__
WSADATA win_sock_data;
@@ -222,15 +221,11 @@ int mca_oob_tcp_component_open(void)
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_completed, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_cond, opal_condition_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_listen_thread, opal_thread_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_available_devices, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections_fl, opal_free_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_listen_thread, opal_thread_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_out_connections, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_in_connections, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_connections_return, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_connections_return_copy, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_connections_lock, opal_mutex_t);
/* register oob module parameters */
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
@@ -301,7 +296,8 @@ int mca_oob_tcp_component_open(void)
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"connect_sleep",
"Enable (1) / disable (0) random sleep for connection wireup",
"Enable (1) / disable (0) random sleep for "
"connection wireup.",
false,
false,
1,
@@ -309,7 +305,8 @@ int mca_oob_tcp_component_open(void)
mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base,
"listen_mode",
"Mode for HNP to accept incoming connections: event, listen_thread",
"Mode for HNP to accept incoming connections: "
"event, listen_thread.",
false,
false,
"event",
@@ -327,34 +324,28 @@ int mca_oob_tcp_component_open(void)
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"listen_thread_max_queue",
"High water mark for queued accepted socket list size",
"High water mark for queued accepted socket "
"list size. Used only when listen_mode is "
"listen_thread.",
false,
false,
10,
&mca_oob_tcp_component.tcp_copy_max_size);
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"listen_thread_max_time",
"Maximum amount of time (in milliseconds) to wait between processing accepted socket list",
"listen_thread_wait_time",
"Time in milliseconds to wait before "
"actively checking for new connections when "
"listen_mode is listen_thread.",
false,
false,
10,
&tmp);
mca_oob_tcp_component.tcp_listen_thread_tv.tv_sec = tmp / (1000);
mca_oob_tcp_component.tcp_listen_thread_tv.tv_usec = (tmp % 1000) * 1000;
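/* With the parameter's default of 10 ms, this conversion yields
   tv_sec = 10 / 1000 = 0 and tv_usec = (10 % 1000) * 1000 = 10000,
   i.e. a 10 ms wakeup interval for the listen thread. */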
#if OPAL_TIMER_USEC_NATIVE
mca_oob_tcp_component.tcp_copy_delta = tmp * 1000;
#else
mca_oob_tcp_component.tcp_copy_delta = tmp *
opal_timer_base_get_freq() / 1000;
#endif
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"accept_spin_count",
"Number of times to let accept return EWOULDBLOCK before updating accepted socket list",
false,
false,
10,
&mca_oob_tcp_component.tcp_copy_spin_count);
mca_oob_tcp_component.tcp_listen_thread_num_sockets = 0;
mca_oob_tcp_component.tcp_listen_thread_sds[0] = -1;
mca_oob_tcp_component.tcp_listen_thread_sds[1] = -1;
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"port_min_v4", "Starting port allowed (IPv4)",
@@ -385,8 +376,6 @@ int mca_oob_tcp_component_open(void)
mca_oob_tcp_component.tcp_listen_sd = -1;
mca_oob_tcp_component.tcp_match_count = 0;
mca_oob_tcp_component.tcp_last_copy_time = 0;
#if defined(__WINDOWS__)
/* Register the libevent callback which will trigger the OOB
* completion callbacks. */
@@ -416,14 +405,11 @@ int mca_oob_tcp_component_close(void)
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_available_devices);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_connections_return_copy);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_connections_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_connections_return);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_copy_out_connections);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections_fl);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_listen_thread);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_available_devices);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_cond);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_completed);
@@ -441,18 +427,47 @@ int mca_oob_tcp_component_close(void)
}
/*
* Called by mca_oob_tcp_recv_handler() when the TCP listen
* socket has pending connection requests. Accept incoming
* requests and queue for completion of the connection handshake.
/* Called by mca_oob_tcp_accept() and mca_oob_tcp_accept_thread_handler() on
* a socket that has been accepted. This call finishes processing the
* socket, including setting socket options and registering for the
* OOB-level connection handshake. Used by both the threaded and
* event listen modes.
*/
static void
mca_oob_tcp_create_connection(const int accepted_fd,
const struct sockaddr *addr)
{
mca_oob_tcp_event_t* event;
/* setup socket options */
mca_oob_tcp_set_socket_options(accepted_fd);
/* log the accept */
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "%s mca_oob_tcp_accept: %s:%d\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_net_get_hostname(addr),
opal_net_get_port(addr));
}
/* wait for receipt of the peer's process identifier to complete this connection */
event = OBJ_NEW(mca_oob_tcp_event_t);
opal_event_set(&event->event, accepted_fd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
opal_event_add(&event->event, 0);
}
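Note that the event is registered with OPAL_EV_READ only, not OPAL_EV_PERSIST, so it fires once; mca_oob_tcp_recv_handler takes responsibility for the descriptor from there.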
/*
* Called by mca_oob_tcp_recv_handler() when the TCP listen socket
* has pending connection requests. Accept incoming requests and
* queue for completion of the connection handshake. Will not be
* called when listen_mode is listen_thread.
*/
static void mca_oob_tcp_accept(int incoming_sd)
{
while(true) {
struct sockaddr_storage addr;
opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
mca_oob_tcp_event_t* event;
int sd;
sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen);
@@ -467,31 +482,25 @@ static void mca_oob_tcp_accept(int incoming_sd)
return;
}
/* setup socket options */
mca_oob_tcp_set_socket_options(sd);
/* log the accept */
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "%s mca_oob_tcp_accept: %s:%d\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_net_get_hostname((struct sockaddr*) &addr),
opal_net_get_port((struct sockaddr*) &addr));
}
/* wait for receipt of the peer's process identifier to complete this connection */
event = OBJ_NEW(mca_oob_tcp_event_t);
opal_event_set(&event->event, sd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
opal_event_add(&event->event, 0);
mca_oob_tcp_create_connection(sd, (struct sockaddr*) &addr);
}
}
/*
* Create a listen socket and bind to all interfaces
* Create a listen socket in the specified af_family and bind to all
* interfaces.
*
* At one time, this also registered a callback with the event library
* for when connections were received on the listen socket. This is
* no longer the case -- the caller must register any events required.
*
* Called by both the threaded and event based listen modes.
*/
static int mca_oob_tcp_create_listen(int *target_sd, uint16_t af_family)
static int
mca_oob_tcp_create_listen(int *target_sd, uint16_t af_family)
{
int flags;
int flags, index, range, port;
struct sockaddr_storage inaddr;
opal_socklen_t addrlen;
@@ -508,6 +517,8 @@ static int mca_oob_tcp_create_listen(int *target_sd, uint16_t af_family)
/* setup socket options */
mca_oob_tcp_set_socket_options(*target_sd);
/* Set some more options and fill in the family / address
information. Set to bind to the any address */
#if OPAL_WANT_IPV6
{
struct addrinfo hints, *res = NULL;
@@ -550,22 +561,23 @@ static int mca_oob_tcp_create_listen(int *target_sd, uint16_t af_family)
addrlen = sizeof(struct sockaddr_in);
#endif
{ /* Don't reuse ports */
int flg = 0;
if (setsockopt (*target_sd, SOL_SOCKET, SO_REUSEADDR, (void*)&flg, sizeof (flg)) < 0) {
/* Disable reusing ports */
flags = 0;
if (setsockopt (*target_sd, SOL_SOCKET, SO_REUSEADDR, (void*)&flags, sizeof(flags)) < 0) {
opal_output(0, "mca_oob_tcp_create_listen: unable to unset the "
"SO_REUSEADDR option (%s:%d)\n",
strerror(opal_socket_errno), opal_socket_errno);
CLOSE_THE_SOCKET(*target_sd);
return ORTE_ERROR;
}
}
{
int index, range, port;
/* If an explicit range of ports was given, find the first open
port in the range. Otherwise, tcp_port_min will be 0, which
means "pick any port" */
if (AF_INET == af_family) {
range = mca_oob_tcp_component.tcp_port_range;
port = mca_oob_tcp_component.tcp_port_min;
}
#if OPAL_WANT_IPV6
if (AF_INET6 == af_family) {
range = mca_oob_tcp_component.tcp6_port_range;
@@ -574,22 +586,27 @@ static int mca_oob_tcp_create_listen(int *target_sd, uint16_t af_family)
#endif /* OPAL_WANT_IPV6 */
for (index = 0; index < range; index++ ) {
#if OPAL_WANT_IPV6
((struct sockaddr_in6*) &inaddr)->sin6_port = port + index;
#else
if (AF_INET == af_family) {
((struct sockaddr_in*) &inaddr)->sin_port = port + index;
#endif /* OPAL_WANT_IPV6 */
} else if (AF_INET6 == af_family) {
((struct sockaddr_in6*) &inaddr)->sin6_port = port + index;
} else {
return ORTE_ERROR;
}
if(bind(*target_sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
continue;
}
opal_output(0, "bind() failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno );
strerror(opal_socket_errno),
opal_socket_errno );
CLOSE_THE_SOCKET(*target_sd);
return ORTE_ERROR;
}
goto socket_binded;
}
if (AF_INET == af_family ) {
opal_output(0, "bind() failed: no port available in the range [%d..%d]",
mca_oob_tcp_component.tcp_port_min,
@@ -602,11 +619,12 @@ static int mca_oob_tcp_create_listen(int *target_sd, uint16_t af_family)
mca_oob_tcp_component.tcp6_port_min + range);
}
#endif /* OPAL_WANT_IPV6 */
CLOSE_THE_SOCKET(*target_sd);
return ORTE_ERROR;
}
socket_binded:
/* resolve system assigned port */
/* resolve assigned port */
if (getsockname(*target_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
opal_output(0, "mca_oob_tcp_create_listen: getsockname(): %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
@@ -646,48 +664,61 @@ static int mca_oob_tcp_create_listen(int *target_sd, uint16_t af_family)
}
}
/* register listen port */
if (AF_INET == af_family) {
opal_event_set(&mca_oob_tcp_component.tcp_recv_event,
*target_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_oob_tcp_recv_handler,
0);
opal_event_add(&mca_oob_tcp_component.tcp_recv_event, 0);
}
#if OPAL_WANT_IPV6
if (AF_INET6 == af_family) {
opal_event_set(&mca_oob_tcp_component.tcp6_recv_event,
*target_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_oob_tcp_recv_handler,
0);
opal_event_add(&mca_oob_tcp_component.tcp6_recv_event, 0);
}
#endif /* OPAL_WANT_IPV6 */
return ORTE_SUCCESS;
}
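The port-range walk above is the core of the new selection logic. A minimal, self-contained sketch of the same first-free-port technique, assuming IPv4 only and a hypothetical helper name; note the sketch applies htons() to the port itself:

#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

/* Bind sd to the first free port in [port_min, port_min + range).
 * With port_min == 0 and range == 1 this degenerates to a single
 * bind() that lets the kernel pick any free port, matching the
 * "pick any port" fallback described above. */
static int bind_first_free_port(int sd, int port_min, int range)
{
    struct sockaddr_in inaddr;
    int index;

    memset(&inaddr, 0, sizeof(inaddr));
    inaddr.sin_family = AF_INET;
    inaddr.sin_addr.s_addr = INADDR_ANY;

    for (index = 0; index < range; ++index) {
        inaddr.sin_port = htons((uint16_t)(port_min + index));
        if (0 == bind(sd, (struct sockaddr*) &inaddr, sizeof(inaddr))) {
            return 0;                    /* bound successfully */
        }
        if (EADDRINUSE != errno && EADDRNOTAVAIL != errno) {
            return -1;                   /* a real error, not a busy port */
        }
    }
    return -1;                           /* the entire range is in use */
}

int main(void)
{
    int sd = socket(AF_INET, SOCK_STREAM, 0);
    if (sd < 0) return 1;
    /* try ports 5000..5015, as if port_min were 5000 and range 16 */
    if (0 == bind_first_free_port(sd, 5000, 16)) {
        listen(sd, SOMAXCONN);
    }
    close(sd);
    return 0;
}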
static void* mca_oob_tcp_listen_thread(opal_object_t *obj)
/*
* The listen thread created when listen_mode is threaded. Accepts
* incoming connections and places them in a queue for further
* processing. Finishing the accepted connection is done in the main
* thread to maintain thread safety even when the event library and
* most of ORTE is in single threaded mode.
*
* Runs until mca_oob_tcp_component.tcp_shutdown is set to true.
*/
static void*
mca_oob_tcp_listen_thread(opal_object_t *obj)
{
int rc, count;
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
int rc, count, i, max, accepted_connections, need_write;
opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
opal_free_list_item_t *fl_item;
mca_oob_tcp_pending_connection_t *item;
mca_oob_tcp_pending_connection_t *pending_connection;
struct timeval timeout;
fd_set readfds;
opal_list_t local_accepted_list;
opal_free_list_t pending_connections_fl;
OBJ_CONSTRUCT(&local_accepted_list, opal_list_t);
OBJ_CONSTRUCT(&pending_connections_fl, opal_free_list_t);
opal_free_list_init(&pending_connections_fl,
sizeof(mca_oob_tcp_pending_connection_t),
OBJ_CLASS(mca_oob_tcp_pending_connection_t),
16, /* initial number */
-1, /* maximum number */
16); /* increment to grow by */
while (false == mca_oob_tcp_component.tcp_shutdown) {
count = 0;
FD_ZERO(&readfds);
FD_SET(mca_oob_tcp_component.tcp_listen_sd, &readfds);
timeout.tv_sec = 0;
timeout.tv_usec = 10000;
max = -1;
for (i = 0 ; i < mca_oob_tcp_component.tcp_listen_thread_num_sockets ; ++i) {
int sd = mca_oob_tcp_component.tcp_listen_thread_sds[i];
FD_SET(sd, &readfds);
max = (sd > max) ? sd : max;
}
/* XXX - FIX ME - should really slowly back this off as
connections are done. Will reduce amount of polling in the
HNP after the initial connection storm. Would also require
some type of wakeup mechanism for when shutdown happens */
timeout.tv_sec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_sec;
timeout.tv_usec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_usec;
rc = select(mca_oob_tcp_component.tcp_listen_sd + 1, &readfds,
NULL, NULL, &timeout);
/* Block in a select for a short (10ms) amount of time to give
the other thread a chance to do some work. If a connection
comes in, we'll get woken up right away. */
rc = select(max + 1, &readfds, NULL, NULL, &timeout);
if (rc < 0) {
if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {
perror("select");
@@ -695,189 +726,219 @@ static void* mca_oob_tcp_listen_thread(opal_object_t *obj)
continue;
}
while (count < mca_oob_tcp_component.tcp_copy_spin_count &&
opal_list_get_size(&mca_oob_tcp_component.tcp_copy_in_connections) <
(size_t) mca_oob_tcp_component.tcp_copy_max_size) {
OPAL_FREE_LIST_WAIT(&mca_oob_tcp_component.tcp_pending_connections_fl,
fl_item, rc);
item = (mca_oob_tcp_pending_connection_t*) fl_item;
item->fd = accept(mca_oob_tcp_component.tcp_listen_sd,
(struct sockaddr*)&(item->addr), &addrlen);
if(item->fd < 0) {
OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl,
fl_item);
if (mca_oob_tcp_component.tcp_shutdown) return NULL;
/* Spin accepting connections until either our queue is full
or all active listen sockets do not have any incoming
connections */
do {
accepted_connections = 0;
for (i = 0 ; i < mca_oob_tcp_component.tcp_listen_thread_num_sockets ; ++i) {
int sd = mca_oob_tcp_component.tcp_listen_thread_sds[i];
if(opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
opal_output(0, "mca_oob_tcp_accept: accept() failed: %s (%d).",
strerror(opal_socket_errno), opal_socket_errno);
CLOSE_THE_SOCKET(item->fd);
return NULL;
/* make sure we have space for an accepted connection */
if (opal_list_get_size(&local_accepted_list) >=
(size_t) mca_oob_tcp_component.tcp_copy_max_size) {
goto recover;
}
/* Can't wait because our thread is the only one that
can put things back in the free list */
OPAL_FREE_LIST_GET(&pending_connections_fl, fl_item, rc);
if (NULL == fl_item) goto recover;
pending_connection = (mca_oob_tcp_pending_connection_t*) fl_item;
pending_connection->fd = accept(sd,
(struct sockaddr*)&(pending_connection->addr),
&addrlen);
if (pending_connection->fd < 0) {
OPAL_FREE_LIST_RETURN(&pending_connections_fl, fl_item);
if (mca_oob_tcp_component.tcp_shutdown) goto done;
if (opal_socket_errno != EAGAIN &&
opal_socket_errno != EWOULDBLOCK) {
opal_output(0, "mca_oob_tcp_accept: accept() failed: %s (%d).",
strerror(opal_socket_errno), opal_socket_errno);
CLOSE_THE_SOCKET(pending_connection->fd);
goto done;
}
count++;
continue;
}
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "%s mca_oob_tcp_listen_thread: (%d, %d) %s:%d\n",
opal_output(0,
"%s mca_oob_tcp_listen_thread: new connection: "
"(%d, %d) %s:%d\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
item->fd, opal_socket_errno,
inet_ntoa(item->addr.sin_addr),
item->addr.sin_port);
pending_connection->fd, opal_socket_errno,
opal_net_get_hostname((struct sockaddr*) &pending_connection->addr),
opal_net_get_port((struct sockaddr*) &pending_connection->addr));
}
opal_list_append(&mca_oob_tcp_component.tcp_copy_in_connections,
(opal_list_item_t*) item);
opal_list_append(&local_accepted_list, (opal_list_item_t*) pending_connection);
accepted_connections++;
}
} while (accepted_connections > 0);
if (0 < opal_list_get_size(&mca_oob_tcp_component.tcp_copy_in_connections)) {
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
/* Recover resources from the accept loop: give any new
connections to the main thread and reap any connection
fragments the main thread has returned */
recover:
need_write = 0;
if (0 != opal_list_get_size(&local_accepted_list) ||
0 != opal_list_get_size(&mca_oob_tcp_component.tcp_connections_return)) {
opal_mutex_lock(&mca_oob_tcp_component.tcp_connections_lock);
/* copy local accepted list into shared list */
if (0 != opal_list_get_size(&local_accepted_list)) {
opal_list_join(&mca_oob_tcp_component.tcp_pending_connections,
opal_list_get_end(&mca_oob_tcp_component.tcp_pending_connections),
&mca_oob_tcp_component.tcp_copy_in_connections);
while (NULL != (fl_item = (opal_free_list_item_t*) opal_list_remove_first(&mca_oob_tcp_component.tcp_connections_return_copy))) {
OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl, fl_item);
&local_accepted_list);
}
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
/* If the pending connection list is now at the high
watermark, signal the other thread */
if (opal_list_get_size(&mca_oob_tcp_component.tcp_pending_connections) >=
(size_t) mca_oob_tcp_component.tcp_copy_max_size) {
need_write = 1;
}
/* As an optimization, we could probably copy into a local
list, exit the lock, then free the pending connections,
but I'm not convinced that would be any faster */
while (NULL != (fl_item = (opal_free_list_item_t*)
opal_list_remove_first(&mca_oob_tcp_component.tcp_connections_return))) {
OPAL_FREE_LIST_RETURN(&pending_connections_fl, fl_item);
}
opal_mutex_unlock(&mca_oob_tcp_component.tcp_connections_lock);
}
if (need_write) {
char buf[1] = { '\0' };
#ifdef HAVE_PIPE
write(mca_oob_tcp_component.tcp_connections_pipe[1], buf, 1);
#endif
}
}
done:
OBJ_DESTRUCT(&local_accepted_list);
OBJ_DESTRUCT(&pending_connections_fl);
return NULL;
}
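The hand-off between the listen thread and the main thread is an instance of a small general pattern: append under a lock, wake the event loop through a pipe, and steal the whole list in an O(1) critical section. A stripped-down sketch of that pattern, with plain pthreads and a singly linked list standing in for opal_thread_t and opal_list_t (all names hypothetical):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* One accepted descriptor waiting for the main thread to finish it. */
struct pending {
    struct pending *next;
    int fd;
};

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static struct pending *pending_head = NULL; /* shared hand-off list */
static int wakeup_pipe[2];                  /* [0] read end, [1] write end */

/* Accept side: queue a descriptor, then poke the event loop. */
static void hand_off(int fd)
{
    struct pending *p = malloc(sizeof(*p));
    if (NULL == p) return;
    p->fd = fd;
    pthread_mutex_lock(&lock);
    p->next = pending_head;
    pending_head = p;
    pthread_mutex_unlock(&lock);
    (void) write(wakeup_pipe[1], "", 1);    /* wake the main thread */
}

/* Main side: runs when the pipe is readable; steal and drain the list. */
static void drain(void)
{
    char buf[1];
    struct pending *list, *p;
    (void) read(wakeup_pipe[0], buf, 1);
    pthread_mutex_lock(&lock);              /* O(1) critical section */
    list = pending_head;
    pending_head = NULL;
    pthread_mutex_unlock(&lock);
    while (NULL != (p = list)) {
        list = p->next;
        printf("would finish connection on fd %d\n", p->fd);
        free(p);
    }
}

int main(void)
{
    if (pipe(wakeup_pipe) < 0) return 1;
    hand_off(42);                           /* stand-in for an accept() result */
    drain();
    return 0;
}

Stealing the entire list keeps the lock hold time constant no matter how many connections arrived, which is the same reason the component uses opal_list_join.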
/* called from opal_progress() to create the oob contact information
for the file descriptors accepted() by the accept thread. */
static int mca_oob_tcp_listen_progress(void)
/*
* Handler for accepting connections from the listen thread. Called by
* timer or pipe signal.
*/
static void
mca_oob_tcp_accept_thread_handler(int sd, short flags, void* user)
{
int count = 0;
mca_oob_tcp_pending_connection_t *item;
mca_oob_tcp_event_t* event;
#if OPAL_TIMER_USEC_NATIVE
opal_timer_t now = opal_timer_base_get_usec();
#else
opal_timer_t now = opal_timer_base_get_cycles();
#endif /* OPAL_TIMER_USEC_NATIVE */
/* probably more efficient to use the user pointer for this rather
than always recreating the list. Future work. */
opal_list_t local_accepted_list;
opal_list_t local_return_list;
mca_oob_tcp_pending_connection_t *new_connection;
struct timeval tv;
/* if we've not pulled pending connections for a while OR we've
hit the high water mark of pending connections, grab all the
pending connections */
if ((now - mca_oob_tcp_component.tcp_last_copy_time >
mca_oob_tcp_component.tcp_copy_delta) ||
((size_t) mca_oob_tcp_component.tcp_copy_max_size <
opal_list_get_size(&mca_oob_tcp_component.tcp_pending_connections))) {
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
opal_output(0, "%s in accept_thread_handler: %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), flags);
}
/* copy the pending connections from the list the accept
thread is inserting into, into a temporary list for us to
process from. Then copy the returned free list items into
that thread's return list, so it can free them soonish.
This is an O(1) operation, so we minimize the lock time. */
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
opal_list_join(&mca_oob_tcp_component.tcp_copy_out_connections,
opal_list_get_end(&mca_oob_tcp_component.tcp_copy_out_connections),
OBJ_CONSTRUCT(&local_accepted_list, opal_list_t);
OBJ_CONSTRUCT(&local_return_list, opal_list_t);
/* read the waiting byte - without pipe support this can never
happen, so there is no need for yet another #if */
if (OPAL_EV_READ == flags) {
char buf[1];
read(sd, buf, 1);
}
/* Copy in all pending connections. opal_list_join is O(1), so
this is pretty cheap. Size is pretty friendly to thread
safety, and join will properly handle the case where the list
magically got shorter. */
if (0 != opal_list_get_size(&mca_oob_tcp_component.tcp_pending_connections)) {
opal_mutex_lock(&mca_oob_tcp_component.tcp_connections_lock);
opal_list_join(&local_accepted_list,
opal_list_get_end(&local_accepted_list),
&mca_oob_tcp_component.tcp_pending_connections);
opal_list_join(&mca_oob_tcp_component.tcp_connections_return_copy,
opal_list_get_end(&mca_oob_tcp_component.tcp_connections_return_copy),
&mca_oob_tcp_component.tcp_connections_return);
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
/* process all the connections */
while (NULL != (item = (mca_oob_tcp_pending_connection_t*)
opal_list_remove_first(&mca_oob_tcp_component.
tcp_copy_out_connections))) {
/* setup socket options */
mca_oob_tcp_set_socket_options(item->fd);
/* log the accept */
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "%s mca_oob_tcp_listen_progress: %s:%d\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
inet_ntoa(item->addr.sin_addr),
item->addr.sin_port);
opal_mutex_unlock(&mca_oob_tcp_component.tcp_connections_lock);
}
/* wait for receipt of the peer's process identifier to
complete this connection */
event = OBJ_NEW(mca_oob_tcp_event_t);
opal_event_set(&event->event, item->fd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
opal_event_add(&event->event, 0);
/* put on the needs returning list */
opal_list_append(&mca_oob_tcp_component.tcp_connections_return,
(opal_list_item_t*) item);
count++;
/* process all the connections */
while (NULL != (new_connection = (mca_oob_tcp_pending_connection_t*)
opal_list_remove_first(&local_accepted_list))) {
mca_oob_tcp_create_connection(new_connection->fd,
(struct sockaddr*) &(new_connection->addr));
opal_list_append(&local_return_list, (opal_list_item_t*) new_connection);
}
mca_oob_tcp_component.tcp_last_copy_time = now;
/* Copy all processed connections into the return list */
if (0 != opal_list_get_size(&local_return_list)) {
opal_mutex_lock(&mca_oob_tcp_component.tcp_connections_lock);
opal_list_join(&mca_oob_tcp_component.tcp_connections_return,
opal_list_get_end(&mca_oob_tcp_component.tcp_connections_return),
&local_return_list);
opal_mutex_unlock(&mca_oob_tcp_component.tcp_connections_lock);
}
return count;
OBJ_DESTRUCT(&local_accepted_list);
OBJ_DESTRUCT(&local_return_list);
tv.tv_sec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_sec;
tv.tv_usec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_usec;
#ifdef HAVE_PIPE
opal_event_set(&mca_oob_tcp_component.tcp_listen_thread_event,
mca_oob_tcp_component.tcp_connections_pipe[0],
OPAL_EV_READ,
mca_oob_tcp_accept_thread_handler, NULL);
#else
opal_event_set(&mca_oob_tcp_component.tcp_listen_thread_event,
-1, 0,
mca_oob_tcp_accept_thread_handler, NULL);
#endif
opal_event_add(&mca_oob_tcp_component.tcp_listen_thread_event, &tv);
}
static int mca_oob_tcp_create_listen_thread(void)
/*
* Create the actual listen thread. Should only be called once.
*/
static int
mca_oob_tcp_create_listen_thread(void)
{
struct sockaddr_in inaddr;
opal_socklen_t addrlen;
int flags;
struct timeval tv;
/* create a listen socket for incoming connections */
mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
if(mca_oob_tcp_component.tcp_listen_sd < 0) {
opal_output(0,"mca_oob_tcp_component_init: socket() failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
#ifdef HAVE_PIPE
if (pipe(mca_oob_tcp_component.tcp_connections_pipe) < 0) {
opal_output(0, "mca_oob_tcp_create_listen_thread: pipe failed: %d", errno);
return ORTE_ERROR;
}
/* setup socket options */
mca_oob_tcp_set_socket_options(mca_oob_tcp_component.tcp_listen_sd);
/* bind address */
memset(&inaddr, 0, sizeof(inaddr));
inaddr.sin_family = AF_INET;
inaddr.sin_addr.s_addr = INADDR_ANY;
inaddr.sin_port = 0;
if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
opal_output(0,"mca_oob_tcp_create_listen: bind() failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERROR;
}
/* resolve system assigned port */
addrlen = sizeof(struct sockaddr_in);
if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
opal_output(0, "mca_oob_tcp_create_listen: getsockname() failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERROR;
}
mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;
/* setup listen backlog to maximum allowed by kernel */
if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {
opal_output(0, "mca_oob_tcp_component_init: listen() failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERROR;
}
/* set socket up to be non-blocking, otherwise accept could block */
if((flags = fcntl(mca_oob_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {
opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERROR;
} else {
flags |= O_NONBLOCK;
if(fcntl(mca_oob_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {
opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERROR;
}
}
#endif
/* start the listen thread */
mca_oob_tcp_component.tcp_listen_thread.t_run = mca_oob_tcp_listen_thread;
mca_oob_tcp_component.tcp_listen_thread.t_arg = NULL;
/* register event for read and timeout */
tv.tv_sec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_sec;
tv.tv_usec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_usec;
#ifdef HAVE_PIPE
opal_event_set(&mca_oob_tcp_component.tcp_listen_thread_event,
mca_oob_tcp_component.tcp_connections_pipe[0],
OPAL_EV_READ,
mca_oob_tcp_accept_thread_handler, NULL);
#else
opal_event_set(&mca_oob_tcp_component.tcp_listen_thread_event,
-1, 0,
mca_oob_tcp_accept_thread_handler, NULL);
#endif
opal_event_add(&mca_oob_tcp_component.tcp_listen_thread_event, &tv);
return opal_thread_start(&mca_oob_tcp_component.tcp_listen_thread);
}
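Note the #else branch: without HAVE_PIPE, tcp_listen_thread_event is registered with an fd of -1 and no I/O flags, so only its timeout can fire, and newly accepted connections are picked up by the periodic timer alone rather than by a pipe wakeup.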
@@ -914,8 +975,11 @@ static void mca_oob_tcp_recv_probe(int sd, mca_oob_tcp_hdr_t* hdr)
CLOSE_THE_SOCKET(sd);
}
/*
* Handle connection request
* Complete the OOB-level handshake to establish a connection with
* another peer. Called when the remote peer replies with his process
* identifier. Used in both the threaded and event listen modes.
*/
static void mca_oob_tcp_recv_connect(int sd, mca_oob_tcp_hdr_t* hdr)
{
@@ -966,11 +1030,14 @@ static void mca_oob_tcp_recv_connect(int sd, mca_oob_tcp_hdr_t* hdr)
}
}
/*
* Event callback when there is data available on the registered
* socket to recv.
* socket to recv. This is called for the listen sockets to accept an
* incoming connection, on new sockets trying to complete the software
* connection process, and for probes. Data on an established
* connection is handled elsewhere.
*/
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
{
mca_oob_tcp_hdr_t hdr;
@@ -1030,9 +1097,14 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
/*
* Component initialization - create a module.
* (1) initialize static resources
* (2) create listen socket
* Component initialization - create a module and initialize the
* static resources associated with that module.
*
* Also initializes the list of devices that will be used/supported by
* the module, using the if_include and if_exclude variables. This is
* the only place that this sorting should occur -- all other places
* should use the tcp_available_devices list. This is a change from
* previous versions of this component.
*/
mca_oob_t* mca_oob_tcp_component_init(int* priority)
{
@@ -1046,6 +1118,13 @@ mca_oob_t* mca_oob_tcp_component_init(int* priority)
if(opal_ifcount() <= 0)
return NULL;
/* Which interfaces should we use? Start by building a list of
all devices that meet the requirements of the if_include and
if_exclude list. This might include local and non-local
interfaces mixed together. After that sorting is done, if there
is a mix of devices, we go through the devices that survived
the initial sort and remove all the local devices (since we
have non-local devices to use). */
for (i = opal_ifbegin() ; i > 0 ; i = opal_ifnext(i)) {
char name[32];
mca_oob_tcp_device_t *dev;
@@ -1129,10 +1208,10 @@ mca_oob_t* mca_oob_tcp_component_init(int* priority)
return &mca_oob_tcp;
}
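The device sort described in the comment above reduces to two passes: filter by the include/exclude parameters, then drop local interfaces when any non-local interface survived. A toy version independent of the opal_if API, with made-up interface data and a single exclude name standing in for the real if_include/if_exclude variables:

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for one entry in the interface table. */
struct iface { const char *name; bool local; bool keep; };

int main(void)
{
    struct iface ifs[] = {
        { "lo0",  true,  false },
        { "eth0", false, false },
        { "eth1", false, false },
    };
    const char *exclude = "eth1";  /* stand-in for the if_exclude list */
    int n = (int)(sizeof(ifs) / sizeof(ifs[0]));
    int i, nonlocal = 0;

    /* Pass 1: apply the exclude list (an include list is symmetric). */
    for (i = 0; i < n; ++i) {
        ifs[i].keep = (0 != strcmp(ifs[i].name, exclude));
        if (ifs[i].keep && !ifs[i].local) nonlocal++;
    }

    /* Pass 2: with non-local devices available, drop the local ones. */
    for (i = 0; i < n; ++i) {
        if (nonlocal > 0 && ifs[i].local) ifs[i].keep = false;
        printf("%-5s -> %s\n", ifs[i].name, ifs[i].keep ? "use" : "skip");
    }
    return 0;
}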
/*
* Attempt to resolve peer name.
*/
int mca_oob_tcp_resolve(mca_oob_tcp_peer_t* peer)
{
mca_oob_tcp_addr_t* addr = NULL;
@@ -1153,7 +1232,9 @@ int mca_oob_tcp_resolve(mca_oob_tcp_peer_t* peer)
/*
* Setup contact information in the registry.
* Ready the TCP module for connections. This includes creating
* listen sockets for both IPv4 and IPv6 and (possibly) starting the
* connection listen thread.
*/
int mca_oob_tcp_init(void)
{
@@ -1177,29 +1258,16 @@ int mca_oob_tcp_init(void)
/* get my jobid */
jobid = ORTE_PROC_MY_NAME->jobid;
/* create a listen socket */
if ((OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) &&
orte_process_info.hnp) {
if (mca_oob_tcp_create_listen_thread() != ORTE_SUCCESS) {
opal_output(0, "mca_oob_tcp_init: unable to create listen thread");
return ORTE_ERROR;
}
opal_free_list_init(&mca_oob_tcp_component.tcp_pending_connections_fl,
sizeof(mca_oob_tcp_pending_connection_t),
OBJ_CLASS(mca_oob_tcp_pending_connection_t),
16, /* initial number */
-1, /* maximum number */
16); /* increment to grow by */
opal_progress_register(mca_oob_tcp_listen_progress);
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
opal_output(0, "%s accepting connections via listen thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
} else {
/* fix up the listen_type, since we might have been in thread,
but can't do that since we weren't the HNP. */
/* Fix up the listen type. This is the first call into the OOB in
which the orte_process_info.hnp field is reliably set. The
listen_mode should only be listen_thread for the HNP -- all
others should use the traditional event library. */
if (!orte_process_info.hnp) {
mca_oob_tcp_component.tcp_listen_type = OOB_TCP_EVENT;
}
/* Create an IPv4 listen socket and either register with the event
engine or give to the listen thread */
rc = mca_oob_tcp_create_listen(&mca_oob_tcp_component.tcp_listen_sd,
AF_INET);
if (ORTE_SUCCESS != rc &&
@@ -1209,6 +1277,22 @@ int mca_oob_tcp_init(void)
"mca_oob_tcp_init: unable to create IPv4 listen socket: %s\n",
opal_strerror(rc));
}
if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
int idx = mca_oob_tcp_component.tcp_listen_thread_num_sockets++;
mca_oob_tcp_component.tcp_listen_thread_sds[idx] =
mca_oob_tcp_component.tcp_listen_sd;
} else {
opal_event_set(&mca_oob_tcp_component.tcp_recv_event,
mca_oob_tcp_component.tcp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_oob_tcp_recv_handler,
0);
opal_event_add(&mca_oob_tcp_component.tcp_recv_event, 0);
}
/* Create an IPv6 listen socket (if IPv6 is enabled, of course)
and either register with the event engine or give to the listen
thread */
#if OPAL_WANT_IPV6
rc = mca_oob_tcp_create_listen(&mca_oob_tcp_component.tcp6_listen_sd,
AF_INET6);
@@ -1219,7 +1303,34 @@ int mca_oob_tcp_init(void)
"mca_oob_tcp_init: unable to create IPv6 listen socket: %s\n",
opal_strerror(rc));
}
if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
int idx = mca_oob_tcp_component.tcp_listen_thread_num_sockets++;
mca_oob_tcp_component.tcp_listen_thread_sds[idx] =
mca_oob_tcp_component.tcp6_listen_sd;
} else {
opal_event_set(&mca_oob_tcp_component.tcp6_recv_event,
mca_oob_tcp_component.tcp6_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_oob_tcp_recv_handler,
0);
opal_event_add(&mca_oob_tcp_component.tcp6_recv_event, 0);
}
#endif
/* Finish up by either printing a nice message (event library) or
initializing the listen thread (listen thread) */
if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
rc = mca_oob_tcp_create_listen_thread();
if (ORTE_SUCCESS != rc) {
opal_output(0, "Unable to create listen thread: %d\n", rc);
return rc;
}
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
opal_output(0, "%s accepting connections via listen thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
} else {
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
opal_output(0, "%s accepting connections via event library",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
@@ -1236,36 +1347,36 @@ int mca_oob_tcp_init(void)
int mca_oob_tcp_fini(void)
{
opal_list_item_t *item;
void *data;
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
opal_event_disable(); /* disable event processing */
/* close listen socket */
/* shut down the listening system */
if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
mca_oob_tcp_component.tcp_shutdown = true;
opal_thread_join(&mca_oob_tcp_component.tcp_listen_thread, &data);
opal_event_del(&mca_oob_tcp_component.tcp_listen_thread_event);
} else {
if (mca_oob_tcp_component.tcp_listen_sd >= 0) {
if (OOB_TCP_EVENT == mca_oob_tcp_component.tcp_listen_type) {
opal_event_del(&mca_oob_tcp_component.tcp_recv_event);
CLOSE_THE_SOCKET(mca_oob_tcp_component.tcp_listen_sd);
mca_oob_tcp_component.tcp_listen_sd = -1;
#if OPAL_WANT_IPV6
}
if (mca_oob_tcp_component.tcp6_listen_sd >= 0) {
opal_event_del(&mca_oob_tcp_component.tcp6_recv_event);
}
}
/* close listen socket */
if (mca_oob_tcp_component.tcp_listen_sd >= 0) {
CLOSE_THE_SOCKET(mca_oob_tcp_component.tcp_listen_sd);
mca_oob_tcp_component.tcp_listen_sd = -1;
}
#if OPAL_WANT_IPV6
if (mca_oob_tcp_component.tcp6_listen_sd >= 0) {
CLOSE_THE_SOCKET(mca_oob_tcp_component.tcp6_listen_sd);
mca_oob_tcp_component.tcp6_listen_sd = -1;
}
#endif /* OPAL_WANT_IPV6 */
} else if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
void *data;
/* adi@2007-04-12: Bug, FIXME:
* once the thread listener is IPv6 capable, don't forget to
* close the v6 socket
*/
mca_oob_tcp_component.tcp_shutdown = true;
CLOSE_THE_SOCKET(mca_oob_tcp_component.tcp_listen_sd);
opal_thread_join(&mca_oob_tcp_component.tcp_listen_thread, &data);
opal_progress_unregister(mca_oob_tcp_listen_progress);
}
mca_oob_tcp_component.tcp_listen_sd = -1;
}
/* cleanup all peers */
for(item = opal_list_remove_first(&mca_oob_tcp_component.tcp_peer_list);

View file

@@ -227,19 +227,18 @@ struct mca_oob_tcp_component_t {
opal_list_t tcp_available_devices;
opal_thread_t tcp_listen_thread;
opal_free_list_t tcp_pending_connections_fl;
opal_list_t tcp_pending_connections;
opal_list_t tcp_copy_out_connections;
opal_list_t tcp_copy_in_connections;
opal_list_t tcp_connections_return;
opal_list_t tcp_connections_return_copy;
opal_mutex_t tcp_pending_connections_lock;
opal_thread_t tcp_listen_thread; /** handle to the listening thread */
opal_list_t tcp_pending_connections; /**< List of accepted connections waiting for processing */
opal_list_t tcp_connections_return; /**< List of connection fragments being returned to accept thread */
opal_mutex_t tcp_connections_lock; /**< Lock protecting pending_connections and connections_return */
int tcp_connections_pipe[2];
opal_event_t tcp_listen_thread_event;
int tcp_copy_max_size; /**< Max size of the copy list before copying must commence */
int tcp_listen_thread_num_sockets; /**< Number of sockets in tcp_listen_thread_sds */
int tcp_listen_thread_sds[2]; /**< Room for IPv4 and IPv6. Might need to make more dynamic. */
struct timeval tcp_listen_thread_tv; /**< Timeout when using listen thread */
opal_timer_t tcp_last_copy_time;
opal_timer_t tcp_copy_delta;
int tcp_copy_max_size;
int tcp_copy_spin_count;
int connect_sleep;
};
@@ -262,8 +261,7 @@ extern int mca_oob_tcp_output_handle;
struct mca_oob_tcp_pending_connection_t {
opal_free_list_item_t super;
int fd;
/* Bug, FIXME: Port to IPv6 */
struct sockaddr_in addr;
struct sockaddr_storage addr;
};
typedef struct mca_oob_tcp_pending_connection_t mca_oob_tcp_pending_connection_t;
OBJ_CLASS_DECLARATION(mca_oob_tcp_pending_connection_t);
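The switch from struct sockaddr_in to struct sockaddr_storage in mca_oob_tcp_pending_connection_t is what retires the old "Port to IPv6" FIXME: sockaddr_storage is specified to be large enough, and suitably aligned, for any supported address family. A quick check of the sizes involved (exact values vary by platform):

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>

int main(void)
{
    /* sockaddr_storage must be able to hold either of the other two */
    printf("sockaddr_in:      %zu\n", sizeof(struct sockaddr_in));
    printf("sockaddr_in6:     %zu\n", sizeof(struct sockaddr_in6));
    printf("sockaddr_storage: %zu\n", sizeof(struct sockaddr_storage));
    return 0;
}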