1
1
openmpi/ompi/mca/btl/sctp/btl_sctp_endpoint.c
Brian Barrett f42783ae1a Move the RTE framework change into the trunk. With this change, all non-CR
runtime code goes through one of the rte, dpm, or pubsub frameworks.

This commit was SVN r27934.
2013-01-27 23:25:10 +00:00

1322 строки
53 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2009 Sun Microsystems, Inc All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* In windows, many of the socket functions return an EWOULDBLOCK
* instead of \ things like EAGAIN, EINPROGRESS, etc. It has been
* verified that this will \ not conflict with other error codes that
* are returned by these functions \ under UNIX/Linux environments
*/
#include "ompi_config.h"
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "opal/opal_socket_errno.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */
#ifdef HAVE_TIME_H
#include <time.h>
#endif /* HAVE_TIME_H */
#include <netinet/sctp.h>
#include "ompi/types.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "opal/mca/event/event.h"
#include "btl_sctp.h"
#include "btl_sctp_endpoint.h"
#include "btl_sctp_proc.h"
#include "btl_sctp_frag.h"
#include "btl_sctp_addr.h"
#include "btl_sctp_utils.h"
#include "btl_sctp_recv_handler.h"
/* Pre-allocate pool of endpoints. These are copies of endpoints
* since opal_list only allows an item to be a member of one list
* at a time. We have a freelist of pre-allocated endpoints and
* also a list of endpoints that currently want to send. Our send
* handler walks this list each time it's invoked. If nobody currently
* wants to send (i.e., endpoint_associated_with_send is NULL), the
* POLLOUT for this socket is turned off. If there are no more free
* slots allocated, the pool expands.
*/
/* Number of our_sctp_endpoint entries allocated each time the pool is
* created or grown.
*/
#define INITIAL_NUM_FREE_POOL_SLOTS 10
/* Wrappers for endpoints that are queued waiting to send. */
static opal_list_t sending_endpoints;
/* Unused wrappers available for insertion into sending_endpoints. */
static opal_list_t sending_endpoints_freelist;
/* Endpoint whose send event is currently registered in one-to-many mode,
* or NULL when none is. Not static, so it is visible to other files.
*/
mca_btl_base_endpoint_t *endpoint_associated_with_send;
/* Class descriptor for the wrapper list item; no constructor/destructor. */
OBJ_CLASS_INSTANCE(
our_sctp_endpoint,
opal_list_item_t,
NULL,
NULL);
/* Non-zero once the two lists above and the first pool chunk are set up. */
static int have_initiated_sending_endpoints_list=0;
/* Bookkeeping node recording one malloc'd array of pool entries so that it
* can be freed when the pool is torn down.
*/
struct sending_endpoint_chunk {
our_sctp_endpoint *to_free; /* array of INITIAL_NUM_FREE_POOL_SLOTS entries */
struct sending_endpoint_chunk *next; /* next chunk, or NULL for the last one */
};
typedef struct sending_endpoint_chunk sending_endpoint_chunk;
/* Head of the chunk chain. The head node itself is static; only its
* to_free array and all subsequent nodes are heap allocated.
*/
static sending_endpoint_chunk endpoint_chunks_to_free;
/* Last node of the chunk chain; new chunks are linked after it. */
static sending_endpoint_chunk * endpoint_chunks_to_free_tail;
/* NOTE(review): not referenced in the portion of the file reviewed here;
* presumably a counter used while walking sending_endpoints - confirm.
*/
static int sending_endpoints_walk_count;
/*
 * Initialize the state of an endpoint instance.
 *
 * Every field is reset to its "closed / nothing pending" value and the
 * per-endpoint fragment list and send/recv locks are constructed.
 *
 * The first endpoint constructed while the shared sending-endpoint pool is
 * not set up also creates that pool: the two lists are constructed and one
 * chunk of INITIAL_NUM_FREE_POOL_SLOTS zeroed entries is placed on the
 * free list.  If that allocation fails an error is logged and the pool is
 * left empty but consistent (valid head/tail, nothing to free).
 */
static void mca_btl_sctp_endpoint_construct(mca_btl_sctp_endpoint_t* endpoint)
{
    endpoint->endpoint_btl = NULL;
    endpoint->endpoint_proc = NULL;
    endpoint->endpoint_addr = NULL;
    endpoint->endpoint_sd = -1;
    endpoint->endpoint_send_frag = NULL;
    endpoint->endpoint_recv_frag = NULL;
    endpoint->endpoint_state = MCA_BTL_SCTP_CLOSED;
    endpoint->endpoint_retries = 0;
    endpoint->endpoint_nbo = false;
#if MCA_BTL_SCTP_ENDPOINT_CACHE
    endpoint->endpoint_cache = NULL;
    endpoint->endpoint_cache_pos = NULL;
    endpoint->endpoint_cache_length = 0;
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */
    endpoint->endpoint_has_initialized = 0;
    endpoint->endpoint_in_list = 0;
    OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
    OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);

    if(0 == have_initiated_sending_endpoints_list) {
        int i;
        our_sctp_endpoint *free_entries;

        have_initiated_sending_endpoints_list++;
        OBJ_CONSTRUCT(&sending_endpoints, opal_list_t);
        OBJ_CONSTRUCT(&sending_endpoints_freelist, opal_list_t);
        endpoint_associated_with_send = NULL;

        /* Put the chunk bookkeeping into a valid state *before* allocating,
         * so that an allocation failure cannot leave a NULL tail (used when
         * the pool is grown later) or a stale to_free pointer (freed by the
         * destructor).
         */
        endpoint_chunks_to_free.to_free = NULL;
        endpoint_chunks_to_free.next = NULL;
        endpoint_chunks_to_free_tail = &endpoint_chunks_to_free;

        /* calloc gives zeroed entries, as the list code expects */
        free_entries = (our_sctp_endpoint *) calloc(INITIAL_NUM_FREE_POOL_SLOTS,
                                                    sizeof(our_sctp_endpoint));
        if(NULL == free_entries) {
            BTL_ERROR(("cannot allocate free poll entries."));
            return;
        }
        for(i=0; i < INITIAL_NUM_FREE_POOL_SLOTS; i++) {
            opal_list_append(&sending_endpoints_freelist, (opal_list_item_t *) &free_entries[i]);
        }
        endpoint_chunks_to_free.to_free = free_entries;
    }
}
/*
 * Destroy an endpoint.
 *
 * The endpoint is removed from its proc, its connection is closed, and its
 * fragment list and locks are destructed.  If the shared sending-endpoint
 * pool is currently set up it is torn down as well: both lists are
 * destructed and every pool chunk is freed.
 *
 * No dangling pointers are left behind: the chunk head/tail are reset to
 * the empty state and endpoint_associated_with_send is cleared when it
 * refers to the endpoint being destroyed.
 */
static void mca_btl_sctp_endpoint_destruct(mca_btl_sctp_endpoint_t* endpoint)
{
    mca_btl_sctp_proc_remove(endpoint->endpoint_proc, endpoint);
    mca_btl_sctp_endpoint_close(endpoint);
    OBJ_DESTRUCT(&endpoint->endpoint_frags);
    OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
    OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);

    /* this endpoint is going away; nobody may keep treating it as the
     * one that owns the send event
     */
    if(endpoint_associated_with_send == endpoint) {
        endpoint_associated_with_send = NULL;
    }

    if(have_initiated_sending_endpoints_list) {
        sending_endpoint_chunk *chunkp, *nextp;

        have_initiated_sending_endpoints_list--;
        OBJ_DESTRUCT(&sending_endpoints);
        OBJ_DESTRUCT(&sending_endpoints_freelist);

        /* free up memory used in pool of sending_endpoint items */
        /* only to_free malloc'd in static endpoint_chunks_to_free */
        free(endpoint_chunks_to_free.to_free);
        nextp = endpoint_chunks_to_free.next;
        while(nextp) /* ...but all other chunks were dynamically allocated */
        {
            chunkp = nextp;
            nextp = chunkp->next;
            free(chunkp->to_free);
            free(chunkp);
        }

        /* reset the bookkeeping so nothing points at freed memory */
        endpoint_chunks_to_free.to_free = NULL;
        endpoint_chunks_to_free.next = NULL;
        endpoint_chunks_to_free_tail = &endpoint_chunks_to_free;
    }
}
/* Class descriptor for the endpoint type: a list item whose constructor and
* destructor are the two functions defined above.
*/
OBJ_CLASS_INSTANCE(
mca_btl_sctp_endpoint_t,
opal_list_item_t,
mca_btl_sctp_endpoint_construct,
mca_btl_sctp_endpoint_destruct);
/* Prototypes for the file-local helpers. The first two repeat functions
* that are already defined above.
*/
static void mca_btl_sctp_endpoint_construct(mca_btl_base_endpoint_t* btl_endpoint);
static void mca_btl_sctp_endpoint_destruct(mca_btl_base_endpoint_t* btl_endpoint);
static int mca_btl_sctp_endpoint_start_connect(mca_btl_base_endpoint_t*);
static void mca_btl_sctp_endpoint_connected(mca_btl_base_endpoint_t*);
/* Event callbacks: signature is (socket descriptor, event flags, user data). */
static void mca_btl_sctp_endpoint_recv_handler(int sd, short flags, void* user);
static void mca_btl_sctp_endpoint_send_handler(int sd, short flags, void* user);
/*
* Diagnostics: change this to "1" to enable the function
* mca_btl_sctp_endpoint_dump(), below
*/
#define WANT_PEER_DUMP 0
/*
 * Diagnostics (compiled only when WANT_PEER_DUMP is non-zero).
 *
 * Emit one debug line describing the endpoint's socket: local and peer
 * IPv4 addresses, the SCTP_NODELAY setting, the send/receive buffer sizes
 * and the fcntl file-status flags.
 *
 * btl_endpoint  endpoint whose endpoint_sd is inspected
 * msg           caller-supplied prefix for the debug line
 *
 * Any query that fails is logged and reported as "unknown" (addresses) or
 * -1 (numeric values) instead of printing uninitialized data.
 */
#if WANT_PEER_DUMP
static void mca_btl_sctp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
{
    char src[64];
    char dst[64];
    int sndbuf = -1, rcvbuf = -1, nodelay = 0, flags;
    struct sockaddr_in inaddr;
    opal_socklen_t obtlen;
    opal_socklen_t addrlen;

    /* addrlen is a value-result argument: reset it before every call */
    addrlen = sizeof(inaddr);
    if(getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
        BTL_ERROR(("getsockname() failed with errno=%d", opal_socket_errno));
        snprintf(src, sizeof(src), "%s", "unknown");
    } else {
        snprintf(src, sizeof(src), "%s", inet_ntoa(inaddr.sin_addr));
    }

    addrlen = sizeof(inaddr);
    if(getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
        BTL_ERROR(("getpeername() failed with errno=%d", opal_socket_errno));
        snprintf(dst, sizeof(dst), "%s", "unknown");
    } else {
        snprintf(dst, sizeof(dst), "%s", inet_ntoa(inaddr.sin_addr));
    }

    if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
        BTL_ERROR(("fcntl(F_GETFL) failed with errno=%d", opal_socket_errno));
    }

#if defined(SO_SNDBUF)
    obtlen = sizeof(sndbuf);
    if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
        BTL_ERROR(("SO_SNDBUF option: errno %d", opal_socket_errno));
        sndbuf = -1;
    }
#endif
#if defined(SO_RCVBUF)
    obtlen = sizeof(rcvbuf);
    if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
        BTL_ERROR(("SO_RCVBUF option: errno %d", opal_socket_errno));
        rcvbuf = -1;
    }
#endif
#if defined(SCTP_NODELAY)
    obtlen = sizeof(nodelay);
    if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_SCTP, SCTP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
        BTL_ERROR(("SCTP_NODELAY option: errno %d", opal_socket_errno));
        nodelay = -1;
    }
#endif

    BTL_DEBUG(("%s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x",
               msg, src, dst, nodelay, sndbuf, rcvbuf, flags));
}
#endif
/*
 * Prepare the read and write events (and, when MCA_BTL_SCTP_ENDPOINT_CACHE
 * is enabled, the receive cache buffer) used for this endpoint's socket
 * callbacks.
 *
 * One-to-one mode:  runs on every call; the read event dispatches to the
 *                   per-endpoint mca_btl_sctp_endpoint_recv_handler.
 * One-to-many mode: runs only the first time for a given endpoint; the
 *                   endpoint adopts the component's listen socket and the
 *                   read event dispatches to mca_btl_sctp_recv_handler.
 *
 * The events are always bound to btl_endpoint->endpoint_sd; the sd
 * argument is not consulted.
 */
static inline void mca_btl_sctp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint, int sd)
{
    const int one_to_one = mca_btl_sctp_component.sctp_if_11 ? 1 : 0;

    if(!one_to_one) {
        /* 1 to many: only the first call for this endpoint does any work */
        if(0 != btl_endpoint->endpoint_has_initialized) {
            return;
        }
        btl_endpoint->endpoint_has_initialized++;
        btl_endpoint->endpoint_sd = mca_btl_sctp_component.sctp_listen_sd;
    }

#if MCA_BTL_SCTP_ENDPOINT_CACHE
    btl_endpoint->endpoint_cache = (char*)malloc(mca_btl_sctp_component.sctp_endpoint_cache);
    btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */

    /* readable: the callback differs between the two socket styles */
    if(one_to_one) {
        opal_event_set(opal_event_base, &btl_endpoint->endpoint_recv_event,
                       btl_endpoint->endpoint_sd,
                       OPAL_EV_READ|OPAL_EV_PERSIST,
                       mca_btl_sctp_endpoint_recv_handler,
                       btl_endpoint );
    } else {
        opal_event_set(opal_event_base, &btl_endpoint->endpoint_recv_event,
                       btl_endpoint->endpoint_sd,
                       OPAL_EV_READ|OPAL_EV_PERSIST,
                       mca_btl_sctp_recv_handler,
                       btl_endpoint );
    }

    /* writable: same callback in both modes */
    opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
                   btl_endpoint->endpoint_sd,
                   OPAL_EV_WRITE|OPAL_EV_PERSIST,
                   mca_btl_sctp_endpoint_send_handler,
                   btl_endpoint);
}
/*
* Attempt to send a fragment using a given endpoint. If the endpoint is not connected,
* queue the fragment and start the connection as required.
*
* Here we have our first notion of endpoint connection state. Instead of
* assigning state, it may be benficial to maintain another table of recvrs that
* we have sent to in the past. If that recvr is NOT found in the table, then it
* is a new proc and we have to send them our GUID before we send anything else.
*/
int mca_btl_sctp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_sctp_frag_t* frag)
{
if(mca_btl_sctp_component.sctp_if_11) {
/* 1 to 1 */
int rc = OMPI_SUCCESS;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_SCTP_CONNECTING:
case MCA_BTL_SCTP_CONNECT_ACK:
case MCA_BTL_SCTP_CLOSED:
opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
if(btl_endpoint->endpoint_state == MCA_BTL_SCTP_CLOSED) {
rc = mca_btl_sctp_endpoint_start_connect(btl_endpoint);
}
break;
case MCA_BTL_SCTP_FAILED:
rc = OMPI_ERR_UNREACH;
break;
case MCA_BTL_SCTP_CONNECTED:
if (btl_endpoint->endpoint_send_frag == NULL) {
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
mca_btl_sctp_frag_send(frag, btl_endpoint->endpoint_sd)) {
int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
if( btl_ownership ) {
MCA_BTL_SCTP_FRAG_RETURN(frag);
}
return OMPI_SUCCESS;
} else {
btl_endpoint->endpoint_send_frag = frag;
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
}
} else {
opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
}
break;
case MCA_BTL_SCTP_SHUTDOWN:
rc = OMPI_ERROR;
break;
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
return rc;
}
else {
/* 1 to many */
int rc = OMPI_SUCCESS;
/* What if there are multiple procs on this endpoint? Possible? */
ompi_vpid_t vpid = btl_endpoint->endpoint_proc->proc_ompi->proc_name.vpid;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
if((mca_btl_sctp_proc_check_vpid(vpid, sender_proc_table)) == INVALID_ENTRY) {
opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
rc = mca_btl_sctp_endpoint_start_connect(btl_endpoint);
/* add the proc to sender_proc_table somewhere here */
mca_btl_sctp_proc_add_vpid(vpid, btl_endpoint->endpoint_proc, sender_proc_table);
}
else { /* VALID_ENTRY */
if (btl_endpoint->endpoint_send_frag == NULL) {
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
mca_btl_sctp_frag_send(frag, btl_endpoint->endpoint_sd)) {
int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
if( btl_ownership ) {
MCA_BTL_SCTP_FRAG_RETURN(frag);
}
return OMPI_SUCCESS;
} else {
btl_endpoint->endpoint_send_frag = frag;
/* TODO make this all a function since repeated below */
/* if this endpoint is already in the sending_endpoints list; don't add duplicate */
/* if the associated endpoint is this one, avoid putting into the list */
if(btl_endpoint->endpoint_in_list ||
btl_endpoint == endpoint_associated_with_send) { /* pointer comparison assumes no dup endpoints */
; /* leave */
}
else if(NULL != endpoint_associated_with_send) {
/* another endpoint associated with sending on this one-to-many socket */
/* put this endpoint in the list of endpoints that need to send no matter what... */
our_sctp_endpoint *our_btl_endpoint;
if(0 == opal_list_get_size(&sending_endpoints_freelist))
{
/* need to expand the freelist */
our_sctp_endpoint *free_entries;
int i;
if(NULL == (endpoint_chunks_to_free_tail->next = (sending_endpoint_chunk *)
malloc(sizeof(sending_endpoint_chunk))))
{
BTL_ERROR(("cannot allocate sending endpoint chunk."));
return OMPI_ERR_OUT_OF_RESOURCE;
}
if(NULL == (free_entries = (our_sctp_endpoint *) malloc(sizeof(our_sctp_endpoint)
* INITIAL_NUM_FREE_POOL_SLOTS)))
{
BTL_ERROR(("cannot allocate free poll entries."));
/* only 1 of 2 newly required allocations were successful so free the other
* and reset the value so the generic destruction function works.
*/
free(endpoint_chunks_to_free_tail->next);
endpoint_chunks_to_free_tail->next = NULL;
return OMPI_ERR_OUT_OF_RESOURCE;
}
memset(free_entries, 0, sizeof(our_sctp_endpoint) * INITIAL_NUM_FREE_POOL_SLOTS);
for(i=0; i < INITIAL_NUM_FREE_POOL_SLOTS; i++) {
opal_list_append(&sending_endpoints_freelist,
(opal_list_item_t *) &free_entries[i]);
}
/* update location of where we'll expand next (if need be) */
endpoint_chunks_to_free_tail = endpoint_chunks_to_free_tail->next;
endpoint_chunks_to_free_tail->next = NULL;
endpoint_chunks_to_free_tail->to_free = free_entries;
}
our_btl_endpoint = (our_sctp_endpoint *) opal_list_remove_first(&sending_endpoints_freelist);
memset(our_btl_endpoint, 0, sizeof(our_sctp_endpoint));
our_btl_endpoint->endpoint = btl_endpoint;
opal_list_append(&sending_endpoints, (opal_list_item_t *) our_btl_endpoint);
btl_endpoint->endpoint_in_list++;
} else {
/* no endpoint is currently associated with sending on this socket */
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
endpoint_associated_with_send = btl_endpoint;
}
}
} else {
/* this btl has pending frags to send to queue this new one */
opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
}
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
return rc;
}
}
/*
 * A blocking send on a non-blocking socket. Used to send the small amount
 * of connection information that identifies this process to the peer.
 *
 * The call keeps retrying on EINTR / EAGAIN / EWOULDBLOCK until all size
 * bytes starting at data have been handed to sctp_sendmsg().  On a
 * one-to-one socket no destination address is passed; on the one-to-many
 * socket the destination is derived from the endpoint.
 *
 * Returns the number of bytes sent (== size), or -1 after closing the
 * endpoint when a non-transient error occurs.
 */
static int mca_btl_sctp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
{
    const int one_to_one = mca_btl_sctp_component.sctp_if_11 ? 1 : 0;
    unsigned char *bytes = (unsigned char *) data;
    size_t sent = 0;
    struct sockaddr_in peer;
    struct sockaddr *to = NULL;   /* TCP-style 1 to 1 socket needs no address */
    socklen_t tolen = 0;

    if(!one_to_one) {
        /* 1 to many: every message must name its destination */
        peer = mca_btl_sctp_utils_sockaddr_from_endpoint(btl_endpoint);
        to = (struct sockaddr *) &peer;
        tolen = sizeof(struct sockaddr_in);
    }

    while(sent < size) {
        int n = sctp_sendmsg(btl_endpoint->endpoint_sd, (char *)bytes+sent, size-sent,
                             to, tolen, 0, 0, 0, 0, 0);
        if(n < 0) {
            if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN &&
               opal_socket_errno != EWOULDBLOCK) {
                BTL_ERROR(("send() failed with errno=%d",opal_socket_errno));
                mca_btl_sctp_endpoint_close(btl_endpoint);
                return -1;
            }
            /* transient condition: try again */
            continue;
        }

        if(one_to_one) {
            SCTP_BTL_ERROR((" mca_btl_sctp_endpoint_send_blocking() sd=%d, retval=%d\n",
                            btl_endpoint->endpoint_sd, n));
        } else {
            SCTP_BTL_ERROR((" mca_btl_sctp_endpoint_send_blocking() retval=%d\n", n));
        }
        sent += n;
    }
    return sent;
}
/*
 * Send the globally unique identifier of this process to an endpoint on a
 * newly connected socket.
 *
 * The local process name is copied, converted with
 * ORTE_PROCESS_NAME_HTON and written with the blocking send helper.
 *
 * Returns OMPI_SUCCESS when the whole name was sent, OMPI_ERR_UNREACH
 * otherwise.
 */
static int mca_btl_sctp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
{
    mca_btl_sctp_proc_t* local_proc = mca_btl_sctp_proc_local();
    ompi_process_name_t wire_name = local_proc->proc_ompi->proc_name;
    int nsent;

    ORTE_PROCESS_NAME_HTON(wire_name);

    /* send process identifier to remote endpoint */
    nsent = mca_btl_sctp_endpoint_send_blocking(btl_endpoint, &wire_name, sizeof(wire_name));
    if(nsent == sizeof(wire_name)) {
        return OMPI_SUCCESS;
    }
    return OMPI_ERR_UNREACH;
}
/*
 * Decide whether an incoming connection belongs to this endpoint and, if
 * so, adopt it.
 *
 * One-to-one mode: the connection is considered only when its source
 * address equals the endpoint's address.  It is then accepted when
 *   (1) the endpoint has no socket yet, or
 *   (2) the endpoint is not connected and the endpoint's process name
 *       compares lower than the local process name.
 * On acceptance any existing connection is closed, sd becomes the
 * endpoint's socket, our identifier is sent, the events are set up and the
 * endpoint is moved to the connected state.  Otherwise the incoming
 * connection is rejected and the current one continues.
 *
 * One-to-many mode: conflicts cannot happen, so the connection is accepted
 * whenever the endpoint has an address.
 *
 * Both the recv and the send lock are held for the duration of the check.
 * Returns true when the connection was accepted, false otherwise.
 */
bool mca_btl_sctp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint, struct sockaddr_in* addr, int sd)
{
    bool accepted = false;

    if(mca_btl_sctp_component.sctp_if_11) {
        /* 1 to 1 */
        mca_btl_sctp_proc_t* local_proc = mca_btl_sctp_proc_local();
        mca_btl_sctp_addr_t* known_addr;

        OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
        OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);

        known_addr = btl_endpoint->endpoint_addr;
        if(NULL != known_addr &&
           known_addr->addr_inet.s_addr == addr->sin_addr.s_addr) {
            mca_btl_sctp_proc_t *peer_proc = btl_endpoint->endpoint_proc;
            int order = ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
                                                     &peer_proc->proc_ompi->proc_name,
                                                     &local_proc->proc_ompi->proc_name);

            if((btl_endpoint->endpoint_sd < 0) ||
               (btl_endpoint->endpoint_state != MCA_BTL_SCTP_CONNECTED &&
                order < 0)) {
                /* drop whatever we had and take over the incoming socket */
                mca_btl_sctp_endpoint_close(btl_endpoint);
                SCTP_BTL_ERROR(("endpoint_close in endpoint_accept #1\n"));
                btl_endpoint->endpoint_sd = sd;

                if(mca_btl_sctp_endpoint_send_connect_ack(btl_endpoint) != OMPI_SUCCESS) {
                    mca_btl_sctp_endpoint_close(btl_endpoint);
                    SCTP_BTL_ERROR(("endpoint_close in endpoint_accept #2\n"));
                } else {
                    mca_btl_sctp_endpoint_event_init(btl_endpoint, sd);
                    opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
                    mca_btl_sctp_endpoint_connected(btl_endpoint);
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
                    mca_btl_sctp_endpoint_dump(btl_endpoint, "accepted");
#endif
                    accepted = true;
                }
            }
        }

        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        return accepted;
    }

    /* 1 to many */
    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);

    if(NULL != btl_endpoint->endpoint_addr) {
        /* conflicts can't happen with one-to-many socket */
        mca_btl_sctp_endpoint_event_init(btl_endpoint, sd);
        opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
        mca_btl_sctp_endpoint_dump(btl_endpoint, "accepted");
#endif
        accepted = true;
    }

    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
    return accepted;
}
/*
 * Tear down the endpoint's connection.
 *
 * When the endpoint holds a socket, its read and write events are
 * removed, the socket is closed and (if enabled) the receive cache is
 * released.  In every case the endpoint ends up in the
 * MCA_BTL_SCTP_CLOSED state and its retry counter is incremented.
 */
void mca_btl_sctp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
{
    int have_socket;

    SCTP_BTL_ERROR(("inside endpoint_close (sd = %d)\n", btl_endpoint->endpoint_sd));

    have_socket = (btl_endpoint->endpoint_sd >= 0);
    if(have_socket) {
        /* stop event delivery before the descriptor goes away */
        opal_event_del(&btl_endpoint->endpoint_recv_event);
        opal_event_del(&btl_endpoint->endpoint_send_event);
        CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
        btl_endpoint->endpoint_sd = -1;
#if MCA_BTL_SCTP_ENDPOINT_CACHE
        free( btl_endpoint->endpoint_cache );
        btl_endpoint->endpoint_cache = NULL;
        btl_endpoint->endpoint_cache_pos = NULL;
        btl_endpoint->endpoint_cache_length = 0;
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */
    }

    btl_endpoint->endpoint_state = MCA_BTL_SCTP_CLOSED;
    btl_endpoint->endpoint_retries++;
}
/*
 * Close the endpoint and mark it as shut down.
 *
 * Both endpoint locks are taken (recv first, then send) around the close
 * so the state change from CLOSED to MCA_BTL_SCTP_SHUTDOWN is made while
 * they are held; they are released in the reverse order.
 */
void mca_btl_sctp_endpoint_shutdown(mca_btl_base_endpoint_t* btl_endpoint)
{
    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);

    /* close() leaves the state at CLOSED; override it afterwards */
    mca_btl_sctp_endpoint_close(btl_endpoint);
    btl_endpoint->endpoint_state = MCA_BTL_SCTP_SHUTDOWN;

    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
}
/*
 * Setup endpoint state to reflect that connection has been established,
 * and start any pending sends.
 *
 * The endpoint is marked MCA_BTL_SCTP_CONNECTED and its retry counter is
 * cleared.  If fragments are queued, the first one becomes the current
 * send fragment (unless one is already in progress) and the endpoint is
 * scheduled for sending:
 *   - 1 to 1:    its send event is registered;
 *   - 1 to many: its send event is registered if no endpoint currently
 *                owns the shared socket's send event, otherwise a wrapper
 *                for it is queued on sending_endpoints (growing the wrapper
 *                pool when empty).
 *
 * If the wrapper pool cannot be grown, an error is logged and the
 * function returns without scheduling the endpoint.
 */
static void mca_btl_sctp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
{
    if(mca_btl_sctp_component.sctp_if_11) {
        /* 1 to 1 */
        btl_endpoint->endpoint_state = MCA_BTL_SCTP_CONNECTED;
        btl_endpoint->endpoint_retries = 0;
        if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
            if(NULL == btl_endpoint->endpoint_send_frag) {
                btl_endpoint->endpoint_send_frag = (mca_btl_sctp_frag_t*)
                    opal_list_remove_first(&btl_endpoint->endpoint_frags);
            }
            opal_event_add(&btl_endpoint->endpoint_send_event, 0);
        }
    }
    else {
        /* 1 to many */
        btl_endpoint->endpoint_state = MCA_BTL_SCTP_CONNECTED;
        btl_endpoint->endpoint_retries = 0;
        if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
            if(NULL == btl_endpoint->endpoint_send_frag) {
                btl_endpoint->endpoint_send_frag = (mca_btl_sctp_frag_t*)
                    opal_list_remove_first(&btl_endpoint->endpoint_frags);
            }
            /* TODO make this all a function since it is repeated elsewhere */
            /* if this endpoint is already in the sending_endpoints list; don't add duplicate */
            /* if the associated endpoint is this one, avoid putting into the list */
            if(btl_endpoint->endpoint_in_list ||
               btl_endpoint == endpoint_associated_with_send) { /* pointer comparison assumes no dup endpoints */
                ; /* leave */
            }
            else if(NULL != endpoint_associated_with_send) {
                /* another endpoint associated with sending on this one-to-many socket */
                /* put this endpoint in the list of endpoints that need to send no matter what... */
                our_sctp_endpoint *our_btl_endpoint;
                if(0 == opal_list_get_size(&sending_endpoints_freelist))
                {
                    /* need to expand the freelist */
                    our_sctp_endpoint *free_entries;
                    int i;
                    if(NULL == (endpoint_chunks_to_free_tail->next = (sending_endpoint_chunk *)
                                malloc(sizeof(sending_endpoint_chunk))))
                    {
                        BTL_ERROR(("cannot allocate sending endpoint chunk."));
                        return;
                    }
                    if(NULL == (free_entries = (our_sctp_endpoint *) malloc(sizeof(our_sctp_endpoint)
                                                                            * INITIAL_NUM_FREE_POOL_SLOTS)))
                    {
                        BTL_ERROR(("cannot allocate free poll entries."));
                        /* only 1 of 2 newly required allocations were successful so free the other
                         * and reset the value so the generic destruction function works.
                         */
                        free(endpoint_chunks_to_free_tail->next);
                        endpoint_chunks_to_free_tail->next = NULL;
                        return;
                    }
                    /* Zero the new entries before they go on the list, exactly
                     * as the other places that fill this pool do; appending
                     * uninitialized list items hands garbage link fields to
                     * the list code.
                     */
                    memset(free_entries, 0, sizeof(our_sctp_endpoint) * INITIAL_NUM_FREE_POOL_SLOTS);
                    for(i=0; i < INITIAL_NUM_FREE_POOL_SLOTS; i++) {
                        opal_list_append(&sending_endpoints_freelist,
                                         (opal_list_item_t *) &free_entries[i]);
                    }
                    /* update location of where we'll expand next (if need be) */
                    endpoint_chunks_to_free_tail = endpoint_chunks_to_free_tail->next;
                    endpoint_chunks_to_free_tail->next = NULL;
                    endpoint_chunks_to_free_tail->to_free = free_entries;
                }
                our_btl_endpoint = (our_sctp_endpoint *) opal_list_remove_first(&sending_endpoints_freelist);
                memset(our_btl_endpoint, 0, sizeof(our_sctp_endpoint));
                our_btl_endpoint->endpoint = btl_endpoint;
                opal_list_append(&sending_endpoints, (opal_list_item_t *) our_btl_endpoint);
                btl_endpoint->endpoint_in_list++;
            } else {
                /* no endpoint is currently associated with sending on this socket */
                opal_event_add(&btl_endpoint->endpoint_send_event, 0);
                endpoint_associated_with_send = btl_endpoint;
            }
        }
    }
}
/*
 * A blocking recv on a non-blocking socket. Used to receive the small
 * amount of connection information that identifies the peer process.
 *
 * Only implemented for the one-to-one socket style; in one-to-many mode
 * the function asserts and returns -1.
 *
 * The call keeps retrying on EINTR / EAGAIN / EWOULDBLOCK until size bytes
 * have been stored at data.  Returns the number of bytes received
 * (== size), or -1 after closing the endpoint when the peer closed the
 * connection or a non-transient error occurred.
 */
static int mca_btl_sctp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
{
    unsigned char *dest = (unsigned char *) data;
    size_t received = 0;
    int msg_flags = 0;
    struct sctp_sndrcvinfo sri;

    if(!mca_btl_sctp_component.sctp_if_11) {
        assert(0); /* not called in 1 to many, i don't think... */
        return -1; /* never happens. avoids compiler warning. */
    }

    /* 1 to 1 */
    while(received < size) {
        /* We don't need to pass the sockaddr and len in
         * sctp_recvmsg() as it's a TCP style 1 to 1 socket.
         */
        int n = sctp_recvmsg(btl_endpoint->endpoint_sd, (char *)dest+received, size-received,
                             0, 0, &sri, &msg_flags);

        if(n > 0) {
            SCTP_BTL_ERROR(("mca_btl_sctp_endpoint_recv_blocking() sd=%d, got %d bytes.\n", btl_endpoint->endpoint_sd, n));
            received += n;
            continue;
        }

        /* remote closed connection */
        if(0 == n ||
           (-1 == n && (opal_socket_errno == ECONNRESET || opal_socket_errno == EBADF))) {
            mca_btl_sctp_endpoint_close(btl_endpoint);
            return -1;
        }

        /* socket is non-blocking so handle errors; n < 0 from here on */
        if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
            BTL_ERROR(("recv() failed with errno=%d",opal_socket_errno));
            mca_btl_sctp_endpoint_close(btl_endpoint);
            return -1;
        }
        /* transient condition: try again */
    }
    return received;
}
/*
 * Receive the peer's globally unique process identifier from a newly
 * connected socket and verify that it is the one expected for this
 * endpoint.  On a match the endpoint is moved to the connected state.
 *
 * Returns OMPI_SUCCESS on a match; OMPI_ERR_UNREACH when the identifier
 * could not be read in full or does not match (in the latter case the
 * endpoint is closed here).
 */
static int mca_btl_sctp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
{
    mca_btl_sctp_proc_t* expected_proc = btl_endpoint->endpoint_proc;
    ompi_process_name_t peer_name;
    int nread;

    nread = mca_btl_sctp_endpoint_recv_blocking(btl_endpoint, &peer_name,
                                                sizeof(ompi_process_name_t));
    if(nread != sizeof(ompi_process_name_t)) {
        return OMPI_ERR_UNREACH;
    }
    ORTE_PROCESS_NAME_NTOH(peer_name);

    /* compare this to the expected values */
    if(0 == memcmp(&expected_proc->proc_ompi->proc_name, &peer_name, sizeof(ompi_process_name_t))) {
        /* connected */
        mca_btl_sctp_endpoint_connected(btl_endpoint);
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
        mca_btl_sctp_endpoint_dump(btl_endpoint, "connected");
#endif
        return OMPI_SUCCESS;
    }

    BTL_ERROR(("received unexpected process identifier %s",
               OMPI_NAME_PRINT(&peer_name)));
    mca_btl_sctp_endpoint_close(btl_endpoint);
    return OMPI_ERR_UNREACH;
}
int mca_btl_sctp_set_socket_options(int sd)
{
int optval, flags;
struct sctp_event_subscribe evnts;
/* register io event here to see populated sndrcvinfo struct */
memset(&evnts, 0, sizeof(evnts));
evnts.sctp_data_io_event = 1;
if(setsockopt(sd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts)) < 0) {
BTL_ERROR(("setsockopt(SCTP_EVENTS) failed with errno=%d", opal_socket_errno));
return OMPI_ERROR;
}
/*TODO set maximum number of streams */
#if defined(SCTP_NODELAY)
optval = mca_btl_sctp_component.sctp_use_nodelay;
if(setsockopt(sd, IPPROTO_SCTP, SCTP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
BTL_ERROR(("setsockopt(SCTP_NODELAY) failed with errno=%d", opal_socket_errno));
return OMPI_ERROR;
}
#endif
#if defined(SO_SNDBUF)
if(mca_btl_sctp_component.sctp_sndbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_sctp_component.sctp_sndbuf, sizeof(int)) < 0) {
BTL_ERROR(("setsockopt(SO_SNDBUF) failed with errno %d", opal_socket_errno));
return OMPI_ERROR;
}
#endif
#if defined(SO_RCVBUF)
if(mca_btl_sctp_component.sctp_rcvbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_sctp_component.sctp_rcvbuf, sizeof(int)) < 0) {
BTL_ERROR(("setsockopt(SO_RCVBUF) failed with errno %d", opal_socket_errno));
return OMPI_ERROR;
}
#endif
/* set socket up to be non-blocking */
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed with errno=%d", opal_socket_errno));
return OMPI_ERROR;
} else {
flags |= O_NONBLOCK;
if(fcntl(sd, F_SETFL, flags) < 0) {
BTL_ERROR(("fcntl(F_SETFL) failed with errno=%d", opal_socket_errno));
return OMPI_ERROR;
}
}
return OMPI_SUCCESS;
}
/*
 * Start a connection to the endpoint.
 *
 * One-to-one mode: a new SCTP socket is created, bound to this BTL's
 * interface address, configured and connected.  The connect will likely
 * not complete immediately because the socket is non-blocking; in that
 * case the endpoint enters MCA_BTL_SCTP_CONNECTING and the send event is
 * registered to be notified of completion.  If the connect completes
 * right away, our globally unique process identifier is sent and the
 * endpoint waits (MCA_BTL_SCTP_CONNECT_ACK) for the peer's response.
 *
 * One-to-many mode: there is no connect step.  The events are set up (the
 * endpoint adopts the shared socket), our identifier is sent and the
 * endpoint is treated as connected straight away.
 *
 * Returns OMPI_SUCCESS, OMPI_ERR_UNREACH (no socket / connect failed),
 * OMPI_ERR_FATAL (bind or socket-option setup failed; the new socket is
 * closed before returning), or the result of sending the identifier.
 */
static int mca_btl_sctp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
{
    if(mca_btl_sctp_component.sctp_if_11) {
        /* 1 to 1 */
        int rc;
        struct sockaddr_in endpoint_addr;

        btl_endpoint->endpoint_sd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP);
        if (btl_endpoint->endpoint_sd < 0) {
            btl_endpoint->endpoint_retries++;
            return OMPI_ERR_UNREACH;
        }

        /* bind NIC in this btl to this newly created socket */
        if(bind(btl_endpoint->endpoint_sd, (struct sockaddr *)
                &btl_endpoint->endpoint_btl->sctp_ifaddr,
                sizeof(struct sockaddr_in)) < 0) {
            BTL_ERROR(("bind() failed with errno=%d", opal_socket_errno));
            /* The events have not been set up for this socket yet, so close
             * the descriptor directly instead of going through
             * mca_btl_sctp_endpoint_close(); otherwise the next connection
             * attempt would overwrite endpoint_sd and leak it.
             */
            CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
            btl_endpoint->endpoint_sd = -1;
            return OMPI_ERR_FATAL;
        }

        /* setup socket buffer sizes */
        if((rc = mca_btl_sctp_set_socket_options(btl_endpoint->endpoint_sd)) != OMPI_SUCCESS) {
            /* same reasoning as above: do not leak the descriptor */
            CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
            btl_endpoint->endpoint_sd = -1;
            return OMPI_ERR_FATAL;
        }

        /* setup event callbacks */
        mca_btl_sctp_endpoint_event_init(btl_endpoint, btl_endpoint->endpoint_sd);

        /* start the connect - will likely fail with EINPROGRESS */
        memset(&endpoint_addr, 0, sizeof(endpoint_addr));
        endpoint_addr.sin_family = AF_INET;
        endpoint_addr.sin_addr = btl_endpoint->endpoint_addr->addr_inet;
        endpoint_addr.sin_port = btl_endpoint->endpoint_addr->addr_port;
#ifdef FREEBSD
        /* sin_len describes the sockaddr_in being passed */
        endpoint_addr.sin_len = sizeof(struct sockaddr_in);
#endif
        if(connect(btl_endpoint->endpoint_sd, (struct sockaddr*)&endpoint_addr,
                   sizeof(endpoint_addr)) < 0)
        {
            /* non-blocking so wait for completion */
            if(opal_socket_errno == EINPROGRESS ||
               opal_socket_errno == EWOULDBLOCK)
            {
                btl_endpoint->endpoint_state = MCA_BTL_SCTP_CONNECTING;
                opal_event_add(&btl_endpoint->endpoint_send_event, 0);
                return OMPI_SUCCESS;
            }
            SCTP_BTL_ERROR(("endpoint_close in start_connect #1\n"));
            mca_btl_sctp_endpoint_close(btl_endpoint);
            btl_endpoint->endpoint_retries++;
            return OMPI_ERR_UNREACH;
        }

        /* send our globally unique process identifier to the endpoint */
        if((rc = mca_btl_sctp_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) {
            btl_endpoint->endpoint_state = MCA_BTL_SCTP_CONNECT_ACK;
            opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
        } else {
            SCTP_BTL_ERROR(("endpoint_close in start_connect #2\n"));
            mca_btl_sctp_endpoint_close(btl_endpoint);
        }
        return rc;
    }
    else {
        /* 1 to many */
        int rc;
        struct sockaddr_in endpoint_addr;

        /* setup event callbacks; on first use this also assigns the shared
         * socket to the endpoint, which is why endpoint_sd is only checked
         * afterwards
         */
        mca_btl_sctp_endpoint_event_init(btl_endpoint, btl_endpoint->endpoint_sd);
        if (btl_endpoint->endpoint_sd < 0) {
            btl_endpoint->endpoint_retries++;
            return OMPI_ERR_UNREACH;
        }

        /* NOTE(review): the address is computed but not used afterwards in
         * this function; the call is kept unchanged - confirm whether it can
         * be dropped.
         */
        endpoint_addr = mca_btl_sctp_utils_sockaddr_from_endpoint(btl_endpoint);

        /* send our globally unique process identifier to the endpoint */
        if((rc = mca_btl_sctp_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) {
            /* Changing this to CONNECTED so that we bypass application level acks. */
            mca_btl_sctp_endpoint_connected(btl_endpoint);
        } else {
            SCTP_BTL_ERROR(("endpoint_close in start_connect #3\n"));
            mca_btl_sctp_endpoint_close(btl_endpoint);
        }
        return rc;
    }
}
/*
 * Check the outcome of a non-blocking connect.
 *
 * The send event is removed, then SO_ERROR is read from the socket:
 *   - still in progress: the send event is registered again;
 *   - failed (or SO_ERROR could not be read): the endpoint is closed, so
 *     the connection can be retried later;
 *   - succeeded: this process's identifier is sent to the peer and the
 *     endpoint waits in MCA_BTL_SCTP_CONNECT_ACK for the response (the
 *     endpoint is closed if the identifier cannot be sent).
 */
static void mca_btl_sctp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
{
    int connect_status = 0;
    opal_socklen_t status_len = sizeof(connect_status);

    /* unregister from receiving event notifications */
    opal_event_del(&btl_endpoint->endpoint_send_event);

    /* check connect completion status */
    if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR,
                  (char *)&connect_status, &status_len) < 0) {
        BTL_ERROR(("getsockopt() failed with errno=%d", opal_socket_errno));
        mca_btl_sctp_endpoint_close(btl_endpoint);
        return;
    }

    if(connect_status == EINPROGRESS || connect_status == EWOULDBLOCK) {
        /* not finished yet: keep waiting for writability */
        opal_event_add(&btl_endpoint->endpoint_send_event, 0);
    } else if(connect_status != 0) {
        BTL_ERROR(("connect() failed with errno=%d", connect_status));
        mca_btl_sctp_endpoint_close(btl_endpoint);
    } else if(mca_btl_sctp_endpoint_send_connect_ack(btl_endpoint) == OMPI_SUCCESS) {
        btl_endpoint->endpoint_state = MCA_BTL_SCTP_CONNECT_ACK;
        opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
    } else {
        mca_btl_sctp_endpoint_close(btl_endpoint);
    }
}
/* used in 1-1 only */
/*
* A file descriptor is available/ready for recv. Check the state
* of the socket and take the appropriate action.
*/
static void mca_btl_sctp_endpoint_recv_handler(int sd, short flags, void* user)
{
char *foo;
int foo_len;
mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_SCTP_CONNECT_ACK:
{
mca_btl_sctp_endpoint_recv_connect_ack(btl_endpoint);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
break;
}
case MCA_BTL_SCTP_CONNECTED:
{
mca_btl_sctp_frag_t* frag;
frag = btl_endpoint->endpoint_recv_frag;
if(NULL == frag) {
int rc;
if(mca_btl_sctp_module.super.btl_max_send_size >
mca_btl_sctp_module.super.btl_eager_limit) {
MCA_BTL_SCTP_FRAG_ALLOC_MAX(frag, rc);
} else {
MCA_BTL_SCTP_FRAG_ALLOC_EAGER(frag, rc);
}
if(NULL == frag) {
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return;
}
MCA_BTL_SCTP_FRAG_INIT_DST(frag, btl_endpoint);
}
#if MCA_BTL_SCTP_ENDPOINT_CACHE
assert( 0 == btl_endpoint->endpoint_cache_length );
data_still_pending_on_endpoint:
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */
/* check for completion of non-blocking recv on the current fragment */
/* Changed frag_recv call so adding dummy params here to allow
* compilation. This setup does not work for 1 to 1.
*/
if(mca_btl_sctp_frag_recv(frag, sd, foo, foo_len) == false) {
btl_endpoint->endpoint_recv_frag = frag;
} else {
btl_endpoint->endpoint_recv_frag = NULL;
switch(frag->hdr.type) {
case MCA_BTL_SCTP_HDR_TYPE_SEND:
{
mca_btl_active_message_callback_t* reg;
reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag;
reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata);
break;
}
default:
break;
}
#if MCA_BTL_SCTP_ENDPOINT_CACHE
if( 0 != btl_endpoint->endpoint_cache_length ) {
/* If the cache still contain some data we can reuse the same fragment
* until we flush it completly.
*/
MCA_BTL_SCTP_FRAG_INIT_DST(frag, btl_endpoint);
goto data_still_pending_on_endpoint;
}
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */
MCA_BTL_SCTP_FRAG_RETURN(frag);
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
#if MCA_BTL_SCTP_ENDPOINT_CACHE
assert( 0 == btl_endpoint->endpoint_cache_length );
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */
break;
}
case MCA_BTL_SCTP_SHUTDOWN:
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
break;
default:
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
mca_btl_sctp_endpoint_close(btl_endpoint);
break;
}
}
/*
 * A file descriptor is available/ready for send. Check the state
 * of the socket and take the appropriate action.
 *
 * @param sd     descriptor the event fired for (not used directly; sends go
 *               through btl_endpoint->endpoint_sd)
 * @param flags  event flags (not examined here)
 * @param user   opaque callback argument; cast to mca_btl_sctp_endpoint_t*
 *
 * Two modes, selected by mca_btl_sctp_component.sctp_if_11:
 *   - 1 to 1:    finishes a pending connect, or drains the endpoint's queued
 *                fragments until a send would block.
 *   - 1 to many: drains the current endpoint's fragments, then either hands
 *                the send event to another endpoint taken from
 *                sending_endpoints, or walks that list trying the other
 *                endpoints in turn.
 *
 * The endpoint's send lock is held while its queue is manipulated and is
 * released around each fragment's completion callback.
 */
static void mca_btl_sctp_endpoint_send_handler(int sd, short flags, void* user)
{
    if(mca_btl_sctp_component.sctp_if_11) {
        /* 1 to 1 */
        mca_btl_sctp_endpoint_t* btl_endpoint = (mca_btl_sctp_endpoint_t *)user;

        OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
        switch(btl_endpoint->endpoint_state) {
        case MCA_BTL_SCTP_CONNECTING:
            mca_btl_sctp_endpoint_complete_connect(btl_endpoint);
            break;
        case MCA_BTL_SCTP_CONNECTED:
            {
                int btl_ownership;
                /* complete the current send */
                do {
                    mca_btl_sctp_frag_t* frag = btl_endpoint->endpoint_send_frag;
                    /* same guard as the 1 to many loop below: never hand a
                     * NULL fragment to mca_btl_sctp_frag_send() */
                    if(NULL == frag) {
                        break;
                    }
                    if(mca_btl_sctp_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
                        break;
                    }
                    /* progress any pending sends */
                    btl_endpoint->endpoint_send_frag = (mca_btl_sctp_frag_t*)
                        opal_list_remove_first(&btl_endpoint->endpoint_frags);

                    /* read the flag before the callback runs */
                    btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);

                    /* if required - update request status and release fragment;
                     * the callback is invoked without the send lock held */
                    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                    frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
                    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
                    if( btl_ownership ) {
                        MCA_BTL_SCTP_FRAG_RETURN(frag);
                    }
                } while (NULL != btl_endpoint->endpoint_send_frag);

                /* if nothing else to do unregister for send event notifications */
                if(NULL == btl_endpoint->endpoint_send_frag) {
                    opal_event_del(&btl_endpoint->endpoint_send_event);
                }
                break;
            }
        default:
            BTL_ERROR(("invalid connection state (%d)",
                       btl_endpoint->endpoint_state));
            opal_event_del(&btl_endpoint->endpoint_send_event);
            break;
        }
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    }
    else {
        /* 1 to many */
        mca_btl_sctp_endpoint_t* btl_endpoint = (mca_btl_sctp_endpoint_t *)user;
        /* non-NULL only while walking the sending_endpoints list */
        our_sctp_endpoint *current_our_endpoint = NULL;
        ompi_vpid_t vpid;

        /* Re-entry point whenever btl_endpoint is switched to another endpoint.
         * The send lock of the (new) endpoint is NOT held when jumping here;
         * it is acquired just below.
         */
    send_handler_1_to_many_different_endpoint:
        vpid = btl_endpoint->endpoint_proc->proc_ompi->proc_name.vpid;
        OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);

        if((mca_btl_sctp_proc_check_vpid(vpid, sender_proc_table)) == VALID_ENTRY) {
            int btl_ownership;
            /* complete the current send */
            do {
                mca_btl_sctp_frag_t* frag = btl_endpoint->endpoint_send_frag;
                if(NULL == frag) {
                    break;
                }
                if(mca_btl_sctp_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
                    break;
                }
                /* progress any pending sends */
                btl_endpoint->endpoint_send_frag = (mca_btl_sctp_frag_t*)
                    opal_list_remove_first(&btl_endpoint->endpoint_frags);

                /* read the flag before the callback runs */
                btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);

                /* if required - update request status and release fragment;
                 * the callback is invoked without the send lock held */
                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
                OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
                if( btl_ownership ) {
                    MCA_BTL_SCTP_FRAG_RETURN(frag);
                }
            } while (NULL != btl_endpoint->endpoint_send_frag);

            /* if nothing else to do unregister for send event notifications */
            if(NULL == btl_endpoint->endpoint_send_frag && NULL == current_our_endpoint) {
                /* remove the send event with this endpoint */
                opal_event_del(&btl_endpoint->endpoint_send_event);
                endpoint_associated_with_send = NULL;

                /* see if there is another endpoint that wants the send event */
                if(opal_list_get_size(&sending_endpoints) > 0) {
                    /* entries in the list may be stale from walk-throughs, so prune these
                     * while verifying the entries.  now pruned everytime the endpoint associated with
                     * POLLOUT on the one-to-many socket completes sending all of its frags.
                     */
                    our_sctp_endpoint *our_btl_endpoint = NULL;
                    mca_btl_sctp_endpoint_t *next_endpoint = NULL;

                    while(opal_list_get_size(&sending_endpoints)) {
                        our_btl_endpoint = (our_sctp_endpoint *) opal_list_remove_first(&sending_endpoints);
                        next_endpoint = our_btl_endpoint->endpoint;
                        if(NULL == next_endpoint->endpoint_send_frag)
                        {
                            /* stale entry: recycle the list item and keep looking */
                            opal_list_append(&sending_endpoints_freelist, (opal_list_item_t *) our_btl_endpoint);
                            our_btl_endpoint = NULL;
                            next_endpoint->endpoint_in_list--;
                            continue;
                        }
                        else
                        {
                            /* go with this one */
                            break;
                        }
                    }

                    if(NULL != our_btl_endpoint)
                    {
                        /* move the send event over to next_endpoint, updating its
                         * bookkeeping under its own send lock */
                        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                        OPAL_THREAD_LOCK(&next_endpoint->endpoint_send_lock);
                        btl_endpoint = next_endpoint;
                        assert(btl_endpoint->endpoint_in_list > 0);
                        btl_endpoint->endpoint_in_list--;
                        opal_event_add(&btl_endpoint->endpoint_send_event, 0);
                        opal_list_append(&sending_endpoints_freelist, (opal_list_item_t *) our_btl_endpoint);
                        endpoint_associated_with_send = btl_endpoint;
                        /* The label re-acquires this endpoint's send lock, so it has
                         * to be released first; jumping with it held would lock the
                         * same mutex twice.
                         */
                        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                        goto send_handler_1_to_many_different_endpoint;
                    }
                }
            } else if(opal_list_get_size(&sending_endpoints) > 0) {
                /* sending_endpoints might contain entries with different endpoints (i.e. non-duplicates) */
                /* This case happens if one association on the one-to-many socket returns EAGAIN but we
                 *  want to try another association on the same socket (in the kernel, different associations
                 *  have different buffers despite sharing the same socket).
                 */
                /* if we're in the middle of a traversal and we've completed all the frags on some endpoint,
                 *  we need to remove it from the list */
                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                if(NULL == current_our_endpoint)
                {
                    /* start traversing the list */
                    current_our_endpoint = (our_sctp_endpoint *) opal_list_get_first(&sending_endpoints);
                    sending_endpoints_walk_count = 0;
                }
                else
                {
                    /* continue traversing the list */
                    current_our_endpoint = (our_sctp_endpoint *) opal_list_get_next(current_our_endpoint);
                    sending_endpoints_walk_count++;
                }

                if(NULL != current_our_endpoint &&
                   sending_endpoints_walk_count < (int) opal_list_get_size(&sending_endpoints))
                {
                    /* the lock was released above; the label takes the next one */
                    btl_endpoint = current_our_endpoint->endpoint;
                    goto send_handler_1_to_many_different_endpoint;
                }
                else { /* done walking through sending_endpoints list */
                    return; /* don't want to unlock the send_lock twice */
                }
            }
        }
        else {
            BTL_ERROR(("invalid connection state (%d)",
                       btl_endpoint->endpoint_state));
            /*TODO: update del code to use sending_endpoints list */
            opal_event_del(&btl_endpoint->endpoint_send_event);
        }
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    }
}