2004-08-25 21:39:08 +04:00
|
|
|
/*
|
2004-07-01 18:49:54 +04:00
|
|
|
* $HEADER$
|
|
|
|
*/
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
#include <unistd.h>
|
|
|
|
#include <sys/types.h>
|
|
|
|
#include <fcntl.h>
|
2004-08-16 23:39:54 +04:00
|
|
|
#include <netinet/in.h>
|
|
|
|
#include <arpa/inet.h>
|
2004-08-03 01:24:00 +04:00
|
|
|
#include "util/output.h"
|
2004-08-16 23:39:54 +04:00
|
|
|
#include "util/if.h"
|
2004-07-01 18:49:54 +04:00
|
|
|
#include "mca/oob/tcp/oob_tcp.h"
|
2004-08-19 23:34:37 +04:00
|
|
|
#include "mca/ns/ns.h"
|
|
|
|
#include "mca/gpr/base/base.h"
|
|
|
|
#include "mca/gpr/gpr.h"
|
2004-08-25 21:39:08 +04:00
|
|
|
#include "mca/pcmclient/pcmclient.h"
|
|
|
|
#include "mca/pcmclient/base/base.h"
|
2004-07-01 18:49:54 +04:00
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
/* Forward declarations of the file-local socket helpers defined below. */
static int  mca_oob_tcp_create_listen(void);
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_accept(void);
|
|
|
|
|
|
|
|
|
2004-09-02 03:07:40 +04:00
|
|
|
struct mca_oob_tcp_subscription_t {
|
|
|
|
ompi_list_item_t item;
|
|
|
|
mca_ns_base_jobid_t jobid;
|
|
|
|
};
|
|
|
|
typedef struct mca_oob_tcp_subscription_t mca_oob_tcp_subscription_t;
|
|
|
|
|
|
|
|
OBJ_CLASS_INSTANCE(
|
|
|
|
mca_oob_tcp_subscription_t,
|
|
|
|
ompi_list_item_t,
|
|
|
|
NULL,
|
|
|
|
NULL);
|
|
|
|
|
|
|
|
|
|
|
|
|
2004-07-01 18:49:54 +04:00
|
|
|
/*
|
|
|
|
* Struct of function pointers and all that to let us be initialized
|
|
|
|
*/
|
2004-08-02 04:24:22 +04:00
|
|
|
mca_oob_tcp_component_t mca_oob_tcp_component = {
|
|
|
|
{
|
2004-07-01 18:49:54 +04:00
|
|
|
{
|
|
|
|
MCA_OOB_BASE_VERSION_1_0_0,
|
|
|
|
"tcp", /* MCA module name */
|
2004-08-19 23:34:37 +04:00
|
|
|
1, /* MCA component major version */
|
|
|
|
0, /* MCA component minor version */
|
|
|
|
0, /* MCA component release version */
|
|
|
|
mca_oob_tcp_component_open, /* component open */
|
|
|
|
mca_oob_tcp_component_close /* component close */
|
2004-07-01 18:49:54 +04:00
|
|
|
},
|
|
|
|
{
|
|
|
|
false /* checkpoint / restart */
|
|
|
|
},
|
2004-08-19 23:34:37 +04:00
|
|
|
mca_oob_tcp_component_init
|
2004-08-02 04:24:22 +04:00
|
|
|
}
|
2004-07-01 18:49:54 +04:00
|
|
|
};
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
static mca_oob_t mca_oob_tcp = {
|
2004-08-16 23:39:54 +04:00
|
|
|
mca_oob_tcp_get_addr,
|
|
|
|
mca_oob_tcp_set_seed,
|
2004-07-01 18:49:54 +04:00
|
|
|
mca_oob_tcp_send,
|
|
|
|
mca_oob_tcp_recv,
|
|
|
|
mca_oob_tcp_send_nb,
|
2004-08-11 01:02:36 +04:00
|
|
|
mca_oob_tcp_recv_nb,
|
2004-08-19 23:34:37 +04:00
|
|
|
mca_oob_tcp_init,
|
|
|
|
mca_oob_tcp_fini,
|
2004-07-01 18:49:54 +04:00
|
|
|
};
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Utility function to register/lookup module parameters.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static inline int mca_oob_tcp_param_register_int(
|
|
|
|
const char* param_name,
|
|
|
|
int default_value)
|
|
|
|
{
|
2004-09-02 03:07:40 +04:00
|
|
|
int id = mca_base_param_register_int("oob","tcp",param_name,NULL,default_value);
|
2004-08-03 01:24:00 +04:00
|
|
|
int param_value = default_value;
|
|
|
|
mca_base_param_lookup_int(id,¶m_value);
|
|
|
|
return param_value;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2004-08-16 23:39:54 +04:00
|
|
|
/*
 * Register a string MCA parameter `param_name` under the ("oob", "tcp")
 * type/component pair and return its current value.
 *
 * @param param_name     Parameter name within the oob/tcp namespace.
 * @param default_value  Default string registered with the parameter.
 * @return               The string produced by mca_base_param_lookup_string(),
 *                       or NULL if the lookup does not set it.
 *                       NOTE(review): ownership of the returned string is
 *                       defined by mca_base_param_lookup_string() - confirm
 *                       whether callers must free it.
 */
static inline char* mca_oob_tcp_param_register_str(
    const char* param_name,
    const char* default_value)
{
    int id = mca_base_param_register_string("oob","tcp",param_name,NULL,default_value);
    char* param_value = NULL;
    /* pass the ADDRESS of param_value (was mis-encoded as "¶m_value") */
    mca_base_param_lookup_string(id,&param_value);
    return param_value;
}
|
|
|
|
|
|
|
|
|
2004-07-01 18:49:54 +04:00
|
|
|
/*
|
2004-07-13 02:46:57 +04:00
|
|
|
* Initialize global variables used w/in this module.
|
2004-07-01 18:49:54 +04:00
|
|
|
*/
|
2004-08-19 23:34:37 +04:00
|
|
|
int mca_oob_tcp_component_open(void)
|
2004-07-01 18:49:54 +04:00
|
|
|
{
|
2004-09-02 03:07:40 +04:00
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_subscriptions, ompi_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_list, ompi_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_tree, ompi_rb_tree_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_names, ompi_rb_tree_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_free, ompi_free_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msgs, ompi_free_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_lock, ompi_mutex_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_post, ompi_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_recv, ompi_list_t);
|
|
|
|
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock, ompi_mutex_t);
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
/* register oob module parameters */
|
|
|
|
mca_oob_tcp_component.tcp_peer_limit =
|
|
|
|
mca_oob_tcp_param_register_int("peer_limit", -1);
|
|
|
|
mca_oob_tcp_component.tcp_peer_retries =
|
|
|
|
mca_oob_tcp_param_register_int("peer_retries", 60);
|
2004-09-02 03:07:40 +04:00
|
|
|
mca_oob_tcp_component.tcp_debug =
|
|
|
|
mca_oob_tcp_param_register_int("debug", 1);
|
2004-08-16 23:39:54 +04:00
|
|
|
memset(&mca_oob_tcp_component.tcp_seed_addr, 0, sizeof(mca_oob_tcp_component.tcp_seed_addr));
|
2004-08-03 02:16:35 +04:00
|
|
|
|
|
|
|
/* initialize state */
|
|
|
|
mca_oob_tcp_component.tcp_listen_sd = -1;
|
2004-07-13 02:46:57 +04:00
|
|
|
return OMPI_SUCCESS;
|
2004-07-01 18:49:54 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2004-07-13 02:46:57 +04:00
|
|
|
/*
|
|
|
|
* Cleanup of global variables used by this module.
|
|
|
|
*/
|
|
|
|
|
2004-08-19 23:34:37 +04:00
|
|
|
int mca_oob_tcp_component_close(void)
|
2004-07-01 18:49:54 +04:00
|
|
|
{
|
2004-08-03 01:24:00 +04:00
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_tree);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);
|
2004-09-02 03:07:40 +04:00
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_subscriptions);
|
2004-08-03 01:24:00 +04:00
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_recv);
|
|
|
|
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock);
|
2004-07-13 02:46:57 +04:00
|
|
|
return OMPI_SUCCESS;
|
2004-07-01 18:49:54 +04:00
|
|
|
}
|
|
|
|
|
2004-07-13 02:46:57 +04:00
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
/*
|
|
|
|
* Called by mca_oob_tcp_recv_handler() when the TCP listen
|
|
|
|
* socket has pending connection requests. Accept incoming
|
|
|
|
* requests and queue for completion of the connection handshake.
|
2004-07-13 02:46:57 +04:00
|
|
|
*/
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
static void mca_oob_tcp_accept(void)
|
2004-07-13 02:46:57 +04:00
|
|
|
{
|
2004-08-03 01:24:00 +04:00
|
|
|
while(true) {
|
|
|
|
ompi_socklen_t addrlen = sizeof(struct sockaddr_in);
|
|
|
|
struct sockaddr_in addr;
|
|
|
|
ompi_event_t* event;
|
|
|
|
int sd = accept(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&addr, &addrlen);
|
|
|
|
if(sd < 0) {
|
|
|
|
if(errno == EINTR)
|
|
|
|
continue;
|
|
|
|
if(errno != EAGAIN || errno != EWOULDBLOCK)
|
|
|
|
ompi_output(0, "mca_oob_tcp_accept: accept() failed with errno %d.", errno);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* wait for receipt of peers process identifier to complete this connection */
|
|
|
|
event = malloc(sizeof(ompi_event_t));
|
|
|
|
ompi_event_set(event, sd, OMPI_EV_READ|OMPI_EV_PERSIST, mca_oob_tcp_recv_handler, event);
|
|
|
|
ompi_event_add(event, 0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Create a listen socket and bind to all interfaces
|
|
|
|
*/
|
|
|
|
|
|
|
|
static int mca_oob_tcp_create_listen(void)
|
|
|
|
{
|
|
|
|
int flags;
|
2004-08-12 17:29:37 +04:00
|
|
|
int optval = 1;
|
2004-08-03 01:24:00 +04:00
|
|
|
struct sockaddr_in inaddr;
|
|
|
|
ompi_socklen_t addrlen;
|
2004-08-12 17:29:37 +04:00
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
/* create a listen socket for incoming connections */
|
|
|
|
mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
|
|
|
|
if(mca_oob_tcp_component.tcp_listen_sd < 0) {
|
|
|
|
ompi_output(0,"mca_oob_tcp_component_init: socket() failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
2004-08-12 17:29:37 +04:00
|
|
|
/* allow port to be re-used - for temporary fixed port numbers */
|
|
|
|
if (setsockopt(
|
|
|
|
mca_oob_tcp_component.tcp_listen_sd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_create_listen: setsockopt(SO_REUSEADDR) failed with errno=%d\n",
|
|
|
|
errno);
|
2004-08-11 20:02:59 +04:00
|
|
|
}
|
2004-08-12 17:29:37 +04:00
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
memset(&inaddr, 0, sizeof(inaddr));
|
|
|
|
inaddr.sin_family = AF_INET;
|
|
|
|
inaddr.sin_addr.s_addr = INADDR_ANY;
|
2004-08-19 23:34:37 +04:00
|
|
|
inaddr.sin_port = 0;
|
2004-08-12 17:29:37 +04:00
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
|
|
|
|
ompi_output(0,"mca_oob_tcp_create_listen: bind() failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
2004-08-12 17:29:37 +04:00
|
|
|
|
2004-09-02 03:07:40 +04:00
|
|
|
/* resolve system assigned port */
|
2004-08-03 01:24:00 +04:00
|
|
|
addrlen = sizeof(struct sockaddr_in);
|
|
|
|
if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_create_listen: getsockname() failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
|
|
|
mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;
|
|
|
|
|
|
|
|
/* setup listen backlog to maximum allowed by kernel */
|
|
|
|
if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_component_init: listen() failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* set socket up to be non-blocking, otherwise accept could block */
|
|
|
|
if((flags = fcntl(mca_oob_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
} else {
|
|
|
|
flags |= O_NONBLOCK;
|
|
|
|
if(fcntl(mca_oob_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed with errno=%d", errno);
|
|
|
|
return OMPI_ERROR;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* register listen port */
|
|
|
|
ompi_event_set(
|
|
|
|
&mca_oob_tcp_component.tcp_recv_event,
|
|
|
|
mca_oob_tcp_component.tcp_listen_sd,
|
|
|
|
OMPI_EV_READ|OMPI_EV_PERSIST,
|
|
|
|
mca_oob_tcp_recv_handler,
|
|
|
|
0);
|
|
|
|
ompi_event_add(&mca_oob_tcp_component.tcp_recv_event, 0);
|
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
2004-07-13 02:46:57 +04:00
|
|
|
|
2004-07-10 05:27:07 +04:00
|
|
|
|
2004-07-01 18:49:54 +04:00
|
|
|
/*
|
2004-08-03 01:24:00 +04:00
|
|
|
* Event callback when there is data available on the registered
|
|
|
|
* socket to recv.
|
2004-07-01 18:49:54 +04:00
|
|
|
*/
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
|
|
|
|
{
|
2004-08-28 05:15:19 +04:00
|
|
|
ompi_process_name_t guid[2];
|
2004-08-03 01:24:00 +04:00
|
|
|
mca_oob_tcp_peer_t* peer;
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
/* accept new connections on the listen socket */
|
|
|
|
if(mca_oob_tcp_component.tcp_listen_sd == sd) {
|
|
|
|
mca_oob_tcp_accept();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
ompi_event_del((ompi_event_t*)user);
|
|
|
|
free(user);
|
|
|
|
|
|
|
|
/* recv the process identifier */
|
2004-09-02 03:07:40 +04:00
|
|
|
while((rc = recv(sd, guid, sizeof(guid), 0)) != sizeof(guid)) {
|
|
|
|
if(rc >= 0) {
|
|
|
|
close(sd);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
if(errno != EINTR) {
|
|
|
|
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: recv() failed with errno=%d\n", errno);
|
|
|
|
close(sd);
|
|
|
|
return;
|
|
|
|
}
|
2004-08-03 01:24:00 +04:00
|
|
|
}
|
2004-08-28 05:15:19 +04:00
|
|
|
OMPI_PROCESS_NAME_NTOH(guid[0]);
|
|
|
|
OMPI_PROCESS_NAME_NTOH(guid[1]);
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
/* now set socket up to be non-blocking */
|
|
|
|
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_GETFL) failed with errno=%d", errno);
|
|
|
|
} else {
|
|
|
|
flags |= O_NONBLOCK;
|
|
|
|
if(fcntl(sd, F_SETFL, flags) < 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_SETFL) failed with errno=%d", errno);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2004-08-25 21:39:08 +04:00
|
|
|
/* check for wildcard name - if this is true - we allocate a name from the name server
|
|
|
|
* and return to the peer
|
|
|
|
*/
|
2004-08-28 05:15:19 +04:00
|
|
|
if(mca_oob_tcp_process_name_compare(guid, MCA_OOB_NAME_ANY) == 0) {
|
|
|
|
guid->jobid = ompi_name_server.create_jobid();
|
|
|
|
guid->vpid = ompi_name_server.reserve_range(guid->jobid,1);
|
|
|
|
ompi_name_server.assign_cellid_to_process(guid);
|
2004-08-25 21:39:08 +04:00
|
|
|
}
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
/* lookup the corresponding process */
|
2004-09-02 03:07:40 +04:00
|
|
|
peer = mca_oob_tcp_peer_lookup(guid);
|
2004-08-03 01:24:00 +04:00
|
|
|
if(NULL == peer) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_recv_handler: unable to locate peer");
|
|
|
|
close(sd);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* is the peer instance willing to accept this connection */
|
|
|
|
if(mca_oob_tcp_peer_accept(peer, sd) == false) {
|
|
|
|
close(sd);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2004-08-19 23:34:37 +04:00
|
|
|
* Component initialization - create a module.
|
2004-08-03 01:24:00 +04:00
|
|
|
* (1) initialize static resources
|
|
|
|
* (2) create listen socket
|
|
|
|
*/
|
2004-08-28 05:15:19 +04:00
|
|
|
mca_oob_t* mca_oob_tcp_component_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads)
|
2004-07-01 18:49:54 +04:00
|
|
|
{
|
2004-08-28 05:15:19 +04:00
|
|
|
/* dont allow tcp to be selected if we dont know the seed */
|
|
|
|
if(mca_oob_has_seed() == false)
|
|
|
|
return NULL;
|
|
|
|
|
|
|
|
*priority = 1;
|
|
|
|
*allow_multi_user_threads = true;
|
|
|
|
*have_hidden_threads = OMPI_HAVE_THREADS;
|
|
|
|
|
2004-07-13 02:46:57 +04:00
|
|
|
/* initialize data structures */
|
2004-08-05 19:30:36 +04:00
|
|
|
ompi_rb_tree_init(&mca_oob_tcp_component.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)mca_oob_tcp_process_name_compare);
|
2004-09-02 03:07:40 +04:00
|
|
|
ompi_rb_tree_init(&mca_oob_tcp_component.tcp_peer_names, (ompi_rb_tree_comp_fn_t)mca_oob_tcp_process_name_compare);
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
ompi_free_list_init(&mca_oob_tcp_component.tcp_peer_free,
|
|
|
|
sizeof(mca_oob_tcp_peer_t),
|
|
|
|
OBJ_CLASS(mca_oob_tcp_peer_t),
|
|
|
|
8, /* initial number */
|
|
|
|
mca_oob_tcp_component.tcp_peer_limit, /* maximum number */
|
|
|
|
8, /* increment to grow by */
|
|
|
|
NULL); /* use default allocator */
|
|
|
|
|
|
|
|
ompi_free_list_init(&mca_oob_tcp_component.tcp_msgs,
|
|
|
|
sizeof(mca_oob_tcp_msg_t),
|
|
|
|
OBJ_CLASS(mca_oob_tcp_msg_t),
|
|
|
|
8, /* initial number */
|
2004-08-03 02:16:35 +04:00
|
|
|
-1, /* maximum number */
|
2004-08-03 01:24:00 +04:00
|
|
|
8, /* increment to grow by */
|
|
|
|
NULL); /* use default allocator */
|
|
|
|
|
|
|
|
/* intialize event library */
|
2004-08-05 03:42:51 +04:00
|
|
|
memset(&mca_oob_tcp_component.tcp_recv_event, 0, sizeof(ompi_event_t));
|
|
|
|
memset(&mca_oob_tcp_component.tcp_send_event, 0, sizeof(ompi_event_t));
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
/* create a listen socket */
|
|
|
|
if(mca_oob_tcp_create_listen() != OMPI_SUCCESS) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_init: unable to create listen socket\n");
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
return &mca_oob_tcp;
|
2004-07-01 18:49:54 +04:00
|
|
|
}
|
|
|
|
|
2004-09-02 03:07:40 +04:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Callback from registry on change to subscribed segments.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static void mca_oob_tcp_registry_callback(
|
|
|
|
ompi_registry_notify_message_t* msg,
|
|
|
|
void* cbdata)
|
|
|
|
{
|
|
|
|
ompi_list_item_t* item;
|
|
|
|
if(mca_oob_tcp_component.tcp_debug > 1) {
|
|
|
|
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_registry_callback\n",
|
|
|
|
OMPI_NAME_COMPONENTS(mca_oob_name_self));
|
|
|
|
}
|
|
|
|
|
|
|
|
/* process the callback */
|
|
|
|
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
while((item = ompi_list_remove_first(&msg->data)) != NULL) {
|
|
|
|
|
|
|
|
ompi_registry_value_t* value = (ompi_registry_value_t*)item;
|
|
|
|
ompi_buffer_t buffer;
|
|
|
|
mca_oob_tcp_addr_t* addr, *existing;
|
|
|
|
mca_oob_tcp_peer_t* peer;
|
|
|
|
|
|
|
|
/* transfer ownership of registry object to buffer and unpack */
|
|
|
|
ompi_buffer_init_preallocated(&buffer, value->object, value->object_size);
|
|
|
|
value->object = NULL;
|
|
|
|
value->object_size = 0;
|
|
|
|
addr = mca_oob_tcp_addr_unpack(buffer);
|
|
|
|
ompi_buffer_free(buffer);
|
|
|
|
if(NULL == addr) {
|
|
|
|
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_registry_callback: unable to unpack peer address\n",
|
|
|
|
OMPI_NAME_COMPONENTS(mca_oob_name_self));
|
|
|
|
OBJ_RELEASE(item);
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
if(mca_oob_tcp_component.tcp_debug > 1) {
|
|
|
|
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_registry_callback: received peer [%d,%d,%d]\n",
|
|
|
|
OMPI_NAME_COMPONENTS(mca_oob_name_self),
|
|
|
|
OMPI_NAME_COMPONENTS(addr->addr_name));
|
|
|
|
}
|
|
|
|
|
|
|
|
/* check for existing cache entry */
|
|
|
|
existing = ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name);
|
|
|
|
if(NULL != existing) {
|
|
|
|
/* TSW - need to update existing entry */
|
|
|
|
OBJ_RELEASE(addr);
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* insert into cache and notify peer */
|
|
|
|
ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr);
|
|
|
|
peer = ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_tree, &addr->addr_name);
|
|
|
|
if(NULL != peer)
|
|
|
|
mca_oob_tcp_peer_resolved(peer, addr);
|
|
|
|
|
|
|
|
OBJ_RELEASE(item);
|
|
|
|
}
|
|
|
|
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Attempt to resolve peer name.
|
|
|
|
*/
|
|
|
|
|
|
|
|
int mca_oob_tcp_resolve(mca_oob_tcp_peer_t* peer)
|
|
|
|
{
|
|
|
|
mca_oob_tcp_addr_t* addr;
|
|
|
|
mca_oob_tcp_subscription_t* subscription;
|
|
|
|
ompi_list_item_t* item;
|
|
|
|
char segment[32];
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
/* if the address is already cached - simply return it */
|
|
|
|
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
addr = ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_names, &peer->peer_name);
|
|
|
|
if(NULL != addr) {
|
|
|
|
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
mca_oob_tcp_peer_resolved(peer, addr);
|
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* check to see if we have subscribed to this registry segment */
|
|
|
|
for( item = ompi_list_get_first(&mca_oob_tcp_component.tcp_subscriptions);
|
|
|
|
item != ompi_list_get_end(&mca_oob_tcp_component.tcp_subscriptions);
|
|
|
|
item = ompi_list_get_next(item)) {
|
|
|
|
subscription = (mca_oob_tcp_subscription_t*)item;
|
|
|
|
if(subscription->jobid == peer->peer_name.jobid) {
|
|
|
|
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* otherwise - need to subscribe to this registry segment
|
|
|
|
* record the subscription
|
|
|
|
*/
|
|
|
|
subscription = OBJ_NEW(mca_oob_tcp_subscription_t);
|
|
|
|
subscription->jobid = peer->peer_name.jobid;
|
|
|
|
ompi_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);
|
|
|
|
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
|
|
|
|
/* subscribe */
|
|
|
|
sprintf(segment, "oob-tcp-%u", peer->peer_name.jobid);
|
|
|
|
rc = ompi_registry.subscribe(
|
|
|
|
OMPI_REGISTRY_OR,
|
|
|
|
OMPI_REGISTRY_NOTIFY_ADD_ENTRY|OMPI_REGISTRY_NOTIFY_DELETE_ENTRY|OMPI_REGISTRY_NOTIFY_MODIFICATION,
|
|
|
|
segment,
|
|
|
|
NULL,
|
|
|
|
mca_oob_tcp_registry_callback,
|
|
|
|
NULL);
|
|
|
|
if(rc != OMPI_SUCCESS) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_resolve: ompi_registry.subscribe failed with error status: %d\n", rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2004-08-19 23:34:37 +04:00
|
|
|
/*
|
|
|
|
* Setup contact information in the registry.
|
|
|
|
*/
|
|
|
|
int mca_oob_tcp_init(void)
|
|
|
|
{
|
2004-09-02 03:07:40 +04:00
|
|
|
char *keys[2];
|
|
|
|
void *addr;
|
|
|
|
int32_t size;
|
|
|
|
char segment[32];
|
|
|
|
ompi_buffer_t buffer;
|
|
|
|
ompi_process_name_t* peers;
|
|
|
|
mca_oob_tcp_subscription_t* subscription;
|
|
|
|
size_t npeers;
|
2004-08-19 23:34:37 +04:00
|
|
|
int rc;
|
2004-09-02 03:07:40 +04:00
|
|
|
ompi_list_item_t* item;
|
|
|
|
|
|
|
|
/* iterate through the open connections and send an ident message to all peers -
|
|
|
|
* note that we initially come up w/out knowing our process name - and are assigned
|
|
|
|
* a temporary name by our peer. once we have determined our real name - we send it
|
|
|
|
* to the peer.
|
|
|
|
*/
|
|
|
|
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
for(item = ompi_list_get_first(&mca_oob_tcp_component.tcp_peer_list);
|
|
|
|
item != ompi_list_get_end(&mca_oob_tcp_component.tcp_peer_list);
|
|
|
|
item = ompi_list_get_next(item)) {
|
|
|
|
mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item;
|
|
|
|
mca_oob_tcp_peer_send_ident(peer);
|
|
|
|
}
|
|
|
|
|
|
|
|
rc = mca_pcmclient.pcmclient_get_peers(&peers, &npeers);
|
|
|
|
if(rc != OMPI_SUCCESS) {
|
|
|
|
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
|
|
|
sprintf(segment, "oob-tcp-%u", mca_oob_name_self.jobid);
|
|
|
|
if(mca_oob_tcp_component.tcp_debug > 1) {
|
|
|
|
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_init: calling ompi_registry.synchro(%s,%d)\n",
|
|
|
|
OMPI_NAME_COMPONENTS(mca_oob_name_self),
|
|
|
|
segment,
|
|
|
|
npeers);
|
|
|
|
}
|
2004-08-19 23:34:37 +04:00
|
|
|
|
2004-09-02 03:07:40 +04:00
|
|
|
/* register synchro callback to receive notification when all processes have registered */
|
|
|
|
subscription = OBJ_NEW(mca_oob_tcp_subscription_t);
|
|
|
|
subscription->jobid = mca_oob_name_self.jobid;
|
|
|
|
ompi_list_append(&mca_oob_tcp_component.tcp_subscriptions, subscription);
|
|
|
|
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
|
|
|
|
rc = ompi_registry.synchro(
|
|
|
|
OMPI_REGISTRY_OR,
|
|
|
|
OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
|
|
|
|
segment,
|
|
|
|
NULL,
|
|
|
|
npeers,
|
|
|
|
mca_oob_tcp_registry_callback,
|
|
|
|
NULL);
|
|
|
|
if(rc != OMPI_SUCCESS) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_init: registry synchro failed with error code %d.", rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* put our contact info in registry */
|
|
|
|
keys[0] = ompi_name_server.get_proc_name_string(&mca_oob_name_self);
|
|
|
|
keys[1] = NULL;
|
2004-08-19 23:34:37 +04:00
|
|
|
|
2004-09-02 03:07:40 +04:00
|
|
|
if(mca_oob_tcp_component.tcp_debug > 1) {
|
|
|
|
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_init: calling ompi_registry.put(%s,%s)\n",
|
|
|
|
OMPI_NAME_COMPONENTS(mca_oob_name_self),
|
|
|
|
segment,
|
|
|
|
keys[0]);
|
|
|
|
}
|
|
|
|
|
|
|
|
ompi_buffer_init(&buffer, 128);
|
|
|
|
mca_oob_tcp_addr_pack(buffer);
|
|
|
|
ompi_buffer_get(buffer, &addr, &size);
|
|
|
|
rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, segment, keys, addr, size);
|
|
|
|
ompi_buffer_free(buffer);
|
2004-08-19 23:34:37 +04:00
|
|
|
if(rc != OMPI_SUCCESS) {
|
2004-09-02 03:07:40 +04:00
|
|
|
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_init: registry put failed with error code %d.",
|
|
|
|
OMPI_NAME_COMPONENTS(mca_oob_name_self), rc);
|
2004-08-19 23:34:37 +04:00
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
2004-07-01 18:49:54 +04:00
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
/*
|
|
|
|
* Module cleanup.
|
|
|
|
*/
|
2004-08-19 23:34:37 +04:00
|
|
|
int mca_oob_tcp_fini(void)
|
2004-07-01 18:49:54 +04:00
|
|
|
{
|
2004-08-04 18:33:02 +04:00
|
|
|
mca_oob_tcp_peer_t * peer;
|
2004-08-12 17:29:37 +04:00
|
|
|
|
|
|
|
/* close listen socket */
|
2004-08-05 23:37:48 +04:00
|
|
|
if (mca_oob_tcp_component.tcp_listen_sd >= 0) {
|
2004-08-12 17:29:37 +04:00
|
|
|
ompi_event_del(&mca_oob_tcp_component.tcp_recv_event);
|
2004-08-05 23:37:48 +04:00
|
|
|
if(0 != close(mca_oob_tcp_component.tcp_listen_sd)) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_finalize: error closing listen socket. errno=%d", errno);
|
|
|
|
}
|
|
|
|
}
|
2004-08-12 17:29:37 +04:00
|
|
|
|
|
|
|
/* cleanup all peers */
|
2004-08-06 21:23:37 +04:00
|
|
|
while(NULL != (peer = (mca_oob_tcp_peer_t *)
|
|
|
|
ompi_list_remove_first(&mca_oob_tcp_component.tcp_peer_list))) {
|
|
|
|
OBJ_DESTRUCT(peer);
|
|
|
|
}
|
2004-07-01 18:49:54 +04:00
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
2004-07-13 02:46:57 +04:00
|
|
|
|
2004-08-10 03:07:53 +04:00
|
|
|
/*
|
2004-08-05 19:30:36 +04:00
|
|
|
* Compare two process names for equality.
|
|
|
|
*
|
|
|
|
* @param n1 Process name 1.
|
|
|
|
* @param n2 Process name 2.
|
|
|
|
* @return (-1 for n1<n2 0 for equality, 1 for n1>n2)
|
|
|
|
*
|
|
|
|
* Note that the definition of < or > is somewhat arbitrary -
|
|
|
|
* just needs to be consistently applied to maintain an ordering
|
|
|
|
* when process names are used as indices.
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
int mca_oob_tcp_process_name_compare(const ompi_process_name_t* n1, const ompi_process_name_t* n2)
|
|
|
|
{
|
|
|
|
if(n1->cellid < n2->cellid)
|
|
|
|
return -1;
|
|
|
|
else if(n1->cellid > n2->cellid)
|
|
|
|
return 1;
|
|
|
|
else if(n1->jobid < n2->jobid)
|
|
|
|
return -1;
|
|
|
|
else if(n1->jobid > n2->jobid)
|
|
|
|
return 1;
|
|
|
|
else if(n1->vpid < n2->vpid)
|
|
|
|
return -1;
|
|
|
|
else if(n1->vpid > n2->vpid)
|
|
|
|
return 1;
|
|
|
|
return(0);
|
|
|
|
}
|
|
|
|
|
2004-08-16 23:39:54 +04:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Return local process address as a URI string.
|
|
|
|
*/
|
|
|
|
|
|
|
|
char* mca_oob_tcp_get_addr(void)
|
|
|
|
{
|
|
|
|
int i;
|
|
|
|
char *contact_info = malloc((ompi_ifcount()+1) * 32);
|
|
|
|
char *ptr = contact_info;
|
|
|
|
*ptr = 0;
|
|
|
|
|
|
|
|
for(i=ompi_ifbegin(); i>0; i=ompi_ifnext(i)) {
|
|
|
|
struct sockaddr_in addr;
|
|
|
|
ompi_ifindextoaddr(i, (struct sockaddr*)&addr, sizeof(addr));
|
2004-09-02 03:07:40 +04:00
|
|
|
if(addr.sin_addr.s_addr == inet_addr("127.0.0.1"))
|
|
|
|
continue;
|
2004-08-16 23:39:54 +04:00
|
|
|
if(ptr != contact_info) {
|
|
|
|
ptr += sprintf(ptr, ";");
|
|
|
|
}
|
|
|
|
ptr += sprintf(ptr, "tcp://%s:%d", inet_ntoa(addr.sin_addr), ntohs(mca_oob_tcp_component.tcp_listen_port));
|
|
|
|
}
|
|
|
|
return contact_info;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Parse a URI string into an IP address and port number.
|
|
|
|
*/
|
|
|
|
|
2004-08-19 23:34:37 +04:00
|
|
|
int mca_oob_tcp_parse_uri(const char* uri, struct sockaddr_in* inaddr)
|
2004-08-16 23:39:54 +04:00
|
|
|
{
|
|
|
|
char* tmp = strdup(uri);
|
|
|
|
char* ptr = tmp + 6;
|
|
|
|
char* addr = ptr;
|
|
|
|
char* port;
|
|
|
|
if(strncmp(tmp, "tcp://", 6) != 0) {
|
|
|
|
free(tmp);
|
|
|
|
return OMPI_ERR_BAD_PARAM;
|
|
|
|
}
|
|
|
|
|
|
|
|
ptr = strchr(addr, ':');
|
|
|
|
if(NULL == ptr) {
|
|
|
|
free(tmp);
|
|
|
|
return OMPI_ERR_BAD_PARAM;
|
|
|
|
}
|
|
|
|
|
|
|
|
*ptr = '\0';
|
|
|
|
ptr++;
|
|
|
|
port = ptr;
|
|
|
|
|
|
|
|
memset(inaddr, 0, sizeof(inaddr));
|
|
|
|
inaddr->sin_family = AF_INET;
|
|
|
|
inaddr->sin_addr.s_addr = inet_addr(addr);
|
|
|
|
if(inaddr->sin_addr.s_addr == INADDR_ANY) {
|
|
|
|
free(tmp);
|
|
|
|
return OMPI_ERR_BAD_PARAM;
|
|
|
|
}
|
|
|
|
inaddr->sin_port = htons(atoi(port));
|
|
|
|
free(tmp);
|
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Set address for the seed daemon. Note that this could be called multiple
|
|
|
|
* times if the seed daemon exports multiple addresses.
|
|
|
|
*/
|
|
|
|
|
|
|
|
int mca_oob_tcp_set_seed(const char* uri)
|
|
|
|
{
|
|
|
|
struct sockaddr_in inaddr;
|
2004-09-02 03:07:40 +04:00
|
|
|
mca_oob_tcp_addr_t* addr;
|
2004-08-16 23:39:54 +04:00
|
|
|
int rc;
|
|
|
|
if((rc = mca_oob_tcp_parse_uri(uri,&inaddr)) != OMPI_SUCCESS)
|
|
|
|
return rc;
|
|
|
|
|
2004-09-02 03:07:40 +04:00
|
|
|
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
addr = (mca_oob_tcp_addr_t*)ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_names, &mca_oob_name_seed);
|
|
|
|
if(NULL == addr) {
|
|
|
|
addr = OBJ_NEW(mca_oob_tcp_addr_t);
|
|
|
|
addr->addr_name = mca_oob_name_seed;
|
|
|
|
ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr);
|
2004-08-16 23:39:54 +04:00
|
|
|
}
|
2004-09-02 03:07:40 +04:00
|
|
|
rc = mca_oob_tcp_addr_insert(addr, &inaddr);
|
|
|
|
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
|
|
|
|
return rc;
|
2004-08-16 23:39:54 +04:00
|
|
|
}
|
|
|
|
|