* really didn't mean for this patch (the threaded accept() code) to come in with
r10841, so revert it (and it's fixes) out. Will bring back once cleaned up from the code used in the tbird experiment This commit was SVN r10991. The following SVN revision numbers were found above: r10841 --> open-mpi/ompi@dfa1221c3b
Этот коммит содержится в:
родитель
77e0c7b383
Коммит
c744f650ba
@ -83,7 +83,6 @@ OBJ_CLASS_INSTANCE(
|
||||
*/
|
||||
|
||||
static int mca_oob_tcp_create_listen(void);
|
||||
static int mca_oob_tcp_create_listen_thread(void);
|
||||
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user);
|
||||
static void mca_oob_tcp_accept(void);
|
||||
|
||||
@ -170,8 +169,6 @@ static inline char* mca_oob_tcp_param_register_str(
|
||||
*/
|
||||
int mca_oob_tcp_component_open(void)
|
||||
{
|
||||
char *listen_type;
|
||||
|
||||
#ifdef __WINDOWS__
|
||||
WSADATA win_sock_data;
|
||||
if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) {
|
||||
@ -210,26 +207,7 @@ int mca_oob_tcp_component_open(void)
|
||||
mca_oob_tcp_component.tcp_rcvbuf =
|
||||
mca_oob_tcp_param_register_int("rcvbuf", 128*1024);
|
||||
|
||||
mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base,
|
||||
"listen_mode",
|
||||
"Mode for HNP to accept incoming connections: event, listen_thread",
|
||||
false,
|
||||
false,
|
||||
"event",
|
||||
&listen_type);
|
||||
|
||||
if (0 == strcmp(listen_type, "event")) {
|
||||
mca_oob_tcp_component.tcp_listen_type = OOB_TCP_EVENT;
|
||||
} else if (0 == strcmp(listen_type, "listen_thread")) {
|
||||
mca_oob_tcp_component.tcp_listen_type = OOB_TCP_LISTEN_THREAD;
|
||||
} else {
|
||||
opal_output(0, "Invalid value for oob_tcp_listen_mode parameter: %s",
|
||||
listen_type);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/* initialize state */
|
||||
mca_oob_tcp_component.tcp_shutdown = false;
|
||||
mca_oob_tcp_component.tcp_listen_sd = -1;
|
||||
mca_oob_tcp_component.tcp_match_count = 0;
|
||||
return ORTE_SUCCESS;
|
||||
@ -374,131 +352,6 @@ static int mca_oob_tcp_create_listen(void)
|
||||
}
|
||||
|
||||
|
||||
static void* mca_oob_tcp_listen_thread(opal_object_t *obj)
|
||||
{
|
||||
int sd, rc;
|
||||
ompi_socklen_t addrlen = sizeof(struct sockaddr_in);
|
||||
struct sockaddr_in addr;
|
||||
opal_free_list_item_t *fl_item;
|
||||
mca_oob_tcp_pending_connection_t *item;
|
||||
|
||||
while (false == mca_oob_tcp_component.tcp_shutdown) {
|
||||
sd = accept(mca_oob_tcp_component.tcp_listen_sd,
|
||||
(struct sockaddr*)&addr, &addrlen);
|
||||
if(sd < 0) {
|
||||
if (mca_oob_tcp_component.tcp_shutdown) return NULL;
|
||||
|
||||
if(ompi_socket_errno == EINTR)
|
||||
continue;
|
||||
if(ompi_socket_errno != EAGAIN || ompi_socket_errno != EWOULDBLOCK)
|
||||
opal_output(0, "mca_oob_tcp_accept: accept() failed with errno %d.", ompi_socket_errno);
|
||||
close(sd);
|
||||
/* start a new listen socket */
|
||||
mca_oob_tcp_create_listen_thread();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
OPAL_FREE_LIST_WAIT(&mca_oob_tcp_component.tcp_pending_connections_fl,
|
||||
fl_item, rc);
|
||||
item = (mca_oob_tcp_pending_connection_t*) fl_item;
|
||||
item->fd = sd;
|
||||
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
|
||||
opal_list_append(&mca_oob_tcp_component.tcp_pending_connections,
|
||||
(opal_list_item_t*) item);
|
||||
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int mca_oob_tcp_listen_progress(void)
|
||||
{
|
||||
int count = 0;
|
||||
mca_oob_tcp_pending_connection_t *item;
|
||||
struct sockaddr_in addr;
|
||||
mca_oob_tcp_event_t* event;
|
||||
|
||||
if (opal_list_get_size(&mca_oob_tcp_component.tcp_pending_connections)) {
|
||||
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
|
||||
while (NULL != (item = (mca_oob_tcp_pending_connection_t*)
|
||||
opal_list_remove_first(&mca_oob_tcp_component.tcp_pending_connections))) {
|
||||
|
||||
/* setup socket options */
|
||||
mca_oob_tcp_set_socket_options(item->fd);
|
||||
|
||||
/* log the accept */
|
||||
if(mca_oob_tcp_component.tcp_debug) {
|
||||
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_accept: %s:%d\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name),
|
||||
inet_ntoa(addr.sin_addr),
|
||||
addr.sin_port);
|
||||
}
|
||||
|
||||
/* wait for receipt of peers process identifier to
|
||||
complete this connection */
|
||||
event = OBJ_NEW(mca_oob_tcp_event_t);
|
||||
opal_event_set(&event->event, item->fd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
|
||||
opal_event_add(&event->event, 0);
|
||||
OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl,
|
||||
(opal_free_list_item_t *) item);
|
||||
|
||||
count++;
|
||||
}
|
||||
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
static int mca_oob_tcp_create_listen_thread(void)
|
||||
{
|
||||
struct sockaddr_in inaddr;
|
||||
ompi_socklen_t addrlen;
|
||||
|
||||
/* create a listen socket for incoming connections */
|
||||
mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if(mca_oob_tcp_component.tcp_listen_sd < 0) {
|
||||
opal_output(0,"mca_oob_tcp_component_init: socket() failed with errno=%d", ompi_socket_errno);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/* setup socket options */
|
||||
mca_oob_tcp_set_socket_options(mca_oob_tcp_component.tcp_listen_sd);
|
||||
|
||||
/* bind address */
|
||||
memset(&inaddr, 0, sizeof(inaddr));
|
||||
inaddr.sin_family = AF_INET;
|
||||
inaddr.sin_addr.s_addr = INADDR_ANY;
|
||||
inaddr.sin_port = 0;
|
||||
|
||||
if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
|
||||
opal_output(0,"mca_oob_tcp_create_listen: bind() failed with errno=%d", ompi_socket_errno);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/* resolve system assigned port */
|
||||
addrlen = sizeof(struct sockaddr_in);
|
||||
if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
|
||||
opal_output(0, "mca_oob_tcp_create_listen: getsockname() failed with errno=%d", ompi_socket_errno);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;
|
||||
|
||||
/* setup listen backlog to maximum allowed by kernel */
|
||||
if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {
|
||||
opal_output(0, "mca_oob_tcp_component_init: listen() failed with errno=%d", ompi_socket_errno);
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/* start the listen thread */
|
||||
mca_oob_tcp_component.tcp_listen_thread.t_run = mca_oob_tcp_listen_thread;
|
||||
mca_oob_tcp_component.tcp_listen_thread.t_arg = NULL;
|
||||
|
||||
return opal_thread_start(&mca_oob_tcp_component.tcp_listen_thread);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Handle probe
|
||||
*/
|
||||
@ -684,17 +537,9 @@ mca_oob_t* mca_oob_tcp_component_init(int* priority)
|
||||
memset(&mca_oob_tcp_component.tcp_send_event, 0, sizeof(opal_event_t));
|
||||
|
||||
/* create a listen socket */
|
||||
if (OOB_TCP_EVENT == mca_oob_tcp_component.tcp_listen_type) {
|
||||
if(mca_oob_tcp_create_listen() != ORTE_SUCCESS) {
|
||||
opal_output(0, "mca_oob_tcp_init: unable to create listen socket");
|
||||
return NULL;
|
||||
}
|
||||
} else if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
|
||||
if (mca_oob_tcp_create_listen_thread() != ORTE_SUCCESS) {
|
||||
opal_output(0, "mca_oob_tcp_init: unable to create listen thread");
|
||||
return NULL;
|
||||
}
|
||||
opal_progress_register(mca_oob_tcp_listen_progress);
|
||||
if(mca_oob_tcp_create_listen() != ORTE_SUCCESS) {
|
||||
opal_output(0, "mca_oob_tcp_init: unable to create listen socket\n");
|
||||
return NULL;
|
||||
}
|
||||
return &mca_oob_tcp;
|
||||
}
|
||||
@ -1087,19 +932,10 @@ int mca_oob_tcp_fini(void)
|
||||
|
||||
/* close listen socket */
|
||||
if (mca_oob_tcp_component.tcp_listen_sd >= 0) {
|
||||
if (OOB_TCP_EVENT == mca_oob_tcp_component.tcp_listen_type) {
|
||||
opal_event_del(&mca_oob_tcp_component.tcp_recv_event);
|
||||
close(mca_oob_tcp_component.tcp_listen_sd);
|
||||
} else if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
|
||||
void *data;
|
||||
close(mca_oob_tcp_component.tcp_listen_sd);
|
||||
opal_thread_join(&mca_oob_tcp_component.tcp_listen_thread, &data);
|
||||
opal_progress_unregister(mca_oob_tcp_listen_progress);
|
||||
}
|
||||
|
||||
opal_event_del(&mca_oob_tcp_component.tcp_recv_event);
|
||||
close(mca_oob_tcp_component.tcp_listen_sd);
|
||||
mca_oob_tcp_component.tcp_listen_sd = -1;
|
||||
}
|
||||
opal_progress_unregister(mca_oob_tcp_listen_progress);
|
||||
|
||||
/* cleanup all peers */
|
||||
for(item = opal_list_remove_first(&mca_oob_tcp_component.tcp_peer_list);
|
||||
|
@ -255,12 +255,6 @@ struct mca_oob_tcp_component_t {
|
||||
opal_condition_t tcp_match_cond; /**< condition variable used in finalize */
|
||||
int tcp_match_count; /**< number of matched recvs in progress */
|
||||
int tcp_debug; /**< debug level */
|
||||
bool tcp_shutdown;
|
||||
enum { OOB_TCP_EVENT, OOB_TCP_LISTEN_THREAD } tcp_listen_type;
|
||||
opal_thread_t tcp_listen_thread;
|
||||
opal_free_list_t tcp_pending_connections_fl;
|
||||
opal_list_t tcp_pending_connections;
|
||||
opal_mutex_t tcp_pending_connections_lock;
|
||||
};
|
||||
|
||||
/**
|
||||
@ -270,13 +264,6 @@ typedef struct mca_oob_tcp_component_t mca_oob_tcp_component_t;
|
||||
|
||||
OMPI_COMP_EXPORT extern mca_oob_tcp_component_t mca_oob_tcp_component;
|
||||
|
||||
struct mca_oob_tcp_pending_connection_t {
|
||||
opal_free_list_item_t super;
|
||||
int fd;
|
||||
};
|
||||
typedef struct mca_oob_tcp_pending_connection_t mca_oob_tcp_pending_connection_t;
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_pending_connection_t);
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user