1
1

Reorganize the rmcast code to capture common code elements. Increase the max msg size for the spread and udp transports. Clean up the spread configuration doc.

This commit was SVN r23207.
Этот коммит содержится в:
Ralph Castain 2010-05-25 22:36:57 +00:00
родитель 27f070a575
Коммит ab6e06f5b3
18 изменённых файлов: 1170 добавлений и 2035 удалений

Просмотреть файл

@ -12,12 +12,10 @@
# #
# This configures one spread daemon running on port 4803 on localhost. # This configures one spread daemon running on port 4803 on localhost.
Spread_Segment 172.16.174.255:4803 { Spread_Segment 192.168.203.255:4803 {
rmcast-1 172.16.174.129 sjc-rcastain-8713 192.168.203.1
rmcast-2 172.16.174.130 ubuntu 192.168.203.192
rmcast-3 172.16.174.131
rmcast-4 172.16.174.132
} }

Просмотреть файл

@ -350,10 +350,11 @@ static void recv_cmd(int status,
orte_db_cmd_t cmd; orte_db_cmd_t cmd;
opal_buffer_t *ans; opal_buffer_t *ans;
int count, i; int count, i;
int32_t rc, ret; int32_t rc;
char *key; char *key;
orte_db_data_t *dat; orte_db_data_t *dat;
orte_rmcast_channel_t ch; orte_rmcast_channel_t ch;
char *ch_name;
OPAL_OUTPUT_VERBOSE((2, orte_db_base_output, OPAL_OUTPUT_VERBOSE((2, orte_db_base_output,
"%s db:daemon: cmd recvd from %s", "%s db:daemon: cmd recvd from %s",
@ -365,6 +366,8 @@ static void recv_cmd(int status,
count=1; count=1;
opal_dss.unpack(buf, &ch, &count, ORTE_RMCAST_CHANNEL_T); opal_dss.unpack(buf, &ch, &count, ORTE_RMCAST_CHANNEL_T);
count=1; count=1;
opal_dss.unpack(buf, &ch_name, &count, OPAL_STRING);
count=1;
opal_dss.unpack(buf, &key, &count, OPAL_STRING); opal_dss.unpack(buf, &key, &count, OPAL_STRING);
ans = OBJ_NEW(opal_buffer_t); ans = OBJ_NEW(opal_buffer_t);
@ -420,11 +423,9 @@ static void recv_cmd(int status,
rc = ORTE_ERR_NOT_FOUND; rc = ORTE_ERR_NOT_FOUND;
break; break;
} }
/* open a channel back to the sender */
if (ORTE_SUCCESS != (ret = orte_rmcast.open_channel(&ch, ORTE_NAME_PRINT(sender), /* ensure the return channel is open */
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { orte_rmcast.open_channel(ch, ch_name, NULL, -1, NULL, ORTE_RMCAST_XMIT);
ORTE_ERROR_LOG(ret);
return;
}
orte_rmcast.send_buffer_nb(ch, ORTE_RMCAST_TAG_CMD_ACK, ans, callback_fn, NULL); orte_rmcast.send_buffer_nb(ch, ORTE_RMCAST_TAG_CMD_ACK, ans, callback_fn, NULL);
} }

Просмотреть файл

@ -20,6 +20,7 @@ if !ORTE_DISABLE_FULL_SUPPORT
libmca_rmcast_la_SOURCES += \ libmca_rmcast_la_SOURCES += \
base/rmcast_base_close.c \ base/rmcast_base_close.c \
base/rmcast_base_select.c base/rmcast_base_select.c \
base/rmcast_base_fns.c
endif endif

Просмотреть файл

@ -21,9 +21,12 @@
#include <netinet/in.h> #include <netinet/in.h>
#endif #endif
#include "opal/class/opal_list.h"
#include "opal/event/event.h" #include "opal/event/event.h"
#include "opal/threads/threads.h"
#include "orte/mca/rmcast/rmcast.h" #include "orte/mca/rmcast/rmcast.h"
#include "orte/mca/rmcast/base/private.h"
BEGIN_C_DECLS BEGIN_C_DECLS
@ -44,6 +47,13 @@ typedef struct {
uint32_t interface; uint32_t interface;
uint16_t ports[256]; uint16_t ports[256];
int cache_size; int cache_size;
bool opened;
opal_mutex_t lock;
opal_condition_t cond;
bool active;
opal_list_t recvs;
opal_list_t channels;
rmcast_base_channel_t *my_group_channel;
} orte_rmcast_base_t; } orte_rmcast_base_t;
ORTE_DECLSPEC extern orte_rmcast_base_t orte_rmcast_base; ORTE_DECLSPEC extern orte_rmcast_base_t orte_rmcast_base;

Просмотреть файл

@ -41,7 +41,6 @@ BEGIN_C_DECLS
} while(0) } while(0)
/**** CLASS DEFINITIONS ****/ /**** CLASS DEFINITIONS ****/
/* /*
* Data structure for tracking assigned channels * Data structure for tracking assigned channels
@ -77,7 +76,6 @@ typedef struct {
orte_process_name_t name; orte_process_name_t name;
orte_rmcast_channel_t channel; orte_rmcast_channel_t channel;
bool recvd; bool recvd;
bool iovecs_requested;
orte_rmcast_tag_t tag; orte_rmcast_tag_t tag;
orte_rmcast_flag_t flags; orte_rmcast_flag_t flags;
struct iovec *iovec_array; struct iovec *iovec_array;
@ -118,9 +116,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_base_send_t);
typedef struct { typedef struct {
opal_object_t super; opal_object_t super;
opal_event_t *ev; opal_event_t *ev;
uint8_t *data; opal_buffer_t *buf;
ssize_t sz;
rmcast_base_channel_t *channel;
} orte_mcast_msg_event_t; } orte_mcast_msg_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_mcast_msg_event_t); ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_mcast_msg_event_t);
@ -145,8 +141,7 @@ typedef struct {
} rmcast_send_log_t; } rmcast_send_log_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t); ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
#define ORTE_MULTICAST_MESSAGE_EVENT(bf, cbfunc) \
#define ORTE_MULTICAST_MESSAGE_EVENT(dat, n, chan, cbfunc) \
do { \ do { \
orte_mcast_msg_event_t *mev; \ orte_mcast_msg_event_t *mev; \
struct timeval now; \ struct timeval now; \
@ -154,9 +149,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
"defining mcast msg event: %s %d", \ "defining mcast msg event: %s %d", \
__FILE__, __LINE__)); \ __FILE__, __LINE__)); \
mev = OBJ_NEW(orte_mcast_msg_event_t); \ mev = OBJ_NEW(orte_mcast_msg_event_t); \
mev->data = (dat); \ mev->buf = (bf); \
mev->sz = (n); \
mev->channel = (chan); \
opal_evtimer_set(mev->ev, (cbfunc), mev); \ opal_evtimer_set(mev->ev, (cbfunc), mev); \
now.tv_sec = 0; \ now.tv_sec = 0; \
now.tv_usec = 0; \ now.tv_usec = 0; \
@ -164,52 +157,6 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
} while(0); } while(0);
#define ORTE_MULTICAST_MESSAGE_HDR_HTON(bfr, tg, seq) \
do { \
uint32_t nm; \
uint16_t tmp; \
nm = htonl(ORTE_PROC_MY_NAME->jobid); \
memcpy((bfr), &nm, 4); \
nm = htonl(ORTE_PROC_MY_NAME->vpid); \
memcpy((bfr)+4, &nm, 4); \
/* add the tag data, also converted */ \
tmp = htons((tg)); \
memcpy((bfr)+8, &tmp, 2); \
/* add the sequence number, also converted */ \
nm = htonl((seq)); \
memcpy((bfr)+10, &nm, 4); \
} while(0);
#define ORTE_MULTICAST_MESSAGE_HDR_NTOH(bfr, nm, tg, seq) \
do { \
uint32_t tmp; \
uint16_t tmp16; \
/* extract the name and convert it to host order */ \
memcpy(&tmp, (bfr), 4); \
(nm)->jobid = ntohl(tmp); \
memcpy(&tmp, (bfr)+4, 4); \
(nm)->vpid = ntohl(tmp); \
/* extract the target tag */ \
memcpy(&tmp16, (bfr)+8, 2); \
(tg) = ntohs(tmp16); \
/* extract the sequence number */ \
memcpy(&tmp, (bfr)+10, 4); \
(seq) = ntohl(tmp); \
} while(0);
#define ORTE_MULTICAST_LOAD_MESSAGE(bfr, dat, sz, maxsz, endsz) \
do { \
if ((maxsz) <= (sz) + 14) { \
*(endsz) = -1 * ((sz) + 14); \
} else { \
memcpy((bfr)+14, (dat), (sz)); \
*(endsz) = (sz) + 14; \
} \
} while(0);
#define ORTE_MULTICAST_UNLOAD_MESSAGE(bfr, dat, sz) \
opal_dss.load((bfr), (dat)+14, (sz)-14);
#define ORTE_MULTICAST_NEXT_SEQUENCE_NUM(seq) \ #define ORTE_MULTICAST_NEXT_SEQUENCE_NUM(seq) \
do { \ do { \
if ((seq) < ORTE_RMCAST_SEQ_MAX) { \ if ((seq) < ORTE_RMCAST_SEQ_MAX) { \
@ -219,6 +166,28 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
} \ } \
} while(0); } while(0);
/**** FUNCTIONS ****/
/* Build the outgoing message for a send request: a new buffer is created,
 * a header (sender name, channel, tag, sequence number) is inserted, and
 * either the packed iovecs or a copy of the send buffer's payload follows.
 * The new buffer is returned through *buffer on success.
 */
ORTE_DECLSPEC int orte_rmcast_base_build_msg(rmcast_base_channel_t *ch,
opal_buffer_t **buffer,
rmcast_base_send_t *snd);
/* Record a receive request for the given channel/tag on the base recv list.
 * For non-blocking requests an existing recv for the same channel/tag is
 * reused, and ORTE_EXISTS is returned if a callback of the same kind is
 * already in place. When recvptr is non-NULL it receives the recv object.
 */
ORTE_DECLSPEC int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_fn_t cbfunc_iovec,
orte_rmcast_callback_buffer_fn_t cbfunc_buffer,
void *cbdata, bool blocking);
/* Decode the message held in msg->buf and deliver it to the first recv on
 * the base recv list whose channel and tag match.
 */
ORTE_DECLSPEC void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg);
/* Remove and release every recv on the base recv list that matches the
 * given channel and tag. ORTE_RMCAST_GROUP_CHANNEL is translated to the
 * base's my_group_number before matching.
 */
ORTE_DECLSPEC void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag);
/* Remove and release the tracking object for the given channel number.
 * Returns ORTE_ERR_NOT_FOUND when the channel is not on the base list.
 */
ORTE_DECLSPEC int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel);
/* Return the channel number stored in the base's my_group_channel. */
ORTE_DECLSPEC orte_rmcast_channel_t orte_rmcast_base_query(void);
#endif /* ORTE_DISABLE_FULL_SUPPORT */ #endif /* ORTE_DISABLE_FULL_SUPPORT */
END_C_DECLS END_C_DECLS

Просмотреть файл

@ -20,6 +20,10 @@
int orte_rmcast_base_close(void) int orte_rmcast_base_close(void)
{ {
if (!orte_rmcast_base.opened) {
return ORTE_SUCCESS;
}
/* finalize the active module */ /* finalize the active module */
if (NULL != orte_rmcast.finalize) { if (NULL != orte_rmcast.finalize) {
orte_rmcast.finalize(); orte_rmcast.finalize();
@ -31,5 +35,8 @@ int orte_rmcast_base_close(void)
mca_base_components_close(orte_rmcast_base.rmcast_output, mca_base_components_close(orte_rmcast_base.rmcast_output,
&orte_rmcast_base.rmcast_opened, NULL); &orte_rmcast_base.rmcast_opened, NULL);
orte_rmcast_base.opened = false;
OBJ_DESTRUCT(&orte_rmcast_base.lock);
OBJ_DESTRUCT(&orte_rmcast_base.cond);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

518
orte/mca/rmcast/base/rmcast_base_fns.c Обычный файл
Просмотреть файл

@ -0,0 +1,518 @@
/*
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include <stdio.h>
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/threads/threads.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rmcast/base/base.h"
#include "orte/mca/rmcast/base/private.h"
static int extract_hdr(opal_buffer_t *buf,
orte_process_name_t *name,
orte_rmcast_channel_t *channel,
orte_rmcast_tag_t *tag,
orte_rmcast_seq_t *seq_num);
static int insert_hdr(opal_buffer_t *buf,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_rmcast_seq_t seq_num);
/*
 * Assemble the wire form of an outgoing multicast message.
 *
 * Layout written into the new buffer:
 *   - header: this process's name, the channel number, the tag and the
 *     channel's current sequence number (see insert_hdr)
 *   - one int8 flag: 0 = iovecs follow, 1 = a buffer payload follows
 *   - iovecs: the iovec count, then for each iovec its length followed by
 *     its bytes (the bytes are omitted for zero-length entries)
 *   - buffer: a copy of snd->buf's payload
 *
 * @param ch      channel the message is sent on (supplies channel number
 *                and sequence number)
 * @param buffer  receives the newly created buffer on success; it is left
 *                untouched on failure
 * @param snd     send request; snd->buf selects the buffer form, otherwise
 *                snd->iovec_array / snd->iovec_count are packed
 * @return ORTE_SUCCESS, or the error returned by the failing pack call
 */
int orte_rmcast_base_build_msg(rmcast_base_channel_t *ch,
                               opal_buffer_t **buffer,
                               rmcast_base_send_t *snd)
{
    int32_t sz;
    opal_buffer_t *buf;
    int rc;
    int8_t flag;
    int32_t tmp32;

    /* key the trace off snd->buf, exactly as the packing logic below does,
     * so the trace can never dereference a NULL snd->buf
     */
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:base:build_msg of %d %s"
                         " for multicast on channel %d tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         (NULL != snd->buf) ? (int)snd->buf->bytes_used : (int)snd->iovec_count,
                         (NULL != snd->buf) ? "bytes" : "iovecs",
                         (int)ch->channel, snd->tag));

    /* setup a buffer */
    buf = OBJ_NEW(opal_buffer_t);

    /* insert the header - on failure release the buffer instead of leaking it */
    if (ORTE_SUCCESS != (rc = insert_hdr(buf, ch->channel, snd->tag, ch->seq_num))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* are we sending a buffer? */
    if (NULL == snd->buf) {
        /* no, flag the buffer as containing iovecs */
        flag = 0;
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &flag, 1, OPAL_INT8))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        /* pack the number of iovecs */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &snd->iovec_count, 1, OPAL_INT32))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        /* pack each iovec into the buffer so the array can be
         * recreated at the other end
         */
        for (sz=0; sz < snd->iovec_count; sz++) {
            /* pack the size */
            tmp32 = snd->iovec_array[sz].iov_len;
            if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tmp32, 1, OPAL_INT32))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            if (0 < tmp32) {
                /* pack the bytes */
                if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, snd->iovec_array[sz].iov_base, tmp32, OPAL_UINT8))) {
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
            }
        }
    } else {
        /* flag it as being a buffer */
        flag = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &flag, 1, OPAL_INT8))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        /* copy the payload */
        if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, snd->buf))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
    }

    *buffer = buf;
    return ORTE_SUCCESS;

cleanup:
    if (NULL != buf) {
        OBJ_RELEASE(buf);
    }
    return rc;
}
/*
 * Register a receive request for a channel/tag pair.
 *
 * For a non-blocking request the recv list is searched first: if a recv for
 * the same channel and tag already exists, the supplied callback(s) are
 * attached to it, unless a callback of the same kind is already present, in
 * which case ORTE_EXISTS is returned and the existing recv is left
 * unmodified. Blocking requests, and non-blocking requests with no match,
 * get a new recv object appended to orte_rmcast_base.recvs.
 *
 * The base lock is taken exactly once and held across both the search and
 * the insertion.
 *
 * @param recvptr        if non-NULL, receives the recv object that was
 *                       reused or created
 * @param channel        channel to listen on
 * @param tag            tag to match
 * @param flags          stored on a newly created recv
 * @param cbfunc_iovec   iovec-style callback, or NULL
 * @param cbfunc_buffer  buffer-style callback, or NULL
 * @param cbdata         stored on a newly created recv
 * @param blocking       true to skip the search and always add a new recv
 * @return ORTE_SUCCESS, or ORTE_EXISTS as described above
 */
int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
                                orte_rmcast_channel_t channel,
                                orte_rmcast_tag_t tag,
                                orte_rmcast_flag_t flags,
                                orte_rmcast_callback_fn_t cbfunc_iovec,
                                orte_rmcast_callback_buffer_fn_t cbfunc_buffer,
                                void *cbdata, bool blocking)
{
    opal_list_item_t *item;
    rmcast_base_recv_t *rptr;

    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:base: queue_recv called on multicast channel %d tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));

    OPAL_THREAD_LOCK(&orte_rmcast_base.lock);

    if (!blocking) {
        /* do we already have a recv for this channel/tag? */
        for (item = opal_list_get_first(&orte_rmcast_base.recvs);
             item != opal_list_get_end(&orte_rmcast_base.recvs);
             item = opal_list_get_next(item)) {
            rptr = (rmcast_base_recv_t*)item;

            if (channel != rptr->channel || tag != rptr->tag) {
                /* different channel or tag */
                continue;
            }

            /* check for conflicts before touching the existing recv so a
             * rejected request never leaves it half-updated
             */
            if ((NULL != cbfunc_iovec && NULL != rptr->cbfunc_iovec) ||
                (NULL != cbfunc_buffer && NULL != rptr->cbfunc_buffer)) {
                /* matching type - recv already in place */
                OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                     "%s rmcast:base: matching recv already active on multicast channel %d tag %d",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
                OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
                return ORTE_EXISTS;
            }

            if (NULL != cbfunc_iovec) {
                rptr->cbfunc_iovec = cbfunc_iovec;
            }
            if (NULL != cbfunc_buffer) {
                /* register the buffer callback in its own slot */
                rptr->cbfunc_buffer = cbfunc_buffer;
            }
            if (NULL != recvptr) {
                *recvptr = rptr;
            }
            OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
            return ORTE_SUCCESS;
        }
    }

    /* if we get here, then we need to add a new recv - the lock is still held */
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:base: adding recv on multicast channel %d tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));

    rptr = OBJ_NEW(rmcast_base_recv_t);
    rptr->channel = channel;
    rptr->tag = tag;
    rptr->flags = flags;
    rptr->cbfunc_iovec = cbfunc_iovec;
    rptr->cbfunc_buffer = cbfunc_buffer;
    rptr->cbdata = cbdata;
    if (NULL != recvptr) {
        *recvptr = rptr;
    }
    opal_list_append(&orte_rmcast_base.recvs, &rptr->item);
    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);

    return ORTE_SUCCESS;
}
/*
 * Decode a received multicast message and deliver it to a matching recv.
 *
 * Steps:
 *   1. unpack the header (sender name, channel, tag, sequence number)
 *   2. drop messages sent by this process, and messages from a different
 *      job family unless they arrived on the system channel
 *   3. unpack the flag telling whether iovecs (0) or a buffer follows
 *   4. walk orte_rmcast_base.recvs for the first recv whose channel matches
 *      and whose tag matches or is the wildcard; unpack the payload and
 *      either invoke the recv's callback (removing the recv afterwards if
 *      it is not persistent) or, when no callback of that kind is set,
 *      store a copy on the recv and mark it as received
 *
 * Values taken from the wire (iovec count and lengths) are validated, and
 * the temporary iovec array is zero-initialised so that the cleanup path
 * only ever frees pointers that were actually allocated.
 *
 * @param msg  message event whose buf holds the raw received message; it is
 *             not released here
 */
void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg)
{
    orte_rmcast_channel_t channel;
    opal_list_item_t *item;
    rmcast_base_recv_t *ptr;
    orte_process_name_t name;
    orte_rmcast_tag_t tag;
    int8_t flag;
    struct iovec *iovec_array=NULL;
    int32_t iovec_count=0, i, n, isz;
    opal_buffer_t *recvd_buf=NULL;
    int rc;
    orte_rmcast_seq_t recvd_seq_num;

    /* extract the header */
    if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &recvd_seq_num))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* if this message is from myself, ignore it */
    if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) {
        OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:process_recv sent from myself: %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&name)));
        goto cleanup;
    }

    /* if this message is from a different job family, ignore it unless
     * it is on the system channel. We ignore these messages to avoid
     * confusion between different jobs since we all may be sharing
     * multicast channels. The system channel is left open to support
     * cross-job communications via the HNP.
     */
    if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) &&
        (ORTE_RMCAST_SYS_CHANNEL != channel)) {
        OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:process_recv from a different job family: %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&name)));
        goto cleanup;
    }

    /* unpack the iovec vs buf flag */
    n=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &flag, &n, OPAL_INT8))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&name), channel, (int)tag,
                         (0 == flag) ? "iovecs" : "buffer", recvd_seq_num));

    /* find the recv for this channel, tag, and type */
    for (item = opal_list_get_first(&orte_rmcast_base.recvs);
         item != opal_list_get_end(&orte_rmcast_base.recvs);
         item = opal_list_get_next(item)) {
        ptr = (rmcast_base_recv_t*)item;

        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:recv checking channel %d tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (int)ptr->channel, (int)ptr->tag));

        if (channel != ptr->channel) {
            continue;
        }

        if (tag != ptr->tag && ORTE_RMCAST_TAG_WILDCARD != ptr->tag) {
            continue;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:recv delivering message to channel %d tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));

        /* we have a recv - unpack the data */
        if (0 == flag) {
            /* get the number of iovecs in the buffer */
            n=1;
            if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &iovec_count, &n, OPAL_INT32))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            /* the count came off the wire - never trust it blindly */
            if (iovec_count < 0) {
                OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                     "%s rmcast:base:recv invalid iovec count %d - dropping message",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)iovec_count));
                iovec_count = 0;
                goto cleanup;
            }
            if (0 < iovec_count) {
                /* zero-filled so cleanup can safely free every entry even
                 * if we bail out before all of them were populated
                 */
                iovec_array = (struct iovec *)calloc((size_t)iovec_count, sizeof(struct iovec));
                if (NULL == iovec_array) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:recv out of memory for %d iovecs",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)iovec_count));
                    goto cleanup;
                }
            }
            /* unpack the iovecs */
            for (i=0; i < iovec_count; i++) {
                /* unpack the number of bytes in this iovec */
                n=1;
                if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &isz, &n, OPAL_INT32))) {
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
                if (isz < 0) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:recv invalid iovec length %d - dropping message",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)isz));
                    goto cleanup;
                }
                iovec_array[i].iov_base = NULL;
                iovec_array[i].iov_len = isz;
                if (0 < isz) {
                    /* allocate the space */
                    iovec_array[i].iov_base = (uint8_t*)malloc(isz);
                    if (NULL == iovec_array[i].iov_base) {
                        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                             "%s rmcast:base:recv out of memory for iovec of %d bytes",
                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)isz));
                        goto cleanup;
                    }
                    /* unpack the data */
                    if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, iovec_array[i].iov_base, &isz, OPAL_UINT8))) {
                        ORTE_ERROR_LOG(rc);
                        goto cleanup;
                    }
                }
            }
            if (NULL != ptr->cbfunc_iovec) {
                ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, tag,
                                  &name, iovec_array, iovec_count, ptr->cbdata);
                /* if it isn't persistent, remove it */
                if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
                    OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
                    opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
                    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
                    OBJ_RELEASE(ptr);
                }
            } else {
                /* if something is already present, then we have a problem */
                if (NULL != ptr->iovec_array) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:recv blocking recv already fulfilled",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                    goto cleanup;
                }
                /* copy over the iovec array since it will be released by
                 * the blocking recv
                 */
                if (0 < iovec_count) {
                    ptr->iovec_array = (struct iovec *)calloc((size_t)iovec_count, sizeof(struct iovec));
                    if (NULL == ptr->iovec_array) {
                        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                             "%s rmcast:base:recv out of memory copying %d iovecs",
                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)iovec_count));
                        goto cleanup;
                    }
                }
                ptr->iovec_count = iovec_count;
                for (i=0; i < iovec_count; i++) {
                    ptr->iovec_array[i].iov_len = iovec_array[i].iov_len;
                    if (0 == iovec_array[i].iov_len) {
                        /* nothing to copy - base stays NULL */
                        continue;
                    }
                    ptr->iovec_array[i].iov_base = (uint8_t*)malloc(iovec_array[i].iov_len);
                    if (NULL == ptr->iovec_array[i].iov_base) {
                        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                             "%s rmcast:base:recv out of memory copying iovec data",
                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                        /* undo the partial copy so the recv is left untouched */
                        while (i-- > 0) {
                            free(ptr->iovec_array[i].iov_base);
                        }
                        free(ptr->iovec_array);
                        ptr->iovec_array = NULL;
                        ptr->iovec_count = 0;
                        goto cleanup;
                    }
                    memcpy(ptr->iovec_array[i].iov_base, iovec_array[i].iov_base, iovec_array[i].iov_len);
                }
                /* flag it as recvd to release blocking recv */
                ptr->recvd = true;
            }
        } else {
            /* buffer was included */
            recvd_buf = OBJ_NEW(opal_buffer_t);
            if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(recvd_buf, msg->buf))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            if (NULL != ptr->cbfunc_buffer) {
                ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, tag,
                                   &name, recvd_buf, ptr->cbdata);
                /* if it isn't persistent, remove it */
                if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
                    OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
                    opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
                    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
                    OBJ_RELEASE(ptr);
                }
            } else {
                /* if something is already present, then we have a problem */
                if (NULL != ptr->buf) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:recv blocking recv already fulfilled",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                    goto cleanup;
                }
                /* copy the buffer across since it will be released
                 * by the blocking recv
                 */
                ptr->buf = OBJ_NEW(opal_buffer_t);
                if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(ptr->buf, recvd_buf))) {
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
                /* flag it as recvd to release blocking recv */
                ptr->recvd = true;
            }
        }
        /* we are done - only one recv can match */
        break;
    }

cleanup:
    /* release the temporaries; unpopulated entries are NULL and free(NULL)
     * is a no-op
     */
    if (NULL != iovec_array) {
        for (i=0; i < iovec_count; i++) {
            free(iovec_array[i].iov_base);
        }
        free(iovec_array);
    }
    if (NULL != recvd_buf) {
        OBJ_RELEASE(recvd_buf);
    }
    return;
}
/*
 * Cancel every pending recv that matches a channel/tag pair.
 *
 * ORTE_RMCAST_GROUP_CHANNEL is translated to orte_rmcast_base.my_group_number
 * before matching. Each matching recv is removed from
 * orte_rmcast_base.recvs and released.
 *
 * The base lock is held for the whole traversal (not just around each
 * removal) so the list cannot change underneath the walk; this mirrors
 * orte_rmcast_base_close_channel.
 *
 * @param channel  channel whose recvs are to be cancelled
 * @param tag      tag that must match exactly
 */
void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
                                  orte_rmcast_tag_t tag)
{
    opal_list_item_t *item, *next;
    rmcast_base_recv_t *ptr;
    orte_rmcast_channel_t ch;

    if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
        ch = orte_rmcast_base.my_group_number;
    } else {
        ch = channel;
    }

    /* find all recv's for this channel and tag */
    OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
    item = opal_list_get_first(&orte_rmcast_base.recvs);
    while (item != opal_list_get_end(&orte_rmcast_base.recvs)) {
        /* grab the successor first - item may be removed below */
        next = opal_list_get_next(item);
        ptr = (rmcast_base_recv_t*)item;
        if (ch == ptr->channel &&
            tag == ptr->tag) {
            opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
            OBJ_RELEASE(ptr);
        }
        item = next;
    }
    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
}
/*
 * Drop the tracking object for a channel.
 *
 * Searches orte_rmcast_base.channels, under the base lock, for the first
 * entry whose channel number equals the one given; that entry is removed
 * from the list and released.
 *
 * @param channel  channel number to close
 * @return ORTE_SUCCESS if an entry was removed, ORTE_ERR_NOT_FOUND otherwise
 */
int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel)
{
    opal_list_item_t *cur;
    rmcast_base_channel_t *entry;
    int status = ORTE_ERR_NOT_FOUND;

    OPAL_THREAD_LOCK(&orte_rmcast_base.lock);

    cur = opal_list_get_first(&orte_rmcast_base.channels);
    while (cur != opal_list_get_end(&orte_rmcast_base.channels)) {
        entry = (rmcast_base_channel_t*)cur;
        if (channel == entry->channel) {
            /* found it - unlink, release, and stop looking */
            opal_list_remove_item(&orte_rmcast_base.channels, cur);
            OBJ_RELEASE(entry);
            status = ORTE_SUCCESS;
            break;
        }
        cur = opal_list_get_next(cur);
    }

    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);

    return status;
}
orte_rmcast_channel_t orte_rmcast_base_query(void)
{
return orte_rmcast_base.my_group_channel->channel;
}
/*
 * Pull the message header off the front of a received buffer.
 *
 * Fields are unpacked in the order insert_hdr writes them: sender name,
 * channel, tag, sequence number. Unpacking stops at the first failure,
 * which is logged and returned.
 *
 * @param buf      buffer positioned at the start of the header
 * @param name     receives the sender's process name
 * @param channel  receives the channel number
 * @param tag      receives the tag
 * @param seq_num  receives the sequence number
 * @return ORTE_SUCCESS, or the error from the failing unpack
 */
static int extract_hdr(opal_buffer_t *buf,
                       orte_process_name_t *name,
                       orte_rmcast_channel_t *channel,
                       orte_rmcast_tag_t *tag,
                       orte_rmcast_seq_t *seq_num)
{
    int status;
    int32_t cnt;

    /* each field is a single value, so the count is reset to 1 every time */
    cnt = 1;
    status = opal_dss.unpack(buf, name, &cnt, ORTE_NAME);
    if (ORTE_SUCCESS != status) {
        goto failed;
    }

    cnt = 1;
    status = opal_dss.unpack(buf, channel, &cnt, ORTE_RMCAST_CHANNEL_T);
    if (ORTE_SUCCESS != status) {
        goto failed;
    }

    cnt = 1;
    status = opal_dss.unpack(buf, tag, &cnt, ORTE_RMCAST_TAG_T);
    if (ORTE_SUCCESS != status) {
        goto failed;
    }

    cnt = 1;
    status = opal_dss.unpack(buf, seq_num, &cnt, ORTE_RMCAST_SEQ_T);
    if (ORTE_SUCCESS != status) {
        goto failed;
    }

    return status;

failed:
    ORTE_ERROR_LOG(status);
    return status;
}
/*
 * Write the message header at the current end of a buffer.
 *
 * Packs, in order: this process's name, the channel, the tag and the
 * sequence number. Packing stops at the first failure, which is logged
 * and returned.
 *
 * @param buf      buffer to append the header to
 * @param channel  channel number to record
 * @param tag      tag to record
 * @param seq_num  sequence number to record
 * @return ORTE_SUCCESS, or the error from the failing pack
 */
static int insert_hdr(opal_buffer_t *buf,
                      orte_rmcast_channel_t channel,
                      orte_rmcast_tag_t tag,
                      orte_rmcast_seq_t seq_num)
{
    int status;

    /* short-circuit evaluation stops at the first pack that fails */
    if (ORTE_SUCCESS != (status = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME)) ||
        ORTE_SUCCESS != (status = opal_dss.pack(buf, &channel, 1, ORTE_RMCAST_CHANNEL_T)) ||
        ORTE_SUCCESS != (status = opal_dss.pack(buf, &tag, 1, ORTE_RMCAST_TAG_T)) ||
        ORTE_SUCCESS != (status = opal_dss.pack(buf, &seq_num, 1, ORTE_RMCAST_SEQ_T))) {
        ORTE_ERROR_LOG(status);
    }

    return status;
}

Просмотреть файл

@ -27,6 +27,7 @@
#include "opal/util/if.h" #include "opal/util/if.h"
#include "opal/util/opal_sos.h" #include "opal/util/opal_sos.h"
#include "opal/class/opal_ring_buffer.h" #include "opal/class/opal_ring_buffer.h"
#include "opal/class/opal_list.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
@ -79,8 +80,7 @@ orte_rmcast_module_t orte_rmcast = {
NULL NULL
}; };
orte_rmcast_base_t orte_rmcast_base; orte_rmcast_base_t orte_rmcast_base;
static bool opened=false;
static bool opened = false;
/** /**
* Function for finding and opening either all MCA components, or the one * Function for finding and opening either all MCA components, or the one
@ -101,8 +101,15 @@ int orte_rmcast_base_open(void)
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
opened = true; opened = true;
orte_rmcast_base.opened = true;
/* ensure all global values are initialized */ /* ensure all global values are initialized */
OBJ_CONSTRUCT(&orte_rmcast_base.lock, opal_mutex_t);
OBJ_CONSTRUCT(&orte_rmcast_base.cond, opal_condition_t);
orte_rmcast_base.active = false;
OBJ_CONSTRUCT(&orte_rmcast_base.recvs, opal_list_t);
OBJ_CONSTRUCT(&orte_rmcast_base.channels, opal_list_t);
orte_rmcast_base.xmit_network = 0; orte_rmcast_base.xmit_network = 0;
orte_rmcast_base.my_group_name = NULL; orte_rmcast_base.my_group_name = NULL;
orte_rmcast_base.my_group_number = 0; orte_rmcast_base.my_group_number = 0;
@ -291,16 +298,12 @@ int orte_rmcast_base_open(void)
static void mcast_event_constructor(orte_mcast_msg_event_t *ev) static void mcast_event_constructor(orte_mcast_msg_event_t *ev)
{ {
ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t)); ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t));
ev->data = NULL;
} }
static void mcast_event_destructor(orte_mcast_msg_event_t *ev) static void mcast_event_destructor(orte_mcast_msg_event_t *ev)
{ {
if (NULL != ev->ev) { if (NULL != ev->ev) {
free(ev->ev); free(ev->ev);
} }
if (NULL != ev->data) {
free(ev->data);
}
} }
OBJ_CLASS_INSTANCE(orte_mcast_msg_event_t, OBJ_CLASS_INSTANCE(orte_mcast_msg_event_t,
opal_object_t, opal_object_t,
@ -329,7 +332,6 @@ static void recv_construct(rmcast_base_recv_t *ptr)
ptr->name.vpid = ORTE_VPID_INVALID; ptr->name.vpid = ORTE_VPID_INVALID;
ptr->channel = ORTE_RMCAST_INVALID_CHANNEL; ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->recvd = false; ptr->recvd = false;
ptr->iovecs_requested = false;
ptr->tag = ORTE_RMCAST_TAG_INVALID; ptr->tag = ORTE_RMCAST_TAG_INVALID;
ptr->flags = ORTE_RMCAST_NON_PERSISTENT; /* default */ ptr->flags = ORTE_RMCAST_NON_PERSISTENT; /* default */
ptr->iovec_array = NULL; ptr->iovec_array = NULL;

Просмотреть файл

@ -122,8 +122,8 @@ typedef int (*orte_rmcast_base_module_recv_nb_fn_t)(orte_rmcast_channel_t channe
typedef void (*orte_rmcast_base_module_cancel_recv_fn_t)(orte_rmcast_channel_t channel, typedef void (*orte_rmcast_base_module_cancel_recv_fn_t)(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag); orte_rmcast_tag_t tag);
/* open the next available channel */ /* open the specified channel */
typedef int (*orte_rmcast_base_module_open_channel_fn_t)(orte_rmcast_channel_t *channel, char *name, typedef int (*orte_rmcast_base_module_open_channel_fn_t)(orte_rmcast_channel_t channel, char *name,
char *network, int port, char *interface, uint8_t direction); char *network, int port, char *interface, uint8_t direction);
/* close the channel */ /* close the channel */

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -19,14 +19,6 @@
BEGIN_C_DECLS BEGIN_C_DECLS
#define ORTE_RMCAST_SPREAD_MAX_MSG_SIZE 1500
typedef struct {
orte_rmcast_base_component_t super;
int max_msg_size;
} orte_rmcast_spread_component_t;
/* /*
* Module open / close * Module open / close
*/ */
@ -35,7 +27,7 @@ int orte_rmcast_spread_component_close(void);
int orte_rmcast_spread_component_query(mca_base_module_t **module, int *priority); int orte_rmcast_spread_component_query(mca_base_module_t **module, int *priority);
ORTE_MODULE_DECLSPEC extern orte_rmcast_spread_component_t mca_rmcast_spread_component; ORTE_MODULE_DECLSPEC extern orte_rmcast_base_component_t mca_rmcast_spread_component;
ORTE_DECLSPEC extern orte_rmcast_module_t orte_rmcast_spread_module; ORTE_DECLSPEC extern orte_rmcast_module_t orte_rmcast_spread_module;
END_C_DECLS END_C_DECLS

Просмотреть файл

@ -38,8 +38,7 @@ const char *mca_rmcast_spread_component_version_string =
* Instantiate the public struct with all of our public information * Instantiate the public struct with all of our public information
* and pointers to our public functions in it * and pointers to our public functions in it
*/ */
orte_rmcast_spread_component_t mca_rmcast_spread_component = { orte_rmcast_base_component_t mca_rmcast_spread_component = {
{
{ {
ORTE_RMCAST_BASE_VERSION_1_0_0, ORTE_RMCAST_BASE_VERSION_1_0_0,
@ -58,21 +57,12 @@ orte_rmcast_spread_component_t mca_rmcast_spread_component = {
/* The component is checkpoint ready */ /* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT MCA_BASE_METADATA_PARAM_CHECKPOINT
} }
}
}; };
int int
orte_rmcast_spread_component_open(void) orte_rmcast_spread_component_open(void)
{ {
mca_base_component_t *c = &mca_rmcast_spread_component.super.version;
mca_base_param_reg_int(c, "max_msg_size",
"Max #bytes in a single msg (must be > 0)",
false, false,
ORTE_RMCAST_SPREAD_MAX_MSG_SIZE,
&mca_rmcast_spread_component.max_msg_size);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -82,7 +82,7 @@ static int orte_rmcast_tcp_query(mca_base_module_t **module, int *priority)
} }
/* selected by choice */ /* selected by choice */
*priority = 0; *priority = 50;
*module = (mca_base_module_t *) &orte_rmcast_tcp_module; *module = (mca_base_module_t *) &orte_rmcast_tcp_module;
initialized = true; initialized = true;

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -24,14 +24,9 @@
BEGIN_C_DECLS BEGIN_C_DECLS
#define ORTE_RMCAST_UDP_MAX_MSG_SIZE 1500 #define ORTE_RMCAST_UDP_MTU 65536
typedef struct { ORTE_MODULE_DECLSPEC extern orte_rmcast_base_component_t mca_rmcast_udp_component;
orte_rmcast_base_component_t super;
int max_msg_size;
} orte_rmcast_udp_component_t;
ORTE_MODULE_DECLSPEC extern orte_rmcast_udp_component_t mca_rmcast_udp_component;
extern orte_rmcast_module_t orte_rmcast_udp_module; extern orte_rmcast_module_t orte_rmcast_udp_module;
END_C_DECLS END_C_DECLS

Просмотреть файл

@ -41,8 +41,7 @@ static bool initialized = false;
const char *mca_rmcast_udp_component_version_string = const char *mca_rmcast_udp_component_version_string =
"Open MPI udp rmcast MCA component version " ORTE_VERSION; "Open MPI udp rmcast MCA component version " ORTE_VERSION;
orte_rmcast_udp_component_t mca_rmcast_udp_component = { orte_rmcast_base_component_t mca_rmcast_udp_component = {
{
{ {
ORTE_RMCAST_BASE_VERSION_1_0_0, ORTE_RMCAST_BASE_VERSION_1_0_0,
@ -60,7 +59,6 @@ orte_rmcast_udp_component_t mca_rmcast_udp_component = {
/* The component is checkpoint ready */ /* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT MCA_BASE_METADATA_PARAM_CHECKPOINT
} }
}
}; };
/** /**
@ -68,14 +66,6 @@ orte_rmcast_udp_component_t mca_rmcast_udp_component = {
*/ */
static int orte_rmcast_udp_open(void) static int orte_rmcast_udp_open(void)
{ {
mca_base_component_t *c = &mca_rmcast_udp_component.super.version;
mca_base_param_reg_int(c, "max_msg_size",
"Max #bytes in a single msg (must be > 0)",
false, false,
ORTE_RMCAST_UDP_MAX_MSG_SIZE,
&mca_rmcast_udp_component.max_msg_size);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -36,7 +36,7 @@ static void cbfunc_iovec(int status,
orte_process_name_t *sender, orte_process_name_t *sender,
struct iovec *msg, int count, void* cbdata); struct iovec *msg, int count, void* cbdata);
orte_rmcast_channel_t chan=4; static int datasize=1024;
static void send_data(int fd, short flags, void *arg) static void send_data(int fd, short flags, void *arg)
{ {
@ -49,8 +49,8 @@ static void send_data(int fd, short flags, void *arg)
bfptr = OBJ_NEW(opal_buffer_t); bfptr = OBJ_NEW(opal_buffer_t);
i32 = -1; i32 = -1;
opal_dss.pack(bfptr, &i32, 1, OPAL_INT32); opal_dss.pack(bfptr, &i32, datasize, OPAL_INT32);
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(chan, if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(ORTE_RMCAST_GROUP_CHANNEL,
ORTE_RMCAST_TAG_OUTPUT, bfptr, ORTE_RMCAST_TAG_OUTPUT, bfptr,
cbfunc_buf_snt, NULL))) { cbfunc_buf_snt, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -59,11 +59,11 @@ static void send_data(int fd, short flags, void *arg)
} }
/* create an iovec array */ /* create an iovec array */
for (i=0; i < 3; i++) { for (i=0; i < 3; i++) {
iovec_array[i].iov_base = (uint8_t*)malloc(30); iovec_array[i].iov_base = (uint8_t*)malloc(datasize);
iovec_array[i].iov_len = 30; iovec_array[i].iov_len = datasize;
} }
/* send it out */ /* send it out */
if (ORTE_SUCCESS != (rc = orte_rmcast.send(chan, if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_GROUP_CHANNEL,
ORTE_RMCAST_TAG_OUTPUT, ORTE_RMCAST_TAG_OUTPUT,
iovec_array, 3))) { iovec_array, 3))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -85,48 +85,41 @@ int main(int argc, char* argv[])
struct iovec iovec_array[3]; struct iovec iovec_array[3];
if (0 > (rc = orte_init(&argc, &argv, ORTE_PROC_NON_MPI))) { if (0 > (rc = orte_init(&argc, &argv, ORTE_PROC_NON_MPI))) {
fprintf(stderr, "orte_nodename: couldn't init orte - error code %d\n", rc); fprintf(stderr, "orte_mcast: couldn't init orte - error code %d\n", rc);
return rc; return rc;
} }
gethostname(hostname, 512); gethostname(hostname, 512);
pid = getpid(); pid = getpid();
printf("orte_mcast: Node %s Name %s Pid %ld\n", if (1 < argc) {
hostname, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)pid); datasize = strtol(argv[1], NULL, 10);
}
printf("orte_mcast: Node %s Name %s Pid %ld datasize %d\n",
hostname, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)pid, datasize);
if (0 == ORTE_PROC_MY_NAME->vpid) { if (0 == ORTE_PROC_MY_NAME->vpid) {
orte_grpcomm.barrier(); orte_grpcomm.barrier();
/* open a new channel */
if (ORTE_SUCCESS != (rc = orte_rmcast.open_channel(&chan, "orte_mcast", NULL, -1, NULL, ORTE_RMCAST_XMIT))) {
ORTE_ERROR_LOG(rc);
goto blast;
}
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* pass the new channel number */
i32 = chan;
opal_dss.pack(&buf, &i32, 1, OPAL_INT32);
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer(ORTE_RMCAST_APP_PUBLIC_CHANNEL,
ORTE_RMCAST_TAG_ANNOUNCE, &buf))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
goto blast;
}
OBJ_DESTRUCT(&buf);
/* wake up every 5 seconds and send something */ /* wake up every 5 seconds and send something */
ORTE_TIMER_EVENT(5, 0, send_data); ORTE_TIMER_EVENT(5, 0, send_data);
} else { } else {
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_APP_PUBLIC_CHANNEL, /* setup to recv data on our channel */
ORTE_RMCAST_TAG_WILDCARD, if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_CHANNEL,
ORTE_RMCAST_TAG_OUTPUT,
ORTE_RMCAST_PERSISTENT, ORTE_RMCAST_PERSISTENT,
cbfunc, NULL))) { cbfunc, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(ORTE_RMCAST_GROUP_CHANNEL,
ORTE_RMCAST_TAG_OUTPUT,
ORTE_RMCAST_PERSISTENT,
cbfunc_iovec, NULL))) {
ORTE_ERROR_LOG(rc);
}
orte_grpcomm.barrier(); /* ensure the public recv is ready */ orte_grpcomm.barrier(); /* ensure the public recv is ready */
} }
opal_event_dispatch(); opal_event_dispatch();
@ -152,29 +145,6 @@ static void cbfunc(int status,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), channel, tag, i32); ORTE_NAME_PRINT(sender), channel, tag, i32);
if (i32 < 0) {
return;
}
/* open a new channel */
chan = i32;
if (ORTE_SUCCESS != (rc = orte_rmcast.open_channel(&chan, "orte_mcast", NULL, -1, NULL, ORTE_RMCAST_RECV))) {
ORTE_ERROR_LOG(rc);
}
/* setup to recv data on it */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(chan,
ORTE_RMCAST_TAG_OUTPUT,
ORTE_RMCAST_PERSISTENT,
cbfunc, NULL))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(chan,
ORTE_RMCAST_TAG_OUTPUT,
ORTE_RMCAST_PERSISTENT,
cbfunc_iovec, NULL))) {
ORTE_ERROR_LOG(rc);
}
} }
static void cbfunc_iovec(int status, static void cbfunc_iovec(int status,