This commit was SVN r1648.
This commit is contained in:
Tim Woodall 2004-07-12 22:46:57 +00:00
parent 85b39439a8
commit 8c1f2a1a68
9 changed files with 845 additions and 192 deletions

View file

@@ -8,10 +8,8 @@ include $(top_ompi_srcdir)/config/Makefile.options
sources = \
oob_tcp.c \
oob_tcp.h \
oob_tcp_message.h \
oob_tcp_msg.c \
oob_tcp_peer.c \
oob_tcp_peer.h \
oob_tcp_recv.c \
oob_tcp_send.c
@@ -30,8 +28,10 @@ endif
mcacomponentdir = $(libdir)/openmpi
mcacomponent_LTLIBRARIES = $(component_install)
mca_oob_tcp_la_SOURCES = $(sources)
mca_oob_tcp_la_LIBADD = $(LIBOMPI_LA)
mca_oob_tcp_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_oob_tcp_la_SOURCES = $(sources)
libmca_oob_tcp_la_LDFLAGS = -module -avoid-version

View file

@@ -9,10 +9,10 @@
/*
* Struct of function pointers and all that to let us be initialized
*/
mca_oob_base_component_1_0_0_t mca_oob_tcp_module = {
mca_oob_tcp_component_t mca_oob_tcp_module = {
{
{
MCA_OOB_BASE_VERSION_1_0_0,
"tcp", /* MCA module name */
1, /* MCA module major version */
0, /* MCA module minor version */
@@ -25,9 +25,10 @@ mca_oob_base_component_1_0_0_t mca_oob_tcp_module = {
},
mca_oob_tcp_init, /* module init */
mca_oob_tcp_finalize
}
};
struct mca_oob_1_0_0_t mca_oob_tcp_1_0_0 = {
static struct mca_oob_1_0_0_t mca_oob_tcp = {
mca_oob_tcp_send,
mca_oob_tcp_recv,
mca_oob_tcp_send_nb,
@@ -35,22 +36,64 @@ struct mca_oob_1_0_0_t mca_oob_tcp_1_0_0 = {
};
/*
* for now these 2 functions simply return an error so we won't
* use this module
* Initialize global variables used w/in this module.
*/
int mca_oob_tcp_open(void)
{
return OMPI_ERROR;
mca_oob_tcp_module.tcp_listen_port = 1;
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_list, ompi_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_tree, ompi_rb_tree_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_free, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_condition, ompi_condition_t);
return OMPI_SUCCESS;
}
/*
* Cleanup of global variables used by this module.
*/
int mca_oob_tcp_close(void)
{
return OMPI_ERROR;
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_list);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_tree);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_free);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_condition);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_lock);
return OMPI_SUCCESS;
}
/**
* Compare two process names for equality.
*
* @param n1 Process name 1.
* @param n2 Process name 2.
* @return (-1 for n1<n2, 0 for equality, 1 for n1>n2)
*
* Note that the definition of < or > is somewhat arbitrary -
* just needs to be consistently applied to maintain an ordering
* when process names are used as indices.
*/
static int ompi_process_name_compare(ompi_process_name_t* n1, ompi_process_name_t* n2)
{
if(n1->cellid < n2->cellid)
return -1;
else if(n1->cellid > n2->cellid)
return 1;
else if(n1->jobid < n2->jobid)
return -1;
else if(n1->jobid > n2->jobid)
return 1;
else if(n1->procid < n2->procid)
return -1;
else if(n1->procid > n2->procid)
return 1;
return(0);
}
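The comparator imposes a strict lexicographic order on (cellid, jobid, procid), which is what the red-black tree below relies on to key peers by process name. A minimal illustration of that ordering follows; the field values are purely hypothetical:
/* hypothetical values, for illustration only */
ompi_process_name_t a, b;
a.cellid = 0; a.jobid = 1; a.procid = 5;
b.cellid = 0; b.jobid = 2; b.procid = 0;
/* cellids match, jobids differ, so procid is never consulted */
int rc = ompi_process_name_compare(&a, &b);   /* rc == -1, i.e. a < b */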
ompi_list_t mca_oob_tcp_peer_list;
ompi_rb_tree_t mca_oob_tcp_peer_tree;
/*
* this function will temporarily return NULL so we don't use it
@@ -58,19 +101,17 @@ ompi_rb_tree_t mca_oob_tcp_peer_tree;
struct mca_oob_1_0_0_t* mca_oob_tcp_init(bool *allow_multi_user_threads,
bool *have_hidden_threads)
{
/* set up the list for the cache of peer processes */
OBJ_CONSTRUCT(&mca_oob_tcp_peer_list, ompi_list_t);
/* set up the rb tree for the cache of peer processes */
OBJ_CONSTRUCT(&mca_oob_tcp_peer_tree, ompi_rb_tree_t);
ompi_rb_tree_init(&mca_oob_tcp_peer_tree, &mca_oob_tcp_peer_comp);
/* return &mca_oob_tcp_1_0_0; */
/* initialize data structures */
ompi_rb_tree_init(&mca_oob_tcp_module.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)ompi_process_name_compare);
/* return &mca_oob_tcp; */
return NULL;
}
int mca_oob_tcp_finalize(void)
{
OBJ_DESTRUCT(&mca_oob_tcp_peer_list);
OBJ_DESTRUCT(&mca_oob_tcp_peer_tree);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_list);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_tree);
return OMPI_SUCCESS;
}

View file

@@ -11,19 +11,13 @@
#include "mca/oob/oob.h"
#include "mca/oob/base/base.h"
#include "class/ompi_free_list.h"
#include "class/ompi_rb_tree.h"
#include "event/event.h"
#include "threads/mutex.h"
#include "threads/condition.h"
#include "mca/oob/tcp/oob_tcp_peer.h"
#include "mca/oob/tcp/oob_tcp_message.h"
/*
* the list of peers
*/
extern ompi_list_t mca_oob_tcp_peer_list;
/*
* the tree of peers
*/
extern ompi_rb_tree_t mca_oob_tcp_peer_tree;
#include "mca/oob/tcp/oob_tcp_msg.h"
#if defined(c_plusplus) || defined(__cplusplus)
@@ -102,6 +96,28 @@ int mca_oob_tcp_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int
mca_oob_callback_fn_t cbfunc, void* cbdata);
/**
* OOB TCP Component
*/
struct mca_oob_tcp_component_t {
mca_oob_base_component_1_0_0_t super; /**< base OOB component */
int tcp_listen_sd; /**< listen socket for incoming connection requests */
unsigned short tcp_listen_port; /**< listen port */
ompi_list_t tcp_peer_list; /**< list of peers sorted in mru order */
ompi_rb_tree_t tcp_peer_tree; /**< tree of peers sorted by name */
ompi_free_list_t tcp_peer_free; /**< free list of peers */
ompi_free_list_t tcp_msgs; /**< free list of messages */
ompi_event_t tcp_send_event; /**< event structure for sends */
ompi_event_t tcp_recv_event; /**< event structure for recvs */
ompi_mutex_t tcp_lock; /**< lock for accessing module state */
ompi_condition_t tcp_condition; /**< condition variable for blocking sends */
size_t tcp_cache_size; /**< max size of tcp peer cache */
};
typedef struct mca_oob_tcp_component_t mca_oob_tcp_component_t;
extern mca_oob_tcp_component_t mca_oob_tcp_module;
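The peer list, tree, and free lists above are shared between the user thread and the event thread, so access is expected to be serialized by tcp_lock (per the field comments). A hedged sketch of that access pattern, using only interfaces that appear elsewhere in this commit; name is assumed to be an ompi_process_name_t* held by the caller:
/* sketch only - serialize lookups in the shared peer cache */
OMPI_THREAD_LOCK(&mca_oob_tcp_module.tcp_lock);
mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)
    ompi_rb_tree_find(&mca_oob_tcp_module.tcp_peer_tree, name);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);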
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

View file

@@ -1,31 +0,0 @@
/*
* $HEADER$
*/
/** @file:
*
* contains the data structure we will use to describe a message
*/
#ifndef _MCA_OOB_TCP_MESSAGE_H_
#define _MCA_OOB_TCP_MESSAGE_H_
#include "class/ompi_list.h"
#include "mca/oob/tcp/oob_tcp_peer.h"
#include "mca/oob/oob.h"
/**
* describes each message
*/
struct mca_oob_tcp_message_t {
ompi_list_item_t super; /**< make it so we can put this on a list */
size_t message_state; /**< the amount sent or received */
struct iovec * message_data; /**< the data of the message */
struct iovec * message_tmp; /**< in case we have to make a copy of the iovecs */
int message_count; /**< the number of items in the iovec array */
mca_oob_callback_fn_t cbfunc;/**< the callback function for the send/receive */
void * cbdata; /**< the data for the callback function */
};
#endif /* _MCA_OOB_TCP_MESSAGE_H_ */

50
src/mca/oob/tcp/oob_tcp_msg.c Normal file
View file

@@ -0,0 +1,50 @@
#include "oob_tcp.h"
#include "oob_tcp_msg.h"
static void mca_oob_tcp_msg_construct(mca_oob_tcp_msg_t*);
static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t*);
OBJ_CLASS_INSTANCE(
mca_oob_tcp_msg_t,
ompi_list_item_t,
mca_oob_tcp_msg_construct,
mca_oob_tcp_msg_destruct);
static void mca_oob_tcp_msg_construct(mca_oob_tcp_msg_t* msg)
{
}
static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t* msg)
{
}
/**
* Wait for a msg to complete.
* @param msg (IN) Message to wait on.
* @param size (OUT) Number of bytes delivered.
* @retval OMPI_SUCCESS or error code on failure.
*/
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size)
{
return OMPI_SUCCESS;
}
/**
* Signal that a message has completed.
* @param msg (IN) Message to wait on.
* @retval OMPI_SUCCESS or error code on failure.
*/
int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg)
{
return OMPI_SUCCESS;
}
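Both routines are stubs in this commit. A sketch of one plausible implementation, assuming the blocking caller parks on the module-level tcp_lock/tcp_condition declared in the component struct and that ompi_condition_wait()/ompi_condition_signal() take (condition, mutex)/(condition) arguments:
/* sketch only - not part of this commit */
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size)
{
    OMPI_THREAD_LOCK(&mca_oob_tcp_module.tcp_lock);
    while (false == msg->msg_complete) {
        ompi_condition_wait(&mca_oob_tcp_module.tcp_condition,
                            &mca_oob_tcp_module.tcp_lock);
    }
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
    *size = (int)msg->msg_state;   /* msg_state tracks the bytes sent/received */
    return OMPI_SUCCESS;
}

int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg)
{
    OMPI_THREAD_LOCK(&mca_oob_tcp_module.tcp_lock);
    msg->msg_complete = true;
    if (NULL != msg->msg_cbfunc) {
        /* non-blocking caller: invoke msg->msg_cbfunc(..., msg->msg_cbdata) here,
           with the signature declared by mca_oob_callback_fn_t in mca/oob/oob.h */
    } else {
        /* blocking caller: wake any thread sleeping in mca_oob_tcp_msg_wait() */
        ompi_condition_signal(&mca_oob_tcp_module.tcp_condition);
    }
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
    return OMPI_SUCCESS;
}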

98
src/mca/oob/tcp/oob_tcp_msg.h Normal file
View file

@@ -0,0 +1,98 @@
/*
* $HEADER$
*/
/** @file:
*
* contains the data structure we will use to describe a message
*/
#ifndef _MCA_OOB_TCP_MESSAGE_H_
#define _MCA_OOB_TCP_MESSAGE_H_
#include "class/ompi_list.h"
#include "mca/oob/tcp/oob_tcp_peer.h"
#include "mca/oob/oob.h"
struct mca_oob_tcp_peer_t;
/**
* describes each message being progressed.
*/
struct mca_oob_tcp_msg_t {
ompi_list_item_t super; /**< make it so we can put this on a list */
size_t msg_state; /**< the amount sent or received */
const struct iovec * msg_user; /**< the data of the message */
struct iovec * msg_iov; /**< copy of iovec array - not data */
struct iovec * msg_rwptr; /**< current read/write pointer into msg_iov */
int msg_rwcnt; /**< number of iovecs left for read/write */
int msg_count; /**< the number of items in the iovec array */
mca_oob_callback_fn_t msg_cbfunc; /**< the callback function for the send/receive */
void *msg_cbdata; /**< the data for the callback function */
bool msg_complete;
};
typedef struct mca_oob_tcp_msg_t mca_oob_tcp_msg_t;
OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);
#define MCA_OOB_TCP_MSG_ALLOC(msg, rc) \
{ \
ompi_list_item_t* item; \
OMPI_FREE_LIST_GET(&mca_oob_tcp_module.tcp_msgs, item, rc); \
msg = (mca_oob_tcp_msg_t*)item; \
}
#define MCA_OOB_TCP_MSG_RETURN(msg) \
{ \
OMPI_FREE_LIST_RETURN(&mca_oob_tcp_module.tcp_msgs, (ompi_list_item_t*)msg); \
}
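Both macros wrap the free-list get/return pair on mca_oob_tcp_module.tcp_msgs. A typical allocation site, mirroring how mca_oob_tcp_send uses it later in this commit, looks like:
mca_oob_tcp_msg_t* msg;
int rc;
MCA_OOB_TCP_MSG_ALLOC(msg, rc);      /* rc is filled in by OMPI_FREE_LIST_GET */
if (NULL == msg)
    return rc;                       /* free list could not supply a descriptor */
/* ... fill in msg_user/msg_iov/msg_cbfunc and hand the msg to the peer ... */
MCA_OOB_TCP_MSG_RETURN(msg);         /* return the descriptor once it is complete */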
/**
* Wait for a msg to complete.
* @param msg (IN) Message to wait on.
* @param size (OUT) Number of bytes delivered.
* @retval OMPI_SUCCESS or error code on failure.
*/
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);
/**
* Signal that a message has completed. Wakes up any pending threads (for blocking send)
* or invokes callbacks for non-blocking case.
* @param msg (IN) Message send/recv that has completed.
* @retval OMPI_SUCCESS or error code on failure.
*/
int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg);
/**
* Called asynchronously to progress sending a message from the event library thread.
* @param msg (IN) Message send that is in progress.
* @param sd (IN) Socket descriptor to use for send.
* @retval bool Bool flag indicating whether the operation has completed.
*/
bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, int sd);
/**
* Called asynchronously to progress receiving a message from the event library thread.
* @param msg (IN) Message recv that is in progress.
* @param sd (IN) Socket descriptor to use for recv.
* @retval bool Bool flag indicating whether the operation has completed.
*/
bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, int sd);
/**
* Initialize a message for send/recv.
* @param msg (IN) Message to initialize for send/recv.
* @param peer (IN) Peer to send/recv message to/from.
*/
void mca_oob_tcp_msg_init(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t*);
#endif /* _MCA_OOB_TCP_MESSAGE_H_ */

View file

@@ -1,37 +1,37 @@
#include <unistd.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/errno.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "util/output.h"
#include "mca/oob/tcp/oob_tcp_peer.h"
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer, int sd);
static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size);
static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size);
static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg);
OBJ_CLASS_INSTANCE(
mca_oob_tcp_peer_t,
ompi_list_item_t,
&mca_oob_tcp_peer_construct,
&mca_oob_tcp_peer_destruct);
mca_oob_tcp_peer_construct,
mca_oob_tcp_peer_destruct);
/*
* The function to compare 2 peers. Used for the rb tree
*
* @param peer1 the first peer
* @param peer2 the second peer
*
* @retval <0 if peer1 < peer2
* @retval >0 if peer1 > peer2
* @retval 0 if peer1 == peer2
*/
/*****NEED TO MAKE WORK *****/
int mca_oob_tcp_peer_comp(void * key1, void * key2)
{
/* mca_oob_tcp_peer_t * p1 = (mca_oob_tcp_peer_t *) key1;
mca_oob_tcp_peer_t * p2 = (mca_oob_tcp_peer_t *) key2;
if(p1->peer_name < p2->peer_name) {
return(-1);
} else if(p1->peer_name > p2->peer_name) {
return(1);
}*/
return(0);
}
/*
* This is the constructor function for the mca_oob_tcp_peer
* struct. Note that this function and OBJ_NEW should NEVER
@@ -40,10 +40,10 @@ int mca_oob_tcp_peer_comp(void * key1, void * key2)
* @param peer a pointer to the mca_oob_tcp_peer_t struct to be initialized
* @retval none
*/
void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
{
OBJ_CONSTRUCT(&(peer->peer_send), ompi_list_t);
OBJ_CONSTRUCT(&(peer->peer_recv), ompi_list_t);
OBJ_CONSTRUCT(&(peer->peer_send_queue), ompi_list_t);
OBJ_CONSTRUCT(&(peer->peer_recv_queue), ompi_list_t);
OBJ_CONSTRUCT(&(peer->peer_lock), ompi_mutex_t);
}
@@ -55,47 +55,485 @@ void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
* @param peer a pointer to the mca_oob_tcp_peer_t struct to be destroyed
* @retval none
*/
void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t * peer)
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t * peer)
{
OBJ_DESTRUCT(&(peer->peer_send));
OBJ_DESTRUCT(&(peer->peer_recv));
OBJ_DESTRUCT(&(peer->peer_send_queue));
OBJ_DESTRUCT(&(peer->peer_recv_queue));
OBJ_DESTRUCT(&(peer->peer_lock));
}
/*
* Creates a peer structure and adds to the tree and list.
*
* @param peer_name the name of the peer
*
* @retval pointer to the newly created structure
* @retval NULL if there was a problem
* Initialize events to be used by the peer instance for TCP select/poll callbacks.
*/
mca_oob_tcp_peer_t * mca_oob_tcp_add_peer(ompi_process_name_t peer_name)
{
mca_oob_tcp_peer_t * new_peer = OBJ_NEW(mca_oob_tcp_peer_t);
new_peer->peer_name = peer_name;
if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_peer_tree, &new_peer, NULL)) {
free(new_peer);
return NULL;
}
ompi_list_prepend(&mca_oob_tcp_peer_list, (ompi_list_item_t *) new_peer);
return new_peer;
}
/*
* Deletes a peer structure from the tree and lists and frees its memory
*
* @param peer_name the name of the peer
*
* @retval OMPI_SUCCESS
*/
int mca_oob_tcp_del_peer(ompi_process_name_t peer_name)
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer, int sd)
{
mca_oob_tcp_peer_t * peer = ompi_rb_tree_find(&mca_oob_tcp_peer_tree, &peer_name);
ompi_rb_tree_delete(&mca_oob_tcp_peer_tree, peer);
ompi_list_remove_item(&mca_oob_tcp_peer_list, (ompi_list_item_t *)peer);
OBJ_RELEASE(peer);
ompi_event_set(
&peer->peer_recv_event,
peer->peer_sd,
OMPI_EV_READ|OMPI_EV_PERSIST,
mca_oob_tcp_peer_recv_handler,
peer);
ompi_event_set(
&peer->peer_send_event,
peer->peer_sd,
OMPI_EV_WRITE|OMPI_EV_PERSIST,
mca_oob_tcp_peer_send_handler,
peer);
return OMPI_SUCCESS;
}
/*
* Attempt to send a message to the peer - queue the message on the
* peer and, if no connection is established, start one.
*/
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
{
int rc = OMPI_SUCCESS;
OMPI_THREAD_LOCK(&peer->peer_lock);
switch(peer->peer_state) {
case MCA_OOB_TCP_CONNECTING:
case MCA_OOB_TCP_CONNECT_ACK:
case MCA_OOB_TCP_CLOSED:
/*
* queue the message and start the connection to the peer
*/
ompi_list_append(&peer->peer_send_queue, (ompi_list_item_t*)msg);
if(peer->peer_state == MCA_OOB_TCP_CLOSED)
rc = mca_oob_tcp_peer_start_connect(peer);
break;
case MCA_OOB_TCP_FAILED:
rc = OMPI_ERR_UNREACH;
break;
case MCA_OOB_TCP_CONNECTED:
/*
* start the message and queue if not completed
*/
if (NULL != peer->peer_send_msg) {
ompi_list_append(&peer->peer_send_queue, (ompi_list_item_t*)msg);
} else {
if(mca_oob_tcp_msg_send_handler(msg, peer->peer_sd)) {
OMPI_THREAD_UNLOCK(&peer->peer_lock);
mca_oob_tcp_msg_complete(msg);
return rc;
} else {
peer->peer_send_msg = msg;
ompi_event_add(&peer->peer_send_event, 0);
}
}
break;
}
OMPI_THREAD_UNLOCK(&peer->peer_lock);
return rc;
}
/*
* Lookup a peer by name, create one if it doesn't exist.
* @param name Peer's globally unique identifier.
* @retval Pointer to the newly created structure or NULL on error.
*/
mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name)
{
int rc;
mca_oob_tcp_peer_t * peer;
OMPI_THREAD_LOCK(&mca_oob_tcp_module.tcp_lock);
peer = (mca_oob_tcp_peer_t*)ompi_rb_tree_find(&mca_oob_tcp_module.tcp_peer_tree, name);
if(NULL != peer) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
return peer;
}
MCA_OOB_TCP_PEER_ALLOC(peer, rc);
if(NULL == peer) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
return NULL;
}
peer->peer_name = *name;
if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_module.tcp_peer_tree, name, peer)) {
MCA_OOB_TCP_PEER_RETURN(peer);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
return NULL;
}
ompi_list_prepend(&mca_oob_tcp_module.tcp_peer_list, (ompi_list_item_t *) peer);
if(ompi_list_get_size(&mca_oob_tcp_module.tcp_peer_list) >
mca_oob_tcp_module.tcp_cache_size) {
/* do something - remove LRU items from peer list (that aren't in use) */
}
OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
return peer;
}
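The cache-size check above is left as a TODO. A hedged sketch of the eviction it hints at - trimming closed, least-recently-used peers from the tail of the MRU list back to the free list; ompi_list_get_last() and the key-based ompi_rb_tree_delete() call are assumptions about those class interfaces:
/* sketch only - this commit leaves the eviction unimplemented */
while (ompi_list_get_size(&mca_oob_tcp_module.tcp_peer_list) >
       mca_oob_tcp_module.tcp_cache_size) {
    mca_oob_tcp_peer_t* lru = (mca_oob_tcp_peer_t*)
        ompi_list_get_last(&mca_oob_tcp_module.tcp_peer_list);
    if (MCA_OOB_TCP_CLOSED != lru->peer_state)
        break;   /* oldest cached peer is still in use - nothing to evict */
    ompi_rb_tree_delete(&mca_oob_tcp_module.tcp_peer_tree, &lru->peer_name);
    ompi_list_remove_item(&mca_oob_tcp_module.tcp_peer_list, (ompi_list_item_t*)lru);
    MCA_OOB_TCP_PEER_RETURN(lru);
}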
/*
* Start a connection to the peer. This will likely not complete,
* as the socket is set to non-blocking, so register for event
* notification of connect completion. On connection we send
* our globally unique process identifier to the peer and wait for
* the peer's response.
*/
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
{
int rc,flags;
struct sockaddr_in peer_addr;
peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
if (peer->peer_sd < 0) {
peer->peer_retries++;
return OMPI_ERR_UNREACH;
}
/* setup event callbacks */
mca_oob_tcp_peer_event_init(peer, peer->peer_sd);
/* setup the socket as non-blocking */
if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) {
ompi_output(0, "mca_oob_tcp_peer_connect: fcntl(F_GETFL) failed with errno=%d\n", errno);
} else {
flags |= O_NONBLOCK;
if(fcntl(peer->peer_sd, F_SETFL, flags) < 0)
ompi_output(0, "mca_oob_tcp_peer_connect: fcntl(F_SETFL) failed with errno=%d\n", errno);
}
/* start the connect - will likely fail with EINPROGRESS */
peer_addr = peer->peer_addr;
if(connect(peer->peer_sd, (struct sockaddr*)&peer_addr, sizeof(peer_addr)) < 0) {
/* non-blocking so wait for completion */
if(errno == EINPROGRESS) {
peer->peer_state = MCA_OOB_TCP_CONNECTING;
ompi_event_add(&peer->peer_send_event, 0);
return OMPI_SUCCESS;
}
mca_oob_tcp_peer_close(peer);
peer->peer_retries++;
return OMPI_ERR_UNREACH;
}
/* send our globally unique process identifier to the peer */
if((rc = mca_oob_tcp_peer_send_connect_ack(peer)) == OMPI_SUCCESS) {
peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
ompi_event_add(&peer->peer_recv_event, 0);
} else {
mca_oob_tcp_peer_close(peer);
}
return rc;
}
/*
* Check the status of the connection. If the connection failed, will retry
* later. Otherwise, send this process's identifier to the peer on the
* newly connected socket.
*/
static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
{
int so_error = 0;
ompi_socklen_t so_length = sizeof(so_error);
/* unregister from receiving event notifications */
ompi_event_del(&peer->peer_send_event);
/* check connect completion status */
if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_ERROR, &so_error, &so_length) < 0) {
ompi_output(0, "mca_oob_tcp_peer_complete_connect: getsockopt() failed with errno=%d\n", errno);
mca_oob_tcp_peer_close(peer);
return;
}
if(so_error == EINPROGRESS) {
ompi_event_add(&peer->peer_send_event, 0);
return;
}
if(so_error != 0) {
ompi_output(0, "mca_oob_tcp_peer_complete_connect: connect() failed with errno=%d\n", so_error);
mca_oob_tcp_peer_close(peer);
return;
}
if(mca_oob_tcp_peer_send_connect_ack(peer) == OMPI_SUCCESS) {
peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
ompi_event_add(&peer->peer_recv_event, 0);
} else {
mca_oob_tcp_peer_close(peer);
}
}
/*
* Setup peer state to reflect that connection has been established,
* and start any pending sends.
*/
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
{
peer->peer_state = MCA_OOB_TCP_CONNECTED;
peer->peer_retries = 0;
if(ompi_list_get_size(&peer->peer_send_queue) > 0) {
if(NULL == peer->peer_send_msg)
peer->peer_send_msg = (mca_oob_tcp_msg_t*)
ompi_list_remove_first(&peer->peer_send_queue);
ompi_event_add(&peer->peer_send_event, 0);
}
}
/*
* Remove any event registrations associated with the socket
* and update the peer state to reflect the connection has
* been closed.
*/
static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
{
if(peer->peer_sd >= 0) {
ompi_event_del(&peer->peer_recv_event);
ompi_event_del(&peer->peer_send_event);
close(peer->peer_sd);
peer->peer_sd = -1;
}
peer->peer_state = MCA_OOB_TCP_CLOSED;
peer->peer_retries++;
}
/*
* Send the globally unique identifier for this process to a peer on
* a newly connected socket.
*/
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
{
/* send process identifier to remote peer */
if(mca_oob_tcp_peer_send_blocking( peer, &mca_oob_base_self, sizeof(mca_oob_base_self)) !=
sizeof(mca_oob_base_self)) {
return OMPI_ERR_UNREACH;
}
return OMPI_SUCCESS;
}
/*
* Receive the peer's globally unique process identification from a newly
* connected socket and verify the expected response. If so, move the
* socket to a connected state.
*/
static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
{
ompi_process_name_t guid;
if((mca_oob_tcp_peer_recv_blocking(peer, &guid, sizeof(ompi_process_name_t))) != sizeof(ompi_process_name_t)) {
return OMPI_ERR_UNREACH;
}
/* compare this to the expected values */
if(memcmp(&peer->peer_name, &guid, sizeof(ompi_process_name_t)) != 0) {
ompi_output(0, "mca_oob_tcp_peer_connect: received unexpected process identifier");
mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH;
}
/* connected */
mca_oob_tcp_peer_connected(peer);
#if OMPI_ENABLE_DEBUG
mca_oob_tcp_peer_dump(peer, "connected");
#endif
return OMPI_SUCCESS;
}
/*
* A blocking recv on a non-blocking socket. Used to receive the small amount of connection
* information that identifies the peer's endpoint.
*/
static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = recv(peer->peer_sd, ptr+cnt, size-cnt, 0);
/* remote closed connection */
if(retval == 0) {
mca_oob_tcp_peer_close(peer);
return -1;
}
/* socket is non-blocking so handle errors */
if(retval < 0) {
if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
ompi_output(0, "mca_oob_tcp_peer_recv_blocking: recv() failed with errno=%d\n",errno);
mca_oob_tcp_peer_close(peer);
return -1;
}
continue;
}
cnt += retval;
}
if((int)cnt == -1)
ompi_output(0, "mca_oob_tcp_peer_recv_blocking: invalid cnt\n");
return cnt;
}
/*
* A blocking send on a non-blocking socket. Used to send the small amount of connection
* information that identifies the peer's endpoint.
*/
static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = send(peer->peer_sd, ptr+cnt, size-cnt, 0);
if(retval < 0) {
if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
ompi_output(0, "mca_oob_tcp_peer_send_blocking: send() failed with errno=%d\n",errno);
mca_oob_tcp_peer_close(peer);
return -1;
}
continue;
}
cnt += retval;
}
return cnt;
}
static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user)
{
mca_oob_tcp_peer_t* peer = user;
OMPI_THREAD_LOCK(&peer->peer_lock);
switch(peer->peer_state) {
case MCA_OOB_TCP_CONNECT_ACK:
{
mca_oob_tcp_peer_recv_connect_ack(peer);
break;
}
case MCA_OOB_TCP_CONNECTED:
{
mca_oob_tcp_msg_t* msg = peer->peer_recv_msg;
if(NULL == msg) {
int rc;
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
OMPI_THREAD_UNLOCK(&peer->peer_lock);
return;
}
mca_oob_tcp_msg_init(msg, peer);
}
/* check for completion of non-blocking recv on the current fragment */
if(mca_oob_tcp_msg_recv_handler(msg, sd) == false)
peer->peer_recv_msg = msg;
else
peer->peer_recv_msg = 0;
break;
}
default:
{
ompi_output(0, "mca_oob_tcp_peer_recv_handler: invalid socket state(%d)", peer->peer_state);
mca_oob_tcp_peer_close(peer);
break;
}
}
OMPI_THREAD_UNLOCK(&peer->peer_lock);
}
/*
* A file descriptor is available/ready for send. Check the state
* of the socket and take the appropriate action.
*/
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user)
{
mca_oob_tcp_peer_t* peer = user;
OMPI_THREAD_LOCK(&peer->peer_lock);
switch(peer->peer_state) {
case MCA_OOB_TCP_CONNECTING:
mca_oob_tcp_peer_complete_connect(peer);
break;
case MCA_OOB_TCP_CONNECTED:
{
/* complete the current send */
do {
mca_oob_tcp_msg_t* msg = peer->peer_send_msg;
if(mca_oob_tcp_msg_send_handler(msg, sd) == false) {
break;
}
/* if required - update request status and release fragment */
OMPI_THREAD_UNLOCK(&peer->peer_lock);
mca_oob_tcp_msg_complete(msg);
OMPI_THREAD_LOCK(&peer->peer_lock);
/* progress any pending sends */
peer->peer_send_msg = (mca_oob_tcp_msg_t*)
ompi_list_remove_first(&peer->peer_send_queue);
} while (NULL != peer->peer_send_msg);
/* if nothing else to do unregister for send event notifications */
if(NULL == peer->peer_send_msg) {
ompi_event_del(&peer->peer_send_event);
}
break;
}
default:
ompi_output(0, "mca_oob_tcp_peer_send_handler: invalid connection state (%d)",
peer->peer_state);
ompi_event_del(&peer->peer_send_event);
break;
}
OMPI_THREAD_UNLOCK(&peer->peer_lock);
}
/*
* Routine for debugging to print the connection state and socket options
*/
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
{
char src[64];
char dst[64];
char buff[255];
int sndbuf,rcvbuf,nodelay,flags;
struct sockaddr_in inaddr;
ompi_socklen_t optlen;
ompi_socklen_t addrlen = sizeof(struct sockaddr_in);
getsockname(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);
sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
getpeername(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);
sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) {
ompi_output(0, "mca_oob_tcp_peer_dump: fcntl(F_GETFL) failed with errno=%d\n", errno);
}
#if defined(SO_SNDBUF)
optlen = sizeof(sndbuf);
if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &optlen) < 0) {
ompi_output(0, "mca_oob_tcp_peer_dump: SO_SNDBUF option: errno %d\n", errno);
}
#else
sndbuf = -1;
#endif
#if defined(SO_RCVBUF)
optlen = sizeof(rcvbuf);
if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &optlen) < 0) {
ompi_output(0, "mca_oob_tcp_peer_dump: SO_RCVBUF option: errno %d\n", errno);
}
#else
rcvbuf = -1;
#endif
#if defined(TCP_NODELAY)
optlen = sizeof(nodelay);
if(getsockopt(peer->peer_sd, IPPROTO_TCP, TCP_NODELAY, &nodelay, &optlen) < 0) {
ompi_output(0, "mca_oob_tcp_peer_dump: TCP_NODELAY option: errno %d\n", errno);
}
#else
nodelay = 0;
#endif
sprintf(buff, "%s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
msg, src, dst, nodelay, sndbuf, rcvbuf, flags);
ompi_output(0, buff);
}

View file

@@ -15,7 +15,8 @@
#include <netinet/in.h>
#include "threads/mutex.h"
#include <string.h>
#include "mca/oob/tcp/oob_tcp.h"
#include "oob_tcp.h"
#include "oob_tcp_msg.h"
/**
* the state of the connection
@@ -29,16 +30,6 @@ typedef enum {
} mca_oob_tcp_state_t;
/**
* tcp interface
*/
struct mca_oob_tcp_addr_t {
int ifindex; /**< oob interface index */
struct sockaddr_in ifaddr; /**< oob interface address */
struct sockaddr_in ifmask; /**< oob interface netmask */
};
typedef struct mca_oob_tcp_addr_t mca_oob_tcp_addr_t;
/**
* This structure describes a peer
*/
@@ -46,68 +37,59 @@ struct mca_oob_tcp_peer_t {
ompi_list_item_t super; /**< allow this to be on a list */
ompi_process_name_t peer_name; /**< the name of the peer */
mca_oob_tcp_state_t peer_state; /**< the state of the connection */
int peer_retries; /**< number of times connection attempt has failed */
struct sockaddr_in peer_addr; /**< the address of the peer process */
int peer_sd; /**< socket descriptor of the connection */
mca_oob_tcp_addr_t peer_addr; /**< the address of the peer process */
ompi_mutex_t peer_lock; /**< make sure only one thread accesses it at a time */
ompi_list_t peer_send; /**< list of items to send */
ompi_list_t peer_recv; /**< list of items to receive */
ompi_event_t peer_send_event; /**< registration with event thread for send events */
ompi_event_t peer_recv_event; /**< registration with event thread for recv events */
ompi_mutex_t peer_lock; /**< make sure only one thread accesses critical data structures */
ompi_list_t peer_send_queue; /**< list of messages to send */
ompi_list_t peer_recv_queue; /**< list of pending receives */
mca_oob_tcp_msg_t *peer_send_msg; /**< current send in progress */
mca_oob_tcp_msg_t *peer_recv_msg; /**< current recv in progress */
};
typedef struct mca_oob_tcp_peer_t mca_oob_tcp_peer_t;
#define MCA_OOB_TCP_PEER_ALLOC(peer, rc) \
{ \
ompi_list_item_t* item; \
OMPI_FREE_LIST_GET(&mca_oob_tcp_module.tcp_peer_free, item, rc); \
peer = (mca_oob_tcp_peer_t*)item; \
}
#define MCA_OOB_TCP_PEER_RETURN(peer) \
{ \
OMPI_FREE_LIST_RETURN(&mca_oob_tcp_module.tcp_peer_free, (ompi_list_item_t*)peer); \
}
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* This is the constructor function for the mca_oob_tcp_peer
* struct. Note that this function and OBJ_NEW should NEVER
* be called directly. Instead, use mca_oob_tcp_add_peer
*
* @param peer a pointer to the mca_oob_tcp_peer_t struct to be initialized
* @retval none
*/
void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer);
/**
* This is the destructor function for the mca_oob_tcp_peer
* struct. Note that this function and OBJ_RELEASE should NEVER
* be called directly. Instead, use mca_oob_tcp_del_peer
*
* @param peer a pointer to the mca_oob_tcp_peer_t struct to be destroyed
* @retval none
*/
void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t * peer);
/**
* The function to compare 2 peers. Used for the rb tree
*
* @param peer1 the first peer
* @param peer2 the second peer
*
* @retval <0 if peer1 < peer2
* @retval >0 if peer1 > peer2
* @retval 0 if peer1 == peer2
*/
int mca_oob_tcp_peer_comp(void * key1, void * key2);
/**
* Creates a peer structure and adds to the tree and list.
* Lookup a peer in the cache - if it doesn't exist,
* create one and cache it.
*
* @param peer_name the name of the peer
*
* @retval pointer to the newly created structure
* @retval NULL if there was a problem
*/
mca_oob_tcp_peer_t * mca_oob_tcp_add_peer(ompi_process_name_t peer_name);
mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(const ompi_process_name_t* peer_name);
/**
* Deletes a peer structure from the tree and lists and frees its memory
* Start sending a message to the specified peer. The routine
* can return before the send completes.
*
* @param peer_name the name of the peer
*
* @retval OMPI_SUCCESS
* @param peer The peer process.
* @param msg The message to send.
* @retval OMPI_SUCCESS or error code on failure.
*/
int mca_oob_tcp_del_peer(ompi_process_name_t peer_name);
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg);
#if defined(c_plusplus) || defined(__cplusplus)
}

View file

@@ -1,4 +1,6 @@
#include "mca/oob/tcp/oob_tcp.h"
#include "oob_tcp.h"
#include "oob_tcp_msg.h"
#include "oob_tcp_peer.h"
/*
* Similar to unix send(2).
@@ -10,9 +12,37 @@
* @return OMPI error code (<0) on error, or the number of bytes actually sent.
*/
int mca_oob_tcp_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int flags)
int mca_oob_tcp_send(const ompi_process_name_t* name, const struct iovec *iov, int count, int flags)
{
return OMPI_ERR_NOT_IMPLEMENTED;
mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name);
mca_oob_tcp_msg_t* msg;
int rc, sent;
if(NULL == peer)
return OMPI_ERR_UNREACH;
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg)
return rc;
msg->msg_user = iov;
msg->msg_iov = (struct iovec*)malloc(sizeof(struct iovec)*count);
msg->msg_rwptr = msg->msg_iov;
msg->msg_count = msg->msg_rwcnt = count;
memcpy(msg->msg_iov, msg->msg_user, sizeof(struct iovec)*count);
msg->msg_cbfunc = NULL;
msg->msg_cbdata = NULL;
msg->msg_complete = false;
rc = mca_oob_tcp_peer_send(peer, msg);
if(rc != OMPI_SUCCESS) {
OBJ_RELEASE(msg);
return rc;
}
rc = mca_oob_tcp_msg_wait(msg, &sent);
if(rc != OMPI_SUCCESS)
return rc;
return sent;
}
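For reference, a caller-side sketch of this blocking entry point; remote_name is a hypothetical ompi_process_name_t identifying the peer, and the return value is the byte count on success or a negative error code:
/* hypothetical caller, for illustration only */
ompi_process_name_t remote_name;   /* obtained elsewhere - hypothetical */
char greeting[] = "hello";
struct iovec iov;
int rc;
iov.iov_base = greeting;
iov.iov_len  = sizeof(greeting);
rc = mca_oob_tcp_send(&remote_name, &iov, 1, 0);
if (rc < 0) {
    /* e.g. OMPI_ERR_UNREACH if the connection could not be established */
}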
/*
@@ -28,9 +58,38 @@ int mca_oob_tcp_send(const ompi_process_name_t* peer, const struct iovec *msg, i
*
*/
int mca_oob_tcp_send_nb(const ompi_process_name_t* peer, const struct iovec* msg, int count,
int flags, mca_oob_callback_fn_t cbfunc, void* cbdata)
int mca_oob_tcp_send_nb(
const ompi_process_name_t* name,
const struct iovec* iov,
int count,
int flags,
mca_oob_callback_fn_t cbfunc,
void* cbdata)
{
return OMPI_ERR_NOT_IMPLEMENTED;
mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name);
mca_oob_tcp_msg_t* msg;
int rc;
if(NULL == peer)
return OMPI_ERR_UNREACH;
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg)
return rc;
msg->msg_user = iov;
msg->msg_iov = (struct iovec*)malloc(sizeof(struct iovec)*count);
msg->msg_rwptr = msg->msg_iov;
msg->msg_count = msg->msg_rwcnt = count;
memcpy(msg->msg_iov, msg->msg_user, sizeof(struct iovec)*count);
msg->msg_cbfunc = cbfunc;
msg->msg_cbdata = cbdata;
msg->msg_complete = false;
rc = mca_oob_tcp_peer_send(peer, msg);
if(rc != OMPI_SUCCESS) {
OBJ_RELEASE(msg);
return rc;
}
return OMPI_SUCCESS;
}