Btl Tcp: Updated tcp handshake methods
This commit has two changes: 1. Adding the magic string during the handshake can cause issues when used with older versions of MPI, hence set the SO_RCVTIMEO parameter to 2 seconds. 2. Use a single call during the handshake instead of two calls. Signed-off-by: Mohan Gandhi <mohgan@amazon.com>
Этот коммит содержится в:
родитель
e3dfe11da9
Коммит
fc32ae401e
@ -536,9 +536,13 @@ void mca_btl_tcp_dump(struct mca_btl_base_module_t* base_btl,
|
||||
|
||||
|
||||
/*
|
||||
* A blocking recv on a non-blocking socket. Used to receive the small
|
||||
* amount of connection information that identifies the endpoints
|
||||
* endpoint.
|
||||
* A blocking recv for both blocking and non-blocking socket.
|
||||
* Used to receive the small amount of connection information
|
||||
* that identifies the endpoints
|
||||
*
|
||||
* when the socket is blocking (the caller introduces timeout)
|
||||
* which happens during initial handshake otherwise socket is
|
||||
* non-blocking most of the time.
|
||||
*/
|
||||
|
||||
int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size)
|
||||
|
@ -359,9 +359,13 @@ int mca_btl_tcp_ft_event(int state);
|
||||
int mca_btl_tcp_send_blocking(int sd, const void* data, size_t size);
|
||||
|
||||
/*
|
||||
* A blocking recv on a non-blocking socket. Used to receive the small
|
||||
* amount of connection information that identifies the endpoints
|
||||
* endpoint.
|
||||
* A blocking recv for both blocking and non-blocking socket.
|
||||
* Used to receive the small amount of connection information
|
||||
* that identifies the endpoints
|
||||
*
|
||||
* when the socket is blocking (the caller introduces timeout)
|
||||
* which happens during initial handshake otherwise socket is
|
||||
* non-blocking most of the time.
|
||||
*/
|
||||
int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size);
|
||||
|
||||
|
@ -54,6 +54,9 @@
|
||||
#endif
|
||||
#include <ctype.h>
|
||||
#include <limits.h>
|
||||
#ifdef HAVE_SYS_TIME_H
|
||||
#include <sys/time.h>
|
||||
#endif
|
||||
|
||||
#include "opal/mca/event/event.h"
|
||||
#include "opal/util/ethtool.h"
|
||||
@ -1335,26 +1338,47 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
|
||||
struct sockaddr_storage addr;
|
||||
opal_socklen_t addr_len = sizeof(addr);
|
||||
mca_btl_tcp_proc_t* btl_proc;
|
||||
int retval;
|
||||
bool sockopt = true;
|
||||
size_t retval, len = strlen(mca_btl_tcp_magic_id_string);
|
||||
mca_btl_tcp_endpoint_hs_msg_t hs_msg;
|
||||
struct timeval save, tv;
|
||||
socklen_t rcvtimeo_save_len = sizeof(save);
|
||||
char str[128];
|
||||
size_t len = strlen(mca_btl_tcp_magic_id_string);
|
||||
|
||||
/* Note, Socket will be in blocking mode during initial handshake
|
||||
* hence setting SO_RCVTIMEO to say 2 seconds here to avoid chance
|
||||
* of spin forever if it tries to connect to old version
|
||||
* as older version will send just process id which won't be long enough
|
||||
* to cross sizeof(str) length + process id struct
|
||||
* or when the remote side isn't OMPI where it's not going to send
|
||||
* any data*/
|
||||
|
||||
/* get the current timeout value so we can reset to it */
|
||||
if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &rcvtimeo_save_len)) {
|
||||
if (ENOPROTOOPT == errno) {
|
||||
sockopt = false;
|
||||
} else {
|
||||
opal_output_verbose(20, opal_btl_base_framework.framework_output,
|
||||
"Cannot get current recv timeout value of the socket"
|
||||
"Local_host:%s PID:%d",
|
||||
opal_process_info.nodename, getpid());
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
tv.tv_sec = 2;
|
||||
tv.tv_usec = 0;
|
||||
if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
|
||||
opal_output_verbose(20, opal_btl_base_framework.framework_output,
|
||||
"Cannot set new recv timeout value of the socket"
|
||||
"Local_host:%s PID:%d",
|
||||
opal_process_info.nodename, getpid());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_RELEASE(event);
|
||||
|
||||
/* Receive the magic string */
|
||||
assert(len < sizeof(str));
|
||||
str[0] = '\0';
|
||||
/* TODO: recv_blocking() will block forever, without timeout
|
||||
* There is chance of spin forever if it tries to connect to old version
|
||||
* as older version will send just process id which won't be long enough
|
||||
* to cross sizeof(str) length. Probably better to have a timeout
|
||||
* (say, 10 seconds) for receiving the magic string, and giving up after that.
|
||||
* Will be reiterating this logic in my next iteration
|
||||
*/
|
||||
retval = mca_btl_tcp_recv_blocking(sd, str, len);
|
||||
if (retval > 0) {
|
||||
str[retval] = '\0';
|
||||
}
|
||||
retval = mca_btl_tcp_recv_blocking(sd, (void *)&hs_msg, sizeof(hs_msg));
|
||||
guid = hs_msg.guid;
|
||||
|
||||
/* An unknown process attempted to connect to Open MPI via TCP.
|
||||
* Open MPI uses a "magic" string to trivially verify that the connecting
|
||||
@ -1367,38 +1391,42 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
|
||||
* This attempted connection will be ignored; your MPI job may or may not
|
||||
* continue properly.
|
||||
*/
|
||||
if (retval != (int) len) {
|
||||
opal_output_verbose(20, opal_btl_base_framework.framework_output,
|
||||
"server did not receive magic string. "
|
||||
if (sizeof(hs_msg) != retval) {
|
||||
opal_output_verbose(20, opal_btl_base_framework.framework_output,
|
||||
"server did not receive entire connect ACK "
|
||||
"Local_host:%s PID:%d Role:%s String_received:%s Test_fail:%s",
|
||||
opal_process_info.nodename,
|
||||
getpid(), "server",
|
||||
(len > 0) ? str : "<nothing>", "string length");
|
||||
(retval > 0) ? hs_msg.magic_id : "<nothing>",
|
||||
"handshake message length");
|
||||
|
||||
/* The other side probably isn't OMPI, so just hang up */
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
return;
|
||||
/* The other side probably isn't OMPI, so just hang up */
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
return;
|
||||
}
|
||||
if (0 != strncmp(str, mca_btl_tcp_magic_id_string, len)) {
|
||||
if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
|
||||
opal_output_verbose(20, opal_btl_base_framework.framework_output,
|
||||
"server did not receive right magic string. "
|
||||
"Local_host:%s PID:%d Role:%s String_received:%s Test_fail:%s",
|
||||
opal_process_info.nodename,
|
||||
getpid(), "server", str,
|
||||
getpid(), "server", hs_msg.magic_id,
|
||||
"string value");
|
||||
/* The other side probably isn't OMPI, so just hang up */
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
return;
|
||||
}
|
||||
/* recv the process identifier */
|
||||
retval = mca_btl_tcp_recv_blocking(sd, (char *)&guid, sizeof(guid));
|
||||
if(retval != sizeof(guid)) {
|
||||
opal_show_help("help-mpi-btl-tcp.txt", "server did not get guid",
|
||||
true, opal_process_info.nodename,
|
||||
getpid());
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
return;
|
||||
}
|
||||
|
||||
if (sockopt) {
|
||||
/* reset RECVTIMEO option to its original state */
|
||||
if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sizeof(save))) {
|
||||
opal_output_verbose(20, opal_btl_base_framework.framework_output,
|
||||
"Cannot reset recv timeout value"
|
||||
"Local_host:%s PID:%d",
|
||||
opal_process_info.nodename, getpid());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_PROCESS_NAME_NTOH(guid);
|
||||
|
||||
/* now set socket up to be non-blocking */
|
||||
|
@ -65,7 +65,8 @@
|
||||
/*
|
||||
* Magic ID string sent during connect/accept handshake
|
||||
*/
|
||||
const char mca_btl_tcp_magic_id_string[] = "OPAL-TCP-BTL";
|
||||
|
||||
const char mca_btl_tcp_magic_id_string[MCA_BTL_TCP_MAGIC_STRING_LENGTH] = "OPAL-TCP-BTL";
|
||||
|
||||
/*
|
||||
* Initialize state of the endpoint instance.
|
||||
@ -389,39 +390,28 @@ mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint,
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Send the globally unique identifier for this process to an endpoint on
|
||||
* a newly connected socket.
|
||||
*/
|
||||
|
||||
static int mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
|
||||
static int
|
||||
mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
|
||||
{
|
||||
|
||||
opal_process_name_t guid = opal_proc_local_get()->proc_name;
|
||||
|
||||
int len = (int) strlen(mca_btl_tcp_magic_id_string);
|
||||
|
||||
/* send magic string to the remote endpoint, identifying me as a
|
||||
fellow Open MPI TCP BTL */
|
||||
if (mca_btl_tcp_endpoint_send_blocking(btl_endpoint,
|
||||
mca_btl_tcp_magic_id_string,
|
||||
len) != len) {
|
||||
opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
|
||||
true, opal_process_info.nodename,
|
||||
getpid(),
|
||||
"failed to send magic ID string");
|
||||
return OPAL_ERR_UNREACH;
|
||||
}
|
||||
|
||||
/* send process identifier to remote endpoint */
|
||||
OPAL_PROCESS_NAME_HTON(guid);
|
||||
if(mca_btl_tcp_endpoint_send_blocking(btl_endpoint, &guid, sizeof(guid)) != sizeof(guid)) {
|
||||
opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
|
||||
|
||||
mca_btl_tcp_endpoint_hs_msg_t hs_msg;
|
||||
strcpy(hs_msg.magic_id, mca_btl_tcp_magic_id_string);
|
||||
hs_msg.guid = guid;
|
||||
|
||||
if(sizeof(hs_msg) !=
|
||||
mca_btl_tcp_endpoint_send_blocking(btl_endpoint,
|
||||
&hs_msg, sizeof(hs_msg))) {
|
||||
opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
|
||||
true, opal_process_info.nodename,
|
||||
getpid(),
|
||||
"sending connect ACK failed");
|
||||
return OPAL_ERR_UNREACH;
|
||||
sizeof(hs_msg),
|
||||
"connect ACK failed to send magic-id and guid");
|
||||
return OPAL_ERR_UNREACH;
|
||||
}
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
@ -601,37 +591,15 @@ static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpo
|
||||
*/
|
||||
static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
|
||||
{
|
||||
size_t s, len = strlen(mca_btl_tcp_magic_id_string);;
|
||||
opal_process_name_t guid;
|
||||
size_t retval, len = strlen(mca_btl_tcp_magic_id_string);;
|
||||
mca_btl_tcp_proc_t* btl_proc = btl_endpoint->endpoint_proc;
|
||||
char msg[1024];
|
||||
opal_process_name_t guid;
|
||||
|
||||
/* First get magic string indicating that the connector is an Open MPI TCP BTL */
|
||||
assert(len < sizeof(msg));
|
||||
msg[0] = '\0';
|
||||
s = mca_btl_tcp_endpoint_recv_blocking(btl_endpoint, msg, len);
|
||||
if (s > 0) {
|
||||
msg[s] = '\0';
|
||||
}
|
||||
if (s != len) {
|
||||
opal_show_help("help-mpi-btl-tcp.txt", "did not receive magic string",
|
||||
true, opal_process_info.nodename,
|
||||
getpid(), "client",
|
||||
(s > 0) ? msg : "<nothing>", "string length");
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
if (0 != strncmp(msg, mca_btl_tcp_magic_id_string, len)) {
|
||||
opal_show_help("help-mpi-btl-tcp.txt", "did not receive magic string",
|
||||
true, opal_process_info.nodename,
|
||||
getpid(), "client", msg,
|
||||
"string value");
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
mca_btl_tcp_endpoint_hs_msg_t hs_msg;
|
||||
retval = mca_btl_tcp_endpoint_recv_blocking(btl_endpoint, &hs_msg, sizeof(hs_msg));
|
||||
|
||||
s = mca_btl_tcp_endpoint_recv_blocking(btl_endpoint,
|
||||
&guid, sizeof(opal_process_name_t));
|
||||
if (s != sizeof(opal_process_name_t)) {
|
||||
if (0 == s) {
|
||||
if (sizeof(hs_msg) != retval) {
|
||||
if (0 == retval) {
|
||||
/* If we get zero bytes, the peer closed the socket. This
|
||||
can happen when the two peers started the connection
|
||||
protocol simultaneously. Just report the problem
|
||||
@ -640,10 +608,19 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en
|
||||
}
|
||||
opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
|
||||
true, opal_process_info.nodename,
|
||||
getpid(),
|
||||
"did not receive entire connect ACK from peer");
|
||||
return OPAL_ERR_UNREACH;
|
||||
getpid(), "did not receive entire connect ACK from peer");
|
||||
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
|
||||
opal_show_help("help-mpi-btl-tcp.txt", "server did not receive magic string",
|
||||
true, opal_process_info.nodename,
|
||||
getpid(), "client", hs_msg.magic_id,
|
||||
"string value");
|
||||
return OPAL_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
guid = hs_msg.guid;
|
||||
OPAL_PROCESS_NAME_NTOH(guid);
|
||||
/* compare this to the expected values */
|
||||
/* TODO: this deserves a little bit more thinking as we are not supposed
|
||||
@ -757,7 +734,6 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
|
||||
"btl:tcp: connect() to %s:%d completed",
|
||||
opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
|
||||
ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
|
||||
|
||||
/* send our globally unique process identifier to the endpoint */
|
||||
if((rc = mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint)) == OPAL_SUCCESS) {
|
||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
|
||||
@ -917,6 +893,7 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
|
||||
the other end, and something bad has probably
|
||||
happened. */
|
||||
mca_btl_tcp_module_t *m = btl_endpoint->endpoint_btl;
|
||||
|
||||
/* Fail up to the PML */
|
||||
if (NULL != m->tcp_error_cb) {
|
||||
m->tcp_error_cb((mca_btl_base_module_t*) m, MCA_BTL_ERROR_FLAGS_FATAL,
|
||||
@ -1012,9 +989,7 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user)
|
||||
|
||||
switch(btl_endpoint->endpoint_state) {
|
||||
case MCA_BTL_TCP_CONNECTING:
|
||||
if (OPAL_SUCCESS != mca_btl_tcp_endpoint_complete_connect(btl_endpoint)) {
|
||||
mca_btl_tcp_module_t *m = btl_endpoint->endpoint_btl;
|
||||
}
|
||||
mca_btl_tcp_endpoint_complete_connect(btl_endpoint);
|
||||
break;
|
||||
case MCA_BTL_TCP_CONNECTED:
|
||||
/* complete the current send */
|
||||
|
@ -26,7 +26,7 @@
|
||||
BEGIN_C_DECLS
|
||||
|
||||
#define MCA_BTL_TCP_ENDPOINT_CACHE 1
|
||||
|
||||
#define MCA_BTL_TCP_MAGIC_STRING_LENGTH 16
|
||||
/**
|
||||
* State of TCP endpoint connection.
|
||||
*/
|
||||
@ -76,7 +76,12 @@ typedef mca_btl_base_endpoint_t mca_btl_tcp_endpoint_t;
|
||||
OBJ_CLASS_DECLARATION(mca_btl_tcp_endpoint_t);
|
||||
|
||||
/* Magic socket handshake string */
|
||||
extern const char mca_btl_tcp_magic_id_string[];
|
||||
extern const char mca_btl_tcp_magic_id_string[MCA_BTL_TCP_MAGIC_STRING_LENGTH];
|
||||
|
||||
typedef struct {
|
||||
opal_process_name_t guid;
|
||||
char magic_id[MCA_BTL_TCP_MAGIC_STRING_LENGTH];
|
||||
} mca_btl_tcp_endpoint_hs_msg_t;
|
||||
|
||||
void mca_btl_tcp_set_socket_options(int sd);
|
||||
void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t*);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user