1
1

Use non-blocking sends for recovering from lost multicast messages

This commit was SVN r24943.
Этот коммит содержится в:
Ralph Castain 2011-07-25 18:49:47 +00:00
родитель adde221413
Коммит db193555c2
2 изменённых файлов: 36 добавлений и 17 удалений

Просмотреть файл

@ -97,6 +97,15 @@ void orte_rmcast_base_stop_threads(void)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
static void cbfunc(int status,
orte_process_name_t *peer,
opal_buffer_t *buffer,
orte_rml_tag_t tag,
void* cbdata)
{
OBJ_RELEASE(buffer);
}
void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg)
{
orte_rmcast_channel_t channel;
@ -112,7 +121,7 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg)
rmcast_seq_tracker_t *trkr, *tptr;
rmcast_recv_log_t *log, *logptr;
bool restart;
opal_buffer_t alert;
opal_buffer_t *alert;
/* extract the header */
if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &restart, &recvd_seq_num))) {
@ -213,24 +222,22 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg)
if (1 != (recvd_seq_num - trkr->seq_num) ||
(ORTE_RMCAST_SEQ_MAX == trkr->seq_num && 0 != recvd_seq_num)) {
/* missing a message - request it */
OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output,
"%s Missed msg %d (%d) on channel %d from source %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recvd_seq_num,
trkr->seq_num, channel, ORTE_NAME_PRINT(&name)));
OBJ_CONSTRUCT(&alert, opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &channel, 1, ORTE_RMCAST_CHANNEL_T))) {
opal_output(0, "%s Missed msg %d (%d) on channel %d from source %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recvd_seq_num,
trkr->seq_num, channel, ORTE_NAME_PRINT(&name));
alert = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &channel, 1, ORTE_RMCAST_CHANNEL_T))) {
ORTE_ERROR_LOG(rc);
exit(1);
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &trkr->seq_num, 1, ORTE_RMCAST_SEQ_T))) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &trkr->seq_num, 1, ORTE_RMCAST_SEQ_T))) {
ORTE_ERROR_LOG(rc);
exit(1);
}
if (0 > (rc = orte_rml.send_buffer(&name, &alert, ORTE_RML_TAG_MISSED_MSG, 0))) {
if (0 > (rc = orte_rml.send_buffer_nb(&name, alert, ORTE_RML_TAG_MISSED_MSG, 0, cbfunc, NULL))) {
ORTE_ERROR_LOG(rc);
exit(1);
}
OBJ_DESTRUCT(&alert);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
@ -334,7 +341,7 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg)
"%s rmcast:base:process_recv delivering iovecs to channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv->channel, (int)tag));
recv->cbfunc_iovec(ORTE_SUCCESS, recv->channel, recv->seq_num, tag,
&name, iovec_array, iovec_count, recv->cbdata);
&name, iovec_array, iovec_count, recv->cbdata);
} else {
/* if something is already present, then we have a problem */
if (NULL != recv->iovec_array) {
@ -362,7 +369,7 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg)
"%s rmcast:base:process_recv delivering buffer to channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv->channel, (int)tag));
recv->cbfunc_buffer(ORTE_SUCCESS, recv->channel, recv->seq_num, tag,
&name, msg->buf, recv->cbdata);
&name, msg->buf, recv->cbdata);
} else {
/* if something is already present, then we have a problem */
if (NULL != recv->buf) {

Просмотреть файл

@ -1078,6 +1078,15 @@ static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel)
return rc;
}
static void cbfunc(int status,
struct orte_process_name_t* peer,
struct opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata)
{
OBJ_RELEASE(buffer);
}
static void resend_data(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
@ -1087,6 +1096,7 @@ static void resend_data(int status, orte_process_name_t* sender,
orte_rmcast_seq_t start;
rmcast_base_channel_t *ch;
rmcast_send_log_t *log;
opal_buffer_t *recover;
/* block any further ops until we complete the missing
* message repair
@ -1105,10 +1115,9 @@ static void resend_data(int status, orte_process_name_t* sender,
goto release;
}
OPAL_OUTPUT_VERBOSE((0, orte_rmcast_base.rmcast_output,
"%s request resend data from %s for channel %d start %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), channel, start));
opal_output(0, "%s request resend data from %s for channel %d start %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), channel, start);
/* get the referenced channel object */
if (NULL == (ch = orte_rmcast_base_get_channel(channel))) {
@ -1130,7 +1139,10 @@ static void resend_data(int status, orte_process_name_t* sender,
"%s resending msg %d to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
log->seq_num, ORTE_NAME_PRINT(sender)));
if (0 > (rc = orte_rml.send_buffer(sender, log->buf, ORTE_RML_TAG_MULTICAST, 0))) {
recover = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(recover, log->buf);
if (0 > (rc = orte_rml.send_buffer_nb(sender, recover, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) {
OBJ_RELEASE(recover);
ORTE_ERROR_LOG(rc);
goto release;
}