1
1
openmpi/orte/mca/rmcast/tcp/rmcast_tcp.c
Ralph Castain 62a8b73f1a Correctly handle output sent to the group input channel
This commit was SVN r23362.
2010-07-07 14:17:48 +00:00

880 строки
31 KiB
C

/*
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/types.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#include <errno.h>
#include <fcntl.h>
#include "opal/class/opal_list.h"
#include "opal/opal_socket_errno.h"
#include "opal/util/output.h"
#include "opal/util/argv.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "opal/dss/dss.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rmcast/base/private.h"
#include "orte/mca/rmcast/base/base.h"
#include "rmcast_tcp.h"
/* LOCAL DATA */
static bool init_completed = false;
static orte_job_t *daemons=NULL;
/* LOCAL FUNCTIONS */
static void recv_handler(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void relay_handler(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void relay(int fd, short event, void *cbdata);
/* API FUNCTIONS */
static int init(void);
static void finalize(void);
static int tcp_send_buffer(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
opal_buffer_t *buf);
static int tcp_send_buffer_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
opal_buffer_t *buf,
orte_rmcast_callback_buffer_fn_t cbfunc,
void *cbdata);
static int tcp_send(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec *msg, int count);
static int tcp_send_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec *msg, int count,
orte_rmcast_callback_fn_t cbfunc,
void *cbdata);
static int tcp_recv_buffer(orte_process_name_t *sender,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
opal_buffer_t *buf);
static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_buffer_fn_t cbfunc,
void *cbdata);
static int tcp_recv(orte_process_name_t *sender,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec **msg, int *count);
static int tcp_recv_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_fn_t cbfunc,
void *cbdata);
static int open_channel(orte_rmcast_channel_t channel, char *name,
char *network, int port, char *interface, uint8_t direction);
/* Define the module */
orte_rmcast_module_t orte_rmcast_tcp_module = {
init,
finalize,
tcp_send,
tcp_send_nb,
tcp_send_buffer,
tcp_send_buffer_nb,
tcp_recv,
tcp_recv_nb,
tcp_recv_buffer,
tcp_recv_buffer_nb,
orte_rmcast_base_cancel_recv,
open_channel,
orte_rmcast_base_close_channel,
orte_rmcast_base_query
};
/* during init, we setup two channels for both xmit and recv:
* (a) a public address announcement channel. There are two variants
* of this:
* (1) system processes - e.g., daemons, tools. This channel
* is reserved solely for their use in performing admin
* functions
* (2) application processes. This channel is used to announce
* their existence and contact info for auto-wireup
* (b) our own group's channel, which is where our own output
* will be sent. At this time, we assume that we always
* want to hear our peers, so this channels is also
* bidirectional
*
* In addition, the HNP opens a third channel which is used solely
* for cmd-control purposes. This is where a tool, for example, might
* send a cmd to the HNP to take some action - there is no point in
* having that cmd echo around to every daemon and/or other tool
* in the system.
*/
static int init(void)
{
int rc;
if (init_completed) {
return ORTE_SUCCESS;
}
init_completed = true;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp: init called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup the respective public address channel */
if (ORTE_PROC_IS_TOOL) {
/* tools only open the sys channel */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_SYS_CHANNEL, "system",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels);
orte_rmcast_base.my_input_channel = NULL;
} else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
/* daemons and hnp open the sys and data server channels */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_SYS_CHANNEL, "system",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels);
orte_rmcast_base.my_input_channel = NULL;
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_DATA_SERVER_CHANNEL, "data-server",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* activate a recv to catch relays */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_MULTICAST_RELAY,
ORTE_RML_NON_PERSISTENT,
relay_handler,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else if (ORTE_PROC_IS_APP) {
/* apps open the app public and data server channels */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_DATA_SERVER_CHANNEL, "data-server",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* finally, if we are an app, setup our grp xmit/recv channels, if given */
if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) {
if (ORTE_SUCCESS != (rc = open_channel(orte_rmcast_base.my_group_number,
"recv", NULL, -1, NULL, ORTE_RMCAST_RECV))) {
ORTE_ERROR_LOG(rc);
return rc;
}
orte_rmcast_base.my_input_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels);
if (ORTE_SUCCESS != (rc = open_channel(orte_rmcast_base.my_group_number+1,
"xmit", NULL, -1, NULL, ORTE_RMCAST_XMIT))) {
ORTE_ERROR_LOG(rc);
return rc;
}
orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels);
}
} else {
opal_output(0, "rmcast:tcp:init - unknown process type");
return ORTE_ERR_SILENT;
}
if (ORTE_JOBID_WILDCARD == orte_process_info.my_hnp.jobid) {
/* set the HNP info in our contact table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_hnp_uri))) {
orte_show_help("help-orcm-ps.txt", "orcm-ps:hnp-uri-bad", true, orte_process_info.my_hnp_uri);
return rc;
}
/* extract the name */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
&orte_process_info.my_hnp, NULL))) {
orte_show_help("help-orcm-ps.txt", "orcm-ps:hnp-uri-bad", true, orte_process_info.my_hnp_uri);
return rc;
}
}
/* now activate the non-blocking recv so we catch messages */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_MULTICAST,
ORTE_RML_NON_PERSISTENT,
recv_handler,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
static void finalize(void)
{
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp: finalize called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return;
}
/* internal blocking send support */
static bool send_complete, send_buf_complete;
static void internal_snd_cb(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
struct iovec *msg, int count, void *cbdata)
{
send_complete = true;
}
static void internal_snd_buf_cb(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata)
{
send_buf_complete = true;
}
static int queue_xmit(rmcast_base_send_t *snd,
orte_rmcast_channel_t channel)
{
opal_list_item_t *item;
rmcast_base_channel_t *ch, *chptr;
orte_proc_t *proc;
orte_odls_child_t *child;
int rc, v;
opal_buffer_t *buf;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp: send of %d %s"
" called on multicast channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == snd->iovec_array) ? (int)snd->buf->bytes_used : (int)snd->iovec_count,
(NULL == snd->iovec_array) ? "bytes" : "iovecs",
(int)channel));
/* if we were asked to send this on our group output
* channel, substitute it
*/
if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
if (NULL == orte_rmcast_base.my_output_channel) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
ch = orte_rmcast_base.my_output_channel;
goto process;
} else if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
if (NULL == orte_rmcast_base.my_input_channel) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
ch = orte_rmcast_base.my_input_channel;
goto process;
}
/* find the channel */
ch = NULL;
for (item = opal_list_get_first(&orte_rmcast_base.channels);
item != opal_list_get_end(&orte_rmcast_base.channels);
item = opal_list_get_next(item)) {
chptr = (rmcast_base_channel_t*)item;
if (channel == chptr->channel) {
ch = chptr;
break;
}
}
if (NULL == ch) {
/* didn't find it */
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
process:
/* setup the message for xmission */
if (ORTE_SUCCESS != (rc = orte_rmcast_base_build_msg(ch, &buf, snd))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OPAL_THREAD_LOCK(&ch->send_lock);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp multicasting %d bytes to channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buf->bytes_used,
(int)ch->channel, (int)snd->tag));
if (ORTE_PROC_IS_HNP) {
/* if we don't already have it, get the daemon object */
if (NULL == daemons) {
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
}
/* send it to each daemon */
for (v=1; v < daemons->procs->size; v++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, v))) {
continue;
}
if (0 > (rc = orte_rml.send_buffer(&proc->name, buf, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* send the message to my children */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (0 > (rc = orte_rml.send_buffer(child->name, buf, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
rc = ORTE_SUCCESS;
} else {
/* send it to the HNP */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp sending multicast to HNP %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_MULTICAST_RELAY, 0))) {
ORTE_ERROR_LOG(rc);
/* didn't get the message out */
opal_output(0, "%s failed to send message to multicast channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)ch->channel);
goto cleanup;
}
rc = ORTE_SUCCESS;
}
if (NULL != snd->buf) {
/* call the cbfunc if required */
if (NULL != snd->cbfunc_buffer) {
snd->cbfunc_buffer(rc, channel, snd->tag,
ORTE_PROC_MY_NAME,
snd->buf, snd->cbdata);
}
} else {
/* call the cbfunc if required */
if (NULL != snd->cbfunc_iovec) {
snd->cbfunc_iovec(rc, channel, snd->tag,
ORTE_PROC_MY_NAME,
snd->iovec_array, snd->iovec_count, snd->cbdata);
}
}
/* roll to next message sequence number */
ORTE_MULTICAST_NEXT_SEQUENCE_NUM(ch->seq_num);
cleanup:
OBJ_RELEASE(buf);
OPAL_THREAD_UNLOCK(&ch->send_lock);
return rc;
}
static int tcp_send(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec *msg, int count)
{
rmcast_base_send_t snd;
int ret;
/* queue it to be sent - preserves order! */
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.iovec_array = msg;
snd.iovec_count = count;
snd.tag = tag;
snd.cbfunc_iovec = internal_snd_cb;
send_complete = false;
if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&snd);
return ret;
}
/* now wait for the send to complete */
ORTE_PROGRESSED_WAIT(send_complete, 0, 1);
OBJ_DESTRUCT(&snd);
return ORTE_SUCCESS;
}
static int tcp_send_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec *msg, int count,
orte_rmcast_callback_fn_t cbfunc,
void *cbdata)
{
int ret;
rmcast_base_send_t snd;
/* queue it to be sent - preserves order! */
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.iovec_array = msg;
snd.iovec_count = count;
snd.tag = tag;
snd.cbfunc_iovec = cbfunc;
snd.cbdata = cbdata;
if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&snd);
return ret;
}
OBJ_DESTRUCT(&snd);
return ORTE_SUCCESS;
}
static int tcp_send_buffer(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
opal_buffer_t *buf)
{
int ret;
rmcast_base_send_t snd;
/* queue it to be sent - preserves order! */
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.buf = buf;
snd.tag = tag;
snd.cbfunc_buffer = internal_snd_buf_cb;
send_buf_complete = false;
if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&snd);
return ret;
}
/* now wait for the send to complete */
ORTE_PROGRESSED_WAIT(send_buf_complete, 0, 1);
OBJ_DESTRUCT(&snd);
return ORTE_SUCCESS;
}
static int tcp_send_buffer_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
opal_buffer_t *buf,
orte_rmcast_callback_buffer_fn_t cbfunc,
void *cbdata)
{
int ret;
rmcast_base_send_t snd;
/* queue it to be sent - preserves order! */
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.buf = buf;
snd.tag = tag;
snd.cbfunc_buffer = cbfunc;
snd.cbdata = cbdata;
if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&snd);
return ret;
}
OBJ_DESTRUCT(&snd);
return ORTE_SUCCESS;
}
static int tcp_recv(orte_process_name_t *name,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec **msg, int *count)
{
rmcast_base_recv_t *recvptr;
int ret;
orte_rmcast_channel_t chan;
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_output_channel->channel;
} else {
chan = channel;
}
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(&recvptr, chan, tag,
ORTE_RMCAST_NON_PERSISTENT,
NULL, NULL, NULL, true))) {
ORTE_ERROR_LOG(ret);
return ret;
}
ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1);
/* xfer the data */
if (NULL != name) {
/* caller requested id of sender */
name->jobid = recvptr->name.jobid;
name->vpid = recvptr->name.vpid;
}
*msg = recvptr->iovec_array;
*count = recvptr->iovec_count;
/* remove the recv */
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
opal_list_remove_item(&orte_rmcast_base.recvs, &recvptr->item);
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
OBJ_RELEASE(recvptr);
return ORTE_SUCCESS;
}
static int tcp_recv_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_fn_t cbfunc, void *cbdata)
{
int ret;
orte_rmcast_channel_t chan;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp: recv_nb called on channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_output_channel->channel;
} else {
chan = channel;
}
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
cbfunc, NULL, cbdata, false))) {
if (ORTE_EXISTS == ret) {
ret = ORTE_SUCCESS;
} else {
ORTE_ERROR_LOG(ret);
}
}
return ret;
}
static int tcp_recv_buffer(orte_process_name_t *name,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
opal_buffer_t *buf)
{
rmcast_base_recv_t *recvptr;
int ret;
orte_rmcast_channel_t chan;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp: recv_buffer called on multicast channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_output_channel->channel;
} else {
chan = channel;
}
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(&recvptr, chan, tag,
ORTE_RMCAST_NON_PERSISTENT,
NULL, NULL, NULL, true))) {
ORTE_ERROR_LOG(ret);
return ret;
}
ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1);
/* xfer the data */
if (NULL != name) {
/* caller requested id of sender */
name->jobid = recvptr->name.jobid;
name->vpid = recvptr->name.vpid;
}
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) {
ORTE_ERROR_LOG(ret);
}
/* release the data */
OBJ_RELEASE(recvptr->buf);
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
opal_list_remove_item(&orte_rmcast_base.recvs, &recvptr->item);
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
OBJ_RELEASE(recvptr);
return ret;
}
static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_buffer_fn_t cbfunc, void *cbdata)
{
int ret;
orte_rmcast_channel_t chan;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp: recv_buffer_nb called on multicast channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_output_channel->channel;
} else {
chan = channel;
}
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
NULL, cbfunc, cbdata, false))) {
if (ORTE_EXISTS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
}
return ORTE_SUCCESS;
}
/* for the tcp module, we will be using the RML to "fake" a
* multicast in combination with the grpcomm "xcast" interface.
* We cannot control the network and interface in this
* combination as it gets auto-picked well before us, so we
* ignore that info here
*/
static int open_channel(orte_rmcast_channel_t channel, char *name,
char *network, int port, char *interface, uint8_t direction)
{
opal_list_item_t *item;
rmcast_base_channel_t *chan;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s opening channel %d for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, name));
/* see if this name has already been assigned a channel on the specified network */
for (item = opal_list_get_first(&orte_rmcast_base.channels);
item != opal_list_get_end(&orte_rmcast_base.channels);
item = opal_list_get_next(item)) {
chan = (rmcast_base_channel_t*)item;
if (0 == strcasecmp(chan->name, name)) {
/* check the channel, if one was given */
if (ORTE_RMCAST_INVALID_CHANNEL != channel &&
ORTE_RMCAST_INVALID_CHANNEL == chan->channel) {
chan->channel = channel;
}
/* all setup - nothing to do */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp using existing channel",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
}
/* we didn't find an existing match, so create a new channel */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s creating new channel %d for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, name));
chan = OBJ_NEW(rmcast_base_channel_t);
chan->name = strdup(name);
chan->channel = channel;
/* add to list of known channels */
OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
opal_list_append(&orte_rmcast_base.channels, &chan->item);
OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp opening new channel for%s%s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(ORTE_RMCAST_RECV & direction) ? " RECV" : " ",
(ORTE_RMCAST_XMIT & direction) ? " XMIT" : " "));
return ORTE_SUCCESS;
}
/**** LOCAL FUNCTIONS ****/
static void process_recv(int fd, short event, void *cbdata)
{
orte_mcast_msg_event_t *mev = (orte_mcast_msg_event_t*)cbdata;
opal_list_item_t *item;
orte_odls_child_t *child;
int rc;
/* if I am a daemon, I need to relay this to my children first */
if (ORTE_PROC_IS_DAEMON) {
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s relaying multicast to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
if (0 > (rc = orte_rml.send_buffer(child->name, mev->buf, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
}
/* process the receive */
orte_rmcast_base_process_recv(mev);
cleanup:
OBJ_RELEASE(mev);
return;
}
static void recv_handler(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
opal_buffer_t *buf;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp recvd multicast msg",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* clear the way for the next message */
buf = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(buf, buffer);
ORTE_MULTICAST_MESSAGE_EVENT(buf, process_recv);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_MULTICAST,
ORTE_RML_NON_PERSISTENT,
recv_handler,
NULL))) {
ORTE_ERROR_LOG(rc);
}
return;
}
static void relay(int fd, short event, void *cbdata)
{
orte_message_event_t *msg = (orte_message_event_t*)cbdata;
orte_proc_t *proc;
opal_list_item_t *item;
orte_odls_child_t *child;
int rc, v;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp relaying multicast msg from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->sender)));
/* if we don't already have it, get the daemon object */
if (NULL == daemons) {
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
}
/* send it to each daemon other than the one that sent it to me */
for (v=1; v < daemons->procs->size; v++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, v))) {
continue;
}
if (proc->name.vpid == msg->sender.vpid) {
continue;
}
if (0 > (rc = orte_rml.send_buffer(&proc->name, msg->buffer, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
}
}
/* send the message to my children */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (0 > (rc = orte_rml.send_buffer(child->name, msg->buffer, ORTE_RML_TAG_MULTICAST, 0))) {
ORTE_ERROR_LOG(rc);
}
}
/* now process it myself */
ORTE_MULTICAST_MESSAGE_EVENT(msg->buffer, process_recv);
/* protect the buffer */
msg->buffer = NULL;
OBJ_RELEASE(msg);
}
static void relay_handler(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int rc;
/* if the message is from myself, ignore it */
if (sender->jobid == ORTE_PROC_MY_NAME->jobid &&
sender->vpid == ORTE_PROC_MY_NAME->vpid) {
return;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp relay multicast msg from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* clear the way for the next message */
ORTE_MESSAGE_EVENT(sender, buffer, tag, relay);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_MULTICAST_RELAY,
ORTE_RML_NON_PERSISTENT,
relay_handler,
NULL))) {
ORTE_ERROR_LOG(rc);
}
return;
}