Some minor updates to the tcp oob.
- Changed/added some documentation - Added the code to relase old peers when the list is too long This commit was SVN r1699.
Этот коммит содержится в:
родитель
f6af7c50ee
Коммит
18df76e46b
@ -8,8 +8,11 @@ include $(top_ompi_srcdir)/config/Makefile.options
|
||||
|
||||
sources = \
|
||||
oob_tcp.c \
|
||||
oob_tcp.h \
|
||||
oob_tcp_msg.c \
|
||||
oob_tcp_msg.h \
|
||||
oob_tcp_peer.c \
|
||||
oob_tcp_peer.h \
|
||||
oob_tcp_recv.c \
|
||||
oob_tcp_send.c
|
||||
|
||||
|
@ -35,7 +35,7 @@ int mca_oob_tcp_finalize(void);
|
||||
|
||||
|
||||
/**
|
||||
* Similiar to unix send(2).
|
||||
* Similiar to unix writev(2).
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
@ -47,7 +47,7 @@ int mca_oob_tcp_finalize(void);
|
||||
int mca_oob_tcp_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int flags);
|
||||
|
||||
/**
|
||||
* Similiar to unix recv(2)
|
||||
* Similiar to unix readv(2)
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process or OOB_NAME_ANY for wildcard receive.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
|
@ -36,7 +36,9 @@ typedef struct mca_oob_tcp_msg_t mca_oob_tcp_msg_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);
|
||||
|
||||
|
||||
/**
|
||||
* Get a new structure for use with a message
|
||||
*/
|
||||
#define MCA_OOB_TCP_MSG_ALLOC(msg, rc) \
|
||||
{ \
|
||||
ompi_list_item_t* item; \
|
||||
@ -44,6 +46,9 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);
|
||||
msg = (mca_oob_tcp_msg_t*)item; \
|
||||
}
|
||||
|
||||
/**
|
||||
* return a message structure that is no longer needed
|
||||
*/
|
||||
#define MCA_OOB_TCP_MSG_RETURN(msg) \
|
||||
{ \
|
||||
OMPI_FREE_LIST_RETURN(&mca_oob_tcp_module.tcp_msgs, (ompi_list_item_t*)msg); \
|
||||
@ -55,7 +60,6 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);
|
||||
* @param size (OUT) Number of bytes delivered.
|
||||
* @retval OMPI_SUCCESS or error code on failure.
|
||||
*/
|
||||
|
||||
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);
|
||||
|
||||
/**
|
||||
@ -64,7 +68,6 @@ int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);
|
||||
* @param msg (IN) Message send/recv that has completed.
|
||||
* @retval OMPI_SUCCESS or error code on failure.
|
||||
*/
|
||||
|
||||
int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg);
|
||||
|
||||
/**
|
||||
@ -73,10 +76,8 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg);
|
||||
* @param sd (IN) Socket descriptor to use for send.
|
||||
* @retval bool Bool flag indicating wether operation has completed.
|
||||
*/
|
||||
|
||||
bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, int sd);
|
||||
|
||||
|
||||
/**
|
||||
* Called asynchronously to progress sending a message from the event library thread.
|
||||
* @param msg (IN) Message send that is in progress.
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include <unistd.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/errno.h>
|
||||
@ -137,10 +136,11 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
|
||||
mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name)
|
||||
{
|
||||
int rc;
|
||||
mca_oob_tcp_peer_t * peer;
|
||||
mca_oob_tcp_peer_t * peer, * old;
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_oob_tcp_module.tcp_lock);
|
||||
peer = (mca_oob_tcp_peer_t*)ompi_rb_tree_find(&mca_oob_tcp_module.tcp_peer_tree, name);
|
||||
peer = (mca_oob_tcp_peer_t*)ompi_rb_tree_find(&mca_oob_tcp_module.tcp_peer_tree,
|
||||
(ompi_process_name_t *) name);
|
||||
if(NULL != peer) {
|
||||
OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
|
||||
return peer;
|
||||
@ -153,15 +153,32 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name)
|
||||
}
|
||||
|
||||
peer->peer_name = *name;
|
||||
if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_module.tcp_peer_tree, name, peer)) {
|
||||
if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_module.tcp_peer_tree,
|
||||
(ompi_process_name_t *) name, peer)) {
|
||||
MCA_OOB_TCP_PEER_RETURN(peer);
|
||||
OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
|
||||
return NULL;
|
||||
}
|
||||
ompi_list_prepend(&mca_oob_tcp_module.tcp_peer_list, (ompi_list_item_t *) peer);
|
||||
/* if the peer list is over the maximum size, remove one unsed peer */
|
||||
if(ompi_list_get_size(&mca_oob_tcp_module.tcp_peer_list) >
|
||||
mca_oob_tcp_module.tcp_cache_size) {
|
||||
/* do something - remove LRU items from peer list (that aren't in use) */
|
||||
old = (mca_oob_tcp_peer_t *)
|
||||
ompi_list_get_last(&mca_oob_tcp_module.tcp_peer_list);
|
||||
while(1) {
|
||||
if(0 == ompi_list_get_size(&(old->peer_send_queue)) &&
|
||||
0 == ompi_list_get_size(&(old->peer_recv_queue))) {
|
||||
ompi_list_remove_item(&mca_oob_tcp_module.tcp_peer_list,
|
||||
(ompi_list_item_t *) old);
|
||||
MCA_OOB_TCP_PEER_RETURN(old);
|
||||
break;
|
||||
} else {
|
||||
old = (mca_oob_tcp_peer_t *) ompi_list_get_prev(old);
|
||||
if(NULL == old) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&mca_oob_tcp_module.tcp_lock);
|
||||
return peer;
|
||||
@ -251,7 +268,7 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if(mca_oob_tcp_peer_send_connect_ack(peer) == OMPI_SUCCESS) {
|
||||
peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
|
||||
ompi_event_add(&peer->peer_recv_event, 0);
|
||||
@ -300,7 +317,6 @@ static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
|
||||
* Send the globally unique identifier for this process to a peer on
|
||||
* a newly connected socket.
|
||||
*/
|
||||
|
||||
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
|
||||
{
|
||||
/* send process identifier to remote peer */
|
||||
@ -316,7 +332,6 @@ static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
|
||||
* connected socket and verify the expected response. If so, move the
|
||||
* socket to a connected state.
|
||||
*/
|
||||
|
||||
static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
|
||||
{
|
||||
ompi_process_name_t guid;
|
||||
|
@ -50,14 +50,19 @@ struct mca_oob_tcp_peer_t {
|
||||
};
|
||||
typedef struct mca_oob_tcp_peer_t mca_oob_tcp_peer_t;
|
||||
|
||||
|
||||
/**
|
||||
* Get a new peer data structure
|
||||
*/
|
||||
#define MCA_OOB_TCP_PEER_ALLOC(peer, rc) \
|
||||
{ \
|
||||
ompi_list_item_t* item; \
|
||||
OMPI_FREE_LIST_GET(&mca_oob_tcp_module.tcp_peer_free, item, rc); \
|
||||
peer = (mca_oob_tcp_peer_t*)item; \
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return a peer data structure
|
||||
*/
|
||||
#define MCA_OOB_TCP_PEER_RETURN(peer) \
|
||||
{ \
|
||||
OMPI_FREE_LIST_RETURN(&mca_oob_tcp_module.tcp_peer_free, (ompi_list_item_t*)peer); \
|
||||
@ -74,7 +79,7 @@ extern "C" {
|
||||
*
|
||||
* @param peer_name the name of the peer
|
||||
*
|
||||
* @retval pointer to the newly created struture
|
||||
* @retval pointer to the peer's (possibly newly created) struture
|
||||
* @retval NULL if there was a problem
|
||||
*/
|
||||
mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(const ompi_process_name_t* peer_name);
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include "mca/oob/tcp/oob_tcp.h"
|
||||
|
||||
/*
|
||||
* Similiar to unix recv(2)
|
||||
* Similiar to unix readv(2)
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process or OOB_NAME_ANY for wildcard receive.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
|
@ -35,7 +35,7 @@ int mca_oob_tcp_send(const ompi_process_name_t* name, const struct iovec *iov, i
|
||||
|
||||
rc = mca_oob_tcp_peer_send(peer, msg);
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
OBJ_RELEASE(msg);
|
||||
MCA_OOB_TCP_MSG_RETURN(msg);
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -87,7 +87,7 @@ int mca_oob_tcp_send_nb(
|
||||
|
||||
rc = mca_oob_tcp_peer_send(peer, msg);
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
OBJ_RELEASE(msg);
|
||||
MCA_OOB_TCP_MSG_RETURN(msg);
|
||||
return rc;
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user