From 8c9e4baa473ceda97d3872884c60ae0811a1dd33 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Tue, 20 Mar 2007 11:50:17 +0000 Subject: [PATCH] Add multi-link capabilities to the TCP BTL. This is useful for systems where the latency is high and the network is relatively fast. This will allow for more kernel-level buffering, which allows overlap between system calls and communications. Somehow, even on fast clusters there is an improvement (though not a significant one). This patch creates multiple modules for the same device, which in turn will create multiple sockets between the peers. By default the number of BTLs per device is set to 1, so there is no fundamental difference with the current version. Change the value of btl_tcp_links to enable multiple links between peers. This commit was SVN r14076. --- ompi/mca/btl/tcp/btl_tcp.c | 2 +- ompi/mca/btl/tcp/btl_tcp.h | 1 + ompi/mca/btl/tcp/btl_tcp_component.c | 117 +++++++++++++++------------ ompi/mca/btl/tcp/btl_tcp_endpoint.c | 52 +++++------- 4 files changed, 86 insertions(+), 86 deletions(-) diff --git a/ompi/mca/btl/tcp/btl_tcp.c b/ompi/mca/btl/tcp/btl_tcp.c index 9e9cf9762c..144f5087b8 100644 --- a/ompi/mca/btl/tcp/btl_tcp.c +++ b/ompi/mca/btl/tcp/btl_tcp.c @@ -121,8 +121,8 @@ int mca_btl_tcp_add_procs( tcp_endpoint->endpoint_btl = tcp_btl; rc = mca_btl_tcp_proc_insert(tcp_proc, tcp_endpoint); if(rc != OMPI_SUCCESS) { - OBJ_RELEASE(tcp_endpoint); OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock); + OBJ_RELEASE(tcp_endpoint); continue; } diff --git a/ompi/mca/btl/tcp/btl_tcp.h b/ompi/mca/btl/tcp/btl_tcp.h index 5c228ef9db..d668f871a1 100644 --- a/ompi/mca/btl/tcp/btl_tcp.h +++ b/ompi/mca/btl/tcp/btl_tcp.h @@ -59,6 +59,7 @@ extern "C" { struct mca_btl_tcp_component_t { mca_btl_base_component_1_0_1_t super; /**< base BTL component */ uint32_t tcp_num_btls; /**< number of hcas available to the TCP component */ + uint32_t tcp_num_links; /**< number of logical links per physical device */ struct mca_btl_tcp_module_t **tcp_btls; /**< array of 
available BTL modules */ struct mca_btl_tcp_proc_t* tcp_local; /**< local proc struct */ int tcp_free_list_num; /**< initial size of free lists */ diff --git a/ompi/mca/btl/tcp/btl_tcp_component.c b/ompi/mca/btl/tcp/btl_tcp_component.c index c4ac2e5cc3..971cd3cc80 100644 --- a/ompi/mca/btl/tcp/btl_tcp_component.c +++ b/ompi/mca/btl/tcp/btl_tcp_component.c @@ -169,7 +169,7 @@ int mca_btl_tcp_component_open(void) { #ifdef __WINDOWS__ WSADATA win_sock_data; - if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) { + if( WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0 ) { BTL_ERROR(("failed to initialise windows sockets:%d", WSAGetLastError())); return OMPI_ERROR; } @@ -190,6 +190,8 @@ int mca_btl_tcp_component_open(void) opal_hash_table_init(&mca_btl_tcp_component.tcp_procs, 256); /* register TCP component parameters */ + mca_btl_tcp_component.tcp_num_links = + mca_btl_tcp_param_register_int("links", 1); mca_btl_tcp_component.tcp_if_include = mca_btl_tcp_param_register_string("if_include", ""); mca_btl_tcp_component.tcp_if_exclude = @@ -283,36 +285,51 @@ int mca_btl_tcp_component_close(void) static int mca_btl_tcp_create(int if_index, const char* if_name) { - struct mca_btl_tcp_module_t* btl = (struct mca_btl_tcp_module_t *)malloc(sizeof(mca_btl_tcp_module_t)); + struct mca_btl_tcp_module_t* btl; char param[256]; - if(NULL == btl) - return OMPI_ERR_OUT_OF_RESOURCE; - memcpy(btl, &mca_btl_tcp_module, sizeof(mca_btl_tcp_module)); - OBJ_CONSTRUCT(&btl->tcp_endpoints, opal_list_t); - mca_btl_tcp_component.tcp_btls[mca_btl_tcp_component.tcp_num_btls++] = btl; + int i; - /* initialize the btl */ - btl->tcp_ifindex = if_index; + for( i = 0; i < (int)mca_btl_tcp_component.tcp_num_links; i++ ) { + btl = (struct mca_btl_tcp_module_t *)malloc(sizeof(mca_btl_tcp_module_t)); + if(NULL == btl) + return OMPI_ERR_OUT_OF_RESOURCE; + memcpy(btl, &mca_btl_tcp_module, sizeof(mca_btl_tcp_module)); + OBJ_CONSTRUCT(&btl->tcp_endpoints, opal_list_t); + 
mca_btl_tcp_component.tcp_btls[mca_btl_tcp_component.tcp_num_btls++] = btl; + + /* initialize the btl */ + btl->tcp_ifindex = if_index; #if MCA_BTL_TCP_STATISTICS - btl->tcp_bytes_recv = 0; - btl->tcp_bytes_sent = 0; - btl->tcp_send_handler = 0; + btl->tcp_bytes_recv = 0; + btl->tcp_bytes_sent = 0; + btl->tcp_send_handler = 0; #endif - opal_ifindextoaddr(if_index, (struct sockaddr*)&btl->tcp_ifaddr, sizeof(btl->tcp_ifaddr)); - opal_ifindextomask(if_index, (struct sockaddr*)&btl->tcp_ifmask, sizeof(btl->tcp_ifmask)); + opal_ifindextoaddr(if_index, (struct sockaddr*)&btl->tcp_ifaddr, sizeof(btl->tcp_ifaddr)); + opal_ifindextomask(if_index, (struct sockaddr*)&btl->tcp_ifmask, sizeof(btl->tcp_ifmask)); - /* allow user to specify interface bandwidth */ - sprintf(param, "bandwidth_%s", if_name); - btl->super.btl_bandwidth = mca_btl_tcp_param_register_int(param, 0); + /* allow user to specify interface bandwidth */ + sprintf(param, "bandwidth_%s", if_name); + btl->super.btl_bandwidth = mca_btl_tcp_param_register_int(param, 0); - /* allow user to override/specify latency ranking */ - sprintf(param, "latency_%s", if_name); - btl->super.btl_latency = mca_btl_tcp_param_register_int(param, 0); + /* allow user to override/specify latency ranking */ + sprintf(param, "latency_%s", if_name); + btl->super.btl_latency = mca_btl_tcp_param_register_int(param, 0); + if( i > 0 ) { + btl->super.btl_bandwidth >>= 1; + btl->super.btl_latency <<= 1; + } + /* allow user to specify interface bandwidth */ + sprintf(param, "bandwidth_%s:%d", if_name, i); + btl->super.btl_bandwidth = mca_btl_tcp_param_register_int(param, btl->super.btl_bandwidth); + /* allow user to override/specify latency ranking */ + sprintf(param, "latency_%s:%d", if_name, i); + btl->super.btl_latency = mca_btl_tcp_param_register_int(param, btl->super.btl_latency); #if 0 && OMPI_ENABLE_DEBUG - BTL_OUTPUT(("interface: %s bandwidth %d latency %d", - if_name, btl->super.btl_bandwidth, btl->super.btl_latency)); + 
BTL_OUTPUT(("interface %s instance %i: bandwidth %d latency %d\n", if_name, i, + btl->super.btl_bandwidth, btl->super.btl_latency)); #endif + } return OMPI_SUCCESS; } @@ -335,7 +352,8 @@ static int mca_btl_tcp_component_create_instances(void) return OMPI_ERROR; /* allocate memory for btls */ - mca_btl_tcp_component.tcp_btls = (mca_btl_tcp_module_t **)malloc(if_count * sizeof(mca_btl_tcp_module_t*)); + mca_btl_tcp_component.tcp_btls = (mca_btl_tcp_module_t**)malloc(mca_btl_tcp_component.tcp_num_links * + if_count * sizeof(mca_btl_tcp_module_t*)); if(NULL == mca_btl_tcp_component.tcp_btls) return OMPI_ERR_OUT_OF_RESOURCE; @@ -396,9 +414,8 @@ static int mca_btl_tcp_component_create_listen(void) /* create a listen socket for incoming connections */ mca_btl_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0); if(mca_btl_tcp_component.tcp_listen_sd < 0) { - BTL_ERROR(("socket() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + BTL_ERROR(("socket() failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } mca_btl_tcp_set_socket_options(mca_btl_tcp_component.tcp_listen_sd); @@ -410,9 +427,8 @@ static int mca_btl_tcp_component_create_listen(void) inaddr.sin_port = 0; if(bind(mca_btl_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) { - BTL_ERROR(("bind() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + BTL_ERROR(("bind() failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } @@ -420,8 +436,7 @@ static int mca_btl_tcp_component_create_listen(void) addrlen = sizeof(struct sockaddr_in); if(getsockname(mca_btl_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) { BTL_ERROR(("getsockname() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } mca_btl_tcp_component.tcp_listen_port = inaddr.sin_port; @@ -429,23 +444,20 
@@ static int mca_btl_tcp_component_create_listen(void) /* setup listen backlog to maximum allowed by kernel */ if(listen(mca_btl_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) { BTL_ERROR(("listen() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } /* set socket up to be non-blocking, otherwise accept could block */ if((flags = fcntl(mca_btl_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) { BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } else { flags |= O_NONBLOCK; if(fcntl(mca_btl_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) { BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } } @@ -468,16 +480,20 @@ static int mca_btl_tcp_component_create_listen(void) static int mca_btl_tcp_component_exchange(void) { - int rc=0; - size_t i=0; - size_t size = mca_btl_tcp_component.tcp_num_btls * sizeof(mca_btl_tcp_addr_t); + int rc = 0, index; + size_t i = 0, j; + size_t size = mca_btl_tcp_component.tcp_num_links * mca_btl_tcp_component.tcp_num_btls + * sizeof(mca_btl_tcp_addr_t); if(mca_btl_tcp_component.tcp_num_btls != 0) { mca_btl_tcp_addr_t *addrs = (mca_btl_tcp_addr_t *)malloc(size); for(i=0; i<mca_btl_tcp_component.tcp_num_btls; i++) { struct mca_btl_tcp_module_t* btl = mca_btl_tcp_component.tcp_btls[i]; - addrs[i].addr_inet = btl->tcp_ifaddr.sin_addr; - addrs[i].addr_port = mca_btl_tcp_component.tcp_listen_port; - addrs[i].addr_inuse = 0; + index = i * mca_btl_tcp_component.tcp_num_links; + for( j = 0; j < mca_btl_tcp_component.tcp_num_links; j++, index++ ) { + addrs[index].addr_inet = btl->tcp_ifaddr.sin_addr; + addrs[index].addr_port = mca_btl_tcp_component.tcp_listen_port; + addrs[index].addr_inuse = 0; + } } rc = mca_pml_base_modex_send(&mca_btl_tcp_component.super.btl_version, addrs, size); free(addrs); @@ -577,8 +593,7 @@ static void mca_btl_tcp_component_accept(void) 
continue; if(opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) BTL_ERROR(("accept() failed: %s (%d).", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return; } mca_btl_tcp_set_socket_options(sd); @@ -623,21 +638,18 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user) /* now set socket up to be non-blocking */ if((flags = fcntl(sd, F_GETFL, 0)) < 0) { BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } else { flags |= O_NONBLOCK; if(fcntl(sd, F_SETFL, flags) < 0) { BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } } /* lookup the corresponding process */ btl_proc = mca_btl_tcp_proc_lookup(&guid); if(NULL == btl_proc) { - BTL_ERROR(("errno=%d",errno)); CLOSE_THE_SOCKET(sd); return; } @@ -645,8 +657,7 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user) /* lookup peer address */ if(getpeername(sd, (struct sockaddr*)&addr, &addr_len) != 0) { BTL_ERROR(("getpeername() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); CLOSE_THE_SOCKET(sd); return; } diff --git a/ompi/mca/btl/tcp/btl_tcp_endpoint.c b/ompi/mca/btl/tcp/btl_tcp_endpoint.c index 0636ff2470..28015c2c40 100644 --- a/ompi/mca/btl/tcp/btl_tcp_endpoint.c +++ b/ompi/mca/btl/tcp/btl_tcp_endpoint.c @@ -143,16 +143,14 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) { BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #if defined(SO_SNDBUF) obtlen = sizeof(sndbuf); if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, 
SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) { BTL_ERROR(("SO_SNDBUF option: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #else sndbuf = -1; @@ -161,8 +159,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con obtlen = sizeof(rcvbuf); if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) { BTL_ERROR(("SO_RCVBUF option: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #else rcvbuf = -1; @@ -171,8 +168,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con obtlen = sizeof(nodelay); if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) { BTL_ERROR(("TCP_NODELAY option: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #else nodelay = 0; @@ -264,8 +260,7 @@ static int mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpo if(retval < 0) { if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { BTL_ERROR(("send() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); mca_btl_tcp_endpoint_close(btl_endpoint); return -1; } @@ -423,8 +418,7 @@ static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpo if(retval < 0) { if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { BTL_ERROR(("recv() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); mca_btl_tcp_endpoint_close(btl_endpoint); return -1; } @@ -456,7 +450,7 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en /* compare this to the expected values */ 
if(memcmp(&btl_proc->proc_name, &guid, sizeof(orte_process_name_t)) != 0) { BTL_ERROR(("received unexpected process identifier [%lu,%lu,%lu]", - ORTE_NAME_ARGS(&guid))); + ORTE_NAME_ARGS(&guid))); mca_btl_tcp_endpoint_close(btl_endpoint); return OMPI_ERR_UNREACH; } @@ -477,24 +471,21 @@ void mca_btl_tcp_set_socket_options(int sd) optval = 1; if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) { BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #endif #if defined(SO_SNDBUF) if(mca_btl_tcp_component.tcp_sndbuf > 0 && setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp_component.tcp_sndbuf, sizeof(int)) < 0) { BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #endif #if defined(SO_RCVBUF) if(mca_btl_tcp_component.tcp_rcvbuf > 0 && setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) { BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #endif } @@ -519,7 +510,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo btl_endpoint->endpoint_retries++; return OMPI_ERR_UNREACH; } - + opal_output( 0, "start connection on socket %d\n", btl_endpoint->endpoint_sd ); /* setup socket buffer sizes */ mca_btl_tcp_set_socket_options(btl_endpoint->endpoint_sd); @@ -529,14 +520,12 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo /* setup the socket as non-blocking */ if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) { BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } else { flags |= O_NONBLOCK; 
if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0) BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } /* start the connect - will likely fail with EINPROGRESS */ @@ -550,6 +539,10 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo opal_event_add(&btl_endpoint->endpoint_send_event, 0); return OMPI_SUCCESS; } + BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_ompi, + ( "Unable to connect to the peer %s on port %d: %s\n", + inet_ntoa(btl_endpoint->endpoint_addr->addr_inet), + btl_endpoint->endpoint_addr->addr_port, strerror(opal_socket_errno) ) ); mca_btl_tcp_endpoint_close(btl_endpoint); btl_endpoint->endpoint_retries++; return OMPI_ERR_UNREACH; @@ -582,9 +575,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e /* check connect completion status */ if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) { - BTL_ERROR(("getsockopt() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + BTL_ERROR(("getsockopt() failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno)); mca_btl_tcp_endpoint_close(btl_endpoint); return; } @@ -593,9 +584,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e return; } if(so_error != 0) { - BTL_ERROR(("connect() failed: %s (%d)", - strerror(so_error), - so_error)); + BTL_ERROR(("connect() failed: %s (%d)", strerror(so_error), so_error)); mca_btl_tcp_endpoint_close(btl_endpoint); return; } @@ -727,8 +716,7 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user) break; } default: - BTL_ERROR(("invalid connection state (%d)", - btl_endpoint->endpoint_state)); + BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state)); opal_event_del(&btl_endpoint->endpoint_send_event); break; }