1
1

Btl tcp: Refactoring non-blocking send/receive function

Moving the non-blocking send/receive functions to btl_tcp
makes them reusable wherever they are needed.
In this case we plan to reuse the receive function to
retrieve the magic string that validates that an established
connection is from an MPI process.

Signed-off-by: Mohan Gandhi <mohgan@amazon.com>
Этот коммит содержится в:
Mohan 2017-07-19 15:09:24 -07:00 коммит произвёл Mohan Gandhi
родитель af85e48dd7
Коммит c30a42917c
3 изменённых файлов: 88 добавлений и 44 удалений

Просмотреть файл

@ -31,12 +31,14 @@
#include "opal/mca/mpool/base/base.h"
#include "opal/mca/mpool/mpool.h"
#include "opal/mca/btl/base/btl_base_error.h"
#include "opal/opal_socket_errno.h"
#include "btl_tcp.h"
#include "btl_tcp_frag.h"
#include "btl_tcp_proc.h"
#include "btl_tcp_endpoint.h"
mca_btl_tcp_module_t mca_btl_tcp_module = {
.super = {
.btl_component = &mca_btl_tcp_component.super,
@ -531,3 +533,64 @@ void mca_btl_tcp_dump(struct mca_btl_base_module_t* base_btl,
}
#endif /* OPAL_ENABLE_DEBUG && WANT_PEER_DUMP */
}
/*
 * A blocking recv on a non-blocking socket. Used to receive the small
 * amount of connection information that identifies the peer endpoint.
 *
 * Loops until exactly `size` bytes have been read into `data`. Because the
 * socket is non-blocking, EINTR / EAGAIN / EWOULDBLOCK are not treated as
 * failures: the call is simply retried (busy-wait) until data arrives.
 *
 * @param sd    socket descriptor to read from
 * @param data  destination buffer, at least `size` bytes long
 * @param size  number of bytes to read
 *
 * @return the number of bytes received (equal to `size`) on success, or -1
 *         if the peer closed the connection or recv() failed with any
 *         other error. Both failure cases are logged through BTL_ERROR.
 */
int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size)
{
    char* buf = (char*)data;
    size_t cnt = 0;

    while (cnt < size) {
        /* recv() returns ssize_t; keep its full width instead of narrowing to int */
        ssize_t retval = recv(sd, buf + cnt, size - cnt, 0);

        /* remote closed connection */
        if (0 == retval) {
            BTL_ERROR(("remote peer unexpectedly closed connection while I was waiting for blocking message"));
            return -1;
        }

        /* socket is non-blocking so handle errors */
        if (retval < 0) {
            /* snapshot the error code once so the test and the log agree */
            const int err = opal_socket_errno;
            if (EINTR != err && EAGAIN != err && EWOULDBLOCK != err) {
                BTL_ERROR(("recv(%d) failed: %s (%d)", sd, strerror(err), err));
                return -1;
            }
            /* transient condition: nothing was read, try again */
            continue;
        }

        cnt += (size_t)retval;
    }

    return (int)cnt;
}
/*
 * A blocking send on a non-blocking socket. Used to send the small
 * amount of connection information that identifies the peer endpoint.
 *
 * Loops until all `size` bytes of `data` have been handed to send().
 * Because the socket is non-blocking, EINTR / EAGAIN / EWOULDBLOCK are not
 * treated as failures: the call is simply retried (busy-wait).
 *
 * @param sd    socket descriptor to write to
 * @param data  source buffer, at least `size` bytes long (not modified)
 * @param size  number of bytes to send
 *
 * @return the number of bytes sent (equal to `size`) on success, or -1 if
 *         send() failed with any other error, which is logged through
 *         BTL_ERROR.
 */
int mca_btl_tcp_send_blocking(int sd, const void* data, size_t size)
{
    /* keep the const qualifier: the buffer is only ever read */
    const char* buf = (const char*)data;
    size_t cnt = 0;

    while (cnt < size) {
        /* send() returns ssize_t; keep its full width instead of narrowing to int */
        ssize_t retval = send(sd, buf + cnt, size - cnt, 0);

        if (retval < 0) {
            /* snapshot the error code once so the test and the log agree */
            const int err = opal_socket_errno;
            if (EINTR != err && EAGAIN != err && EWOULDBLOCK != err) {
                BTL_ERROR(("send() failed: %s (%d)", strerror(err), err));
                return -1;
            }
            /* transient condition: nothing was sent, try again */
            continue;
        }

        cnt += (size_t)retval;
    }

    return (int)cnt;
}

Просмотреть файл

@ -351,5 +351,19 @@ mca_btl_tcp_dump(struct mca_btl_base_module_t* btl,
*/
int mca_btl_tcp_ft_event(int state);
/*
* A blocking send on a non-blocking socket. Used to send the small
* amount of connection information that identifies the peer
* endpoint. Returns the number of bytes sent, or -1 on error.
*/
int mca_btl_tcp_send_blocking(int sd, const void* data, size_t size);
/*
* A blocking recv on a non-blocking socket. Used to receive the small
* amount of connection information that identifies the peer
* endpoint. Returns the number of bytes received, or -1 on error.
*/
int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size);
END_C_DECLS
#endif

Просмотреть файл

@ -371,31 +371,18 @@ int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp
/*
* A blocking send on a non-blocking socket. Used to send the small amount of connection
* information that identifies the endpoints endpoint.
* A blocking send on a non-blocking socket. Used to send the small
* amount of connection information that identifies the endpoints endpoint.
*/
static int
mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint,
void* data, size_t size)
const void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = send(btl_endpoint->endpoint_sd, (const char *)ptr+cnt, size-cnt, 0);
if(retval < 0) {
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
BTL_ERROR(("send(%d, %p, %lu/%lu) failed: %s (%d)",
btl_endpoint->endpoint_sd, data, cnt, size,
strerror(opal_socket_errno), opal_socket_errno));
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
int ret = mca_btl_tcp_send_blocking(btl_endpoint->endpoint_sd, data, size);
if (ret < 0) {
mca_btl_tcp_endpoint_close(btl_endpoint);
return -1;
}
continue;
}
cnt += retval;
}
return cnt;
return ret;
}
@ -573,31 +560,11 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
*/
static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = recv(btl_endpoint->endpoint_sd, (char *)ptr+cnt, size-cnt, 0);
/* remote closed connection */
if(retval == 0) {
int ret = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, data, size);
if (ret <= 0) {
mca_btl_tcp_endpoint_close(btl_endpoint);
return cnt;
}
/* socket is non-blocking so handle errors */
if(retval < 0) {
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
BTL_ERROR(("recv(%d, %lu/%lu) failed: %s (%d)",
btl_endpoint->endpoint_sd, cnt, size, strerror(opal_socket_errno), opal_socket_errno));
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
return -1;
}
continue;
}
cnt += retval;
}
return cnt;
return ret;
}