1
1

Some minor cleanup in the rmcast framework, ensure that a default multicast group is always defined for each app

This commit was SVN r23079.
Этот коммит содержится в:
Ralph Castain 2010-05-03 04:07:14 +00:00
родитель f994a7edf4
Коммит bcff0d6301
5 изменённых файлов: 73 добавлений и 23 удалений

Просмотреть файл

@ -28,9 +28,9 @@
#include "opal/class/opal_ring_buffer.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/util/parse_options.h"
#include "orte/util/show_help.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmcast/base/private.h"
@ -155,8 +155,13 @@ int orte_rmcast_base_open(void)
return ORTE_ERR_SILENT;
}
orte_rmcast_base.my_group_number = value;
} else {
/* since nothing was given, use our local jobid */
orte_rmcast_base.my_group_name = strdup(ORTE_LOCAL_JOBID_PRINT(ORTE_PROC_MY_NAME->jobid));
orte_rmcast_base.my_group_number = ORTE_RMCAST_DYNAMIC_CHANNELS + ORTE_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid);
}
/* multicast interfaces */
mca_base_param_reg_string_name("rmcast", "base_if_include",
"Comma-separated list of interfaces (given in IP form) to use for multicast messages",

Просмотреть файл

@ -26,13 +26,14 @@ typedef int32_t orte_rmcast_channel_t;
#define ORTE_RMCAST_CHANNEL_T OPAL_INT32
/* ORTE IP multicast channels */
#define ORTE_RMCAST_GROUP_OUTPUT_CHANNEL -2
#define ORTE_RMCAST_GROUP_CHANNEL -2
#define ORTE_RMCAST_WILDCARD_CHANNEL -1
#define ORTE_RMCAST_INVALID_CHANNEL 0
#define ORTE_RMCAST_SYS_CHANNEL 1
#define ORTE_RMCAST_APP_PUBLIC_CHANNEL 2
#define ORTE_RMCAST_DATA_SERVER_CHANNEL 3
#define ORTE_RMCAST_DYNAMIC_CHANNELS 3
#define ORTE_RMCAST_DYNAMIC_CHANNELS 4
/* define channel directions */
@ -54,8 +55,8 @@ typedef int32_t orte_rmcast_tag_t;
#define ORTE_RMCAST_TAG_MSG 6
#define ORTE_RMCAST_TAG_TOOL 7
#define ORTE_RMCAST_TAG_IOF 8
#define ORTE_RMCAST_TAG_STATE 9
#define ORTE_RMCAST_TAG_STATE_ACK 10
#define ORTE_RMCAST_TAG_DATA 9
#define ORTE_RMCAST_TAG_CMD_ACK 10
/* starting value for dynamically assignable tags */
#define ORTE_RMCAST_TAG_DYNAMIC 100

Просмотреть файл

@ -388,7 +388,7 @@ static int queue_xmit(rmcast_base_send_t *snd,
/* if we were asked to send this on our group output
* channel, substitute it
*/
if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
if (NULL == my_group_channel) {
return ORTE_ERR_NOT_FOUND;
}

Просмотреть файл

@ -292,7 +292,7 @@ static int queue_xmit(rmcast_base_send_t *snd,
/* if we were asked to send this on our group output
* channel, substitute it
*/
if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
if (NULL == my_group_channel) {
return ORTE_ERR_NOT_FOUND;
}

Просмотреть файл

@ -174,21 +174,43 @@ static int init(void)
opal_pointer_array_init(&msg_log, 8, INT_MAX, 8);
/* setup the respective public address channel */
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) {
if (ORTE_PROC_IS_TOOL) {
/* tools only open the sys channel */
channel = ORTE_RMCAST_SYS_CHANNEL;
if (ORTE_SUCCESS != (rc = open_channel(&channel, "system",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
/* daemons and hnp open the sys and data server channels */
channel = ORTE_RMCAST_SYS_CHANNEL;
if (ORTE_SUCCESS != (rc = open_channel(&channel, "system",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
channel = ORTE_RMCAST_DATA_SERVER_CHANNEL;
if (ORTE_SUCCESS != (rc = open_channel(&channel, "data-server",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else if (ORTE_PROC_IS_APP) {
/* apps open the app public and data server channels */
channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL;
if (ORTE_SUCCESS != (rc = open_channel(&channel, "app-announce",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* setup our grp channel, if one was given */
channel = ORTE_RMCAST_DATA_SERVER_CHANNEL;
if (ORTE_SUCCESS != (rc = open_channel(&channel, "data-server",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* also setup our grp channel, if one was given */
if (NULL != orte_rmcast_base.my_group_name) {
channel = orte_rmcast_base.my_group_number;
if (ORTE_SUCCESS != (rc = open_channel(&channel, orte_rmcast_base.my_group_name,
@ -274,7 +296,7 @@ static int queue_xmit(rmcast_base_send_t *snd,
/* if we were asked to send this on our group output
* channel, substitute it
*/
if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
if (NULL == my_group_channel) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
@ -497,19 +519,23 @@ static int queue_recv(rmcast_base_recv_t *recvptr,
}
static int udp_recv(orte_process_name_t *name,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec **msg, int *count)
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
struct iovec **msg, int *count)
{
rmcast_base_recv_t *recvptr;
int ret;
recvptr = OBJ_NEW(rmcast_base_recv_t);
recvptr->iovecs_requested = true;
recvptr->channel = channel;
if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
recvptr->channel = orte_rmcast_base.my_group_number;
} else {
recvptr->channel = channel;
}
recvptr->tag = tag;
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, NULL, NULL, true))) {
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, recvptr->channel, tag, NULL, NULL, true))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(recvptr);
return ret;
@ -548,13 +574,17 @@ static int udp_recv_nb(orte_rmcast_channel_t channel,
recvptr = OBJ_NEW(rmcast_base_recv_t);
recvptr->iovecs_requested = true;
recvptr->channel = channel;
if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
recvptr->channel = orte_rmcast_base.my_group_number;
} else {
recvptr->channel = channel;
}
recvptr->tag = tag;
recvptr->flags = flags;
recvptr->cbfunc_iovec = cbfunc;
recvptr->cbdata = cbdata;
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, cbfunc, NULL, false))) {
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, recvptr->channel, tag, cbfunc, NULL, false))) {
if (ORTE_EXISTS == ret) {
/* this recv already exists - just release the copy */
OBJ_RELEASE(recvptr);
@ -581,10 +611,14 @@ static int udp_recv_buffer(orte_process_name_t *name,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
recvptr = OBJ_NEW(rmcast_base_recv_t);
recvptr->channel = channel;
if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
recvptr->channel = orte_rmcast_base.my_group_number;
} else {
recvptr->channel = channel;
}
recvptr->tag = tag;
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, NULL, NULL, true))) {
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, recvptr->channel, tag, NULL, NULL, true))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
@ -625,13 +659,17 @@ static int udp_recv_buffer_nb(orte_rmcast_channel_t channel,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
recvptr = OBJ_NEW(rmcast_base_recv_t);
recvptr->channel = channel;
if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
recvptr->channel = orte_rmcast_base.my_group_number;
} else {
recvptr->channel = channel;
}
recvptr->tag = tag;
recvptr->flags = flags;
recvptr->cbfunc_buffer = cbfunc;
recvptr->cbdata = cbdata;
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, NULL, cbfunc, false))) {
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, recvptr->channel, tag, NULL, cbfunc, false))) {
if (ORTE_EXISTS == ret) {
/* this recv already exists - just release the copy */
OBJ_RELEASE(recvptr);
@ -650,6 +688,13 @@ static void cancel_recv(orte_rmcast_channel_t channel,
{
opal_list_item_t *item, *next;
rmcast_base_recv_t *ptr;
orte_rmcast_channel_t ch;
if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
ch = orte_rmcast_base.my_group_number;
} else {
ch = channel;
}
/* find all recv's for this channel and tag */
item = opal_list_get_first(&recvs);
@ -657,8 +702,7 @@ static void cancel_recv(orte_rmcast_channel_t channel,
next = opal_list_get_next(item);
ptr = (rmcast_base_recv_t*)item;
if (channel == ptr->channel &&
tag == ptr->tag) {
if (ch == ptr->channel && tag == ptr->tag) {
OPAL_THREAD_LOCK(&lock);
opal_list_remove_item(&recvs, &ptr->item);
OBJ_RELEASE(ptr);