2004-07-01 18:49:54 +04:00
|
|
|
/* -*- C -*-
|
|
|
|
*
|
|
|
|
* $HEADER$
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
#include <unistd.h>
|
|
|
|
#include <sys/types.h>
|
|
|
|
#include <fcntl.h>
|
|
|
|
#include "util/output.h"
|
2004-07-01 18:49:54 +04:00
|
|
|
#include "mca/oob/tcp/oob_tcp.h"
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
static int mca_oob_tcp_create_listen(void);
|
|
|
|
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user);
|
|
|
|
static void mca_oob_tcp_accept(void);
|
|
|
|
|
|
|
|
|
2004-07-01 18:49:54 +04:00
|
|
|
/*
|
|
|
|
* Struct of function pointers and all that to let us be initialized
|
|
|
|
*/
|
2004-08-02 04:24:22 +04:00
|
|
|
mca_oob_tcp_component_t mca_oob_tcp_component = {
|
|
|
|
{
|
2004-07-01 18:49:54 +04:00
|
|
|
{
|
|
|
|
MCA_OOB_BASE_VERSION_1_0_0,
|
|
|
|
"tcp", /* MCA module name */
|
|
|
|
1, /* MCA module major version */
|
|
|
|
0, /* MCA module minor version */
|
|
|
|
0, /* MCA module release version */
|
|
|
|
mca_oob_tcp_open, /* module open */
|
|
|
|
mca_oob_tcp_close /* module close */
|
|
|
|
},
|
|
|
|
{
|
|
|
|
false /* checkpoint / restart */
|
|
|
|
},
|
|
|
|
mca_oob_tcp_init, /* module init */
|
|
|
|
mca_oob_tcp_finalize
|
2004-08-02 04:24:22 +04:00
|
|
|
}
|
2004-07-01 18:49:54 +04:00
|
|
|
};
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
static mca_oob_t mca_oob_tcp = {
|
2004-07-01 18:49:54 +04:00
|
|
|
mca_oob_tcp_send,
|
|
|
|
mca_oob_tcp_recv,
|
|
|
|
mca_oob_tcp_send_nb,
|
|
|
|
mca_oob_tcp_recv_nb
|
|
|
|
};
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Utility function to register/lookup module parameters.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static inline int mca_oob_tcp_param_register_int(
|
|
|
|
const char* param_name,
|
|
|
|
int default_value)
|
|
|
|
{
|
|
|
|
int id = mca_base_param_register_int("ptl","tcp",param_name,NULL,default_value);
|
|
|
|
int param_value = default_value;
|
|
|
|
mca_base_param_lookup_int(id,¶m_value);
|
|
|
|
return param_value;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2004-07-01 18:49:54 +04:00
|
|
|
/*
|
2004-07-13 02:46:57 +04:00
|
|
|
* Initialize global variables used w/in this module.
|
2004-07-01 18:49:54 +04:00
|
|
|
*/
|
|
|
|
int mca_oob_tcp_open(void)
|
|
|
|
{
|
2004-08-03 01:24:00 +04:00
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_list, ompi_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_tree, ompi_rb_tree_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_free, ompi_free_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msgs, ompi_free_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_lock, ompi_mutex_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_post, ompi_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_recv, ompi_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock, ompi_mutex_t);
|
|
|
|
|
|
|
|
/* register oob module parameters */
|
|
|
|
mca_oob_tcp_component.tcp_peer_limit =
|
|
|
|
mca_oob_tcp_param_register_int("peer_limit", -1);
|
|
|
|
mca_oob_tcp_component.tcp_peer_retries =
|
|
|
|
mca_oob_tcp_param_register_int("peer_retries", 60);
|
2004-08-03 02:16:35 +04:00
|
|
|
|
|
|
|
/* initialize state */
|
|
|
|
mca_oob_tcp_component.tcp_listen_sd = -1;
|
2004-07-13 02:46:57 +04:00
|
|
|
return OMPI_SUCCESS;
|
2004-07-01 18:49:54 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2004-07-13 02:46:57 +04:00
|
|
|
/*
|
|
|
|
* Cleanup of global variables used by this module.
|
|
|
|
*/
|
|
|
|
|
2004-07-01 18:49:54 +04:00
|
|
|
int mca_oob_tcp_close(void)
|
|
|
|
{
|
2004-08-03 01:24:00 +04:00
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_tree);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_recv);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock);
|
2004-07-13 02:46:57 +04:00
|
|
|
return OMPI_SUCCESS;
|
2004-07-01 18:49:54 +04:00
|
|
|
}
|
|
|
|
|
2004-07-13 02:46:57 +04:00
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
/*
|
|
|
|
* Called by mca_oob_tcp_recv_handler() when the TCP listen
|
|
|
|
* socket has pending connection requests. Accept incoming
|
|
|
|
* requests and queue for completion of the connection handshake.
|
2004-07-13 02:46:57 +04:00
|
|
|
*/
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
static void mca_oob_tcp_accept(void)
|
2004-07-13 02:46:57 +04:00
|
|
|
{
|
2004-08-03 01:24:00 +04:00
|
|
|
while(true) {
|
|
|
|
ompi_socklen_t addrlen = sizeof(struct sockaddr_in);
|
|
|
|
struct sockaddr_in addr;
|
|
|
|
ompi_event_t* event;
|
|
|
|
int sd = accept(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&addr, &addrlen);
|
|
|
|
if(sd < 0) {
|
|
|
|
if(errno == EINTR)
|
|
|
|
continue;
|
|
|
|
if(errno != EAGAIN || errno != EWOULDBLOCK)
|
|
|
|
ompi_output(0, "mca_oob_tcp_accept: accept() failed with errno %d.", errno);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* wait for receipt of peers process identifier to complete this connection */
|
|
|
|
event = malloc(sizeof(ompi_event_t));
|
|
|
|
ompi_event_set(event, sd, OMPI_EV_READ|OMPI_EV_PERSIST, mca_oob_tcp_recv_handler, event);
|
|
|
|
ompi_event_add(event, 0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Create a listen socket and bind to all interfaces
|
|
|
|
*/
|
|
|
|
|
|
|
|
static int mca_oob_tcp_create_listen(void)
|
|
|
|
{
|
|
|
|
int flags;
|
|
|
|
struct sockaddr_in inaddr;
|
|
|
|
ompi_socklen_t addrlen;
|
|
|
|
|
|
|
|
/* create a listen socket for incoming connections */
|
|
|
|
mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
|
|
|
|
if(mca_oob_tcp_component.tcp_listen_sd < 0) {
|
|
|
|
ompi_output(0,"mca_oob_tcp_component_init: socket() failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* bind to all addresses and dynamically assigned port */
|
|
|
|
memset(&inaddr, 0, sizeof(inaddr));
|
|
|
|
inaddr.sin_family = AF_INET;
|
|
|
|
inaddr.sin_addr.s_addr = INADDR_ANY;
|
|
|
|
inaddr.sin_port = htons(5000+mca_oob_name_self.vpid);
|
|
|
|
|
|
|
|
if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
|
|
|
|
ompi_output(0,"mca_oob_tcp_create_listen: bind() failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* resolve system assignend port */
|
|
|
|
addrlen = sizeof(struct sockaddr_in);
|
|
|
|
if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_create_listen: getsockname() failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
|
|
|
mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;
|
|
|
|
|
|
|
|
/* setup listen backlog to maximum allowed by kernel */
|
|
|
|
if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_component_init: listen() failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* set socket up to be non-blocking, otherwise accept could block */
|
|
|
|
if((flags = fcntl(mca_oob_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
} else {
|
|
|
|
flags |= O_NONBLOCK;
|
|
|
|
if(fcntl(mca_oob_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* register listen port */
|
|
|
|
ompi_event_set(
|
|
|
|
&mca_oob_tcp_component.tcp_recv_event,
|
|
|
|
mca_oob_tcp_component.tcp_listen_sd,
|
|
|
|
OMPI_EV_READ|OMPI_EV_PERSIST,
|
|
|
|
mca_oob_tcp_recv_handler,
|
|
|
|
0);
|
|
|
|
ompi_event_add(&mca_oob_tcp_component.tcp_recv_event, 0);
|
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
2004-07-13 02:46:57 +04:00
|
|
|
|
2004-07-10 05:27:07 +04:00
|
|
|
|
2004-07-01 18:49:54 +04:00
|
|
|
/*
|
2004-08-03 01:24:00 +04:00
|
|
|
* Event callback when there is data available on the registered
|
|
|
|
* socket to recv.
|
2004-07-01 18:49:54 +04:00
|
|
|
*/
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
|
|
|
|
{
|
|
|
|
ompi_process_name_t name;
|
|
|
|
mca_oob_tcp_peer_t* peer;
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
/* accept new connections on the listen socket */
|
|
|
|
if(mca_oob_tcp_component.tcp_listen_sd == sd) {
|
|
|
|
mca_oob_tcp_accept();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
ompi_event_del((ompi_event_t*)user);
|
|
|
|
free(user);
|
|
|
|
|
|
|
|
/* recv the process identifier */
|
|
|
|
rc = recv(sd, &name, sizeof(name), 0);
|
|
|
|
if(rc != sizeof(name)) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_recv_handler: recv() return value %d != %d, errno = %d",
|
|
|
|
rc, sizeof(name), errno);
|
|
|
|
close(sd);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
OMPI_PROCESS_NAME_NTOH(name);
|
|
|
|
|
|
|
|
/* now set socket up to be non-blocking */
|
|
|
|
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_GETFL) failed with errno=%d", errno);
|
|
|
|
} else {
|
|
|
|
flags |= O_NONBLOCK;
|
|
|
|
if(fcntl(sd, F_SETFL, flags) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_SETFL) failed with errno=%d", errno);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* lookup the corresponding process */
|
|
|
|
peer = mca_oob_tcp_peer_lookup(&name, true);
|
|
|
|
if(NULL == peer) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_recv_handler: unable to locate peer");
|
|
|
|
close(sd);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* is the peer instance willing to accept this connection */
|
|
|
|
if(mca_oob_tcp_peer_accept(peer, sd) == false) {
|
2004-08-04 18:33:02 +04:00
|
|
|
ompi_output(0, "mca_oob_tcp_recv_handler: peer instance not willing to accept connection.");
|
2004-08-03 01:24:00 +04:00
|
|
|
close(sd);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Module initialization.
|
|
|
|
* (1) initialize static resources
|
|
|
|
* (2) create listen socket
|
|
|
|
*/
|
|
|
|
mca_oob_t* mca_oob_tcp_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
|
2004-07-01 18:49:54 +04:00
|
|
|
{
|
2004-07-13 02:46:57 +04:00
|
|
|
/* initialize data structures */
|
2004-08-05 19:30:36 +04:00
|
|
|
ompi_rb_tree_init(&mca_oob_tcp_component.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)mca_oob_tcp_process_name_compare);
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
ompi_free_list_init(&mca_oob_tcp_component.tcp_peer_free,
|
|
|
|
sizeof(mca_oob_tcp_peer_t),
|
|
|
|
OBJ_CLASS(mca_oob_tcp_peer_t),
|
|
|
|
8, /* initial number */
|
|
|
|
mca_oob_tcp_component.tcp_peer_limit, /* maximum number */
|
|
|
|
8, /* increment to grow by */
|
|
|
|
NULL); /* use default allocator */
|
|
|
|
|
|
|
|
ompi_free_list_init(&mca_oob_tcp_component.tcp_msgs,
|
|
|
|
sizeof(mca_oob_tcp_msg_t),
|
|
|
|
OBJ_CLASS(mca_oob_tcp_msg_t),
|
|
|
|
8, /* initial number */
|
2004-08-03 02:16:35 +04:00
|
|
|
-1, /* maximum number */
|
2004-08-03 01:24:00 +04:00
|
|
|
8, /* increment to grow by */
|
|
|
|
NULL); /* use default allocator */
|
|
|
|
|
|
|
|
#if 0
|
|
|
|
/* intialize event library */
|
2004-08-05 03:42:51 +04:00
|
|
|
memset(&mca_oob_tcp_component.tcp_recv_event, 0, sizeof(ompi_event_t));
|
|
|
|
memset(&mca_oob_tcp_component.tcp_send_event, 0, sizeof(ompi_event_t));
|
2004-08-03 01:24:00 +04:00
|
|
|
if(ompi_event_init() != OMPI_SUCCESS) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_init: unable to initialize event library\n");
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* create a listen socket */
|
|
|
|
if(mca_oob_tcp_create_listen() != OMPI_SUCCESS) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_init: unable to create listen socket\n");
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
return &mca_oob_tcp;
|
|
|
|
#else
|
2004-07-01 18:49:54 +04:00
|
|
|
return NULL;
|
2004-08-03 01:24:00 +04:00
|
|
|
#endif
|
2004-07-01 18:49:54 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
/*
|
|
|
|
* Module cleanup.
|
|
|
|
*/
|
2004-07-01 18:49:54 +04:00
|
|
|
int mca_oob_tcp_finalize(void)
|
|
|
|
{
|
2004-08-06 21:23:37 +04:00
|
|
|
int optval;
|
2004-08-04 18:33:02 +04:00
|
|
|
mca_oob_tcp_peer_t * peer;
|
2004-08-05 23:37:48 +04:00
|
|
|
if (mca_oob_tcp_component.tcp_listen_sd >= 0) {
|
2004-08-06 21:23:37 +04:00
|
|
|
optval = 1;
|
|
|
|
if(setsockopt(mca_oob_tcp_component.tcp_listen_sd, SOL_SOCKET,
|
|
|
|
SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
|
|
|
|
ompi_output(0,
|
|
|
|
"mca_oob_tcp_finalize: setsockopt(SO_REUSEADDR) failed with errno=%d\n",
|
|
|
|
errno);
|
|
|
|
}
|
2004-08-06 23:12:26 +04:00
|
|
|
/*** temporarily disable the removal of the revieve event
|
2004-08-10 03:07:53 +04:00
|
|
|
* to prevent segfaults */
|
|
|
|
ompi_event_del(&mca_oob_tcp_component.tcp_recv_event);
|
2004-08-05 23:37:48 +04:00
|
|
|
if(0 != close(mca_oob_tcp_component.tcp_listen_sd)) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_finalize: error closing listen socket. errno=%d", errno);
|
|
|
|
}
|
|
|
|
}
|
2004-08-06 21:23:37 +04:00
|
|
|
/* TODO: need to cleanup all peers - check for pending send/recvs. etc. */
|
|
|
|
while(NULL != (peer = (mca_oob_tcp_peer_t *)
|
|
|
|
ompi_list_remove_first(&mca_oob_tcp_component.tcp_peer_list))) {
|
|
|
|
OBJ_DESTRUCT(peer);
|
|
|
|
}
|
2004-08-05 03:42:51 +04:00
|
|
|
ompi_event_fini();
|
2004-08-06 21:23:37 +04:00
|
|
|
|
2004-07-01 18:49:54 +04:00
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
2004-07-13 02:46:57 +04:00
|
|
|
|
2004-08-10 03:07:53 +04:00
|
|
|
/*
|
2004-08-05 19:30:36 +04:00
|
|
|
* Compare two process names for equality.
|
|
|
|
*
|
|
|
|
* @param n1 Process name 1.
|
|
|
|
* @param n2 Process name 2.
|
|
|
|
* @return (-1 for n1<n2 0 for equality, 1 for n1>n2)
|
|
|
|
*
|
|
|
|
* Note that the definition of < or > is somewhat arbitrary -
|
|
|
|
* just needs to be consistently applied to maintain an ordering
|
|
|
|
* when process names are used as indices.
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
int mca_oob_tcp_process_name_compare(const ompi_process_name_t* n1, const ompi_process_name_t* n2)
|
|
|
|
{
|
|
|
|
if(n1->cellid < n2->cellid)
|
|
|
|
return -1;
|
|
|
|
else if(n1->cellid > n2->cellid)
|
|
|
|
return 1;
|
|
|
|
else if(n1->jobid < n2->jobid)
|
|
|
|
return -1;
|
|
|
|
else if(n1->jobid > n2->jobid)
|
|
|
|
return 1;
|
|
|
|
else if(n1->vpid < n2->vpid)
|
|
|
|
return -1;
|
|
|
|
else if(n1->vpid > n2->vpid)
|
|
|
|
return 1;
|
|
|
|
return(0);
|
|
|
|
}
|
|
|
|
|