1
1

Merge pull request #1491 from ICLDisco/progress_thread

BTL TCP async progress
Этот коммит содержится в:
Jeff Squyres 2016-03-29 06:26:10 -04:00
родитель 8b554779d7 f69eba1bc4
Коммит 91c54d7a07
17 изменённых файлов: 689 добавлений и 249 удалений

Просмотреть файл

@ -108,6 +108,7 @@ sylvain.jeaugey@bull.net Sylvain Jeaugey Bull
terry.dontje@oracle.com Terry Dontje Sun, Oracle
thkorde@sandia.gov Todd Kordenbrock SNL
tmattox@gmail.com Tim Mattox IU, Cisco
tpatinya@vols.utk.edu Thananon Patinyasakdikul UTK
tprins@lanl.gov Tim Prins IU, LANL
twoodall@lanl.gov Tim Woodall LANL
vasily@mellanox.com Vasily Filipov Mellanox

Просмотреть файл

@ -32,7 +32,10 @@
#include "opal/datatype/opal_convertor.h"
#include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/mpool/mpool.h"
#include "ompi/proc/proc.h"
#include "btl_tcp.h"
#include "btl_tcp_frag.h"
#include "btl_tcp_proc.h"
#include "btl_tcp_endpoint.h"
mca_btl_tcp2_module_t mca_btl_tcp2_module = {
{
@ -57,9 +60,9 @@ mca_btl_tcp2_module_t mca_btl_tcp2_module = {
mca_btl_tcp2_prepare_dst,
mca_btl_tcp2_send,
NULL, /* send immediate */
mca_btl_tcp2_put,
mca_btl_tcp_put,
NULL, /* get */
mca_btl_base_dump,
mca_btl_tcp_dump,
NULL, /* mpool */
NULL, /* register error */
mca_btl_tcp2_ft_event
@ -134,7 +137,9 @@ int mca_btl_tcp2_add_procs( struct mca_btl_base_module_t* btl,
/* we increase the count of MPI users of the event library
once per peer, so that we are used until we aren't
connected to a peer */
#if !MCA_BTL_TCP_USES_PROGRESS_THREAD
opal_progress_event_users_increment();
#endif /* !MCA_BTL_TCP_USES_PROGRESS_THREAD */
}
return OMPI_SUCCESS;
@ -153,7 +158,9 @@ int mca_btl_tcp2_del_procs(struct mca_btl_base_module_t* btl,
opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
OBJ_RELEASE(tcp_endpoint);
}
#if !MCA_BTL_TCP_USES_PROGRESS_THREAD
opal_progress_event_users_decrement();
#endif /* !MCA_BTL_TCP_USES_PROGRESS_THREAD */
}
return OMPI_SUCCESS;
}
@ -193,7 +200,8 @@ mca_btl_base_descriptor_t* mca_btl_tcp2_alloc(
frag->base.des_dst_cnt = 0;
frag->base.des_flags = flags;
frag->base.order = MCA_BTL_NO_ORDER;
frag->btl = (mca_btl_tcp2_module_t*)btl;
frag->btl = (mca_btl_tcp_module_t*)btl;
frag->endpoint = endpoint;
return (mca_btl_base_descriptor_t*)frag;
}
@ -384,7 +392,7 @@ int mca_btl_tcp2_send( struct mca_btl_base_module_t* btl,
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_SEND;
frag->hdr.count = 0;
if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
return mca_btl_tcp2_endpoint_send(endpoint,frag);
return mca_btl_tcp_endpoint_send(endpoint,frag);
}
@ -425,7 +433,7 @@ int mca_btl_tcp2_put( mca_btl_base_module_t* btl,
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT;
frag->hdr.count = frag->base.des_dst_cnt;
if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
return ((i = mca_btl_tcp2_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : i);
return ((i = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : i);
}
@ -462,12 +470,13 @@ int mca_btl_tcp2_get(
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_GET;
frag->hdr.count = frag->base.des_src_cnt;
if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
return ((rc = mca_btl_tcp2_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : rc);
return ((rc = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : rc);
}
/*
* Cleanup/release module resources.
* Cleanup/release module resources. This function should only be called once,
* there is no need to protect it.
*/
int mca_btl_tcp2_finalize(struct mca_btl_base_module_t* btl)
@ -479,8 +488,42 @@ int mca_btl_tcp2_finalize(struct mca_btl_base_module_t* btl)
item = opal_list_remove_first(&tcp_btl->tcp_endpoints)) {
mca_btl_tcp2_endpoint_t *endpoint = (mca_btl_tcp2_endpoint_t*)item;
OBJ_RELEASE(endpoint);
#if !MCA_BTL_TCP_USES_PROGRESS_THREAD
opal_progress_event_users_decrement();
#endif /* !MCA_BTL_TCP_USES_PROGRESS_THREAD */
}
free(tcp_btl);
return OMPI_SUCCESS;
}
/**
 * Dump the state of a TCP BTL module (and optionally its endpoints) to the
 * BTL error log, prefixed with this process' name.
 *
 * @param base_btl  the module to dump (cast internally to mca_btl_tcp_module_t)
 * @param endpoint  if non-NULL, dump only this endpoint; otherwise, when
 *                  @a verbose is non-zero, dump every endpoint on the module
 * @param verbose   non-zero to walk the module's full endpoint list when no
 *                  specific endpoint was requested
 *
 * Diagnostic only: no state is modified and no value is returned.
 */
void mca_btl_tcp_dump(struct mca_btl_base_module_t* base_btl,
                      struct mca_btl_base_endpoint_t* endpoint,
                      int verbose)
{
    mca_btl_tcp_module_t* btl = (mca_btl_tcp_module_t*)base_btl;

    mca_btl_base_err("%s TCP %p kernel_id %d\n"
#if MCA_BTL_TCP_STATISTICS
                     " | statistics: sent %lu recv %lu\n"
#endif /* MCA_BTL_TCP_STATISTICS */
                     " | latency %u bandwidth %u\n",
                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (void*)btl, btl->tcp_ifkindex,
#if MCA_BTL_TCP_STATISTICS
                     /* Fixed: was btl->btl_bytes_recv (wrong field); the module's
                      * statistics counters are the tcp_bytes_* pair.  Cast both
                      * size_t counters to match the %lu format specifier. */
                     (unsigned long)btl->tcp_bytes_sent, (unsigned long)btl->tcp_bytes_recv,
#endif /* MCA_BTL_TCP_STATISTICS */
                     btl->super.btl_latency, btl->super.btl_bandwidth);

    if( NULL != endpoint ) {
        mca_btl_tcp_endpoint_dump( endpoint, "TCP" );
    } else if( verbose ) {
        /* No endpoint singled out: walk the module's endpoint list. */
        opal_list_item_t *item;
        for(item = opal_list_get_first(&btl->tcp_endpoints);
            item != opal_list_get_end(&btl->tcp_endpoints);
            item = opal_list_get_next(item)) {
            mca_btl_tcp_endpoint_dump( (mca_btl_base_endpoint_t*)item, "TCP" );
        }
    }
}

Просмотреть файл

@ -49,18 +49,15 @@
#include <time.h>
#endif /* HAVE_TIME_H */
#include "opal/mca/event/event.h"
#include "ompi/types.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "opal/util/net.h"
#include "opal/util/fd.h"
#include "opal/util/show_help.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "ompi/mca/rte/rte.h"
#include "btl_tcp2.h"
#include "btl_tcp2_endpoint.h"
#include "btl_tcp2_proc.h"
#include "btl_tcp2_frag.h"
#include "btl_tcp2_addr.h"
#include "btl_tcp_endpoint.h"
#include "btl_tcp_proc.h"
#include "btl_tcp_frag.h"
/*
* Initialize state of the endpoint instance.
@ -123,12 +120,10 @@ static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user);
* diagnostics
*/
#if WANT_PEER_DUMP
static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
{
char src[64];
char dst[64];
int sndbuf,rcvbuf,nodelay,flags;
char src[64], dst[64], *status;
int sndbuf, rcvbuf, nodelay, flags = -1;
#if OPAL_ENABLE_IPV6
struct sockaddr_storage inaddr;
#else
@ -136,69 +131,102 @@ static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, co
#endif
opal_socklen_t obtlen;
opal_socklen_t addrlen = sizeof(inaddr);
opal_list_item_t *item;
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
if( -1 != btl_endpoint->endpoint_sd ) {
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_ENABLE_IPV6
{
char *address;
address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
if (NULL != address) {
sprintf(src, "%s", address);
{
char *address;
address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
if (NULL != address) {
sprintf(src, "%s", address);
}
}
}
#else
sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
#endif
getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_ENABLE_IPV6
{
char *address;
address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
if (NULL != address) {
sprintf(dst, "%s", address);
{
char *address;
address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
if (NULL != address) {
sprintf(dst, "%s", address);
}
}
}
#else
sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
#endif
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#if defined(SO_SNDBUF)
obtlen = sizeof(sndbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
BTL_ERROR(("SO_SNDBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
obtlen = sizeof(sndbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
BTL_ERROR(("SO_SNDBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#else
sndbuf = -1;
sndbuf = -1;
#endif
#if defined(SO_RCVBUF)
obtlen = sizeof(rcvbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
BTL_ERROR(("SO_RCVBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
obtlen = sizeof(rcvbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
BTL_ERROR(("SO_RCVBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#else
rcvbuf = -1;
rcvbuf = -1;
#endif
#if defined(TCP_NODELAY)
obtlen = sizeof(nodelay);
if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
BTL_ERROR(("TCP_NODELAY option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
obtlen = sizeof(nodelay);
if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
BTL_ERROR(("TCP_NODELAY option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#else
nodelay = 0;
nodelay = 0;
#endif
}
BTL_VERBOSE(("%s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x",
msg, src, dst, nodelay, sndbuf, rcvbuf, flags));
mca_btl_base_err("%s %s: endpoint %p src %s - dst %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, (void*)btl_endpoint, src, dst, nodelay, sndbuf, rcvbuf, flags);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECTING:
status = "connecting"; break;
case MCA_BTL_TCP_CONNECT_ACK:
status = "connect ack"; break;
case MCA_BTL_TCP_CLOSED:
status = "closed"; break;
case MCA_BTL_TCP_FAILED:
status = "failed"; break;
case MCA_BTL_TCP_CONNECTED:
status = "connected"; break;
default:
status = "undefined"; break;
}
mca_btl_base_err("%s | [socket %d] [state %s] (nbo %s) (retries %u)\n"
#if MCA_BTL_TCP_ENDPOINT_CACHE
"\tcache %p length %lu pos %ld\n"
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
"\tpending: send %p recv %p\n",
msg, btl_endpoint->endpoint_sd, status,
(btl_endpoint->endpoint_nbo ? "true" : "false"), btl_endpoint->endpoint_retries,
#if MCA_BTL_TCP_ENDPOINT_CACHE
btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_length, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
(void*)btl_endpoint->endpoint_send_frag, (void*)btl_endpoint->endpoint_recv_frag );
for(item = opal_list_get_first(&btl_endpoint->endpoint_frags);
item != opal_list_get_end(&btl_endpoint->endpoint_frags);
item = opal_list_get_next(item)) {
mca_btl_tcp_dump_frag( (mca_btl_tcp_frag_t*)item, " | send" );
}
}
#endif
/*
* Initialize events to be used by the endpoint instance for TCP select/poll callbacks.
@ -211,22 +239,22 @@ static inline void mca_btl_tcp2_endpoint_event_init(mca_btl_base_endpoint_t* btl
btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
opal_event_set(opal_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp2_endpoint_recv_handler,
btl_endpoint );
opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_endpoint_recv_handler,
btl_endpoint );
/**
* The send event should be non persistent until the endpoint is
* completely connected. This means, when the event is created it
* will be fired only once, and when the endpoint is marked as
* CONNECTED the event should be recreated with the correct flags.
*/
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE,
mca_btl_tcp2_endpoint_send_handler,
btl_endpoint);
opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE,
mca_btl_tcp_endpoint_send_handler,
btl_endpoint);
}
@ -239,7 +267,7 @@ int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tc
{
int rc = OMPI_SUCCESS;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_endpoint->endpoint_send_lock);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECTING:
case MCA_BTL_TCP_CONNECT_ACK:
@ -257,19 +285,13 @@ int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tc
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd)) {
int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) {
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
}
if( btl_ownership ) {
MCA_BTL_TCP_FRAG_RETURN(frag);
}
opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
return 1;
} else {
btl_endpoint->endpoint_send_frag = frag;
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
}
} else {
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
@ -277,7 +299,7 @@ int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tc
}
break;
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_endpoint->endpoint_send_lock);
return rc;
}
@ -338,22 +360,20 @@ static int mca_btl_tcp2_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_e
bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
struct sockaddr* addr, int sd)
{
mca_btl_tcp2_proc_t* this_proc = mca_btl_tcp2_proc_local();
mca_btl_tcp2_proc_t *endpoint_proc = btl_endpoint->endpoint_proc;
mca_btl_tcp_proc_t *endpoint_proc = btl_endpoint->endpoint_proc;
const orte_process_name_t *this_proc = &(ompi_proc_local()->proc_name);
int cmpval;
if(NULL == btl_endpoint->endpoint_addr) {
return false;
}
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
if(NULL == btl_endpoint->endpoint_addr) {
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return false;
}
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
cmpval = ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
&endpoint_proc->proc_ompi->proc_name,
&this_proc->proc_ompi->proc_name);
this_proc);
if((btl_endpoint->endpoint_sd < 0) ||
(btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
cmpval < 0)) {
@ -365,9 +385,12 @@ bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return false;
}
mca_btl_tcp2_endpoint_event_init(btl_endpoint);
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
mca_btl_tcp2_endpoint_connected(btl_endpoint);
mca_btl_tcp_endpoint_event_init(btl_endpoint);
/* NOT NEEDED if we remove the PERSISTENT flag when we create the
* first recv_event.
*/
opal_event_add(&btl_endpoint->endpoint_recv_event, 0); /* TODO */
mca_btl_tcp_endpoint_connected(btl_endpoint);
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
mca_btl_tcp2_endpoint_dump(btl_endpoint, "accepted");
#endif
@ -388,16 +411,19 @@ bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
*/
void mca_btl_tcp2_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
{
if(btl_endpoint->endpoint_sd < 0)
return;
btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
int sd = btl_endpoint->endpoint_sd;
do {
if( sd < 0 ) return;
} while ( opal_atomic_cmpset( &(btl_endpoint->endpoint_sd), sd, -1 ) );
CLOSE_THE_SOCKET(sd);
btl_endpoint->endpoint_retries++;
opal_event_del(&btl_endpoint->endpoint_recv_event);
opal_event_del(&btl_endpoint->endpoint_send_event);
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
btl_endpoint->endpoint_sd = -1;
#if MCA_BTL_TCP_ENDPOINT_CACHE
free( btl_endpoint->endpoint_cache );
if( NULL != btl_endpoint->endpoint_cache )
free( btl_endpoint->endpoint_cache );
btl_endpoint->endpoint_cache = NULL;
btl_endpoint->endpoint_cache_pos = NULL;
btl_endpoint->endpoint_cache_length = 0;
@ -417,16 +443,17 @@ static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoin
btl_endpoint->endpoint_retries = 0;
/* Create the send event in a persistent manner. */
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE | OPAL_EV_PERSIST,
mca_btl_tcp2_endpoint_send_handler,
btl_endpoint );
opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE | OPAL_EV_PERSIST,
mca_btl_tcp_endpoint_send_handler,
btl_endpoint );
if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
if(NULL == btl_endpoint->endpoint_send_frag)
btl_endpoint->endpoint_send_frag = (mca_btl_tcp2_frag_t*)
if(NULL == btl_endpoint->endpoint_send_frag) {
btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
opal_list_remove_first(&btl_endpoint->endpoint_frags);
}
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
}
}
@ -578,7 +605,7 @@ static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endp
/* non-blocking so wait for completion */
if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
return OMPI_SUCCESS;
}
{
@ -597,7 +624,7 @@ static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endp
/* send our globally unique process identifier to the endpoint */
if((rc = mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_recv_event, 0);
} else {
mca_btl_tcp2_endpoint_close(btl_endpoint);
}
@ -688,9 +715,12 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
mca_btl_tcp2_frag_t* frag;
frag = btl_endpoint->endpoint_recv_frag;
data_still_pending_on_endpoint:
if(NULL == frag) {
if(mca_btl_tcp2_module.super.btl_max_send_size >
mca_btl_tcp2_module.super.btl_eager_limit) {
if(mca_btl_tcp_module.super.btl_max_send_size >
mca_btl_tcp_module.super.btl_eager_limit) {
MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
} else {
MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
@ -703,30 +733,32 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
assert( 0 == btl_endpoint->endpoint_cache_length );
data_still_pending_on_endpoint:
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
/* check for completion of non-blocking recv on the current fragment */
if(mca_btl_tcp2_frag_recv(frag, btl_endpoint->endpoint_sd) == false) {
if( mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false ) {
btl_endpoint->endpoint_recv_frag = frag;
} else {
btl_endpoint->endpoint_recv_frag = NULL;
if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) {
mca_btl_active_message_callback_t* reg;
reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag;
reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata);
}
TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag);
#if MCA_BTL_TCP_ENDPOINT_CACHE
if( 0 != btl_endpoint->endpoint_cache_length ) {
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
/* Get a new fragment and try again */
frag = NULL;
#else
/* If the cache still contain some data we can reuse the same fragment
* until we flush it completly.
*/
MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
goto data_still_pending_on_endpoint;
}
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
#if !MCA_BTL_TCP_USES_PROGRESS_THREAD
MCA_BTL_TCP_FRAG_RETURN(frag);
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
assert( 0 == btl_endpoint->endpoint_cache_length );
@ -741,12 +773,13 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
* of the MPI_Finalize. The first one will close the connections,
* and all others will complain.
*/
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
break;
default:
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
mca_btl_tcp2_endpoint_close(btl_endpoint);
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
break;
}
}
@ -759,8 +792,8 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user)
{
mca_btl_tcp2_endpoint_t* btl_endpoint = (mca_btl_tcp2_endpoint_t *)user;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user;
opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECTING:
mca_btl_tcp2_endpoint_complete_connect(btl_endpoint);
@ -779,17 +812,13 @@ static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user)
opal_list_remove_first(&btl_endpoint->endpoint_frags);
/* if required - update request status and release fragment */
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
if( btl_ownership ) {
MCA_BTL_TCP_FRAG_RETURN(frag);
}
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
}
/* if nothing else to do unregister for send event notifications */
/* if no more data to send unregister the send notifications */
if(NULL == btl_endpoint->endpoint_send_frag) {
opal_event_del(&btl_endpoint->endpoint_send_event);
}
@ -799,7 +828,7 @@ static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user)
opal_event_del(&btl_endpoint->endpoint_send_event);
break;
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
}

Просмотреть файл

@ -41,6 +41,14 @@ BEGIN_C_DECLS
#define MCA_BTL_TCP_FRAG_IOVEC_NUMBER 4
/**
* Commands for the threaded version when the fragments must be completed
* by one of the MPI bounded threads.
*/
#define MCA_BTL_TCP_FRAG_STEP_UNDEFINED ((uint16_t)0x0000)
#define MCA_BTL_TCP_FRAG_STEP_SEND_COMPLETE ((uint16_t)0x0001)
#define MCA_BTL_TCP_FRAG_STEP_RECV_COMPLETE ((uint16_t)0x0002)
/**
* TCP fragment derived type.
*/
@ -82,49 +90,77 @@ OBJ_CLASS_DECLARATION(mca_btl_tcp2_frag_user_t);
#define MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag) \
{ \
ompi_free_list_item_t *item; \
OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_eager, item); \
frag = (mca_btl_tcp2_frag_t*) item; \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_eager_mutex); \
OMPI_FREE_LIST_GET_MT(&mca_btl_tcp_component.tcp_frag_eager, item); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_eager_mutex); \
frag = (mca_btl_tcp_frag_t*) item; \
}
#define MCA_BTL_TCP_FRAG_ALLOC_MAX(frag) \
{ \
ompi_free_list_item_t *item; \
OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_max, item); \
frag = (mca_btl_tcp2_frag_t*) item; \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_max_mutex); \
OMPI_FREE_LIST_GET_MT(&mca_btl_tcp_component.tcp_frag_max, item); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_max_mutex); \
frag = (mca_btl_tcp_frag_t*) item; \
}
#define MCA_BTL_TCP_FRAG_ALLOC_USER(frag) \
{ \
ompi_free_list_item_t *item; \
OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_user, item); \
frag = (mca_btl_tcp2_frag_t*) item; \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_user_mutex); \
OMPI_FREE_LIST_GET_MT(&mca_btl_tcp_component.tcp_frag_user, item); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_user_mutex); \
frag = (mca_btl_tcp_frag_t*) item; \
}
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
#define MCA_BTL_TCP_FRAG_RETURN(frag) \
{ \
OMPI_FREE_LIST_RETURN(frag->my_list, (ompi_free_list_item_t*)(frag)); \
(frag)->next_step = MCA_BTL_TCP_FRAG_STEP_UNDEFINED; \
if( frag->my_list == &mca_btl_tcp_component.tcp_frag_eager ) { \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_eager_mutex); \
OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_eager_mutex); \
} else if( frag->my_list == &mca_btl_tcp_component.tcp_frag_max ) { \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_max_mutex); \
OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_max_mutex); \
} else { \
assert( frag->my_list == &mca_btl_tcp_component.tcp_frag_user ); \
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_user_mutex); \
OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_user_mutex); \
} \
}
#else
#define MCA_BTL_TCP_FRAG_RETURN(frag) \
{ \
(frag)->next_step = MCA_BTL_TCP_FRAG_STEP_UNDEFINED; \
OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \
}
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
#define MCA_BTL_TCP_FRAG_INIT_DST(frag,ep) \
do { \
frag->rc = 0; \
frag->btl = ep->endpoint_btl; \
frag->base.des_src = NULL; \
frag->base.des_src_cnt = 0; \
frag->base.des_dst = frag->segments; \
frag->base.des_dst_cnt = 1; \
frag->endpoint = ep; \
frag->iov[0].iov_len = sizeof(frag->hdr); \
frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr; \
frag->iov_cnt = 1; \
frag->iov_idx = 0; \
frag->iov_ptr = frag->iov; \
frag->base.des_src = NULL; \
frag->base.des_dst_cnt = 0; \
frag->base.des_dst = frag->segments; \
frag->base.des_dst_cnt = 1; \
frag->rc = 0; \
} while(0)
bool mca_btl_tcp2_frag_send(mca_btl_tcp2_frag_t*, int sd);
bool mca_btl_tcp2_frag_recv(mca_btl_tcp2_frag_t*, int sd);
void mca_btl_tcp_dump_frag( mca_btl_tcp_frag_t* frag, char* msg );
END_C_DECLS
#endif

Просмотреть файл

@ -3,6 +3,7 @@
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2010 The University of Tennessee and The University
* Copyright (c) 2004-2012 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -77,10 +78,10 @@ void mca_btl_tcp2_proc_construct(mca_btl_tcp2_proc_t* tcp_proc)
void mca_btl_tcp2_proc_destruct(mca_btl_tcp2_proc_t* tcp_proc)
{
/* remove from list of all proc instances */
OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
opal_proc_table_remove_value(&mca_btl_tcp2_component.tcp_procs,
tcp_proc->proc_ompi->proc_name);
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
opal_hash_table_remove_value_uint64(&mca_btl_tcp_component.tcp_procs,
ompi_rte_hash_name(&tcp_proc->proc_ompi->proc_name));
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
/* release resources */
if(NULL != tcp_proc->proc_endpoints) {
@ -103,11 +104,11 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_create(ompi_proc_t* ompi_proc)
size_t size;
mca_btl_tcp2_proc_t* btl_proc;
OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
rc = opal_proc_table_get_value(&mca_btl_tcp2_component.tcp_procs,
ompi_proc->proc_name, (void**)&btl_proc);
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
rc = opal_hash_table_get_value_uint64(&mca_btl_tcp_component.tcp_procs,
hash, (void**)&btl_proc);
if(OMPI_SUCCESS == rc) {
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
return btl_proc;
}
@ -117,9 +118,9 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_create(ompi_proc_t* ompi_proc)
btl_proc->proc_ompi = ompi_proc;
/* add to hash table of all proc instance */
opal_proc_table_set_value(&mca_btl_tcp2_component.tcp_procs,
ompi_proc->proc_name, btl_proc);
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
opal_hash_table_set_value_uint64(&mca_btl_tcp_component.tcp_procs,
hash, btl_proc);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
/* lookup tcp parameters exported by this proc */
rc = ompi_modex_recv( &mca_btl_tcp2_component.super.btl_version,
@ -681,8 +682,8 @@ int mca_btl_tcp2_proc_insert( mca_btl_tcp2_proc_t* btl_proc,
int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoint_t* btl_endpoint)
{
size_t i;
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
for(i=0; i<btl_proc->proc_endpoint_count; i++) {
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_proc->proc_lock);
for( i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
if(btl_proc->proc_endpoints[i] == btl_endpoint) {
memmove(btl_proc->proc_endpoints+i, btl_proc->proc_endpoints+i+1,
(btl_proc->proc_endpoint_count-i-1)*sizeof(mca_btl_base_endpoint_t*));
@ -700,7 +701,7 @@ int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoin
break;
}
}
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_proc->proc_lock);
return OMPI_SUCCESS;
}
@ -710,11 +711,11 @@ int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoin
*/
mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_lookup(const orte_process_name_t *name)
{
mca_btl_tcp2_proc_t* proc = NULL;
OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
opal_proc_table_get_value(&mca_btl_tcp2_component.tcp_procs,
name->proc_name, (void**)&proc);
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
mca_btl_tcp_proc_t* proc = NULL;
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
opal_hash_table_get_value_uint64(&mca_btl_tcp_component.tcp_procs,
ompi_rte_hash_name(name), (void**)&proc);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
return proc;
}
@ -725,7 +726,7 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_lookup(const orte_process_name_t *name)
bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t* btl_proc, struct sockaddr* addr, int sd)
{
size_t i;
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_proc->proc_lock);
for( i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i];
/* Check all conditions before going to try to accept the connection. */
@ -754,12 +755,12 @@ bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t* btl_proc, struct sockaddr* ad
;
}
if(mca_btl_tcp2_endpoint_accept(btl_endpoint, addr, sd)) {
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
if(mca_btl_tcp_endpoint_accept(btl_endpoint, addr, sd)) {
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_proc->proc_lock);
return true;
}
}
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_proc->proc_lock);
return false;
}

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -89,7 +89,7 @@ typedef struct mca_pml_ob1_t mca_pml_ob1_t;
extern mca_pml_ob1_t mca_pml_ob1;
extern int mca_pml_ob1_output;
extern bool mca_pml_ob1_matching_protection;
/*
* PML interface functions.
*/
@ -261,6 +261,24 @@ do { \
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock); \
} while(0)
/* Guard the OB1 matching structures.  When at least one BTL runs its own
 * progress thread (mca_pml_ob1_matching_protection == true) the matching
 * path can be entered concurrently even in an otherwise single-threaded
 * build, so the mutex must be taken unconditionally; otherwise fall back
 * to OPAL_THREAD_LOCK, which only locks when opal_using_threads() is set. */
#define OB1_MATCHING_LOCK(lock) \
do { \
if( mca_pml_ob1_matching_protection ) { \
opal_mutex_lock(lock); \
} \
else { OPAL_THREAD_LOCK(lock); } \
} while(0)
/* Release counterpart of OB1_MATCHING_LOCK; must mirror its branch choice. */
#define OB1_MATCHING_UNLOCK(lock) \
do { \
if( mca_pml_ob1_matching_protection ) { \
opal_mutex_unlock(lock); \
} \
else { OPAL_THREAD_UNLOCK(lock); } \
} while(0)
int mca_pml_ob1_send_fin(ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl,
opal_ptr_t hdr_frag, uint64_t size, uint8_t order, int status);

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2009 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -54,6 +54,7 @@ mca_pml_ob1_component_init( int* priority, bool enable_progress_threads,
static int mca_pml_ob1_component_fini(void);
int mca_pml_ob1_output = 0;
static int mca_pml_ob1_verbose = 0;
bool mca_pml_ob1_matching_protection = false;
mca_pml_base_component_2_0_0_t mca_pml_ob1_component = {
/* First, the mca_base_component_t struct containing meta
@ -277,10 +278,15 @@ mca_pml_ob1_component_init( int* priority,
OPAL_LIST_FOREACH(selected_btl, &mca_btl_base_modules_initialized, mca_btl_base_selected_module_t) {
mca_btl_base_module_t *btl = selected_btl->btl_module;
if (btl->btl_flags & MCA_BTL_FLAGS_BTL_PROGRESS_THREAD_ENABLED) {
mca_pml_ob1_matching_protection = true;
}
if (btl->btl_flags & MCA_BTL_FLAGS_SINGLE_ADD_PROCS) {
mca_pml_ob1.super.pml_flags |= MCA_PML_BASE_FLAG_REQUIRE_WORLD;
break;
}
}
/* Set this here (vs in component_open()) because

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2007 High Performance Computing Center Stuttgart,
@ -161,7 +161,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
* end points) from being processed, and potentially "losing"
* the fragment.
*/
OPAL_THREAD_LOCK(&comm->matching_lock);
OB1_MATCHING_LOCK(&comm->matching_lock);
/* get sequence number of next message that can be processed */
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
@ -194,7 +194,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
/* release matching lock before processing fragment */
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);
if(OPAL_LIKELY(match)) {
bytes_received = segments->seg_len - OMPI_PML_OB1_MATCH_HDR_LEN;
@ -247,7 +247,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
return;
slow_path:
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);
mca_pml_ob1_recv_frag_match(btl, hdr, segments,
num_segments, MCA_PML_OB1_HDR_TYPE_MATCH);
}
@ -668,7 +668,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
* end points) from being processed, and potentially "losing"
* the fragment.
*/
OPAL_THREAD_LOCK(&comm->matching_lock);
OB1_MATCHING_LOCK(&comm->matching_lock);
/* get sequence number of next message that can be processed */
next_msg_seq_expected = (uint16_t)proc->expected_sequence;
@ -704,7 +704,7 @@ out_of_order_match:
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
/* release matching lock before processing fragment */
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);
if(OPAL_LIKELY(match)) {
switch(type) {
@ -729,7 +729,7 @@ out_of_order_match:
* may now be used to form new matchs
*/
if(OPAL_UNLIKELY(opal_list_get_size(&proc->frags_cant_match) > 0)) {
OPAL_THREAD_LOCK(&comm->matching_lock);
OB1_MATCHING_LOCK(&comm->matching_lock);
if((frag = check_cantmatch_for_match(proc))) {
hdr = &frag->hdr.hdr_match;
segments = frag->segments;
@ -738,7 +738,7 @@ out_of_order_match:
type = hdr->hdr_common.hdr_type;
goto out_of_order_match;
}
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);
}
return OMPI_SUCCESS;
@ -749,7 +749,7 @@ wrong_seq:
*/
append_frag_to_list(&proc->frags_cant_match, btl, hdr, segments,
num_segments, NULL);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OB1_MATCHING_UNLOCK(&comm->matching_lock);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -104,7 +104,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
mca_pml_ob1_comm_t *ob1_comm = comm->c_pml_comm;
/* The rest should be protected behind the match logic lock */
OPAL_THREAD_LOCK(&ob1_comm->matching_lock);
OB1_MATCHING_LOCK(&ob1_comm->matching_lock);
if( true == request->req_match_received ) { /* way to late to cancel this one */
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
assert( OMPI_ANY_TAG != ompi_request->req_status.MPI_TAG ); /* not matched isn't it */
@ -124,7 +124,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
* to true. Otherwise, the request will never be freed.
*/
request->req_recv.req_base.req_pml_complete = true;
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
OPAL_THREAD_LOCK(&ompi_request_lock);
ompi_request->req_status._cancelled = true;
@ -1177,7 +1177,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
MCA_PML_BASE_RECV_START(&req->req_recv.req_base);
OPAL_THREAD_LOCK(&ob1_comm->matching_lock);
OB1_MATCHING_LOCK(&ob1_comm->matching_lock);
/**
* The laps of time between the ACTIVATE event and the SEARCH_UNEX one include
* the cost of the request lock.
@ -1219,7 +1219,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
it when the message comes in. */
append_recv_req_to_queue(queue, req);
req->req_match_received = false;
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
} else {
if(OPAL_LIKELY(!IS_PROB_REQ(req))) {
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_MATCH_UNEX,
@ -1237,7 +1237,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
opal_list_remove_item(&proc->unexpected_frags,
(opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
switch(hdr->hdr_common.hdr_type) {
case MCA_PML_OB1_HDR_TYPE_MATCH:
@ -1267,14 +1267,14 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
restarted with this request during mrecv */
opal_list_remove_item(&proc->unexpected_frags,
(opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
req->req_recv.req_base.req_addr = frag;
mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
frag->segments, frag->num_segments);
} else {
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
frag->segments, frag->num_segments);
}

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2008 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -241,6 +241,9 @@ typedef uint8_t mca_btl_base_tag_t;
* BTLs should not set this flag. */
#define MCA_BTL_FLAGS_SINGLE_ADD_PROCS 0x20000
/* The BTL is using progress thread and need the protection on matching */
#define MCA_BTL_FLAGS_BTL_PROGRESS_THREAD_ENABLED 0x40000
/* Default exclusivity levels */
#define MCA_BTL_EXCLUSIVITY_HIGH (64*1024) /* internal loopback */
#define MCA_BTL_EXCLUSIVITY_DEFAULT 1024 /* GM/IB/etc. */

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2012 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -43,14 +43,74 @@
/* Open MPI includes */
#include "opal/mca/event/event.h"
#include "opal/class/opal_free_list.h"
#include "opal/mca/btl/btl.h"
#include "opal/mca/btl/base/base.h"
#include "opal/mca/mpool/mpool.h"
#include "opal/class/opal_hash_table.h"
#include "opal/util/fd.h"
#define MCA_BTL_TCP_STATISTICS 0
BEGIN_C_DECLS
#if (HAVE_PTHREAD_H == 1)
#define MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD 1
#else
#define MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD 0
#endif /* (HAVE_PTHREAD_H == 1) */
extern opal_event_base_t* mca_btl_tcp_event_base;
/* Complete an outgoing fragment: invoke the completion callback when the
 * descriptor was flagged MCA_BTL_DES_SEND_ALWAYS_CALLBACK, then return the
 * fragment to its free list if the BTL owns it.  Ownership is captured
 * before the callback runs, since the callback may recycle the descriptor. */
#define MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \
do { \
int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP); \
if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) { \
frag->base.des_cbfunc(&frag->endpoint->endpoint_btl->super, frag->endpoint, \
&frag->base, frag->rc); \
} \
if( btl_ownership ) { \
MCA_BTL_TCP_FRAG_RETURN(frag); \
} \
} while (0)
/* Dispatch a received fragment: for send-type headers, look up the
 * active-message callback registered for the header's tag and invoke it.
 * Other header types (put/get control traffic) are handled elsewhere. */
#define MCA_BTL_TCP_RECV_TRIGGER_CB(frag) \
do { \
if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) { \
mca_btl_active_message_callback_t* reg; \
reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag; \
reg->cbfunc(&frag->endpoint->endpoint_btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata); \
} \
} while (0)
#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
extern opal_list_t mca_btl_tcp_ready_frag_pending_queue;
extern opal_mutex_t mca_btl_tcp_ready_frag_mutex;
extern int mca_btl_tcp_pipe_to_progress[2];
extern int mca_btl_tcp_progress_thread_trigger;
#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name) \
opal_mutex_atomic_lock((name))
#define MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(name) \
opal_mutex_atomic_unlock((name))
#define MCA_BTL_TCP_ACTIVATE_EVENT(event, value) \
do { \
if(0 < mca_btl_tcp_progress_thread_trigger) { \
opal_event_t* _event = (opal_event_t*)(event); \
opal_fd_write( mca_btl_tcp_pipe_to_progress[1], sizeof(opal_event_t*), \
&_event); \
} \
else { \
opal_event_add(event, (value)); \
} \
} while (0)
#else
#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name)
#define MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(name)
#define MCA_BTL_TCP_ACTIVATE_EVENT(event, value) \
do { \
opal_event_add(event, (value)); \
} while (0)
#endif /* MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD */
/**
* TCP BTL component.
@ -69,6 +129,7 @@ struct mca_btl_tcp_component_t {
int tcp_endpoint_cache; /**< amount of cache on each endpoint */
opal_proc_table_t tcp_procs; /**< hash table of tcp proc structures */
opal_mutex_t tcp_lock; /**< lock for accessing module state */
opal_list_t tcp_events;
opal_event_t tcp_recv_event; /**< recv event for IPv4 listen socket */
int tcp_listen_sd; /**< IPv4 listen socket for incoming connection requests */
@ -95,6 +156,14 @@ struct mca_btl_tcp_component_t {
opal_free_list_t tcp_frag_max;
opal_free_list_t tcp_frag_user;
int tcp_enable_progress_thread; /** Support for tcp progress thread flag */
#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
opal_event_t tcp_recv_thread_async_event;
opal_mutex_t tcp_frag_eager_mutex;
opal_mutex_t tcp_frag_max_mutex;
opal_mutex_t tcp_frag_user_mutex;
#endif
/* Do we want to use TCP_NODELAY? */
int tcp_not_use_nodelay;
@ -145,13 +214,6 @@ extern mca_btl_base_module_t** mca_btl_tcp_component_init(
);
/**
* TCP component progress.
*/
extern int mca_btl_tcp_component_progress(void);
/**
* Cleanup any resources held by the BTL.
*
@ -292,6 +354,10 @@ mca_btl_base_descriptor_t* mca_btl_tcp_prepare_src(
uint32_t flags
);
extern void
mca_btl_tcp_dump(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
int verbose);
/**
* Fault Tolerance Event Notification Function

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2014 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -61,6 +61,8 @@
#include "opal/util/argv.h"
#include "opal/util/net.h"
#include "opal/util/proc.h"
#include "opal/util/net.h"
#include "opal/util/fd.h"
#include "opal/util/show_help.h"
#include "opal/constants.h"
#include "opal/mca/btl/btl.h"
@ -69,6 +71,10 @@
#include "opal/mca/btl/base/btl_base_error.h"
#include "opal/mca/pmix/pmix.h"
#include "ompi/constants.h"
#include "opal/mca/btl/btl.h"
#include "opal/mca/btl/base/base.h"
#include "opal/mca/btl/base/btl_base_error.h"
#include "btl_tcp.h"
#include "btl_tcp_addr.h"
#include "btl_tcp_proc.h"
@ -85,6 +91,16 @@ static int mca_btl_tcp_component_register(void);
static int mca_btl_tcp_component_open(void);
static int mca_btl_tcp_component_close(void);
opal_event_base_t* mca_btl_tcp_event_base = NULL;
#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
int mca_btl_tcp_progress_thread_trigger = -1;
int mca_btl_tcp_pipe_to_progress[2] = { -1, -1 };
static opal_thread_t mca_btl_tcp_progress_thread;
opal_list_t mca_btl_tcp_ready_frag_pending_queue;
opal_mutex_t mca_btl_tcp_ready_frag_mutex;
#endif /* MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD */
static char *mca_btl_tcp_if_seq_string;
mca_btl_tcp_component_t mca_btl_tcp_component = {
.super = {
/* First, the mca_base_component_t struct containing meta information
@ -102,6 +118,7 @@ mca_btl_tcp_component_t mca_btl_tcp_component = {
},
.btl_init = mca_btl_tcp_component_init,
.btl_progress = NULL,
}
};
@ -165,8 +182,25 @@ struct mca_btl_tcp_event_t {
};
typedef struct mca_btl_tcp_event_t mca_btl_tcp_event_t;
OBJ_CLASS_INSTANCE( mca_btl_tcp_event_t, opal_list_item_t,
NULL, NULL);
/* OBJ constructor: register the new event on the component-wide tcp_events
 * list (under the component lock) so pending events can be cancelled and
 * released when the component closes. */
static void mca_btl_tcp_event_construct(mca_btl_tcp_event_t* event)
{
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
opal_list_append(&mca_btl_tcp_component.tcp_events, &event->item);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
}
/* OBJ destructor: unlink the event from the component-wide tcp_events list
 * (under the component lock), undoing mca_btl_tcp_event_construct. */
static void mca_btl_tcp_event_destruct(mca_btl_tcp_event_t* event)
{
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
opal_list_remove_item(&mca_btl_tcp_component.tcp_events, &event->item);
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
}
OBJ_CLASS_INSTANCE(
mca_btl_tcp_event_t,
opal_list_item_t,
mca_btl_tcp_event_construct,
mca_btl_tcp_event_destruct);
/*
@ -250,6 +284,20 @@ static int mca_btl_tcp_component_register(void)
free(message);
#endif
/* Check if we should support async progress */
mca_btl_tcp_param_register_int ("progress_thread", NULL, 0, OPAL_INFO_LVL_1,
&mca_btl_tcp_component.tcp_enable_progress_thread);
#if !defined(MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD)
if( mca_btl_tcp_component.tcp_enable_progress_thread ) {
opal_show_help("help-mpi-btl-tcp.txt",
"unsuported progress thread",
true, "progress thread",
ompi_process_info.nodename,
mca_btl_tcp_component.tcp_if_seq,
"Progress thread support compiled out");
}
#endif /* !defined(MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD) */
mca_btl_tcp_component.report_all_unfound_interfaces = false;
(void) mca_base_component_var_register(&mca_btl_tcp_component.super.btl_version,
"warn_all_unfound_interfaces",
@ -301,11 +349,20 @@ static int mca_btl_tcp_component_open(void)
/* initialize objects */
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_procs, opal_proc_table_t);
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_events, opal_list_t);
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_eager, opal_free_list_t);
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_max, opal_free_list_t);
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_user, opal_free_list_t);
opal_proc_table_init(&mca_btl_tcp_component.tcp_procs, 16, 256);
#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_user_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_tcp_ready_frag_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_tcp_ready_frag_pending_queue, opal_list_t);
#endif /* MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD */
/* if_include and if_exclude need to be mutually exclusive */
if (OPAL_SUCCESS !=
mca_base_var_check_exclusive("ompi",
@ -330,8 +387,12 @@ static int mca_btl_tcp_component_open(void)
static int mca_btl_tcp_component_close(void)
{
if (NULL != mca_btl_tcp_component.tcp_btls)
opal_list_item_t* item;
opal_list_item_t* next;
if (NULL != mca_btl_tcp_component.tcp_btls) {
free(mca_btl_tcp_component.tcp_btls);
}
if (mca_btl_tcp_component.tcp_listen_sd >= 0) {
opal_event_del(&mca_btl_tcp_component.tcp_recv_event);
@ -346,6 +407,18 @@ static int mca_btl_tcp_component_close(void)
}
#endif
/* cleanup any pending events */
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
for(item = opal_list_get_first(&mca_btl_tcp_component.tcp_events);
item != opal_list_get_end(&mca_btl_tcp_component.tcp_events);
item = next) {
mca_btl_tcp_event_t* event = (mca_btl_tcp_event_t*)item;
next = opal_list_get_next(item);
opal_event_del(&event->event);
OBJ_RELEASE(event);
}
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
/* release resources */
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_procs);
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager);
@ -357,6 +430,40 @@ static int mca_btl_tcp_component_close(void)
mca_common_cuda_fini();
#endif /* OPAL_CUDA_SUPPORT */
#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);
if( (NULL != mca_btl_tcp_event_base) &&
(mca_btl_tcp_event_base != opal_sync_event_base) ) {
/* Turn off the progress thread before moving forward */
if( -1 != mca_btl_tcp_progress_thread_trigger ) {
mca_btl_tcp_progress_thread_trigger = 0;
/* Let the progress thread know that we're going away */
if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
close(mca_btl_tcp_pipe_to_progress[1]);
mca_btl_tcp_pipe_to_progress[1] = -1;
}
while( -1 != mca_btl_tcp_progress_thread_trigger ) {
/*event_base_loopbreak(mca_btl_tcp_event_base);*/
sched_yield();
usleep(100); /* give app a chance to re-enter library */
}
}
opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
opal_event_base_free(mca_btl_tcp_event_base);
mca_btl_tcp_event_base = NULL;
/* Close the remaining pipes */
if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
close(mca_btl_tcp_pipe_to_progress[0]);
mca_btl_tcp_pipe_to_progress[0] = -1;
}
}
OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
#endif
return OPAL_SUCCESS;
}
@ -652,14 +759,42 @@ static int mca_btl_tcp_component_create_instances(void)
return ret;
}
#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
/**
 * Body of the TCP BTL progress thread.  Spins on the dedicated event base
 * as long as the shared trigger flag (t_arg) reads 1, then flips the flag
 * to -1 to acknowledge termination before exiting.
 *
 * @param obj  the opal_thread_t running this engine; t_arg points to the
 *             int trigger owned by the main thread.
 * @return always NULL.
 */
static void* mca_btl_tcp_progress_thread_engine(opal_object_t *obj)
{
    opal_thread_t* thread = (opal_thread_t*)obj;
    int* trigger = (int*)thread->t_arg;

    while( 1 == *trigger ) {
        opal_event_loop(mca_btl_tcp_event_base, OPAL_EVLOOP_ONCE);
    }
    *trigger = -1;  /* signal the main thread that we are gone */
    return NULL;
}
/**
 * Runs on the progress thread whenever the main thread touches the
 * communication pipe.  The pipe carries either an opal_event_t* that must
 * be armed on the progress thread's event base, or EOF (read() == 0) when
 * the main thread closes its end to request shutdown.
 *
 * Fix: the original only checked for rc == 0; a failed read (rc == -1,
 * e.g. EAGAIN/EINTR on the non-blocking descriptor) fell through and
 * passed the *uninitialized* pointer 'event' to opal_event_add() —
 * undefined behavior.  Only arm the event when a full pointer was read.
 *
 * @param fd      read end of the pipe (mca_btl_tcp_pipe_to_progress[0])
 * @param unused  libevent flags (ignored)
 * @param context the progress thread (opal_thread_t*); t_arg points to
 *                the run/stop trigger flag
 */
static void mca_btl_tcp_component_event_async_handler(int fd, short unused, void *context)
{
    opal_event_t* event;
    ssize_t rc;

    assert( fd == mca_btl_tcp_pipe_to_progress[0] );
    rc = read(fd, (void*)&event, sizeof(opal_event_t*));
    if( 0 == rc ) {
        /* The main thread closed the pipe to trigger the shutdown procedure */
        opal_thread_t* current_thread = (opal_thread_t*)context;
        (*((int*)current_thread->t_arg)) = 0;
    } else if( sizeof(opal_event_t*) == (size_t)rc ) {
        /* writes of a pointer (<= PIPE_BUF) are atomic, so a successful
         * read delivers the complete pointer */
        opal_event_add(event, 0);
    }
    /* rc < 0: spurious wakeup (EAGAIN/EINTR); nothing to do */
}
#endif
/*
* Create a listen socket and bind to all interfaces
*/
static int mca_btl_tcp_component_create_listen(uint16_t af_family)
{
int flags;
int sd;
int flags, sd, rc;
struct sockaddr_storage inaddr;
opal_socklen_t addrlen;
@ -678,17 +813,16 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
#if OPAL_ENABLE_IPV6
{
struct addrinfo hints, *res = NULL;
int error;
memset (&hints, 0, sizeof(hints));
hints.ai_family = af_family;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
if ((error = getaddrinfo(NULL, "0", &hints, &res))) {
if ((rc = getaddrinfo(NULL, "0", &hints, &res))) {
opal_output (0,
"mca_btl_tcp_create_listen: unable to resolve. %s\n",
gai_strerror (error));
"mca_btl_tcp_create_listen: unable to resolve. %s\n",
gai_strerror (rc));
CLOSE_THE_SOCKET(sd);
return OPAL_ERROR;
}
@ -816,25 +950,86 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
}
}
/* register listen port */
if(mca_btl_tcp_component.tcp_enable_progress_thread){
/* Declare our intent to use threads. */
opal_event_use_threads();
if( NULL == mca_btl_tcp_event_base ) {
/* fall back to only one event base (the one shared by the entire Open MPI framework) */
if( NULL == (mca_btl_tcp_event_base = opal_event_base_create()) ) {
BTL_ERROR(("BTL TCP failed to create progress event base"));
goto move_forward_with_no_thread;
}
opal_event_base_priority_init(mca_btl_tcp_event_base, OPAL_EVENT_NUM_PRI);
/* construct the thread object */
OBJ_CONSTRUCT(&mca_btl_tcp_progress_thread, opal_thread_t);
/**
* Create a pipe to communicate between the main thread and the progress thread.
*/
if (0 != pipe(mca_btl_tcp_pipe_to_progress)) {
opal_event_base_free(mca_btl_tcp_event_base);
/* fall back to only one event base (the one shared by the entire Open MPI framework) */
mca_btl_tcp_event_base = opal_sync_event_base;
mca_btl_tcp_progress_thread_trigger = -1; /* thread not started */
goto move_forward_with_no_thread;
}
/* setup the receiving end of the pipe as non-blocking */
if((flags = fcntl(mca_btl_tcp_pipe_to_progress[0], F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
} else {
flags |= O_NONBLOCK;
if(fcntl(mca_btl_tcp_pipe_to_progress[0], F_SETFL, flags) < 0)
BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
/* Progress thread event */
opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp_recv_thread_async_event,
mca_btl_tcp_pipe_to_progress[0],
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_event_async_handler,
&mca_btl_tcp_progress_thread );
opal_event_add(&mca_btl_tcp_component.tcp_recv_thread_async_event, 0);
/* fork off a thread to progress it */
mca_btl_tcp_progress_thread.t_run = mca_btl_tcp_progress_thread_engine;
mca_btl_tcp_progress_thread.t_arg = &mca_btl_tcp_progress_thread_trigger;
mca_btl_tcp_progress_thread_trigger = 1; /* thread up and running */
if( OPAL_SUCCESS != (rc = opal_thread_start(&mca_btl_tcp_progress_thread)) ) {
BTL_ERROR(("BTL TCP progress thread initialization failed (%d)", rc));
opal_event_base_free(mca_btl_tcp_event_base);
/* fall back to only one event base (the one shared by the entire Open MPI framework) */
mca_btl_tcp_event_base = opal_sync_event_base;
mca_btl_tcp_progress_thread_trigger = -1; /* thread not started */
goto move_forward_with_no_thread;
}
}
}
else {
move_forward_with_no_thread:
mca_btl_tcp_event_base = opal_sync_event_base;
}
if (AF_INET == af_family) {
opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp_recv_event,
mca_btl_tcp_component.tcp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_accept_handler,
0 );
MCA_BTL_TCP_ACTIVATE_EVENT(&mca_btl_tcp_component.tcp_recv_event, 0);
}
#if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) {
opal_event_set(opal_sync_event_base, &mca_btl_tcp_component.tcp6_recv_event,
mca_btl_tcp_component.tcp6_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_accept_handler,
0 );
opal_event_add(&mca_btl_tcp_component.tcp6_recv_event, 0);
} else
#endif
{
opal_event_set(opal_sync_event_base, &mca_btl_tcp_component.tcp_recv_event,
mca_btl_tcp_component.tcp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_accept_handler,
0 );
opal_event_add(&mca_btl_tcp_component.tcp_recv_event, 0);
opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp6_recv_event,
mca_btl_tcp_component.tcp6_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_accept_handler,
0 );
MCA_BTL_TCP_ACTIVATE_EVENT(&mca_btl_tcp_component.tcp6_recv_event, 0);
}
#endif
return OPAL_SUCCESS;
}
@ -938,6 +1133,7 @@ mca_btl_base_module_t** mca_btl_tcp_component_init(int *num_btl_modules,
bool enable_mpi_threads)
{
int ret = OPAL_SUCCESS;
unsigned int i;
mca_btl_base_module_t **btls;
*num_btl_modules = 0;
@ -997,13 +1193,19 @@ mca_btl_base_module_t** mca_btl_tcp_component_init(int *num_btl_modules,
if(OPAL_SUCCESS != (ret = mca_btl_tcp_component_exchange() )) {
return 0;
}
btls = (mca_btl_base_module_t **)malloc(mca_btl_tcp_component.tcp_num_btls *
sizeof(mca_btl_base_module_t*));
if(NULL == btls) {
return NULL;
}
/* Register the btl to support the progress_thread */
if (0 < mca_btl_tcp_progress_thread_trigger) {
for( i = 0; i < mca_btl_tcp_component.tcp_num_btls; i++) {
mca_btl_tcp_component.tcp_btls[i]->super.btl_flags |= MCA_BTL_FLAGS_BTL_PROGRESS_THREAD_ENABLED;
}
}
#if OPAL_CUDA_SUPPORT
mca_common_cuda_stage_one_init();
#endif /* OPAL_CUDA_SUPPORT */
@ -1048,9 +1250,11 @@ static void mca_btl_tcp_component_accept_handler( int incoming_sd,
}
mca_btl_tcp_set_socket_options(sd);
assert( NULL != mca_btl_tcp_event_base );
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW(mca_btl_tcp_event_t);
opal_event_set(opal_sync_event_base, &event->event, sd, OPAL_EV_READ, mca_btl_tcp_component_recv_handler, event);
opal_event_set(mca_btl_tcp_event_base, &(event->event), sd,
OPAL_EV_READ, mca_btl_tcp_component_recv_handler, event);
opal_event_add(&event->event, 0);
}
}
@ -1112,3 +1316,14 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
(void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd);
}
/*
 * Debugging helper: dump the state of every TCP BTL module in turn.
 * Absolutely not thread safe — intended for manual use (e.g. from a
 * debugger); call with care.
 */
static void mca_btl_tcp_component_dump(void)
{
    uint32_t idx;

    for( idx = 0; idx < mca_btl_tcp_component.tcp_num_btls; idx++ ) {
        mca_btl_tcp_dump( (mca_btl_base_module_t*)mca_btl_tcp_component.tcp_btls[idx], NULL, 1 );
    }
}

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2015 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -309,7 +309,7 @@ static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_
btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
opal_event_set(opal_sync_event_base, &btl_endpoint->endpoint_recv_event,
opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_endpoint_recv_handler,
@ -320,7 +320,7 @@ static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_
* will be fired only once, and when the endpoint is marked as
* CONNECTED the event should be recreated with the correct flags.
*/
opal_event_set(opal_sync_event_base, &btl_endpoint->endpoint_send_event,
opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE,
mca_btl_tcp_endpoint_send_handler,
@ -368,8 +368,8 @@ int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp
} else {
btl_endpoint->endpoint_send_frag = frag;
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_send]");
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
}
} else {
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "send fragment enqueued [endpoint_send]");
@ -509,7 +509,7 @@ void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
assert(btl_endpoint->endpoint_sd_next == -1);
btl_endpoint->endpoint_sd_next = sd;
opal_event_evtimer_set(opal_sync_event_base, &btl_endpoint->endpoint_accept_event,
opal_event_evtimer_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_accept_event,
mca_btl_tcp_endpoint_complete_accept, btl_endpoint);
opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
}
@ -570,7 +570,7 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "READY [endpoint_connected]");
/* Create the send event in a persistent manner. */
opal_event_set(opal_sync_event_base, &btl_endpoint->endpoint_send_event,
opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE | OPAL_EV_PERSIST,
mca_btl_tcp_endpoint_send_handler,
@ -760,7 +760,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [start_connect]");
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
return OPAL_SUCCESS;
}
}
@ -790,6 +790,8 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e
opal_socklen_t so_length = sizeof(so_error);
struct sockaddr_storage endpoint_addr;
opal_event_del(&btl_endpoint->endpoint_send_event);
mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
/* check connect completion status */

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -61,7 +61,7 @@ struct mca_btl_base_endpoint_t {
struct mca_btl_tcp_frag_t* endpoint_send_frag; /**< current send frag being processed */
struct mca_btl_tcp_frag_t* endpoint_recv_frag; /**< current recv frag being processed */
mca_btl_tcp_state_t endpoint_state; /**< current state of the connection */
size_t endpoint_retries; /**< number of connection retries attempted */
uint32_t endpoint_retries; /**< number of connection retries attempted */
opal_list_t endpoint_frags; /**< list of pending frags to send */
opal_mutex_t endpoint_send_lock; /**< lock for concurrent access to endpoint state */
opal_mutex_t endpoint_recv_lock; /**< lock for concurrent access to endpoint state */
@ -80,6 +80,7 @@ void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t*);
int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t*, struct mca_btl_tcp_frag_t*);
void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t*, struct sockaddr*, int);
void mca_btl_tcp_endpoint_shutdown(mca_btl_base_endpoint_t*);
void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg);
END_C_DECLS
#endif

Просмотреть файл

@ -3,14 +3,13 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2014 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
 * Copyright (c) 2008-2012 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2014 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
@ -151,6 +150,9 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
frag->iov_ptr->iov_base = (opal_iov_base_ptr_t)
(((unsigned char*)frag->iov_ptr->iov_base) + cnt);
frag->iov_ptr->iov_len -= cnt;
OPAL_OUTPUT_VERBOSE((100, opal_btl_base_framework.framework_output,
"%s:%d write %d bytes on socket %d\n",
__FILE__, __LINE__, cnt, sd));
break;
}
}
@ -159,12 +161,14 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
{
int cnt, dont_copy_data = 0;
size_t i, num_vecs;
mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;
int i, num_vecs, dont_copy_data = 0;
ssize_t cnt;
struct iovec *iov_ptr;
repeat:
num_vecs = frag->iov_cnt;
iov_ptr = &frag->iov[frag->iov_idx];
#if MCA_BTL_TCP_ENDPOINT_CACHE
if( 0 != btl_endpoint->endpoint_cache_length ) {
size_t length;
@ -265,11 +269,13 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
frag->iov[1].iov_len = frag->hdr.size;
frag->iov_cnt++;
#ifndef __sparc
#if !MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
/* The following cannot be done for sparc code
* because it causes alignment errors when accessing
* structures later on in the btl and pml code.
*/
dont_copy_data = 1;
#endif
#endif
goto repeat;
}
@ -297,3 +303,4 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
}
return false;
}

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2014 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -56,6 +56,7 @@ struct mca_btl_tcp_frag_t {
size_t iov_cnt;
size_t iov_idx;
size_t size;
uint16_t next_step;
int rc;
opal_free_list_t* my_list;
/* fake rdma completion */
@ -126,8 +127,6 @@ do { \
bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t*, int sd);
bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t*, int sd);
size_t mca_btl_tcp_frag_dump(mca_btl_tcp_frag_t*, char*, char*, size_t);
size_t mca_btl_tcp_frag_dump(mca_btl_tcp_frag_t* frag, char* msg, char* buf, size_t length);
END_C_DECLS
#endif

Просмотреть файл

@ -1,6 +1,9 @@
# -*- text -*-
#
# Copyright (c) 2009-2014 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2015-2016 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
@ -56,6 +59,16 @@ most common causes when it does occur are:
* The operating system ran out of file descriptors
* The operating system ran out of memory
[unsuported progress thread]
WARNING: Support for the TCP progress thread has not been compiled in.
Falling back to the normal progress engine.
Local host: %s
Value: %s
Message: %s
#
Your Open MPI job will likely hang until the failure reason is fixed
(e.g., more file descriptors and/or memory becomes available), and may
eventually timeout / abort.