diff --git a/orte/mca/db/daemon/db_daemon.c b/orte/mca/db/daemon/db_daemon.c index 60cc001d91..6d02550677 100644 --- a/orte/mca/db/daemon/db_daemon.c +++ b/orte/mca/db/daemon/db_daemon.c @@ -82,18 +82,21 @@ static orte_rmcast_channel_t my_group_channel; /* local functions */ static void callback_fn(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata); static void recv_cmd(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata); static void recv_ack(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata); @@ -322,6 +325,7 @@ static int remove_data(char *key) static void callback_fn(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata) @@ -331,6 +335,7 @@ static void callback_fn(int status, static void recv_ack(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata) @@ -342,6 +347,7 @@ static void recv_ack(int status, static void recv_cmd(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata) diff --git a/orte/mca/grpcomm/mcast/grpcomm_mcast.c b/orte/mca/grpcomm/mcast/grpcomm_mcast.c index caa50c842f..56d17e88e7 100644 --- a/orte/mca/grpcomm/mcast/grpcomm_mcast.c +++ b/orte/mca/grpcomm/mcast/grpcomm_mcast.c @@ -70,6 +70,7 @@ orte_grpcomm_base_module_t orte_grpcomm_mcast_module = { /* Local functions */ static void daemon_recv(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata); @@ -459,6 +460,7 @@ static int get_proc_attr(const orte_process_name_t proc, static void daemon_recv(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata) diff --git a/orte/mca/rmcast/base/private.h b/orte/mca/rmcast/base/private.h index be2cca38e9..24b8120493 100644 --- a/orte/mca/rmcast/base/private.h +++ b/orte/mca/rmcast/base/private.h @@ -75,6 +75,7 @@ typedef struct { opal_list_item_t item; orte_process_name_t name; orte_rmcast_channel_t channel; + orte_rmcast_seq_t seq_num; bool recvd; orte_rmcast_tag_t tag; orte_rmcast_flag_t flags; diff --git a/orte/mca/rmcast/base/rmcast_base_fns.c b/orte/mca/rmcast/base/rmcast_base_fns.c index 847193d5d9..285477ce08 100644 --- a/orte/mca/rmcast/base/rmcast_base_fns.c +++ b/orte/mca/rmcast/base/rmcast_base_fns.c @@ -264,7 +264,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) } OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, - "%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %d", + "%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&name), channel, (int)tag, (0 == flag) ? "iovecs" : "buffer", recvd_seq_num)); @@ -328,7 +328,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) "%s rmcast:base:recv delivering iovecs to channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); - ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, tag, + ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag, &name, iovec_array, iovec_count, ptr->cbdata); /* if it isn't persistent, remove it */ if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) { @@ -345,6 +345,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); goto cleanup; } + ptr->seq_num = recvd_seq_num; /* copy over the iovec array since it will be released by * the blocking recv */ @@ -370,7 +371,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) "%s rmcast:base:recv delivering buffer to channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); - ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, tag, + ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag, &name, recvd_buf, ptr->cbdata); /* if it isn't persistent, remove it */ if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) { @@ -390,7 +391,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg) OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:base:recv copying buffer for blocking recv", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - + ptr->seq_num = recvd_seq_num; /* copy the buffer across since it will be released * by the blocking recv */ diff --git a/orte/mca/rmcast/rmcast.h b/orte/mca/rmcast/rmcast.h index 91aba3cce3..b8430d0dad 100644 --- a/orte/mca/rmcast/rmcast.h +++ b/orte/mca/rmcast/rmcast.h @@ -52,12 +52,14 @@ BEGIN_C_DECLS */ typedef void (*orte_rmcast_callback_buffer_fn_t)(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata); typedef void (*orte_rmcast_callback_fn_t)(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, struct iovec *msg, int count, void* cbdata); @@ -96,6 +98,7 @@ typedef int (*orte_rmcast_base_module_send_nb_fn_t)(orte_rmcast_channel_t channe typedef int (*orte_rmcast_base_module_recv_buffer_fn_t)(orte_process_name_t *sender, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, opal_buffer_t *buf); /* non-blocking receive buffer messages from a multicast channel */ @@ -109,6 +112,7 @@ typedef int (*orte_rmcast_base_module_recv_buffer_nb_fn_t)(orte_rmcast_channel_t typedef int (*orte_rmcast_base_module_recv_fn_t)(orte_process_name_t *sender, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, struct iovec **msg, int *count); /* non-blocking receive iovec messages from a multicast channel */ diff --git a/orte/mca/rmcast/rmcast_types.h b/orte/mca/rmcast/rmcast_types.h index a1f01642b2..57cdf19006 100644 --- a/orte/mca/rmcast/rmcast_types.h +++ b/orte/mca/rmcast/rmcast_types.h @@ -74,10 +74,10 @@ typedef uint8_t orte_rmcast_flag_t; #define ORTE_RMCAST_PERSISTENT 0x01 /* message sequence number */ -typedef uint32_t orte_rmcast_seq_t; -#define ORTE_RMCAST_SEQ_MAX UINT32_MAX-1 -#define ORTE_RMCAST_SEQ_INVALID UINT32_MAX -#define ORTE_RMCAST_SEQ_T OPAL_UINT32 +typedef size_t orte_rmcast_seq_t; +#define ORTE_RMCAST_SEQ_MAX SIZE_MAX-1 +#define ORTE_RMCAST_SEQ_INVALID SIZE_MAX +#define ORTE_RMCAST_SEQ_T OPAL_SIZE END_C_DECLS diff --git a/orte/mca/rmcast/tcp/rmcast_tcp.c b/orte/mca/rmcast/tcp/rmcast_tcp.c index b3e1565b53..8908a43c61 100644 --- a/orte/mca/rmcast/tcp/rmcast_tcp.c +++ b/orte/mca/rmcast/tcp/rmcast_tcp.c @@ -83,6 +83,7 @@ static int tcp_send_nb(orte_rmcast_channel_t channel, static int tcp_recv_buffer(orte_process_name_t *sender, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, opal_buffer_t *buf); static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel, @@ -94,6 +95,7 @@ static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel, static int tcp_recv(orte_process_name_t *sender, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, struct iovec **msg, int *count); static int tcp_recv_nb(orte_rmcast_channel_t channel, @@ -278,6 +280,7 @@ static bool send_complete, send_buf_complete; static void internal_snd_cb(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, struct iovec *msg, int count, void *cbdata) @@ -287,6 +290,7 @@ static void internal_snd_cb(int status, static void internal_snd_buf_cb(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void *cbdata) @@ -444,14 +448,14 @@ static int queue_xmit(rmcast_base_send_t *snd, if (NULL != snd->buf) { /* call the cbfunc if required */ if (NULL != snd->cbfunc_buffer) { - snd->cbfunc_buffer(rc, channel, snd->tag, + snd->cbfunc_buffer(rc, channel, ch->seq_num, snd->tag, ORTE_PROC_MY_NAME, snd->buf, snd->cbdata); } } else { /* call the cbfunc if required */ if (NULL != snd->cbfunc_iovec) { - snd->cbfunc_iovec(rc, channel, snd->tag, + snd->cbfunc_iovec(rc, channel, ch->seq_num, snd->tag, ORTE_PROC_MY_NAME, snd->iovec_array, snd->iovec_count, snd->cbdata); } @@ -577,9 +581,10 @@ static int tcp_send_buffer_nb(orte_rmcast_channel_t channel, } static int tcp_recv(orte_process_name_t *name, - orte_rmcast_channel_t channel, - orte_rmcast_tag_t tag, - struct iovec **msg, int *count) + orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, + struct iovec **msg, int *count) { rmcast_base_recv_t *recvptr; int ret; @@ -608,6 +613,7 @@ static int tcp_recv(orte_process_name_t *name, name->jobid = recvptr->name.jobid; name->vpid = recvptr->name.vpid; } + *seq_num = recvptr->seq_num; *msg = recvptr->iovec_array; *count = recvptr->iovec_count; @@ -655,6 +661,7 @@ static int tcp_recv_nb(orte_rmcast_channel_t channel, static int tcp_recv_buffer(orte_process_name_t *name, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, opal_buffer_t *buf) { rmcast_base_recv_t *recvptr; @@ -688,6 +695,7 @@ static int tcp_recv_buffer(orte_process_name_t *name, name->jobid = recvptr->name.jobid; name->vpid = recvptr->name.vpid; } + *seq_num = recvptr->seq_num; if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) { ORTE_ERROR_LOG(ret); } diff --git a/orte/mca/rmcast/udp/rmcast_udp.c b/orte/mca/rmcast/udp/rmcast_udp.c index 8079a1d9ab..9bc09ccf8e 100644 --- a/orte/mca/rmcast/udp/rmcast_udp.c +++ b/orte/mca/rmcast/udp/rmcast_udp.c @@ -78,6 +78,7 @@ static int udp_send_nb(orte_rmcast_channel_t channel, static int udp_recv_buffer(orte_process_name_t *sender, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, opal_buffer_t *buf); static int udp_recv_buffer_nb(orte_rmcast_channel_t channel, @@ -89,6 +90,7 @@ static int udp_recv_buffer_nb(orte_rmcast_channel_t channel, static int udp_recv(orte_process_name_t *sender, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, struct iovec **msg, int *count); static int udp_recv_nb(orte_rmcast_channel_t channel, @@ -249,6 +251,7 @@ static void finalize(void) /* internal blocking send support */ static void internal_snd_cb(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, struct iovec *msg, int count, void *cbdata) @@ -258,6 +261,7 @@ static void internal_snd_cb(int status, static void internal_snd_buf_cb(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void *cbdata) @@ -432,6 +436,7 @@ static int udp_send_buffer_nb(orte_rmcast_channel_t channel, static int udp_recv(orte_process_name_t *name, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, struct iovec **msg, int *count) { rmcast_base_recv_t *recvptr; @@ -461,6 +466,7 @@ static int udp_recv(orte_process_name_t *name, name->jobid = recvptr->name.jobid; name->vpid = recvptr->name.vpid; } + *seq_num = recvptr->seq_num; *msg = recvptr->iovec_array; *count = recvptr->iovec_count; @@ -507,6 +513,7 @@ static int udp_recv_nb(orte_rmcast_channel_t channel, static int udp_recv_buffer(orte_process_name_t *name, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, + orte_rmcast_seq_t *seq_num, opal_buffer_t *buf) { rmcast_base_recv_t *recvptr; @@ -540,6 +547,7 @@ static int udp_recv_buffer(orte_process_name_t *name, name->jobid = recvptr->name.jobid; name->vpid = recvptr->name.vpid; } + *seq_num = recvptr->seq_num; if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) { ORTE_ERROR_LOG(ret); } @@ -1005,13 +1013,13 @@ static int xmit_data(rmcast_base_channel_t *chan, rmcast_base_send_t *snd) if (NULL != snd->buf) { /* call the cbfunc if required */ if (NULL != snd->cbfunc_buffer) { - snd->cbfunc_buffer(rc, chan->channel, snd->tag, + snd->cbfunc_buffer(rc, chan->channel, chan->seq_num, snd->tag, ORTE_PROC_MY_NAME, snd->buf, snd->cbdata); } } else { /* call the cbfunc if required */ if (NULL != snd->cbfunc_iovec) { - snd->cbfunc_iovec(rc, chan->channel, snd->tag, ORTE_PROC_MY_NAME, + snd->cbfunc_iovec(rc, chan->channel, chan->seq_num, snd->tag, ORTE_PROC_MY_NAME, snd->iovec_array, snd->iovec_count, snd->cbdata); } } diff --git a/orte/mca/sensor/heartbeat/sensor_heartbeat.c b/orte/mca/sensor/heartbeat/sensor_heartbeat.c index 7a4da0916a..ab8c39f86b 100644 --- a/orte/mca/sensor/heartbeat/sensor_heartbeat.c +++ b/orte/mca/sensor/heartbeat/sensor_heartbeat.c @@ -62,11 +62,13 @@ static void send_heartbeat(int fd, short event, void *arg); #if ORTE_ENABLE_MULTICAST static void recv_rmcast_beats(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata); static void rmcast_callback_fn(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata); @@ -305,6 +307,7 @@ static void check_heartbeat(int fd, short dummy, void *arg) #if ORTE_ENABLE_MULTICAST static void recv_rmcast_beats(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata) @@ -335,6 +338,7 @@ static void recv_rmcast_beats(int status, static void rmcast_callback_fn(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void* cbdata) diff --git a/orte/test/system/orte_mcast.c b/orte/test/system/orte_mcast.c index 2448b66b26..17f82d38dd 100644 --- a/orte/test/system/orte_mcast.c +++ b/orte/test/system/orte_mcast.c @@ -21,22 +21,27 @@ static void cbfunc(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void *cbdata); static void cbfunc_buf_snt(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void *cbdata); static void cbfunc_iovec(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, struct iovec *msg, int count, void* cbdata); static int datasize=1024; +static orte_rmcast_seq_t recvd_seq_num=0; +static orte_rmcast_seq_t sent_seq_num=0; static void send_data(int fd, short flags, void *arg) { @@ -58,7 +63,8 @@ static void send_data(int fd, short flags, void *arg) cbfunc_buf_snt, NULL))) { ORTE_ERROR_LOG(rc); return; - } + } + sent_seq_num++; /* create an iovec array */ for (i=0; i < 3; i++) { iovec_array[i].iov_base = (uint8_t*)malloc(datasize); @@ -71,10 +77,16 @@ static void send_data(int fd, short flags, void *arg) ORTE_ERROR_LOG(rc); return; } + sent_seq_num++; + + if (0 == (sent_seq_num % 100)) { + opal_output(0, "SENT SEQ_NUM %lu", sent_seq_num); + } + /* reset the timer */ - now.tv_sec = 5; - now.tv_usec = 0; - opal_evtimer_add(tmp, &now); + now.tv_sec = 0; + now.tv_usec = 1000; + opal_event_evtimer_add(tmp, &now); } int main(int argc, char* argv[]) @@ -107,7 +119,7 @@ int main(int argc, char* argv[]) orte_grpcomm.barrier(); /* wake up every 5 seconds and send something */ - ORTE_TIMER_EVENT(5, 0, send_data); + ORTE_TIMER_EVENT(0, 1000, send_data); } else { /* setup to recv data on our channel */ if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_OUTPUT_CHANNEL, @@ -133,6 +145,7 @@ blast: static void cbfunc(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buffer, void *cbdata) @@ -143,32 +156,46 @@ static void cbfunc(int status, rc = 1; opal_dss.unpack(buffer, &i32, &rc, OPAL_INT32); - opal_output(0, "%s GOT BUFFER MESSAGE from %s on channel %d tag %d with value %d\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(sender), channel, tag, i32); + if (0 < recvd_seq_num) { + if ((seq_num - recvd_seq_num) != 1) { + opal_output(0, "%s MESSAGE LOST seq %lu recvd_seq %lu", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), seq_num, recvd_seq_num); + } + } + recvd_seq_num = seq_num; + + if (0 == (recvd_seq_num % 100)) { + opal_output(0, "RECVD SEQ_NUM %lu", recvd_seq_num); + } } static void cbfunc_iovec(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, struct iovec *msg, int count, void* cbdata) { - int rc; - - opal_output(0, "%s GOT IOVEC MESSAGE from %s of %d elements on tag %d\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender), count, tag); + if (0 < recvd_seq_num) { + if ((seq_num - recvd_seq_num) != 1) { + opal_output(0, "%s MESSAGE LOST seq %lu recvd_seq %lu", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), seq_num, recvd_seq_num); + } + } + recvd_seq_num = seq_num; + if (0 == (recvd_seq_num % 100)) { + opal_output(0, "RECVD SEQ_NUM %lu", recvd_seq_num); + } } static void cbfunc_buf_snt(int status, orte_rmcast_channel_t channel, + orte_rmcast_seq_t seq_num, orte_rmcast_tag_t tag, orte_process_name_t *sender, opal_buffer_t *buf, void *cbdata) -{ - opal_output(0, "%s BUFFERED_NB SEND COMPLETE\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - +{ OBJ_RELEASE(buf); }