1
1

revised oob ping so that it doesnt attempt to setup/use a persistent connection

This commit was SVN r5601.
Этот коммит содержится в:
Tim Woodall 2005-05-05 16:31:40 +00:00
родитель b429c73240
Коммит 754dc71177
9 изменённых файлов: 251 добавлений и 122 удалений

Просмотреть файл

@ -102,7 +102,7 @@ OMPI_DECLSPEC int mca_oob_set_contact_info(const char*);
* an error status is returned.
*/
OMPI_DECLSPEC int mca_oob_ping(orte_process_name_t* name, struct timeval* tv);
OMPI_DECLSPEC int mca_oob_ping(const char*, struct timeval* tv);
/**
* A barrier across all processes w/in the same job.

Просмотреть файл

@ -24,13 +24,30 @@
#endif
#include "include/constants.h"
#include "util/argv.h"
#include "mca/ns/ns_types.h"
#include "mca/oob/oob.h"
#include "mca/oob/base/base.h"
int mca_oob_ping(orte_process_name_t* peer, struct timeval* tv)
int mca_oob_ping(const char* contact_info, struct timeval* tv)
{
return(mca_oob.oob_ping(peer, tv));
orte_process_name_t name;
char** uris;
char** ptr;
int rc;
if(ORTE_SUCCESS != (rc = mca_oob_parse_contact_info(contact_info, &name, &uris))) {
return rc;
}
ptr = uris;
while(ptr && *ptr) {
if(ORTE_SUCCESS == (rc = mca_oob.oob_ping(&name, *ptr, tv)))
break;
ptr++;
}
ompi_argv_free(uris);
return rc;
}

Просмотреть файл

@ -70,7 +70,7 @@ typedef char* (*mca_oob_base_module_get_addr_fn_t)(void);
* @param addr Address of seed in component specific uri format.
*/
typedef int (*mca_oob_base_module_set_addr_fn_t)(const orte_process_name_t*, const char* addr);
typedef int (*mca_oob_base_module_set_addr_fn_t)(const orte_process_name_t*, const char* uri);
/**
@ -81,7 +81,7 @@ typedef int (*mca_oob_base_module_set_addr_fn_t)(const orte_process_name_t*, con
* @return OMPI error code (<0) or OMPI_SUCCESS
*/
typedef int (*mca_oob_base_module_ping_fn_t)(const orte_process_name_t*, const struct timeval* tv);
typedef int (*mca_oob_base_module_ping_fn_t)(const orte_process_name_t*, const char* uri, const struct timeval* tv);
/**
* Implementation of mca_oob_send().

Просмотреть файл

@ -337,43 +337,42 @@ static int mca_oob_tcp_create_listen(void)
/*
* Event callback when there is data available on the registered
* socket to recv.
* Handle probe
*/
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
static void mca_oob_tcp_recv_probe(int sd, mca_oob_tcp_hdr_t* hdr)
{
unsigned char* ptr = (unsigned char*)&hdr;
size_t cnt = 0;
hdr->msg_dst = hdr->msg_src;
hdr->msg_src = *orte_process_info.my_name;
while(cnt < sizeof(mca_oob_tcp_hdr_t)) {
int retval = send(sd, (char *)ptr+cnt, sizeof(mca_oob_tcp_hdr_t)-cnt, 0);
if(retval < 0) {
IMPORTANT_WINDOWS_COMMENT();
if(ompi_socket_errno != EINTR && ompi_socket_errno != EAGAIN && ompi_socket_errno != EWOULDBLOCK) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_probe: send() failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(hdr->msg_src)),
ompi_socket_errno);
close(sd);
return;
}
continue;
}
cnt += retval;
}
close(sd);
}
/*
* Handle connection request
*/
static void mca_oob_tcp_recv_connect(int sd, mca_oob_tcp_hdr_t* hdr)
{
orte_process_name_t guid[2];
mca_oob_tcp_peer_t* peer;
int rc, cmpval;
mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t *)user;
/* accept new connections on the listen socket */
if(mca_oob_tcp_component.tcp_listen_sd == sd) {
mca_oob_tcp_accept();
return;
}
OBJ_RELEASE(event);
/* recv the process identifier */
while((rc = recv(sd, (char *)guid, sizeof(guid), 0)) != sizeof(guid)) {
if(rc >= 0) {
if(mca_oob_tcp_component.tcp_debug > 1) {
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: peer closed connection",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
close(sd);
return;
}
if(ompi_socket_errno != EINTR) {
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: recv() failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name), ompi_socket_errno);
close(sd);
return;
}
}
OMPI_PROCESS_NAME_NTOH(guid[0]);
OMPI_PROCESS_NAME_NTOH(guid[1]);
int flags;
int cmpval;
/* now set socket up to be non-blocking */
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
@ -390,21 +389,21 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
/* check for wildcard name - if this is true - we allocate a name from the name server
* and return to the peer
*/
cmpval = orte_ns.compare(ORTE_NS_CMP_ALL, guid, MCA_OOB_NAME_ANY);
cmpval = orte_ns.compare(ORTE_NS_CMP_ALL, &hdr->msg_src, MCA_OOB_NAME_ANY);
if (cmpval == 0) {
if (ORTE_SUCCESS != orte_ns.create_jobid(&guid->jobid)) {
if (ORTE_SUCCESS != orte_ns.create_jobid(&hdr->msg_src.jobid)) {
return;
}
if (ORTE_SUCCESS != orte_ns.reserve_range(guid->jobid, 1, &guid->vpid)) {
if (ORTE_SUCCESS != orte_ns.reserve_range(hdr->msg_src.jobid, 1, &hdr->msg_src.vpid)) {
return;
}
if (ORTE_SUCCESS != orte_ns.assign_cellid_to_process(guid)) {
if (ORTE_SUCCESS != orte_ns.assign_cellid_to_process(&hdr->msg_src)) {
return;
}
}
/* lookup the corresponding process */
peer = mca_oob_tcp_peer_lookup(guid);
peer = mca_oob_tcp_peer_lookup(&hdr->msg_src);
if(NULL == peer) {
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: unable to locate peer",
ORTE_NAME_ARGS(orte_process_info.my_name));
@ -418,7 +417,7 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
"rejected connection from [%d,%d,%d] connection state %d",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
ORTE_NAME_ARGS(&(guid[0])),
ORTE_NAME_ARGS(&(hdr->msg_src)),
peer->peer_state);
}
close(sd);
@ -426,6 +425,59 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
}
}
/*
* Event callback when there is data available on the registered
* socket to recv.
*/
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
{
mca_oob_tcp_hdr_t hdr;
mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t *)user;
int rc;
/* accept new connections on the listen socket */
if(mca_oob_tcp_component.tcp_listen_sd == sd) {
mca_oob_tcp_accept();
return;
}
OBJ_RELEASE(event);
/* recv the process identifier */
while((rc = recv(sd, (char *)&hdr, sizeof(hdr), 0)) != sizeof(hdr)) {
if(rc >= 0) {
if(mca_oob_tcp_component.tcp_debug > 1) {
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: peer closed connection",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
close(sd);
return;
}
if(ompi_socket_errno != EINTR) {
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: recv() failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name), ompi_socket_errno);
close(sd);
return;
}
}
MCA_OOB_TCP_HDR_NTOH(&hdr);
/* dispatch based on message type */
switch(hdr.msg_type) {
case MCA_OOB_TCP_PROBE:
mca_oob_tcp_recv_probe(sd, &hdr);
break;
case MCA_OOB_TCP_CONNECT:
mca_oob_tcp_recv_connect(sd, &hdr);
break;
default:
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: invalid message type: %d\n", hdr.msg_type);
close(sd);
break;
}
}
/*
* Component initialization - create a module.
* (1) initialize static resources

Просмотреть файл

@ -115,7 +115,7 @@ int mca_oob_tcp_set_addr(const orte_process_name_t*, const char*);
* an error status is returned.
*/
int mca_oob_tcp_ping(const orte_process_name_t* name, const struct timeval* tv);
int mca_oob_tcp_ping(const orte_process_name_t*, const char* uri, const struct timeval* tv);
/**
* Similiar to unix writev(2).

Просмотреть файл

@ -23,9 +23,11 @@
#include "mca/ns/ns_types.h"
#define MCA_OOB_TCP_IDENT 1
#define MCA_OOB_TCP_DATA 2
#define MCA_OOB_TCP_PING 3
#define MCA_OOB_TCP_PROBE 1
#define MCA_OOB_TCP_CONNECT 2
#define MCA_OOB_TCP_IDENT 3
#define MCA_OOB_TCP_DATA 4
#define MCA_OOB_TCP_PING 5
/**
* Header used by tcp oob protocol.

Просмотреть файл

@ -490,16 +490,17 @@ static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
* have assigned the peer a unique process name - if it came up
* without one.
*/
orte_process_name_t guid[2];
mca_oob_tcp_hdr_t hdr;
memset(&hdr,0,sizeof(hdr));
if (NULL == orte_process_info.my_name) { /* my name isn't defined yet */
guid[0] = *MCA_OOB_NAME_ANY;
hdr.msg_src = *MCA_OOB_NAME_ANY;
} else {
guid[0] = *(orte_process_info.my_name);
hdr.msg_src = *(orte_process_info.my_name);
}
guid[1] = peer->peer_name;
OMPI_PROCESS_NAME_HTON(guid[0]);
OMPI_PROCESS_NAME_HTON(guid[1]);
if(mca_oob_tcp_peer_send_blocking(peer, guid, sizeof(guid)) != sizeof(guid)) {
hdr.msg_dst = peer->peer_name;
hdr.msg_type = MCA_OOB_TCP_CONNECT;
MCA_OOB_TCP_HDR_HTON(&hdr);
if(mca_oob_tcp_peer_send_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr)) {
return OMPI_ERR_UNREACH;
}
return OMPI_SUCCESS;
@ -512,28 +513,32 @@ static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
*/
static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
{
orte_process_name_t guid[2];
if((mca_oob_tcp_peer_recv_blocking(peer, guid, sizeof(guid))) != sizeof(guid)) {
mca_oob_tcp_hdr_t hdr;
if((mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr))) != sizeof(hdr)) {
mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH;
}
MCA_OOB_TCP_HDR_NTOH(&hdr);
if(hdr.msg_type != MCA_OOB_TCP_CONNECT) {
ompi_output(0, "mca_oob_tcp_peer_recv_connect_ack: invalid header type: %d\n", hdr.msg_type);
mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH;
}
OMPI_PROCESS_NAME_NTOH(guid[0]);
OMPI_PROCESS_NAME_NTOH(guid[1]);
/* compare the peers name to the expected value */
if(memcmp(&peer->peer_name, &guid[0], sizeof(orte_process_name_t)) != 0) {
if(memcmp(&peer->peer_name, &hdr.msg_src, sizeof(orte_process_name_t)) != 0) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_connect_ack: "
"received unexpected process identifier [%d,%d,%d]\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
ORTE_NAME_ARGS(&(guid[0])));
ORTE_NAME_ARGS(&(hdr.msg_src)));
mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH;
}
/* if we have a wildcard name - use the name returned by the peer */
if(orte_ns.compare(ORTE_NS_CMP_ALL, orte_process_info.my_name, &mca_oob_name_any) == 0) {
*orte_process_info.my_name = guid[1];
*orte_process_info.my_name = hdr.msg_dst;
}
/* connected */

Просмотреть файл

@ -14,9 +14,30 @@
* $HEADER$
*/
#include "ompi_config.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <fcntl.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "include/ompi_socket_errno.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#include "mca/ns/ns_types.h"
#include "mca/oob/tcp/oob_tcp.h"
/*
* Ping a peer to see if it is alive.
*
@ -27,77 +48,109 @@
int mca_oob_tcp_ping(
const orte_process_name_t* name,
const char* uri,
const struct timeval *timeout)
{
mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name);
mca_oob_tcp_msg_t* msg;
int sd, flags, rc;
struct sockaddr_in inaddr;
fd_set fdset;
mca_oob_tcp_hdr_t hdr;
struct timeval tv;
struct timespec ts;
int rc;
if(mca_oob_tcp_component.tcp_debug > 1) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: timout %d secs %d usecs\n",
/* parse uri string */
if(OMPI_SUCCESS != (rc = mca_oob_tcp_parse_uri(uri, &inaddr))) {
ompi_output(0,
"[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: invalid uri: %s\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
timeout->tv_sec, timeout->tv_usec);
ORTE_NAME_ARGS(name),
uri);
return rc;
}
if(NULL == peer)
/* create socket */
sd = socket(AF_INET, SOCK_STREAM, 0);
if (sd < 0) {
ompi_output(0,
"[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: socket() failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(name),
ompi_socket_errno);
return OMPI_ERR_UNREACH;
}
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg)
return rc;
/* convert the header network byte order */
msg->msg_hdr.msg_type = MCA_OOB_TCP_PING;
msg->msg_hdr.msg_size = 0;
msg->msg_hdr.msg_tag = 0;
if (NULL == orte_process_info.my_name) { /* don't know my name yet */
msg->msg_hdr.msg_src = *MCA_OOB_NAME_ANY;
/* setup the socket as non-blocking */
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: fcntl(F_GETFL) failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(name),
ompi_socket_errno);
} else {
msg->msg_hdr.msg_src = *orte_process_info.my_name;
flags |= O_NONBLOCK;
if(fcntl(sd, F_SETFL, flags) < 0) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: fcntl(F_SETFL) failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(name),
ompi_socket_errno);
}
msg->msg_hdr.msg_dst = *name;
MCA_OOB_TCP_HDR_HTON(&msg->msg_hdr);
/* create an iovec to hold the header */
msg->msg_type = MCA_OOB_TCP_POSTED;
msg->msg_rc = 0;
msg->msg_flags = 0;
msg->msg_uiov = NULL;
msg->msg_ucnt = 0;
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg, 1);
msg->msg_rwiov[0].iov_base = (ompi_iov_base_ptr_t)&msg->msg_hdr;
msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);
msg->msg_rwptr = msg->msg_rwiov;
msg->msg_rwcnt = msg->msg_rwnum = 1;
msg->msg_rwbuf = NULL;
msg->msg_cbfunc = NULL;
msg->msg_cbdata = NULL;
msg->msg_complete = false;
msg->msg_peer = peer->peer_name;
/* initiate the send */
rc = mca_oob_tcp_peer_send(peer, msg);
if(rc != OMPI_SUCCESS) {
MCA_OOB_TCP_MSG_RETURN(msg);
return rc;
}
/* setup a timeout based on absolute time and wait for completion */
gettimeofday(&tv, NULL);
tv.tv_sec += timeout->tv_sec;
tv.tv_usec += timeout->tv_usec;
while(tv.tv_usec > 1000000) {
tv.tv_sec++;
tv.tv_usec -= 1000000;
}
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = (tv.tv_usec * 1000);
rc = mca_oob_tcp_msg_timedwait(msg, NULL, &ts);
if(rc != OMPI_SUCCESS)
mca_oob_tcp_peer_dequeue_msg(peer,msg);
MCA_OOB_TCP_MSG_RETURN(msg);
return rc;
/* start the connect - will likely fail with EINPROGRESS */
FD_ZERO(&fdset);
if(connect(sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
/* connect failed? */
if(ompi_socket_errno != EINPROGRESS && ompi_socket_errno != EWOULDBLOCK) {
close(sd);
return OMPI_ERR_UNREACH;
}
/* select with timeout to wait for connect to complete */
FD_SET(sd, &fdset);
tv = *timeout;
rc = select(sd+1, NULL, &fdset, NULL, &tv);
if(rc <= 0) {
close(sd);
return OMPI_ERR_UNREACH;
}
}
/* set socket back to blocking */
flags &= ~O_NONBLOCK;
if(fcntl(sd, F_SETFL, flags) < 0) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_ping: fcntl(F_SETFL) failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(name),
ompi_socket_errno);
}
/* send a probe message */
memset(&hdr, 0, sizeof(hdr));
if(orte_process_info.my_name != NULL) {
hdr.msg_src = *orte_process_info.my_name;
} else {
hdr.msg_src = mca_oob_name_any;
}
hdr.msg_dst = *name;
hdr.msg_type = MCA_OOB_TCP_PROBE;
if((rc = write(sd, &hdr, sizeof(hdr))) != sizeof(hdr)) {
close(sd);
return OMPI_ERR_UNREACH;
}
/* select with timeout to wait for response */
FD_SET(sd, &fdset);
tv = *timeout;
rc = select(sd+1, &fdset, NULL, NULL, &tv);
if(rc <= 0) {
close(sd);
return OMPI_ERR_UNREACH;
}
if((rc = read(sd, &hdr, sizeof(hdr))) != sizeof(hdr)) {
close(sd);
return OMPI_ERR_UNREACH;
}
close(sd);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -94,7 +94,7 @@ typedef int (*orte_rml_module_parse_uris_fn_t)(const char* uri,
* @return OMPI error code (<0) or OMPI_SUCCESS
*/
typedef int (*orte_rml_module_ping_fn_t)(const orte_process_name_t*, const struct timeval* tv);
typedef int (*orte_rml_module_ping_fn_t)(const char* uri, const struct timeval* tv);
/**
* orte_rml.rml_send()