Lots of changes to the TCP OOB:

- almost finished the send side of the OOB
- added the lists that will hold the posted receives and the
  actual receives
- made it so we send an additional iovec first that contains the
  size of the total message, so we can receive into a temporary buffer
  before the user has posted a receive (see the framing sketch below)

This commit was SVN r1736.
This commit is contained in:
Tim Prins 2004-07-15 13:51:40 +00:00
parent 452bd7ddca
commit 8b836b72ff
8 changed files with 206 additions and 102 deletions
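
To make the last bullet of the message concrete: the sender prepends one extra iovec holding the total message size (header included) in network byte order, so the receiver can always read the length first and stage the payload in a temporary buffer before any receive has been posted. A minimal sketch of the send side of that wire format, assuming a blocking socket; send_framed and MAX_IOV are illustrative names, not part of this commit:

    #include <arpa/inet.h>   /* htonl */
    #include <stdint.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/uio.h>     /* struct iovec, writev */

    #define MAX_IOV 16

    /* Send 'count' user iovecs preceded by a 4-byte length header, the
     * same layout mca_oob_tcp_send() builds in oob_tcp_send.c below. */
    static ssize_t send_framed(int sd, const struct iovec *user, int count)
    {
        struct iovec iov[MAX_IOV + 1];
        uint32_t size = sizeof(uint32_t);   /* total includes the header */
        int i;

        if (count < 0 || count > MAX_IOV)
            return -1;
        for (i = 0; i < count; i++)
            size += user[i].iov_len;
        size = htonl(size);                 /* network byte order on the wire */

        iov[0].iov_base = &size;            /* the size travels as iov[0] */
        iov[0].iov_len  = sizeof(uint32_t);
        memcpy(&iov[1], user, count * sizeof(struct iovec));
        return writev(sd, iov, count + 1);  /* short writes are possible on a
                                             * non-blocking fd; see the
                                             * handlers in oob_tcp_msg.c */
    }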

View file: oob_tcp.c

@@ -47,6 +47,8 @@ int mca_oob_tcp_open(void)
     OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_free, ompi_free_list_t);
     OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_lock, ompi_mutex_t);
     OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_condition, ompi_condition_t);
+    OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_post_recv, ompi_list_t);
+    OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_msg_recv, ompi_list_t);
 #endif
     return OMPI_SUCCESS;
 }
@@ -64,6 +66,8 @@ int mca_oob_tcp_close(void)
     OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_free);
     OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_condition);
     OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_lock);
+    OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_post_recv);
+    OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_msg_recv);
 #endif
     return OMPI_SUCCESS;
 }
@@ -114,8 +118,8 @@ struct mca_oob_1_0_0_t* mca_oob_tcp_init(bool *allow_multi_user_threads,
 int mca_oob_tcp_finalize(void)
 {
-    OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_list);
-    OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_tree);
+    /* probably want to try to finish all sends and receives here
+     * before we return */
     return OMPI_SUCCESS;
 }

View file: oob_tcp.h

@@ -100,7 +100,7 @@ int mca_oob_tcp_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int
  * OOB TCP Component
  */
 struct mca_oob_tcp_component_t {
-    mca_oob_base_component_1_0_0_t super; /**< base PTL component */
+    mca_oob_base_component_1_0_0_t super; /**< base OOB component */
     int tcp_listen_sd;               /**< listen socket for incoming connection requests */
     unsigned short tcp_listen_port;  /**< listen port */
     ompi_list_t tcp_peer_list;       /**< list of peers sorted in MRU order */
@@ -112,6 +112,8 @@ struct mca_oob_tcp_component_t {
     ompi_mutex_t tcp_lock;           /**< lock for accessing module state */
     ompi_condition_t tcp_condition;  /**< condition variable for blocking sends */
     size_t tcp_cache_size;           /**< max size of tcp peer cache */
+    ompi_list_t tcp_post_recv;       /**< list of the recvs the user has posted */
+    ompi_list_t tcp_msg_recv;        /**< list of received messages */
 };
 typedef struct mca_oob_tcp_component_t mca_oob_tcp_component_t;

View file: oob_tcp_msg.c

@@ -15,13 +15,15 @@ OBJ_CLASS_INSTANCE(
 static void mca_oob_tcp_msg_construct(mca_oob_tcp_msg_t* msg)
 {
-
+    OBJ_CONSTRUCT(&msg->msg_lock, ompi_mutex_t);
+    OBJ_CONSTRUCT(&msg->msg_condition, ompi_condition_t);
 }
 
 static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t* msg)
 {
-
+    OBJ_DESTRUCT(&msg->msg_lock);
+    OBJ_DESTRUCT(&msg->msg_condition);
 }
@@ -34,17 +36,120 @@ static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t* msg)
 int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size)
 {
-    return OMPI_SUCCESS;
+    int rc = OMPI_SUCCESS;
+    ompi_mutex_lock(&msg->msg_lock);
+    while(msg->msg_complete == false)
+        ompi_condition_wait(&msg->msg_condition, &msg->msg_lock);
+    ompi_mutex_unlock(&msg->msg_lock);
+    *size = msg->msg_state;
+    MCA_OOB_TCP_MSG_RETURN(msg);
+    return rc;
 }
 
 /**
  * Signal that a message has completed.
  * @param msg (IN) Message to wait on.
+ * @param peer (IN) the peer of the message
  * @retval OMPI_SUCCESS or error code on failure.
  */
-int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg)
+int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer)
 {
+    ompi_mutex_lock(&msg->msg_lock);
+    msg->msg_complete = true;
+    if(NULL != msg->msg_cbfunc) {
+        msg->msg_cbfunc(msg->msg_state, &peer->peer_name, msg->msg_iov, msg->msg_count, msg->msg_cbdata);
+        ompi_mutex_unlock(&msg->msg_lock);
+        MCA_OOB_TCP_MSG_RETURN(msg);
+    } else {
+        ompi_condition_broadcast(&msg->msg_condition);
+        ompi_mutex_unlock(&msg->msg_lock);
+    }
     return OMPI_SUCCESS;
 }
+
+/**
+ * The function that actually sends the data.
+ * @param msg a pointer to the message to send
+ * @param peer the peer we are sending to
+ * @retval true if the entire message has been sent
+ * @retval false if the entire message has not been sent
+ */
+bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t * peer)
+{
+    int rc;
+    while(1) {
+        rc = writev(peer->peer_sd, msg->msg_rwptr, msg->msg_rwcnt);
+        if(rc <= 0) {
+            if(errno == EINTR)
+                continue;
+            else if (errno == EAGAIN)
+                return false;
+            else {
+                close(peer->peer_sd);
+                peer->peer_state = MCA_OOB_TCP_CLOSED;
+                return false;
+            }
+        }
+        msg->msg_state += rc;
+        do { /* while there are still more iovecs to write */
+            if(rc < msg->msg_rwptr->iov_len) {
+                msg->msg_rwptr->iov_len -= rc;
+                msg->msg_rwptr->iov_base = (void *) ((char *) msg->msg_rwptr->iov_base + rc);
+                break;
+            } else {
+                rc -= msg->msg_rwptr->iov_len;
+                (msg->msg_rwcnt)--;
+                (msg->msg_rwptr)++;
+                if(0 == msg->msg_rwcnt) {
+                    ompi_list_remove_item(&peer->peer_send_queue, (ompi_list_item_t *) msg);
+                    mca_oob_tcp_msg_complete(msg, peer);
+                    return true;
+                }
+            }
+        } while(msg->msg_rwcnt);
+    }
+}
+
+/**
+ * Actually receives the data.
+ *
+ * @param msg the message to be received into
+ * @param peer the peer to receive from
+ * @retval true if the whole message was received
+ * @retval false if the whole message was not received
+ */
+bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t * peer)
+{
+    int rc;
+    while(1) {
+        rc = readv(peer->peer_sd, msg->msg_rwptr, msg->msg_rwcnt);
+        if(rc <= 0) {
+            if(errno == EINTR)
+                continue;
+            else if (errno == EAGAIN)
+                return false;
+            else {
+                close(peer->peer_sd);
+                peer->peer_state = MCA_OOB_TCP_CLOSED;
+                return false;
+            }
+        }
+        msg->msg_state += rc;
+        do { /* while there are still more iovecs to read into */
+            if(rc < msg->msg_rwptr->iov_len) {
+                msg->msg_rwptr->iov_len -= rc;
+                msg->msg_rwptr->iov_base = (void *) ((char *) msg->msg_rwptr->iov_base + rc);
+                break;
+            } else {
+                rc -= msg->msg_rwptr->iov_len;
+                (msg->msg_rwcnt)--;
+                (msg->msg_rwptr)++;
+                if(0 == msg->msg_rwcnt) {
+                    mca_oob_tcp_msg_complete(msg, peer);
+                    return true;
+                }
+            }
+        } while(msg->msg_rwcnt);
+    }
+}
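
Both handlers above share the same partial-I/O bookkeeping: after a short writev()/readv(), the leading entries of the iovec array are consumed in place and msg_rwptr/msg_rwcnt advance past them. The same pattern in isolation, as a self-contained sketch (advance_iov and send_all are illustrative names, not functions from this commit):

    #include <errno.h>
    #include <stdbool.h>
    #include <sys/types.h>
    #include <sys/uio.h>
    #include <unistd.h>

    /* Consume 'bytes' from the front of an iovec array, exactly the way the
     * handlers above advance msg_rwptr/msg_rwcnt after a partial transfer. */
    static void advance_iov(struct iovec **ptr, int *cnt, size_t bytes)
    {
        while (bytes > 0 && *cnt > 0) {
            if (bytes < (*ptr)->iov_len) {
                /* partial entry: shrink it and bump its base pointer */
                (*ptr)->iov_len -= bytes;
                (*ptr)->iov_base = (char *) (*ptr)->iov_base + bytes;
                return;
            }
            bytes -= (*ptr)->iov_len;   /* entry fully consumed; move on */
            (*ptr)++;
            (*cnt)--;
        }
    }

    /* Drain a whole iovec array to a socket; true when done, false on
     * EAGAIN (caller re-arms a write event) or on a hard error. */
    static bool send_all(int sd, struct iovec *iov, int cnt)
    {
        while (cnt > 0) {
            ssize_t rc = writev(sd, iov, cnt);
            if (rc < 0) {
                if (errno == EINTR)
                    continue;           /* interrupted: just retry */
                return false;           /* EAGAIN or a real error */
            }
            advance_iov(&iov, &cnt, (size_t) rc);
        }
        return true;
    }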

View file: oob_tcp_msg.h

@@ -6,23 +6,21 @@
  * contains the data structure we will use to describe a message
  */
 
 #ifndef _MCA_OOB_TCP_MESSAGE_H_
 #define _MCA_OOB_TCP_MESSAGE_H_
 
 #include "class/ompi_list.h"
 #include "mca/oob/tcp/oob_tcp_peer.h"
 #include "mca/oob/oob.h"
+#include <errno.h>
+
+struct mca_oob_tcp_peer_t;
 
 /**
  * describes each message being progressed.
  */
 struct mca_oob_tcp_msg_t {
     ompi_list_item_t super;           /**< make it so we can put this on a list */
-    size_t msg_state;                 /**< the amount sent or received */
+    int msg_state;                    /**< the amount sent or received, or errno */
+    uint32_t msg_size;                /**< the total size of the message */
     const struct iovec * msg_user;    /**< the data of the message */
     struct iovec * msg_iov;           /**< copy of iovec array - not data */
     struct iovec * msg_rwptr;         /**< current read/write pointer into msg_iov */
@@ -30,8 +28,14 @@ struct mca_oob_tcp_msg_t {
     int msg_count;                    /**< the number of items in the iovec array */
     mca_oob_callback_fn_t msg_cbfunc; /**< the callback function for the send/receive */
     void *msg_cbdata;                 /**< the data for the callback function */
-    bool msg_complete;
+    bool msg_complete;                /**< whether the message is done sending or not */
+    struct mca_oob_tcp_peer_t *msg_peer; /**< the peer it belongs to */
+    ompi_mutex_t msg_lock;            /**< lock for the condition variable */
+    ompi_condition_t msg_condition;   /**< the message condition */
 };
+
+/**
+ * Convenience typedef
+ */
 typedef struct mca_oob_tcp_msg_t mca_oob_tcp_msg_t;
 OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);
@@ -51,6 +55,8 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);
  */
 #define MCA_OOB_TCP_MSG_RETURN(msg) \
 { \
+    /* free the iovec array allocated during the send/receive */ \
+    free(msg->msg_iov); \
     OMPI_FREE_LIST_RETURN(&mca_oob_tcp_module.tcp_msgs, (ompi_list_item_t*)msg); \
 }
@@ -68,7 +74,7 @@ int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);
  * @param msg (IN) Message send/recv that has completed.
  * @retval OMPI_SUCCESS or error code on failure.
  */
-int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg);
+int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);
 
 /**
  * Called asynchronously to progress sending a message from the event library thread.
@@ -76,7 +82,7 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg);
  * @param sd (IN) Socket descriptor to use for send.
  * @retval bool Flag indicating whether the operation has completed.
  */
-bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, int sd);
+bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);
 
 /**
  * Called asynchronously to progress receiving a message from the event library thread.
@@ -85,15 +91,7 @@ bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, int sd);
  * @retval bool Flag indicating whether the operation has completed.
  */
-bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, int sd);
-
-/**
- * Initialize a message for send/recv.
- * @param msg (IN) Message send that is in progress.
- * @param peer (IN) Peer to send/recv message to/from.
- */
-void mca_oob_tcp_msg_init(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t*);
+bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);
 
 #endif /* _MCA_OOB_TCP_MESSAGE_H_ */

View file: oob_tcp_peer.c

@@ -1,5 +1,6 @@
 #include <unistd.h>
 #include <fcntl.h>
+#include <sys/uio.h>
 #include <sys/types.h>
 #include <sys/errno.h>
 #include <netinet/tcp.h>
@@ -10,7 +11,7 @@
 static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer);
-static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer, int sd);
+static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
 static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer);
 static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer);
 static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer);
@@ -42,7 +43,6 @@ OBJ_CLASS_INSTANCE(
 static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
 {
     OBJ_CONSTRUCT(&(peer->peer_send_queue), ompi_list_t);
-    OBJ_CONSTRUCT(&(peer->peer_recv_queue), ompi_list_t);
     OBJ_CONSTRUCT(&(peer->peer_lock), ompi_mutex_t);
 }
@@ -56,16 +56,15 @@ static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
  */
 static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t * peer)
 {
+    mca_oob_tcp_peer_close(peer);
     OBJ_DESTRUCT(&(peer->peer_send_queue));
-    OBJ_DESTRUCT(&(peer->peer_recv_queue));
     OBJ_DESTRUCT(&(peer->peer_lock));
 }
 
 /*
  * Initialize events to be used by the peer instance for TCP select/poll callbacks.
  */
-static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer, int sd)
+static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
 {
     ompi_event_set(
         &peer->peer_recv_event,
@@ -86,7 +85,6 @@ static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer, int sd)
  *
  *
  */
-
 int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
 {
     int rc = OMPI_SUCCESS;
@@ -112,11 +110,8 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
         if (NULL != peer->peer_send_msg) {
             ompi_list_append(&peer->peer_send_queue, (ompi_list_item_t*)msg);
         } else {
-            if(mca_oob_tcp_msg_send_handler(msg, peer->peer_sd)) {
-                OMPI_THREAD_UNLOCK(&peer->peer_lock);
-                mca_oob_tcp_msg_complete(msg);
-                return rc;
-            } else {
+            /* if the send does not complete */
+            if(!mca_oob_tcp_msg_send_handler(msg, peer)) {
                 peer->peer_send_msg = msg;
                 ompi_event_add(&peer->peer_send_event, 0);
             }
@@ -127,7 +122,6 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
     return rc;
 }
 
-
 /*
  * Lookup a peer by name, create one if it doesn't exist.
  * @param name  Peers globally unique identifier.
@@ -153,6 +147,11 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name)
     }
     peer->peer_name = *name;
+
+    /******
+     * need to add the peer's address to the structure
+     ******/
+
     if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_module.tcp_peer_tree,
                                            (ompi_process_name_t *) name, peer)) {
         MCA_OOB_TCP_PEER_RETURN(peer);
@@ -162,23 +161,25 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name)
     ompi_list_prepend(&mca_oob_tcp_module.tcp_peer_list, (ompi_list_item_t *) peer);
     /* if the peer list is over the maximum size, remove one unused peer */
     if(ompi_list_get_size(&mca_oob_tcp_module.tcp_peer_list) >
        mca_oob_tcp_module.tcp_cache_size) {
         old = (mca_oob_tcp_peer_t *)
               ompi_list_get_last(&mca_oob_tcp_module.tcp_peer_list);
         while(1) {
             if(0 == ompi_list_get_size(&(old->peer_send_queue)) &&
-               0 == ompi_list_get_size(&(old->peer_recv_queue))) {
+               NULL == old->peer_recv_msg) {
                 ompi_list_remove_item(&mca_oob_tcp_module.tcp_peer_list,
                                       (ompi_list_item_t *) old);
                 MCA_OOB_TCP_PEER_RETURN(old);
                 break;
             } else {
                 old = (mca_oob_tcp_peer_t *) ompi_list_get_prev(old);
                 if(NULL == old) {
+                    /* we tried, but we couldn't find one that was valid to get
+                     * rid of.  Oh well. */
                     break;
                 }
             }
         }
     }
     OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
     return peer;
@@ -195,7 +196,6 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name)
 static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
 {
     int rc,flags;
-    struct sockaddr_in peer_addr;
 
     peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
     if (peer->peer_sd < 0) {
@@ -204,7 +204,7 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
     }
 
     /* setup event callbacks */
-    mca_oob_tcp_peer_event_init(peer, peer->peer_sd);
+    mca_oob_tcp_peer_event_init(peer);
 
     /* setup the socket as non-blocking */
     if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) {
@@ -216,8 +216,7 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
     }
 
     /* start the connect - will likely fail with EINPROGRESS */
-    peer_addr = peer->peer_addr;
-    if(connect(peer->peer_sd, (struct sockaddr*)&peer_addr, sizeof(peer_addr)) < 0) {
+    if(connect(peer->peer_sd, (struct sockaddr*)&(peer->peer_addr), sizeof(peer->peer_addr)) < 0) {
         /* non-blocking so wait for completion */
         if(errno == EINPROGRESS) {
             peer->peer_state = MCA_OOB_TCP_CONNECTING;
@@ -244,7 +243,6 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
  * later.  Otherwise, send this process's identifier to the peer on the
  * newly connected socket.
  */
-
 static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
 {
     int so_error = 0;
@@ -277,12 +275,10 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
         }
     }
 
-
-
 /*
  * Setup peer state to reflect that connection has been established,
  * and start any pending sends.
  */
 static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
 {
     peer->peer_state = MCA_OOB_TCP_CONNECTED;
@@ -300,7 +296,6 @@ static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
  * and update the peer state to reflect the connection has
  * been closed.
  */
-
 static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
 {
     if(peer->peer_sd >= 0) {
@@ -431,11 +426,10 @@ static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user)
             OMPI_THREAD_UNLOCK(&peer->peer_lock);
             return;
         }
-        mca_oob_tcp_msg_init(msg, peer);
     }
 
     /* check for completion of non-blocking recv on the current fragment */
-    if(mca_oob_tcp_msg_recv_handler(msg, sd) == false)
+    if(mca_oob_tcp_msg_recv_handler(msg, peer) == false)
         peer->peer_recv_msg = msg;
     else
         peer->peer_recv_msg = 0;
@@ -455,7 +449,6 @@ static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user)
  * A file descriptor is available/ready for send.  Check the state
  * of the socket and take the appropriate action.
  */
-
 static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user)
 {
     mca_oob_tcp_peer_t* peer = user;
@@ -469,15 +462,10 @@ static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user)
         /* complete the current send */
         do {
             mca_oob_tcp_msg_t* msg = peer->peer_send_msg;
-            if(mca_oob_tcp_msg_send_handler(msg, sd) == false) {
+            if(mca_oob_tcp_msg_send_handler(msg, peer) == false) {
                 break;
             }
             /* if required - update request status and release fragment */
-            OMPI_THREAD_UNLOCK(&peer->peer_lock);
-            mca_oob_tcp_msg_complete(msg);
-            OMPI_THREAD_LOCK(&peer->peer_lock);
-
             /* progress any pending sends */
             peer->peer_send_msg = (mca_oob_tcp_msg_t*)
                 ompi_list_remove_first(&peer->peer_send_queue);
@@ -502,7 +490,6 @@ static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user)
 /*
  * Routine for debugging to print the connection state and socket options
  */
-
 static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
 {
     char src[64];
@@ -552,12 +539,3 @@ static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
     ompi_output(0, buff);
 }
-
-/* JMS Added these so that we can link successfully */
-bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, int sd)
-{ return true; }
-bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, int sd)
-{ return true; }
-void mca_oob_tcp_msg_init(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t*b)
-{ }
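
The connect path in this file uses the standard non-blocking connect idiom: make the socket non-blocking, call connect(), expect EINPROGRESS, then wait for the socket to become writable and read SO_ERROR to learn the outcome (that last step is what mca_oob_tcp_peer_complete_connect() does). The idiom in isolation, as a sketch; start_connect is an illustrative name, not a function from this commit:

    #include <errno.h>
    #include <fcntl.h>
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <unistd.h>

    /* Begin a non-blocking connect.  Returns 0 if connected or pending
     * (caller then polls for writability and checks SO_ERROR), -1 on error. */
    static int start_connect(int sd, const struct sockaddr_in *addr)
    {
        int flags = fcntl(sd, F_GETFL, 0);
        if (flags < 0 || fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0)
            return -1;
        if (connect(sd, (const struct sockaddr *) addr, sizeof(*addr)) < 0) {
            if (errno == EINPROGRESS)
                return 0;   /* pending: completion is reported via SO_ERROR */
            return -1;      /* immediate failure */
        }
        return 0;           /* connected right away (e.g. loopback) */
    }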

View file: oob_tcp_peer.h

@@ -44,7 +44,6 @@ struct mca_oob_tcp_peer_t {
     ompi_event_t peer_recv_event;     /**< registration with event thread for recv events */
     ompi_mutex_t peer_lock;           /**< make sure only one thread accesses critical data structures */
     ompi_list_t peer_send_queue;      /**< list of messages to send */
-    ompi_list_t peer_recv_queue;      /**< list of pending receives */
     mca_oob_tcp_msg_t *peer_send_msg; /**< current send in progress */
     mca_oob_tcp_msg_t *peer_recv_msg; /**< current recv in progress */
 };
@@ -68,7 +67,6 @@ typedef struct mca_oob_tcp_peer_t mca_oob_tcp_peer_t;
     OMPI_FREE_LIST_RETURN(&mca_oob_tcp_module.tcp_peer_free, (ompi_list_item_t*)peer); \
 }
 
-
 #if defined(c_plusplus) || defined(__cplusplus)
 extern "C" {
 #endif
@@ -92,10 +90,8 @@ mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(const ompi_process_name_t* peer_name
  * @param msg The message to send.
  * @retval OMPI_SUCCESS or error code on failure.
  */
-
 int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg);
-
 #if defined(c_plusplus) || defined(__cplusplus)
 }
 #endif

View file: oob_tcp_recv.c

@@ -11,7 +11,6 @@
  * iovec array without removing the message from the queue.
  * @return OMPI error code (<0) on error or number of bytes actually received.
  */
-
 int mca_oob_tcp_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, int flags)
 {
     return OMPI_ERR_NOT_IMPLEMENTED;
@@ -28,12 +27,9 @@ int mca_oob_tcp_recv(ompi_process_name_t* peer, const struct iovec *msg, int cou
  * @param cbdata (IN) User data that is passed to callback function.
  * @return OMPI error code (<0) on error or number of bytes actually received.
  */
-
 int mca_oob_tcp_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int count, int flags,
                         mca_oob_callback_fn_t cbfunc, void* cbdata)
 {
-
     return OMPI_ERR_NOT_IMPLEMENTED;
-
 }
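
Both entry points are still stubs. Judging from the commit message and the new tcp_post_recv / tcp_msg_recv lists, the receive side will read the 4-byte size header first and park the payload in a temporary buffer until a matching receive is posted. A purely hypothetical sketch of that step (recv_framed does not exist in the tree; a blocking socket is assumed, and a short read of the header is treated as an error for brevity):

    #include <arpa/inet.h>   /* ntohl */
    #include <stdint.h>
    #include <stdlib.h>
    #include <unistd.h>      /* read */

    /* Read one framed message into a freshly allocated buffer that could
     * then sit on a tcp_msg_recv-style list until the user posts a recv.
     * Returns NULL on any error. */
    static char *recv_framed(int sd, uint32_t *payload_len)
    {
        uint32_t size;
        size_t done = 0;
        char *buf;

        if (read(sd, &size, sizeof(size)) != sizeof(size))
            return NULL;
        size = ntohl(size);              /* sender wrote network byte order */
        if (size < sizeof(uint32_t))
            return NULL;                 /* total size includes the header */
        *payload_len = size - sizeof(uint32_t);

        if (NULL == (buf = malloc(*payload_len ? *payload_len : 1)))
            return NULL;
        while (done < *payload_len) {    /* loop over short reads */
            ssize_t rc = read(sd, buf + done, *payload_len - done);
            if (rc <= 0) {
                free(buf);
                return NULL;
            }
            done += rc;
        }
        return buf;
    }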

View file: oob_tcp_send.c

@@ -3,7 +3,7 @@
 #include "oob_tcp_peer.h"
 
 /*
- * Similiar to unix send(2).
+ * Similar to unix writev(2).
  *
  * @param peer (IN)   Opaque name of peer process.
  * @param msg (IN)    Array of iovecs describing user buffers and lengths.
@@ -24,14 +24,26 @@ int mca_oob_tcp_send(const ompi_process_name_t* name, const struct iovec *iov, i
     if(NULL == msg)
         return rc;
 
+    /* calculate the size of the message */
+    msg->msg_size = sizeof(uint32_t);
+    for(rc = 0; rc < count; rc++) {
+        msg->msg_size += iov[rc].iov_len;
+    }
+    /* turn the size to network byte order so there will be no problems */
+    msg->msg_size = htonl(msg->msg_size);
+
     msg->msg_user = iov;
-    msg->msg_iov = (struct iovec*)malloc(sizeof(struct iovec)*count);
+    /* create one additional iovec that will hold the size of the message */
+    msg->msg_iov = (struct iovec*)malloc(sizeof(struct iovec)*(count + 1));
+    msg->msg_iov[0].iov_base = &msg->msg_size;
+    msg->msg_iov[0].iov_len = sizeof(uint32_t);
     msg->msg_rwptr = msg->msg_iov;
-    msg->msg_count = msg->msg_rwcnt = count;
-    memcpy(msg->msg_iov, msg->msg_user, sizeof(struct iovec)*count);
+    msg->msg_count = msg->msg_rwcnt = count + 1;
+    memcpy(&(msg->msg_iov[1]), msg->msg_user, sizeof(struct iovec)*count);
     msg->msg_cbfunc = NULL;
     msg->msg_cbdata = NULL;
     msg->msg_complete = false;
+    msg->msg_peer = peer;
+    msg->msg_state = 0;
 
     rc = mca_oob_tcp_peer_send(peer, msg);
     if(rc != OMPI_SUCCESS) {
@@ -76,14 +88,27 @@ int mca_oob_tcp_send_nb(
     if(NULL == msg)
         return rc;
 
+    /* calculate the size of the message */
+    msg->msg_size = sizeof(uint32_t);
+    for(rc = 0; rc < count; rc++) {
+        msg->msg_size += iov[rc].iov_len;
+    }
+    /* turn the size to network byte order so there will be no problems */
+    msg->msg_size = htonl(msg->msg_size);
+
     msg->msg_user = iov;
-    msg->msg_iov = (struct iovec*)malloc(sizeof(struct iovec)*count);
+    /* create one additional iovec that will hold the size of the message */
+    msg->msg_iov = (struct iovec*)malloc(sizeof(struct iovec)*(count + 1));
+    msg->msg_iov[0].iov_base = &msg->msg_size;
+    msg->msg_iov[0].iov_len = sizeof(uint32_t);
     msg->msg_rwptr = msg->msg_iov;
-    msg->msg_count = msg->msg_rwcnt = count;
-    memcpy(msg->msg_iov, msg->msg_user, sizeof(struct iovec)*count);
+    msg->msg_count = msg->msg_rwcnt = count + 1;
+    memcpy(&(msg->msg_iov[1]), msg->msg_user, sizeof(struct iovec)*count);
     msg->msg_cbfunc = cbfunc;
     msg->msg_cbdata = cbdata;
     msg->msg_complete = false;
+    msg->msg_peer = peer;
+    msg->msg_state = 0;
 
     rc = mca_oob_tcp_peer_send(peer, msg);
     if(rc != OMPI_SUCCESS) {