From 754dc711772e4ff5fe6d7d29216b34ffe5b4da07 Mon Sep 17 00:00:00 2001
From: Tim Woodall
Date: Thu, 5 May 2005 16:31:40 +0000
Subject: [PATCH] revised oob ping so that it doesnt attempt to setup/use a persistent connection

This commit was SVN r5601.
---
 src/mca/oob/base/base.h          |   2 +-
 src/mca/oob/base/oob_base_ping.c |  21 +++-
 src/mca/oob/oob.h                |   4 +-
 src/mca/oob/tcp/oob_tcp.c        | 126 +++++++++++++++-------
 src/mca/oob/tcp/oob_tcp.h        |   2 +-
 src/mca/oob/tcp/oob_tcp_hdr.h    |   8 +-
 src/mca/oob/tcp/oob_tcp_peer.c   |  33 +++---
 src/mca/oob/tcp/oob_tcp_ping.c   | 175 ++++++++++++++++++++-----------
 src/mca/rml/rml.h                |   2 +-
 9 files changed, 251 insertions(+), 122 deletions(-)

diff --git a/src/mca/oob/base/base.h b/src/mca/oob/base/base.h
index 4e7f316461..b04f5ed06a 100644
--- a/src/mca/oob/base/base.h
+++ b/src/mca/oob/base/base.h
@@ -102,7 +102,7 @@ OMPI_DECLSPEC int mca_oob_set_contact_info(const char*);
  * an error status is returned.
  */
 
-OMPI_DECLSPEC int mca_oob_ping(orte_process_name_t* name, struct timeval* tv);
+OMPI_DECLSPEC int mca_oob_ping(const char*, struct timeval* tv);
 
 /**
  * A barrier across all processes w/in the same job.
diff --git a/src/mca/oob/base/oob_base_ping.c b/src/mca/oob/base/oob_base_ping.c
index e3c8e2a8e9..be3f3fe378 100644
--- a/src/mca/oob/base/oob_base_ping.c
+++ b/src/mca/oob/base/oob_base_ping.c
@@ -24,13 +24,41 @@
 #endif
 
 #include "include/constants.h"
+#include "util/argv.h"
 #include "mca/ns/ns_types.h"
 #include "mca/oob/oob.h"
 #include "mca/oob/base/base.h"
 
 
-int mca_oob_ping(orte_process_name_t* peer, struct timeval* tv)
+/*
+ * Ping the process described by a contact info string. The string is
+ * parsed into a process name and a list of uris; each uri is tried in
+ * turn until the module's ping succeeds.
+ *
+ * Returns the parse error if the string cannot be parsed, ORTE_SUCCESS
+ * as soon as one uri answers, otherwise the status of the last failed
+ * attempt (OMPI_ERR_UNREACH if there was no uri to try).
+ */
+int mca_oob_ping(const char* contact_info, struct timeval* tv)
 {
-    return(mca_oob.oob_ping(peer, tv));
+    orte_process_name_t name;
+    char** uris;
+    char** ptr;
+    int rc;
+
+    if(ORTE_SUCCESS != (rc = mca_oob_parse_contact_info(contact_info, &name, &uris))) {
+        return rc;
+    }
+
+    /* no uri to try means the peer cannot be reached - not success */
+    rc = OMPI_ERR_UNREACH;
+    ptr = uris;
+    while(ptr && *ptr) {
+        if(ORTE_SUCCESS == (rc = mca_oob.oob_ping(&name, *ptr, tv)))
+            break;
+        ptr++;
+    }
+    ompi_argv_free(uris);
+    return rc;
 }
 
diff --git a/src/mca/oob/oob.h b/src/mca/oob/oob.h
index 737e4867f4..a4d12ca411 100644
--- a/src/mca/oob/oob.h
+++ b/src/mca/oob/oob.h
@@ -70,7 +70,7 @@ typedef char* (*mca_oob_base_module_get_addr_fn_t)(void);
  * @param addr Address of seed in component specific uri format.
  */
 
-typedef int (*mca_oob_base_module_set_addr_fn_t)(const orte_process_name_t*, const char* addr);
+typedef int (*mca_oob_base_module_set_addr_fn_t)(const orte_process_name_t*, const char* uri);
 
 
 /**
@@ -81,7 +81,7 @@ typedef int (*mca_oob_base_module_set_addr_fn_t)(const orte_process_name_t*, con
  * @return OMPI error code (<0) or OMPI_SUCCESS
  */
 
-typedef int (*mca_oob_base_module_ping_fn_t)(const orte_process_name_t*, const struct timeval* tv);
+typedef int (*mca_oob_base_module_ping_fn_t)(const orte_process_name_t*, const char* uri, const struct timeval* tv);
 
 /**
  * Implementation of mca_oob_send().
diff --git a/src/mca/oob/tcp/oob_tcp.c b/src/mca/oob/tcp/oob_tcp.c
index 8b880e4352..467494f3bd 100644
--- a/src/mca/oob/tcp/oob_tcp.c
+++ b/src/mca/oob/tcp/oob_tcp.c
@@ -337,43 +337,48 @@ static int mca_oob_tcp_create_listen(void)
 
 
 /*
- * Event callback when there is data available on the registered
- * socket to recv.
+ * Handle probe - reply on the socket the probe arrived on with a header
+ * whose source and destination are swapped, then close the socket. No
+ * peer lookup is done.
  */
-
-static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
+static void mca_oob_tcp_recv_probe(int sd, mca_oob_tcp_hdr_t* hdr)
 {
-    orte_process_name_t guid[2];
-    mca_oob_tcp_peer_t* peer;
-    int rc, cmpval;
-    mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t *)user;
+    /* hdr already points at the header - send it, not the pointer itself */
+    unsigned char* ptr = (unsigned char*)hdr;
+    orte_process_name_t peer = hdr->msg_src;
+    size_t cnt = 0;
 
-    /* accept new connections on the listen socket */
-    if(mca_oob_tcp_component.tcp_listen_sd == sd) {
-        mca_oob_tcp_accept();
-        return;
-    }
-    OBJ_RELEASE(event);
-
-    /* recv the process identifier */
-    while((rc = recv(sd, (char *)guid, sizeof(guid), 0)) != sizeof(guid)) {
-        if(rc >= 0) {
-            if(mca_oob_tcp_component.tcp_debug > 1) {
-                ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: peer closed connection",
-                    ORTE_NAME_ARGS(orte_process_info.my_name));
+    /* swap source/destination and convert to network byte order */
+    hdr->msg_dst = peer;
+    hdr->msg_src = *orte_process_info.my_name;
+    MCA_OOB_TCP_HDR_HTON(hdr);
+    while(cnt < sizeof(mca_oob_tcp_hdr_t)) {
+        int retval = send(sd, (char *)ptr+cnt, sizeof(mca_oob_tcp_hdr_t)-cnt, 0);
+        if(retval < 0) {
+            IMPORTANT_WINDOWS_COMMENT();
+            if(ompi_socket_errno != EINTR && ompi_socket_errno != EAGAIN && ompi_socket_errno != EWOULDBLOCK) {
+                ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_probe: send() failed with errno=%d\n",
+                    ORTE_NAME_ARGS(orte_process_info.my_name),
+                    ORTE_NAME_ARGS(&peer),
+                    ompi_socket_errno);
+                close(sd);
+                return;
             }
-            close(sd);
-            return;
-        }
-        if(ompi_socket_errno != EINTR) {
-            ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: recv() failed with errno=%d\n",
-                ORTE_NAME_ARGS(orte_process_info.my_name), ompi_socket_errno);
-            close(sd);
-            return;
+            continue;
         }
+        cnt += retval;
     }
-    OMPI_PROCESS_NAME_NTOH(guid[0]);
-    OMPI_PROCESS_NAME_NTOH(guid[1]);
+    close(sd);
+}
+
+/*
+ * Handle connection request
+ */
+static void mca_oob_tcp_recv_connect(int sd, mca_oob_tcp_hdr_t* hdr)
+{
+    mca_oob_tcp_peer_t* peer;
+    int flags;
+    int cmpval;
 
     /* now set socket up to be non-blocking */
     if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
@@ -390,21 +395,21 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
     /* check for wildcard name - if this is true - we allocate a name from the name server
      * and return to the peer
      */
-    cmpval = orte_ns.compare(ORTE_NS_CMP_ALL, guid, MCA_OOB_NAME_ANY);
+    cmpval = orte_ns.compare(ORTE_NS_CMP_ALL, &hdr->msg_src, MCA_OOB_NAME_ANY);
     if (cmpval == 0) {
-        if (ORTE_SUCCESS != orte_ns.create_jobid(&guid->jobid)) {
+        if (ORTE_SUCCESS != orte_ns.create_jobid(&hdr->msg_src.jobid)) {
             return;
         }
-        if (ORTE_SUCCESS != orte_ns.reserve_range(guid->jobid, 1, &guid->vpid)) {
+        if (ORTE_SUCCESS != orte_ns.reserve_range(hdr->msg_src.jobid, 1, &hdr->msg_src.vpid)) {
             return;
         }
-        if (ORTE_SUCCESS != orte_ns.assign_cellid_to_process(guid)) {
+        if (ORTE_SUCCESS != orte_ns.assign_cellid_to_process(&hdr->msg_src)) {
             return;
         }
     }
 
     /* lookup the corresponding process */
-    peer = mca_oob_tcp_peer_lookup(guid);
+    peer = mca_oob_tcp_peer_lookup(&hdr->msg_src);
     if(NULL == peer) {
         ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: unable to locate peer",
             ORTE_NAME_ARGS(orte_process_info.my_name));
@@ -418,7 +423,7 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
                 "rejected connection from [%d,%d,%d] connection state %d",
                 ORTE_NAME_ARGS(orte_process_info.my_name),
                 ORTE_NAME_ARGS(&(peer->peer_name)),
-                ORTE_NAME_ARGS(&(guid[0])),
+                ORTE_NAME_ARGS(&(hdr->msg_src)),
                 peer->peer_state);
         }
         close(sd);
@@ -426,6 +431,62 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
     }
 }
 
+/*
+ * Event callback when there is data available on the registered
+ * socket to recv. On the listen socket the pending connection is
+ * accepted; on any other socket one header is read and dispatched
+ * on its message type.
+ */
+
+static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
+{
+    mca_oob_tcp_hdr_t hdr;
+    mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t *)user;
+    int rc;
+
+    /* accept new connections on the listen socket */
+    if(mca_oob_tcp_component.tcp_listen_sd == sd) {
+        mca_oob_tcp_accept();
+        return;
+    }
+    OBJ_RELEASE(event);
+
+    /* recv the process identifier */
+    while((rc = recv(sd, (char *)&hdr, sizeof(hdr), 0)) != sizeof(hdr)) {
+        if(rc >= 0) {
+            if(mca_oob_tcp_component.tcp_debug > 1) {
+                ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: peer closed connection",
+                    ORTE_NAME_ARGS(orte_process_info.my_name));
+            }
+            close(sd);
+            return;
+        }
+        if(ompi_socket_errno != EINTR) {
+            ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: recv() failed with errno=%d\n",
+                ORTE_NAME_ARGS(orte_process_info.my_name), ompi_socket_errno);
+            close(sd);
+            return;
+        }
+    }
+    MCA_OOB_TCP_HDR_NTOH(&hdr);
+
+    /* dispatch based on message type */
+    switch(hdr.msg_type) {
+        case MCA_OOB_TCP_PROBE:
+            mca_oob_tcp_recv_probe(sd, &hdr);
+            break;
+        case MCA_OOB_TCP_CONNECT:
+            mca_oob_tcp_recv_connect(sd, &hdr);
+            break;
+        default:
+            ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: invalid message type: %d\n",
+                ORTE_NAME_ARGS(orte_process_info.my_name), hdr.msg_type);
+            close(sd);
+            break;
+    }
+}
+
+
 /*
  * Component initialization - create a module.
  * (1) initialize static resources
diff --git a/src/mca/oob/tcp/oob_tcp.h b/src/mca/oob/tcp/oob_tcp.h
index 3d0227403b..2cd76ffe0f 100644
--- a/src/mca/oob/tcp/oob_tcp.h
+++ b/src/mca/oob/tcp/oob_tcp.h
@@ -115,7 +115,7 @@ int mca_oob_tcp_set_addr(const orte_process_name_t*, const char*);
  * an error status is returned.
  */
 
-int mca_oob_tcp_ping(const orte_process_name_t* name, const struct timeval* tv);
+int mca_oob_tcp_ping(const orte_process_name_t*, const char* uri, const struct timeval* tv);
 
 /**
  * Similiar to unix writev(2).
diff --git a/src/mca/oob/tcp/oob_tcp_hdr.h b/src/mca/oob/tcp/oob_tcp_hdr.h
index 5168830474..b8d3cd57aa 100644
--- a/src/mca/oob/tcp/oob_tcp_hdr.h
+++ b/src/mca/oob/tcp/oob_tcp_hdr.h
@@ -23,9 +23,11 @@
 
 #include "mca/ns/ns_types.h"
 
-#define MCA_OOB_TCP_IDENT 1
-#define MCA_OOB_TCP_DATA 2
-#define MCA_OOB_TCP_PING 3
+#define MCA_OOB_TCP_PROBE 1
+#define MCA_OOB_TCP_CONNECT 2
+#define MCA_OOB_TCP_IDENT 3
+#define MCA_OOB_TCP_DATA 4
+#define MCA_OOB_TCP_PING 5
 
 /**
  * Header used by tcp oob protocol.
diff --git a/src/mca/oob/tcp/oob_tcp_peer.c b/src/mca/oob/tcp/oob_tcp_peer.c
index 4d8794a11e..9c58eacc75 100644
--- a/src/mca/oob/tcp/oob_tcp_peer.c
+++ b/src/mca/oob/tcp/oob_tcp_peer.c
@@ -490,16 +490,17 @@ static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
      * have assigned the peer a unique process name - if it came up
      * without one.
      */
-    orte_process_name_t guid[2];
+    mca_oob_tcp_hdr_t hdr;
+    memset(&hdr,0,sizeof(hdr));
     if (NULL == orte_process_info.my_name) { /* my name isn't defined yet */
-        guid[0] = *MCA_OOB_NAME_ANY;
+        hdr.msg_src = *MCA_OOB_NAME_ANY;
     } else {
-        guid[0] = *(orte_process_info.my_name);
+        hdr.msg_src = *(orte_process_info.my_name);
     }
-    guid[1] = peer->peer_name;
-    OMPI_PROCESS_NAME_HTON(guid[0]);
-    OMPI_PROCESS_NAME_HTON(guid[1]);
-    if(mca_oob_tcp_peer_send_blocking(peer, guid, sizeof(guid)) != sizeof(guid)) {
+    hdr.msg_dst = peer->peer_name;
+    hdr.msg_type = MCA_OOB_TCP_CONNECT;
+    MCA_OOB_TCP_HDR_HTON(&hdr);
+    if(mca_oob_tcp_peer_send_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr)) {
         return OMPI_ERR_UNREACH;
     }
     return OMPI_SUCCESS;
@@ -512,28 +513,32 @@ static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
  */
 static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
 {
-    orte_process_name_t guid[2];
-    if((mca_oob_tcp_peer_recv_blocking(peer, guid, sizeof(guid))) != sizeof(guid)) {
+    mca_oob_tcp_hdr_t hdr;
+    if((mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr))) != sizeof(hdr)) {
+        mca_oob_tcp_peer_close(peer);
+        return OMPI_ERR_UNREACH;
+    }
+    MCA_OOB_TCP_HDR_NTOH(&hdr);
+    if(hdr.msg_type != MCA_OOB_TCP_CONNECT) {
+        ompi_output(0, "mca_oob_tcp_peer_recv_connect_ack: invalid header type: %d\n", hdr.msg_type);
         mca_oob_tcp_peer_close(peer);
         return OMPI_ERR_UNREACH;
     }
-    OMPI_PROCESS_NAME_NTOH(guid[0]);
-    OMPI_PROCESS_NAME_NTOH(guid[1]);
 
     /* compare the peers name to the expected value */
-    if(memcmp(&peer->peer_name, &guid[0], sizeof(orte_process_name_t)) != 0) {
+    if(memcmp(&peer->peer_name, &hdr.msg_src, sizeof(orte_process_name_t)) != 0) {
         ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_connect_ack: "
             "received unexpected process identifier [%d,%d,%d]\n",
             ORTE_NAME_ARGS(orte_process_info.my_name),
             ORTE_NAME_ARGS(&(peer->peer_name)),
-            ORTE_NAME_ARGS(&(guid[0])));
+            ORTE_NAME_ARGS(&(hdr.msg_src)));
         mca_oob_tcp_peer_close(peer);
         return OMPI_ERR_UNREACH;
     }
 
     /* if we have a wildcard name - use the name returned by the peer */
     if(orte_ns.compare(ORTE_NS_CMP_ALL, orte_process_info.my_name, &mca_oob_name_any) == 0) {
-        *orte_process_info.my_name = guid[1];
+        *orte_process_info.my_name = hdr.msg_dst;
     }
 
     /* connected */
diff --git a/src/mca/oob/tcp/oob_tcp_ping.c b/src/mca/oob/tcp/oob_tcp_ping.c
index 4972f74fdb..0f6c49b8e7 100644
--- a/src/mca/oob/tcp/oob_tcp_ping.c
+++ b/src/mca/oob/tcp/oob_tcp_ping.c
@@ -14,9 +14,33 @@
  * $HEADER$
  */
 #include "ompi_config.h"
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <fcntl.h>
+#ifdef HAVE_SYS_UIO_H
+#include <sys/uio.h>
+#endif
+#ifdef HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif
+#include "include/ompi_socket_errno.h"
+#ifdef HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#endif
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+#ifdef HAVE_NETINET_TCP_H
+#include <netinet/tcp.h>
+#endif
 #include "mca/ns/ns_types.h"
 #include "mca/oob/tcp/oob_tcp.h"
 
+
 /*
  * Ping a peer to see if it is alive.
+ * A short-lived socket is connected to the given uri, a probe header is
+ * sent, and any full-size header read back within the timeout counts as
+ * the peer being alive. The socket is closed before returning.
  *
@@ -26,78 +50,116 @@
  */
 
 int mca_oob_tcp_ping(
-    const orte_process_name_t* name,
+    const orte_process_name_t* name,
+    const char* uri,
     const struct timeval *timeout)
 {
-    mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name);
-    mca_oob_tcp_msg_t* msg;
+    int sd, flags, rc;
+    struct sockaddr_in inaddr;
+    fd_set fdset;
+    mca_oob_tcp_hdr_t hdr;
     struct timeval tv;
-    struct timespec ts;
-    int rc;
 
-    if(mca_oob_tcp_component.tcp_debug > 1) {
-        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: timout %d secs %d usecs\n",
+    /* parse uri string */
+    if(OMPI_SUCCESS != (rc = mca_oob_tcp_parse_uri(uri, &inaddr))) {
+        ompi_output(0,
+            "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: invalid uri: %s\n",
             ORTE_NAME_ARGS(orte_process_info.my_name),
-            ORTE_NAME_ARGS(&(peer->peer_name)),
-            timeout->tv_sec, timeout->tv_usec);
+            ORTE_NAME_ARGS(name),
+            uri);
+        return rc;
     }
-    if(NULL == peer)
+
+    /* create socket */
+    sd = socket(AF_INET, SOCK_STREAM, 0);
+    if (sd < 0) {
+        ompi_output(0,
+            "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: socket() failed with errno=%d\n",
+            ORTE_NAME_ARGS(orte_process_info.my_name),
+            ORTE_NAME_ARGS(name),
+            ompi_socket_errno);
         return OMPI_ERR_UNREACH;
+    }
 
-    MCA_OOB_TCP_MSG_ALLOC(msg, rc);
-    if(NULL == msg)
-        return rc;
-
-    /* convert the header network byte order */
-    msg->msg_hdr.msg_type = MCA_OOB_TCP_PING;
-    msg->msg_hdr.msg_size = 0;
-    msg->msg_hdr.msg_tag = 0;
-    if (NULL == orte_process_info.my_name) { /* don't know my name yet */
-        msg->msg_hdr.msg_src = *MCA_OOB_NAME_ANY;
+    /* setup the socket as non-blocking */
+    if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
+        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: fcntl(F_GETFL) failed with errno=%d\n",
+            ORTE_NAME_ARGS(orte_process_info.my_name),
+            ORTE_NAME_ARGS(name),
+            ompi_socket_errno);
     } else {
-        msg->msg_hdr.msg_src = *orte_process_info.my_name;
-    }
-    msg->msg_hdr.msg_dst = *name;
-    MCA_OOB_TCP_HDR_HTON(&msg->msg_hdr);
-
-    /* create an iovec to hold the header */
-    msg->msg_type = MCA_OOB_TCP_POSTED;
-    msg->msg_rc = 0;
-    msg->msg_flags = 0;
-    msg->msg_uiov = NULL;
-    msg->msg_ucnt = 0;
-    msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg, 1);
-    msg->msg_rwiov[0].iov_base = (ompi_iov_base_ptr_t)&msg->msg_hdr;
-    msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);
-    msg->msg_rwptr = msg->msg_rwiov;
-    msg->msg_rwcnt = msg->msg_rwnum = 1;
-    msg->msg_rwbuf = NULL;
-    msg->msg_cbfunc = NULL;
-    msg->msg_cbdata = NULL;
-    msg->msg_complete = false;
-    msg->msg_peer = peer->peer_name;
-
-    /* initiate the send */
-    rc = mca_oob_tcp_peer_send(peer, msg);
-    if(rc != OMPI_SUCCESS) {
-        MCA_OOB_TCP_MSG_RETURN(msg);
-        return rc;
+        flags |= O_NONBLOCK;
+        if(fcntl(sd, F_SETFL, flags) < 0) {
+            ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: fcntl(F_SETFL) failed with errno=%d\n",
+                ORTE_NAME_ARGS(orte_process_info.my_name),
+                ORTE_NAME_ARGS(name),
+                ompi_socket_errno);
+        }
     }
 
-    /* setup a timeout based on absolute time and wait for completion */
-    gettimeofday(&tv, NULL);
-    tv.tv_sec += timeout->tv_sec;
-    tv.tv_usec += timeout->tv_usec;
-    while(tv.tv_usec > 1000000) {
-        tv.tv_sec++;
-        tv.tv_usec -= 1000000;
+    /* start the connect - will likely fail with EINPROGRESS */
+    FD_ZERO(&fdset);
+    if(connect(sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
+        /* connect failed? */
+        if(ompi_socket_errno != EINPROGRESS && ompi_socket_errno != EWOULDBLOCK) {
+            close(sd);
+            return OMPI_ERR_UNREACH;
+        }
+
+        /* select with timeout to wait for connect to complete */
+        FD_SET(sd, &fdset);
+        tv = *timeout;
+        rc = select(sd+1, NULL, &fdset, NULL, &tv);
+        if(rc <= 0) {
+            close(sd);
+            return OMPI_ERR_UNREACH;
+        }
     }
-    ts.tv_sec = tv.tv_sec;
-    ts.tv_nsec = (tv.tv_usec * 1000);
-    rc = mca_oob_tcp_msg_timedwait(msg, NULL, &ts);
-    if(rc != OMPI_SUCCESS)
-        mca_oob_tcp_peer_dequeue_msg(peer,msg);
-    MCA_OOB_TCP_MSG_RETURN(msg);
-    return rc;
+
+    /* set socket back to blocking - skipped when F_GETFL failed above,
+     * since flags then holds an error value rather than the socket flags */
+    if(flags >= 0) {
+        flags &= ~O_NONBLOCK;
+        if(fcntl(sd, F_SETFL, flags) < 0) {
+            ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: fcntl(F_SETFL) failed with errno=%d\n",
+                ORTE_NAME_ARGS(orte_process_info.my_name),
+                ORTE_NAME_ARGS(name),
+                ompi_socket_errno);
+        }
+    }
+
+    /* send a probe message */
+    memset(&hdr, 0, sizeof(hdr));
+    if(orte_process_info.my_name != NULL) {
+        hdr.msg_src = *orte_process_info.my_name;
+    } else {
+        hdr.msg_src = mca_oob_name_any;
+    }
+    hdr.msg_dst = *name;
+    hdr.msg_type = MCA_OOB_TCP_PROBE;
+    /* the receiving side converts the header from network byte order */
+    MCA_OOB_TCP_HDR_HTON(&hdr);
+
+    if((rc = write(sd, &hdr, sizeof(hdr))) != sizeof(hdr)) {
+        close(sd);
+        return OMPI_ERR_UNREACH;
+    }
+
+    /* select with timeout to wait for response */
+    FD_SET(sd, &fdset);
+    tv = *timeout;
+    rc = select(sd+1, &fdset, NULL, NULL, &tv);
+    if(rc <= 0) {
+        close(sd);
+        return OMPI_ERR_UNREACH;
+    }
+    /* any full-size header counts as a reply - its contents are not examined */
+    if((rc = read(sd, &hdr, sizeof(hdr))) != sizeof(hdr)) {
+        close(sd);
+        return OMPI_ERR_UNREACH;
+    }
+    close(sd);
+    return OMPI_SUCCESS;
 }
 
+
diff --git a/src/mca/rml/rml.h b/src/mca/rml/rml.h
index 80e1641c89..16601e064f 100644
--- a/src/mca/rml/rml.h
+++ b/src/mca/rml/rml.h
@@ -94,7 +94,7 @@ typedef int (*orte_rml_module_parse_uris_fn_t)(const char* uri,
  * @return OMPI error code (<0) or OMPI_SUCCESS
  */
 
-typedef int (*orte_rml_module_ping_fn_t)(const orte_process_name_t*, const struct timeval* tv);
+typedef int (*orte_rml_module_ping_fn_t)(const char* uri, const struct timeval* tv);
 
 /**
  * orte_rml.rml_send()