diff --git a/contrib/build-mca-comps-outside-of-tree/btl_tcp2.c b/contrib/build-mca-comps-outside-of-tree/btl_tcp2.c index bb25a08dee..59fdc7fe75 100644 --- a/contrib/build-mca-comps-outside-of-tree/btl_tcp2.c +++ b/contrib/build-mca-comps-outside-of-tree/btl_tcp2.c @@ -32,7 +32,10 @@ #include "opal/datatype/opal_convertor.h" #include "ompi/mca/mpool/base/base.h" #include "ompi/mca/mpool/mpool.h" -#include "ompi/proc/proc.h" +#include "btl_tcp.h" +#include "btl_tcp_frag.h" +#include "btl_tcp_proc.h" +#include "btl_tcp_endpoint.h" mca_btl_tcp2_module_t mca_btl_tcp2_module = { { @@ -57,9 +60,9 @@ mca_btl_tcp2_module_t mca_btl_tcp2_module = { mca_btl_tcp2_prepare_dst, mca_btl_tcp2_send, NULL, /* send immediate */ - mca_btl_tcp2_put, - NULL, /* get */ - mca_btl_base_dump, + mca_btl_tcp_put, + NULL, /* get */ + mca_btl_tcp_dump, NULL, /* mpool */ NULL, /* register error */ mca_btl_tcp2_ft_event @@ -134,7 +137,9 @@ int mca_btl_tcp2_add_procs( struct mca_btl_base_module_t* btl, /* we increase the count of MPI users of the event library once per peer, so that we are used until we aren't connected to a peer */ +#if !MCA_BTL_TCP_USES_PROGRESS_THREAD opal_progress_event_users_increment(); +#endif /* !MCA_BTL_TCP_USES_PROGRESS_THREAD */ } return OMPI_SUCCESS; @@ -153,7 +158,9 @@ int mca_btl_tcp2_del_procs(struct mca_btl_base_module_t* btl, opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint); OBJ_RELEASE(tcp_endpoint); } +#if !MCA_BTL_TCP_USES_PROGRESS_THREAD opal_progress_event_users_decrement(); +#endif /* !MCA_BTL_TCP_USES_PROGRESS_THREAD */ } return OMPI_SUCCESS; } @@ -183,7 +190,11 @@ mca_btl_base_descriptor_t* mca_btl_tcp2_alloc( if( OPAL_UNLIKELY(NULL == frag) ) { return NULL; } - + +#define GB_DEFINED 0 +#if GB_DEFINED + opal_output(0, "alloc_frag( size = %lu )\n", size); +#endif /* GB_DEFINED */ frag->segments[0].seg_len = size; frag->segments[0].seg_addr.pval = frag+1; @@ -193,7 +204,8 @@ mca_btl_base_descriptor_t* mca_btl_tcp2_alloc( frag->base.des_dst_cnt = 0; frag->base.des_flags = flags; frag->base.order = MCA_BTL_NO_ORDER; - frag->btl = (mca_btl_tcp2_module_t*)btl; + frag->btl = (mca_btl_tcp_module_t*)btl; + frag->endpoint = endpoint; return (mca_btl_base_descriptor_t*)frag; } @@ -296,6 +308,10 @@ mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_src( frag->base.des_flags = flags; frag->base.order = MCA_BTL_NO_ORDER; *size = max_data; +#if GB_DEFINED + opal_output(0, "prepare_src( bConverted = %lu, size = %lu\n", + convertor->bConverted, *size); +#endif /* GB_DEFINED */ return &frag->base; } @@ -343,6 +359,10 @@ mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_dst( frag->base.des_dst_cnt = 1; frag->base.des_flags = flags; frag->base.order = MCA_BTL_NO_ORDER; +#if GB_DEFINED + opal_output(0, " prepare_dst( bConverted = %lu, size = %lu\n", + convertor->bConverted, *size); +#endif /* GB_DEFINED */ return &frag->base; } @@ -384,7 +404,10 @@ int mca_btl_tcp2_send( struct mca_btl_base_module_t* btl, frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_SEND; frag->hdr.count = 0; if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr); - return mca_btl_tcp2_endpoint_send(endpoint,frag); +#if GB_DEFINED + opal_output(0, "frag_send( size = %u )\n", frag->hdr.size ); +#endif /* GB_DEFINED */ + return mca_btl_tcp_endpoint_send(endpoint,frag); } @@ -425,7 +448,10 @@ int mca_btl_tcp2_put( mca_btl_base_module_t* btl, frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT; frag->hdr.count = frag->base.des_dst_cnt; if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr); - return ((i = mca_btl_tcp2_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : i); +#if GB_DEFINED + opal_output(0, "frag_put( size = %u )\n", frag->hdr.size ); +#endif /* GB_DEFINED */ + return ((i = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : i); } @@ -462,12 +488,16 @@ int mca_btl_tcp2_get( frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_GET; frag->hdr.count = frag->base.des_src_cnt; if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr); - return ((rc = mca_btl_tcp2_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : rc); +#if GB_DEFINED + opal_output(0, "frag_get( size = %u )\n", frag->hdr.size ); +#endif /* GB_DEFINED */ + return ((rc = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : rc); } /* - * Cleanup/release module resources. + * Cleanup/release module resources. This function should only be called once, + * there is no need to protect it. */ int mca_btl_tcp2_finalize(struct mca_btl_base_module_t* btl) @@ -479,8 +509,42 @@ int mca_btl_tcp2_finalize(struct mca_btl_base_module_t* btl) item = opal_list_remove_first(&tcp_btl->tcp_endpoints)) { mca_btl_tcp2_endpoint_t *endpoint = (mca_btl_tcp2_endpoint_t*)item; OBJ_RELEASE(endpoint); +#if !MCA_BTL_TCP_USES_PROGRESS_THREAD opal_progress_event_users_decrement(); +#endif /* !MCA_BTL_TCP_USES_PROGRESS_THREAD */ } free(tcp_btl); return OMPI_SUCCESS; } + +/** + * + */ +void mca_btl_tcp_dump(struct mca_btl_base_module_t* base_btl, + struct mca_btl_base_endpoint_t* endpoint, + int verbose) +{ + mca_btl_tcp_module_t* btl = (mca_btl_tcp_module_t*)base_btl; + mca_btl_base_err("%s TCP %p kernel_id %d\n" +#if MCA_BTL_TCP_STATISTICS + " | statistics: sent %lu recv %lu\n" +#endif /* MCA_BTL_TCP_STATISTICS */ + " | latency %u bandwidth %u\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (void*)btl, btl->tcp_ifkindex, +#if MCA_BTL_TCP_STATISTICS + btl->tcp_bytes_sent, btl->btl_bytes_recv, +#endif /* MCA_BTL_TCP_STATISTICS */ + btl->super.btl_latency, btl->super.btl_bandwidth); + if( NULL != endpoint ) { + mca_btl_tcp_endpoint_dump( endpoint, "TCP" ); + } else if( verbose ) { + opal_list_item_t *item; + + for(item = opal_list_get_first(&btl->tcp_endpoints); + item != opal_list_get_end(&btl->tcp_endpoints); + item = opal_list_get_next(item)) { + mca_btl_tcp_endpoint_dump( (mca_btl_base_endpoint_t*)item, "TCP" ); + } + } +} + diff --git a/contrib/build-mca-comps-outside-of-tree/btl_tcp2_endpoint.c b/contrib/build-mca-comps-outside-of-tree/btl_tcp2_endpoint.c index f198654c79..778769d16c 100644 --- a/contrib/build-mca-comps-outside-of-tree/btl_tcp2_endpoint.c +++ b/contrib/build-mca-comps-outside-of-tree/btl_tcp2_endpoint.c @@ -49,18 +49,15 @@ #include #endif /* HAVE_TIME_H */ -#include "opal/mca/event/event.h" - -#include "ompi/types.h" -#include "ompi/mca/btl/base/btl_base_error.h" #include "opal/util/net.h" +#include "opal/util/fd.h" +#include "opal/util/show_help.h" +#include "ompi/mca/btl/base/btl_base_error.h" +#include "ompi/mca/rte/rte.h" -#include "btl_tcp2.h" -#include "btl_tcp2_endpoint.h" -#include "btl_tcp2_proc.h" -#include "btl_tcp2_frag.h" -#include "btl_tcp2_addr.h" - +#include "btl_tcp_endpoint.h" +#include "btl_tcp_proc.h" +#include "btl_tcp_frag.h" /* * Initialize state of the endpoint instance. @@ -123,12 +120,10 @@ static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user); * diagnostics */ -#if WANT_PEER_DUMP -static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg) +void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg) { - char src[64]; - char dst[64]; - int sndbuf,rcvbuf,nodelay,flags; + char src[64], dst[64], *status; + int sndbuf, rcvbuf, nodelay, flags = -1; #if OPAL_ENABLE_IPV6 struct sockaddr_storage inaddr; #else @@ -136,69 +131,102 @@ static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, co #endif opal_socklen_t obtlen; opal_socklen_t addrlen = sizeof(inaddr); + opal_list_item_t *item; - getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen); + if( -1 != btl_endpoint->endpoint_sd ) { + getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen); #if OPAL_ENABLE_IPV6 - { - char *address; - address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr); - if (NULL != address) { - sprintf(src, "%s", address); + { + char *address; + address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr); + if (NULL != address) { + sprintf(src, "%s", address); + } } - } #else - sprintf(src, "%s", inet_ntoa(inaddr.sin_addr)); + sprintf(src, "%s", inet_ntoa(inaddr.sin_addr)); #endif - getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen); + getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen); #if OPAL_ENABLE_IPV6 - { - char *address; - address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr); - if (NULL != address) { - sprintf(dst, "%s", address); + { + char *address; + address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr); + if (NULL != address) { + sprintf(dst, "%s", address); + } } - } #else - sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr)); + sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr)); #endif - if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) { - BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno)); - } + if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) { + BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); + } #if defined(SO_SNDBUF) - obtlen = sizeof(sndbuf); - if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) { - BTL_ERROR(("SO_SNDBUF option: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno)); - } + obtlen = sizeof(sndbuf); + if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) { + BTL_ERROR(("SO_SNDBUF option: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); + } #else - sndbuf = -1; + sndbuf = -1; #endif #if defined(SO_RCVBUF) - obtlen = sizeof(rcvbuf); - if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) { - BTL_ERROR(("SO_RCVBUF option: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno)); - } + obtlen = sizeof(rcvbuf); + if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) { + BTL_ERROR(("SO_RCVBUF option: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); + } #else - rcvbuf = -1; + rcvbuf = -1; #endif #if defined(TCP_NODELAY) - obtlen = sizeof(nodelay); - if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) { - BTL_ERROR(("TCP_NODELAY option: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno)); - } + obtlen = sizeof(nodelay); + if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) { + BTL_ERROR(("TCP_NODELAY option: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); + } #else - nodelay = 0; + nodelay = 0; #endif + } - BTL_VERBOSE(("%s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x", - msg, src, dst, nodelay, sndbuf, rcvbuf, flags)); + mca_btl_base_err("%s %s: endpoint %p src %s - dst %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, (void*)btl_endpoint, src, dst, nodelay, sndbuf, rcvbuf, flags); + + switch(btl_endpoint->endpoint_state) { + case MCA_BTL_TCP_CONNECTING: + status = "connecting"; break; + case MCA_BTL_TCP_CONNECT_ACK: + status = "connect ack"; break; + case MCA_BTL_TCP_CLOSED: + status = "closed"; break; + case MCA_BTL_TCP_FAILED: + status = "failed"; break; + case MCA_BTL_TCP_CONNECTED: + status = "connected"; break; + default: + status = "undefined"; break; + } + mca_btl_base_err("%s | [socket %d] [state %s] (nbo %s) (retries %u)\n" +#if MCA_BTL_TCP_ENDPOINT_CACHE + "\tcache %p length %lu pos %ld\n" +#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ + "\tpending: send %p recv %p\n", + msg, btl_endpoint->endpoint_sd, status, + (btl_endpoint->endpoint_nbo ? "true" : "false"), btl_endpoint->endpoint_retries, +#if MCA_BTL_TCP_ENDPOINT_CACHE + btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_length, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache, +#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ + (void*)btl_endpoint->endpoint_send_frag, (void*)btl_endpoint->endpoint_recv_frag ); + for(item = opal_list_get_first(&btl_endpoint->endpoint_frags); + item != opal_list_get_end(&btl_endpoint->endpoint_frags); + item = opal_list_get_next(item)) { + mca_btl_tcp_dump_frag( (mca_btl_tcp_frag_t*)item, " | send" ); + } } -#endif /* * Initialize events to be used by the endpoint instance for TCP select/poll callbacks. @@ -211,22 +239,22 @@ static inline void mca_btl_tcp2_endpoint_event_init(mca_btl_base_endpoint_t* btl btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache; #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ - opal_event_set(opal_event_base, &btl_endpoint->endpoint_recv_event, - btl_endpoint->endpoint_sd, - OPAL_EV_READ|OPAL_EV_PERSIST, - mca_btl_tcp2_endpoint_recv_handler, - btl_endpoint ); + opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event, + btl_endpoint->endpoint_sd, + OPAL_EV_READ|OPAL_EV_PERSIST, + mca_btl_tcp_endpoint_recv_handler, + btl_endpoint ); /** * The send event should be non persistent until the endpoint is * completely connected. This means, when the event is created it * will be fired only once, and when the endpoint is marked as * CONNECTED the event should be recreated with the correct flags. */ - opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event, - btl_endpoint->endpoint_sd, - OPAL_EV_WRITE, - mca_btl_tcp2_endpoint_send_handler, - btl_endpoint); + opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event, + btl_endpoint->endpoint_sd, + OPAL_EV_WRITE, + mca_btl_tcp_endpoint_send_handler, + btl_endpoint); } @@ -239,7 +267,7 @@ int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tc { int rc = OMPI_SUCCESS; - OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_endpoint->endpoint_send_lock); switch(btl_endpoint->endpoint_state) { case MCA_BTL_TCP_CONNECTING: case MCA_BTL_TCP_CONNECT_ACK: @@ -257,19 +285,18 @@ int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tc if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY && mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd)) { int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP); - - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); - if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) { - frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc); - } - if( btl_ownership ) { - MCA_BTL_TCP_FRAG_RETURN(frag); - } + opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock); + MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag); return 1; } else { btl_endpoint->endpoint_send_frag = frag; - opal_event_add(&btl_endpoint->endpoint_send_event, 0); frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK; +#define GB_DEFINED 0 +#if GB_DEFINED + opal_output(0, "%s:%d add the send event on socket %d\n", + __FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */ +#endif /* GB_DEFINED */ + MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0); } } else { frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK; @@ -277,7 +304,7 @@ int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tc } break; } - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_endpoint->endpoint_send_lock); return rc; } @@ -338,22 +365,20 @@ static int mca_btl_tcp2_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_e bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint, struct sockaddr* addr, int sd) { - mca_btl_tcp2_proc_t* this_proc = mca_btl_tcp2_proc_local(); - mca_btl_tcp2_proc_t *endpoint_proc = btl_endpoint->endpoint_proc; + mca_btl_tcp_proc_t *endpoint_proc = btl_endpoint->endpoint_proc; + const orte_process_name_t *this_proc = &(ompi_proc_local()->proc_name); int cmpval; + if(NULL == btl_endpoint->endpoint_addr) { + return false; + } + OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock); OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); - if(NULL == btl_endpoint->endpoint_addr) { - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); - return false; - } - - cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, + cmpval = ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL, &endpoint_proc->proc_ompi->proc_name, - &this_proc->proc_ompi->proc_name); + this_proc); if((btl_endpoint->endpoint_sd < 0) || (btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED && cmpval < 0)) { @@ -365,9 +390,16 @@ bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint, OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); return false; } - mca_btl_tcp2_endpoint_event_init(btl_endpoint); - opal_event_add(&btl_endpoint->endpoint_recv_event, 0); - mca_btl_tcp2_endpoint_connected(btl_endpoint); + mca_btl_tcp_endpoint_event_init(btl_endpoint); + /* NOT NEEDED if we remove the PERSISTENT flag when we create the + * first recv_event. + */ +#if GB_DEFINED + opal_output(0, "%s:%d add the recv event on socket %d\n", + __FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */ +#endif /* GB_DEFINED */ + opal_event_add(&btl_endpoint->endpoint_recv_event, 0); /* TODO */ + mca_btl_tcp_endpoint_connected(btl_endpoint); #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP mca_btl_tcp2_endpoint_dump(btl_endpoint, "accepted"); #endif @@ -388,16 +420,19 @@ bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint, */ void mca_btl_tcp2_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint) { - if(btl_endpoint->endpoint_sd < 0) - return; - btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED; + int sd = btl_endpoint->endpoint_sd; + + do { + if( sd < 0 ) return; + } while ( opal_atomic_cmpset( &(btl_endpoint->endpoint_sd), sd, -1 ) ); + + CLOSE_THE_SOCKET(sd); btl_endpoint->endpoint_retries++; opal_event_del(&btl_endpoint->endpoint_recv_event); opal_event_del(&btl_endpoint->endpoint_send_event); - CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd); - btl_endpoint->endpoint_sd = -1; #if MCA_BTL_TCP_ENDPOINT_CACHE - free( btl_endpoint->endpoint_cache ); + if( NULL != btl_endpoint->endpoint_cache ) + free( btl_endpoint->endpoint_cache ); btl_endpoint->endpoint_cache = NULL; btl_endpoint->endpoint_cache_pos = NULL; btl_endpoint->endpoint_cache_length = 0; @@ -417,16 +452,21 @@ static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoin btl_endpoint->endpoint_retries = 0; /* Create the send event in a persistent manner. */ - opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event, - btl_endpoint->endpoint_sd, - OPAL_EV_WRITE | OPAL_EV_PERSIST, - mca_btl_tcp2_endpoint_send_handler, - btl_endpoint ); + opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event, + btl_endpoint->endpoint_sd, + OPAL_EV_WRITE | OPAL_EV_PERSIST, + mca_btl_tcp_endpoint_send_handler, + btl_endpoint ); if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) { - if(NULL == btl_endpoint->endpoint_send_frag) - btl_endpoint->endpoint_send_frag = (mca_btl_tcp2_frag_t*) + if(NULL == btl_endpoint->endpoint_send_frag) { + btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*) opal_list_remove_first(&btl_endpoint->endpoint_frags); + } +#if GB_DEFINED + opal_output(0, "%s:%d add the send event on socket %d\n", + __FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */ +#endif /* GB_DEFINED */ opal_event_add(&btl_endpoint->endpoint_send_event, 0); } } @@ -578,7 +618,11 @@ static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endp /* non-blocking so wait for completion */ if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) { btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING; - opal_event_add(&btl_endpoint->endpoint_send_event, 0); +#if GB_DEFINED + opal_output(0, "%s:%d add the send event on socket %d\n", + __FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */ +#endif /* GB_DEFINED */ + MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0); return OMPI_SUCCESS; } { @@ -597,7 +641,11 @@ static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endp /* send our globally unique process identifier to the endpoint */ if((rc = mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) { btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK; - opal_event_add(&btl_endpoint->endpoint_recv_event, 0); +#if GB_DEFINED + opal_output(0, "%s:%d add the recv event on socket %d\n", + __FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */ +#endif /* GB_DEFINED */ + MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_recv_event, 0); } else { mca_btl_tcp2_endpoint_close(btl_endpoint); } @@ -619,6 +667,10 @@ static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_ mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr); /* unregister from receiving event notifications */ +#if GB_DEFINED + opal_output(0, "%s:%d remove the send event on socket %d\n", + __FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */ +#endif /* GB_DEFINED */ opal_event_del(&btl_endpoint->endpoint_send_event); /* check connect completion status */ @@ -630,6 +682,10 @@ static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_ return; } if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) { +#if GB_DEFINED + opal_output(0, "%s:%d add the send event on socket %d\n", + __FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */ +#endif /* GB_DEFINED */ opal_event_add(&btl_endpoint->endpoint_send_event, 0); return; } @@ -643,6 +699,10 @@ static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_ if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) == OMPI_SUCCESS) { btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK; +#if GB_DEFINED + opal_output(0, "%s:%d add the recv event on socket %d\n", + __FILE__, __LINE__, btl_endpoint->endpoint_sd); /* GB */ +#endif /* GB_DEFINED */ opal_event_add(&btl_endpoint->endpoint_recv_event, 0); } else { mca_btl_tcp2_endpoint_close(btl_endpoint); @@ -688,9 +748,12 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user) mca_btl_tcp2_frag_t* frag; frag = btl_endpoint->endpoint_recv_frag; + + data_still_pending_on_endpoint: if(NULL == frag) { - if(mca_btl_tcp2_module.super.btl_max_send_size > - mca_btl_tcp2_module.super.btl_eager_limit) { + + if(mca_btl_tcp_module.super.btl_max_send_size > + mca_btl_tcp_module.super.btl_eager_limit) { MCA_BTL_TCP_FRAG_ALLOC_MAX(frag); } else { MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag); @@ -703,30 +766,32 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user) MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint); } -#if MCA_BTL_TCP_ENDPOINT_CACHE - assert( 0 == btl_endpoint->endpoint_cache_length ); - data_still_pending_on_endpoint: -#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ /* check for completion of non-blocking recv on the current fragment */ - if(mca_btl_tcp2_frag_recv(frag, btl_endpoint->endpoint_sd) == false) { + if( mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false ) { btl_endpoint->endpoint_recv_frag = frag; } else { btl_endpoint->endpoint_recv_frag = NULL; - if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) { - mca_btl_active_message_callback_t* reg; - reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag; - reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata); - } + + TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag); + #if MCA_BTL_TCP_ENDPOINT_CACHE if( 0 != btl_endpoint->endpoint_cache_length ) { +#if MCA_BTL_TCP_USES_PROGRESS_THREAD + /* Get a new fragment and try again */ + frag = NULL; +#else /* If the cache still contain some data we can reuse the same fragment * until we flush it completly. */ MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint); +#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */ goto data_still_pending_on_endpoint; } #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ + +#if !MCA_BTL_TCP_USES_PROGRESS_THREAD MCA_BTL_TCP_FRAG_RETURN(frag); +#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */ } #if MCA_BTL_TCP_ENDPOINT_CACHE assert( 0 == btl_endpoint->endpoint_cache_length ); @@ -741,12 +806,13 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user) * of the MPI_Finalize. The first one will close the connections, * and all others will complain. */ - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); break; default: - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); + OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock); BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state)); - mca_btl_tcp2_endpoint_close(btl_endpoint); + btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED; + mca_btl_tcp_endpoint_close(btl_endpoint); + OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); break; } } @@ -759,8 +825,8 @@ static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user) static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user) { - mca_btl_tcp2_endpoint_t* btl_endpoint = (mca_btl_tcp2_endpoint_t *)user; - OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); + mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user; + opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock); switch(btl_endpoint->endpoint_state) { case MCA_BTL_TCP_CONNECTING: mca_btl_tcp2_endpoint_complete_connect(btl_endpoint); @@ -779,27 +845,31 @@ static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user) opal_list_remove_first(&btl_endpoint->endpoint_frags); /* if required - update request status and release fragment */ - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); + opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock); assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ); - frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc); - if( btl_ownership ) { - MCA_BTL_TCP_FRAG_RETURN(frag); - } - OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); - + TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag); + opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock); } - /* if nothing else to do unregister for send event notifications */ + /* if no more data to send unregister the send notifications */ if(NULL == btl_endpoint->endpoint_send_frag) { +#if GB_DEFINED + opal_output(0, "%s:%d remove the send event on socket %d\n", + __FILE__, __LINE__, sd); /* GB */ +#endif /* GB_DEFINED */ opal_event_del(&btl_endpoint->endpoint_send_event); } break; default: BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state)); +#if GB_DEFINED + opal_output(0, "%s:%d remove the send event on socket %d\n", + __FILE__, __LINE__, sd); /* GB */ +#endif /* GB_DEFINED */ opal_event_del(&btl_endpoint->endpoint_send_event); break; } - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); + opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock); } diff --git a/contrib/build-mca-comps-outside-of-tree/btl_tcp2_frag.h b/contrib/build-mca-comps-outside-of-tree/btl_tcp2_frag.h index 2f8d78f9dd..661e271683 100644 --- a/contrib/build-mca-comps-outside-of-tree/btl_tcp2_frag.h +++ b/contrib/build-mca-comps-outside-of-tree/btl_tcp2_frag.h @@ -41,6 +41,14 @@ BEGIN_C_DECLS #define MCA_BTL_TCP_FRAG_IOVEC_NUMBER 4 +/** + * Commands for the threaded version when the fragments must be completed + * by one of the MPI bounded threads. + */ +#define MCA_BTL_TCP_FRAG_STEP_UNDEFINED ((uint16_t)0x0000) +#define MCA_BTL_TCP_FRAG_STEP_SEND_COMPLETE ((uint16_t)0x0001) +#define MCA_BTL_TCP_FRAG_STEP_RECV_COMPLETE ((uint16_t)0x0002) + /** * TCP fragment derived type. */ @@ -82,49 +90,77 @@ OBJ_CLASS_DECLARATION(mca_btl_tcp2_frag_user_t); #define MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag) \ { \ ompi_free_list_item_t *item; \ - OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_eager, item); \ - frag = (mca_btl_tcp2_frag_t*) item; \ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_eager_mutex); \ + OMPI_FREE_LIST_GET_MT(&mca_btl_tcp_component.tcp_frag_eager, item); \ + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_eager_mutex); \ + frag = (mca_btl_tcp_frag_t*) item; \ } #define MCA_BTL_TCP_FRAG_ALLOC_MAX(frag) \ { \ ompi_free_list_item_t *item; \ - OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_max, item); \ - frag = (mca_btl_tcp2_frag_t*) item; \ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_max_mutex); \ + OMPI_FREE_LIST_GET_MT(&mca_btl_tcp_component.tcp_frag_max, item); \ + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_max_mutex); \ + frag = (mca_btl_tcp_frag_t*) item; \ } #define MCA_BTL_TCP_FRAG_ALLOC_USER(frag) \ { \ ompi_free_list_item_t *item; \ - OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_user, item); \ - frag = (mca_btl_tcp2_frag_t*) item; \ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_user_mutex); \ + OMPI_FREE_LIST_GET_MT(&mca_btl_tcp_component.tcp_frag_user, item); \ + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_user_mutex); \ + frag = (mca_btl_tcp_frag_t*) item; \ } +#if MCA_BTL_TCP_USES_PROGRESS_THREAD #define MCA_BTL_TCP_FRAG_RETURN(frag) \ { \ - OMPI_FREE_LIST_RETURN(frag->my_list, (ompi_free_list_item_t*)(frag)); \ + (frag)->next_step = MCA_BTL_TCP_FRAG_STEP_UNDEFINED; \ + if( frag->my_list == &mca_btl_tcp_component.tcp_frag_eager ) { \ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_eager_mutex); \ + OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \ + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_eager_mutex); \ + } else if( frag->my_list == &mca_btl_tcp_component.tcp_frag_max ) { \ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_max_mutex); \ + OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \ + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_max_mutex); \ + } else { \ + assert( frag->my_list == &mca_btl_tcp_component.tcp_frag_user ); \ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_frag_user_mutex); \ + OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \ + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_frag_user_mutex); \ + } \ } +#else +#define MCA_BTL_TCP_FRAG_RETURN(frag) \ +{ \ + (frag)->next_step = MCA_BTL_TCP_FRAG_STEP_UNDEFINED; \ + OMPI_FREE_LIST_RETURN_MT(frag->my_list, (ompi_free_list_item_t*)(frag)); \ +} +#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */ #define MCA_BTL_TCP_FRAG_INIT_DST(frag,ep) \ do { \ - frag->rc = 0; \ - frag->btl = ep->endpoint_btl; \ + frag->base.des_src = NULL; \ + frag->base.des_src_cnt = 0; \ + frag->base.des_dst = frag->segments; \ + frag->base.des_dst_cnt = 1; \ frag->endpoint = ep; \ frag->iov[0].iov_len = sizeof(frag->hdr); \ frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr; \ frag->iov_cnt = 1; \ frag->iov_idx = 0; \ frag->iov_ptr = frag->iov; \ - frag->base.des_src = NULL; \ - frag->base.des_dst_cnt = 0; \ - frag->base.des_dst = frag->segments; \ - frag->base.des_dst_cnt = 1; \ + frag->rc = 0; \ } while(0) bool mca_btl_tcp2_frag_send(mca_btl_tcp2_frag_t*, int sd); bool mca_btl_tcp2_frag_recv(mca_btl_tcp2_frag_t*, int sd); +void mca_btl_tcp_dump_frag( mca_btl_tcp_frag_t* frag, char* msg ); END_C_DECLS #endif diff --git a/contrib/build-mca-comps-outside-of-tree/btl_tcp2_proc.c b/contrib/build-mca-comps-outside-of-tree/btl_tcp2_proc.c index 15ae3a2c49..2834faa45a 100644 --- a/contrib/build-mca-comps-outside-of-tree/btl_tcp2_proc.c +++ b/contrib/build-mca-comps-outside-of-tree/btl_tcp2_proc.c @@ -3,6 +3,7 @@ * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2010 The University of Tennessee and The University + * Copyright (c) 2004-2012 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, @@ -77,10 +78,10 @@ void mca_btl_tcp2_proc_construct(mca_btl_tcp2_proc_t* tcp_proc) void mca_btl_tcp2_proc_destruct(mca_btl_tcp2_proc_t* tcp_proc) { /* remove from list of all proc instances */ - OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock); - opal_proc_table_remove_value(&mca_btl_tcp2_component.tcp_procs, - tcp_proc->proc_ompi->proc_name); - OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock); + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock); + opal_hash_table_remove_value_uint64(&mca_btl_tcp_component.tcp_procs, + ompi_rte_hash_name(&tcp_proc->proc_ompi->proc_name)); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock); /* release resources */ if(NULL != tcp_proc->proc_endpoints) { @@ -103,11 +104,11 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_create(ompi_proc_t* ompi_proc) size_t size; mca_btl_tcp2_proc_t* btl_proc; - OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock); - rc = opal_proc_table_get_value(&mca_btl_tcp2_component.tcp_procs, - ompi_proc->proc_name, (void**)&btl_proc); + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock); + rc = opal_hash_table_get_value_uint64(&mca_btl_tcp_component.tcp_procs, + hash, (void**)&btl_proc); if(OMPI_SUCCESS == rc) { - OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock); return btl_proc; } @@ -117,9 +118,9 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_create(ompi_proc_t* ompi_proc) btl_proc->proc_ompi = ompi_proc; /* add to hash table of all proc instance */ - opal_proc_table_set_value(&mca_btl_tcp2_component.tcp_procs, - ompi_proc->proc_name, btl_proc); - OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock); + opal_hash_table_set_value_uint64(&mca_btl_tcp_component.tcp_procs, + hash, btl_proc); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock); /* lookup tcp parameters exported by this proc */ rc = ompi_modex_recv( &mca_btl_tcp2_component.super.btl_version, @@ -681,8 +682,8 @@ int mca_btl_tcp2_proc_insert( mca_btl_tcp2_proc_t* btl_proc, int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoint_t* btl_endpoint) { size_t i; - OPAL_THREAD_LOCK(&btl_proc->proc_lock); - for(i=0; iproc_endpoint_count; i++) { + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_proc->proc_lock); + for( i = 0; i < btl_proc->proc_endpoint_count; i++ ) { if(btl_proc->proc_endpoints[i] == btl_endpoint) { memmove(btl_proc->proc_endpoints+i, btl_proc->proc_endpoints+i+1, (btl_proc->proc_endpoint_count-i-1)*sizeof(mca_btl_base_endpoint_t*)); @@ -700,7 +701,7 @@ int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoin break; } } - OPAL_THREAD_UNLOCK(&btl_proc->proc_lock); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_proc->proc_lock); return OMPI_SUCCESS; } @@ -710,11 +711,11 @@ int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoin */ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_lookup(const orte_process_name_t *name) { - mca_btl_tcp2_proc_t* proc = NULL; - OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock); - opal_proc_table_get_value(&mca_btl_tcp2_component.tcp_procs, - name->proc_name, (void**)&proc); - OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock); + mca_btl_tcp_proc_t* proc = NULL; + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock); + opal_hash_table_get_value_uint64(&mca_btl_tcp_component.tcp_procs, + ompi_rte_hash_name(name), (void**)&proc); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock); return proc; } @@ -725,7 +726,7 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_lookup(const orte_process_name_t *name) bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t* btl_proc, struct sockaddr* addr, int sd) { size_t i; - OPAL_THREAD_LOCK(&btl_proc->proc_lock); + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_proc->proc_lock); for( i = 0; i < btl_proc->proc_endpoint_count; i++ ) { mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i]; /* Check all conditions before going to try to accept the connection. */ @@ -754,12 +755,12 @@ bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t* btl_proc, struct sockaddr* ad ; } - if(mca_btl_tcp2_endpoint_accept(btl_endpoint, addr, sd)) { - OPAL_THREAD_UNLOCK(&btl_proc->proc_lock); + if(mca_btl_tcp_endpoint_accept(btl_endpoint, addr, sd)) { + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_proc->proc_lock); return true; } } - OPAL_THREAD_UNLOCK(&btl_proc->proc_lock); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_proc->proc_lock); return false; } diff --git a/opal/mca/btl/tcp/btl_tcp.h b/opal/mca/btl/tcp/btl_tcp.h index 6b6b0e62e3..aea2f1bb13 100644 --- a/opal/mca/btl/tcp/btl_tcp.h +++ b/opal/mca/btl/tcp/btl_tcp.h @@ -43,14 +43,88 @@ /* Open MPI includes */ #include "opal/mca/event/event.h" -#include "opal/mca/btl/btl.h" -#include "opal/mca/btl/base/base.h" -#include "opal/mca/mpool/mpool.h" +#include "ompi/class/ompi_free_list.h" +#include "ompi/mca/btl/btl.h" +#include "ompi/mca/btl/base/base.h" +#include "ompi/mca/mpool/mpool.h" #include "opal/class/opal_hash_table.h" #define MCA_BTL_TCP_STATISTICS 0 BEGIN_C_DECLS +#if (HAVE_PTHREAD_H == 1) +#define MCA_BTL_TCP_USES_PROGRESS_THREAD 1 +#else +#define MCA_BTL_TCP_USES_PROGRESS_THREAD 0 +#endif /* (HAVE_PTHREAD_H == 1) */ + +extern opal_event_base_t* mca_btl_tcp_event_base; + +#define MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \ + do { \ + int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP); \ + if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) { \ + frag->base.des_cbfunc(&frag->endpoint->endpoint_btl->super, frag->endpoint, \ + &frag->base, frag->rc); \ + } \ + if( btl_ownership ) { \ + MCA_BTL_TCP_FRAG_RETURN(frag); \ + } \ + } while (0) +#define MCA_BTL_TCP_RECV_TRIGGER_CB(frag) \ + do { \ + if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) { \ + mca_btl_active_message_callback_t* reg; \ + reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag; \ + reg->cbfunc(&frag->endpoint->endpoint_btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata); \ + } \ + } while (0) + +#if MCA_BTL_TCP_USES_PROGRESS_THREAD +extern opal_list_t mca_btl_tcp_ready_frag_pending_queue; +extern opal_mutex_t mca_btl_tcp_ready_frag_mutex; +extern int mca_btl_tcp_pipe_to_progress[2]; + +#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name) \ + opal_mutex_atomic_lock((name)) +#define MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(name) \ + opal_mutex_atomic_unlock((name)) + +#define TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \ + do { \ + (frag)->next_step = MCA_BTL_TCP_FRAG_STEP_SEND_COMPLETE; \ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_ready_frag_mutex); \ + opal_list_append(&mca_btl_tcp_ready_frag_pending_queue, \ + (opal_list_item_t*)frag); \ + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_ready_frag_mutex); \ + } while (0) + +#define TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag) \ + do { \ + (frag)->next_step = MCA_BTL_TCP_FRAG_STEP_RECV_COMPLETE; \ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_ready_frag_mutex); \ + opal_list_append(&mca_btl_tcp_ready_frag_pending_queue, \ + (opal_list_item_t*)frag); \ + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_ready_frag_mutex); \ + } while (0) +#define MCA_BTL_TCP_ACTIVATE_EVENT(event, value) \ + do { \ + opal_event_t* _event = (opal_event_t*)(event); \ + opal_fd_write( mca_btl_tcp_pipe_to_progress[1], sizeof(opal_event_t*), \ + &_event); \ + } while (0) +#else +#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name) +#define MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(name) +#define TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \ + MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) +#define TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag) \ + MCA_BTL_TCP_RECV_TRIGGER_CB(frag) +#define MCA_BTL_TCP_ACTIVATE_EVENT(event, value) \ + do { \ + opal_event_add(event, (value)); \ + } while (0) +#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */ /** * TCP BTL component. @@ -95,6 +169,12 @@ struct mca_btl_tcp_component_t { opal_free_list_t tcp_frag_max; opal_free_list_t tcp_frag_user; +#if MCA_BTL_TCP_USES_PROGRESS_THREAD + opal_event_t tcp_recv_thread_async_event; + opal_mutex_t tcp_frag_eager_mutex; + opal_mutex_t tcp_frag_max_mutex; + opal_mutex_t tcp_frag_user_mutex; +#endif /* Do we want to use TCP_NODELAY? */ int tcp_not_use_nodelay; @@ -292,6 +372,20 @@ mca_btl_base_descriptor_t* mca_btl_tcp_prepare_src( uint32_t flags ); +extern mca_btl_base_descriptor_t* mca_btl_tcp_prepare_dst( + struct mca_btl_base_module_t* btl, + struct mca_btl_base_endpoint_t* peer, + struct mca_mpool_base_registration_t*, + struct opal_convertor_t* convertor, + uint8_t order, + size_t reserve, + size_t* size, + uint32_t flags); + +extern void +mca_btl_tcp_dump(struct mca_btl_base_module_t* btl, + struct mca_btl_base_endpoint_t* endpoint, + int verbose); /** * Fault Tolerance Event Notification Function diff --git a/opal/mca/btl/tcp/btl_tcp_component.c b/opal/mca/btl/tcp/btl_tcp_component.c index a43d6453d0..71d0c3d84d 100644 --- a/opal/mca/btl/tcp/btl_tcp_component.c +++ b/opal/mca/btl/tcp/btl_tcp_component.c @@ -61,6 +61,8 @@ #include "opal/util/argv.h" #include "opal/util/net.h" #include "opal/util/proc.h" +#include "opal/util/net.h" +#include "opal/util/fd.h" #include "opal/util/show_help.h" #include "opal/constants.h" #include "opal/mca/btl/btl.h" @@ -69,6 +71,11 @@ #include "opal/mca/btl/base/btl_base_error.h" #include "opal/mca/pmix/pmix.h" +#include "ompi/constants.h" +#include "ompi/mca/btl/btl.h" +#include "ompi/mca/btl/base/base.h" +#include "ompi/runtime/ompi_module_exchange.h" +#include "ompi/mca/btl/base/btl_base_error.h" #include "btl_tcp.h" #include "btl_tcp_addr.h" #include "btl_tcp_proc.h" @@ -85,6 +92,16 @@ static int mca_btl_tcp_component_register(void); static int mca_btl_tcp_component_open(void); static int mca_btl_tcp_component_close(void); +opal_event_base_t* mca_btl_tcp_event_base = NULL; +#if MCA_BTL_TCP_USES_PROGRESS_THREAD +static int mca_btl_tcp_progress_thread_trigger = -1; +int mca_btl_tcp_pipe_to_progress[2] = { -1, -1 }; +static opal_thread_t mca_btl_tcp_progress_thread; +opal_list_t mca_btl_tcp_ready_frag_pending_queue; +opal_mutex_t mca_btl_tcp_ready_frag_mutex; +#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */ +static char *mca_btl_tcp_if_seq_string; + mca_btl_tcp_component_t mca_btl_tcp_component = { .super = { /* First, the mca_base_component_t struct containing meta information @@ -102,6 +119,11 @@ mca_btl_tcp_component_t mca_btl_tcp_component = { }, .btl_init = mca_btl_tcp_component_init, +#if MCA_BTL_TCP_USES_PROGRESS_THREAD + mca_btl_tcp_component_progress, +#else + NULL, +#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */ } }; @@ -165,8 +187,25 @@ struct mca_btl_tcp_event_t { }; typedef struct mca_btl_tcp_event_t mca_btl_tcp_event_t; -OBJ_CLASS_INSTANCE( mca_btl_tcp_event_t, opal_list_item_t, - NULL, NULL); +static void mca_btl_tcp_event_construct(mca_btl_tcp_event_t* event) +{ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock); + opal_list_append(&mca_btl_tcp_component.tcp_events, &event->item); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock); +} + +static void mca_btl_tcp_event_destruct(mca_btl_tcp_event_t* event) +{ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock); + opal_list_remove_item(&mca_btl_tcp_component.tcp_events, &event->item); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock); +} + +OBJ_CLASS_INSTANCE( + mca_btl_tcp_event_t, + opal_list_item_t, + mca_btl_tcp_event_construct, + mca_btl_tcp_event_destruct); /* @@ -306,6 +345,14 @@ static int mca_btl_tcp_component_open(void) OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_user, opal_free_list_t); opal_proc_table_init(&mca_btl_tcp_component.tcp_procs, 16, 256); +#if MCA_BTL_TCP_USES_PROGRESS_THREAD + OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex, opal_mutex_t); + OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex, opal_mutex_t); + OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_user_mutex, opal_mutex_t); + OBJ_CONSTRUCT(&mca_btl_tcp_ready_frag_mutex, opal_mutex_t); + OBJ_CONSTRUCT(&mca_btl_tcp_ready_frag_pending_queue, opal_list_t); +#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */ + /* if_include and if_exclude need to be mutually exclusive */ if (OPAL_SUCCESS != mca_base_var_check_exclusive("ompi", @@ -330,9 +377,16 @@ static int mca_btl_tcp_component_open(void) static int mca_btl_tcp_component_close(void) { - if (NULL != mca_btl_tcp_component.tcp_btls) - free(mca_btl_tcp_component.tcp_btls); + opal_list_item_t* item; + opal_list_item_t* next; + if (NULL != mca_btl_tcp_component.tcp_if_seq) { + free(mca_btl_tcp_component.tcp_if_seq); + } + if (NULL != mca_btl_tcp_component.tcp_btls) { + free(mca_btl_tcp_component.tcp_btls); + } + if (mca_btl_tcp_component.tcp_listen_sd >= 0) { opal_event_del(&mca_btl_tcp_component.tcp_recv_event); CLOSE_THE_SOCKET(mca_btl_tcp_component.tcp_listen_sd); @@ -346,6 +400,18 @@ static int mca_btl_tcp_component_close(void) } #endif + /* cleanup any pending events */ + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock); + for(item = opal_list_get_first(&mca_btl_tcp_component.tcp_events); + item != opal_list_get_end(&mca_btl_tcp_component.tcp_events); + item = next) { + mca_btl_tcp_event_t* event = (mca_btl_tcp_event_t*)item; + next = opal_list_get_next(item); + opal_event_del(&event->event); + OBJ_RELEASE(event); + } + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock); + /* release resources */ OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_procs); OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager); @@ -357,6 +423,40 @@ static int mca_btl_tcp_component_close(void) mca_common_cuda_fini(); #endif /* OPAL_CUDA_SUPPORT */ +#if MCA_BTL_TCP_USES_PROGRESS_THREAD + OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex); + OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex); + + if( (NULL != mca_btl_tcp_event_base) && + (mca_btl_tcp_event_base != opal_sync_event_base) ) { + /* Turn of the progress thread before moving forward */ + if( -1 != mca_btl_tcp_progress_thread_trigger ) { + mca_btl_tcp_progress_thread_trigger = 0; + /* Let the progress thread know that we're going away */ + if( -1 != mca_btl_tcp_pipe_to_progress[1] ) { + close(mca_btl_tcp_pipe_to_progress[1]); + mca_btl_tcp_pipe_to_progress[1] = -1; + } + while( -1 != mca_btl_tcp_progress_thread_trigger ) { + /*event_base_loopbreak(mca_btl_tcp_event_base);*/ + sched_yield(); + usleep(100); /* give app a chance to re-enter library */ + } + } + opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event); + opal_event_base_free(mca_btl_tcp_event_base); + mca_btl_tcp_event_base = NULL; + + /* Close the remaining pipes */ + if( -1 != mca_btl_tcp_pipe_to_progress[0] ) { + close(mca_btl_tcp_pipe_to_progress[0]); + mca_btl_tcp_pipe_to_progress[0] = -1; + } + } + OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex); + OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue); +#endif + return OPAL_SUCCESS; } @@ -652,14 +752,70 @@ static int mca_btl_tcp_component_create_instances(void) return ret; } +#if MCA_BTL_TCP_USES_PROGRESS_THREAD +static void* mca_btl_tcp_progress_thread_engine(opal_object_t *obj) +{ + opal_thread_t* current_thread = (opal_thread_t*)obj; + + while( 1 == (*((int*)current_thread->t_arg)) ) { + opal_event_loop(mca_btl_tcp_event_base, OPAL_EVLOOP_ONCE); + } + (*((int*)current_thread->t_arg)) = -1; + return NULL; +} + +static void mca_btl_tcp_component_event_async_handler(int fd, short unused, void *context) +{ + opal_event_t* event; + int rc; + + rc = read(fd, (void*)&event, sizeof(opal_event_t*)); + assert( fd == mca_btl_tcp_pipe_to_progress[0] ); + if( 0 == rc ) { + /* The main thread closed the pipe to trigger the shutdown procedure */ + opal_thread_t* current_thread = (opal_thread_t*)context; + (*((int*)current_thread->t_arg)) = 0; + } else { + opal_event_add(event, 0); + } +} + +int mca_btl_tcp_component_progress( void ) +{ + mca_btl_tcp_frag_t* frag; + int count = 0; + + for( ;; ) { + MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_ready_frag_mutex); + frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&mca_btl_tcp_ready_frag_pending_queue); + MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_ready_frag_mutex); + if( NULL == frag ) break; + + switch(frag->next_step) { + case MCA_BTL_TCP_FRAG_STEP_SEND_COMPLETE: + MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag); + count++; + break; + case MCA_BTL_TCP_FRAG_STEP_RECV_COMPLETE: + MCA_BTL_TCP_RECV_TRIGGER_CB(frag); + MCA_BTL_TCP_FRAG_RETURN(frag); + count++; + break; + default: + break; + } + } + return count; +} +#endif + /* * Create a listen socket and bind to all interfaces */ static int mca_btl_tcp_component_create_listen(uint16_t af_family) { - int flags; - int sd; + int flags, sd, rc; struct sockaddr_storage inaddr; opal_socklen_t addrlen; @@ -678,17 +834,16 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family) #if OPAL_ENABLE_IPV6 { struct addrinfo hints, *res = NULL; - int error; memset (&hints, 0, sizeof(hints)); hints.ai_family = af_family; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; - if ((error = getaddrinfo(NULL, "0", &hints, &res))) { + if ((rc = getaddrinfo(NULL, "0", &hints, &res))) { opal_output (0, - "mca_btl_tcp_create_listen: unable to resolve. %s\n", - gai_strerror (error)); + "mca_btl_tcp_create_listen: unable to resolve. %s\n", + gai_strerror (rc)); CLOSE_THE_SOCKET(sd); return OPAL_ERROR; } @@ -816,25 +971,86 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family) } } - /* register listen port */ +#if MCA_BTL_TCP_USES_PROGRESS_THREAD + /* Declare our intent to use threads. */ + opal_event_use_threads(); + if( NULL == mca_btl_tcp_event_base ) { + /* fall back to only one event base (the one shared by the entire Open MPI framework) */ + mca_btl_tcp_event_base = opal_sync_event_base; + + if( NULL == (mca_btl_tcp_event_base = opal_sync_event_base_create()) ) { + BTL_ERROR(("BTL TCP failed to create progress event base")); + goto move_forward_with_no_thread; + } + opal_event_base_priority_init(mca_btl_tcp_event_base, OPAL_EVENT_NUM_PRI); + + /* construct the thread object */ + OBJ_CONSTRUCT(&mca_btl_tcp_progress_thread, opal_thread_t); + + /** + * Create a pipe to communicate between the main thread and the progress thread. + */ + if (0 != pipe(mca_btl_tcp_pipe_to_progress)) { + opal_event_base_free(mca_btl_tcp_event_base); + /* fall back to only one event base (the one shared by the entire Open MPI framework */ + mca_btl_tcp_event_base = opal_sync_event_base; + mca_btl_tcp_progress_thread_trigger = -1; /* thread not started */ + goto move_forward_with_no_thread; + } + /* setup the receiving end of the pipe as non-blocking */ + if((flags = fcntl(mca_btl_tcp_pipe_to_progress[0], F_GETFL, 0)) < 0) { + BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); + } else { + flags |= O_NONBLOCK; + if(fcntl(mca_btl_tcp_pipe_to_progress[0], F_SETFL, flags) < 0) + BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno)); + } + /* Progress thread event */ + opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp_recv_thread_async_event, + mca_btl_tcp_pipe_to_progress[0], + OPAL_EV_READ|OPAL_EV_PERSIST, + mca_btl_tcp_component_event_async_handler, + &mca_btl_tcp_progress_thread ); + opal_event_add(&mca_btl_tcp_component.tcp_recv_thread_async_event, 0); + + /* fork off a thread to progress it */ + mca_btl_tcp_progress_thread.t_run = mca_btl_tcp_progress_thread_engine; + mca_btl_tcp_progress_thread.t_arg = &mca_btl_tcp_progress_thread_trigger; + mca_btl_tcp_progress_thread_trigger = 1; /* thread up and running */ + if( OPAL_SUCCESS != (rc = opal_thread_start(&mca_btl_tcp_progress_thread)) ) { + BTL_ERROR(("BTL TCP progress thread initialization failed (%d)", rc)); + opal_event_base_free(mca_btl_tcp_event_base); + /* fall back to only one event base (the one shared by the entire Open MPI framework */ + mca_btl_tcp_event_base = opal_sync_event_base; + mca_btl_tcp_progress_thread_trigger = -1; /* thread not started */ + goto move_forward_with_no_thread; + } + } + move_forward_with_no_thread: +#else + mca_btl_tcp_event_base = opal_sync_event_base; +#endif + + if (AF_INET == af_family) { + opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp_recv_event, + mca_btl_tcp_component.tcp_listen_sd, + OPAL_EV_READ|OPAL_EV_PERSIST, + mca_btl_tcp_component_accept_handler, + 0 ); + MCA_BTL_TCP_ACTIVATE_EVENT(&mca_btl_tcp_component.tcp_recv_event, 0); + } #if OPAL_ENABLE_IPV6 if (AF_INET6 == af_family) { - opal_event_set(opal_sync_event_base, &mca_btl_tcp_component.tcp6_recv_event, - mca_btl_tcp_component.tcp6_listen_sd, - OPAL_EV_READ|OPAL_EV_PERSIST, - mca_btl_tcp_component_accept_handler, - 0 ); - opal_event_add(&mca_btl_tcp_component.tcp6_recv_event, 0); - } else -#endif - { - opal_event_set(opal_sync_event_base, &mca_btl_tcp_component.tcp_recv_event, - mca_btl_tcp_component.tcp_listen_sd, - OPAL_EV_READ|OPAL_EV_PERSIST, - mca_btl_tcp_component_accept_handler, - 0 ); - opal_event_add(&mca_btl_tcp_component.tcp_recv_event, 0); + opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp6_recv_event, + mca_btl_tcp_component.tcp6_listen_sd, + OPAL_EV_READ|OPAL_EV_PERSIST, + mca_btl_tcp_component_accept_handler, + 0 ); + MCA_BTL_TCP_ACTIVATE_EVENT(&mca_btl_tcp_component.tcp6_recv_event, 0); } +#endif return OPAL_SUCCESS; } @@ -1048,9 +1264,11 @@ static void mca_btl_tcp_component_accept_handler( int incoming_sd, } mca_btl_tcp_set_socket_options(sd); + assert( NULL != mca_btl_tcp_event_base ); /* wait for receipt of peers process identifier to complete this connection */ event = OBJ_NEW(mca_btl_tcp_event_t); - opal_event_set(opal_sync_event_base, &event->event, sd, OPAL_EV_READ, mca_btl_tcp_component_recv_handler, event); + opal_event_set(mca_btl_tcp_event_base, &(event->event), sd, + OPAL_EV_READ, mca_btl_tcp_component_recv_handler, event); opal_event_add(&event->event, 0); } } @@ -1112,3 +1330,14 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user) (void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd); } +/** + * Debugging infrastructure, absolutely not thread safe. Call with care. + */ +static void mca_btl_tcp_component_dump(void) +{ + uint32_t i; + + for( i = 0; i < mca_btl_tcp_component.tcp_num_btls; i++ ) { + mca_btl_tcp_dump( (mca_btl_base_module_t*)mca_btl_tcp_component.tcp_btls[i], NULL, 1 ); + } +} diff --git a/opal/mca/btl/tcp/btl_tcp_endpoint.h b/opal/mca/btl/tcp/btl_tcp_endpoint.h index f50c6a43af..0798f3b081 100644 --- a/opal/mca/btl/tcp/btl_tcp_endpoint.h +++ b/opal/mca/btl/tcp/btl_tcp_endpoint.h @@ -2,7 +2,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2007 The University of Tennessee and The University + * Copyright (c) 2004-2012 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, @@ -61,7 +61,7 @@ struct mca_btl_base_endpoint_t { struct mca_btl_tcp_frag_t* endpoint_send_frag; /**< current send frag being processed */ struct mca_btl_tcp_frag_t* endpoint_recv_frag; /**< current recv frag being processed */ mca_btl_tcp_state_t endpoint_state; /**< current state of the connection */ - size_t endpoint_retries; /**< number of connection retries attempted */ + uint32_t endpoint_retries; /**< number of connection retries attempted */ opal_list_t endpoint_frags; /**< list of pending frags to send */ opal_mutex_t endpoint_send_lock; /**< lock for concurrent access to endpoint state */ opal_mutex_t endpoint_recv_lock; /**< lock for concurrent access to endpoint state */ @@ -80,6 +80,7 @@ void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t*); int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t*, struct mca_btl_tcp_frag_t*); void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t*, struct sockaddr*, int); void mca_btl_tcp_endpoint_shutdown(mca_btl_base_endpoint_t*); +void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg); END_C_DECLS #endif diff --git a/opal/mca/btl/tcp/btl_tcp_frag.c b/opal/mca/btl/tcp/btl_tcp_frag.c index d9bf9b76e6..e27e4cc03d 100644 --- a/opal/mca/btl/tcp/btl_tcp_frag.c +++ b/opal/mca/btl/tcp/btl_tcp_frag.c @@ -10,7 +10,6 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2008-2012 Oracle and/or all its affiliates. All rights reserved. * Copyright (c) 2014 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2015 Research Organization for Information Science @@ -48,9 +47,9 @@ #include "btl_tcp_frag.h" #include "btl_tcp_endpoint.h" -static void mca_btl_tcp_frag_eager_constructor(mca_btl_tcp_frag_t* frag) -{ - frag->size = mca_btl_tcp_module.super.btl_eager_limit; +static void mca_btl_tcp_frag_eager_constructor(mca_btl_tcp_frag_t* frag) +{ + frag->size = mca_btl_tcp_module.super.btl_eager_limit; frag->my_list = &mca_btl_tcp_component.tcp_frag_eager; } @@ -151,6 +150,11 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd) frag->iov_ptr->iov_base = (opal_iov_base_ptr_t) (((unsigned char*)frag->iov_ptr->iov_base) + cnt); frag->iov_ptr->iov_len -= cnt; +#define GB_DEFINED 0 +#if GB_DEFINED + opal_output(0, "%s:%d write %lu bytes from socket %d\n", + __FILE__, __LINE__, cnt, sd); /* GB */ +#endif /* GB_DEFINED */ break; } } @@ -159,12 +163,14 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd) bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd) { - int cnt, dont_copy_data = 0; - size_t i, num_vecs; mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint; + int i, num_vecs, dont_copy_data = 0; + ssize_t cnt; + struct iovec *iov_ptr; repeat: num_vecs = frag->iov_cnt; + iov_ptr = &frag->iov[frag->iov_idx]; #if MCA_BTL_TCP_ENDPOINT_CACHE if( 0 != btl_endpoint->endpoint_cache_length ) { size_t length; @@ -265,11 +271,13 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd) frag->iov[1].iov_len = frag->hdr.size; frag->iov_cnt++; #ifndef __sparc - /* The following cannot be done for sparc code +#if !MCA_BTL_TCP_USES_PROGRESS_THREAD + /* The following cannot be done for sparc code * because it causes alignment errors when accessing * structures later on in the btl and pml code. */ dont_copy_data = 1; +#endif #endif goto repeat; } @@ -297,3 +305,20 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd) } return false; } + +void mca_btl_tcp_dump_frag( mca_btl_tcp_frag_t* frag, char* msg ) +{ + int i; + + mca_btl_base_err("%s %s frag %p (endpoint %p) size %lu iov_cnt %d iov_idx %d next_step %s rc %d\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, (void*)frag, (void*)frag->endpoint, frag->size, frag->iov_cnt, frag->iov_idx, + (MCA_BTL_TCP_FRAG_STEP_UNDEFINED == frag->next_step ? "undefined" : + (MCA_BTL_TCP_FRAG_STEP_SEND_COMPLETE == frag->next_step ? "send_complete" : + (MCA_BTL_TCP_FRAG_STEP_RECV_COMPLETE == frag->next_step ? "recv_complete" : "unknown"))), + frag->rc); + for( i = 0; i < frag->iov_cnt; i++ ) { + mca_btl_base_err("%s %s | iov[%d] = {%p, %lu}\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, i, frag->iov[i].iov_base, frag->iov[i].iov_len); + } + mca_btl_base_err("%s +\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); +} diff --git a/opal/mca/btl/tcp/btl_tcp_hdr.h b/opal/mca/btl/tcp/btl_tcp_hdr.h index b797776201..70467e4371 100644 --- a/opal/mca/btl/tcp/btl_tcp_hdr.h +++ b/opal/mca/btl/tcp/btl_tcp_hdr.h @@ -2,7 +2,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2005 The University of Tennessee and The University + * Copyright (c) 2004-2012 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, diff --git a/opal/mca/btl/tcp/btl_tcp_proc.h b/opal/mca/btl/tcp/btl_tcp_proc.h index 5e56cd7dea..064e81fa2d 100644 --- a/opal/mca/btl/tcp/btl_tcp_proc.h +++ b/opal/mca/btl/tcp/btl_tcp_proc.h @@ -2,7 +2,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2006 The University of Tennessee and The University + * Copyright (c) 2004-2012 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,