1
1

Add message sequence numbers to multicast messages, tracked by channel

This commit was SVN r22262.
Этот коммит содержится в:
Ralph Castain 2009-12-04 04:17:44 +00:00
родитель a7ca4050b5
Коммит 4a82dd9a45
7 изменённых файлов: 87 добавлений и 36 удалений

Просмотреть файл

@ -362,6 +362,7 @@ static bool name_success = false;
static void cbfunc(int status, static void cbfunc(int status,
int channel, orte_rmcast_tag_t tag, int channel, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
opal_buffer_t *buf, void *cbdata) opal_buffer_t *buf, void *cbdata)
{ {
int32_t n; int32_t n;

Просмотреть файл

@ -53,6 +53,7 @@ typedef struct {
uint16_t port; uint16_t port;
uint32_t interface; uint32_t interface;
int xmit; int xmit;
orte_rmcast_seq_t seq_num;
int recv; int recv;
struct sockaddr_in addr; struct sockaddr_in addr;
opal_event_t send_ev; opal_event_t send_ev;
@ -74,6 +75,7 @@ typedef struct {
orte_rmcast_channel_t channel; orte_rmcast_channel_t channel;
bool recvd; bool recvd;
orte_rmcast_tag_t tag; orte_rmcast_tag_t tag;
orte_rmcast_seq_t seq_num;
orte_rmcast_flag_t flags; orte_rmcast_flag_t flags;
struct iovec *iovec_array; struct iovec *iovec_array;
int iovec_count; int iovec_count;
@ -135,20 +137,23 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_mcast_msg_event_t);
} while(0); } while(0);
#define ORTE_MULTICAST_MESSAGE_HDR_HTON(bfr, tg) \ #define ORTE_MULTICAST_MESSAGE_HDR_HTON(bfr, tg, seq) \
do { \ do { \
uint32_t nm; \ uint32_t nm; \
uint16_t tmp; \ uint16_t tmp; \
nm = htonl(ORTE_PROC_MY_NAME->jobid); \ nm = htonl(ORTE_PROC_MY_NAME->jobid); \
memcpy((bfr), &nm, 4); \ memcpy((bfr), &nm, 4); \
nm = htonl(ORTE_PROC_MY_NAME->vpid); \ nm = htonl(ORTE_PROC_MY_NAME->vpid); \
memcpy((bfr)+4, &nm, 4); \ memcpy((bfr)+4, &nm, 4); \
/* add the tag data, also converted */ \ /* add the tag data, also converted */ \
tmp = htons((tg)); \ tmp = htons((tg)); \
memcpy((bfr)+8, &tmp, 2); \ memcpy((bfr)+8, &tmp, 2); \
/* add the sequence number, also converted */ \
nm = htonl((seq)); \
memcpy((bfr)+10, &nm, 4); \
} while(0); } while(0);
#define ORTE_MULTICAST_MESSAGE_HDR_NTOH(bfr, nm, tg) \ #define ORTE_MULTICAST_MESSAGE_HDR_NTOH(bfr, nm, tg, seq) \
do { \ do { \
uint32_t tmp; \ uint32_t tmp; \
uint16_t tmp16; \ uint16_t tmp16; \
@ -160,20 +165,32 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_mcast_msg_event_t);
/* extract the target tag */ \ /* extract the target tag */ \
memcpy(&tmp16, (bfr)+8, 2); \ memcpy(&tmp16, (bfr)+8, 2); \
(tg) = ntohs(tmp16); \ (tg) = ntohs(tmp16); \
/* extract the sequence number */ \
memcpy(&tmp, (bfr)+10, 4); \
(seq) = ntohl(tmp); \
} while(0); } while(0);
#define ORTE_MULTICAST_LOAD_MESSAGE(bfr, dat, sz, maxsz, endsz) \ #define ORTE_MULTICAST_LOAD_MESSAGE(bfr, dat, sz, maxsz, endsz) \
do { \ do { \
if ((maxsz) <= (sz) + 10) { \ if ((maxsz) <= (sz) + 14) { \
*(endsz) = 0; \ *(endsz) = 0; \
} else { \ } else { \
memcpy((bfr)+10, (dat), (sz)); \ memcpy((bfr)+14, (dat), (sz)); \
*(endsz) = (sz) + 10; \ *(endsz) = (sz) + 14; \
} \ } \
} while(0); } while(0);
#define ORTE_MULTICAST_UNLOAD_MESSAGE(bfr, dat, sz) \ #define ORTE_MULTICAST_UNLOAD_MESSAGE(bfr, dat, sz) \
opal_dss.load((bfr), (dat)+10, (sz)-10); opal_dss.load((bfr), (dat)+14, (sz)-14);
/*
 * Advance a message sequence number in place.
 *
 * seq: a modifiable lvalue holding the current sequence number. It is
 *      incremented by one, or reset to 0 once it has reached
 *      ORTE_RMCAST_SEQ_MAX, so the counter rolls over explicitly rather
 *      than relying on the width of the underlying type.
 *
 * NOTE: (seq) is evaluated more than once - do not pass an expression
 * with side effects.
 *
 * The definition deliberately does NOT end in a semicolon: the caller
 * supplies it, so the macro behaves as exactly one statement and can be
 * used safely as the body of an if/else.
 */
#define ORTE_MULTICAST_NEXT_SEQUENCE_NUM(seq)       \
    do {                                            \
        if ((seq) < ORTE_RMCAST_SEQ_MAX) {          \
            (seq) += 1;                             \
        } else {                                    \
            (seq) = 0;                              \
        }                                           \
    } while(0)
#endif /* ORTE_DISABLE_FULL_SUPPORT */ #endif /* ORTE_DISABLE_FULL_SUPPORT */

Просмотреть файл

@ -317,6 +317,7 @@ static void recv_construct(rmcast_base_recv_t *ptr)
ptr->channel = ORTE_RMCAST_INVALID_CHANNEL; ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->recvd = false; ptr->recvd = false;
ptr->tag = ORTE_RMCAST_TAG_INVALID; ptr->tag = ORTE_RMCAST_TAG_INVALID;
ptr->seq_num = 0;
ptr->flags = ORTE_RMCAST_NON_PERSISTENT; /* default */ ptr->flags = ORTE_RMCAST_NON_PERSISTENT; /* default */
ptr->iovec_array = NULL; ptr->iovec_array = NULL;
ptr->iovec_count = 0; ptr->iovec_count = 0;
@ -338,6 +339,7 @@ static void channel_construct(rmcast_base_channel_t *ptr)
ptr->port = 0; ptr->port = 0;
ptr->interface = 0; ptr->interface = 0;
ptr->xmit = -1; ptr->xmit = -1;
ptr->seq_num = 0;
ptr->recv = -1; ptr->recv = -1;
memset(&ptr->addr, 0, sizeof(ptr->addr)); memset(&ptr->addr, 0, sizeof(ptr->addr));
OBJ_CONSTRUCT(&ptr->send_lock, opal_mutex_t); OBJ_CONSTRUCT(&ptr->send_lock, opal_mutex_t);

Просмотреть файл

@ -84,7 +84,8 @@ static int basic_send_nb(orte_rmcast_channel_t channel,
static int basic_recv_buffer(orte_process_name_t *sender, static int basic_recv_buffer(orte_process_name_t *sender,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf); opal_buffer_t *buf,
orte_rmcast_seq_t *seq_num);
static int basic_recv_buffer_nb(orte_rmcast_channel_t channel, static int basic_recv_buffer_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
@ -95,7 +96,8 @@ static int basic_recv_buffer_nb(orte_rmcast_channel_t channel,
static int basic_recv(orte_process_name_t *sender, static int basic_recv(orte_process_name_t *sender,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
struct iovec **msg, int *count); struct iovec **msg, int *count,
orte_rmcast_seq_t *seq_num);
static int basic_recv_nb(orte_rmcast_channel_t channel, static int basic_recv_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
@ -235,6 +237,7 @@ static void internal_snd_cb(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
struct iovec *msg, int count, void *cbdata) struct iovec *msg, int count, void *cbdata)
{ {
send_complete = true; send_complete = true;
@ -244,6 +247,7 @@ static void internal_snd_buf_cb(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
opal_buffer_t *buf, void *cbdata) opal_buffer_t *buf, void *cbdata)
{ {
send_buf_complete = true; send_buf_complete = true;
@ -476,7 +480,7 @@ static int queue_recv(rmcast_base_recv_t *recvptr,
static int basic_recv(orte_process_name_t *name, static int basic_recv(orte_process_name_t *name,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
struct iovec **msg, int *count) struct iovec **msg, int *count, orte_rmcast_seq_t *seq_num)
{ {
rmcast_base_recv_t *recvptr; rmcast_base_recv_t *recvptr;
int ret; int ret;
@ -501,6 +505,7 @@ static int basic_recv(orte_process_name_t *name,
} }
*msg = recvptr->iovec_array; *msg = recvptr->iovec_array;
*count = recvptr->iovec_count; *count = recvptr->iovec_count;
*seq_num = recvptr->seq_num;
/* remove the recv */ /* remove the recv */
OPAL_THREAD_LOCK(&lock); OPAL_THREAD_LOCK(&lock);
@ -546,7 +551,7 @@ static int basic_recv_nb(orte_rmcast_channel_t channel,
static int basic_recv_buffer(orte_process_name_t *name, static int basic_recv_buffer(orte_process_name_t *name,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf) opal_buffer_t *buf, orte_rmcast_seq_t *seq_num)
{ {
rmcast_base_recv_t *recvptr; rmcast_base_recv_t *recvptr;
int ret; int ret;
@ -572,6 +577,7 @@ static int basic_recv_buffer(orte_process_name_t *name,
name->jobid = recvptr->name.jobid; name->jobid = recvptr->name.jobid;
name->vpid = recvptr->name.vpid; name->vpid = recvptr->name.vpid;
} }
*seq_num = recvptr->seq_num;
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) { if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
} }
@ -799,14 +805,15 @@ static void process_recv(int fd, short event, void *cbdata)
int32_t iovec_count=0, i, sz, n; int32_t iovec_count=0, i, sz, n;
opal_buffer_t *recvd_buf=NULL; opal_buffer_t *recvd_buf=NULL;
int rc; int rc;
int32_t recvd_seq_num;
/* extract the header */ /* extract the header */
ORTE_MULTICAST_MESSAGE_HDR_NTOH(msg->data, &name, tag); ORTE_MULTICAST_MESSAGE_HDR_NTOH(msg->data, &name, tag, recvd_seq_num);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic:recv sender: %s tag: %d", "%s rmcast:basic:recv sender: %s tag: %d seq_num: %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&name), (int)tag)); ORTE_NAME_PRINT(&name), (int)tag, recvd_seq_num));
/* if this message is from myself, ignore it */ /* if this message is from myself, ignore it */
if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) { if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) {
@ -892,7 +899,8 @@ static void process_recv(int fd, short event, void *cbdata)
/* dealing with iovecs */ /* dealing with iovecs */
if (NULL != ptr->cbfunc_iovec) { if (NULL != ptr->cbfunc_iovec) {
ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, tag, ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, tag,
&name, iovec_array, iovec_count, ptr->cbdata); &name, recvd_seq_num,
iovec_array, iovec_count, ptr->cbdata);
/* if it isn't persistent, remove it */ /* if it isn't persistent, remove it */
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) { if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
OPAL_THREAD_LOCK(&lock); OPAL_THREAD_LOCK(&lock);
@ -916,7 +924,9 @@ static void process_recv(int fd, short event, void *cbdata)
} }
} else { } else {
if (NULL != ptr->cbfunc_buffer) { if (NULL != ptr->cbfunc_buffer) {
ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, tag, &name, recvd_buf, ptr->cbdata); ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, tag,
&name, recvd_seq_num,
recvd_buf, ptr->cbdata);
/* if it isn't persistent, remove it */ /* if it isn't persistent, remove it */
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) { if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
OPAL_THREAD_LOCK(&lock); OPAL_THREAD_LOCK(&lock);
@ -1158,7 +1168,7 @@ static void xmit_data(int sd, short flags, void* send_req)
snd = (rmcast_base_send_t*)item; snd = (rmcast_base_send_t*)item;
/* start the send data area with our header */ /* start the send data area with our header */
ORTE_MULTICAST_MESSAGE_HDR_HTON(chan->send_data, snd->tag); ORTE_MULTICAST_MESSAGE_HDR_HTON(chan->send_data, snd->tag, chan->seq_num);
/* are we sending a buffer? */ /* are we sending a buffer? */
if (NULL == snd->buf) { if (NULL == snd->buf) {
@ -1254,15 +1264,22 @@ static void xmit_data(int sd, short flags, void* send_req)
/* call the cbfunc if required */ /* call the cbfunc if required */
if (NULL != snd->cbfunc_buffer) { if (NULL != snd->cbfunc_buffer) {
snd->cbfunc_buffer(rc, chan->channel, snd->tag, ORTE_PROC_MY_NAME, snd->buf, snd->cbdata); snd->cbfunc_buffer(rc, chan->channel, snd->tag,
ORTE_PROC_MY_NAME, chan->seq_num,
snd->buf, snd->cbdata);
} }
} else { } else {
/* call the cbfunc if required */ /* call the cbfunc if required */
if (NULL != snd->cbfunc_iovec) { if (NULL != snd->cbfunc_iovec) {
snd->cbfunc_iovec(rc, chan->channel, snd->tag, ORTE_PROC_MY_NAME, snd->cbfunc_iovec(rc, chan->channel, snd->tag,
ORTE_PROC_MY_NAME, chan->seq_num,
snd->iovec_array, snd->iovec_count, snd->cbdata); snd->iovec_array, snd->iovec_count, snd->cbdata);
} }
} }
/* roll to next message sequence number */
ORTE_MULTICAST_NEXT_SEQUENCE_NUM(chan->seq_num);
/* cleanup */ /* cleanup */
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }

Просмотреть файл

@ -48,12 +48,14 @@ typedef void (*orte_rmcast_callback_buffer_fn_t)(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
opal_buffer_t *buf, void* cbdata); opal_buffer_t *buf, void* cbdata);
typedef void (*orte_rmcast_callback_fn_t)(int status, typedef void (*orte_rmcast_callback_fn_t)(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
struct iovec *msg, int count, void* cbdata); struct iovec *msg, int count, void* cbdata);
/* initialize the selected module */ /* initialize the selected module */
@ -90,7 +92,8 @@ typedef int (*orte_rmcast_base_module_send_nb_fn_t)(orte_rmcast_channel_t channe
typedef int (*orte_rmcast_base_module_recv_buffer_fn_t)(orte_process_name_t *sender, typedef int (*orte_rmcast_base_module_recv_buffer_fn_t)(orte_process_name_t *sender,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf); opal_buffer_t *buf,
orte_rmcast_seq_t *seq_num);
/* non-blocking receive buffer messages from a multicast channel */ /* non-blocking receive buffer messages from a multicast channel */
typedef int (*orte_rmcast_base_module_recv_buffer_nb_fn_t)(orte_rmcast_channel_t channel, typedef int (*orte_rmcast_base_module_recv_buffer_nb_fn_t)(orte_rmcast_channel_t channel,
@ -103,7 +106,8 @@ typedef int (*orte_rmcast_base_module_recv_buffer_nb_fn_t)(orte_rmcast_channel_t
typedef int (*orte_rmcast_base_module_recv_fn_t)(orte_process_name_t *sender, typedef int (*orte_rmcast_base_module_recv_fn_t)(orte_process_name_t *sender,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
struct iovec **msg, int *count); struct iovec **msg, int *count,
orte_rmcast_seq_t *seq_num);
/* non-blocking receive iovec messages from a multicast channel */ /* non-blocking receive iovec messages from a multicast channel */
typedef int (*orte_rmcast_base_module_recv_nb_fn_t)(orte_rmcast_channel_t channel, typedef int (*orte_rmcast_base_module_recv_nb_fn_t)(orte_rmcast_channel_t channel,

Просмотреть файл

@ -61,6 +61,10 @@ typedef uint8_t orte_rmcast_flag_t;
#define ORTE_RMCAST_NON_PERSISTENT 0x00 #define ORTE_RMCAST_NON_PERSISTENT 0x00
#define ORTE_RMCAST_PERSISTENT 0x01 #define ORTE_RMCAST_PERSISTENT 0x01
/* Message sequence number.
 *
 * Each multicast channel keeps its own counter of this type, and the
 * value is placed in the multicast message header as a 32-bit quantity
 * in network byte order. ORTE_RMCAST_SEQ_MAX is the largest value the
 * counter may hold; ORTE_MULTICAST_NEXT_SEQUENCE_NUM resets the counter
 * to 0 once this value has been reached.
 */
typedef uint32_t orte_rmcast_seq_t;
#define ORTE_RMCAST_SEQ_MAX UINT32_MAX
END_C_DECLS END_C_DECLS

Просмотреть файл

@ -23,17 +23,20 @@ static void cbfunc(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
opal_buffer_t *buf, void *cbdata); opal_buffer_t *buf, void *cbdata);
static void cbfunc_buf_snt(int status, static void cbfunc_buf_snt(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
opal_buffer_t *buf, void *cbdata); opal_buffer_t *buf, void *cbdata);
static void cbfunc_iovec(int status, static void cbfunc_iovec(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
struct iovec *msg, int count, void* cbdata); struct iovec *msg, int count, void* cbdata);
orte_rmcast_channel_t chan=4; orte_rmcast_channel_t chan=4;
@ -140,6 +143,7 @@ static void cbfunc(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
opal_buffer_t *buffer, void *cbdata) opal_buffer_t *buffer, void *cbdata)
{ {
int32_t i32, rc; int32_t i32, rc;
@ -148,9 +152,9 @@ static void cbfunc(int status,
rc = 1; rc = 1;
opal_dss.unpack(buffer, &i32, &rc, OPAL_INT32); opal_dss.unpack(buffer, &i32, &rc, OPAL_INT32);
opal_output(0, "%s GOT BUFFER MESSAGE from %s on channel %d tag %d with value %d\n", opal_output(0, "%s GOT BUFFER MESSAGE from %s on channel %d tag %d seq_num %d with value %d\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), channel, tag, i32); ORTE_NAME_PRINT(sender), channel, tag, seq_num, i32);
if (i32 < 0) { if (i32 < 0) {
return; return;
@ -181,12 +185,13 @@ static void cbfunc_iovec(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
struct iovec *msg, int count, void* cbdata) struct iovec *msg, int count, void* cbdata)
{ {
int rc; int rc;
opal_output(0, "%s GOT IOVEC MESSAGE from %s of %d elements\n", opal_output(0, "%s GOT IOVEC MESSAGE from %s of %d elements on tag %d seq_num %d\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender), count); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender), count, tag, seq_num);
} }
@ -194,6 +199,7 @@ static void cbfunc_buf_snt(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_process_name_t *sender, orte_process_name_t *sender,
orte_rmcast_seq_t seq_num,
opal_buffer_t *buf, void *cbdata) opal_buffer_t *buf, void *cbdata)
{ {
opal_output(0, "%s BUFFERED_NB SEND COMPLETE\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_output(0, "%s BUFFERED_NB SEND COMPLETE\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));