/*
 * Copyright (c) 2009      Cisco Systems, Inc.  All rights reserved. 
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/types.h"

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#include <errno.h>
#include <fcntl.h>

#include "opal/class/opal_list.h"
#include "opal/opal_socket_errno.h"
#include "opal/util/output.h"
#include "opal/util/argv.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "opal/dss/dss.h"

#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"

#include "orte/mca/rmcast/base/private.h"
#include "orte/mca/rmcast/base/base.h"
#include "rmcast_basic.h"

/* LOCAL DATA */
static opal_mutex_t lock;
static opal_list_t recvs;
static opal_list_t channels;
static bool init_completed = false;
static orte_rmcast_channel_t next_channel;

/* LOCAL FUNCTIONS */
static void recv_handler(int sd, short flags, void* user);

static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction);

static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket);

static void xmit_data(int sd, short flags, void* send_req);

/* LOCAL STRUCTURE VALUES */
static rmcast_base_channel_t *my_group_channel=NULL;

/* API FUNCTIONS */
static int init(void);

static void finalize(void);

static int basic_send(orte_rmcast_channel_t channel,
                      orte_rmcast_tag_t tag,
                      opal_buffer_t *buf);

static int basic_send_nb(orte_rmcast_channel_t channel,
                         orte_rmcast_tag_t tag,
                         opal_buffer_t *buf,
                         orte_rmcast_callback_fn_t cbfunc,
                         void *cbdata);

static int basic_recv(orte_rmcast_channel_t channel,
                      orte_rmcast_tag_t tag,
                      opal_buffer_t *buf);

static int basic_recv_nb(orte_rmcast_channel_t channel,
                         orte_rmcast_flag_t flags,
                         orte_rmcast_tag_t tag,
                         orte_rmcast_callback_fn_t cbfunc, void *cbdata);

static void cancel_recv(orte_rmcast_channel_t channel,
                        orte_rmcast_tag_t tag);

static int open_channel(orte_rmcast_channel_t *channel, char *name,
                        char *network, int port, char *interface, uint8_t direction);

static int close_channel(orte_rmcast_channel_t channel);

static orte_rmcast_channel_t query(void);

/* Define the module */

orte_rmcast_module_t orte_rmcast_basic_module = {
    init,
    finalize,
    basic_send,
    basic_send_nb,
    basic_recv,
    basic_recv_nb,
    cancel_recv,
    open_channel,
    close_channel,
    query
};

/* during init, we setup two channels for both xmit and recv:
 * (a) a public address announcement channel. There are two variants
 *     of this:
 *         (1) system processes - e.g., daemons, tools. This channel
 *             is reserved solely for their use in performing admin
 *             functions
 *         (2) application processes. This channel is used to announce
 *             their existence and contact info for auto-wireup
 * (b) our own group's channel, which is where our own output
 *     will be sent. At this time, we assume that we always
 *     want to hear our peers, so this channels is also
 *     bidirectional
 *
 * In addition, the HNP opens a third channel which is used solely
 * for cmd-control purposes. This is where a tool, for example, might
 * send a cmd to the HNP to take some action - there is no point in
 * having that cmd echo around to every daemon and/or other tool
 * in the system.
 */
static int init(void)
{
    rmcast_base_channel_t *chan;
    int rc;
    
    if (init_completed) {
        return ORTE_SUCCESS;
    }
    init_completed = true;
    
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:basic: init called",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
    
    /* setup the globals */
    OBJ_CONSTRUCT(&lock, opal_mutex_t);
    OBJ_CONSTRUCT(&recvs, opal_list_t);
    OBJ_CONSTRUCT(&channels, opal_list_t);
    next_channel = ORTE_RMCAST_DYNAMIC_CHANNELS;
    
    /* setup the respective public address channel */
    chan = OBJ_NEW(rmcast_base_channel_t);
    if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) {
        chan->name = strdup("system");
        chan->channel = ORTE_RMCAST_SYS_CHANNEL;
        chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_SYS_CHANNEL;
        chan->port = orte_rmcast_base.ports[ORTE_RMCAST_SYS_CHANNEL];
        chan->interface = orte_rmcast_base.interface;
    } else if (ORTE_PROC_IS_APP) {
        chan->name = strdup("app-announce");
        chan->channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL;
        chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_APP_PUBLIC_CHANNEL;
        chan->port = orte_rmcast_base.ports[ORTE_RMCAST_APP_PUBLIC_CHANNEL];
        chan->interface = orte_rmcast_base.interface;
    } else {
        opal_output(0, "rmcast:basic:init - unknown process type");
        return ORTE_ERR_SILENT;
    }
    OPAL_THREAD_LOCK(&lock);
    opal_list_append(&channels, &chan->item);
    OPAL_THREAD_UNLOCK(&lock);
    if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    
    /* finally, if we are an app, setup our grp channel, if one was given */
    if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) {
        chan = OBJ_NEW(rmcast_base_channel_t);
        chan->name = strdup(orte_rmcast_base.my_group_name);
        chan->channel = orte_rmcast_base.my_group_number;
        chan->network = orte_rmcast_base.xmit_network + orte_rmcast_base.my_group_number;
        chan->port = orte_rmcast_base.ports[orte_rmcast_base.my_group_number];        
        chan->interface = orte_rmcast_base.interface;
        OPAL_THREAD_LOCK(&lock);
        opal_list_append(&channels, &chan->item);
        OPAL_THREAD_UNLOCK(&lock);
        if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_XMIT))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
        my_group_channel = chan;
    }

    return ORTE_SUCCESS;
}

static void finalize(void)
{
    opal_list_item_t *item;
    
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:basic: finalize called",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
    
    /* deconstruct the globals */
    OPAL_THREAD_LOCK(&lock);
    while (NULL != (item = opal_list_remove_first(&recvs))) {
        OBJ_RELEASE(item);
    }
    OBJ_DESTRUCT(&recvs);
    while (NULL != (item = opal_list_remove_first(&channels))) {
        OBJ_RELEASE(item);
    }
    OBJ_DESTRUCT(&channels);
    OPAL_THREAD_UNLOCK(&lock);

    OBJ_DESTRUCT(&lock);
    
    return;
}

/* internal blocking send support */
static bool send_complete;

static void internal_snd_cb(orte_rmcast_channel_t channel, opal_buffer_t *buf, void *cbdata)
{
    send_complete = true;
}

static int basic_send(orte_rmcast_channel_t channel,
                      orte_rmcast_tag_t tag,
                      opal_buffer_t *buf)
{
    opal_list_item_t *item;
    rmcast_base_channel_t *chptr, *ch;
    rmcast_base_send_t *snd;
    
    /* if we were asked to send this on our group output
     * channel, substitute it
     */
    if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
        if (NULL == my_group_channel) {
            return ORTE_ERR_NOT_FOUND;
        }
        ch = my_group_channel;
        goto process;
    }
    
    /* find the channel */
    ch = NULL;
    for (item = opal_list_get_first(&channels);
         item != opal_list_get_end(&channels);
         item = opal_list_get_next(item)) {
        chptr = (rmcast_base_channel_t*)item;
        if (channel == chptr->channel) {
            ch = chptr;
            break;
        }
    }
    if (NULL == ch) {
        /* didn't find it */
        return ORTE_ERR_NOT_FOUND;
    }
    
process:
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:basic: send of %lu bytes"
                         " called on multicast channel %03d.%03d.%03d.%03d %0x",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         (unsigned long)buf->bytes_used,
                         OPAL_IF_FORMAT_ADDR(ch->network), ch->network));
    
    /* check the msg size to ensure it isn't too big */
    if (buf->bytes_used > (ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10)) {
        orte_show_help("help-orte-rmcast-basic.txt",
                       "orte-rmcast-basic:msg-too-large", true,
                       ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                       OPAL_IF_FORMAT_ADDR(ch->network), tag,
                       buf->bytes_used,
                       ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10);
        return ORTE_ERR_NOT_SUPPORTED;
    }
    
    /* queue it to be sent - preserves order! */
    snd = OBJ_NEW(rmcast_base_send_t);
    snd->data = buf;
    snd->tag = tag;
    snd->cbfunc = internal_snd_cb;
    snd->cbdata = snd;
    send_complete = false;
    
    /* add it to this channel's pending sends */
    OPAL_THREAD_LOCK(&ch->send_lock);
    opal_list_append(&ch->pending_sends, &snd->item);
    
    /* do we need to start the send event? */
    if (!ch->sends_in_progress) {
        opal_event_add(&ch->send_ev, 0);
        ch->sends_in_progress = true;
    }
    OPAL_THREAD_UNLOCK(&ch->send_lock);

    /* now wait for the send to complete */
    ORTE_PROGRESSED_WAIT(send_complete, 0, 1);
    
    return ORTE_SUCCESS;
}

static int basic_send_nb(orte_rmcast_channel_t channel,
                         orte_rmcast_tag_t tag,
                         opal_buffer_t *buf,
                         orte_rmcast_callback_fn_t cbfunc,
                         void *cbdata)
{
    opal_list_item_t *item;
    rmcast_base_channel_t *chptr, *ch;
    rmcast_base_send_t *snd;
    
    /* if we were asked to send this on our group output
     * channel, substitute it
     */
    if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
        if (NULL == my_group_channel) {
            return ORTE_ERR_NOT_FOUND;
        }
        ch = my_group_channel;
        goto process;
    }
    
    /* find the channel */
    ch = NULL;
    for (item = opal_list_get_first(&channels);
         item != opal_list_get_end(&channels);
         item = opal_list_get_next(item)) {
        chptr = (rmcast_base_channel_t*)item;
        if (channel == chptr->channel) {
            ch = chptr;
            break;
        }
    }
    if (NULL == ch) {
        /* didn't find it */
        return ORTE_ERR_NOT_FOUND;
    }
    
process:
    OPAL_OUTPUT_VERBOSE((0, orte_rmcast_base.rmcast_output,
                         "%s rmcast:basic: send_nb of %lu bytes"
                         " called on multicast channel %03d.%03d.%03d.%03d %0x",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         (unsigned long)buf->bytes_used,
                         OPAL_IF_FORMAT_ADDR(ch->network), ch->network));
    
    if (buf->bytes_used > (ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10)) {
        orte_show_help("help-orte-rmcast-basic.txt",
                       "orte-rmcast-basic:msg-too-large", true,
                       ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                       OPAL_IF_FORMAT_ADDR(ch->network), tag,
                       buf->bytes_used,
                       ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10);
        return ORTE_ERR_NOT_SUPPORTED;
    }

    /* queue it to be sent - preserves order! */
    snd = OBJ_NEW(rmcast_base_send_t);
    snd->data = buf;
    snd->tag = tag;
    snd->cbfunc = cbfunc;
    snd->cbdata = cbdata;
    
    /* add it to this channel's pending sends */
    OPAL_THREAD_LOCK(&ch->send_lock);
    opal_list_append(&ch->pending_sends, &snd->item);
    
    /* do we need to start the send event? */
    if (!ch->sends_in_progress) {
        opal_event_add(&ch->send_ev, 0);
        ch->sends_in_progress = true;
    }
    OPAL_THREAD_UNLOCK(&ch->send_lock);
    
    return ORTE_SUCCESS;
}

static int basic_recv(orte_rmcast_channel_t channel,
                      orte_rmcast_tag_t tag,
                      opal_buffer_t *buf)
{
    opal_list_item_t *item;
    rmcast_base_recv_t *recvptr;
    rmcast_base_channel_t *ch, *chptr;
    
    /* find the channel */
    ch = NULL;
    for (item = opal_list_get_first(&channels);
         item != opal_list_get_end(&channels);
         item = opal_list_get_next(item)) {
        chptr = (rmcast_base_channel_t*)item;
        if (channel == chptr->channel) {
            ch = chptr;
            break;
        }
    }
    if (NULL == ch) {
        /* didn't find it */
        return ORTE_ERR_NOT_FOUND;
    }
    
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:basic: recv called on multicast channel %03d.%03d.%03d.%03d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
    
    recvptr = OBJ_NEW(rmcast_base_recv_t);
    recvptr->channel = channel;
    recvptr->tag = tag;
    OPAL_THREAD_LOCK(&lock);
    opal_list_append(&recvs, &recvptr->item);
    OPAL_THREAD_UNLOCK(&lock);
    
    ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1);
    
    opal_dss.copy_payload(buf, recvptr->data);
    OPAL_THREAD_LOCK(&lock);
    opal_list_remove_item(&recvs, &recvptr->item);
    OPAL_THREAD_UNLOCK(&lock);
    OBJ_RELEASE(recvptr);
    
    return ORTE_SUCCESS;
}

static int basic_recv_nb(orte_rmcast_channel_t channel,
                         orte_rmcast_flag_t flags,
                         orte_rmcast_tag_t tag,
                         orte_rmcast_callback_fn_t cbfunc, void *cbdata)
{
    opal_list_item_t *item;
    rmcast_base_recv_t *recvptr;
    rmcast_base_channel_t *ch, *chptr;

    /* find the channel */
    ch = NULL;
    for (item = opal_list_get_first(&channels);
         item != opal_list_get_end(&channels);
         item = opal_list_get_next(item)) {
        chptr = (rmcast_base_channel_t*)item;
        if (channel == chptr->channel) {
            ch = chptr;
            break;
        }
    }
    if (NULL == ch) {
        /* didn't find it */
        return ORTE_ERR_NOT_FOUND;
    }
    
    /* do we already have a recv for this channel/tag/cbfunc? */
    OPAL_THREAD_LOCK(&lock);
    for (item = opal_list_get_first(&recvs);
         item != opal_list_get_end(&recvs);
         item = opal_list_get_next(item)) {
        recvptr = (rmcast_base_recv_t*)item;
        if (channel == recvptr->channel &&
            tag == recvptr->tag &&
            cbfunc == recvptr->cbfunc) {
            /* matching recv in place */
            OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                 "%s rmcast:basic: matching recv_nb already active on multicast channel %03d.%03d.%03d.%03d",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
            return ORTE_SUCCESS;
        }
    }
    
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:basic: recv_nb called on multicast channel %03d.%03d.%03d.%03d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
    
    recvptr = OBJ_NEW(rmcast_base_recv_t);
    recvptr->channel = channel;
    recvptr->tag = tag;
    recvptr->flags = flags;
    recvptr->cbfunc = cbfunc;
    recvptr->cbdata = cbdata;
    OPAL_THREAD_LOCK(&lock);
    opal_list_append(&recvs, &recvptr->item);
    OPAL_THREAD_UNLOCK(&lock);
    
    return ORTE_SUCCESS;
}

static void cancel_recv(orte_rmcast_channel_t channel,
                        orte_rmcast_tag_t tag)
{
    opal_list_item_t *item, *next;
    rmcast_base_recv_t *ptr;
    
    /* find all recv's for this channel and tag */
    item = opal_list_get_first(&recvs);
    while (item != opal_list_get_end(&recvs)) {
        next = opal_list_get_next(item);
        
        ptr = (rmcast_base_recv_t*)item;
        if (channel == ptr->channel &&
            tag == ptr->tag) {
            OPAL_THREAD_LOCK(&lock);
            opal_list_remove_item(&recvs, &ptr->item);
            OBJ_RELEASE(ptr);
            OPAL_THREAD_UNLOCK(&lock);
        }
        item = next;
    }
}

static int open_channel(orte_rmcast_channel_t *channel, char *name,
                        char *network, int port, char *interface, uint8_t direction)
{
    opal_list_item_t *item;
    rmcast_base_channel_t *nchan, *chan;
    uint32_t netaddr=0, netmask=0, intr=0;
    int rc;
    
    /* parse the network, if provided */
    if (NULL != network) {
        if (ORTE_SUCCESS != (rc = opal_iftupletoaddr(network, &netaddr, &netmask))) {
            orte_show_help("help-rmcast-base.txt", "invalid-net-mask", true, network, ORTE_ERROR_NAME(rc));
            return ORTE_ERR_SILENT;
        }        
    }
    
    /* parse the interface, if provided */
    if (NULL != interface) {
        if (ORTE_SUCCESS != (rc = opal_iftupletoaddr(interface, &intr, NULL))) {
            orte_show_help("help-rmcast-base.txt", "invalid-net-mask", true, interface, ORTE_ERROR_NAME(rc));
            return ORTE_ERR_SILENT;
        }        
    }
    
    /* see if this name has already been assigned a channel on the specified network */
    chan = NULL;
    for (item = opal_list_get_first(&channels);
         item != opal_list_get_end(&channels);
         item = opal_list_get_next(item)) {
        nchan = (rmcast_base_channel_t*)item;
        
        if (0 == strcasecmp(nchan->name, name)) {
            /* check the network, if one was specified */
            if (0 != netaddr && netaddr != (nchan->network & netmask)) {
                continue;
            }
            /* check the channel, if one was given */
            if (ORTE_RMCAST_INVALID_CHANNEL != *channel &&
                nchan->channel != *channel) {
                continue;
            }
            chan = nchan;
            break;
        }
    }
    
    if (NULL != chan) {
        /* already exists - check that the requested
         * sockets are setup
         */
        if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
        return ORTE_SUCCESS;
    }
    
    /* we didn't find an existing match, so create a new channel */
    chan = OBJ_NEW(rmcast_base_channel_t);  /* puts it on list */
    chan->name = strdup(name);
    /* if we were given a channel, then just use it */
    if (ORTE_RMCAST_INVALID_CHANNEL != *channel) {
        chan->channel = *channel;
    } else {
        chan->channel = next_channel++;
        *channel = chan->channel;
    }
    /* if we were not given a network, use the default */
    if (NULL == network) {
        chan->network = orte_rmcast_base.xmit_network + chan->channel;
    } else {
        chan->network = netaddr;
    }
    /* if we were not given an interface, use the default */
    if (NULL == interface) {
        chan->interface = orte_rmcast_base.interface;
    } else {
        chan->interface = intr;
    }
    /* if we were not given a port, use a default one */
    if (port < 0) {
        chan->port = orte_rmcast_base.ports[chan->channel];
    } else {
        chan->port = port;
    }
    OPAL_THREAD_LOCK(&lock);
    opal_list_append(&channels, &chan->item);
    OPAL_THREAD_UNLOCK(&lock);
    
    if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    
    return ORTE_SUCCESS;
}

static int close_channel(orte_rmcast_channel_t channel)
{
    opal_list_item_t *item;
    rmcast_base_channel_t *chan;

    OPAL_THREAD_LOCK(&lock);
    for (item = opal_list_get_first(&channels);
         item != opal_list_get_end(&channels);
         item = opal_list_get_next(item)) {
        chan = (rmcast_base_channel_t*)item;
        
        if (channel == chan->channel) {
            opal_list_remove_item(&channels, item);
            OBJ_RELEASE(chan);
            OPAL_THREAD_UNLOCK(&lock);
            return ORTE_SUCCESS;
        }
    }
    
    OPAL_THREAD_UNLOCK(&lock);
    return ORTE_ERR_NOT_FOUND;
}

static orte_rmcast_channel_t query(void)
{
    return orte_rmcast_base.my_group_number;
}


/****    LOCAL FUNCTIONS    ****/

static void process_recv(int fd, short event, void *cbdata)
{
    orte_mcast_msg_event_t *mev = (orte_mcast_msg_event_t*)cbdata;
    rmcast_base_channel_t *chan = mev->channel;
    opal_list_item_t *item, *next;
    rmcast_base_recv_t *ptr;
    uint8_t *payload, *data;
    uint32_t tmp;
    orte_process_name_t name;
    uint16_t tmp16;
    orte_rmcast_tag_t tag;
    opal_buffer_t buf;

    /* point to the data payload */
    data = (uint8_t*)mev->data;
    
    /* extract the name and convert it to host order */
    memcpy(&tmp, &data[0], 4);
    name.jobid = ntohl(tmp);
    memcpy(&tmp, &data[4], 4);
    name.vpid = ntohl(tmp);
    
    OPAL_OUTPUT_VERBOSE((4, orte_rmcast_base.rmcast_output,
                         "%s rmcast:basic:recv sender: %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&name)));
    
    /* if this message is from myself, ignore it */
    if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) {
        OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
                             "%s rmcast:basic:recv sent from myself: %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&name)));
        goto cleanup;
    }
    
    /* extract the target tag */
    memcpy(&tmp16, &data[8], 2);
    tag = ntohs(tmp16);
    
    OPAL_OUTPUT_VERBOSE((4, orte_rmcast_base.rmcast_output,
                         "%s rmcast:basic:recv got tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         (int)tag));
    
    /* find all recv's for this channel and tag */
    item = opal_list_get_first(&recvs);
    while (item != opal_list_get_end(&recvs)) {
        next = opal_list_get_next(item);
        ptr = (rmcast_base_recv_t*)item;
        
        OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
                             "%s rmcast:basic:recv checking channel %d tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (int)ptr->channel, (int)ptr->tag));
        
        if ((chan->channel == ptr->channel || ORTE_RMCAST_WILDCARD_CHANNEL == ptr->channel) &&
            (tag == ptr->tag || ORTE_RMCAST_TAG_WILDCARD == ptr->tag)) {
            /* data must be placed in malloc'd area for buffer */
            payload = (uint8_t*)malloc(mev->sz-10);
            memcpy(payload, &data[10], mev->sz-10);
            
            /* create a buffer for the data */
            OBJ_CONSTRUCT(&buf, opal_buffer_t);
            
            /* load the data into the buffer */
            opal_dss.load(&buf, payload, mev->sz-10);
            
            if (NULL != ptr->cbfunc) {
                ptr->cbfunc(ptr->channel, &buf, ptr->cbdata);
                /* if it isn't persistent, remove it */
                if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
                    OPAL_THREAD_LOCK(&lock);
                    opal_list_remove_item(&recvs, &ptr->item);
                    OPAL_THREAD_UNLOCK(&lock);
                    OBJ_RELEASE(ptr);
                }
            } else {
                /* flag it as recvd to release blocking recv */
                ptr->recvd = true;
            }
            OBJ_DESTRUCT(&buf);  /* will release the data */
        }
        /* move along list */
        item = next;
    }
    
cleanup:
    OBJ_RELEASE(mev);
    return;
}

static void recv_handler(int sd, short flags, void* cbdata)
{
    uint8_t *data;
    ssize_t sz;
    rmcast_base_channel_t *chan = (rmcast_base_channel_t*)cbdata;
    
    /* read the data */
    data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size * sizeof(uint8_t));
    sz = read(sd, data, mca_rmcast_basic_component.max_msg_size);
    
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:basic recvd %d bytes from channel %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         (int)sz, (int)chan->channel));

    /* clear the way for the next message */
    ORTE_MULTICAST_MESSAGE_EVENT(data, sz, chan, process_recv);
    
    return;
}

static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
{
    int rc;
    int xmitsd, recvsd;
    
    /* setup the IPv4 addr info */
    chan->addr.sin_family = AF_INET;
    chan->addr.sin_addr.s_addr = htonl(chan->network);
    chan->addr.sin_port = htons(chan->port);
    
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "setup:channel addr %03d.%03d.%03d.%03d port %d",
                         OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port));
    
    if (0 > chan->xmit && ORTE_RMCAST_XMIT & direction) {
        /* create a xmit socket */
        if (ORTE_SUCCESS != (rc = setup_socket(&xmitsd, chan, false))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
        chan->xmit = xmitsd;
        chan->send_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size);
        /* setup the event to xmit messages, but don't activate it */
        opal_event_set(&chan->send_ev, chan->xmit, OPAL_EV_WRITE, xmit_data, chan);        
    }
    
    if (0 > chan->recv && ORTE_RMCAST_RECV & direction) {
        /* create a recv socket */
        if (ORTE_SUCCESS != (rc = setup_socket(&recvsd, chan, true))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
        chan->recv = recvsd;
        
        /* setup an event to catch messages */
        opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
        opal_event_add(&chan->recv_ev, 0);
    }

    return ORTE_SUCCESS;
}

static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket)
{
    uint8_t ttl = 1;
    struct sockaddr_in inaddr;
    struct ip_mreq req;
    int addrlen;
    int target_sd;
    int flags;

    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "setup:socket addr %03d.%03d.%03d.%03d port %d",
                         OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port));
    
    target_sd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if(target_sd < 0) {
        if (EAFNOSUPPORT != opal_socket_errno) {
            opal_output(0,"rmcast:init: socket() failed: %s (%d)", 
                        strerror(opal_socket_errno), opal_socket_errno);
        }
        return ORTE_ERR_IN_ERRNO;
    }
    
    /* set the multicast flags */
    if ((setsockopt(target_sd, IPPROTO_IP, IP_MULTICAST_TTL, 
                    (void *)&ttl, sizeof(ttl))) < 0) {
        opal_output(0,"rmcast:init: socketopt() failed on MULTICAST_TTL: %s (%d)", 
                    strerror(opal_socket_errno), opal_socket_errno);
        return ORTE_ERR_IN_ERRNO;
    }
    
    /* enable port sharing */
    flags = 1;
    if (setsockopt (target_sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
        opal_output(0, "rmcast:basic: unable to set the "
                    "SO_REUSEADDR option (%s:%d)\n",
                    strerror(opal_socket_errno), opal_socket_errno);
        CLOSE_THE_SOCKET(target_sd);
        return ORTE_ERROR;
    }

    /* if this is the recv side... */
    if (recvsocket) {
        memset(&inaddr, 0, sizeof(inaddr));
        inaddr.sin_family = AF_INET;
        inaddr.sin_addr.s_addr = htonl(chan->network);
        inaddr.sin_port = htons(chan->port);
        addrlen = sizeof(struct sockaddr_in);
        
        /* bind the socket */
        if (bind(target_sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
            opal_output(0, "%s rmcast:init: bind() failed for addr %03d.%03d.%03d.%03d port %d\n\tError: %s (%d)",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port,
                        strerror(opal_socket_errno), opal_socket_errno);
            CLOSE_THE_SOCKET(target_sd);
            return ORTE_ERROR;
        }
        /* set membership on the multicast interface */
        memset(&req, 0, sizeof (req));
        req.imr_multiaddr.s_addr = htonl(chan->network);
        req.imr_interface.s_addr = htonl(chan->interface);
        
        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                             "setup:socket:membership addr %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d",
                             OPAL_IF_FORMAT_ADDR(chan->network), OPAL_IF_FORMAT_ADDR(chan->interface)));

        if ((setsockopt(target_sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, 
                        (void *)&req, sizeof (req))) < 0) {
            opal_output(0, "%s rmcast:init: setsockopt() failed on ADD_MEMBERSHIP\n"
                        "\tfor multicast network %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d\n\tError: %s (%d)",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        OPAL_IF_FORMAT_ADDR(chan->network), OPAL_IF_FORMAT_ADDR(chan->interface),
                        strerror(opal_socket_errno), opal_socket_errno);
            CLOSE_THE_SOCKET(target_sd);
            return ORTE_ERROR;
        }
    } else {
        /* on the xmit side, need to set the interface */
        memset(&inaddr, 0, sizeof(inaddr));
        inaddr.sin_addr.s_addr = htonl(chan->interface);
        addrlen = sizeof(struct sockaddr_in);
        
        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                             "setup:socket:xmit interface %03d.%03d.%03d.%03d",
                             OPAL_IF_FORMAT_ADDR(chan->interface)));
        
        if ((setsockopt(target_sd, IPPROTO_IP, IP_MULTICAST_IF, 
                        (void *)&inaddr, addrlen)) < 0) {
            opal_output(0, "%s rmcast:init: setsockopt() failed on MULTICAST_IF\n"
                        "\tfor multicast network %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d\n\tError: %s (%d)",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        OPAL_IF_FORMAT_ADDR(chan->network), OPAL_IF_FORMAT_ADDR(chan->interface),
                        strerror(opal_socket_errno), opal_socket_errno);
            CLOSE_THE_SOCKET(target_sd);
            return ORTE_ERROR;
        }
    }
    
    /* set socket up to be non-blocking */
    if((flags = fcntl(target_sd, F_GETFL, 0)) < 0) {
        opal_output(0, "rmcast:init: fcntl(F_GETFL) failed: %s (%d)", 
                    strerror(opal_socket_errno), opal_socket_errno);
        return ORTE_ERROR;
    } else {
        flags |= O_NONBLOCK;
        if(fcntl(target_sd, F_SETFL, flags) < 0) {
            opal_output(0, "rmcast:init: fcntl(F_SETFL) failed: %s (%d)", 
                        strerror(opal_socket_errno), opal_socket_errno);
            return ORTE_ERROR;
        }
    }
    
    /* return the socket */
    *sd = target_sd;
    
    return ORTE_SUCCESS;
}

static void xmit_data(int sd, short flags, void* send_req)
{
    rmcast_base_channel_t *chan = (rmcast_base_channel_t*)send_req;
    rmcast_base_send_t *snd;
    opal_list_item_t *item;
    char *bytes;
    int32_t sz;
    int rc;
    uint16_t tmp;
    uint32_t nm;

    OPAL_THREAD_LOCK(&chan->send_lock);
    while (NULL != (item = opal_list_remove_first(&chan->pending_sends))) {
        snd = (rmcast_base_send_t*)item;
        
        /* extract the payload */
        opal_dss.unload(snd->data, (void**)&bytes, &sz);
        
        /* start the send data area with our name in network-byte-order */
        nm = htonl(ORTE_PROC_MY_NAME->jobid);
        memcpy(&chan->send_data[0], &nm, 4);
        nm = htonl(ORTE_PROC_MY_NAME->vpid);
        memcpy(&chan->send_data[4], &nm, 4);
        
        /* add the tag data, also converted */
        tmp = htons(snd->tag);
        memcpy(&chan->send_data[8], &tmp, 2);
        
        /* add the payload, up to the limit */
        memcpy(&chan->send_data[10], bytes, sz);
        
        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                             "%s rmcast:basic sending %d bytes to tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (int)(sz+10), (int)snd->tag));
        
        if ((sz+10) != (rc = sendto(chan->xmit, chan->send_data, sz+10, 0,
                    (struct sockaddr *)&(chan->addr), sizeof(struct sockaddr_in)))) {
            /* didn't get the message out */
            opal_output(0, "%s failed to send message - size %d may be too large (limit: %d)",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        (int)(sz+10), ORTE_RMCAST_BASIC_MAX_MSG_SIZE);
            /* reload into original buffer */
            opal_dss.load(snd->data, (void*)bytes, sz);
            /* cleanup */
            OBJ_RELEASE(item);
            continue;
       }

        /* reload into original buffer */
        opal_dss.load(snd->data, (void*)bytes, sz);
        
        /* call the cbfunc if required */
        if (NULL != snd->cbfunc) {
            snd->cbfunc(chan->channel, snd->data, snd->cbdata);
        }
        /* cleanup */
        OBJ_RELEASE(item);
    }
    
    /* cleanup */
    opal_event_del(&chan->send_ev);
    chan->sends_in_progress = false;

    OPAL_THREAD_UNLOCK(&chan->send_lock);
}