diff --git a/orte/mca/ess/base/ess_base_std_app.c b/orte/mca/ess/base/ess_base_std_app.c index e51e329c8b..e109417f40 100644 --- a/orte/mca/ess/base/ess_base_std_app.c +++ b/orte/mca/ess/base/ess_base_std_app.c @@ -66,20 +66,6 @@ int orte_ess_base_app_setup(void) /* Setup the communication infrastructure */ - /* start with multicast */ -#if ORTE_ENABLE_MULTICAST - if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { - ORTE_ERROR_LOG(ret); - error = "orte_rmcast_base_open"; - goto error; - } - if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { - ORTE_ERROR_LOG(ret); - error = "orte_rmcast_base_select"; - goto error; - } -#endif - /* Runtime Messaging Layer */ if (ORTE_SUCCESS != (ret = orte_rml_base_open())) { ORTE_ERROR_LOG(ret); @@ -117,6 +103,20 @@ int orte_ess_base_app_setup(void) goto error; } + /* multicast */ +#if ORTE_ENABLE_MULTICAST + if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_select"; + goto error; + } +#endif + /* non-daemon/HNP apps can only have the default proxy PLM * module open - provide a chance for it to initialize */ @@ -235,16 +235,16 @@ int orte_ess_base_app_finalize(void) orte_wait_finalize(); - /* now can close the rml and its friendly group comm */ - orte_grpcomm_base_close(); - orte_routed_base_close(); - orte_rml_base_close(); - /* close the multicast */ #if ORTE_ENABLE_MULTICAST orte_rmcast_base_close(); #endif + /* now can close the rml and its friendly group comm */ + orte_grpcomm_base_close(); + orte_routed_base_close(); + orte_rml_base_close(); + orte_session_dir_finalize(ORTE_PROC_MY_NAME); return ORTE_SUCCESS; diff --git a/orte/mca/ess/base/ess_base_std_orted.c b/orte/mca/ess/base/ess_base_std_orted.c index 1e9413f25d..d475ff1cfa 100644 --- a/orte/mca/ess/base/ess_base_std_orted.c +++ b/orte/mca/ess/base/ess_base_std_orted.c @@ -161,20 +161,6 @@ int orte_ess_base_orted_setup(char **hosts) /* Setup the communication infrastructure */ - /* start with multicast */ -#if ORTE_ENABLE_MULTICAST - if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { - ORTE_ERROR_LOG(ret); - error = "orte_rmcast_base_open"; - goto error; - } - if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { - ORTE_ERROR_LOG(ret); - error = "orte_rmcast_base_select"; - goto error; - } -#endif - /* Runtime Messaging Layer - this opens/selects the OOB as well */ if (ORTE_SUCCESS != (ret = orte_rml_base_open())) { ORTE_ERROR_LOG(ret); @@ -212,6 +198,20 @@ int orte_ess_base_orted_setup(char **hosts) goto error; } + /* multicast */ +#if ORTE_ENABLE_MULTICAST + if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_select"; + goto error; + } +#endif + /* Open/select the odls */ if (ORTE_SUCCESS != (ret = orte_odls_base_open())) { ORTE_ERROR_LOG(ret); diff --git a/orte/mca/ess/base/ess_base_std_tool.c b/orte/mca/ess/base/ess_base_std_tool.c index a965e2c4d4..3486c614cb 100644 --- a/orte/mca/ess/base/ess_base_std_tool.c +++ b/orte/mca/ess/base/ess_base_std_tool.c @@ -43,6 +43,9 @@ #include "orte/util/proc_info.h" #include "orte/util/session_dir.h" #include "orte/util/show_help.h" +#if ORTE_ENABLE_MULTICAST +#include "orte/mca/rmcast/base/base.h" +#endif #include "orte/runtime/orte_cr.h" #include 
"orte/runtime/orte_globals.h" @@ -89,6 +92,20 @@ int orte_ess_base_tool_setup(void) goto error; } + /* multicast */ +#if ORTE_ENABLE_MULTICAST + if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_select"; + goto error; + } +#endif + /* since I am a tool, then all I really want to do is communicate. * So setup communications and be done - finding the HNP * to which I want to communicate and setting up a route for diff --git a/orte/mca/ess/cm/ess_cm_module.c b/orte/mca/ess/cm/ess_cm_module.c index ed16338455..6eb5eda7e3 100644 --- a/orte/mca/ess/cm/ess_cm_module.c +++ b/orte/mca/ess/cm/ess_cm_module.c @@ -91,26 +91,29 @@ static int rte_init(void) goto error; } - /* open the reliable multicast framework, just in - * case we need it to query the HNP for a name - */ - if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { - ORTE_ERROR_LOG(ret); - error = "orte_rmcast_base_open"; - goto error; - } - - if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { - ORTE_ERROR_LOG(ret); - error = "orte_rmcast_base_select"; - goto error; - } - if (ORTE_PROC_IS_DAEMON) { - /* get a name for ourselves */ - if (ORTE_SUCCESS != (ret = cm_set_name())) { - error = "set_name"; - goto error; + /* if we do not know the HNP, then we have to + * use the multicast system to find it + */ + if (NULL == orte_process_info.my_hnp_uri) { + /* open the reliable multicast framework */ + if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_open"; + goto error; + } + + if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_select"; + goto error; + } + + /* get a name for ourselves */ + if (ORTE_SUCCESS != (ret = cm_set_name())) { + error = "set_name"; + goto error; + } } /* get the list of nodes used for this job */ @@ -148,10 +151,27 @@ static int rte_init(void) */ orte_plm_base_close(); - /* checkin with the HNP */ - if (ORTE_SUCCESS != (ret = cm_set_name())) { - error = "set_name"; - goto error; + /* if we do not know the HNP, then we have to use + * the multicast system to find it + */ + if (NULL == orte_process_info.my_hnp_uri) { + /* open the reliable multicast framework */ + if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_open"; + goto error; + } + + if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_select"; + goto error; + } + /* checkin with the HNP */ + if (ORTE_SUCCESS != (ret = cm_set_name())) { + error = "set_name"; + goto error; + } } /* do the rest of the standard tool init */ diff --git a/orte/mca/ess/hnp/ess_hnp_module.c b/orte/mca/ess/hnp/ess_hnp_module.c index 72781422e3..74c7da1f29 100644 --- a/orte/mca/ess/hnp/ess_hnp_module.c +++ b/orte/mca/ess/hnp/ess_hnp_module.c @@ -206,20 +206,6 @@ static int rte_init(void) /* Setup the communication infrastructure */ - /* start with multicast */ -#if ORTE_ENABLE_MULTICAST - if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { - ORTE_ERROR_LOG(ret); - error = "orte_rmcast_base_open"; - goto error; - } - if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { - ORTE_ERROR_LOG(ret); - error = "orte_rmcast_base_select"; - goto error; - } -#endif - /* * Runtime Messaging Layer */ @@ -260,6 +246,20 @@ static int rte_init(void) goto error; } + /* multicast */ +#if 
ORTE_ENABLE_MULTICAST + if (ORTE_SUCCESS != (ret = orte_rmcast_base_open())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_rmcast_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_rmcast_base_select"; + goto error; + } +#endif + /* Now provide a chance for the PLM * to perform any module-specific init functions. This * needs to occur AFTER the communications are setup @@ -638,16 +638,16 @@ static int rte_finalize(void) orte_plm_base_close(); orte_errmgr_base_close(); - /* now can close the rml and its friendly group comm */ - orte_grpcomm_base_close(); - orte_routed_base_close(); - orte_rml_base_close(); - /* close the multicast */ #if ORTE_ENABLE_MULTICAST orte_rmcast_base_close(); #endif - + + /* now can close the rml and its friendly group comm */ + orte_grpcomm_base_close(); + orte_routed_base_close(); + orte_rml_base_close(); + /* if we were doing timing studies, close the timing file */ if (orte_timing) { if (stdout != orte_timing_output && diff --git a/orte/mca/rmcast/rmcast_types.h b/orte/mca/rmcast/rmcast_types.h index 00356f6cc7..818b293a8e 100644 --- a/orte/mca/rmcast/rmcast_types.h +++ b/orte/mca/rmcast/rmcast_types.h @@ -65,6 +65,7 @@ typedef uint8_t orte_rmcast_flag_t; typedef uint32_t orte_rmcast_seq_t; #define ORTE_RMCAST_SEQ_MAX UINT32_MAX-1 #define ORTE_RMCAST_SEQ_INVALID UINT32_MAX +#define ORTE_RMCAST_SEQ_T OPAL_UINT32 END_C_DECLS diff --git a/orte/mca/rmcast/tcp/Makefile.am b/orte/mca/rmcast/tcp/Makefile.am new file mode 100644 index 0000000000..8cd067330b --- /dev/null +++ b/orte/mca/rmcast/tcp/Makefile.am @@ -0,0 +1,35 @@ +# +# Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if OMPI_BUILD_rmcast_tcp_DSO +component_noinst = +component_install = mca_rmcast_tcp.la +else +component_noinst = libmca_rmcast_tcp.la +component_install = +endif + +rmcast_tcp_SOURCES = \ + rmcast_tcp.c \ + rmcast_tcp.h \ + rmcast_tcp_component.c + +mcacomponentdir = $(pkglibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_rmcast_tcp_la_SOURCES = $(rmcast_tcp_SOURCES) +mca_rmcast_tcp_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_rmcast_tcp_la_SOURCES = $(rmcast_tcp_SOURCES) +libmca_rmcast_tcp_la_LIBADD = +libmca_rmcast_tcp_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/rmcast/tcp/configure.params b/orte/mca/rmcast/tcp/configure.params new file mode 100644 index 0000000000..16467a0b98 --- /dev/null +++ b/orte/mca/rmcast/tcp/configure.params @@ -0,0 +1,13 @@ +# -*- shell-script -*- +# +# Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Specific to this module + +PARAM_CONFIG_FILES="Makefile" diff --git a/orte/mca/rmcast/tcp/rmcast_tcp.c b/orte/mca/rmcast/tcp/rmcast_tcp.c new file mode 100644 index 0000000000..b88773390f --- /dev/null +++ b/orte/mca/rmcast/tcp/rmcast_tcp.c @@ -0,0 +1,1079 @@ +/* + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. 
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "orte_config.h"
+#include "orte/constants.h"
+#include "opal/types.h"
+
+#ifdef HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#endif
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+#include
+#include
+
+#include "opal/class/opal_list.h"
+#include "opal/opal_socket_errno.h"
+#include "opal/util/output.h"
+#include "opal/util/argv.h"
+#include "opal/util/if.h"
+#include "opal/util/net.h"
+#include "opal/dss/dss.h"
+
+#include "orte/runtime/orte_globals.h"
+#include "orte/runtime/orte_wait.h"
+#include "orte/mca/errmgr/errmgr.h"
+#include "orte/util/name_fns.h"
+#include "orte/util/show_help.h"
+#include "orte/mca/grpcomm/grpcomm.h"
+#include "orte/mca/rml/rml.h"
+#include "orte/mca/rml/base/rml_contact.h"
+
+#include "orte/mca/rmcast/base/private.h"
+#include "orte/mca/rmcast/base/base.h"
+#include "rmcast_tcp.h"
+
+/* LOCAL DATA */
+static opal_mutex_t lock;
+static opal_list_t recvs;
+static opal_list_t channels;
+static bool init_completed = false;
+static orte_rmcast_channel_t next_channel;
+
+/* LOCAL FUNCTIONS */
+static void recv_handler(int status, orte_process_name_t* sender,
+                         opal_buffer_t* buffer, orte_rml_tag_t tag,
+                         void* cbdata);
+
+/* LOCAL STRUCTURE VALUES */
+static rmcast_base_channel_t *my_group_channel=NULL;
+
+/* API FUNCTIONS */
+static int init(void);
+
+static void finalize(void);
+
+static int tcp_send_buffer(orte_rmcast_channel_t channel,
+                           orte_rmcast_tag_t tag,
+                           opal_buffer_t *buf);
+
+static int tcp_send_buffer_nb(orte_rmcast_channel_t channel,
+                              orte_rmcast_tag_t tag,
+                              opal_buffer_t *buf,
+                              orte_rmcast_callback_buffer_fn_t cbfunc,
+                              void *cbdata);
+
+static int tcp_send(orte_rmcast_channel_t channel,
+                    orte_rmcast_tag_t tag,
+                    struct iovec *msg, int count);
+
+static int tcp_send_nb(orte_rmcast_channel_t channel,
+                       orte_rmcast_tag_t tag,
+                       struct iovec *msg, int count,
+                       orte_rmcast_callback_fn_t cbfunc,
+                       void *cbdata);
+
+static int tcp_recv_buffer(orte_process_name_t *sender,
+                           orte_rmcast_channel_t channel,
+                           orte_rmcast_tag_t tag,
+                           opal_buffer_t *buf,
+                           orte_rmcast_seq_t *seq_num);
+
+static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel,
+                              orte_rmcast_tag_t tag,
+                              orte_rmcast_flag_t flags,
+                              orte_rmcast_callback_buffer_fn_t cbfunc,
+                              void *cbdata);
+
+static int tcp_recv(orte_process_name_t *sender,
+                    orte_rmcast_channel_t channel,
+                    orte_rmcast_tag_t tag,
+                    struct iovec **msg, int *count,
+                    orte_rmcast_seq_t *seq_num);
+
+static int tcp_recv_nb(orte_rmcast_channel_t channel,
+                       orte_rmcast_tag_t tag,
+                       orte_rmcast_flag_t flags,
+                       orte_rmcast_callback_fn_t cbfunc,
+                       void *cbdata);
+
+static void cancel_recv(orte_rmcast_channel_t channel,
+                        orte_rmcast_tag_t tag);
+
+static int open_channel(orte_rmcast_channel_t *channel, char *name,
+                        char *network, int port, char *interface, uint8_t direction);
+
+static int close_channel(orte_rmcast_channel_t channel);
+
+static orte_rmcast_channel_t query(void);
+
+/* Define the module */
+
+orte_rmcast_module_t orte_rmcast_tcp_module = {
+    init,
+    finalize,
+    tcp_send,
+    tcp_send_nb,
+    tcp_send_buffer,
+    tcp_send_buffer_nb,
+    tcp_recv,
+    tcp_recv_nb,
+    tcp_recv_buffer,
+    tcp_recv_buffer_nb,
+    cancel_recv,
+    open_channel,
+    close_channel,
+    query
+};
+
+/* during init, we setup two channels for both xmit and recv:
+ * (a) a public address announcement channel. There are two variants
+ *     of this:
+ *     (1) system processes - e.g., daemons, tools.
This channel + * is reserved solely for their use in performing admin + * functions + * (2) application processes. This channel is used to announce + * their existence and contact info for auto-wireup + * (b) our own group's channel, which is where our own output + * will be sent. At this time, we assume that we always + * want to hear our peers, so this channels is also + * bidirectional + * + * In addition, the HNP opens a third channel which is used solely + * for cmd-control purposes. This is where a tool, for example, might + * send a cmd to the HNP to take some action - there is no point in + * having that cmd echo around to every daemon and/or other tool + * in the system. + */ +static int init(void) +{ + int rc; + orte_rmcast_channel_t channel; + + if (init_completed) { + return ORTE_SUCCESS; + } + init_completed = true; + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp: init called", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the globals */ + OBJ_CONSTRUCT(&lock, opal_mutex_t); + OBJ_CONSTRUCT(&recvs, opal_list_t); + OBJ_CONSTRUCT(&channels, opal_list_t); + next_channel = ORTE_RMCAST_DYNAMIC_CHANNELS; + + /* setup the respective public address channel */ + if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) { + channel = ORTE_RMCAST_SYS_CHANNEL; + if (ORTE_SUCCESS != (rc = open_channel(&channel, "system", + NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + ORTE_ERROR_LOG(rc); + return rc; + } + } else if (ORTE_PROC_IS_APP) { + channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL; + if (ORTE_SUCCESS != (rc = open_channel(&channel, "app-announce", + NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + ORTE_ERROR_LOG(rc); + return rc; + } + } else { + opal_output(0, "rmcast:tcp:init - unknown process type"); + return ORTE_ERR_SILENT; + } + + /* finally, if we are an app, setup our grp channel, if one was given */ + if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) { + channel = orte_rmcast_base.my_group_number; + if (ORTE_SUCCESS != (rc = open_channel(&channel, orte_rmcast_base.my_group_name, + NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + ORTE_ERROR_LOG(rc); + return rc; + } + my_group_channel = (rmcast_base_channel_t*)opal_list_get_last(&channels); + } + + if (ORTE_JOBID_WILDCARD == orte_process_info.my_hnp.jobid) { + /* set the HNP info in our contact table */ + if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_hnp_uri))) { + orte_show_help("help-orcm-ps.txt", "orcm-ps:hnp-uri-bad", true, orte_process_info.my_hnp_uri); + return rc; + } + /* extract the name */ + if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri, + &orte_process_info.my_hnp, NULL))) { + orte_show_help("help-orcm-ps.txt", "orcm-ps:hnp-uri-bad", true, orte_process_info.my_hnp_uri); + return rc; + } + } + + /* now activate the non-blocking recv so we catch messages */ + if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, + ORTE_RML_TAG_MULTICAST, + ORTE_RML_NON_PERSISTENT, + recv_handler, + NULL))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + return ORTE_SUCCESS; +} + +static void finalize(void) +{ + opal_list_item_t *item; + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp: finalize called", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* cancel the recv */ + orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MULTICAST); + + /* deconstruct the globals */ + OPAL_THREAD_LOCK(&lock); + while (NULL != (item = opal_list_remove_first(&recvs))) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&recvs); + 
while (NULL != (item = opal_list_remove_first(&channels))) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&channels); + OPAL_THREAD_UNLOCK(&lock); + + OBJ_DESTRUCT(&lock); + + return; +} + +/* internal blocking send support */ +static bool send_complete, send_buf_complete; + +static void internal_snd_cb(int status, + orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + orte_process_name_t *sender, + orte_rmcast_seq_t seq_num, + struct iovec *msg, int count, void *cbdata) +{ + send_complete = true; +} + +static void internal_snd_buf_cb(int status, + orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + orte_process_name_t *sender, + orte_rmcast_seq_t seq_num, + opal_buffer_t *buf, void *cbdata) +{ + send_buf_complete = true; +} + +static int queue_xmit(rmcast_base_send_t *snd, + orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag) +{ + opal_list_item_t *item; + rmcast_base_channel_t *chptr, *ch; + int32_t sz; + int rc; + int8_t flag; + opal_buffer_t buf; + int32_t tmp32; + + /* if we were asked to send this on our group output + * channel, substitute it + */ + if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + if (NULL == my_group_channel) { + return ORTE_ERR_NOT_FOUND; + } + ch = my_group_channel; + goto process; + } + + /* find the channel */ + ch = NULL; + for (item = opal_list_get_first(&channels); + item != opal_list_get_end(&channels); + item = opal_list_get_next(item)) { + chptr = (rmcast_base_channel_t*)item; + if (channel == chptr->channel) { + ch = chptr; + break; + } + } + if (NULL == ch) { + /* didn't find it */ + return ORTE_ERR_NOT_FOUND; + } + +process: + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp: send of %d %s" + " called on multicast channel %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == snd->iovec_array) ? (int)snd->buf->bytes_used : (int)snd->iovec_count, + (NULL == snd->iovec_array) ? "bytes" : "iovecs", + (int)ch->channel)); + + + OPAL_THREAD_LOCK(&ch->send_lock); + /* setup a buffer */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + + /* pack our name */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack the channel */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &channel, 1, ORTE_RMCAST_CHANNEL_T))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack the tag */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RMCAST_TAG_T))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack the sequence number */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &ch->seq_num, 1, ORTE_RMCAST_SEQ_T))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* are we sending a buffer? 
*/ + if (NULL == snd->buf) { + /* no, flag the buffer as containing iovecs */ + flag = 0; + opal_dss.pack(&buf, &flag, 1, OPAL_INT8); + + /* pack the number of iovecs */ + opal_dss.pack(&buf, &snd->iovec_count, 1, OPAL_INT32); + + /* pack each iovec into a buffer in prep for sending + * so we can recreate the array at the other end + */ + for (sz=0; sz < snd->iovec_count; sz++) { + /* pack the size */ + tmp32 = snd->iovec_array[sz].iov_len; + opal_dss.pack(&buf, &tmp32, 1, OPAL_INT32); + /* pack the bytes */ + opal_dss.pack(&buf, &(snd->iovec_array[sz].iov_base), tmp32, OPAL_UINT8); + } + + } else { + /* flag it as being a buffer */ + flag = 1; + opal_dss.pack(&buf, &flag, 1, OPAL_INT8); + + /* copy the payload */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, snd->buf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp multicasting %d bytes to channel %d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buf.bytes_used, + (int)channel, (int)tag)); + + if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(ORTE_JOBID_WILDCARD, + &buf, ORTE_RML_TAG_MULTICAST))) { + /* didn't get the message out */ + opal_output(0, "%s failed to send message to multicast channel %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)channel); + goto cleanup; + } + + if (1 == flag) { + /* call the cbfunc if required */ + if (NULL != snd->cbfunc_buffer) { + snd->cbfunc_buffer(rc, channel, tag, + ORTE_PROC_MY_NAME, ch->seq_num, + snd->buf, snd->cbdata); + } + } else { + /* call the cbfunc if required */ + if (NULL != snd->cbfunc_iovec) { + snd->cbfunc_iovec(rc, channel, tag, + ORTE_PROC_MY_NAME, ch->seq_num, + snd->iovec_array, snd->iovec_count, snd->cbdata); + } + } + + /* roll to next message sequence number */ + ORTE_MULTICAST_NEXT_SEQUENCE_NUM(ch->seq_num); + +cleanup: + OBJ_DESTRUCT(&buf); + + OPAL_THREAD_UNLOCK(&ch->send_lock); + + return ORTE_SUCCESS; +} + +static int tcp_send(orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + struct iovec *msg, int count) +{ + rmcast_base_send_t snd; + int ret; + + /* queue it to be sent - preserves order! */ + OBJ_CONSTRUCT(&snd, rmcast_base_send_t); + snd.iovec_array = msg; + snd.iovec_count = count; + snd.tag = tag; + snd.cbfunc_iovec = internal_snd_cb; + send_complete = false; + + if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel, tag))) { + ORTE_ERROR_LOG(ret); + OBJ_DESTRUCT(&snd); + return ret; + } + + /* now wait for the send to complete */ + ORTE_PROGRESSED_WAIT(send_complete, 0, 1); + + OBJ_DESTRUCT(&snd); + return ORTE_SUCCESS; +} + +static int tcp_send_nb(orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + struct iovec *msg, int count, + orte_rmcast_callback_fn_t cbfunc, + void *cbdata) +{ + int ret; + rmcast_base_send_t snd; + + /* queue it to be sent - preserves order! */ + OBJ_CONSTRUCT(&snd, rmcast_base_send_t); + snd.iovec_array = msg; + snd.iovec_count = count; + snd.tag = tag; + snd.cbfunc_iovec = cbfunc; + + if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel, tag))) { + ORTE_ERROR_LOG(ret); + OBJ_DESTRUCT(&snd); + return ret; + } + OBJ_DESTRUCT(&snd); + + return ORTE_SUCCESS; +} + +static int tcp_send_buffer(orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + opal_buffer_t *buf) +{ + int ret; + rmcast_base_send_t snd; + + /* queue it to be sent - preserves order! 
*/ + OBJ_CONSTRUCT(&snd, rmcast_base_send_t); + snd.buf = buf; + snd.tag = tag; + snd.cbfunc_buffer = internal_snd_buf_cb; + send_buf_complete = false; + + if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel, tag))) { + ORTE_ERROR_LOG(ret); + OBJ_DESTRUCT(&snd); + return ret; + } + + /* now wait for the send to complete */ + ORTE_PROGRESSED_WAIT(send_buf_complete, 0, 1); + OBJ_DESTRUCT(&snd); + + return ORTE_SUCCESS; +} + +static int tcp_send_buffer_nb(orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + opal_buffer_t *buf, + orte_rmcast_callback_buffer_fn_t cbfunc, + void *cbdata) +{ + int ret; + rmcast_base_send_t snd; + + /* queue it to be sent - preserves order! */ + OBJ_CONSTRUCT(&snd, rmcast_base_send_t); + snd.buf = buf; + snd.tag = tag; + snd.cbfunc_buffer = cbfunc; + + if (ORTE_SUCCESS != (ret = queue_xmit(&snd, channel, tag))) { + ORTE_ERROR_LOG(ret); + OBJ_DESTRUCT(&snd); + return ret; + } + OBJ_DESTRUCT(&snd); + + return ORTE_SUCCESS; +} + +static int queue_recv(rmcast_base_recv_t *recvptr, + orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + orte_rmcast_callback_fn_t cbfunc_iovec, + orte_rmcast_callback_buffer_fn_t cbfunc_buffer, + bool blocking) +{ + opal_list_item_t *item; + rmcast_base_channel_t *ch, *chptr; + rmcast_base_recv_t *rptr; + + /* find the channel */ + ch = NULL; + for (item = opal_list_get_first(&channels); + item != opal_list_get_end(&channels); + item = opal_list_get_next(item)) { + chptr = (rmcast_base_channel_t*)item; + if (channel == chptr->channel) { + ch = chptr; + break; + } + } + if (NULL == ch) { + /* didn't find it */ + return ORTE_ERR_NOT_FOUND; + } + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp: queue_recv called on multicast channel %03d.%03d.%03d.%03d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network), tag)); + + if (!blocking) { + /* do we already have a recv for this channel/tag/cbfunc? 
*/ + OPAL_THREAD_LOCK(&lock); + for (item = opal_list_get_first(&recvs); + item != opal_list_get_end(&recvs); + item = opal_list_get_next(item)) { + rptr = (rmcast_base_recv_t*)item; + if (channel == rptr->channel && + tag == rptr->tag && + ((NULL != cbfunc_iovec && cbfunc_iovec == rptr->cbfunc_iovec) || + (NULL != cbfunc_buffer && cbfunc_buffer == rptr->cbfunc_buffer))) { + /* matching recv in place */ + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp: matching recv already active on multicast channel %d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ch->channel, tag)); + OPAL_THREAD_UNLOCK(&lock); + return ORTE_EXISTS; + } + } + OPAL_THREAD_UNLOCK(&lock); + } + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp: adding non-blocking recv on multicast channel %d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ch->channel, tag)); + OPAL_THREAD_LOCK(&lock); + opal_list_append(&recvs, &recvptr->item); + OPAL_THREAD_UNLOCK(&lock); + + return ORTE_SUCCESS; +} + +static int tcp_recv(orte_process_name_t *name, + orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + struct iovec **msg, int *count, orte_rmcast_seq_t *seq_num) +{ + rmcast_base_recv_t *recvptr; + int ret; + + recvptr = OBJ_NEW(rmcast_base_recv_t); + recvptr->channel = channel; + recvptr->tag = tag; + + if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, NULL, NULL, true))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(recvptr); + return ret; + } + + ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1); + + /* xfer the data */ + if (NULL != name) { + /* caller requested id of sender */ + name->jobid = recvptr->name.jobid; + name->vpid = recvptr->name.vpid; + } + *msg = recvptr->iovec_array; + *count = recvptr->iovec_count; + *seq_num = recvptr->seq_num; + + /* remove the recv */ + OPAL_THREAD_LOCK(&lock); + opal_list_remove_item(&recvs, &recvptr->item); + OPAL_THREAD_UNLOCK(&lock); + OBJ_RELEASE(recvptr); + + return ORTE_SUCCESS; +} + +static int tcp_recv_nb(orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + orte_rmcast_flag_t flags, + orte_rmcast_callback_fn_t cbfunc, void *cbdata) +{ + rmcast_base_recv_t *recvptr; + int ret; + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp: recv_nb called on channel %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel)); + + recvptr = OBJ_NEW(rmcast_base_recv_t); + recvptr->channel = channel; + recvptr->tag = tag; + recvptr->flags = flags; + recvptr->cbfunc_iovec = cbfunc; + recvptr->cbdata = cbdata; + + if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, cbfunc, NULL, true))) { + if (ORTE_EXISTS == ret) { + /* this recv already exists - just release the copy */ + OBJ_RELEASE(recvptr); + return ORTE_SUCCESS; + } + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(recvptr); + return ret; + } + + return ORTE_SUCCESS; +} + +static int tcp_recv_buffer(orte_process_name_t *name, + orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + opal_buffer_t *buf, orte_rmcast_seq_t *seq_num) +{ + rmcast_base_recv_t *recvptr; + int ret; + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp: recv_buffer called on multicast channel %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel)); + + recvptr = OBJ_NEW(rmcast_base_recv_t); + recvptr->channel = channel; + recvptr->tag = tag; + + if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, NULL, NULL, true))) { + ORTE_ERROR_LOG(ret); + goto cleanup; + } + + ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1); + + /* xfer the data */ + if (NULL != name) { + /* caller requested id 
of sender */ + name->jobid = recvptr->name.jobid; + name->vpid = recvptr->name.vpid; + } + *seq_num = recvptr->seq_num; + if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) { + ORTE_ERROR_LOG(ret); + } + /* release the data */ + OBJ_RELEASE(recvptr->buf); + +cleanup: + OPAL_THREAD_LOCK(&lock); + opal_list_remove_item(&recvs, &recvptr->item); + OPAL_THREAD_UNLOCK(&lock); + OBJ_RELEASE(recvptr); + + return ret; +} + +static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + orte_rmcast_flag_t flags, + orte_rmcast_callback_buffer_fn_t cbfunc, void *cbdata) +{ + rmcast_base_recv_t *recvptr; + int ret; + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp: recv_buffer_nb called on multicast channel %d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); + + recvptr = OBJ_NEW(rmcast_base_recv_t); + recvptr->channel = channel; + recvptr->tag = tag; + recvptr->flags = flags; + recvptr->cbfunc_buffer = cbfunc; + recvptr->cbdata = cbdata; + + if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, NULL, cbfunc, false))) { + if (ORTE_EXISTS == ret) { + /* this recv already exists - just release the copy */ + OBJ_RELEASE(recvptr); + return ORTE_SUCCESS; + } + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(recvptr); + return ret; + } + + return ORTE_SUCCESS; +} + +static void cancel_recv(orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag) +{ + opal_list_item_t *item, *next; + rmcast_base_recv_t *ptr; + + /* find all recv's for this channel and tag */ + item = opal_list_get_first(&recvs); + while (item != opal_list_get_end(&recvs)) { + next = opal_list_get_next(item); + + ptr = (rmcast_base_recv_t*)item; + if (channel == ptr->channel && + tag == ptr->tag) { + OPAL_THREAD_LOCK(&lock); + opal_list_remove_item(&recvs, &ptr->item); + OBJ_RELEASE(ptr); + OPAL_THREAD_UNLOCK(&lock); + } + item = next; + } +} + +/* for the tcp module, we will be using the RML to "fake" a + * multicast in combination with the grpcomm "xcast" interface. 
+ * We cannot control the network and interface in this + * combination as it gets auto-picked well before us, so we + * ignore that info here + */ +static int open_channel(orte_rmcast_channel_t *channel, char *name, + char *network, int port, char *interface, uint8_t direction) +{ + opal_list_item_t *item; + rmcast_base_channel_t *nchan, *chan; + + /* see if this name has already been assigned a channel on the specified network */ + chan = NULL; + for (item = opal_list_get_first(&channels); + item != opal_list_get_end(&channels); + item = opal_list_get_next(item)) { + nchan = (rmcast_base_channel_t*)item; + + if (0 == strcasecmp(nchan->name, name)) { + /* check the channel, if one was given */ + if (ORTE_RMCAST_INVALID_CHANNEL != *channel && + nchan->channel != *channel) { + continue; + } + /* all setup - nothing to do */ + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp using existing channel", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + return ORTE_SUCCESS; + } + } + + /* we didn't find an existing match, so create a new channel */ + chan = OBJ_NEW(rmcast_base_channel_t); + chan->name = strdup(name); + /* if we were given a channel, then just use it */ + if (ORTE_RMCAST_INVALID_CHANNEL != *channel) { + chan->channel = *channel; + } else { + chan->channel = next_channel++; + *channel = chan->channel; + } + /* add to list of known channels */ + OPAL_THREAD_LOCK(&lock); + opal_list_append(&channels, &chan->item); + OPAL_THREAD_UNLOCK(&lock); + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp opening new channel for%s%s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (ORTE_RMCAST_RECV & direction) ? " RECV" : " ", + (ORTE_RMCAST_XMIT & direction) ? " XMIT" : " ")); + + return ORTE_SUCCESS; +} + +static int close_channel(orte_rmcast_channel_t channel) +{ + opal_list_item_t *item; + rmcast_base_channel_t *chan; + + OPAL_THREAD_LOCK(&lock); + for (item = opal_list_get_first(&channels); + item != opal_list_get_end(&channels); + item = opal_list_get_next(item)) { + chan = (rmcast_base_channel_t*)item; + + if (channel == chan->channel) { + opal_list_remove_item(&channels, item); + OBJ_RELEASE(chan); + OPAL_THREAD_UNLOCK(&lock); + return ORTE_SUCCESS; + } + } + + OPAL_THREAD_UNLOCK(&lock); + return ORTE_ERR_NOT_FOUND; +} + +static orte_rmcast_channel_t query(void) +{ + return orte_rmcast_base.my_group_number; +} + + +/**** LOCAL FUNCTIONS ****/ + +static void process_recv(int fd, short event, void *cbdata) +{ + orte_message_event_t *mev = (orte_message_event_t*)cbdata; + opal_buffer_t *buf = mev->buffer; + orte_rmcast_channel_t channel; + opal_list_item_t *item, *next; + rmcast_base_recv_t *ptr; + orte_process_name_t name; + orte_rmcast_tag_t tag; + int8_t flag; + struct iovec *iovec_array=NULL; + int32_t iovec_count=0, i, sz, n; + opal_buffer_t *recvd_buf=NULL; + int rc; + orte_rmcast_seq_t recvd_seq_num; + + /* extract the name of the original sender */ + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &name, &n, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* if this message is from myself, ignore it */ + if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) { + OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp:recv sent from myself: %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&name))); + goto cleanup; + } + + /* extract the "channel" */ + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &channel, &n, ORTE_RMCAST_CHANNEL_T))) { + ORTE_ERROR_LOG(rc); + goto 
cleanup; + } + + /* extract the tag */ + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &tag, &n, ORTE_RMCAST_TAG_T))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* extract the sequence number */ + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &recvd_seq_num, &n, ORTE_RMCAST_SEQ_T))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp:recv sender: %s tag: %d seq_num: %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&name), (int)tag, recvd_seq_num)); + + + /* unpack the iovec vs buf flag */ + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &flag, &n, OPAL_INT8))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* find all recv's for this channel and tag */ + item = opal_list_get_first(&recvs); + while (item != opal_list_get_end(&recvs)) { + next = opal_list_get_next(item); + ptr = (rmcast_base_recv_t*)item; + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp:recv checking channel %d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (int)ptr->channel, (int)ptr->tag)); + + if ((channel == ptr->channel || ORTE_RMCAST_WILDCARD_CHANNEL == ptr->channel) && + (tag == ptr->tag || ORTE_RMCAST_TAG_WILDCARD == ptr->tag)) { + + /* match found - see if data needs to be unpacked, or if + * we already have it so we only unpack it once + */ + if (0 == flag && NULL == iovec_array) { + /* iovecs included and we still need to unpack it - get + * the number of iovecs in the buffer + */ + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &iovec_count, &n, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* malloc the required space */ + iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec)); + /* unpack the iovecs */ + for (i=0; i < iovec_count; i++) { + /* unpack the number of bytes in this iovec */ + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &n, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* allocate the space */ + iovec_array[i].iov_base = (uint8_t*)malloc(sz); + iovec_array[i].iov_len = sz; + /* unpack the data */ + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, iovec_array[i].iov_base, &sz, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + } else if (1 == flag && NULL == recvd_buf) { + /* buffer was included */ + recvd_buf = OBJ_NEW(opal_buffer_t); + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(recvd_buf, buf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp:recv delivering message to channel %d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); + + if (0 == flag) { + /* dealing with iovecs */ + if (NULL != ptr->cbfunc_iovec) { + ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, tag, + &name, recvd_seq_num, + iovec_array, iovec_count, ptr->cbdata); + /* if it isn't persistent, remove it */ + if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) { + OPAL_THREAD_LOCK(&lock); + opal_list_remove_item(&recvs, &ptr->item); + OPAL_THREAD_UNLOCK(&lock); + OBJ_RELEASE(ptr); + } + } else { + /* copy over the iovec array since it will be released by + * the blocking recv + */ + ptr->iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec)); + ptr->iovec_count = iovec_count; + for (i=0; i < iovec_count; i++) { + ptr->iovec_array[i].iov_base = (uint8_t*)malloc(iovec_array[i].iov_len); + ptr->iovec_array[i].iov_len = iovec_array[i].iov_len; + memcpy(ptr->iovec_array[i].iov_base, iovec_array[i].iov_base, 
iovec_array[i].iov_len); + } + /* flag it as recvd to release blocking recv */ + ptr->recvd = true; + } + } else { + if (NULL != ptr->cbfunc_buffer) { + ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, tag, + &name, recvd_seq_num, + recvd_buf, ptr->cbdata); + /* if it isn't persistent, remove it */ + if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) { + OPAL_THREAD_LOCK(&lock); + opal_list_remove_item(&recvs, &ptr->item); + OPAL_THREAD_UNLOCK(&lock); + OBJ_RELEASE(ptr); + } + } else { + /* copy the buffer across since it will be released + * by the blocking recv + */ + ptr->buf = OBJ_NEW(opal_buffer_t); + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(ptr->buf, recvd_buf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* flag it as recvd to release blocking recv */ + ptr->recvd = true; + } + } + } + /* move along list */ + item = next; + } + +cleanup: + OBJ_RELEASE(mev); + if (NULL != iovec_array) { + for (i=0; i < iovec_count; i++) { + free(iovec_array[i].iov_base); + } + free(iovec_array); + } + if (NULL != recvd_buf) { + OBJ_RELEASE(recvd_buf); + } + return; +} + +static void recv_handler(int status, orte_process_name_t* sender, + opal_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int rc; + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:tcp recvd multicast msg", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* clear the way for the next message */ + ORTE_MESSAGE_EVENT(sender, buffer, tag, process_recv); + + /* reissue the recv */ + if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, + ORTE_RML_TAG_MULTICAST, + ORTE_RML_NON_PERSISTENT, + recv_handler, + NULL))) { + ORTE_ERROR_LOG(rc); + } + return; +} diff --git a/orte/mca/rmcast/tcp/rmcast_tcp.h b/orte/mca/rmcast/tcp/rmcast_tcp.h new file mode 100644 index 0000000000..cbd476c215 --- /dev/null +++ b/orte/mca/rmcast/tcp/rmcast_tcp.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + * + */ + +#ifndef ORTE_RMCAST_TCP_H +#define ORTE_RMCAST_TCP_H + +#include "orte_config.h" + +#include "opal/mca/mca.h" +#include "opal/mca/base/base.h" + +#include "orte/mca/rmcast/rmcast.h" + + +BEGIN_C_DECLS + +ORTE_MODULE_DECLSPEC extern orte_rmcast_base_component_t mca_rmcast_tcp_component; +extern orte_rmcast_module_t orte_rmcast_tcp_module; + +END_C_DECLS + +#endif diff --git a/orte/mca/rmcast/tcp/rmcast_tcp_component.c b/orte/mca/rmcast/tcp/rmcast_tcp_component.c new file mode 100644 index 0000000000..21f4e11689 --- /dev/null +++ b/orte/mca/rmcast/tcp/rmcast_tcp_component.c @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. 
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "orte_config.h"
+
+#include "opal/mca/base/base.h"
+
+#include "orte/util/proc_info.h"
+#include "orte/runtime/orte_globals.h"
+
+#include "rmcast_tcp.h"
+
+/*
+ * Local functions
+ */
+static int orte_rmcast_tcp_open(void);
+static int orte_rmcast_tcp_close(void);
+static int orte_rmcast_tcp_query(mca_base_module_t **module, int *priority);
+
+/*
+ * Local variables
+ */
+static bool initialized = false;
+
+/*
+ * Public string showing the rmcast tcp component version number
+ */
+const char *mca_rmcast_tcp_component_version_string =
+    "Open MPI tcp rmcast MCA component version " ORTE_VERSION;
+
+orte_rmcast_base_component_t mca_rmcast_tcp_component = {
+    {
+        ORTE_RMCAST_BASE_VERSION_1_0_0,
+
+        "tcp", /* MCA component name */
+        ORTE_MAJOR_VERSION, /* MCA component major version */
+        ORTE_MINOR_VERSION, /* MCA component minor version */
+        ORTE_RELEASE_VERSION, /* MCA component release version */
+
+        /* Component open, close, and query functions */
+        orte_rmcast_tcp_open,
+        orte_rmcast_tcp_close,
+        orte_rmcast_tcp_query
+    },
+    {
+        /* The component is checkpoint ready */
+        MCA_BASE_METADATA_PARAM_CHECKPOINT
+    }
+};
+
+/**
+ * component open/close/init function
+ */
+static int orte_rmcast_tcp_open(void)
+{
+    return ORTE_SUCCESS;
+}
+
+
+static int orte_rmcast_tcp_close(void)
+{
+    return ORTE_SUCCESS;
+}
+
+/**
+ * Module query
+ */
+
+static int orte_rmcast_tcp_query(mca_base_module_t **module, int *priority)
+{
+    if (!ORTE_PROC_IS_HNP && NULL == orte_process_info.my_hnp_uri) {
+        /* cannot operate */
+        *priority = 0;
+        *module = NULL;
+        return ORTE_ERROR;
+    }
+
+    /* selected by choice */
+    *priority = 0;
+    *module = (mca_base_module_t *) &orte_rmcast_tcp_module;
+    initialized = true;
+
+    return ORTE_SUCCESS;
+}
diff --git a/orte/mca/rml/rml_types.h b/orte/mca/rml/rml_types.h
index 8e8024e84f..4d55cce31a 100644
--- a/orte/mca/rml/rml_types.h
+++ b/orte/mca/rml/rml_types.h
@@ -171,6 +171,8 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_msg_packet_t);
 /* bootstrap */
 #define ORTE_RML_TAG_BOOTSTRAP 35
+/* TCP "fake" multicast */
+#define ORTE_RML_TAG_MULTICAST 36
 #define ORTE_RML_TAG_MAX 100
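
For reference only, and not part of the patch: a minimal caller-side sketch of the rmcast interface that this tcp component implements, pieced together from the signatures shown above. It assumes the framework's public module struct is reachable as orte_rmcast with fields named after the module entries (send_buffer, recv_buffer_nb); the tag value and the example_* function names are illustrative, not taken from the source tree.

/* Illustrative sketch only -- assumes orte_rmcast exposes send_buffer and
 * recv_buffer_nb under those names; EXAMPLE_TAG and the example_* helpers
 * are hypothetical. Only the signatures are taken from the diff above. */
#include "opal/util/output.h"
#include "opal/dss/dss.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/mca/rmcast/rmcast.h"

#define EXAMPLE_TAG ((orte_rmcast_tag_t)1)   /* hypothetical tag value */

/* matches orte_rmcast_callback_buffer_fn_t as used by internal_snd_buf_cb */
static void example_recv_cb(int status, orte_rmcast_channel_t channel,
                            orte_rmcast_tag_t tag, orte_process_name_t *sender,
                            orte_rmcast_seq_t seq_num, opal_buffer_t *buf,
                            void *cbdata)
{
    opal_output(0, "%s got seq %u on channel %d tag %d from %s",
                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned)seq_num,
                (int)channel, (int)tag, ORTE_NAME_PRINT(sender));
}

static int example_announce(void)
{
    opal_buffer_t buf;
    int rc;

    /* post a persistent non-blocking recv on our group channel */
    if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_OUTPUT_CHANNEL,
                                                         EXAMPLE_TAG,
                                                         ORTE_RMCAST_PERSISTENT,
                                                         example_recv_cb, NULL))) {
        return rc;
    }

    /* send our name on the same channel; with this component the message is
     * packed and pushed through orte_grpcomm.xcast on ORTE_RML_TAG_MULTICAST */
    OBJ_CONSTRUCT(&buf, opal_buffer_t);
    if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
        OBJ_DESTRUCT(&buf);
        return rc;
    }
    rc = orte_rmcast.send_buffer(ORTE_RMCAST_GROUP_OUTPUT_CHANNEL, EXAMPLE_TAG, &buf);
    OBJ_DESTRUCT(&buf);
    return rc;
}

Because this component routes everything through the RML and orte_grpcomm.xcast rather than a real multicast socket, the ess changes at the top of the patch move the rmcast open/select calls so they run only after the rml, routed, and grpcomm frameworks are up, and close rmcast before those frameworks during finalize.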