1
1
openmpi/orte/mca/oob/tcp/oob_tcp.c

1384 строки
48 KiB
C
Исходник Обычный вид История

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* In windows, many of the socket functions return an EWOULDBLOCK
* instead of \ things like EAGAIN, EINPROGRESS, etc. It has been
* verified that this will \ not conflict with other error codes that
* are returned by these functions \ under UNIX/Linux environments
*/
#include "orte_config.h"
#include "orte/orte_types.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <fcntl.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#include "opal/opal_socket_errno.h"
#include "opal/util/output.h"
#include "opal/util/if.h"
#include "orte/class/orte_proc_table.h"
#include "orte/mca/oob/tcp/oob_tcp.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/gpr/gpr.h"
#include "ompi/constants.h"
/*
* Data structure for accepting connections.
*/
struct mca_oob_tcp_event_t {
opal_list_item_t item;
opal_event_t event;
};
typedef struct mca_oob_tcp_event_t mca_oob_tcp_event_t;
static void mca_oob_tcp_event_construct(mca_oob_tcp_event_t* event)
{
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
opal_list_append(&mca_oob_tcp_component.tcp_events, &event->item);
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
}
static void mca_oob_tcp_event_destruct(mca_oob_tcp_event_t* event)
{
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
opal_list_remove_item(&mca_oob_tcp_component.tcp_events, &event->item);
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
}
OBJ_CLASS_INSTANCE(
mca_oob_tcp_event_t,
opal_list_item_t,
mca_oob_tcp_event_construct,
mca_oob_tcp_event_destruct);
/*
* Local utility functions
*/
static int mca_oob_tcp_create_listen(void);
static int mca_oob_tcp_create_listen_thread(void);
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_accept(void);
struct mca_oob_tcp_subscription_t {
opal_list_item_t item;
orte_jobid_t jobid;
orte_gpr_subscription_id_t subid;
};
typedef struct mca_oob_tcp_subscription_t mca_oob_tcp_subscription_t;
OBJ_CLASS_INSTANCE(
mca_oob_tcp_subscription_t,
opal_list_item_t,
NULL,
NULL);
OBJ_CLASS_INSTANCE(
mca_oob_tcp_pending_connection_t,
opal_free_list_item_t,
NULL,
NULL);
/*
* Struct of function pointers and all that to let us be initialized
*/
- massive change for module<-->component name fixes throughout the code base. - many (most) mca type names have "component" or "module" in them, as relevant, just to further distinguish the difference between component data/actions and module data/actions. All developers are encouraged to perpetuate this convention when you create types that are specific to a framework, component, or module - did very little to entire framework (just the basics to make it compile) because it's just about to be almost entirely replaced - ditto for io / romio - did not work on elan or ib components; have to commit and then convert those on a different machine with the right libraries and headers - renamed a bunch of *_module.c files to *_component.c and *module*c to *component*c (a few still remain, e.g., ptl/ib, ptl/elan, etc.) - modified autogen/configure/build process to match new filenames (e.g., output static-components.h instead of static-modules.h) - removed DOS-style cr/lf stuff in ns/ns.h - added newline to end of file src/util/numtostr.h - removed some redundant error checking in the top-level topo functions - added a few {} here and there where people "forgot" to put them in for 1 line blocks ;-) - removed a bunch of MPI_* types from mca header files (replaced with corresponding ompi_* types) - all the ptl components had version numbers in their structs; removed - converted a few more elements in the MCA base to use the OBJ interface -- removed some old manual reference counting kruft This commit was SVN r1830.
2004-08-02 04:24:22 +04:00
mca_oob_tcp_component_t mca_oob_tcp_component = {
{
{
MCA_OOB_BASE_VERSION_1_0_0,
"tcp", /* MCA module name */
1, /* MCA component major version */
0, /* MCA component minor version */
0, /* MCA component release version */
mca_oob_tcp_component_open, /* component open */
mca_oob_tcp_component_close /* component close */
},
{
false /* checkpoint / restart */
},
mca_oob_tcp_component_init
- massive change for module<-->component name fixes throughout the code base. - many (most) mca type names have "component" or "module" in them, as relevant, just to further distinguish the difference between component data/actions and module data/actions. All developers are encouraged to perpetuate this convention when you create types that are specific to a framework, component, or module - did very little to entire framework (just the basics to make it compile) because it's just about to be almost entirely replaced - ditto for io / romio - did not work on elan or ib components; have to commit and then convert those on a different machine with the right libraries and headers - renamed a bunch of *_module.c files to *_component.c and *module*c to *component*c (a few still remain, e.g., ptl/ib, ptl/elan, etc.) - modified autogen/configure/build process to match new filenames (e.g., output static-components.h instead of static-modules.h) - removed DOS-style cr/lf stuff in ns/ns.h - added newline to end of file src/util/numtostr.h - removed some redundant error checking in the top-level topo functions - added a few {} here and there where people "forgot" to put them in for 1 line blocks ;-) - removed a bunch of MPI_* types from mca header files (replaced with corresponding ompi_* types) - all the ptl components had version numbers in their structs; removed - converted a few more elements in the MCA base to use the OBJ interface -- removed some old manual reference counting kruft This commit was SVN r1830.
2004-08-02 04:24:22 +04:00
}
};
static mca_oob_t mca_oob_tcp = {
mca_oob_tcp_get_addr,
mca_oob_tcp_set_addr,
mca_oob_tcp_ping,
mca_oob_tcp_send,
mca_oob_tcp_recv,
mca_oob_tcp_send_nb,
mca_oob_tcp_recv_nb,
mca_oob_tcp_recv_cancel,
mca_oob_tcp_init,
mca_oob_tcp_fini,
mca_oob_xcast
};
/*
* Utility function to register/lookup module parameters.
*/
static inline int mca_oob_tcp_param_register_int(
const char* param_name,
int default_value)
{
int id = mca_base_param_register_int("oob","tcp",param_name,NULL,default_value);
int param_value = default_value;
mca_base_param_lookup_int(id,&param_value);
return param_value;
}
static inline char* mca_oob_tcp_param_register_str(
const char* param_name,
const char* default_value)
{
int id = mca_base_param_register_string("oob","tcp",param_name,NULL,default_value);
char* param_value = NULL;
mca_base_param_lookup_string(id,&param_value);
return param_value;
}
/*
* Initialize global variables used w/in this module.
*/
int mca_oob_tcp_component_open(void)
{
char *listen_type;
int tmp;
#ifdef __WINDOWS__
WSADATA win_sock_data;
if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) {
opal_output (0, "mca_oob_tcp_component_init: failed to initialise windows sockets: error %d\n", WSAGetLastError());
return ORTE_ERROR;
}
#endif
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_subscriptions, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_list, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peers, opal_hash_table_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_names, opal_hash_table_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_free, opal_free_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msgs, opal_free_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_events, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_post, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_recv, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_completed, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_cond, opal_condition_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_listen_thread, opal_thread_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections_fl, opal_free_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_out_connections, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_in_connections, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections_lock, opal_mutex_t);
/* register oob module parameters */
mca_oob_tcp_component.tcp_peer_limit =
mca_oob_tcp_param_register_int("peer_limit", -1);
mca_oob_tcp_component.tcp_peer_retries =
mca_oob_tcp_param_register_int("peer_retries", 60);
mca_oob_tcp_component.tcp_debug =
mca_oob_tcp_param_register_int("debug", 0);
mca_oob_tcp_component.tcp_include =
mca_oob_tcp_param_register_str("include", NULL);
mca_oob_tcp_component.tcp_exclude =
mca_oob_tcp_param_register_str("exclude", NULL);
mca_oob_tcp_component.tcp_sndbuf =
mca_oob_tcp_param_register_int("sndbuf", 128*1024);
mca_oob_tcp_component.tcp_rcvbuf =
mca_oob_tcp_param_register_int("rcvbuf", 128*1024);
/* AWF - may need to increase this for large-scale jobs -
see AWF comment in oob_tcp_peer.c */
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"connect_timeout",
"connect() timeout in seconds, before trying next interface",
false,
false,
10,
&mca_oob_tcp_component.tcp_timeout);
mca_base_param_reg_string(&mca_oob_tcp_component.super.oob_base,
"listen_mode",
"Mode for HNP to accept incoming connections: event, listen_thread",
false,
false,
"event",
&listen_type);
if ((0 == strcmp(listen_type, "event")) || NULL == getenv("I_AM_MPIRUN")) {
mca_oob_tcp_component.tcp_listen_type = OOB_TCP_EVENT;
} else if (0 == strcmp(listen_type, "listen_thread")) {
mca_oob_tcp_component.tcp_listen_type = OOB_TCP_LISTEN_THREAD;
} else {
opal_output(0, "Invalid value for oob_tcp_listen_mode parameter: %s",
listen_type);
return ORTE_ERROR;
}
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"listen_thread_max_queue",
"High water mark for queued accepted socket list size",
false,
false,
10,
&mca_oob_tcp_component.tcp_copy_max_size);
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"listen_thread_max_time",
"Maximum amount of time (in milliseconds) to wait between processing accepted socket list",
false,
false,
10,
&tmp);
#if OPAL_TIMER_USEC_NATIVE
mca_oob_tcp_component.tcp_copy_delta = tmp * 1000;
#else
mca_oob_tcp_component.tcp_copy_delta = tmp *
opal_timer_base_get_freq() / 1000;
#endif
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"accept_spin_count",
"Number of times to let accept return EWOULDBLOCK before updating accepted socket list",
false,
false,
10,
&mca_oob_tcp_component.tcp_copy_spin_count);
/* initialize state */
mca_oob_tcp_component.tcp_shutdown = false;
mca_oob_tcp_component.tcp_listen_sd = -1;
mca_oob_tcp_component.tcp_match_count = 0;
mca_oob_tcp_component.tcp_last_copy_time = 0;
return ORTE_SUCCESS;
}
/*
* Cleanup of global variables used by this module.
*/
int mca_oob_tcp_component_close(void)
{
#ifdef __WINDOWS__
WSACleanup();
#endif
/* cleanup resources */
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peers);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_names);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_events);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_subscriptions);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_recv);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_completed);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_cond);
return ORTE_SUCCESS;
}
/*
* Called by mca_oob_tcp_recv_handler() when the TCP listen
* socket has pending connection requests. Accept incoming
* requests and queue for completion of the connection handshake.
*/
static void mca_oob_tcp_accept(void)
{
while(true) {
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
struct sockaddr_in addr;
mca_oob_tcp_event_t* event;
int sd;
sd = accept(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&addr, &addrlen);
if(sd < 0) {
if(opal_socket_errno == EINTR)
continue;
if(opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK)
opal_output(0, "mca_oob_tcp_accept: accept() failed with errno %d.", opal_socket_errno);
return;
}
/* setup socket options */
mca_oob_tcp_set_socket_options(sd);
/* log the accept */
if(mca_oob_tcp_component.tcp_debug) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_accept: %s:%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
inet_ntoa(addr.sin_addr),
addr.sin_port);
}
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW(mca_oob_tcp_event_t);
opal_event_set(&event->event, sd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
opal_event_add(&event->event, 0);
}
}
/*
* Create a listen socket and bind to all interfaces
*/
static int mca_oob_tcp_create_listen(void)
{
int flags;
struct sockaddr_in inaddr;
opal_socklen_t addrlen;
/* create a listen socket for incoming connections */
mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
if(mca_oob_tcp_component.tcp_listen_sd < 0) {
opal_output(0,"mca_oob_tcp_component_init: socket() failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
/* setup socket options */
mca_oob_tcp_set_socket_options(mca_oob_tcp_component.tcp_listen_sd);
/* bind address */
memset(&inaddr, 0, sizeof(inaddr));
inaddr.sin_family = AF_INET;
inaddr.sin_addr.s_addr = INADDR_ANY;
inaddr.sin_port = 0;
if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
opal_output(0,"mca_oob_tcp_create_listen: bind() failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
/* resolve system assigned port */
addrlen = sizeof(struct sockaddr_in);
if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
opal_output(0, "mca_oob_tcp_create_listen: getsockname() failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;
/* setup listen backlog to maximum allowed by kernel */
if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {
opal_output(0, "mca_oob_tcp_component_init: listen() failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
/* set socket up to be non-blocking, otherwise accept could block */
if((flags = fcntl(mca_oob_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {
opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
} else {
flags |= O_NONBLOCK;
if(fcntl(mca_oob_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {
opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
}
/* register listen port */
opal_event_set(
&mca_oob_tcp_component.tcp_recv_event,
mca_oob_tcp_component.tcp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_oob_tcp_recv_handler,
0);
opal_event_add(&mca_oob_tcp_component.tcp_recv_event, 0);
return ORTE_SUCCESS;
}
static void* mca_oob_tcp_listen_thread(opal_object_t *obj)
{
int rc, count;
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
opal_free_list_item_t *fl_item;
mca_oob_tcp_pending_connection_t *item;
struct timeval timeout;
fd_set readfds;
while (false == mca_oob_tcp_component.tcp_shutdown) {
count = 0;
FD_ZERO(&readfds);
FD_SET(mca_oob_tcp_component.tcp_listen_sd, &readfds);
timeout.tv_sec = 0;
timeout.tv_usec = 10000;
rc = select(mca_oob_tcp_component.tcp_listen_sd + 1, &readfds,
NULL, NULL, &timeout);
if (rc < 0) {
if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {
perror("select");
}
continue;
}
while (count < mca_oob_tcp_component.tcp_copy_spin_count &&
opal_list_get_size(&mca_oob_tcp_component.tcp_copy_in_connections) <
(size_t) mca_oob_tcp_component.tcp_copy_max_size) {
OPAL_FREE_LIST_WAIT(&mca_oob_tcp_component.tcp_pending_connections_fl,
fl_item, rc);
item = (mca_oob_tcp_pending_connection_t*) fl_item;
item->fd = accept(mca_oob_tcp_component.tcp_listen_sd,
(struct sockaddr*)&(item->addr), &addrlen);
if(item->fd < 0) {
OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl,
fl_item);
if (mca_oob_tcp_component.tcp_shutdown) return NULL;
if(opal_socket_errno != EAGAIN || opal_socket_errno != EWOULDBLOCK) {
opal_output(0, "mca_oob_tcp_accept: accept() failed with errno %d.", opal_socket_errno);
close(item->fd);
return NULL;
}
count++;
continue;
}
if(mca_oob_tcp_component.tcp_debug) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_listen_thread: (%d, %d) %s:%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
item->fd, opal_socket_errno,
inet_ntoa(item->addr.sin_addr),
item->addr.sin_port);
}
opal_list_append(&mca_oob_tcp_component.tcp_copy_in_connections,
(opal_list_item_t*) item);
}
if (0 < opal_list_get_size(&mca_oob_tcp_component.tcp_copy_in_connections)) {
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
opal_list_join(&mca_oob_tcp_component.tcp_pending_connections,
opal_list_get_end(&mca_oob_tcp_component.tcp_pending_connections),
&mca_oob_tcp_component.tcp_copy_in_connections);
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
}
}
return NULL;
}
/* called from opal_progress() to create the oob contact information
for the file descriptors accepted() by the accept thread. */
static int mca_oob_tcp_listen_progress(void)
{
int count = 0;
mca_oob_tcp_pending_connection_t *item;
mca_oob_tcp_event_t* event;
#if OPAL_TIMER_USEC_NATIVE
opal_timer_t now = opal_timer_base_get_usec();
#else
opal_timer_t now = opal_timer_base_get_cycles();
#endif /* OPAL_TIMER_USEC_NATIVE */
/* if we've not pulled pending connections for a while OR we've
hit the high water mark of pending connections, grab all the
pending connections */
if ((now - mca_oob_tcp_component.tcp_last_copy_time >
mca_oob_tcp_component.tcp_copy_delta) ||
((size_t) mca_oob_tcp_component.tcp_copy_max_size <
opal_list_get_size(&mca_oob_tcp_component.tcp_pending_connections))) {
/* copy the pending connections from the list the accept
thread is inserting into into a temporary list for us to
process from. This is an O(1) operation, so we minimize
the lock time */
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
opal_list_join(&mca_oob_tcp_component.tcp_copy_out_connections,
opal_list_get_end(&mca_oob_tcp_component.tcp_copy_out_connections),
&mca_oob_tcp_component.tcp_pending_connections);
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
/* process al the connections */
while (NULL != (item = (mca_oob_tcp_pending_connection_t*)
opal_list_remove_first(&mca_oob_tcp_component.
tcp_copy_out_connections))) {
/* setup socket options */
mca_oob_tcp_set_socket_options(item->fd);
/* log the accept */
if(mca_oob_tcp_component.tcp_debug) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_listen_progress: %s:%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
inet_ntoa(item->addr.sin_addr),
item->addr.sin_port);
}
/* wait for receipt of peers process identifier to
complete this connection */
event = OBJ_NEW(mca_oob_tcp_event_t);
opal_event_set(&event->event, item->fd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
opal_event_add(&event->event, 0);
OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl,
(opal_free_list_item_t *) item);
count++;
}
mca_oob_tcp_component.tcp_last_copy_time = now;
}
return count;
}
static int mca_oob_tcp_create_listen_thread(void)
{
struct sockaddr_in inaddr;
opal_socklen_t addrlen;
int flags;
/* create a listen socket for incoming connections */
mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
if(mca_oob_tcp_component.tcp_listen_sd < 0) {
opal_output(0,"mca_oob_tcp_component_init: socket() failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
/* setup socket options */
mca_oob_tcp_set_socket_options(mca_oob_tcp_component.tcp_listen_sd);
/* bind address */
memset(&inaddr, 0, sizeof(inaddr));
inaddr.sin_family = AF_INET;
inaddr.sin_addr.s_addr = INADDR_ANY;
inaddr.sin_port = 0;
if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
opal_output(0,"mca_oob_tcp_create_listen: bind() failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
/* resolve system assigned port */
addrlen = sizeof(struct sockaddr_in);
if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
opal_output(0, "mca_oob_tcp_create_listen: getsockname() failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;
/* setup listen backlog to maximum allowed by kernel */
if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {
opal_output(0, "mca_oob_tcp_component_init: listen() failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
/* set socket up to be non-blocking, otherwise accept could block */
if((flags = fcntl(mca_oob_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {
opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
} else {
flags |= O_NONBLOCK;
if(fcntl(mca_oob_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {
opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed with errno=%d", opal_socket_errno);
return ORTE_ERROR;
}
}
/* start the listen thread */
mca_oob_tcp_component.tcp_listen_thread.t_run = mca_oob_tcp_listen_thread;
mca_oob_tcp_component.tcp_listen_thread.t_arg = NULL;
return opal_thread_start(&mca_oob_tcp_component.tcp_listen_thread);
}
/*
* Handle probe
*/
static void mca_oob_tcp_recv_probe(int sd, mca_oob_tcp_hdr_t* hdr)
{
unsigned char* ptr = (unsigned char*)hdr;
size_t cnt = 0;
hdr->msg_type = MCA_OOB_TCP_PROBE;
hdr->msg_dst = hdr->msg_src;
hdr->msg_src = *orte_process_info.my_name;
MCA_OOB_TCP_HDR_HTON(hdr);
while(cnt < sizeof(mca_oob_tcp_hdr_t)) {
int retval = send(sd, (char *)ptr+cnt, sizeof(mca_oob_tcp_hdr_t)-cnt, 0);
if(retval < 0) {
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_probe: send() failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(hdr->msg_src)),
opal_socket_errno);
CLOSE_THE_SOCKET(sd);
return;
}
continue;
}
cnt += retval;
}
CLOSE_THE_SOCKET(sd);
}
/*
* Handle connection request
*/
static void mca_oob_tcp_recv_connect(int sd, mca_oob_tcp_hdr_t* hdr)
{
mca_oob_tcp_peer_t* peer;
int flags;
int cmpval;
/* now set socket up to be non-blocking */
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: fcntl(F_GETFL) failed with errno=%d",
ORTE_NAME_ARGS(orte_process_info.my_name), opal_socket_errno);
} else {
flags |= O_NONBLOCK;
if(fcntl(sd, F_SETFL, flags) < 0) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: fcntl(F_SETFL) failed with errno=%d",
ORTE_NAME_ARGS(orte_process_info.my_name), opal_socket_errno);
}
}
/* check for wildcard name - if this is true - we allocate a name from the name server
* and return to the peer
*/
cmpval = orte_ns.compare(ORTE_NS_CMP_ALL, &hdr->msg_src, MCA_OOB_NAME_ANY);
if (cmpval == 0) {
if (ORTE_SUCCESS != orte_ns.create_jobid(&hdr->msg_src.jobid)) {
return;
}
if (ORTE_SUCCESS != orte_ns.reserve_range(hdr->msg_src.jobid, 1, &hdr->msg_src.vpid)) {
return;
}
if (ORTE_SUCCESS != orte_ns.assign_cellid_to_process(&hdr->msg_src)) {
return;
}
}
/* lookup the corresponding process */
peer = mca_oob_tcp_peer_lookup(&hdr->msg_src);
if(NULL == peer) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: unable to locate peer",
ORTE_NAME_ARGS(orte_process_info.my_name));
CLOSE_THE_SOCKET(sd);
return;
}
/* is the peer instance willing to accept this connection */
if(mca_oob_tcp_peer_accept(peer, sd) == false) {
if(mca_oob_tcp_component.tcp_debug > 0) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_recv_handler: "
"rejected connection from [%lu,%lu,%lu] connection state %d",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
ORTE_NAME_ARGS(&(hdr->msg_src)),
peer->peer_state);
}
CLOSE_THE_SOCKET(sd);
return;
}
}
/*
* Event callback when there is data available on the registered
* socket to recv.
*/
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
{
mca_oob_tcp_hdr_t hdr;
mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t *)user;
int rc;
/* accept new connections on the listen socket */
if(mca_oob_tcp_component.tcp_listen_sd == sd) {
mca_oob_tcp_accept();
return;
}
OBJ_RELEASE(event);
/* Some mem checkers don't realize that hdr will guarantee to be
fully filled in during the read(), below :-( */
OMPI_DEBUG_ZERO(hdr);
/* recv the process identifier */
while((rc = recv(sd, (char *)&hdr, sizeof(hdr), 0)) != sizeof(hdr)) {
if(rc >= 0) {
if(mca_oob_tcp_component.tcp_debug > 1) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: peer closed connection",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
CLOSE_THE_SOCKET(sd);
return;
}
if(opal_socket_errno != EINTR) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: recv() failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name), opal_socket_errno);
CLOSE_THE_SOCKET(sd);
return;
}
}
MCA_OOB_TCP_HDR_NTOH(&hdr);
/* dispatch based on message type */
switch(hdr.msg_type) {
case MCA_OOB_TCP_PROBE:
mca_oob_tcp_recv_probe(sd, &hdr);
break;
case MCA_OOB_TCP_CONNECT:
mca_oob_tcp_recv_connect(sd, &hdr);
break;
default:
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: invalid message type: %d\n",
ORTE_NAME_ARGS(orte_process_info.my_name), hdr.msg_type);
CLOSE_THE_SOCKET(sd);
break;
}
}
/*
* Component initialization - create a module.
* (1) initialize static resources
* (2) create listen socket
*/
mca_oob_t* mca_oob_tcp_component_init(int* priority)
{
*priority = 1;
/* are there any interfaces? */
if(opal_ifcount() <= 0)
return NULL;
/* initialize data structures */
opal_hash_table_init(&mca_oob_tcp_component.tcp_peers, 128);
opal_hash_table_init(&mca_oob_tcp_component.tcp_peer_names, 128);
opal_free_list_init(&mca_oob_tcp_component.tcp_peer_free,
sizeof(mca_oob_tcp_peer_t),
OBJ_CLASS(mca_oob_tcp_peer_t),
8, /* initial number */
mca_oob_tcp_component.tcp_peer_limit, /* maximum number */
8); /* increment to grow by */
opal_free_list_init(&mca_oob_tcp_component.tcp_msgs,
sizeof(mca_oob_tcp_msg_t),
OBJ_CLASS(mca_oob_tcp_msg_t),
8, /* initial number */
-1, /* maximum number */
8); /* increment to grow by */
/* intialize event library */
memset(&mca_oob_tcp_component.tcp_recv_event, 0, sizeof(opal_event_t));
memset(&mca_oob_tcp_component.tcp_send_event, 0, sizeof(opal_event_t));
/* create a listen socket */
if (OOB_TCP_EVENT == mca_oob_tcp_component.tcp_listen_type) {
if(mca_oob_tcp_create_listen() != ORTE_SUCCESS) {
opal_output(0, "mca_oob_tcp_init: unable to create listen socket");
return NULL;
}
} else if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
if (mca_oob_tcp_create_listen_thread() != ORTE_SUCCESS) {
opal_output(0, "mca_oob_tcp_init: unable to create listen thread");
return NULL;
}
opal_free_list_init(&mca_oob_tcp_component.tcp_pending_connections_fl,
sizeof(mca_oob_tcp_pending_connection_t),
OBJ_CLASS(mca_oob_tcp_pending_connection_t),
16, /* initial number */
-1, /* maximum number */
16); /* increment to grow by */
opal_progress_register(mca_oob_tcp_listen_progress);
}
return &mca_oob_tcp;
}
/*
* Callback from registry on change to subscribed segments.
*/
void mca_oob_tcp_registry_callback(
orte_gpr_notify_data_t* data,
void* cbdata)
{
orte_std_cntr_t i, j, k;
int rc;
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
orte_gpr_value_t **values, *value;
orte_gpr_keyval_t *keyval;
orte_byte_object_t *bo;
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
orte_buffer_t buffer;
mca_oob_tcp_addr_t* addr, *existing;
mca_oob_tcp_peer_t* peer;
if(mca_oob_tcp_component.tcp_debug > 1) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
/* process the callback */
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
values = (orte_gpr_value_t**)(data->values)->addr;
for(i = 0, k=0; k < data->cnt &&
i < (data->values)->size; i++) {
if (NULL != values[i]) {
k++;
value = values[i];
for(j = 0; j < value->cnt; j++) {
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
/* check to make sure this is the requested key */
keyval = value->keyvals[j];
if(strcmp(keyval->key,"oob-tcp") != 0)
continue;
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
/* transfer ownership of registry object to buffer and unpack */
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bo, keyval->value, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
continue;
}
if(orte_dss.load(&buffer, bo->bytes, bo->size) != ORTE_SUCCESS) {
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
/* TSW - throw ERROR */
continue;
}
/* protect the values from the release */
keyval->value->type = ORTE_NULL;
keyval->value->data = NULL;
/* unpack the buffer */
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
addr = mca_oob_tcp_addr_unpack(&buffer);
OBJ_DESTRUCT(&buffer);
if(NULL == addr) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback: unable to unpack peer address\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
continue;
}
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
if(mca_oob_tcp_component.tcp_debug > 1) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback: received peer [%lu,%lu,%lu]\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(addr->addr_name)));
}
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
/* check for existing cache entry */
existing = (mca_oob_tcp_addr_t *)orte_hash_table_get_proc(
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name);
if(NULL != existing) {
/* TSW - need to update existing entry */
OBJ_RELEASE(addr);
continue;
}
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
/* insert into cache and notify peer */
orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr);
peer = (mca_oob_tcp_peer_t *)orte_hash_table_get_proc(
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
&mca_oob_tcp_component.tcp_peers, &addr->addr_name);
if(NULL != peer)
mca_oob_tcp_peer_resolved(peer, addr);
}
}
}
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
}
/*
* Attempt to resolve peer name.
*/
int mca_oob_tcp_resolve(mca_oob_tcp_peer_t* peer)
{
mca_oob_tcp_addr_t* addr;
mca_oob_tcp_subscription_t* subscription;
char *segment, *sub_name, *trig_name;
char *key="oob-tcp";
orte_gpr_subscription_id_t sub_id;
opal_list_item_t* item;
int rc;
/* if the address is already cached - simply return it */
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
addr = (mca_oob_tcp_addr_t *)orte_hash_table_get_proc(&mca_oob_tcp_component.tcp_peer_names,
&peer->peer_name);
if(NULL != addr) {
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
mca_oob_tcp_peer_resolved(peer, addr);
return ORTE_SUCCESS;
}
/* check to see if we have subscribed to this registry segment */
for( item = opal_list_get_first(&mca_oob_tcp_component.tcp_subscriptions);
item != opal_list_get_end(&mca_oob_tcp_component.tcp_subscriptions);
item = opal_list_get_next(item)) {
subscription = (mca_oob_tcp_subscription_t*)item;
if(subscription->jobid == peer->peer_name.jobid) {
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return ORTE_SUCCESS;
}
}
if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name,
OMPI_OOB_SUBSCRIPTION, peer->peer_name.jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* attach to the stage-1 standard trigger */
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name,
ORTE_STG1_TRIGGER, peer->peer_name.jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
return rc;
}
/* define the segment */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment,
peer->peer_name.jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
return rc;
}
/* If we do not release the mutex before the subscrition we will deadlock
* as the suscription involve oob_tcp_send who involve again a lookup
* call. Before unlocking the mutex (just to be protected by it) we can
* create the subscription and add it to the list.
*/
subscription = OBJ_NEW(mca_oob_tcp_subscription_t);
subscription->jobid = peer->peer_name.jobid;
opal_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL,
ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_PRE_EXISTING,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
segment,
NULL, /* look at all containers on this segment */
key,
mca_oob_tcp_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
free(segment);
/* Subscription registration failed, we should activate the cleaning logic:
* remove the subscription from the list (protected by the mutex).
*/
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
opal_list_remove_item( &mca_oob_tcp_component.tcp_subscriptions,
&subscription->item );
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return rc;
}
/* the id of each subscription is recorded
* here so we can (if desired) cancel that subscription later
*/
subscription->subid = sub_id;
/* done with these, so release any memory */
free(trig_name);
free(sub_name);
free(segment);
return rc;
}
/*
* Setup contact information in the registry.
*/
int mca_oob_tcp_init(void)
{
orte_jobid_t jobid;
orte_buffer_t *buffer;
orte_gpr_subscription_id_t sub_id;
char *sub_name, *segment, *trig_name, **tokens;
char *keys[] = {"oob-tcp", ORTE_PROC_RML_IP_ADDRESS_KEY};
orte_data_value_t *values[2];
orte_byte_object_t bo;
mca_oob_tcp_subscription_t *subscription;
int rc;
opal_list_item_t* item;
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
char *tmp, *tmp2, *tmp3;
orte_std_cntr_t i, num_tokens;
/* random delay to stagger connections back to seed */
#if defined(__WINDOWS__)
Sleep((orte_process_info.my_name->vpid % orte_process_info.num_procs % 1000) * 100);
#else
usleep((orte_process_info.my_name->vpid % orte_process_info.num_procs % 1000) * 1000);
#endif
/* get my jobid */
if (ORTE_SUCCESS != (rc = orte_ns.get_jobid(&jobid,
orte_process_info.my_name))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* iterate through the open connections and send an ident message to all peers -
* note that we initially come up w/out knowing our process name - and are assigned
* a temporary name by our peer. once we have determined our real name - we send it
* to the peer.
*/
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
for(item = opal_list_get_first(&mca_oob_tcp_component.tcp_peer_list);
item != opal_list_get_end(&mca_oob_tcp_component.tcp_peer_list);
item = opal_list_get_next(item)) {
mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item;
mca_oob_tcp_peer_send_ident(peer);
}
/* register subscribe callback to receive notification when all processes have registered */
subscription = OBJ_NEW(mca_oob_tcp_subscription_t);
subscription->jobid = jobid;
opal_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
if(mca_oob_tcp_component.tcp_debug > 2) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_init: calling orte_gpr.subscribe\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name,
OMPI_OOB_SUBSCRIPTION, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* attach to the stage-1 standard trigger */
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name,
ORTE_STG1_TRIGGER, jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
return rc;
}
/* define the segment */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name,
ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
segment,
NULL, /* look at all containers on this segment */
keys[0],
mca_oob_tcp_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
free(segment);
return rc;
}
/* the id of each subscription is recorded
* here so we can (if desired) cancel that subscription later
*/
subscription->subid = sub_id;
/* done with these, so release any memory */
free(trig_name);
free(sub_name);
/* now setup to put our contact info on registry */
buffer = OBJ_NEW(orte_buffer_t);
if(buffer == NULL) {
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (ORTE_SUCCESS != (rc = mca_oob_tcp_addr_pack(buffer))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
return rc;
}
/* extract payload for storage */
if (ORTE_SUCCESS != (rc = orte_dss.unload(buffer, (void**)&(bo.bytes), &(bo.size)))) {
ORTE_ERROR_LOG(rc);
free(segment);
OBJ_RELEASE(buffer);
return rc;
}
OBJ_RELEASE(buffer);
values[0] = OBJ_NEW(orte_data_value_t);
if (NULL == values[0]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
values[0]->type = ORTE_BYTE_OBJECT;
if (ORTE_SUCCESS != (rc = orte_dss.copy(&(values[0]->data), &bo, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
free(segment);
return rc;
}
/* setup the IP address for storage */
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
tmp = mca_oob.oob_get_addr();
Add a job_info segment to the system that holds a container for each job. Within each container is a keyval indicating the job state (i.e., all procs at stage1, finalized, etc.). This provides a rough state-of-health for the job. This required a little fiddling with a number of areas. Biggest problem was that it uncovered a potential for an infinite loop to be created in the registry. If a callback function modified the registry, the registry checked the triggers to see if anything had fired. Well, if the original callback was due to a trigger firing, that condition hadn't changed - so the trigger fired again....which caused the callback to be called, which modified the registry, which checked the triggers, etc. etc. Triggers are now checked and then "flagged" as being "in process" so that the registry will NOT recheck that trigger until all callbacks have been processed. Tried doing this with subscriptions as well, but that caused a problem - when we release processes from a stagegate, they (at the moment) immediately place data on the registry that should cause a subscription to fire. Unfortunately, the system will just hang if that subscription doesn't get processed. So, I have left the subscription system alone - any callback function that modifies the registry in a fashion that will fire a subscription will indeed fire that subscription. We'll have to see if this causes problems - it shouldn't, but a careless user could lock things up if the callback generates a callback to itself. Also fixed the code that placed a process' RML contact info on the registry to eliminate the leading '/' from the string. This commit was SVN r6684.
2005-07-29 18:11:19 +04:00
tmp2 = strrchr(tmp, '/') + 1;
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
tmp3 = strrchr(tmp, ':');
if(NULL == tmp2 || NULL == tmp3) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_init: invalid address \'%s\' "
"returned for selected oob interfaces.\n",
ORTE_NAME_ARGS(orte_process_info.my_name), tmp);
ORTE_ERROR_LOG(ORTE_ERROR);
free(segment);
free(tmp);
free(bo.bytes);
return ORTE_ERROR;
}
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
*tmp3 = '\0';
values[1] = OBJ_NEW(orte_data_value_t);
if (NULL == values[1]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
values[1]->type = ORTE_STRING;
values[1]->data = strdup(tmp2);
First phase of the scalable RTE changes: 1. Modify the registry to eliminate redundant data copying for startup messages. 2. Revise the subscription/trigger system to avoid redundant storage of triggers and subscriptions. This dramatically reduces the search time when a registry action occurs - to illustrate the point, there are now only a handful of triggers on the system for each job. Before, there were a handful of triggers for each PROCESS in the job, all of which had to be checked every time something happened on the registry. This is much, much faster now. 3. Update all subscriptions to the new format. There are now "named" subscriptions - this allows you to "name" a subscription that all the processes will be using. The first one to hit the registry actually defines the subscription. From then on, any subsequent "subscribes" to the same name just cause that process to "attach" to the existing subscription. This keeps the number of subscriptions being tracked by the registry to a minimum, while ensuring that each process still gets notified. 4. Do the same for triggers. Also fixed a duplicate subscription problem that was causing people to receive data equal to the number of processes times the data they should have received from a trigger/subscription. Sorry about that... :-( ...but it's all better now! Uncovered a situation where the modex data seems to be getting entered on the registry a second time - the latter time coming after the compound command has been "fired", thereby causing all the subscriptions to fire. Asked Tim and Jeff to look into this. Second phase of the changes will involve modifying the xcast system so that the same message gets sent to all processes. This will further reduce the message traffic, and - once we have a true "broadcast" version of xcast - really speed things up and improve scalability. This commit was SVN r6542.
2005-07-18 22:49:00 +04:00
free(tmp);
/* get the process tokens */
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens, &num_tokens,
orte_process_info.my_name))) {
ORTE_ERROR_LOG(rc);
free(segment);
OBJ_RELEASE(values[0]);
OBJ_RELEASE(values[1]);
return rc;
}
/* put our contact info in registry */
if (ORTE_SUCCESS != (rc = orte_gpr.put_N(ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_XAND,
segment, tokens, 2, keys, values))) {
ORTE_ERROR_LOG(rc);
}
free(segment);
for(i=0; i < num_tokens; i++) {
free(tokens[i]);
tokens[i] = NULL;
}
if (NULL != tokens) free(tokens);
OBJ_RELEASE(values[0]);
OBJ_RELEASE(values[1]);
return rc;
}
/*
* Module cleanup.
*/
int mca_oob_tcp_fini(void)
{
opal_list_item_t *item;
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
opal_event_disable(); /* disable event processing */
/* close listen socket */
if (mca_oob_tcp_component.tcp_listen_sd >= 0) {
if (OOB_TCP_EVENT == mca_oob_tcp_component.tcp_listen_type) {
opal_event_del(&mca_oob_tcp_component.tcp_recv_event);
close(mca_oob_tcp_component.tcp_listen_sd);
} else if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
void *data;
mca_oob_tcp_component.tcp_shutdown = true;
close(mca_oob_tcp_component.tcp_listen_sd);
opal_thread_join(&mca_oob_tcp_component.tcp_listen_thread, &data);
opal_progress_unregister(mca_oob_tcp_listen_progress);
}
mca_oob_tcp_component.tcp_listen_sd = -1;
}
/* cleanup all peers */
for(item = opal_list_remove_first(&mca_oob_tcp_component.tcp_peer_list);
item != NULL;
item = opal_list_remove_first(&mca_oob_tcp_component.tcp_peer_list)) {
mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item;
MCA_OOB_TCP_PEER_RETURN(peer);
}
/* delete any pending events */
for(item = opal_list_remove_first(&mca_oob_tcp_component.tcp_events);
item != NULL;
item = opal_list_remove_first(&mca_oob_tcp_component.tcp_events)) {
mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t*)item;
opal_event_del(&event->event);
OBJ_RELEASE(event);
}
opal_event_enable();
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return ORTE_SUCCESS;
}
/*
* Compare two process names for equality.
*
* @param n1 Process name 1.
* @param n2 Process name 2.
* @return (-1 for n1<n2 0 for equality, 1 for n1>n2)
*
* Note that the definition of < or > is somewhat arbitrary -
* just needs to be consistently applied to maintain an ordering
* when process names are used as indices.
*/
int mca_oob_tcp_process_name_compare(const orte_process_name_t* n1, const orte_process_name_t* n2)
{
return orte_ns.compare(ORTE_NS_CMP_ALL, n1, n2);
}
/*
* Return local process address as a URI string.
*/
char* mca_oob_tcp_get_addr(void)
{
int i;
char *contact_info = (char *)malloc((opal_ifcount()+1) * 32);
char *ptr = contact_info;
*ptr = 0;
for(i=opal_ifbegin(); i>0; i=opal_ifnext(i)) {
struct sockaddr_in addr;
char name[32];
opal_ifindextoname(i, name, sizeof(name));
if (mca_oob_tcp_component.tcp_include != NULL &&
strstr(mca_oob_tcp_component.tcp_include,name) == NULL)
continue;
if (mca_oob_tcp_component.tcp_exclude != NULL &&
strstr(mca_oob_tcp_component.tcp_exclude,name) != NULL)
continue;
opal_ifindextoaddr(i, (struct sockaddr*)&addr, sizeof(addr));
if(opal_ifcount() > 1 &&
opal_ifislocalhost((struct sockaddr*) &addr))
continue;
if(ptr != contact_info) {
ptr += sprintf(ptr, ";");
}
ptr += sprintf(ptr, "tcp://%s:%d", inet_ntoa(addr.sin_addr),
ntohs(mca_oob_tcp_component.tcp_listen_port));
}
return contact_info;
}
/*
* Parse a URI string into an IP address and port number.
*/
int mca_oob_tcp_parse_uri(const char* uri, struct sockaddr_in* inaddr)
{
char* tmp = strdup(uri);
char* ptr = tmp + 6;
char* addr = ptr;
char* port;
if(strncmp(tmp, "tcp://", 6) != 0) {
free(tmp);
return ORTE_ERR_BAD_PARAM;
}
ptr = strchr(addr, ':');
if(NULL == ptr) {
free(tmp);
return ORTE_ERR_BAD_PARAM;
}
*ptr = '\0';
ptr++;
port = ptr;
memset(inaddr, 0, sizeof(inaddr));
inaddr->sin_family = AF_INET;
inaddr->sin_addr.s_addr = inet_addr(addr);
if(inaddr->sin_addr.s_addr == INADDR_ANY) {
free(tmp);
return ORTE_ERR_BAD_PARAM;
}
inaddr->sin_port = htons(atoi(port));
free(tmp);
return ORTE_SUCCESS;
}
/*
* Setup address in the cache. Note that this could be called multiple
* times if a given destination exports multiple addresses.
*/
int mca_oob_tcp_set_addr(const orte_process_name_t* name, const char* uri)
{
struct sockaddr_in inaddr;
mca_oob_tcp_addr_t* addr;
mca_oob_tcp_peer_t* peer;
int rc;
if((rc = mca_oob_tcp_parse_uri(uri,&inaddr)) != ORTE_SUCCESS)
return rc;
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
addr = (mca_oob_tcp_addr_t*)orte_hash_table_get_proc(&mca_oob_tcp_component.tcp_peer_names, name);
if(NULL == addr) {
addr = OBJ_NEW(mca_oob_tcp_addr_t);
addr->addr_name = *name;
orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr);
}
rc = mca_oob_tcp_addr_insert(addr, &inaddr);
peer = (mca_oob_tcp_peer_t *)orte_hash_table_get_proc(
&mca_oob_tcp_component.tcp_peers, &addr->addr_name);
if(NULL != peer) {
mca_oob_tcp_peer_resolved(peer, addr);
}
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return rc;
}