1
1

Add multi-link capabilities to the TCP BTL. This is useful for systems where the

latency is high and the network relatively fast. This will allow for more kernel
level buffering, which allow overlap between system calls and communications.
Somehow, even on fast clusters there is an improvement (non significant).

This patch create multiple modules for the same device, which in turn will
create multiple sockets between the peers. By default the number of BTL by
device is set to 1, so there is no fundamental difference with the current
version. Change the value of btl_tcp_links to enable multiple links between
peers.

This commit was SVN r14076.
Этот коммит содержится в:
George Bosilca 2007-03-20 11:50:17 +00:00
родитель 0edd770644
Коммит 8c9e4baa47
4 изменённых файлов: 86 добавлений и 86 удалений

Просмотреть файл

@ -121,8 +121,8 @@ int mca_btl_tcp_add_procs(
tcp_endpoint->endpoint_btl = tcp_btl;
rc = mca_btl_tcp_proc_insert(tcp_proc, tcp_endpoint);
if(rc != OMPI_SUCCESS) {
OBJ_RELEASE(tcp_endpoint);
OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
OBJ_RELEASE(tcp_endpoint);
continue;
}

Просмотреть файл

@ -59,6 +59,7 @@ extern "C" {
struct mca_btl_tcp_component_t {
mca_btl_base_component_1_0_1_t super; /**< base BTL component */
uint32_t tcp_num_btls; /**< number of hcas available to the TCP component */
uint32_t tcp_num_links; /**< number of logical links per physical device */
struct mca_btl_tcp_module_t **tcp_btls; /**< array of available BTL modules */
struct mca_btl_tcp_proc_t* tcp_local; /**< local proc struct */
int tcp_free_list_num; /**< initial size of free lists */

Просмотреть файл

@ -169,7 +169,7 @@ int mca_btl_tcp_component_open(void)
{
#ifdef __WINDOWS__
WSADATA win_sock_data;
if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) {
if( WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0 ) {
BTL_ERROR(("failed to initialise windows sockets:%d", WSAGetLastError()));
return OMPI_ERROR;
}
@ -190,6 +190,8 @@ int mca_btl_tcp_component_open(void)
opal_hash_table_init(&mca_btl_tcp_component.tcp_procs, 256);
/* register TCP component parameters */
mca_btl_tcp_component.tcp_num_links =
mca_btl_tcp_param_register_int("links", 1);
mca_btl_tcp_component.tcp_if_include =
mca_btl_tcp_param_register_string("if_include", "");
mca_btl_tcp_component.tcp_if_exclude =
@ -283,36 +285,51 @@ int mca_btl_tcp_component_close(void)
static int mca_btl_tcp_create(int if_index, const char* if_name)
{
struct mca_btl_tcp_module_t* btl = (struct mca_btl_tcp_module_t *)malloc(sizeof(mca_btl_tcp_module_t));
struct mca_btl_tcp_module_t* btl;
char param[256];
if(NULL == btl)
return OMPI_ERR_OUT_OF_RESOURCE;
memcpy(btl, &mca_btl_tcp_module, sizeof(mca_btl_tcp_module));
OBJ_CONSTRUCT(&btl->tcp_endpoints, opal_list_t);
mca_btl_tcp_component.tcp_btls[mca_btl_tcp_component.tcp_num_btls++] = btl;
int i;
/* initialize the btl */
btl->tcp_ifindex = if_index;
for( i = 0; i < (int)mca_btl_tcp_component.tcp_num_links; i++ ) {
btl = (struct mca_btl_tcp_module_t *)malloc(sizeof(mca_btl_tcp_module_t));
if(NULL == btl)
return OMPI_ERR_OUT_OF_RESOURCE;
memcpy(btl, &mca_btl_tcp_module, sizeof(mca_btl_tcp_module));
OBJ_CONSTRUCT(&btl->tcp_endpoints, opal_list_t);
mca_btl_tcp_component.tcp_btls[mca_btl_tcp_component.tcp_num_btls++] = btl;
/* initialize the btl */
btl->tcp_ifindex = if_index;
#if MCA_BTL_TCP_STATISTICS
btl->tcp_bytes_recv = 0;
btl->tcp_bytes_sent = 0;
btl->tcp_send_handler = 0;
btl->tcp_bytes_recv = 0;
btl->tcp_bytes_sent = 0;
btl->tcp_send_handler = 0;
#endif
opal_ifindextoaddr(if_index, (struct sockaddr*)&btl->tcp_ifaddr, sizeof(btl->tcp_ifaddr));
opal_ifindextomask(if_index, (struct sockaddr*)&btl->tcp_ifmask, sizeof(btl->tcp_ifmask));
opal_ifindextoaddr(if_index, (struct sockaddr*)&btl->tcp_ifaddr, sizeof(btl->tcp_ifaddr));
opal_ifindextomask(if_index, (struct sockaddr*)&btl->tcp_ifmask, sizeof(btl->tcp_ifmask));
/* allow user to specify interface bandwidth */
sprintf(param, "bandwidth_%s", if_name);
btl->super.btl_bandwidth = mca_btl_tcp_param_register_int(param, 0);
/* allow user to specify interface bandwidth */
sprintf(param, "bandwidth_%s", if_name);
btl->super.btl_bandwidth = mca_btl_tcp_param_register_int(param, 0);
/* allow user to override/specify latency ranking */
sprintf(param, "latency_%s", if_name);
btl->super.btl_latency = mca_btl_tcp_param_register_int(param, 0);
/* allow user to override/specify latency ranking */
sprintf(param, "latency_%s", if_name);
btl->super.btl_latency = mca_btl_tcp_param_register_int(param, 0);
if( i > 0 ) {
btl->super.btl_bandwidth >>= 1;
btl->super.btl_latency <<= 1;
}
/* allow user to specify interface bandwidth */
sprintf(param, "bandwidth_%s:%d", if_name, i);
btl->super.btl_bandwidth = mca_btl_tcp_param_register_int(param, btl->super.btl_bandwidth);
/* allow user to override/specify latency ranking */
sprintf(param, "latency_%s:%d", if_name, i);
btl->super.btl_latency = mca_btl_tcp_param_register_int(param, btl->super.btl_latency);
#if 0 && OMPI_ENABLE_DEBUG
BTL_OUTPUT(("interface: %s bandwidth %d latency %d",
if_name, btl->super.btl_bandwidth, btl->super.btl_latency));
BTL_OUTPUT(("interface %s instance %i: bandwidth %d latency %d\n", if_name, i,
btl->super.btl_bandwidth, btl->super.btl_latency));
#endif
}
return OMPI_SUCCESS;
}
@ -335,7 +352,8 @@ static int mca_btl_tcp_component_create_instances(void)
return OMPI_ERROR;
/* allocate memory for btls */
mca_btl_tcp_component.tcp_btls = (mca_btl_tcp_module_t **)malloc(if_count * sizeof(mca_btl_tcp_module_t*));
mca_btl_tcp_component.tcp_btls = (mca_btl_tcp_module_t**)malloc(mca_btl_tcp_component.tcp_num_links *
if_count * sizeof(mca_btl_tcp_module_t*));
if(NULL == mca_btl_tcp_component.tcp_btls)
return OMPI_ERR_OUT_OF_RESOURCE;
@ -396,9 +414,8 @@ static int mca_btl_tcp_component_create_listen(void)
/* create a listen socket for incoming connections */
mca_btl_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
if(mca_btl_tcp_component.tcp_listen_sd < 0) {
BTL_ERROR(("socket() failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
BTL_ERROR(("socket() failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
return OMPI_ERROR;
}
mca_btl_tcp_set_socket_options(mca_btl_tcp_component.tcp_listen_sd);
@ -410,9 +427,8 @@ static int mca_btl_tcp_component_create_listen(void)
inaddr.sin_port = 0;
if(bind(mca_btl_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
BTL_ERROR(("bind() failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
BTL_ERROR(("bind() failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
return OMPI_ERROR;
}
@ -420,8 +436,7 @@ static int mca_btl_tcp_component_create_listen(void)
addrlen = sizeof(struct sockaddr_in);
if(getsockname(mca_btl_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
BTL_ERROR(("getsockname() failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
return OMPI_ERROR;
}
mca_btl_tcp_component.tcp_listen_port = inaddr.sin_port;
@ -429,23 +444,20 @@ static int mca_btl_tcp_component_create_listen(void)
/* setup listen backlog to maximum allowed by kernel */
if(listen(mca_btl_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {
BTL_ERROR(("listen() failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
return OMPI_ERROR;
}
/* set socket up to be non-blocking, otherwise accept could block */
if((flags = fcntl(mca_btl_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
return OMPI_ERROR;
} else {
flags |= O_NONBLOCK;
if(fcntl(mca_btl_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {
BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
return OMPI_ERROR;
}
}
@ -468,16 +480,20 @@ static int mca_btl_tcp_component_create_listen(void)
static int mca_btl_tcp_component_exchange(void)
{
int rc=0;
size_t i=0;
size_t size = mca_btl_tcp_component.tcp_num_btls * sizeof(mca_btl_tcp_addr_t);
int rc = 0, index;
size_t i = 0, j;
size_t size = mca_btl_tcp_component.tcp_num_links * mca_btl_tcp_component.tcp_num_btls
* sizeof(mca_btl_tcp_addr_t);
if(mca_btl_tcp_component.tcp_num_btls != 0) {
mca_btl_tcp_addr_t *addrs = (mca_btl_tcp_addr_t *)malloc(size);
for(i=0; i<mca_btl_tcp_component.tcp_num_btls; i++) {
for( i = 0; i < mca_btl_tcp_component.tcp_num_btls; i++ ) {
struct mca_btl_tcp_module_t* btl = mca_btl_tcp_component.tcp_btls[i];
addrs[i].addr_inet = btl->tcp_ifaddr.sin_addr;
addrs[i].addr_port = mca_btl_tcp_component.tcp_listen_port;
addrs[i].addr_inuse = 0;
index = i * mca_btl_tcp_component.tcp_num_links;
for( j = 0; j < mca_btl_tcp_component.tcp_num_links; j++, index++ ) {
addrs[index].addr_inet = btl->tcp_ifaddr.sin_addr;
addrs[index].addr_port = mca_btl_tcp_component.tcp_listen_port;
addrs[index].addr_inuse = 0;
}
}
rc = mca_pml_base_modex_send(&mca_btl_tcp_component.super.btl_version, addrs, size);
free(addrs);
@ -577,8 +593,7 @@ static void mca_btl_tcp_component_accept(void)
continue;
if(opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK)
BTL_ERROR(("accept() failed: %s (%d).",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
return;
}
mca_btl_tcp_set_socket_options(sd);
@ -623,21 +638,18 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
/* now set socket up to be non-blocking */
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
} else {
flags |= O_NONBLOCK;
if(fcntl(sd, F_SETFL, flags) < 0) {
BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
}
}
/* lookup the corresponding process */
btl_proc = mca_btl_tcp_proc_lookup(&guid);
if(NULL == btl_proc) {
BTL_ERROR(("errno=%d",errno));
CLOSE_THE_SOCKET(sd);
return;
}
@ -645,8 +657,7 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
/* lookup peer address */
if(getpeername(sd, (struct sockaddr*)&addr, &addr_len) != 0) {
BTL_ERROR(("getpeername() failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
CLOSE_THE_SOCKET(sd);
return;
}

Просмотреть файл

@ -143,16 +143,14 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
}
#if defined(SO_SNDBUF)
obtlen = sizeof(sndbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
BTL_ERROR(("SO_SNDBUF option: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
}
#else
sndbuf = -1;
@ -161,8 +159,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con
obtlen = sizeof(rcvbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
BTL_ERROR(("SO_RCVBUF option: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
}
#else
rcvbuf = -1;
@ -171,8 +168,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con
obtlen = sizeof(nodelay);
if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
BTL_ERROR(("TCP_NODELAY option: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
}
#else
nodelay = 0;
@ -264,8 +260,7 @@ static int mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpo
if(retval < 0) {
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
BTL_ERROR(("send() failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
mca_btl_tcp_endpoint_close(btl_endpoint);
return -1;
}
@ -423,8 +418,7 @@ static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpo
if(retval < 0) {
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
BTL_ERROR(("recv() failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
mca_btl_tcp_endpoint_close(btl_endpoint);
return -1;
}
@ -456,7 +450,7 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en
/* compare this to the expected values */
if(memcmp(&btl_proc->proc_name, &guid, sizeof(orte_process_name_t)) != 0) {
BTL_ERROR(("received unexpected process identifier [%lu,%lu,%lu]",
ORTE_NAME_ARGS(&guid)));
ORTE_NAME_ARGS(&guid)));
mca_btl_tcp_endpoint_close(btl_endpoint);
return OMPI_ERR_UNREACH;
}
@ -477,24 +471,21 @@ void mca_btl_tcp_set_socket_options(int sd)
optval = 1;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
}
#endif
#if defined(SO_SNDBUF)
if(mca_btl_tcp_component.tcp_sndbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp_component.tcp_sndbuf, sizeof(int)) < 0) {
BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
}
#endif
#if defined(SO_RCVBUF)
if(mca_btl_tcp_component.tcp_rcvbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) {
BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
}
#endif
}
@ -519,7 +510,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
btl_endpoint->endpoint_retries++;
return OMPI_ERR_UNREACH;
}
opal_output( 0, "start connection on socket %d\n", btl_endpoint->endpoint_sd );
/* setup socket buffer sizes */
mca_btl_tcp_set_socket_options(btl_endpoint->endpoint_sd);
@ -529,14 +520,12 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
/* setup the socket as non-blocking */
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
} else {
flags |= O_NONBLOCK;
if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0)
BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
strerror(opal_socket_errno), opal_socket_errno));
}
/* start the connect - will likely fail with EINPROGRESS */
@ -550,6 +539,10 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
return OMPI_SUCCESS;
}
BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_ompi,
( "Unable to connect to the peer %s on port %d: %s\n",
inet_ntoa(btl_endpoint->endpoint_addr->addr_inet),
btl_endpoint->endpoint_addr->addr_port, strerror(opal_socket_errno) ) );
mca_btl_tcp_endpoint_close(btl_endpoint);
btl_endpoint->endpoint_retries++;
return OMPI_ERR_UNREACH;
@ -582,9 +575,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e
/* check connect completion status */
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
BTL_ERROR(("getsockopt() failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
BTL_ERROR(("getsockopt() failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno));
mca_btl_tcp_endpoint_close(btl_endpoint);
return;
}
@ -593,9 +584,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e
return;
}
if(so_error != 0) {
BTL_ERROR(("connect() failed: %s (%d)",
strerror(so_error),
so_error));
BTL_ERROR(("connect() failed: %s (%d)", strerror(so_error), so_error));
mca_btl_tcp_endpoint_close(btl_endpoint);
return;
}
@ -727,8 +716,7 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user)
break;
}
default:
BTL_ERROR(("invalid connection state (%d)",
btl_endpoint->endpoint_state));
BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
opal_event_del(&btl_endpoint->endpoint_send_event);
break;
}