diff --git a/orte/mca/rmcast/base/rmcast_base_threads.c b/orte/mca/rmcast/base/rmcast_base_threads.c index c0da4180b4..9db87480e2 100644 --- a/orte/mca/rmcast/base/rmcast_base_threads.c +++ b/orte/mca/rmcast/base/rmcast_base_threads.c @@ -97,6 +97,15 @@ void orte_rmcast_base_stop_threads(void) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); } +static void cbfunc(int status, + orte_process_name_t *peer, + opal_buffer_t *buffer, + orte_rml_tag_t tag, + void* cbdata) +{ + OBJ_RELEASE(buffer); +} + void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg) { orte_rmcast_channel_t channel; @@ -112,7 +121,7 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg) rmcast_seq_tracker_t *trkr, *tptr; rmcast_recv_log_t *log, *logptr; bool restart; - opal_buffer_t alert; + opal_buffer_t *alert; /* extract the header */ if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &restart, &recvd_seq_num))) { @@ -213,24 +222,22 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg) if (1 != (recvd_seq_num - trkr->seq_num) || (ORTE_RMCAST_SEQ_MAX == trkr->seq_num && 0 != recvd_seq_num)) { /* missing a message - request it */ - OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output, - "%s Missed msg %d (%d) on channel %d from source %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recvd_seq_num, - trkr->seq_num, channel, ORTE_NAME_PRINT(&name))); - OBJ_CONSTRUCT(&alert, opal_buffer_t); - if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &channel, 1, ORTE_RMCAST_CHANNEL_T))) { + opal_output(0, "%s Missed msg %d (%d) on channel %d from source %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recvd_seq_num, + trkr->seq_num, channel, ORTE_NAME_PRINT(&name)); + alert = OBJ_NEW(opal_buffer_t); + if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &channel, 1, ORTE_RMCAST_CHANNEL_T))) { ORTE_ERROR_LOG(rc); exit(1); } - if (ORTE_SUCCESS != (rc = opal_dss.pack(&alert, &trkr->seq_num, 1, ORTE_RMCAST_SEQ_T))) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &trkr->seq_num, 1, ORTE_RMCAST_SEQ_T))) { ORTE_ERROR_LOG(rc); exit(1); } - if (0 > (rc = orte_rml.send_buffer(&name, &alert, ORTE_RML_TAG_MISSED_MSG, 0))) { + if (0 > (rc = orte_rml.send_buffer_nb(&name, alert, ORTE_RML_TAG_MISSED_MSG, 0, cbfunc, NULL))) { ORTE_ERROR_LOG(rc); exit(1); } - OBJ_DESTRUCT(&alert); goto cleanup; } OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output, @@ -334,7 +341,7 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg) "%s rmcast:base:process_recv delivering iovecs to channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv->channel, (int)tag)); recv->cbfunc_iovec(ORTE_SUCCESS, recv->channel, recv->seq_num, tag, - &name, iovec_array, iovec_count, recv->cbdata); + &name, iovec_array, iovec_count, recv->cbdata); } else { /* if something is already present, then we have a problem */ if (NULL != recv->iovec_array) { @@ -362,7 +369,7 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg) "%s rmcast:base:process_recv delivering buffer to channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv->channel, (int)tag)); recv->cbfunc_buffer(ORTE_SUCCESS, recv->channel, recv->seq_num, tag, - &name, msg->buf, recv->cbdata); + &name, msg->buf, recv->cbdata); } else { /* if something is already present, then we have a problem */ if (NULL != recv->buf) { diff --git a/orte/mca/rmcast/udp/rmcast_udp.c b/orte/mca/rmcast/udp/rmcast_udp.c index 024a814fe6..18e245a21b 100644 --- a/orte/mca/rmcast/udp/rmcast_udp.c +++ b/orte/mca/rmcast/udp/rmcast_udp.c @@ -1078,6 +1078,15 @@ static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel) return rc; } +static void cbfunc(int status, + struct orte_process_name_t* peer, + struct opal_buffer_t* buffer, + orte_rml_tag_t tag, + void* cbdata) +{ + OBJ_RELEASE(buffer); +} + static void resend_data(int status, orte_process_name_t* sender, opal_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata) @@ -1087,6 +1096,7 @@ static void resend_data(int status, orte_process_name_t* sender, orte_rmcast_seq_t start; rmcast_base_channel_t *ch; rmcast_send_log_t *log; + opal_buffer_t *recover; /* block any further ops until we complete the missing * message repair @@ -1105,10 +1115,9 @@ static void resend_data(int status, orte_process_name_t* sender, goto release; } - OPAL_OUTPUT_VERBOSE((0, orte_rmcast_base.rmcast_output, - "%s request resend data from %s for channel %d start %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(sender), channel, start)); + opal_output(0, "%s request resend data from %s for channel %d start %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender), channel, start); /* get the referenced channel object */ if (NULL == (ch = orte_rmcast_base_get_channel(channel))) { @@ -1130,7 +1139,10 @@ static void resend_data(int status, orte_process_name_t* sender, "%s resending msg %d to %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), log->seq_num, ORTE_NAME_PRINT(sender))); - if (0 > (rc = orte_rml.send_buffer(sender, log->buf, ORTE_RML_TAG_MULTICAST, 0))) { + recover = OBJ_NEW(opal_buffer_t); + opal_dss.copy_payload(recover, log->buf); + if (0 > (rc = orte_rml.send_buffer_nb(sender, recover, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) { + OBJ_RELEASE(recover); ORTE_ERROR_LOG(rc); goto release; }