diff --git a/orte/mca/db/daemon/db_daemon.c b/orte/mca/db/daemon/db_daemon.c index 302da3b67a..60cc001d91 100644 --- a/orte/mca/db/daemon/db_daemon.c +++ b/orte/mca/db/daemon/db_daemon.c @@ -131,11 +131,11 @@ static int init(void) OBJ_CONSTRUCT(&datastore, opal_pointer_array_t); opal_pointer_array_init(&datastore, 16, INT_MAX, 16); } else if (ORTE_PROC_IS_APP) { - /* get my multicast group */ - my_group_channel = orte_rmcast.query_channel(); + /* get my multicast output group */ + orte_rmcast.query_channel(&my_group_channel, NULL); /* recv responses */ - if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_CHANNEL, + if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(my_group_channel, ORTE_RMCAST_TAG_CMD_ACK, ORTE_RMCAST_PERSISTENT, recv_ack, NULL))) { @@ -160,8 +160,7 @@ static int finalize(void) } OBJ_DESTRUCT(&datastore); } else if (ORTE_PROC_IS_APP) { - orte_rmcast.cancel_recv(ORTE_RMCAST_GROUP_CHANNEL, ORTE_RMCAST_TAG_DATA); - orte_rmcast.cancel_recv(ORTE_RMCAST_GROUP_CHANNEL, ORTE_RMCAST_TAG_CMD_ACK); + orte_rmcast.cancel_recv(my_group_channel, ORTE_RMCAST_TAG_WILDCARD); } return ORTE_SUCCESS; diff --git a/orte/mca/rmcast/base/base.h b/orte/mca/rmcast/base/base.h index 3d3dd7d50d..3fac9a9c63 100644 --- a/orte/mca/rmcast/base/base.h +++ b/orte/mca/rmcast/base/base.h @@ -53,7 +53,8 @@ typedef struct { bool active; opal_list_t recvs; opal_list_t channels; - rmcast_base_channel_t *my_group_channel; + rmcast_base_channel_t *my_output_channel; + rmcast_base_channel_t *my_input_channel; } orte_rmcast_base_t; ORTE_DECLSPEC extern orte_rmcast_base_t orte_rmcast_base; diff --git a/orte/mca/rmcast/base/private.h b/orte/mca/rmcast/base/private.h index 88ce000c45..283d8ca789 100644 --- a/orte/mca/rmcast/base/private.h +++ b/orte/mca/rmcast/base/private.h @@ -186,7 +186,8 @@ ORTE_DECLSPEC void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel, ORTE_DECLSPEC int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel); -ORTE_DECLSPEC orte_rmcast_channel_t orte_rmcast_base_query(void); +ORTE_DECLSPEC int orte_rmcast_base_query(orte_rmcast_channel_t *output, + orte_rmcast_channel_t *input); #endif /* ORTE_DISABLE_FULL_SUPPORT */ diff --git a/orte/mca/rmcast/base/rmcast_base_fns.c b/orte/mca/rmcast/base/rmcast_base_fns.c index 65620efe80..d0bb6a31b5 100644 --- a/orte/mca/rmcast/base/rmcast_base_fns.c +++ b/orte/mca/rmcast/base/rmcast_base_fns.c @@ -60,7 +60,6 @@ int orte_rmcast_base_build_msg(rmcast_base_channel_t *ch, ORTE_ERROR_LOG(rc); return rc; } - /* are we sending a buffer? */ if (NULL == snd->buf) { /* no, flag the buffer as containing iovecs */ @@ -69,7 +68,6 @@ int orte_rmcast_base_build_msg(rmcast_base_channel_t *ch, ORTE_ERROR_LOG(rc); goto cleanup; } - /* pack the number of iovecs */ if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &snd->iovec_count, 1, OPAL_INT32))) { ORTE_ERROR_LOG(rc); @@ -116,6 +114,7 @@ cleanup: if (NULL != buf) { OBJ_RELEASE(buf); } + *buffer = NULL; return rc; } @@ -131,7 +130,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr, rmcast_base_recv_t *rptr; OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp: queue_recv called on multicast channel %d tag %d", + "%s rmcast:base: queue_recv called on multicast channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); if (!blocking) { @@ -153,7 +152,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr, if (NULL != rptr->cbfunc_iovec) { /* already have one in place */ OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp: matching recv already active on multicast channel %d tag %d", + "%s rmcast:base: matching recv already active on multicast channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock); return ORTE_EXISTS; @@ -164,7 +163,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr, if (NULL != rptr->cbfunc_buffer) { /* matching type - recv already in place */ OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp: matching recv already active on multicast channel %d tag %d", + "%s rmcast:base: matching recv already active on multicast channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock); return ORTE_EXISTS; @@ -182,7 +181,7 @@ int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr, /* if we get here, then we need to add a new recv */ OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp: adding recv on multicast channel %d tag %d", + "%s rmcast:base: adding recv on multicast channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); OPAL_THREAD_LOCK(&orte_rmcast_base.lock); rptr = OBJ_NEW(rmcast_base_recv_t); @@ -252,7 +251,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) goto cleanup; } - OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, "%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&name), channel, (int)tag, @@ -266,7 +265,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) ptr = (rmcast_base_recv_t*)item; OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp:recv checking channel %d tag %d", + "%s rmcast:base:recv checking channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)ptr->channel, (int)ptr->tag)); @@ -279,7 +278,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) } OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp:recv delivering message to channel %d tag %d", + "%s rmcast:base:recv delivering message to channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); /* we have a recv - unpack the data */ @@ -314,7 +313,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) } if (NULL != ptr->cbfunc_iovec) { OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp:recv delivering iovecs to channel %d tag %d", + "%s rmcast:base:recv delivering iovecs to channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, tag, @@ -330,7 +329,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) /* if something is already present, then we have a problem */ if (NULL != ptr->iovec_array) { OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp:recv blocking recv already fulfilled", + "%s rmcast:base:recv blocking recv already fulfilled", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); goto cleanup; } @@ -356,7 +355,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) } if (NULL != ptr->cbfunc_buffer) { OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp:recv delivering buffer to channel %d tag %d", + "%s rmcast:base:recv delivering buffer to channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, tag, @@ -372,12 +371,12 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) /* if something is already present, then we have a problem */ if (NULL != ptr->buf) { OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp:recv blocking recv already fulfilled", + "%s rmcast:base:recv blocking recv already fulfilled", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); goto cleanup; } OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:tcp:recv copying buffer for blocking recv", + "%s rmcast:base:recv copying buffer for blocking recv", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); /* copy the buffer across since it will be released @@ -416,8 +415,10 @@ void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel, rmcast_base_recv_t *ptr; orte_rmcast_channel_t ch; - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { + if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) { ch = orte_rmcast_base.my_group_number; + } else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + ch = orte_rmcast_base.my_group_number + 1; } else { ch = channel; } @@ -462,9 +463,23 @@ int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel) return ORTE_ERR_NOT_FOUND; } -orte_rmcast_channel_t orte_rmcast_base_query(void) +int orte_rmcast_base_query(orte_rmcast_channel_t *output, orte_rmcast_channel_t *input) { - return orte_rmcast_base.my_group_channel->channel; + if (NULL != output) { + if (NULL == orte_rmcast_base.my_output_channel) { + *output = ORTE_RMCAST_INVALID_CHANNEL; + } else { + *output = orte_rmcast_base.my_output_channel->channel; + } + } + if (NULL != input) { + if (NULL == orte_rmcast_base.my_input_channel) { + *input = ORTE_RMCAST_INVALID_CHANNEL; + } else { + *input = orte_rmcast_base.my_input_channel->channel; + } + } + return ORTE_SUCCESS; } static int extract_hdr(opal_buffer_t *buf, diff --git a/orte/mca/rmcast/base/rmcast_base_open.c b/orte/mca/rmcast/base/rmcast_base_open.c index 2da6a8e1b3..073c9ae85c 100644 --- a/orte/mca/rmcast/base/rmcast_base_open.c +++ b/orte/mca/rmcast/base/rmcast_base_open.c @@ -117,7 +117,9 @@ int orte_rmcast_base_open(void) for (i=0; i < 255; i++) { orte_rmcast_base.ports[i] = 0; } - + orte_rmcast_base.my_output_channel = NULL; + orte_rmcast_base.my_input_channel = NULL; + /* public multicast channel for this job */ mca_base_param_reg_string_name("rmcast", "base_multicast_network", "Network to use for multicast xmissions [link (default) | site | org | global | tuple-addr]", diff --git a/orte/mca/rmcast/rmcast.h b/orte/mca/rmcast/rmcast.h index be246e6bf4..91aba3cce3 100644 --- a/orte/mca/rmcast/rmcast.h +++ b/orte/mca/rmcast/rmcast.h @@ -129,8 +129,9 @@ typedef int (*orte_rmcast_base_module_open_channel_fn_t)(orte_rmcast_channel_t c /* close the channel */ typedef int (*orte_rmcast_base_module_close_channel_fn_t)(orte_rmcast_channel_t channel); -/* return my group's channel */ -typedef orte_rmcast_channel_t (*orte_rmcast_base_module_query_channel_fn_t)(void); +/* return my group's channels */ +typedef int (*orte_rmcast_base_module_query_channel_fn_t)(orte_rmcast_channel_t *output, + orte_rmcast_channel_t *input); /* * rmcast component diff --git a/orte/mca/rmcast/rmcast_types.h b/orte/mca/rmcast/rmcast_types.h index d94510ba53..8c6bbdc6ff 100644 --- a/orte/mca/rmcast/rmcast_types.h +++ b/orte/mca/rmcast/rmcast_types.h @@ -26,7 +26,8 @@ typedef int32_t orte_rmcast_channel_t; #define ORTE_RMCAST_CHANNEL_T OPAL_INT32 /* ORTE IP multicast channels */ -#define ORTE_RMCAST_GROUP_CHANNEL -2 +#define ORTE_RMCAST_GROUP_INPUT_CHANNEL -3 +#define ORTE_RMCAST_GROUP_OUTPUT_CHANNEL -2 #define ORTE_RMCAST_WILDCARD_CHANNEL -1 #define ORTE_RMCAST_INVALID_CHANNEL 0 #define ORTE_RMCAST_SYS_CHANNEL 1 diff --git a/orte/mca/rmcast/tcp/rmcast_tcp.c b/orte/mca/rmcast/tcp/rmcast_tcp.c index 303a1a2229..e6098dc253 100644 --- a/orte/mca/rmcast/tcp/rmcast_tcp.c +++ b/orte/mca/rmcast/tcp/rmcast_tcp.c @@ -164,7 +164,8 @@ static int init(void) ORTE_ERROR_LOG(rc); return rc; } - orte_rmcast_base.my_group_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + orte_rmcast_base.my_input_channel = NULL; } else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) { /* daemons and hnp open the sys and data server channels */ if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_SYS_CHANNEL, "system", @@ -172,7 +173,8 @@ static int init(void) ORTE_ERROR_LOG(rc); return rc; } - orte_rmcast_base.my_group_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + orte_rmcast_base.my_input_channel = NULL; if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_DATA_SERVER_CHANNEL, "data-server", NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { ORTE_ERROR_LOG(rc); @@ -199,15 +201,22 @@ static int init(void) ORTE_ERROR_LOG(rc); return rc; } - /* finally, if we are an app, setup our grp channel, if one was given */ + /* finally, if we are an app, setup our grp xmit/recv channels, if given */ if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) { if (ORTE_SUCCESS != (rc = open_channel(orte_rmcast_base.my_group_number, orte_rmcast_base.my_group_name, - NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + NULL, -1, NULL, ORTE_RMCAST_RECV))) { ORTE_ERROR_LOG(rc); return rc; } - orte_rmcast_base.my_group_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + orte_rmcast_base.my_input_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + if (ORTE_SUCCESS != (rc = open_channel(orte_rmcast_base.my_group_number+1, + orte_rmcast_base.my_group_name, + NULL, -1, NULL, ORTE_RMCAST_XMIT))) { + ORTE_ERROR_LOG(rc); + return rc; + } + orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); } } else { opal_output(0, "rmcast:tcp:init - unknown process type"); @@ -292,12 +301,12 @@ static int queue_xmit(rmcast_base_send_t *snd, /* if we were asked to send this on our group output * channel, substitute it */ - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - if (NULL == orte_rmcast_base.my_group_channel) { + if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + if (NULL == orte_rmcast_base.my_output_channel) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); return ORTE_ERR_NOT_FOUND; } - ch = orte_rmcast_base.my_group_channel; + ch = orte_rmcast_base.my_output_channel; goto process; } @@ -518,8 +527,10 @@ static int tcp_recv(orte_process_name_t *name, int ret; orte_rmcast_channel_t chan; - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - chan = orte_rmcast_base.my_group_number; + if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_input_channel->channel; + } else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_output_channel->channel; } else { chan = channel; } @@ -563,8 +574,10 @@ static int tcp_recv_nb(orte_rmcast_channel_t channel, "%s rmcast:tcp: recv_nb called on channel %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel)); - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - chan = orte_rmcast_base.my_group_number; + if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_input_channel->channel; + } else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_output_channel->channel; } else { chan = channel; } @@ -594,8 +607,10 @@ static int tcp_recv_buffer(orte_process_name_t *name, "%s rmcast:tcp: recv_buffer called on multicast channel %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel)); - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - chan = orte_rmcast_base.my_group_number; + if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_input_channel->channel; + } else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_output_channel->channel; } else { chan = channel; } @@ -641,8 +656,10 @@ static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel, "%s rmcast:tcp: recv_buffer_nb called on multicast channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - chan = orte_rmcast_base.my_group_number; + if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_input_channel->channel; + } else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_output_channel->channel; } else { chan = channel; } diff --git a/orte/mca/rmcast/udp/rmcast_udp.c b/orte/mca/rmcast/udp/rmcast_udp.c index c6765a80d8..4e9047a6c0 100644 --- a/orte/mca/rmcast/udp/rmcast_udp.c +++ b/orte/mca/rmcast/udp/rmcast_udp.c @@ -48,7 +48,7 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction); static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket); -static void xmit_data(int sd, short flags, void* send_req); +static int xmit_data(rmcast_base_channel_t *chan, rmcast_base_send_t *snd); /* API FUNCTIONS */ static int init(void); @@ -161,7 +161,8 @@ static int init(void) ORTE_ERROR_LOG(rc); return rc; } - orte_rmcast_base.my_group_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + orte_rmcast_base.my_input_channel = NULL; } else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) { /* daemons and hnp open the sys and data server channels */ if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_SYS_CHANNEL, "system", @@ -169,12 +170,13 @@ static int init(void) ORTE_ERROR_LOG(rc); return rc; } + orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + orte_rmcast_base.my_input_channel = NULL; if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_DATA_SERVER_CHANNEL, "data-server", NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { ORTE_ERROR_LOG(rc); return rc; } - orte_rmcast_base.my_group_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); } else if (ORTE_PROC_IS_APP) { /* apps open the app public and data server channels */ if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_APP_PUBLIC_CHANNEL, "app-announce", @@ -187,15 +189,22 @@ static int init(void) ORTE_ERROR_LOG(rc); return rc; } - /* also setup our grp channel, if one was given */ - if (NULL != orte_rmcast_base.my_group_name) { + /* finally, if we are an app, setup our grp xmit/recv channels, if given */ + if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) { if (ORTE_SUCCESS != (rc = open_channel(orte_rmcast_base.my_group_number, orte_rmcast_base.my_group_name, - NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + NULL, -1, NULL, ORTE_RMCAST_RECV))) { ORTE_ERROR_LOG(rc); return rc; } - orte_rmcast_base.my_group_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + orte_rmcast_base.my_input_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); + if (ORTE_SUCCESS != (rc = open_channel(orte_rmcast_base.my_group_number+1, + orte_rmcast_base.my_group_name, + NULL, -1, NULL, ORTE_RMCAST_XMIT))) { + ORTE_ERROR_LOG(rc); + return rc; + } + orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); } } else { opal_output(0, "rmcast:udp:init - unknown process type"); @@ -253,12 +262,12 @@ static int queue_xmit(rmcast_base_send_t *snd, /* if we were asked to send this on our group output * channel, substitute it */ - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - if (NULL == orte_rmcast_base.my_group_channel) { + if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + if (NULL == orte_rmcast_base.my_output_channel) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); return ORTE_ERR_NOT_FOUND; } - ch = orte_rmcast_base.my_group_channel; + ch = orte_rmcast_base.my_output_channel; goto process; } @@ -288,18 +297,7 @@ process: (NULL == snd->iovec_array) ? "bytes" : "iovecs", OPAL_IF_FORMAT_ADDR(ch->network))); - /* add it to this channel's pending sends */ - OPAL_THREAD_LOCK(&ch->send_lock); - opal_list_append(&ch->pending_sends, &snd->item); - - /* do we need to start the send event? */ - if (!ch->sends_in_progress) { - opal_event_add(&ch->send_ev, 0); - ch->sends_in_progress = true; - } - OPAL_THREAD_UNLOCK(&ch->send_lock); - - return ORTE_SUCCESS; + return xmit_data(ch, snd); } static int udp_send(orte_rmcast_channel_t channel, @@ -413,8 +411,10 @@ static int udp_recv(orte_process_name_t *name, int ret; orte_rmcast_channel_t chan; - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - chan = orte_rmcast_base.my_group_number; + if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_input_channel->channel; + } else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_output_channel->channel; } else { chan = channel; } @@ -458,8 +458,10 @@ static int udp_recv_nb(orte_rmcast_channel_t channel, "%s rmcast:udp: recv_nb called on channel %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel)); - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - chan = orte_rmcast_base.my_group_number; + if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_input_channel->channel; + } else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_output_channel->channel; } else { chan = channel; } @@ -488,8 +490,10 @@ static int udp_recv_buffer(orte_process_name_t *name, "%s rmcast:udp: recv_buffer called on multicast channel %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel)); - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - chan = orte_rmcast_base.my_group_number; + if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_input_channel->channel; + } else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_output_channel->channel; } else { chan = channel; } @@ -535,8 +539,10 @@ static int udp_recv_buffer_nb(orte_rmcast_channel_t channel, "%s rmcast:udp: recv_buffer_nb called on multicast channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); - if (ORTE_RMCAST_GROUP_CHANNEL == channel) { - chan = orte_rmcast_base.my_group_number; + if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_input_channel->channel; + } else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + chan = orte_rmcast_base.my_output_channel->channel; } else { chan = channel; } @@ -740,8 +746,6 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction) } chan->xmit = xmitsd; chan->send_data = (uint8_t*)malloc(orte_rmcast_udp_sndbuf_size); - /* setup the event to xmit messages, but don't activate it */ - opal_event_set(&chan->send_ev, chan->xmit, OPAL_EV_WRITE, xmit_data, chan); } if (0 > chan->recv && ORTE_RMCAST_RECV & direction) { @@ -903,31 +907,24 @@ static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket) return ORTE_SUCCESS; } -static void xmit_data(int sd, short flags, void* send_req) +static int xmit_data(rmcast_base_channel_t *chan, rmcast_base_send_t *snd) { - rmcast_base_channel_t *chan = (rmcast_base_channel_t*)send_req; - rmcast_base_send_t *snd; - opal_list_item_t *item; char *bytes; int32_t sz; int rc; opal_buffer_t *buf; - rmcast_send_log_t *log, *lg; - OPAL_THREAD_LOCK(&chan->send_lock); OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s transmitting data for channel %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), chan->channel)); - - while (NULL != (item = opal_list_remove_first(&chan->pending_sends))) { - snd = (rmcast_base_send_t*)item; - + /* setup the message for xmission */ if (ORTE_SUCCESS != (rc = orte_rmcast_base_build_msg(chan, &buf, snd))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } +#if 0 /* store the working buf in the send ring buffer in case we * need to retransmit it later */ @@ -943,7 +940,8 @@ static void xmit_data(int sd, short flags, void* send_req) lg->seq_num, lg->channel)); OBJ_RELEASE(lg); } - +#endif + /* unload the working buf to obtain the payload */ if (ORTE_SUCCESS != (rc = opal_dss.unload(buf, (void**)&bytes, &sz))) { ORTE_ERROR_LOG(rc); @@ -965,6 +963,8 @@ static void xmit_data(int sd, short flags, void* send_req) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(chan->network), strerror(errno), errno); rc = errno; + } else { + rc = ORTE_SUCCESS; } if (NULL != snd->buf) { @@ -983,14 +983,6 @@ static void xmit_data(int sd, short flags, void* send_req) /* roll to next message sequence number */ ORTE_MULTICAST_NEXT_SEQUENCE_NUM(chan->seq_num); -CLEANUP: - /* cleanup */ - OBJ_RELEASE(item); - } - - /* cleanup */ - opal_event_del(&chan->send_ev); - chan->sends_in_progress = false; - - OPAL_THREAD_UNLOCK(&chan->send_lock); +CLEANUP: + return rc; } diff --git a/orte/test/system/orte_mcast.c b/orte/test/system/orte_mcast.c index 7c8ad4bdce..42f6838579 100644 --- a/orte/test/system/orte_mcast.c +++ b/orte/test/system/orte_mcast.c @@ -41,20 +41,22 @@ static int datasize=1024; static void send_data(int fd, short flags, void *arg) { opal_buffer_t buf, *bfptr; - int32_t i32; + int32_t *i32; struct iovec iovec_array[3]; int rc, i; opal_event_t *tmp = (opal_event_t*)arg; struct timeval now; bfptr = OBJ_NEW(opal_buffer_t); - i32 = -1; - opal_dss.pack(bfptr, &i32, datasize, OPAL_INT32); - if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(ORTE_RMCAST_GROUP_CHANNEL, + i32 = (int32_t*)malloc(datasize*sizeof(int32_t)); + for (i=0; i < datasize; i++) { + i32[i] = -1; + } + opal_dss.pack(bfptr, i32, datasize, OPAL_INT32); + if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(ORTE_RMCAST_GROUP_OUTPUT_CHANNEL, ORTE_RMCAST_TAG_OUTPUT, bfptr, cbfunc_buf_snt, NULL))) { ORTE_ERROR_LOG(rc); - OBJ_RELEASE(bfptr); return; } /* create an iovec array */ @@ -63,7 +65,7 @@ static void send_data(int fd, short flags, void *arg) iovec_array[i].iov_len = datasize; } /* send it out */ - if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_GROUP_CHANNEL, + if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_GROUP_OUTPUT_CHANNEL, ORTE_RMCAST_TAG_OUTPUT, iovec_array, 3))) { ORTE_ERROR_LOG(rc); @@ -108,13 +110,13 @@ int main(int argc, char* argv[]) ORTE_TIMER_EVENT(5, 0, send_data); } else { /* setup to recv data on our channel */ - if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_CHANNEL, + if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_OUTPUT_CHANNEL, ORTE_RMCAST_TAG_OUTPUT, ORTE_RMCAST_PERSISTENT, cbfunc, NULL))) { ORTE_ERROR_LOG(rc); } - if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(ORTE_RMCAST_GROUP_CHANNEL, + if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(ORTE_RMCAST_GROUP_OUTPUT_CHANNEL, ORTE_RMCAST_TAG_OUTPUT, ORTE_RMCAST_PERSISTENT, cbfunc_iovec, NULL))) {