From c30a42917cd8a807066da7f8461ab6677c56271e Mon Sep 17 00:00:00 2001 From: Mohan Date: Wed, 19 Jul 2017 15:09:24 -0700 Subject: [PATCH] Btl tcp: Refactoring non-blocking send/receive function Moving non-blocking send/receive function to btl_tcp will help reusing these function where ever needed. In this case we plan to reuse receive function to retrive magic string to validate established connection is from mpi process. Signed-off-by: Mohan Gandhi --- opal/mca/btl/tcp/btl_tcp.c | 63 +++++++++++++++++++++++++++++ opal/mca/btl/tcp/btl_tcp.h | 14 +++++++ opal/mca/btl/tcp/btl_tcp_endpoint.c | 55 +++++-------------------- 3 files changed, 88 insertions(+), 44 deletions(-) diff --git a/opal/mca/btl/tcp/btl_tcp.c b/opal/mca/btl/tcp/btl_tcp.c index 0800327c54..93cffd7b98 100644 --- a/opal/mca/btl/tcp/btl_tcp.c +++ b/opal/mca/btl/tcp/btl_tcp.c @@ -31,12 +31,14 @@ #include "opal/mca/mpool/base/base.h" #include "opal/mca/mpool/mpool.h" #include "opal/mca/btl/base/btl_base_error.h" +#include "opal/opal_socket_errno.h" #include "btl_tcp.h" #include "btl_tcp_frag.h" #include "btl_tcp_proc.h" #include "btl_tcp_endpoint.h" + mca_btl_tcp_module_t mca_btl_tcp_module = { .super = { .btl_component = &mca_btl_tcp_component.super, @@ -531,3 +533,64 @@ void mca_btl_tcp_dump(struct mca_btl_base_module_t* base_btl, } #endif /* OPAL_ENABLE_DEBUG && WANT_PEER_DUMP */ } + + +/* + * A blocking recv on a non-blocking socket. Used to receive the small + * amount of connection information that identifies the endpoints + * endpoint. + */ + +int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size) +{ + unsigned char* ptr = (unsigned char*)data; + size_t cnt = 0; + while (cnt < size) { + int retval = recv(sd, ((char *)ptr) + cnt, size - cnt, 0); + /* remote closed connection */ + if (0 == retval) { + BTL_ERROR(("remote peer unexpectedly closed connection while I was waiting for blocking message")); + return -1; + } + + /* socket is non-blocking so handle errors */ + if (retval < 0) { + if (opal_socket_errno != EINTR && + opal_socket_errno != EAGAIN && + opal_socket_errno != EWOULDBLOCK) { + BTL_ERROR(("recv(%d) failed: %s (%d)", sd, strerror(opal_socket_errno), opal_socket_errno)); + return -1; + } + continue; + } + cnt += retval; + } + return cnt; +} + + +/* + * A blocking send on a non-blocking socket. Used to send the small + * amount of connection information that identifies the endpoints + * endpoint. + */ + +int mca_btl_tcp_send_blocking(int sd, const void* data, size_t size) +{ + unsigned char* ptr = (unsigned char*)data; + size_t cnt = 0; + while(cnt < size) { + int retval = send(sd, ((const char *)ptr) + cnt, size - cnt, 0); + if (retval < 0) { + if (opal_socket_errno != EINTR && + opal_socket_errno != EAGAIN && + opal_socket_errno != EWOULDBLOCK) { + BTL_ERROR(("send() failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno)); + return -1; + } + continue; + } + cnt += retval; + } + return cnt; +} diff --git a/opal/mca/btl/tcp/btl_tcp.h b/opal/mca/btl/tcp/btl_tcp.h index c78bd30174..82e7c6125a 100644 --- a/opal/mca/btl/tcp/btl_tcp.h +++ b/opal/mca/btl/tcp/btl_tcp.h @@ -351,5 +351,19 @@ mca_btl_tcp_dump(struct mca_btl_base_module_t* btl, */ int mca_btl_tcp_ft_event(int state); +/* + * A blocking send on a non-blocking socket. Used to send the small + * amount of connection information that identifies the endpoints + * endpoint. + */ +int mca_btl_tcp_send_blocking(int sd, const void* data, size_t size); + +/* + * A blocking recv on a non-blocking socket. Used to receive the small + * amount of connection information that identifies the endpoints + * endpoint. + */ +int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size); + END_C_DECLS #endif diff --git a/opal/mca/btl/tcp/btl_tcp_endpoint.c b/opal/mca/btl/tcp/btl_tcp_endpoint.c index 9cd97e34b2..a5ab5d50c9 100644 --- a/opal/mca/btl/tcp/btl_tcp_endpoint.c +++ b/opal/mca/btl/tcp/btl_tcp_endpoint.c @@ -371,31 +371,18 @@ int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp /* - * A blocking send on a non-blocking socket. Used to send the small amount of connection - * information that identifies the endpoints endpoint. + * A blocking send on a non-blocking socket. Used to send the small + * amount of connection information that identifies the endpoints endpoint. */ static int mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint, - void* data, size_t size) + const void* data, size_t size) { - unsigned char* ptr = (unsigned char*)data; - size_t cnt = 0; - while(cnt < size) { - int retval = send(btl_endpoint->endpoint_sd, (const char *)ptr+cnt, size-cnt, 0); - if(retval < 0) { - if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { - BTL_ERROR(("send(%d, %p, %lu/%lu) failed: %s (%d)", - btl_endpoint->endpoint_sd, data, cnt, size, - strerror(opal_socket_errno), opal_socket_errno)); - btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED; - mca_btl_tcp_endpoint_close(btl_endpoint); - return -1; - } - continue; - } - cnt += retval; + int ret = mca_btl_tcp_send_blocking(btl_endpoint->endpoint_sd, data, size); + if (ret < 0) { + mca_btl_tcp_endpoint_close(btl_endpoint); } - return cnt; + return ret; } @@ -573,31 +560,11 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint */ static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size) { - unsigned char* ptr = (unsigned char*)data; - size_t cnt = 0; - while(cnt < size) { - int retval = recv(btl_endpoint->endpoint_sd, (char *)ptr+cnt, size-cnt, 0); - - /* remote closed connection */ - if(retval == 0) { - mca_btl_tcp_endpoint_close(btl_endpoint); - return cnt; - } - - /* socket is non-blocking so handle errors */ - if(retval < 0) { - if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { - BTL_ERROR(("recv(%d, %lu/%lu) failed: %s (%d)", - btl_endpoint->endpoint_sd, cnt, size, strerror(opal_socket_errno), opal_socket_errno)); - btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED; - mca_btl_tcp_endpoint_close(btl_endpoint); - return -1; - } - continue; - } - cnt += retval; + int ret = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, data, size); + if (ret <= 0) { + mca_btl_tcp_endpoint_close(btl_endpoint); } - return cnt; + return ret; }