diff --git a/src/mca/oob/tcp/Makefile.am b/src/mca/oob/tcp/Makefile.am index a92ab28721..8037a9b018 100644 --- a/src/mca/oob/tcp/Makefile.am +++ b/src/mca/oob/tcp/Makefile.am @@ -8,8 +8,11 @@ include $(top_ompi_srcdir)/config/Makefile.options sources = \ oob_tcp.c \ + oob_tcp.h \ oob_tcp_msg.c \ + oob_tcp_msg.h \ oob_tcp_peer.c \ + oob_tcp_peer.h \ oob_tcp_recv.c \ oob_tcp_send.c diff --git a/src/mca/oob/tcp/oob_tcp.h b/src/mca/oob/tcp/oob_tcp.h index 75a0cbea9c..6e7e8ca8f3 100644 --- a/src/mca/oob/tcp/oob_tcp.h +++ b/src/mca/oob/tcp/oob_tcp.h @@ -35,7 +35,7 @@ int mca_oob_tcp_finalize(void); /** - * Similiar to unix send(2). + * Similiar to unix writev(2). * * @param peer (IN) Opaque name of peer process. * @param msg (IN) Array of iovecs describing user buffers and lengths. @@ -47,7 +47,7 @@ int mca_oob_tcp_finalize(void); int mca_oob_tcp_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int flags); /** - * Similiar to unix recv(2) + * Similiar to unix readv(2) * * @param peer (IN) Opaque name of peer process or OOB_NAME_ANY for wildcard receive. * @param msg (IN) Array of iovecs describing user buffers and lengths. diff --git a/src/mca/oob/tcp/oob_tcp_msg.h b/src/mca/oob/tcp/oob_tcp_msg.h index 1d698ccf20..5731b223a0 100644 --- a/src/mca/oob/tcp/oob_tcp_msg.h +++ b/src/mca/oob/tcp/oob_tcp_msg.h @@ -36,7 +36,9 @@ typedef struct mca_oob_tcp_msg_t mca_oob_tcp_msg_t; OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t); - +/** + * Get a new structure for use with a message + */ #define MCA_OOB_TCP_MSG_ALLOC(msg, rc) \ { \ ompi_list_item_t* item; \ @@ -44,6 +46,9 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t); msg = (mca_oob_tcp_msg_t*)item; \ } +/** + * return a message structure that is no longer needed + */ #define MCA_OOB_TCP_MSG_RETURN(msg) \ { \ OMPI_FREE_LIST_RETURN(&mca_oob_tcp_module.tcp_msgs, (ompi_list_item_t*)msg); \ @@ -55,7 +60,6 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t); * @param size (OUT) Number of bytes delivered. * @retval OMPI_SUCCESS or error code on failure. */ - int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size); /** @@ -64,7 +68,6 @@ int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size); * @param msg (IN) Message send/recv that has completed. * @retval OMPI_SUCCESS or error code on failure. */ - int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg); /** @@ -73,10 +76,8 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg); * @param sd (IN) Socket descriptor to use for send. * @retval bool Bool flag indicating wether operation has completed. */ - bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, int sd); - /** * Called asynchronously to progress sending a message from the event library thread. * @param msg (IN) Message send that is in progress. diff --git a/src/mca/oob/tcp/oob_tcp_peer.c b/src/mca/oob/tcp/oob_tcp_peer.c index 2141746115..ff15a5fc9d 100644 --- a/src/mca/oob/tcp/oob_tcp_peer.c +++ b/src/mca/oob/tcp/oob_tcp_peer.c @@ -1,5 +1,4 @@ #include -#include #include #include #include @@ -137,10 +136,11 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg) mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name) { int rc; - mca_oob_tcp_peer_t * peer; + mca_oob_tcp_peer_t * peer, * old; OMPI_THREAD_LOCK(&mca_oob_tcp_module.tcp_lock); - peer = (mca_oob_tcp_peer_t*)ompi_rb_tree_find(&mca_oob_tcp_module.tcp_peer_tree, name); + peer = (mca_oob_tcp_peer_t*)ompi_rb_tree_find(&mca_oob_tcp_module.tcp_peer_tree, + (ompi_process_name_t *) name); if(NULL != peer) { OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock); return peer; @@ -153,15 +153,32 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name) } peer->peer_name = *name; - if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_module.tcp_peer_tree, name, peer)) { + if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_module.tcp_peer_tree, + (ompi_process_name_t *) name, peer)) { MCA_OOB_TCP_PEER_RETURN(peer); OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock); return NULL; } ompi_list_prepend(&mca_oob_tcp_module.tcp_peer_list, (ompi_list_item_t *) peer); + /* if the peer list is over the maximum size, remove one unsed peer */ if(ompi_list_get_size(&mca_oob_tcp_module.tcp_peer_list) > mca_oob_tcp_module.tcp_cache_size) { - /* do something - remove LRU items from peer list (that aren't in use) */ + old = (mca_oob_tcp_peer_t *) + ompi_list_get_last(&mca_oob_tcp_module.tcp_peer_list); + while(1) { + if(0 == ompi_list_get_size(&(old->peer_send_queue)) && + 0 == ompi_list_get_size(&(old->peer_recv_queue))) { + ompi_list_remove_item(&mca_oob_tcp_module.tcp_peer_list, + (ompi_list_item_t *) old); + MCA_OOB_TCP_PEER_RETURN(old); + break; + } else { + old = (mca_oob_tcp_peer_t *) ompi_list_get_prev(old); + if(NULL == old) { + break; + } + } + } } OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock); return peer; @@ -251,7 +268,7 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer) mca_oob_tcp_peer_close(peer); return; } - + if(mca_oob_tcp_peer_send_connect_ack(peer) == OMPI_SUCCESS) { peer->peer_state = MCA_OOB_TCP_CONNECT_ACK; ompi_event_add(&peer->peer_recv_event, 0); @@ -300,7 +317,6 @@ static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer) * Send the globally unique identifier for this process to a peer on * a newly connected socket. */ - static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) { /* send process identifier to remote peer */ @@ -316,7 +332,6 @@ static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) * connected socket and verify the expected response. If so, move the * socket to a connected state. */ - static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer) { ompi_process_name_t guid; diff --git a/src/mca/oob/tcp/oob_tcp_peer.h b/src/mca/oob/tcp/oob_tcp_peer.h index da2af9151c..13739855d0 100644 --- a/src/mca/oob/tcp/oob_tcp_peer.h +++ b/src/mca/oob/tcp/oob_tcp_peer.h @@ -50,14 +50,19 @@ struct mca_oob_tcp_peer_t { }; typedef struct mca_oob_tcp_peer_t mca_oob_tcp_peer_t; - +/** + * Get a new peer data structure + */ #define MCA_OOB_TCP_PEER_ALLOC(peer, rc) \ { \ ompi_list_item_t* item; \ OMPI_FREE_LIST_GET(&mca_oob_tcp_module.tcp_peer_free, item, rc); \ peer = (mca_oob_tcp_peer_t*)item; \ } - + +/** + * Return a peer data structure + */ #define MCA_OOB_TCP_PEER_RETURN(peer) \ { \ OMPI_FREE_LIST_RETURN(&mca_oob_tcp_module.tcp_peer_free, (ompi_list_item_t*)peer); \ @@ -74,7 +79,7 @@ extern "C" { * * @param peer_name the name of the peer * - * @retval pointer to the newly created struture + * @retval pointer to the peer's (possibly newly created) struture * @retval NULL if there was a problem */ mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(const ompi_process_name_t* peer_name); diff --git a/src/mca/oob/tcp/oob_tcp_recv.c b/src/mca/oob/tcp/oob_tcp_recv.c index 085f7e5f12..3c39e658ba 100644 --- a/src/mca/oob/tcp/oob_tcp_recv.c +++ b/src/mca/oob/tcp/oob_tcp_recv.c @@ -1,7 +1,7 @@ #include "mca/oob/tcp/oob_tcp.h" /* - * Similiar to unix recv(2) + * Similiar to unix readv(2) * * @param peer (IN) Opaque name of peer process or OOB_NAME_ANY for wildcard receive. * @param msg (IN) Array of iovecs describing user buffers and lengths. diff --git a/src/mca/oob/tcp/oob_tcp_send.c b/src/mca/oob/tcp/oob_tcp_send.c index 48a90710a9..520f2f784c 100644 --- a/src/mca/oob/tcp/oob_tcp_send.c +++ b/src/mca/oob/tcp/oob_tcp_send.c @@ -35,7 +35,7 @@ int mca_oob_tcp_send(const ompi_process_name_t* name, const struct iovec *iov, i rc = mca_oob_tcp_peer_send(peer, msg); if(rc != OMPI_SUCCESS) { - OBJ_RELEASE(msg); + MCA_OOB_TCP_MSG_RETURN(msg); return rc; } @@ -87,7 +87,7 @@ int mca_oob_tcp_send_nb( rc = mca_oob_tcp_peer_send(peer, msg); if(rc != OMPI_SUCCESS) { - OBJ_RELEASE(msg); + MCA_OOB_TCP_MSG_RETURN(msg); return rc; } return OMPI_SUCCESS;