1
1

Add support for async progress in the BTL TCP.

All BTL-only operations (basically all data movements
with the exception of the matching operation) can now
be handled for the TCP BTL by a progress thread.
Этот коммит содержится в:
George Bosilca 2013-08-30 13:57:11 +02:00
родитель 4b38b6bd0c
Коммит 32277db6ab
10 изменённых файлов: 738 добавлений и 218 удалений

Просмотреть файл

@ -32,7 +32,10 @@
#include "opal/datatype/opal_convertor.h" #include "opal/datatype/opal_convertor.h"
#include "ompi/mca/mpool/base/base.h" #include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/mpool/mpool.h" #include "ompi/mca/mpool/mpool.h"
#include "ompi/proc/proc.h" #include "btl_tcp.h"
#include "btl_tcp_frag.h"
#include "btl_tcp_proc.h"
#include "btl_tcp_endpoint.h"
mca_btl_tcp2_module_t mca_btl_tcp2_module = { mca_btl_tcp2_module_t mca_btl_tcp2_module = {
{ {
@ -57,9 +60,9 @@ mca_btl_tcp2_module_t mca_btl_tcp2_module = {
mca_btl_tcp2_prepare_dst, mca_btl_tcp2_prepare_dst,
mca_btl_tcp2_send, mca_btl_tcp2_send,
NULL, /* send immediate */ NULL, /* send immediate */
mca_btl_tcp2_put, mca_btl_tcp_put,
NULL, /* get */ NULL, /* get */
mca_btl_base_dump, mca_btl_tcp_dump,
NULL, /* mpool */ NULL, /* mpool */
NULL, /* register error */ NULL, /* register error */
mca_btl_tcp2_ft_event mca_btl_tcp2_ft_event
@ -134,7 +137,9 @@ int mca_btl_tcp2_add_procs( struct mca_btl_base_module_t* btl,
/* we increase the count of MPI users of the event library /* we increase the count of MPI users of the event library
once per peer, so that we are used until we aren't once per peer, so that we are used until we aren't
connected to a peer */ connected to a peer */
#if !MCA_BTL_TCP_USES_PROGRESS_THREAD
opal_progress_event_users_increment(); opal_progress_event_users_increment();
#endif /* !MCA_BTL_TCP_USES_PROGRESS_THREAD */
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -153,7 +158,9 @@ int mca_btl_tcp2_del_procs(struct mca_btl_base_module_t* btl,
opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint); opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
OBJ_RELEASE(tcp_endpoint); OBJ_RELEASE(tcp_endpoint);
} }
#if !MCA_BTL_TCP_USES_PROGRESS_THREAD
opal_progress_event_users_decrement(); opal_progress_event_users_decrement();
#endif /* !MCA_BTL_TCP_USES_PROGRESS_THREAD */
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -184,6 +191,10 @@ mca_btl_base_descriptor_t* mca_btl_tcp2_alloc(
return NULL; return NULL;
} }
#define GB_DEFINED 0
#if GB_DEFINED
opal_output(0, "alloc_frag( size = %lu )\n", size);
#endif /* GB_DEFINED */
frag->segments[0].seg_len = size; frag->segments[0].seg_len = size;
frag->segments[0].seg_addr.pval = frag+1; frag->segments[0].seg_addr.pval = frag+1;
@ -193,7 +204,8 @@ mca_btl_base_descriptor_t* mca_btl_tcp2_alloc(
frag->base.des_dst_cnt = 0; frag->base.des_dst_cnt = 0;
frag->base.des_flags = flags; frag->base.des_flags = flags;
frag->base.order = MCA_BTL_NO_ORDER; frag->base.order = MCA_BTL_NO_ORDER;
frag->btl = (mca_btl_tcp2_module_t*)btl; frag->btl = (mca_btl_tcp_module_t*)btl;
frag->endpoint = endpoint;
return (mca_btl_base_descriptor_t*)frag; return (mca_btl_base_descriptor_t*)frag;
} }
@ -296,6 +308,10 @@ mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_src(
frag->base.des_flags = flags; frag->base.des_flags = flags;
frag->base.order = MCA_BTL_NO_ORDER; frag->base.order = MCA_BTL_NO_ORDER;
*size = max_data; *size = max_data;
#if GB_DEFINED
opal_output(0, "prepare_src( bConverted = %lu, size = %lu\n",
convertor->bConverted, *size);
#endif /* GB_DEFINED */
return &frag->base; return &frag->base;
} }
@ -343,6 +359,10 @@ mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_dst(
frag->base.des_dst_cnt = 1; frag->base.des_dst_cnt = 1;
frag->base.des_flags = flags; frag->base.des_flags = flags;
frag->base.order = MCA_BTL_NO_ORDER; frag->base.order = MCA_BTL_NO_ORDER;
#if GB_DEFINED
opal_output(0, " prepare_dst( bConverted = %lu, size = %lu\n",
convertor->bConverted, *size);
#endif /* GB_DEFINED */
return &frag->base; return &frag->base;
} }
@ -384,7 +404,10 @@ int mca_btl_tcp2_send( struct mca_btl_base_module_t* btl,
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_SEND; frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_SEND;
frag->hdr.count = 0; frag->hdr.count = 0;
if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr); if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
return mca_btl_tcp2_endpoint_send(endpoint,frag); #if GB_DEFINED
opal_output(0, "frag_send( size = %u )\n", frag->hdr.size );
#endif /* GB_DEFINED */
return mca_btl_tcp_endpoint_send(endpoint,frag);
} }
@ -425,7 +448,10 @@ int mca_btl_tcp2_put( mca_btl_base_module_t* btl,
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT; frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT;
frag->hdr.count = frag->base.des_dst_cnt; frag->hdr.count = frag->base.des_dst_cnt;
if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr); if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
return ((i = mca_btl_tcp2_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : i); #if GB_DEFINED
opal_output(0, "frag_put( size = %u )\n", frag->hdr.size );
#endif /* GB_DEFINED */
return ((i = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : i);
} }
@ -462,12 +488,16 @@ int mca_btl_tcp2_get(
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_GET; frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_GET;
frag->hdr.count = frag->base.des_src_cnt; frag->hdr.count = frag->base.des_src_cnt;
if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr); if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
return ((rc = mca_btl_tcp2_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : rc); #if GB_DEFINED
opal_output(0, "frag_get( size = %u )\n", frag->hdr.size );
#endif /* GB_DEFINED */
return ((rc = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : rc);
} }
/* /*
* Cleanup/release module resources. * Cleanup/release module resources. This function should only be called once,
* there is no need to protect it.
*/ */
int mca_btl_tcp2_finalize(struct mca_btl_base_module_t* btl) int mca_btl_tcp2_finalize(struct mca_btl_base_module_t* btl)
@ -479,8 +509,42 @@ int mca_btl_tcp2_finalize(struct mca_btl_base_module_t* btl)
item = opal_list_remove_first(&tcp_btl->tcp_endpoints)) { item = opal_list_remove_first(&tcp_btl->tcp_endpoints)) {
mca_btl_tcp2_endpoint_t *endpoint = (mca_btl_tcp2_endpoint_t*)item; mca_btl_tcp2_endpoint_t *endpoint = (mca_btl_tcp2_endpoint_t*)item;
OBJ_RELEASE(endpoint); OBJ_RELEASE(endpoint);
#if !MCA_BTL_TCP_USES_PROGRESS_THREAD
opal_progress_event_users_decrement(); opal_progress_event_users_decrement();
#endif /* !MCA_BTL_TCP_USES_PROGRESS_THREAD */
} }
free(tcp_btl); free(tcp_btl);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/**
*
*/
void mca_btl_tcp_dump(struct mca_btl_base_module_t* base_btl,
struct mca_btl_base_endpoint_t* endpoint,
int verbose)
{
mca_btl_tcp_module_t* btl = (mca_btl_tcp_module_t*)base_btl;
mca_btl_base_err("%s TCP %p kernel_id %d\n"
#if MCA_BTL_TCP_STATISTICS
" | statistics: sent %lu recv %lu\n"
#endif /* MCA_BTL_TCP_STATISTICS */
" | latency %u bandwidth %u\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (void*)btl, btl->tcp_ifkindex,
#if MCA_BTL_TCP_STATISTICS
btl->tcp_bytes_sent, btl->btl_bytes_recv,
#endif /* MCA_BTL_TCP_STATISTICS */
btl->super.btl_latency, btl->super.btl_bandwidth);
if( NULL != endpoint ) {
mca_btl_tcp_endpoint_dump( endpoint, "TCP" );
} else if( verbose ) {
opal_list_item_t *item;
for(item = opal_list_get_first(&btl->tcp_endpoints);
item != opal_list_get_end(&btl->tcp_endpoints);
item = opal_list_get_next(item)) {
mca_btl_tcp_endpoint_dump( (mca_btl_base_endpoint_t*)item, "TCP" );
}
}
}

Просмотреть файл

@ -49,18 +49,15 @@
#include <time.h> #include <time.h>
#endif /* HAVE_TIME_H */ #endif /* HAVE_TIME_H */
#include "opal/mca/event/event.h"
#include "ompi/types.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "opal/util/net.h" #include "opal/util/net.h"
#include "opal/util/fd.h"
#include "opal/util/show_help.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "ompi/mca/rte/rte.h"
#include "btl_tcp2.h" #include "btl_tcp_endpoint.h"
#include "btl_tcp2_endpoint.h" #include "btl_tcp_proc.h"
#include "btl_tcp2_proc.h" #include "btl_tcp_frag.h"
#include "btl_tcp2_frag.h"
#include "btl_tcp2_addr.h"
/* /*
* Initialize state of the endpoint instance. * Initialize state of the endpoint instance.
@ -123,12 +120,10 @@ static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user);
* diagnostics * diagnostics
*/ */
#if WANT_PEER_DUMP void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
{ {
char src[64]; char src[64], dst[64], *status;
char dst[64]; int sndbuf, rcvbuf, nodelay, flags = -1;
int sndbuf,rcvbuf,nodelay,flags;
#if OPAL_ENABLE_IPV6 #if OPAL_ENABLE_IPV6
struct sockaddr_storage inaddr; struct sockaddr_storage inaddr;
#else #else
@ -136,69 +131,102 @@ static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, co
#endif #endif
opal_socklen_t obtlen; opal_socklen_t obtlen;
opal_socklen_t addrlen = sizeof(inaddr); opal_socklen_t addrlen = sizeof(inaddr);
opal_list_item_t *item;
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen); if( -1 != btl_endpoint->endpoint_sd ) {
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_ENABLE_IPV6 #if OPAL_ENABLE_IPV6
{ {
char *address; char *address;
address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr); address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
if (NULL != address) { if (NULL != address) {
sprintf(src, "%s", address); sprintf(src, "%s", address);
}
} }
}
#else #else
sprintf(src, "%s", inet_ntoa(inaddr.sin_addr)); sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
#endif #endif
getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen); getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_ENABLE_IPV6 #if OPAL_ENABLE_IPV6
{ {
char *address; char *address;
address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr); address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
if (NULL != address) { if (NULL != address) {
sprintf(dst, "%s", address); sprintf(dst, "%s", address);
}
} }
}
#else #else
sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr)); sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
#endif #endif
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) { if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno)); strerror(opal_socket_errno), opal_socket_errno));
} }
#if defined(SO_SNDBUF) #if defined(SO_SNDBUF)
obtlen = sizeof(sndbuf); obtlen = sizeof(sndbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) { if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
BTL_ERROR(("SO_SNDBUF option: %s (%d)", BTL_ERROR(("SO_SNDBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno)); strerror(opal_socket_errno), opal_socket_errno));
} }
#else #else
sndbuf = -1; sndbuf = -1;
#endif #endif
#if defined(SO_RCVBUF) #if defined(SO_RCVBUF)
obtlen = sizeof(rcvbuf); obtlen = sizeof(rcvbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) { if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
BTL_ERROR(("SO_RCVBUF option: %s (%d)", BTL_ERROR(("SO_RCVBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno)); strerror(opal_socket_errno), opal_socket_errno));
} }
#else #else
rcvbuf = -1; rcvbuf = -1;
#endif #endif
#if defined(TCP_NODELAY) #if defined(TCP_NODELAY)
obtlen = sizeof(nodelay); obtlen = sizeof(nodelay);
if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) { if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
BTL_ERROR(("TCP_NODELAY option: %s (%d)", BTL_ERROR(("TCP_NODELAY option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno)); strerror(opal_socket_errno), opal_socket_errno));
} }
#else #else
nodelay = 0; nodelay = 0;
#endif #endif
}
BTL_VERBOSE(("%s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x", mca_btl_base_err("%s %s: endpoint %p src %s - dst %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
msg, src, dst, nodelay, sndbuf, rcvbuf, flags)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, (void*)btl_endpoint, src, dst, nodelay, sndbuf, rcvbuf, flags);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECTING:
status = "connecting"; break;
case MCA_BTL_TCP_CONNECT_ACK:
status = "connect ack"; break;
case MCA_BTL_TCP_CLOSED:
status = "closed"; break;
case MCA_BTL_TCP_FAILED:
status = "failed"; break;
case MCA_BTL_TCP_CONNECTED:
status = "connected"; break;
default:
status = "undefined"; break;
}
mca_btl_base_err("%s | [socket %d] [state %s] (nbo %s) (retries %u)\n"
#if MCA_BTL_TCP_ENDPOINT_CACHE
"\tcache %p length %lu pos %ld\n"
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
"\tpending: send %p recv %p\n",
msg, btl_endpoint->endpoint_sd, status,
(btl_endpoint->endpoint_nbo ? "true" : "false"), btl_endpoint->endpoint_retries,
#if MCA_BTL_TCP_ENDPOINT_CACHE
btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_length, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
(void*)btl_endpoint->endpoint_send_frag, (void*)btl_endpoint->endpoint_recv_frag );
for(item = opal_list_get_first(&btl_endpoint->endpoint_frags);
item != opal_list_get_end(&btl_endpoint->endpoint_frags);
item = opal_list_get_next(item)) {
mca_btl_tcp_dump_frag( (mca_btl_tcp_frag_t*)item, " | send" );
}
} }
#endif
/* /*
* Initialize events to be used by the endpoint instance for TCP select/poll callbacks. * Initialize events to be used by the endpoint instance for TCP select/poll callbacks.
@ -211,22 +239,22 @@ static inline void mca_btl_tcp2_endpoint_event_init(mca_btl_base_endpoint_t* btl
btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache; btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
opal_event_set(opal_event_base, &btl_endpoint->endpoint_recv_event, opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd, btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST, OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp2_endpoint_recv_handler, mca_btl_tcp_endpoint_recv_handler,
btl_endpoint ); btl_endpoint );
/** /**
* The send event should be non persistent until the endpoint is * The send event should be non persistent until the endpoint is
* completely connected. This means, when the event is created it * completely connected. This means, when the event is created it
* will be fired only once, and when the endpoint is marked as * will be fired only once, and when the endpoint is marked as
* CONNECTED the event should be recreated with the correct flags. * CONNECTED the event should be recreated with the correct flags.
*/ */
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event, opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd, btl_endpoint->endpoint_sd,
OPAL_EV_WRITE, OPAL_EV_WRITE,
mca_btl_tcp2_endpoint_send_handler, mca_btl_tcp_endpoint_send_handler,
btl_endpoint); btl_endpoint);
} }
@ -239,7 +267,7 @@ int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tc
{ {
int rc = OMPI_SUCCESS; int rc = OMPI_SUCCESS;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_endpoint->endpoint_send_lock);
switch(btl_endpoint->endpoint_state) { switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECTING: case MCA_BTL_TCP_CONNECTING:
case MCA_BTL_TCP_CONNECT_ACK: case MCA_BTL_TCP_CONNECT_ACK:
@ -257,19 +285,18 @@ int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tc
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY && if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd)) { mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd)) {
int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP); int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) {
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
}
if( btl_ownership ) {
MCA_BTL_TCP_FRAG_RETURN(frag);
}
return 1; return 1;
} else { } else {
btl_endpoint->endpoint_send_frag = frag; btl_endpoint->endpoint_send_frag = frag;
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK; frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
#define GB_DEFINED 0
#if GB_DEFINED
opal_output(0, "%s:%d add the send event on socket %d\n",
__FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */
#endif /* GB_DEFINED */
MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
} }
} else { } else {
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK; frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
@ -277,7 +304,7 @@ int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tc
} }
break; break;
} }
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_endpoint->endpoint_send_lock);
return rc; return rc;
} }
@ -338,22 +365,20 @@ static int mca_btl_tcp2_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_e
bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint, bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
struct sockaddr* addr, int sd) struct sockaddr* addr, int sd)
{ {
mca_btl_tcp2_proc_t* this_proc = mca_btl_tcp2_proc_local(); mca_btl_tcp_proc_t *endpoint_proc = btl_endpoint->endpoint_proc;
mca_btl_tcp2_proc_t *endpoint_proc = btl_endpoint->endpoint_proc; const orte_process_name_t *this_proc = &(ompi_proc_local()->proc_name);
int cmpval; int cmpval;
if(NULL == btl_endpoint->endpoint_addr) {
return false;
}
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock); OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
if(NULL == btl_endpoint->endpoint_addr) { cmpval = ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return false;
}
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&endpoint_proc->proc_ompi->proc_name, &endpoint_proc->proc_ompi->proc_name,
&this_proc->proc_ompi->proc_name); this_proc);
if((btl_endpoint->endpoint_sd < 0) || if((btl_endpoint->endpoint_sd < 0) ||
(btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED && (btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
cmpval < 0)) { cmpval < 0)) {
@ -365,9 +390,16 @@ bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return false; return false;
} }
mca_btl_tcp2_endpoint_event_init(btl_endpoint); mca_btl_tcp_endpoint_event_init(btl_endpoint);
opal_event_add(&btl_endpoint->endpoint_recv_event, 0); /* NOT NEEDED if we remove the PERSISTENT flag when we create the
mca_btl_tcp2_endpoint_connected(btl_endpoint); * first recv_event.
*/
#if GB_DEFINED
opal_output(0, "%s:%d add the recv event on socket %d\n",
__FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */
#endif /* GB_DEFINED */
opal_event_add(&btl_endpoint->endpoint_recv_event, 0); /* TODO */
mca_btl_tcp_endpoint_connected(btl_endpoint);
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
mca_btl_tcp2_endpoint_dump(btl_endpoint, "accepted"); mca_btl_tcp2_endpoint_dump(btl_endpoint, "accepted");
#endif #endif
@ -388,16 +420,19 @@ bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
*/ */
void mca_btl_tcp2_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint) void mca_btl_tcp2_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
{ {
if(btl_endpoint->endpoint_sd < 0) int sd = btl_endpoint->endpoint_sd;
return;
btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED; do {
if( sd < 0 ) return;
} while ( opal_atomic_cmpset( &(btl_endpoint->endpoint_sd), sd, -1 ) );
CLOSE_THE_SOCKET(sd);
btl_endpoint->endpoint_retries++; btl_endpoint->endpoint_retries++;
opal_event_del(&btl_endpoint->endpoint_recv_event); opal_event_del(&btl_endpoint->endpoint_recv_event);
opal_event_del(&btl_endpoint->endpoint_send_event); opal_event_del(&btl_endpoint->endpoint_send_event);
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
btl_endpoint->endpoint_sd = -1;
#if MCA_BTL_TCP_ENDPOINT_CACHE #if MCA_BTL_TCP_ENDPOINT_CACHE
free( btl_endpoint->endpoint_cache ); if( NULL != btl_endpoint->endpoint_cache )
free( btl_endpoint->endpoint_cache );
btl_endpoint->endpoint_cache = NULL; btl_endpoint->endpoint_cache = NULL;
btl_endpoint->endpoint_cache_pos = NULL; btl_endpoint->endpoint_cache_pos = NULL;
btl_endpoint->endpoint_cache_length = 0; btl_endpoint->endpoint_cache_length = 0;
@ -417,16 +452,21 @@ static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoin
btl_endpoint->endpoint_retries = 0; btl_endpoint->endpoint_retries = 0;
/* Create the send event in a persistent manner. */ /* Create the send event in a persistent manner. */
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event, opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd, btl_endpoint->endpoint_sd,
OPAL_EV_WRITE | OPAL_EV_PERSIST, OPAL_EV_WRITE | OPAL_EV_PERSIST,
mca_btl_tcp2_endpoint_send_handler, mca_btl_tcp_endpoint_send_handler,
btl_endpoint ); btl_endpoint );
if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) { if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
if(NULL == btl_endpoint->endpoint_send_frag) if(NULL == btl_endpoint->endpoint_send_frag) {
btl_endpoint->endpoint_send_frag = (mca_btl_tcp2_frag_t*) btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
opal_list_remove_first(&btl_endpoint->endpoint_frags); opal_list_remove_first(&btl_endpoint->endpoint_frags);
}
#if GB_DEFINED
opal_output(0, "%s:%d add the send event on socket %d\n",
__FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */
#endif /* GB_DEFINED */
opal_event_add(&btl_endpoint->endpoint_send_event, 0); opal_event_add(&btl_endpoint->endpoint_send_event, 0);
} }
} }
@ -578,7 +618,11 @@ static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endp
/* non-blocking so wait for completion */ /* non-blocking so wait for completion */
if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) { if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING; btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
opal_event_add(&btl_endpoint->endpoint_send_event, 0); #if GB_DEFINED
opal_output(0, "%s:%d add the send event on socket %d\n",
__FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */
#endif /* GB_DEFINED */
MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
{ {
@ -597,7 +641,11 @@ static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endp
/* send our globally unique process identifier to the endpoint */ /* send our globally unique process identifier to the endpoint */
if((rc = mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) { if((rc = mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK; btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
opal_event_add(&btl_endpoint->endpoint_recv_event, 0); #if GB_DEFINED
opal_output(0, "%s:%d add the recv event on socket %d\n",
__FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */
#endif /* GB_DEFINED */
MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_recv_event, 0);
} else { } else {
mca_btl_tcp2_endpoint_close(btl_endpoint); mca_btl_tcp2_endpoint_close(btl_endpoint);
} }
@ -619,6 +667,10 @@ static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_
mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr); mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
/* unregister from receiving event notifications */ /* unregister from receiving event notifications */
#if GB_DEFINED
opal_output(0, "%s:%d remove the send event on socket %d\n",
__FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */
#endif /* GB_DEFINED */
opal_event_del(&btl_endpoint->endpoint_send_event); opal_event_del(&btl_endpoint->endpoint_send_event);
/* check connect completion status */ /* check connect completion status */
@ -630,6 +682,10 @@ static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_
return; return;
} }
if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) { if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) {
#if GB_DEFINED
opal_output(0, "%s:%d add the send event on socket %d\n",
__FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */
#endif /* GB_DEFINED */
opal_event_add(&btl_endpoint->endpoint_send_event, 0); opal_event_add(&btl_endpoint->endpoint_send_event, 0);
return; return;
} }
@ -643,6 +699,10 @@ static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_
if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) == OMPI_SUCCESS) { if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) == OMPI_SUCCESS) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK; btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
#if GB_DEFINED
opal_output(0, "%s:%d add the recv event on socket %d\n",
__FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */
#endif /* GB_DEFINED */
opal_event_add(&btl_endpoint->endpoint_recv_event, 0); opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
} else { } else {
mca_btl_tcp2_endpoint_close(btl_endpoint); mca_btl_tcp2_endpoint_close(btl_endpoint);
@ -688,9 +748,12 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
mca_btl_tcp2_frag_t* frag; mca_btl_tcp2_frag_t* frag;
frag = btl_endpoint->endpoint_recv_frag; frag = btl_endpoint->endpoint_recv_frag;
data_still_pending_on_endpoint:
if(NULL == frag) { if(NULL == frag) {
if(mca_btl_tcp2_module.super.btl_max_send_size >
mca_btl_tcp2_module.super.btl_eager_limit) { if(mca_btl_tcp_module.super.btl_max_send_size >
mca_btl_tcp_module.super.btl_eager_limit) {
MCA_BTL_TCP_FRAG_ALLOC_MAX(frag); MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
} else { } else {
MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag); MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
@ -703,30 +766,32 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint); MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
} }
#if MCA_BTL_TCP_ENDPOINT_CACHE
assert( 0 == btl_endpoint->endpoint_cache_length );
data_still_pending_on_endpoint:
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
/* check for completion of non-blocking recv on the current fragment */ /* check for completion of non-blocking recv on the current fragment */
if(mca_btl_tcp2_frag_recv(frag, btl_endpoint->endpoint_sd) == false) { if( mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false ) {
btl_endpoint->endpoint_recv_frag = frag; btl_endpoint->endpoint_recv_frag = frag;
} else { } else {
btl_endpoint->endpoint_recv_frag = NULL; btl_endpoint->endpoint_recv_frag = NULL;
if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) {
mca_btl_active_message_callback_t* reg; TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag);
reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag;
reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata);
}
#if MCA_BTL_TCP_ENDPOINT_CACHE #if MCA_BTL_TCP_ENDPOINT_CACHE
if( 0 != btl_endpoint->endpoint_cache_length ) { if( 0 != btl_endpoint->endpoint_cache_length ) {
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
/* Get a new fragment and try again */
frag = NULL;
#else
/* If the cache still contain some data we can reuse the same fragment /* If the cache still contain some data we can reuse the same fragment
* until we flush it completly. * until we flush it completly.
*/ */
MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint); MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
goto data_still_pending_on_endpoint; goto data_still_pending_on_endpoint;
} }
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
#if !MCA_BTL_TCP_USES_PROGRESS_THREAD
MCA_BTL_TCP_FRAG_RETURN(frag); MCA_BTL_TCP_FRAG_RETURN(frag);
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
} }
#if MCA_BTL_TCP_ENDPOINT_CACHE #if MCA_BTL_TCP_ENDPOINT_CACHE
assert( 0 == btl_endpoint->endpoint_cache_length ); assert( 0 == btl_endpoint->endpoint_cache_length );
@ -741,12 +806,13 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
* of the MPI_Finalize. The first one will close the connections, * of the MPI_Finalize. The first one will close the connections,
* and all others will complain. * and all others will complain.
*/ */
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
break; break;
default: default:
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state)); BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
mca_btl_tcp2_endpoint_close(btl_endpoint); btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
break; break;
} }
} }
@ -759,8 +825,8 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user) static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user)
{ {
mca_btl_tcp2_endpoint_t* btl_endpoint = (mca_btl_tcp2_endpoint_t *)user; mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
switch(btl_endpoint->endpoint_state) { switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECTING: case MCA_BTL_TCP_CONNECTING:
mca_btl_tcp2_endpoint_complete_connect(btl_endpoint); mca_btl_tcp2_endpoint_complete_connect(btl_endpoint);
@ -779,27 +845,31 @@ static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user)
opal_list_remove_first(&btl_endpoint->endpoint_frags); opal_list_remove_first(&btl_endpoint->endpoint_frags);
/* if required - update request status and release fragment */ /* if required - update request status and release fragment */
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ); assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc); TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
if( btl_ownership ) { opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
MCA_BTL_TCP_FRAG_RETURN(frag);
}
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
} }
/* if nothing else to do unregister for send event notifications */ /* if no more data to send unregister the send notifications */
if(NULL == btl_endpoint->endpoint_send_frag) { if(NULL == btl_endpoint->endpoint_send_frag) {
#if GB_DEFINED
opal_output(0, "%s:%d remove the send event on socket %d\n",
__FILE__, __LINE__, sd); /* GB */
#endif /* GB_DEFINED */
opal_event_del(&btl_endpoint->endpoint_send_event); opal_event_del(&btl_endpoint->endpoint_send_event);
} }
break; break;
default: default:
BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state)); BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
#if GB_DEFINED
opal_output(0, "%s:%d remove the send event on socket %d\n",
__FILE__, __LINE__, sd); /* GB */
#endif /* GB_DEFINED */
opal_event_del(&btl_endpoint->endpoint_send_event); opal_event_del(&btl_endpoint->endpoint_send_event);
break; break;
} }
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
} }

Просмотреть файл

@ -41,6 +41,14 @@ BEGIN_C_DECLS
#define MCA_BTL_TCP_FRAG_IOVEC_NUMBER 4 #define MCA_BTL_TCP_FRAG_IOVEC_NUMBER 4
/**
* Commands for the threaded version when the fragments must be completed
* by one of the MPI bounded threads.
*/
#define MCA_BTL_TCP_FRAG_STEP_UNDEFINED ((uint16_t)0x0000)
#define MCA_BTL_TCP_FRAG_STEP_SEND_COMPLETE ((uint16_t)0x0001)
#define MCA_BTL_TCP_FRAG_STEP_RECV_COMPLETE ((uint16_t)0x0002)
/** /**
* TCP fragment derived type. * TCP fragment derived type.
*/ */
@ -82,49 +90,77 @@ OBJ_CLASS_DECLARATION(mca_btl_tcp2_frag_user_t);
#define MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag) \ #define MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag) \
{ \ { \
ompi_free_list_item_t *item; \ ompi_free_list_item_t *item; \
OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_eager, item); \ MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_eager_mutex); \
frag = (mca_btl_tcp2_frag_t*) item; \ OMPI_FREE_LIST_GET_MT(&mca_btl_tcp_component.tcp_frag_eager, item); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_eager_mutex); \
frag = (mca_btl_tcp_frag_t*) item; \
} }
#define MCA_BTL_TCP_FRAG_ALLOC_MAX(frag) \ #define MCA_BTL_TCP_FRAG_ALLOC_MAX(frag) \
{ \ { \
ompi_free_list_item_t *item; \ ompi_free_list_item_t *item; \
OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_max, item); \ MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_max_mutex); \
frag = (mca_btl_tcp2_frag_t*) item; \ OMPI_FREE_LIST_GET_MT(&mca_btl_tcp_component.tcp_frag_max, item); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_max_mutex); \
frag = (mca_btl_tcp_frag_t*) item; \
} }
#define MCA_BTL_TCP_FRAG_ALLOC_USER(frag) \ #define MCA_BTL_TCP_FRAG_ALLOC_USER(frag) \
{ \ { \
ompi_free_list_item_t *item; \ ompi_free_list_item_t *item; \
OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_user, item); \ MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_user_mutex); \
frag = (mca_btl_tcp2_frag_t*) item; \ OMPI_FREE_LIST_GET_MT(&mca_btl_tcp_component.tcp_frag_user, item); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_user_mutex); \
frag = (mca_btl_tcp_frag_t*) item; \
} }
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
#define MCA_BTL_TCP_FRAG_RETURN(frag) \ #define MCA_BTL_TCP_FRAG_RETURN(frag) \
{ \ { \
OMPI_FREE_LIST_RETURN(frag->my_list, (ompi_free_list_item_t*)(frag)); \ (frag)->next_step = MCA_BTL_TCP_FRAG_STEP_UNDEFINED; \
if( frag->my_list == &mca_btl_tcp_component.tcp_frag_eager ) { \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_eager_mutex); \
OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_eager_mutex); \
} else if( frag->my_list == &mca_btl_tcp_component.tcp_frag_max ) { \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_max_mutex); \
OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_max_mutex); \
} else { \
assert( frag->my_list == &mca_btl_tcp_component.tcp_frag_user ); \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_user_mutex); \
OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_user_mutex); \
} \
} }
#else
#define MCA_BTL_TCP_FRAG_RETURN(frag) \
{ \
(frag)->next_step = MCA_BTL_TCP_FRAG_STEP_UNDEFINED; \
OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \
}
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
#define MCA_BTL_TCP_FRAG_INIT_DST(frag,ep) \ #define MCA_BTL_TCP_FRAG_INIT_DST(frag,ep) \
do { \ do { \
frag->rc = 0; \ frag->base.des_src = NULL; \
frag->btl = ep->endpoint_btl; \ frag->base.des_src_cnt = 0; \
frag->base.des_dst = frag->segments; \
frag->base.des_dst_cnt = 1; \
frag->endpoint = ep; \ frag->endpoint = ep; \
frag->iov[0].iov_len = sizeof(frag->hdr); \ frag->iov[0].iov_len = sizeof(frag->hdr); \
frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr; \ frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr; \
frag->iov_cnt = 1; \ frag->iov_cnt = 1; \
frag->iov_idx = 0; \ frag->iov_idx = 0; \
frag->iov_ptr = frag->iov; \ frag->iov_ptr = frag->iov; \
frag->base.des_src = NULL; \ frag->rc = 0; \
frag->base.des_dst_cnt = 0; \
frag->base.des_dst = frag->segments; \
frag->base.des_dst_cnt = 1; \
} while(0) } while(0)
bool mca_btl_tcp2_frag_send(mca_btl_tcp2_frag_t*, int sd); bool mca_btl_tcp2_frag_send(mca_btl_tcp2_frag_t*, int sd);
bool mca_btl_tcp2_frag_recv(mca_btl_tcp2_frag_t*, int sd); bool mca_btl_tcp2_frag_recv(mca_btl_tcp2_frag_t*, int sd);
void mca_btl_tcp_dump_frag( mca_btl_tcp_frag_t* frag, char* msg );
END_C_DECLS END_C_DECLS
#endif #endif

Просмотреть файл

@ -3,6 +3,7 @@
* University Research and Technology * University Research and Technology
* Corporation. All rights reserved. * Corporation. All rights reserved.
* Copyright (c) 2004-2010 The University of Tennessee and The University * Copyright (c) 2004-2010 The University of Tennessee and The University
* Copyright (c) 2004-2012 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -77,10 +78,10 @@ void mca_btl_tcp2_proc_construct(mca_btl_tcp2_proc_t* tcp_proc)
void mca_btl_tcp2_proc_destruct(mca_btl_tcp2_proc_t* tcp_proc) void mca_btl_tcp2_proc_destruct(mca_btl_tcp2_proc_t* tcp_proc)
{ {
/* remove from list of all proc instances */ /* remove from list of all proc instances */
OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock); MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
opal_proc_table_remove_value(&mca_btl_tcp2_component.tcp_procs, opal_hash_table_remove_value_uint64(&mca_btl_tcp_component.tcp_procs,
tcp_proc->proc_ompi->proc_name); ompi_rte_hash_name(&tcp_proc->proc_ompi->proc_name));
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock); MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
/* release resources */ /* release resources */
if(NULL != tcp_proc->proc_endpoints) { if(NULL != tcp_proc->proc_endpoints) {
@ -103,11 +104,11 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_create(ompi_proc_t* ompi_proc)
size_t size; size_t size;
mca_btl_tcp2_proc_t* btl_proc; mca_btl_tcp2_proc_t* btl_proc;
OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock); MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
rc = opal_proc_table_get_value(&mca_btl_tcp2_component.tcp_procs, rc = opal_hash_table_get_value_uint64(&mca_btl_tcp_component.tcp_procs,
ompi_proc->proc_name, (void**)&btl_proc); hash, (void**)&btl_proc);
if(OMPI_SUCCESS == rc) { if(OMPI_SUCCESS == rc) {
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock); MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
return btl_proc; return btl_proc;
} }
@ -117,9 +118,9 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_create(ompi_proc_t* ompi_proc)
btl_proc->proc_ompi = ompi_proc; btl_proc->proc_ompi = ompi_proc;
/* add to hash table of all proc instance */ /* add to hash table of all proc instance */
opal_proc_table_set_value(&mca_btl_tcp2_component.tcp_procs, opal_hash_table_set_value_uint64(&mca_btl_tcp_component.tcp_procs,
ompi_proc->proc_name, btl_proc); hash, btl_proc);
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock); MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
/* lookup tcp parameters exported by this proc */ /* lookup tcp parameters exported by this proc */
rc = ompi_modex_recv( &mca_btl_tcp2_component.super.btl_version, rc = ompi_modex_recv( &mca_btl_tcp2_component.super.btl_version,
@ -681,8 +682,8 @@ int mca_btl_tcp2_proc_insert( mca_btl_tcp2_proc_t* btl_proc,
int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoint_t* btl_endpoint) int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoint_t* btl_endpoint)
{ {
size_t i; size_t i;
OPAL_THREAD_LOCK(&btl_proc->proc_lock); MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_proc->proc_lock);
for(i=0; i<btl_proc->proc_endpoint_count; i++) { for( i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
if(btl_proc->proc_endpoints[i] == btl_endpoint) { if(btl_proc->proc_endpoints[i] == btl_endpoint) {
memmove(btl_proc->proc_endpoints+i, btl_proc->proc_endpoints+i+1, memmove(btl_proc->proc_endpoints+i, btl_proc->proc_endpoints+i+1,
(btl_proc->proc_endpoint_count-i-1)*sizeof(mca_btl_base_endpoint_t*)); (btl_proc->proc_endpoint_count-i-1)*sizeof(mca_btl_base_endpoint_t*));
@ -700,7 +701,7 @@ int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoin
break; break;
} }
} }
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock); MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_proc->proc_lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -710,11 +711,11 @@ int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoin
*/ */
mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_lookup(const orte_process_name_t *name) mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_lookup(const orte_process_name_t *name)
{ {
mca_btl_tcp2_proc_t* proc = NULL; mca_btl_tcp_proc_t* proc = NULL;
OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock); MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
opal_proc_table_get_value(&mca_btl_tcp2_component.tcp_procs, opal_hash_table_get_value_uint64(&mca_btl_tcp_component.tcp_procs,
name->proc_name, (void**)&proc); ompi_rte_hash_name(name), (void**)&proc);
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock); MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
return proc; return proc;
} }
@ -725,7 +726,7 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_lookup(const orte_process_name_t *name)
bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t* btl_proc, struct sockaddr* addr, int sd) bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t* btl_proc, struct sockaddr* addr, int sd)
{ {
size_t i; size_t i;
OPAL_THREAD_LOCK(&btl_proc->proc_lock); MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_proc->proc_lock);
for( i = 0; i < btl_proc->proc_endpoint_count; i++ ) { for( i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i]; mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i];
/* Check all conditions before going to try to accept the connection. */ /* Check all conditions before going to try to accept the connection. */
@ -754,12 +755,12 @@ bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t* btl_proc, struct sockaddr* ad
; ;
} }
if(mca_btl_tcp2_endpoint_accept(btl_endpoint, addr, sd)) { if(mca_btl_tcp_endpoint_accept(btl_endpoint, addr, sd)) {
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock); MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_proc->proc_lock);
return true; return true;
} }
} }
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock); MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_proc->proc_lock);
return false; return false;
} }

Просмотреть файл

@ -43,14 +43,88 @@
/* Open MPI includes */ /* Open MPI includes */
#include "opal/mca/event/event.h" #include "opal/mca/event/event.h"
#include "opal/mca/btl/btl.h" #include "ompi/class/ompi_free_list.h"
#include "opal/mca/btl/base/base.h" #include "ompi/mca/btl/btl.h"
#include "opal/mca/mpool/mpool.h" #include "ompi/mca/btl/base/base.h"
#include "ompi/mca/mpool/mpool.h"
#include "opal/class/opal_hash_table.h" #include "opal/class/opal_hash_table.h"
#define MCA_BTL_TCP_STATISTICS 0 #define MCA_BTL_TCP_STATISTICS 0
BEGIN_C_DECLS BEGIN_C_DECLS
#if (HAVE_PTHREAD_H == 1)
#define MCA_BTL_TCP_USES_PROGRESS_THREAD 1
#else
#define MCA_BTL_TCP_USES_PROGRESS_THREAD 0
#endif /* (HAVE_PTHREAD_H == 1) */
extern opal_event_base_t* mca_btl_tcp_event_base;
#define MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \
do { \
int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP); \
if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) { \
frag->base.des_cbfunc(&frag->endpoint->endpoint_btl->super, frag->endpoint, \
&frag->base, frag->rc); \
} \
if( btl_ownership ) { \
MCA_BTL_TCP_FRAG_RETURN(frag); \
} \
} while (0)
#define MCA_BTL_TCP_RECV_TRIGGER_CB(frag) \
do { \
if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) { \
mca_btl_active_message_callback_t* reg; \
reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag; \
reg->cbfunc(&frag->endpoint->endpoint_btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata); \
} \
} while (0)
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
extern opal_list_t mca_btl_tcp_ready_frag_pending_queue;
extern opal_mutex_t mca_btl_tcp_ready_frag_mutex;
extern int mca_btl_tcp_pipe_to_progress[2];
#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name) \
opal_mutex_atomic_lock((name))
#define MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(name) \
opal_mutex_atomic_unlock((name))
#define TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \
do { \
(frag)->next_step = MCA_BTL_TCP_FRAG_STEP_SEND_COMPLETE; \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_ready_frag_mutex); \
opal_list_append(&mca_btl_tcp_ready_frag_pending_queue, \
(opal_list_item_t*)frag); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_ready_frag_mutex); \
} while (0)
#define TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag) \
do { \
(frag)->next_step = MCA_BTL_TCP_FRAG_STEP_RECV_COMPLETE; \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_ready_frag_mutex); \
opal_list_append(&mca_btl_tcp_ready_frag_pending_queue, \
(opal_list_item_t*)frag); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_ready_frag_mutex); \
} while (0)
#define MCA_BTL_TCP_ACTIVATE_EVENT(event, value) \
do { \
opal_event_t* _event = (opal_event_t*)(event); \
opal_fd_write( mca_btl_tcp_pipe_to_progress[1], sizeof(opal_event_t*), \
&_event); \
} while (0)
#else
#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name)
#define MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(name)
#define TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \
MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag)
#define TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag) \
MCA_BTL_TCP_RECV_TRIGGER_CB(frag)
#define MCA_BTL_TCP_ACTIVATE_EVENT(event, value) \
do { \
opal_event_add(event, (value)); \
} while (0)
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
/** /**
* TCP BTL component. * TCP BTL component.
@ -95,6 +169,12 @@ struct mca_btl_tcp_component_t {
opal_free_list_t tcp_frag_max; opal_free_list_t tcp_frag_max;
opal_free_list_t tcp_frag_user; opal_free_list_t tcp_frag_user;
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
opal_event_t tcp_recv_thread_async_event;
opal_mutex_t tcp_frag_eager_mutex;
opal_mutex_t tcp_frag_max_mutex;
opal_mutex_t tcp_frag_user_mutex;
#endif
/* Do we want to use TCP_NODELAY? */ /* Do we want to use TCP_NODELAY? */
int tcp_not_use_nodelay; int tcp_not_use_nodelay;
@ -292,6 +372,20 @@ mca_btl_base_descriptor_t* mca_btl_tcp_prepare_src(
uint32_t flags uint32_t flags
); );
extern mca_btl_base_descriptor_t* mca_btl_tcp_prepare_dst(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* peer,
struct mca_mpool_base_registration_t*,
struct opal_convertor_t* convertor,
uint8_t order,
size_t reserve,
size_t* size,
uint32_t flags);
extern void
mca_btl_tcp_dump(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
int verbose);
/** /**
* Fault Tolerance Event Notification Function * Fault Tolerance Event Notification Function

Просмотреть файл

@ -61,6 +61,8 @@
#include "opal/util/argv.h" #include "opal/util/argv.h"
#include "opal/util/net.h" #include "opal/util/net.h"
#include "opal/util/proc.h" #include "opal/util/proc.h"
#include "opal/util/net.h"
#include "opal/util/fd.h"
#include "opal/util/show_help.h" #include "opal/util/show_help.h"
#include "opal/constants.h" #include "opal/constants.h"
#include "opal/mca/btl/btl.h" #include "opal/mca/btl/btl.h"
@ -69,6 +71,11 @@
#include "opal/mca/btl/base/btl_base_error.h" #include "opal/mca/btl/base/btl_base_error.h"
#include "opal/mca/pmix/pmix.h" #include "opal/mca/pmix/pmix.h"
#include "ompi/constants.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/btl/base/base.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "btl_tcp.h" #include "btl_tcp.h"
#include "btl_tcp_addr.h" #include "btl_tcp_addr.h"
#include "btl_tcp_proc.h" #include "btl_tcp_proc.h"
@ -85,6 +92,16 @@ static int mca_btl_tcp_component_register(void);
static int mca_btl_tcp_component_open(void); static int mca_btl_tcp_component_open(void);
static int mca_btl_tcp_component_close(void); static int mca_btl_tcp_component_close(void);
opal_event_base_t* mca_btl_tcp_event_base = NULL;
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
static int mca_btl_tcp_progress_thread_trigger = -1;
int mca_btl_tcp_pipe_to_progress[2] = { -1, -1 };
static opal_thread_t mca_btl_tcp_progress_thread;
opal_list_t mca_btl_tcp_ready_frag_pending_queue;
opal_mutex_t mca_btl_tcp_ready_frag_mutex;
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
static char *mca_btl_tcp_if_seq_string;
mca_btl_tcp_component_t mca_btl_tcp_component = { mca_btl_tcp_component_t mca_btl_tcp_component = {
.super = { .super = {
/* First, the mca_base_component_t struct containing meta information /* First, the mca_base_component_t struct containing meta information
@ -102,6 +119,11 @@ mca_btl_tcp_component_t mca_btl_tcp_component = {
}, },
.btl_init = mca_btl_tcp_component_init, .btl_init = mca_btl_tcp_component_init,
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
mca_btl_tcp_component_progress,
#else
NULL,
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
} }
}; };
@ -165,8 +187,25 @@ struct mca_btl_tcp_event_t {
}; };
typedef struct mca_btl_tcp_event_t mca_btl_tcp_event_t; typedef struct mca_btl_tcp_event_t mca_btl_tcp_event_t;
OBJ_CLASS_INSTANCE( mca_btl_tcp_event_t, opal_list_item_t, static void mca_btl_tcp_event_construct(mca_btl_tcp_event_t* event)
NULL, NULL); {
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
opal_list_append(&mca_btl_tcp_component.tcp_events, &event->item);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
}
static void mca_btl_tcp_event_destruct(mca_btl_tcp_event_t* event)
{
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
opal_list_remove_item(&mca_btl_tcp_component.tcp_events, &event->item);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
}
OBJ_CLASS_INSTANCE(
mca_btl_tcp_event_t,
opal_list_item_t,
mca_btl_tcp_event_construct,
mca_btl_tcp_event_destruct);
/* /*
@ -306,6 +345,14 @@ static int mca_btl_tcp_component_open(void)
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_user, opal_free_list_t); OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_user, opal_free_list_t);
opal_proc_table_init(&mca_btl_tcp_component.tcp_procs, 16, 256); opal_proc_table_init(&mca_btl_tcp_component.tcp_procs, 16, 256);
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_user_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_tcp_ready_frag_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_tcp_ready_frag_pending_queue, opal_list_t);
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
/* if_include and if_exclude need to be mutually exclusive */ /* if_include and if_exclude need to be mutually exclusive */
if (OPAL_SUCCESS != if (OPAL_SUCCESS !=
mca_base_var_check_exclusive("ompi", mca_base_var_check_exclusive("ompi",
@ -330,8 +377,15 @@ static int mca_btl_tcp_component_open(void)
static int mca_btl_tcp_component_close(void) static int mca_btl_tcp_component_close(void)
{ {
if (NULL != mca_btl_tcp_component.tcp_btls) opal_list_item_t* item;
opal_list_item_t* next;
if (NULL != mca_btl_tcp_component.tcp_if_seq) {
free(mca_btl_tcp_component.tcp_if_seq);
}
if (NULL != mca_btl_tcp_component.tcp_btls) {
free(mca_btl_tcp_component.tcp_btls); free(mca_btl_tcp_component.tcp_btls);
}
if (mca_btl_tcp_component.tcp_listen_sd >= 0) { if (mca_btl_tcp_component.tcp_listen_sd >= 0) {
opal_event_del(&mca_btl_tcp_component.tcp_recv_event); opal_event_del(&mca_btl_tcp_component.tcp_recv_event);
@ -346,6 +400,18 @@ static int mca_btl_tcp_component_close(void)
} }
#endif #endif
/* cleanup any pending events */
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
for(item = opal_list_get_first(&mca_btl_tcp_component.tcp_events);
item != opal_list_get_end(&mca_btl_tcp_component.tcp_events);
item = next) {
mca_btl_tcp_event_t* event = (mca_btl_tcp_event_t*)item;
next = opal_list_get_next(item);
opal_event_del(&event->event);
OBJ_RELEASE(event);
}
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
/* release resources */ /* release resources */
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_procs); OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_procs);
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager); OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager);
@ -357,6 +423,40 @@ static int mca_btl_tcp_component_close(void)
mca_common_cuda_fini(); mca_common_cuda_fini();
#endif /* OPAL_CUDA_SUPPORT */ #endif /* OPAL_CUDA_SUPPORT */
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);
if( (NULL != mca_btl_tcp_event_base) &&
(mca_btl_tcp_event_base != opal_sync_event_base) ) {
/* Turn of the progress thread before moving forward */
if( -1 != mca_btl_tcp_progress_thread_trigger ) {
mca_btl_tcp_progress_thread_trigger = 0;
/* Let the progress thread know that we're going away */
if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
close(mca_btl_tcp_pipe_to_progress[1]);
mca_btl_tcp_pipe_to_progress[1] = -1;
}
while( -1 != mca_btl_tcp_progress_thread_trigger ) {
/*event_base_loopbreak(mca_btl_tcp_event_base);*/
sched_yield();
usleep(100); /* give app a chance to re-enter library */
}
}
opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
opal_event_base_free(mca_btl_tcp_event_base);
mca_btl_tcp_event_base = NULL;
/* Close the remaining pipes */
if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
close(mca_btl_tcp_pipe_to_progress[0]);
mca_btl_tcp_pipe_to_progress[0] = -1;
}
}
OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
#endif
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
@ -652,14 +752,70 @@ static int mca_btl_tcp_component_create_instances(void)
return ret; return ret;
} }
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
static void* mca_btl_tcp_progress_thread_engine(opal_object_t *obj)
{
opal_thread_t* current_thread = (opal_thread_t*)obj;
while( 1 == (*((int*)current_thread->t_arg)) ) {
opal_event_loop(mca_btl_tcp_event_base, OPAL_EVLOOP_ONCE);
}
(*((int*)current_thread->t_arg)) = -1;
return NULL;
}
static void mca_btl_tcp_component_event_async_handler(int fd, short unused, void *context)
{
opal_event_t* event;
int rc;
rc = read(fd, (void*)&event, sizeof(opal_event_t*));
assert( fd == mca_btl_tcp_pipe_to_progress[0] );
if( 0 == rc ) {
/* The main thread closed the pipe to trigger the shutdown procedure */
opal_thread_t* current_thread = (opal_thread_t*)context;
(*((int*)current_thread->t_arg)) = 0;
} else {
opal_event_add(event, 0);
}
}
int mca_btl_tcp_component_progress( void )
{
mca_btl_tcp_frag_t* frag;
int count = 0;
for( ;; ) {
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_ready_frag_mutex);
frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&mca_btl_tcp_ready_frag_pending_queue);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_ready_frag_mutex);
if( NULL == frag ) break;
switch(frag->next_step) {
case MCA_BTL_TCP_FRAG_STEP_SEND_COMPLETE:
MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
count++;
break;
case MCA_BTL_TCP_FRAG_STEP_RECV_COMPLETE:
MCA_BTL_TCP_RECV_TRIGGER_CB(frag);
MCA_BTL_TCP_FRAG_RETURN(frag);
count++;
break;
default:
break;
}
}
return count;
}
#endif
/* /*
* Create a listen socket and bind to all interfaces * Create a listen socket and bind to all interfaces
*/ */
static int mca_btl_tcp_component_create_listen(uint16_t af_family) static int mca_btl_tcp_component_create_listen(uint16_t af_family)
{ {
int flags; int flags, sd, rc;
int sd;
struct sockaddr_storage inaddr; struct sockaddr_storage inaddr;
opal_socklen_t addrlen; opal_socklen_t addrlen;
@ -678,17 +834,16 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
#if OPAL_ENABLE_IPV6 #if OPAL_ENABLE_IPV6
{ {
struct addrinfo hints, *res = NULL; struct addrinfo hints, *res = NULL;
int error;
memset (&hints, 0, sizeof(hints)); memset (&hints, 0, sizeof(hints));
hints.ai_family = af_family; hints.ai_family = af_family;
hints.ai_socktype = SOCK_STREAM; hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; hints.ai_flags = AI_PASSIVE;
if ((error = getaddrinfo(NULL, "0", &hints, &res))) { if ((rc = getaddrinfo(NULL, "0", &hints, &res))) {
opal_output (0, opal_output (0,
"mca_btl_tcp_create_listen: unable to resolve. %s\n", "mca_btl_tcp_create_listen: unable to resolve. %s\n",
gai_strerror (error)); gai_strerror (rc));
CLOSE_THE_SOCKET(sd); CLOSE_THE_SOCKET(sd);
return OPAL_ERROR; return OPAL_ERROR;
} }
@ -816,25 +971,86 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
} }
} }
/* register listen port */ #if MCA_BTL_TCP_USES_PROGRESS_THREAD
/* Declare our intent to use threads. */
opal_event_use_threads();
if( NULL == mca_btl_tcp_event_base ) {
/* fall back to only one event base (the one shared by the entire Open MPI framework) */
mca_btl_tcp_event_base = opal_sync_event_base;
if( NULL == (mca_btl_tcp_event_base = opal_sync_event_base_create()) ) {
BTL_ERROR(("BTL TCP failed to create progress event base"));
goto move_forward_with_no_thread;
}
opal_event_base_priority_init(mca_btl_tcp_event_base, OPAL_EVENT_NUM_PRI);
/* construct the thread object */
OBJ_CONSTRUCT(&mca_btl_tcp_progress_thread, opal_thread_t);
/**
* Create a pipe to communicate between the main thread and the progress thread.
*/
if (0 != pipe(mca_btl_tcp_pipe_to_progress)) {
opal_event_base_free(mca_btl_tcp_event_base);
/* fall back to only one event base (the one shared by the entire Open MPI framework */
mca_btl_tcp_event_base = opal_sync_event_base;
mca_btl_tcp_progress_thread_trigger = -1; /* thread not started */
goto move_forward_with_no_thread;
}
/* setup the receiving end of the pipe as non-blocking */
if((flags = fcntl(mca_btl_tcp_pipe_to_progress[0], F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
} else {
flags |= O_NONBLOCK;
if(fcntl(mca_btl_tcp_pipe_to_progress[0], F_SETFL, flags) < 0)
BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
/* Progress thread event */
opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp_recv_thread_async_event,
mca_btl_tcp_pipe_to_progress[0],
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_event_async_handler,
&mca_btl_tcp_progress_thread );
opal_event_add(&mca_btl_tcp_component.tcp_recv_thread_async_event, 0);
/* fork off a thread to progress it */
mca_btl_tcp_progress_thread.t_run = mca_btl_tcp_progress_thread_engine;
mca_btl_tcp_progress_thread.t_arg = &mca_btl_tcp_progress_thread_trigger;
mca_btl_tcp_progress_thread_trigger = 1; /* thread up and running */
if( OPAL_SUCCESS != (rc = opal_thread_start(&mca_btl_tcp_progress_thread)) ) {
BTL_ERROR(("BTL TCP progress thread initialization failed (%d)", rc));
opal_event_base_free(mca_btl_tcp_event_base);
/* fall back to only one event base (the one shared by the entire Open MPI framework */
mca_btl_tcp_event_base = opal_sync_event_base;
mca_btl_tcp_progress_thread_trigger = -1; /* thread not started */
goto move_forward_with_no_thread;
}
}
move_forward_with_no_thread:
#else
mca_btl_tcp_event_base = opal_sync_event_base;
#endif
if (AF_INET == af_family) {
opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp_recv_event,
mca_btl_tcp_component.tcp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_accept_handler,
0 );
MCA_BTL_TCP_ACTIVATE_EVENT(&mca_btl_tcp_component.tcp_recv_event, 0);
}
#if OPAL_ENABLE_IPV6 #if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) { if (AF_INET6 == af_family) {
opal_event_set(opal_sync_event_base, &mca_btl_tcp_component.tcp6_recv_event, opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp6_recv_event,
mca_btl_tcp_component.tcp6_listen_sd, mca_btl_tcp_component.tcp6_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST, OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_accept_handler, mca_btl_tcp_component_accept_handler,
0 ); 0 );
opal_event_add(&mca_btl_tcp_component.tcp6_recv_event, 0); MCA_BTL_TCP_ACTIVATE_EVENT(&mca_btl_tcp_component.tcp6_recv_event, 0);
} else
#endif
{
opal_event_set(opal_sync_event_base, &mca_btl_tcp_component.tcp_recv_event,
mca_btl_tcp_component.tcp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_accept_handler,
0 );
opal_event_add(&mca_btl_tcp_component.tcp_recv_event, 0);
} }
#endif
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
@ -1048,9 +1264,11 @@ static void mca_btl_tcp_component_accept_handler( int incoming_sd,
} }
mca_btl_tcp_set_socket_options(sd); mca_btl_tcp_set_socket_options(sd);
assert( NULL != mca_btl_tcp_event_base );
/* wait for receipt of peers process identifier to complete this connection */ /* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW(mca_btl_tcp_event_t); event = OBJ_NEW(mca_btl_tcp_event_t);
opal_event_set(opal_sync_event_base, &event->event, sd, OPAL_EV_READ, mca_btl_tcp_component_recv_handler, event); opal_event_set(mca_btl_tcp_event_base, &(event->event), sd,
OPAL_EV_READ, mca_btl_tcp_component_recv_handler, event);
opal_event_add(&event->event, 0); opal_event_add(&event->event, 0);
} }
} }
@ -1112,3 +1330,14 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
(void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd); (void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd);
} }
/**
* Debugging infrastructure, absolutely not thread safe. Call with care.
*/
static void mca_btl_tcp_component_dump(void)
{
uint32_t i;
for( i = 0; i < mca_btl_tcp_component.tcp_num_btls; i++ ) {
mca_btl_tcp_dump( (mca_btl_base_module_t*)mca_btl_tcp_component.tcp_btls[i], NULL, 1 );
}
}

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology * University Research and Technology
* Corporation. All rights reserved. * Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University * Copyright (c) 2004-2012 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -61,7 +61,7 @@ struct mca_btl_base_endpoint_t {
struct mca_btl_tcp_frag_t* endpoint_send_frag; /**< current send frag being processed */ struct mca_btl_tcp_frag_t* endpoint_send_frag; /**< current send frag being processed */
struct mca_btl_tcp_frag_t* endpoint_recv_frag; /**< current recv frag being processed */ struct mca_btl_tcp_frag_t* endpoint_recv_frag; /**< current recv frag being processed */
mca_btl_tcp_state_t endpoint_state; /**< current state of the connection */ mca_btl_tcp_state_t endpoint_state; /**< current state of the connection */
size_t endpoint_retries; /**< number of connection retries attempted */ uint32_t endpoint_retries; /**< number of connection retries attempted */
opal_list_t endpoint_frags; /**< list of pending frags to send */ opal_list_t endpoint_frags; /**< list of pending frags to send */
opal_mutex_t endpoint_send_lock; /**< lock for concurrent access to endpoint state */ opal_mutex_t endpoint_send_lock; /**< lock for concurrent access to endpoint state */
opal_mutex_t endpoint_recv_lock; /**< lock for concurrent access to endpoint state */ opal_mutex_t endpoint_recv_lock; /**< lock for concurrent access to endpoint state */
@ -80,6 +80,7 @@ void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t*);
int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t*, struct mca_btl_tcp_frag_t*); int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t*, struct mca_btl_tcp_frag_t*);
void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t*, struct sockaddr*, int); void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t*, struct sockaddr*, int);
void mca_btl_tcp_endpoint_shutdown(mca_btl_base_endpoint_t*); void mca_btl_tcp_endpoint_shutdown(mca_btl_base_endpoint_t*);
void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg);
END_C_DECLS END_C_DECLS
#endif #endif

Просмотреть файл

@ -10,7 +10,6 @@
* University of Stuttgart. All rights reserved. * University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California. * Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved. * All rights reserved.
* Copyright (c) 2008-2012 Oracle and/or all its affiliates. All rights reserved.
* Copyright (c) 2014 Los Alamos National Security, LLC. All rights * Copyright (c) 2014 Los Alamos National Security, LLC. All rights
* reserved. * reserved.
* Copyright (c) 2015 Research Organization for Information Science * Copyright (c) 2015 Research Organization for Information Science
@ -151,6 +150,11 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
frag->iov_ptr->iov_base = (opal_iov_base_ptr_t) frag->iov_ptr->iov_base = (opal_iov_base_ptr_t)
(((unsigned char*)frag->iov_ptr->iov_base) + cnt); (((unsigned char*)frag->iov_ptr->iov_base) + cnt);
frag->iov_ptr->iov_len -= cnt; frag->iov_ptr->iov_len -= cnt;
#define GB_DEFINED 0
#if GB_DEFINED
opal_output(0, "%s:%d write %lu bytes from socket %d\n",
__FILE__, __LINE__, cnt, sd); /* GB */
#endif /* GB_DEFINED */
break; break;
} }
} }
@ -159,12 +163,14 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd) bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
{ {
int cnt, dont_copy_data = 0;
size_t i, num_vecs;
mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint; mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;
int i, num_vecs, dont_copy_data = 0;
ssize_t cnt;
struct iovec *iov_ptr;
repeat: repeat:
num_vecs = frag->iov_cnt; num_vecs = frag->iov_cnt;
iov_ptr = &frag->iov[frag->iov_idx];
#if MCA_BTL_TCP_ENDPOINT_CACHE #if MCA_BTL_TCP_ENDPOINT_CACHE
if( 0 != btl_endpoint->endpoint_cache_length ) { if( 0 != btl_endpoint->endpoint_cache_length ) {
size_t length; size_t length;
@ -265,11 +271,13 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
frag->iov[1].iov_len = frag->hdr.size; frag->iov[1].iov_len = frag->hdr.size;
frag->iov_cnt++; frag->iov_cnt++;
#ifndef __sparc #ifndef __sparc
#if !MCA_BTL_TCP_USES_PROGRESS_THREAD
/* The following cannot be done for sparc code /* The following cannot be done for sparc code
* because it causes alignment errors when accessing * because it causes alignment errors when accessing
* structures later on in the btl and pml code. * structures later on in the btl and pml code.
*/ */
dont_copy_data = 1; dont_copy_data = 1;
#endif
#endif #endif
goto repeat; goto repeat;
} }
@ -297,3 +305,20 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
} }
return false; return false;
} }
void mca_btl_tcp_dump_frag( mca_btl_tcp_frag_t* frag, char* msg )
{
int i;
mca_btl_base_err("%s %s frag %p (endpoint %p) size %lu iov_cnt %d iov_idx %d next_step %s rc %d\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, (void*)frag, (void*)frag->endpoint, frag->size, frag->iov_cnt, frag->iov_idx,
(MCA_BTL_TCP_FRAG_STEP_UNDEFINED == frag->next_step ? "undefined" :
(MCA_BTL_TCP_FRAG_STEP_SEND_COMPLETE == frag->next_step ? "send_complete" :
(MCA_BTL_TCP_FRAG_STEP_RECV_COMPLETE == frag->next_step ? "recv_complete" : "unknown"))),
frag->rc);
for( i = 0; i < frag->iov_cnt; i++ ) {
mca_btl_base_err("%s %s | iov[%d] = {%p, %lu}\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, i, frag->iov[i].iov_base, frag->iov[i].iov_len);
}
mca_btl_base_err("%s +\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology * University Research and Technology
* Corporation. All rights reserved. * Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University * Copyright (c) 2004-2012 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology * University Research and Technology
* Corporation. All rights reserved. * Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University * Copyright (c) 2004-2012 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,