diff --git a/ompi/mca/btl/tcp/btl_tcp.c b/ompi/mca/btl/tcp/btl_tcp.c index 9e9cf9762c..144f5087b8 100644 --- a/ompi/mca/btl/tcp/btl_tcp.c +++ b/ompi/mca/btl/tcp/btl_tcp.c @@ -121,8 +121,8 @@ int mca_btl_tcp_add_procs( tcp_endpoint->endpoint_btl = tcp_btl; rc = mca_btl_tcp_proc_insert(tcp_proc, tcp_endpoint); if(rc != OMPI_SUCCESS) { - OBJ_RELEASE(tcp_endpoint); OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock); + OBJ_RELEASE(tcp_endpoint); continue; } diff --git a/ompi/mca/btl/tcp/btl_tcp.h b/ompi/mca/btl/tcp/btl_tcp.h index 5c228ef9db..d668f871a1 100644 --- a/ompi/mca/btl/tcp/btl_tcp.h +++ b/ompi/mca/btl/tcp/btl_tcp.h @@ -59,6 +59,7 @@ extern "C" { struct mca_btl_tcp_component_t { mca_btl_base_component_1_0_1_t super; /**< base BTL component */ uint32_t tcp_num_btls; /**< number of hcas available to the TCP component */ + uint32_t tcp_num_links; /**< number of logical links per physical device */ struct mca_btl_tcp_module_t **tcp_btls; /**< array of available BTL modules */ struct mca_btl_tcp_proc_t* tcp_local; /**< local proc struct */ int tcp_free_list_num; /**< initial size of free lists */ diff --git a/ompi/mca/btl/tcp/btl_tcp_component.c b/ompi/mca/btl/tcp/btl_tcp_component.c index c4ac2e5cc3..971cd3cc80 100644 --- a/ompi/mca/btl/tcp/btl_tcp_component.c +++ b/ompi/mca/btl/tcp/btl_tcp_component.c @@ -169,7 +169,7 @@ int mca_btl_tcp_component_open(void) { #ifdef __WINDOWS__ WSADATA win_sock_data; - if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) { + if( WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0 ) { BTL_ERROR(("failed to initialise windows sockets:%d", WSAGetLastError())); return OMPI_ERROR; } @@ -190,6 +190,8 @@ int mca_btl_tcp_component_open(void) opal_hash_table_init(&mca_btl_tcp_component.tcp_procs, 256); /* register TCP component parameters */ + mca_btl_tcp_component.tcp_num_links = + mca_btl_tcp_param_register_int("links", 1); mca_btl_tcp_component.tcp_if_include = mca_btl_tcp_param_register_string("if_include", ""); mca_btl_tcp_component.tcp_if_exclude = @@ -283,36 +285,51 @@ int mca_btl_tcp_component_close(void) static int mca_btl_tcp_create(int if_index, const char* if_name) { - struct mca_btl_tcp_module_t* btl = (struct mca_btl_tcp_module_t *)malloc(sizeof(mca_btl_tcp_module_t)); + struct mca_btl_tcp_module_t* btl; char param[256]; - if(NULL == btl) - return OMPI_ERR_OUT_OF_RESOURCE; - memcpy(btl, &mca_btl_tcp_module, sizeof(mca_btl_tcp_module)); - OBJ_CONSTRUCT(&btl->tcp_endpoints, opal_list_t); - mca_btl_tcp_component.tcp_btls[mca_btl_tcp_component.tcp_num_btls++] = btl; + int i; - /* initialize the btl */ - btl->tcp_ifindex = if_index; + for( i = 0; i < (int)mca_btl_tcp_component.tcp_num_links; i++ ) { + btl = (struct mca_btl_tcp_module_t *)malloc(sizeof(mca_btl_tcp_module_t)); + if(NULL == btl) + return OMPI_ERR_OUT_OF_RESOURCE; + memcpy(btl, &mca_btl_tcp_module, sizeof(mca_btl_tcp_module)); + OBJ_CONSTRUCT(&btl->tcp_endpoints, opal_list_t); + mca_btl_tcp_component.tcp_btls[mca_btl_tcp_component.tcp_num_btls++] = btl; + + /* initialize the btl */ + btl->tcp_ifindex = if_index; #if MCA_BTL_TCP_STATISTICS - btl->tcp_bytes_recv = 0; - btl->tcp_bytes_sent = 0; - btl->tcp_send_handler = 0; + btl->tcp_bytes_recv = 0; + btl->tcp_bytes_sent = 0; + btl->tcp_send_handler = 0; #endif - opal_ifindextoaddr(if_index, (struct sockaddr*)&btl->tcp_ifaddr, sizeof(btl->tcp_ifaddr)); - opal_ifindextomask(if_index, (struct sockaddr*)&btl->tcp_ifmask, sizeof(btl->tcp_ifmask)); + opal_ifindextoaddr(if_index, (struct sockaddr*)&btl->tcp_ifaddr, sizeof(btl->tcp_ifaddr)); + opal_ifindextomask(if_index, (struct sockaddr*)&btl->tcp_ifmask, sizeof(btl->tcp_ifmask)); - /* allow user to specify interface bandwidth */ - sprintf(param, "bandwidth_%s", if_name); - btl->super.btl_bandwidth = mca_btl_tcp_param_register_int(param, 0); + /* allow user to specify interface bandwidth */ + sprintf(param, "bandwidth_%s", if_name); + btl->super.btl_bandwidth = mca_btl_tcp_param_register_int(param, 0); - /* allow user to override/specify latency ranking */ - sprintf(param, "latency_%s", if_name); - btl->super.btl_latency = mca_btl_tcp_param_register_int(param, 0); + /* allow user to override/specify latency ranking */ + sprintf(param, "latency_%s", if_name); + btl->super.btl_latency = mca_btl_tcp_param_register_int(param, 0); + if( i > 0 ) { + btl->super.btl_bandwidth >>= 1; + btl->super.btl_latency <<= 1; + } + /* allow user to specify interface bandwidth */ + sprintf(param, "bandwidth_%s:%d", if_name, i); + btl->super.btl_bandwidth = mca_btl_tcp_param_register_int(param, btl->super.btl_bandwidth); + /* allow user to override/specify latency ranking */ + sprintf(param, "latency_%s:%d", if_name, i); + btl->super.btl_latency = mca_btl_tcp_param_register_int(param, btl->super.btl_latency); #if 0 && OMPI_ENABLE_DEBUG - BTL_OUTPUT(("interface: %s bandwidth %d latency %d", - if_name, btl->super.btl_bandwidth, btl->super.btl_latency)); + BTL_OUTPUT(("interface %s instance %i: bandwidth %d latency %d\n", if_name, i, + btl->super.btl_bandwidth, btl->super.btl_latency)); #endif + } return OMPI_SUCCESS; } @@ -335,7 +352,8 @@ static int mca_btl_tcp_component_create_instances(void) return OMPI_ERROR; /* allocate memory for btls */ - mca_btl_tcp_component.tcp_btls = (mca_btl_tcp_module_t **)malloc(if_count * sizeof(mca_btl_tcp_module_t*)); + mca_btl_tcp_component.tcp_btls = (mca_btl_tcp_module_t**)malloc(mca_btl_tcp_component.tcp_num_links * + if_count * sizeof(mca_btl_tcp_module_t*)); if(NULL == mca_btl_tcp_component.tcp_btls) return OMPI_ERR_OUT_OF_RESOURCE; @@ -396,9 +414,8 @@ static int mca_btl_tcp_component_create_listen(void) /* create a listen socket for incoming connections */ mca_btl_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0); if(mca_btl_tcp_component.tcp_listen_sd < 0) { - BTL_ERROR(("socket() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + BTL_ERROR(("socket() failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } mca_btl_tcp_set_socket_options(mca_btl_tcp_component.tcp_listen_sd); @@ -410,9 +427,8 @@ static int mca_btl_tcp_component_create_listen(void) inaddr.sin_port = 0; if(bind(mca_btl_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) { - BTL_ERROR(("bind() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + BTL_ERROR(("bind() failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } @@ -420,8 +436,7 @@ static int mca_btl_tcp_component_create_listen(void) addrlen = sizeof(struct sockaddr_in); if(getsockname(mca_btl_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) { BTL_ERROR(("getsockname() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } mca_btl_tcp_component.tcp_listen_port = inaddr.sin_port; @@ -429,23 +444,20 @@ static int mca_btl_tcp_component_create_listen(void) /* setup listen backlog to maximum allowed by kernel */ if(listen(mca_btl_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) { BTL_ERROR(("listen() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } /* set socket up to be non-blocking, otherwise accept could block */ if((flags = fcntl(mca_btl_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) { BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } else { flags |= O_NONBLOCK; if(fcntl(mca_btl_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) { BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return OMPI_ERROR; } } @@ -468,16 +480,20 @@ static int mca_btl_tcp_component_create_listen(void) static int mca_btl_tcp_component_exchange(void) { - int rc=0; - size_t i=0; - size_t size = mca_btl_tcp_component.tcp_num_btls * sizeof(mca_btl_tcp_addr_t); + int rc = 0, index; + size_t i = 0, j; + size_t size = mca_btl_tcp_component.tcp_num_links * mca_btl_tcp_component.tcp_num_btls + * sizeof(mca_btl_tcp_addr_t); if(mca_btl_tcp_component.tcp_num_btls != 0) { mca_btl_tcp_addr_t *addrs = (mca_btl_tcp_addr_t *)malloc(size); - for(i=0; itcp_ifaddr.sin_addr; - addrs[i].addr_port = mca_btl_tcp_component.tcp_listen_port; - addrs[i].addr_inuse = 0; + index = i * mca_btl_tcp_component.tcp_num_links; + for( j = 0; j < mca_btl_tcp_component.tcp_num_links; j++, index++ ) { + addrs[index].addr_inet = btl->tcp_ifaddr.sin_addr; + addrs[index].addr_port = mca_btl_tcp_component.tcp_listen_port; + addrs[index].addr_inuse = 0; + } } rc = mca_pml_base_modex_send(&mca_btl_tcp_component.super.btl_version, addrs, size); free(addrs); @@ -577,8 +593,7 @@ static void mca_btl_tcp_component_accept(void) continue; if(opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) BTL_ERROR(("accept() failed: %s (%d).", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); return; } mca_btl_tcp_set_socket_options(sd); @@ -623,21 +638,18 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user) /* now set socket up to be non-blocking */ if((flags = fcntl(sd, F_GETFL, 0)) < 0) { BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } else { flags |= O_NONBLOCK; if(fcntl(sd, F_SETFL, flags) < 0) { BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } } /* lookup the corresponding process */ btl_proc = mca_btl_tcp_proc_lookup(&guid); if(NULL == btl_proc) { - BTL_ERROR(("errno=%d",errno)); CLOSE_THE_SOCKET(sd); return; } @@ -645,8 +657,7 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user) /* lookup peer address */ if(getpeername(sd, (struct sockaddr*)&addr, &addr_len) != 0) { BTL_ERROR(("getpeername() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); CLOSE_THE_SOCKET(sd); return; } diff --git a/ompi/mca/btl/tcp/btl_tcp_endpoint.c b/ompi/mca/btl/tcp/btl_tcp_endpoint.c index 0636ff2470..28015c2c40 100644 --- a/ompi/mca/btl/tcp/btl_tcp_endpoint.c +++ b/ompi/mca/btl/tcp/btl_tcp_endpoint.c @@ -143,16 +143,14 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) { BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #if defined(SO_SNDBUF) obtlen = sizeof(sndbuf); if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) { BTL_ERROR(("SO_SNDBUF option: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #else sndbuf = -1; @@ -161,8 +159,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con obtlen = sizeof(rcvbuf); if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) { BTL_ERROR(("SO_RCVBUF option: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #else rcvbuf = -1; @@ -171,8 +168,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con obtlen = sizeof(nodelay); if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) { BTL_ERROR(("TCP_NODELAY option: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #else nodelay = 0; @@ -264,8 +260,7 @@ static int mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpo if(retval < 0) { if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { BTL_ERROR(("send() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); mca_btl_tcp_endpoint_close(btl_endpoint); return -1; } @@ -423,8 +418,7 @@ static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpo if(retval < 0) { if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { BTL_ERROR(("recv() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); mca_btl_tcp_endpoint_close(btl_endpoint); return -1; } @@ -456,7 +450,7 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en /* compare this to the expected values */ if(memcmp(&btl_proc->proc_name, &guid, sizeof(orte_process_name_t)) != 0) { BTL_ERROR(("received unexpected process identifier [%lu,%lu,%lu]", - ORTE_NAME_ARGS(&guid))); + ORTE_NAME_ARGS(&guid))); mca_btl_tcp_endpoint_close(btl_endpoint); return OMPI_ERR_UNREACH; } @@ -477,24 +471,21 @@ void mca_btl_tcp_set_socket_options(int sd) optval = 1; if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) { BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #endif #if defined(SO_SNDBUF) if(mca_btl_tcp_component.tcp_sndbuf > 0 && setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp_component.tcp_sndbuf, sizeof(int)) < 0) { BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #endif #if defined(SO_RCVBUF) if(mca_btl_tcp_component.tcp_rcvbuf > 0 && setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) { BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } #endif } @@ -519,7 +510,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo btl_endpoint->endpoint_retries++; return OMPI_ERR_UNREACH; } - + opal_output( 0, "start connection on socket %d\n", btl_endpoint->endpoint_sd ); /* setup socket buffer sizes */ mca_btl_tcp_set_socket_options(btl_endpoint->endpoint_sd); @@ -529,14 +520,12 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo /* setup the socket as non-blocking */ if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) { BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } else { flags |= O_NONBLOCK; if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0) BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + strerror(opal_socket_errno), opal_socket_errno)); } /* start the connect - will likely fail with EINPROGRESS */ @@ -550,6 +539,10 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo opal_event_add(&btl_endpoint->endpoint_send_event, 0); return OMPI_SUCCESS; } + BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_ompi, + ( "Unable to connect to the peer %s on port %d: %s\n", + inet_ntoa(btl_endpoint->endpoint_addr->addr_inet), + btl_endpoint->endpoint_addr->addr_port, strerror(opal_socket_errno) ) ); mca_btl_tcp_endpoint_close(btl_endpoint); btl_endpoint->endpoint_retries++; return OMPI_ERR_UNREACH; @@ -582,9 +575,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e /* check connect completion status */ if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) { - BTL_ERROR(("getsockopt() failed: %s (%d)", - strerror(opal_socket_errno), - opal_socket_errno)); + BTL_ERROR(("getsockopt() failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno)); mca_btl_tcp_endpoint_close(btl_endpoint); return; } @@ -593,9 +584,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e return; } if(so_error != 0) { - BTL_ERROR(("connect() failed: %s (%d)", - strerror(so_error), - so_error)); + BTL_ERROR(("connect() failed: %s (%d)", strerror(so_error), so_error)); mca_btl_tcp_endpoint_close(btl_endpoint); return; } @@ -727,8 +716,7 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user) break; } default: - BTL_ERROR(("invalid connection state (%d)", - btl_endpoint->endpoint_state)); + BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state)); opal_event_del(&btl_endpoint->endpoint_send_event); break; }