Update the rmcast callback function API to return message sequence number. Update orte_mcast test to stress the system.
This commit was SVN r24004.
Этот коммит содержится в:
родитель
e0660101d3
Коммит
bf665692c3
@ -82,18 +82,21 @@ static orte_rmcast_channel_t my_group_channel;
|
|||||||
/* local functions */
|
/* local functions */
|
||||||
static void callback_fn(int status,
|
static void callback_fn(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata);
|
opal_buffer_t *buf, void* cbdata);
|
||||||
|
|
||||||
static void recv_cmd(int status,
|
static void recv_cmd(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata);
|
opal_buffer_t *buf, void* cbdata);
|
||||||
|
|
||||||
static void recv_ack(int status,
|
static void recv_ack(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata);
|
opal_buffer_t *buf, void* cbdata);
|
||||||
@ -322,6 +325,7 @@ static int remove_data(char *key)
|
|||||||
|
|
||||||
static void callback_fn(int status,
|
static void callback_fn(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata)
|
opal_buffer_t *buf, void* cbdata)
|
||||||
@ -331,6 +335,7 @@ static void callback_fn(int status,
|
|||||||
|
|
||||||
static void recv_ack(int status,
|
static void recv_ack(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata)
|
opal_buffer_t *buf, void* cbdata)
|
||||||
@ -342,6 +347,7 @@ static void recv_ack(int status,
|
|||||||
|
|
||||||
static void recv_cmd(int status,
|
static void recv_cmd(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata)
|
opal_buffer_t *buf, void* cbdata)
|
||||||
|
@ -70,6 +70,7 @@ orte_grpcomm_base_module_t orte_grpcomm_mcast_module = {
|
|||||||
/* Local functions */
|
/* Local functions */
|
||||||
static void daemon_recv(int status,
|
static void daemon_recv(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata);
|
opal_buffer_t *buf, void* cbdata);
|
||||||
@ -459,6 +460,7 @@ static int get_proc_attr(const orte_process_name_t proc,
|
|||||||
|
|
||||||
static void daemon_recv(int status,
|
static void daemon_recv(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata)
|
opal_buffer_t *buf, void* cbdata)
|
||||||
|
@ -75,6 +75,7 @@ typedef struct {
|
|||||||
opal_list_item_t item;
|
opal_list_item_t item;
|
||||||
orte_process_name_t name;
|
orte_process_name_t name;
|
||||||
orte_rmcast_channel_t channel;
|
orte_rmcast_channel_t channel;
|
||||||
|
orte_rmcast_seq_t seq_num;
|
||||||
bool recvd;
|
bool recvd;
|
||||||
orte_rmcast_tag_t tag;
|
orte_rmcast_tag_t tag;
|
||||||
orte_rmcast_flag_t flags;
|
orte_rmcast_flag_t flags;
|
||||||
|
@ -264,7 +264,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||||
"%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %d",
|
"%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %lu",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&name), channel, (int)tag,
|
ORTE_NAME_PRINT(&name), channel, (int)tag,
|
||||||
(0 == flag) ? "iovecs" : "buffer", recvd_seq_num));
|
(0 == flag) ? "iovecs" : "buffer", recvd_seq_num));
|
||||||
@ -328,7 +328,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg)
|
|||||||
"%s rmcast:base:recv delivering iovecs to channel %d tag %d",
|
"%s rmcast:base:recv delivering iovecs to channel %d tag %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||||
|
|
||||||
ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, tag,
|
ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
|
||||||
&name, iovec_array, iovec_count, ptr->cbdata);
|
&name, iovec_array, iovec_count, ptr->cbdata);
|
||||||
/* if it isn't persistent, remove it */
|
/* if it isn't persistent, remove it */
|
||||||
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
||||||
@ -345,6 +345,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg)
|
|||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
ptr->seq_num = recvd_seq_num;
|
||||||
/* copy over the iovec array since it will be released by
|
/* copy over the iovec array since it will be released by
|
||||||
* the blocking recv
|
* the blocking recv
|
||||||
*/
|
*/
|
||||||
@ -370,7 +371,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg)
|
|||||||
"%s rmcast:base:recv delivering buffer to channel %d tag %d",
|
"%s rmcast:base:recv delivering buffer to channel %d tag %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||||
|
|
||||||
ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, tag,
|
ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
|
||||||
&name, recvd_buf, ptr->cbdata);
|
&name, recvd_buf, ptr->cbdata);
|
||||||
/* if it isn't persistent, remove it */
|
/* if it isn't persistent, remove it */
|
||||||
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
||||||
@ -390,7 +391,7 @@ void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg)
|
|||||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||||
"%s rmcast:base:recv copying buffer for blocking recv",
|
"%s rmcast:base:recv copying buffer for blocking recv",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
ptr->seq_num = recvd_seq_num;
|
||||||
/* copy the buffer across since it will be released
|
/* copy the buffer across since it will be released
|
||||||
* by the blocking recv
|
* by the blocking recv
|
||||||
*/
|
*/
|
||||||
|
@ -52,12 +52,14 @@ BEGIN_C_DECLS
|
|||||||
*/
|
*/
|
||||||
typedef void (*orte_rmcast_callback_buffer_fn_t)(int status,
|
typedef void (*orte_rmcast_callback_buffer_fn_t)(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata);
|
opal_buffer_t *buf, void* cbdata);
|
||||||
|
|
||||||
typedef void (*orte_rmcast_callback_fn_t)(int status,
|
typedef void (*orte_rmcast_callback_fn_t)(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
struct iovec *msg, int count, void* cbdata);
|
struct iovec *msg, int count, void* cbdata);
|
||||||
@ -96,6 +98,7 @@ typedef int (*orte_rmcast_base_module_send_nb_fn_t)(orte_rmcast_channel_t channe
|
|||||||
typedef int (*orte_rmcast_base_module_recv_buffer_fn_t)(orte_process_name_t *sender,
|
typedef int (*orte_rmcast_base_module_recv_buffer_fn_t)(orte_process_name_t *sender,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
|
orte_rmcast_seq_t *seq_num,
|
||||||
opal_buffer_t *buf);
|
opal_buffer_t *buf);
|
||||||
|
|
||||||
/* non-blocking receive buffer messages from a multicast channel */
|
/* non-blocking receive buffer messages from a multicast channel */
|
||||||
@ -109,6 +112,7 @@ typedef int (*orte_rmcast_base_module_recv_buffer_nb_fn_t)(orte_rmcast_channel_t
|
|||||||
typedef int (*orte_rmcast_base_module_recv_fn_t)(orte_process_name_t *sender,
|
typedef int (*orte_rmcast_base_module_recv_fn_t)(orte_process_name_t *sender,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
|
orte_rmcast_seq_t *seq_num,
|
||||||
struct iovec **msg, int *count);
|
struct iovec **msg, int *count);
|
||||||
|
|
||||||
/* non-blocking receive iovec messages from a multicast channel */
|
/* non-blocking receive iovec messages from a multicast channel */
|
||||||
|
@ -74,10 +74,10 @@ typedef uint8_t orte_rmcast_flag_t;
|
|||||||
#define ORTE_RMCAST_PERSISTENT 0x01
|
#define ORTE_RMCAST_PERSISTENT 0x01
|
||||||
|
|
||||||
/* message sequence number */
|
/* message sequence number */
|
||||||
typedef uint32_t orte_rmcast_seq_t;
|
typedef size_t orte_rmcast_seq_t;
|
||||||
#define ORTE_RMCAST_SEQ_MAX UINT32_MAX-1
|
#define ORTE_RMCAST_SEQ_MAX SIZE_MAX-1
|
||||||
#define ORTE_RMCAST_SEQ_INVALID UINT32_MAX
|
#define ORTE_RMCAST_SEQ_INVALID SIZE_MAX
|
||||||
#define ORTE_RMCAST_SEQ_T OPAL_UINT32
|
#define ORTE_RMCAST_SEQ_T OPAL_SIZE
|
||||||
|
|
||||||
END_C_DECLS
|
END_C_DECLS
|
||||||
|
|
||||||
|
@ -83,6 +83,7 @@ static int tcp_send_nb(orte_rmcast_channel_t channel,
|
|||||||
static int tcp_recv_buffer(orte_process_name_t *sender,
|
static int tcp_recv_buffer(orte_process_name_t *sender,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
|
orte_rmcast_seq_t *seq_num,
|
||||||
opal_buffer_t *buf);
|
opal_buffer_t *buf);
|
||||||
|
|
||||||
static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel,
|
static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel,
|
||||||
@ -94,6 +95,7 @@ static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel,
|
|||||||
static int tcp_recv(orte_process_name_t *sender,
|
static int tcp_recv(orte_process_name_t *sender,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
|
orte_rmcast_seq_t *seq_num,
|
||||||
struct iovec **msg, int *count);
|
struct iovec **msg, int *count);
|
||||||
|
|
||||||
static int tcp_recv_nb(orte_rmcast_channel_t channel,
|
static int tcp_recv_nb(orte_rmcast_channel_t channel,
|
||||||
@ -278,6 +280,7 @@ static bool send_complete, send_buf_complete;
|
|||||||
|
|
||||||
static void internal_snd_cb(int status,
|
static void internal_snd_cb(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
struct iovec *msg, int count, void *cbdata)
|
struct iovec *msg, int count, void *cbdata)
|
||||||
@ -287,6 +290,7 @@ static void internal_snd_cb(int status,
|
|||||||
|
|
||||||
static void internal_snd_buf_cb(int status,
|
static void internal_snd_buf_cb(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void *cbdata)
|
opal_buffer_t *buf, void *cbdata)
|
||||||
@ -444,14 +448,14 @@ static int queue_xmit(rmcast_base_send_t *snd,
|
|||||||
if (NULL != snd->buf) {
|
if (NULL != snd->buf) {
|
||||||
/* call the cbfunc if required */
|
/* call the cbfunc if required */
|
||||||
if (NULL != snd->cbfunc_buffer) {
|
if (NULL != snd->cbfunc_buffer) {
|
||||||
snd->cbfunc_buffer(rc, channel, snd->tag,
|
snd->cbfunc_buffer(rc, channel, ch->seq_num, snd->tag,
|
||||||
ORTE_PROC_MY_NAME,
|
ORTE_PROC_MY_NAME,
|
||||||
snd->buf, snd->cbdata);
|
snd->buf, snd->cbdata);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* call the cbfunc if required */
|
/* call the cbfunc if required */
|
||||||
if (NULL != snd->cbfunc_iovec) {
|
if (NULL != snd->cbfunc_iovec) {
|
||||||
snd->cbfunc_iovec(rc, channel, snd->tag,
|
snd->cbfunc_iovec(rc, channel, ch->seq_num, snd->tag,
|
||||||
ORTE_PROC_MY_NAME,
|
ORTE_PROC_MY_NAME,
|
||||||
snd->iovec_array, snd->iovec_count, snd->cbdata);
|
snd->iovec_array, snd->iovec_count, snd->cbdata);
|
||||||
}
|
}
|
||||||
@ -577,9 +581,10 @@ static int tcp_send_buffer_nb(orte_rmcast_channel_t channel,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int tcp_recv(orte_process_name_t *name,
|
static int tcp_recv(orte_process_name_t *name,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
struct iovec **msg, int *count)
|
orte_rmcast_seq_t *seq_num,
|
||||||
|
struct iovec **msg, int *count)
|
||||||
{
|
{
|
||||||
rmcast_base_recv_t *recvptr;
|
rmcast_base_recv_t *recvptr;
|
||||||
int ret;
|
int ret;
|
||||||
@ -608,6 +613,7 @@ static int tcp_recv(orte_process_name_t *name,
|
|||||||
name->jobid = recvptr->name.jobid;
|
name->jobid = recvptr->name.jobid;
|
||||||
name->vpid = recvptr->name.vpid;
|
name->vpid = recvptr->name.vpid;
|
||||||
}
|
}
|
||||||
|
*seq_num = recvptr->seq_num;
|
||||||
*msg = recvptr->iovec_array;
|
*msg = recvptr->iovec_array;
|
||||||
*count = recvptr->iovec_count;
|
*count = recvptr->iovec_count;
|
||||||
|
|
||||||
@ -655,6 +661,7 @@ static int tcp_recv_nb(orte_rmcast_channel_t channel,
|
|||||||
static int tcp_recv_buffer(orte_process_name_t *name,
|
static int tcp_recv_buffer(orte_process_name_t *name,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
|
orte_rmcast_seq_t *seq_num,
|
||||||
opal_buffer_t *buf)
|
opal_buffer_t *buf)
|
||||||
{
|
{
|
||||||
rmcast_base_recv_t *recvptr;
|
rmcast_base_recv_t *recvptr;
|
||||||
@ -688,6 +695,7 @@ static int tcp_recv_buffer(orte_process_name_t *name,
|
|||||||
name->jobid = recvptr->name.jobid;
|
name->jobid = recvptr->name.jobid;
|
||||||
name->vpid = recvptr->name.vpid;
|
name->vpid = recvptr->name.vpid;
|
||||||
}
|
}
|
||||||
|
*seq_num = recvptr->seq_num;
|
||||||
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) {
|
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) {
|
||||||
ORTE_ERROR_LOG(ret);
|
ORTE_ERROR_LOG(ret);
|
||||||
}
|
}
|
||||||
|
@ -78,6 +78,7 @@ static int udp_send_nb(orte_rmcast_channel_t channel,
|
|||||||
static int udp_recv_buffer(orte_process_name_t *sender,
|
static int udp_recv_buffer(orte_process_name_t *sender,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
|
orte_rmcast_seq_t *seq_num,
|
||||||
opal_buffer_t *buf);
|
opal_buffer_t *buf);
|
||||||
|
|
||||||
static int udp_recv_buffer_nb(orte_rmcast_channel_t channel,
|
static int udp_recv_buffer_nb(orte_rmcast_channel_t channel,
|
||||||
@ -89,6 +90,7 @@ static int udp_recv_buffer_nb(orte_rmcast_channel_t channel,
|
|||||||
static int udp_recv(orte_process_name_t *sender,
|
static int udp_recv(orte_process_name_t *sender,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
|
orte_rmcast_seq_t *seq_num,
|
||||||
struct iovec **msg, int *count);
|
struct iovec **msg, int *count);
|
||||||
|
|
||||||
static int udp_recv_nb(orte_rmcast_channel_t channel,
|
static int udp_recv_nb(orte_rmcast_channel_t channel,
|
||||||
@ -249,6 +251,7 @@ static void finalize(void)
|
|||||||
/* internal blocking send support */
|
/* internal blocking send support */
|
||||||
static void internal_snd_cb(int status,
|
static void internal_snd_cb(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
struct iovec *msg, int count, void *cbdata)
|
struct iovec *msg, int count, void *cbdata)
|
||||||
@ -258,6 +261,7 @@ static void internal_snd_cb(int status,
|
|||||||
|
|
||||||
static void internal_snd_buf_cb(int status,
|
static void internal_snd_buf_cb(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void *cbdata)
|
opal_buffer_t *buf, void *cbdata)
|
||||||
@ -432,6 +436,7 @@ static int udp_send_buffer_nb(orte_rmcast_channel_t channel,
|
|||||||
static int udp_recv(orte_process_name_t *name,
|
static int udp_recv(orte_process_name_t *name,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
|
orte_rmcast_seq_t *seq_num,
|
||||||
struct iovec **msg, int *count)
|
struct iovec **msg, int *count)
|
||||||
{
|
{
|
||||||
rmcast_base_recv_t *recvptr;
|
rmcast_base_recv_t *recvptr;
|
||||||
@ -461,6 +466,7 @@ static int udp_recv(orte_process_name_t *name,
|
|||||||
name->jobid = recvptr->name.jobid;
|
name->jobid = recvptr->name.jobid;
|
||||||
name->vpid = recvptr->name.vpid;
|
name->vpid = recvptr->name.vpid;
|
||||||
}
|
}
|
||||||
|
*seq_num = recvptr->seq_num;
|
||||||
*msg = recvptr->iovec_array;
|
*msg = recvptr->iovec_array;
|
||||||
*count = recvptr->iovec_count;
|
*count = recvptr->iovec_count;
|
||||||
|
|
||||||
@ -507,6 +513,7 @@ static int udp_recv_nb(orte_rmcast_channel_t channel,
|
|||||||
static int udp_recv_buffer(orte_process_name_t *name,
|
static int udp_recv_buffer(orte_process_name_t *name,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
|
orte_rmcast_seq_t *seq_num,
|
||||||
opal_buffer_t *buf)
|
opal_buffer_t *buf)
|
||||||
{
|
{
|
||||||
rmcast_base_recv_t *recvptr;
|
rmcast_base_recv_t *recvptr;
|
||||||
@ -540,6 +547,7 @@ static int udp_recv_buffer(orte_process_name_t *name,
|
|||||||
name->jobid = recvptr->name.jobid;
|
name->jobid = recvptr->name.jobid;
|
||||||
name->vpid = recvptr->name.vpid;
|
name->vpid = recvptr->name.vpid;
|
||||||
}
|
}
|
||||||
|
*seq_num = recvptr->seq_num;
|
||||||
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) {
|
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(buf, recvptr->buf))) {
|
||||||
ORTE_ERROR_LOG(ret);
|
ORTE_ERROR_LOG(ret);
|
||||||
}
|
}
|
||||||
@ -1005,13 +1013,13 @@ static int xmit_data(rmcast_base_channel_t *chan, rmcast_base_send_t *snd)
|
|||||||
if (NULL != snd->buf) {
|
if (NULL != snd->buf) {
|
||||||
/* call the cbfunc if required */
|
/* call the cbfunc if required */
|
||||||
if (NULL != snd->cbfunc_buffer) {
|
if (NULL != snd->cbfunc_buffer) {
|
||||||
snd->cbfunc_buffer(rc, chan->channel, snd->tag,
|
snd->cbfunc_buffer(rc, chan->channel, chan->seq_num, snd->tag,
|
||||||
ORTE_PROC_MY_NAME, snd->buf, snd->cbdata);
|
ORTE_PROC_MY_NAME, snd->buf, snd->cbdata);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* call the cbfunc if required */
|
/* call the cbfunc if required */
|
||||||
if (NULL != snd->cbfunc_iovec) {
|
if (NULL != snd->cbfunc_iovec) {
|
||||||
snd->cbfunc_iovec(rc, chan->channel, snd->tag, ORTE_PROC_MY_NAME,
|
snd->cbfunc_iovec(rc, chan->channel, chan->seq_num, snd->tag, ORTE_PROC_MY_NAME,
|
||||||
snd->iovec_array, snd->iovec_count, snd->cbdata);
|
snd->iovec_array, snd->iovec_count, snd->cbdata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -62,11 +62,13 @@ static void send_heartbeat(int fd, short event, void *arg);
|
|||||||
#if ORTE_ENABLE_MULTICAST
|
#if ORTE_ENABLE_MULTICAST
|
||||||
static void recv_rmcast_beats(int status,
|
static void recv_rmcast_beats(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata);
|
opal_buffer_t *buf, void* cbdata);
|
||||||
static void rmcast_callback_fn(int status,
|
static void rmcast_callback_fn(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata);
|
opal_buffer_t *buf, void* cbdata);
|
||||||
@ -305,6 +307,7 @@ static void check_heartbeat(int fd, short dummy, void *arg)
|
|||||||
#if ORTE_ENABLE_MULTICAST
|
#if ORTE_ENABLE_MULTICAST
|
||||||
static void recv_rmcast_beats(int status,
|
static void recv_rmcast_beats(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata)
|
opal_buffer_t *buf, void* cbdata)
|
||||||
@ -335,6 +338,7 @@ static void recv_rmcast_beats(int status,
|
|||||||
|
|
||||||
static void rmcast_callback_fn(int status,
|
static void rmcast_callback_fn(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void* cbdata)
|
opal_buffer_t *buf, void* cbdata)
|
||||||
|
@ -21,22 +21,27 @@
|
|||||||
|
|
||||||
static void cbfunc(int status,
|
static void cbfunc(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void *cbdata);
|
opal_buffer_t *buf, void *cbdata);
|
||||||
static void cbfunc_buf_snt(int status,
|
static void cbfunc_buf_snt(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void *cbdata);
|
opal_buffer_t *buf, void *cbdata);
|
||||||
|
|
||||||
static void cbfunc_iovec(int status,
|
static void cbfunc_iovec(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
struct iovec *msg, int count, void* cbdata);
|
struct iovec *msg, int count, void* cbdata);
|
||||||
|
|
||||||
static int datasize=1024;
|
static int datasize=1024;
|
||||||
|
static orte_rmcast_seq_t recvd_seq_num=0;
|
||||||
|
static orte_rmcast_seq_t sent_seq_num=0;
|
||||||
|
|
||||||
static void send_data(int fd, short flags, void *arg)
|
static void send_data(int fd, short flags, void *arg)
|
||||||
{
|
{
|
||||||
@ -58,7 +63,8 @@ static void send_data(int fd, short flags, void *arg)
|
|||||||
cbfunc_buf_snt, NULL))) {
|
cbfunc_buf_snt, NULL))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
sent_seq_num++;
|
||||||
/* create an iovec array */
|
/* create an iovec array */
|
||||||
for (i=0; i < 3; i++) {
|
for (i=0; i < 3; i++) {
|
||||||
iovec_array[i].iov_base = (uint8_t*)malloc(datasize);
|
iovec_array[i].iov_base = (uint8_t*)malloc(datasize);
|
||||||
@ -71,10 +77,16 @@ static void send_data(int fd, short flags, void *arg)
|
|||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
sent_seq_num++;
|
||||||
|
|
||||||
|
if (0 == (sent_seq_num % 100)) {
|
||||||
|
opal_output(0, "SENT SEQ_NUM %lu", sent_seq_num);
|
||||||
|
}
|
||||||
|
|
||||||
/* reset the timer */
|
/* reset the timer */
|
||||||
now.tv_sec = 5;
|
now.tv_sec = 0;
|
||||||
now.tv_usec = 0;
|
now.tv_usec = 1000;
|
||||||
opal_evtimer_add(tmp, &now);
|
opal_event_evtimer_add(tmp, &now);
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char* argv[])
|
int main(int argc, char* argv[])
|
||||||
@ -107,7 +119,7 @@ int main(int argc, char* argv[])
|
|||||||
orte_grpcomm.barrier();
|
orte_grpcomm.barrier();
|
||||||
|
|
||||||
/* wake up every 5 seconds and send something */
|
/* wake up every 5 seconds and send something */
|
||||||
ORTE_TIMER_EVENT(5, 0, send_data);
|
ORTE_TIMER_EVENT(0, 1000, send_data);
|
||||||
} else {
|
} else {
|
||||||
/* setup to recv data on our channel */
|
/* setup to recv data on our channel */
|
||||||
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_OUTPUT_CHANNEL,
|
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_OUTPUT_CHANNEL,
|
||||||
@ -133,6 +145,7 @@ blast:
|
|||||||
|
|
||||||
static void cbfunc(int status,
|
static void cbfunc(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buffer, void *cbdata)
|
opal_buffer_t *buffer, void *cbdata)
|
||||||
@ -143,32 +156,46 @@ static void cbfunc(int status,
|
|||||||
rc = 1;
|
rc = 1;
|
||||||
opal_dss.unpack(buffer, &i32, &rc, OPAL_INT32);
|
opal_dss.unpack(buffer, &i32, &rc, OPAL_INT32);
|
||||||
|
|
||||||
opal_output(0, "%s GOT BUFFER MESSAGE from %s on channel %d tag %d with value %d\n",
|
if (0 < recvd_seq_num) {
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
if ((seq_num - recvd_seq_num) != 1) {
|
||||||
ORTE_NAME_PRINT(sender), channel, tag, i32);
|
opal_output(0, "%s MESSAGE LOST seq %lu recvd_seq %lu",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), seq_num, recvd_seq_num);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
recvd_seq_num = seq_num;
|
||||||
|
|
||||||
|
if (0 == (recvd_seq_num % 100)) {
|
||||||
|
opal_output(0, "RECVD SEQ_NUM %lu", recvd_seq_num);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cbfunc_iovec(int status,
|
static void cbfunc_iovec(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
struct iovec *msg, int count, void* cbdata)
|
struct iovec *msg, int count, void* cbdata)
|
||||||
{
|
{
|
||||||
int rc;
|
if (0 < recvd_seq_num) {
|
||||||
|
if ((seq_num - recvd_seq_num) != 1) {
|
||||||
opal_output(0, "%s GOT IOVEC MESSAGE from %s of %d elements on tag %d\n",
|
opal_output(0, "%s MESSAGE LOST seq %lu recvd_seq %lu",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender), count, tag);
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), seq_num, recvd_seq_num);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
recvd_seq_num = seq_num;
|
||||||
|
|
||||||
|
if (0 == (recvd_seq_num % 100)) {
|
||||||
|
opal_output(0, "RECVD SEQ_NUM %lu", recvd_seq_num);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cbfunc_buf_snt(int status,
|
static void cbfunc_buf_snt(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
|
orte_rmcast_seq_t seq_num,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_process_name_t *sender,
|
orte_process_name_t *sender,
|
||||||
opal_buffer_t *buf, void *cbdata)
|
opal_buffer_t *buf, void *cbdata)
|
||||||
{
|
{
|
||||||
opal_output(0, "%s BUFFERED_NB SEND COMPLETE\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
||||||
|
|
||||||
OBJ_RELEASE(buf);
|
OBJ_RELEASE(buf);
|
||||||
}
|
}
|
||||||
|
Загрузка…
Ссылка в новой задаче
Block a user