
first cut at btl failover - seems to be working for simple test case

This commit was SVN r9816.
This commit is contained in:
Tim Woodall 2006-05-04 16:16:26 +00:00
parent 350d5b1713
commit 1b26caa95b
8 changed files with 414 additions and 261 deletions
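In outline, the failover path this commit adds works as follows: a failed local completion (or an exhausted watchdog/ack retry count) removes the offending BTL, resets the vfrag onto an alternate BTL, and reschedules it. The sketch below condenses the error dispatch from mca_pml_dr_error_completion() in the hunks that follow; it is an illustrative summary, not part of the diff.

/* Illustrative condensation of mca_pml_dr_error_completion() below. */
switch(status) {
case OMPI_ERR_UNREACH:
case OMPI_ERR_CONNECTION_FAILED:
case OMPI_ERR_CONNECTION_REFUSED:
    /* peer is no longer reachable through this btl - drop it for this proc */
    mca_bml.bml_del_proc_btl(sendreq->req_proc->ompi_proc, btl);
    break;
case OMPI_ERR_FATAL:
case OMPI_ERR_COMM_FAILURE:
    /* btl is no longer available at all - drop it everywhere */
    mca_bml.bml_del_btl(btl);
    break;
default:
    orte_errmgr.abort();
    break;
}
mca_pml_dr_vfrag_reset(vfrag);      /* select an alternate BTL for this vfrag */
mca_pml_dr_vfrag_reschedule(vfrag); /* restart timers and retransmit */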

View File

@ -79,15 +79,13 @@ struct mca_pml_dr_t {
/* my 'global' rank */
int32_t my_rank;
int timer_wdog_sec;
int timer_wdog_usec;
int timer_wdog_multiplier;
int timer_wdog_max_count;
struct timeval wdog_timer;
int wdog_timer_multiplier;
int wdog_retry_max;
int timer_ack_sec;
int timer_ack_usec;
int timer_ack_multiplier;
int timer_ack_max_count;
struct timeval ack_timer;
int ack_timer_multiplier;
int ack_retry_max;
/* enable/disable csum */
int enable_csum;
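The separate timer_wdog_*/timer_ack_* integers above are replaced by a struct timeval base (wdog_timer, ack_timer) plus a multiplier and a retry maximum for each timer. As a rough restatement of the timeout handlers later in this diff (illustrative, not new code), each expiry backs the next interval off linearly with the per-vfrag retry count:

/* After the n-th watchdog expiry (n == vf_wdog_cnt) the next interval is
 * base * (1 + multiplier * n); the ack timer is backed off the same way. */
vfrag->vf_wdog_tv.tv_sec  = mca_pml_dr.wdog_timer.tv_sec *
    (1 + mca_pml_dr.wdog_timer_multiplier * vfrag->vf_wdog_cnt);
vfrag->vf_wdog_tv.tv_usec = mca_pml_dr.wdog_timer.tv_usec *
    (1 + mca_pml_dr.wdog_timer_multiplier * vfrag->vf_wdog_cnt);
/* Once vf_wdog_cnt reaches wdog_retry_max (or vf_ack_cnt reaches
 * ack_retry_max) the BTL is declared dead via mca_bml.bml_del_btl() and
 * mca_pml_dr_vfrag_reset() selects an alternate path. */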

View File

@ -92,23 +92,23 @@ int mca_pml_dr_component_open(void)
mca_pml_dr_param_register_int("eager_limit", 128 * 1024);
mca_pml_dr.send_pipeline_depth =
mca_pml_dr_param_register_int("send_pipeline_depth", 3);
mca_pml_dr.timer_wdog_sec =
mca_pml_dr_param_register_int("timer_wdog_sec", 1);
mca_pml_dr.timer_wdog_usec =
mca_pml_dr_param_register_int("timer_wdog_usec", 0);
mca_pml_dr.timer_wdog_multiplier =
mca_pml_dr_param_register_int("timer_wdog_multiplier", 2);
mca_pml_dr.timer_wdog_max_count =
mca_pml_dr_param_register_int("timer_wdog_max_count", 10);
mca_pml_dr.wdog_timer.tv_sec =
mca_pml_dr_param_register_int("wdog_timer_sec", 5);
mca_pml_dr.wdog_timer.tv_usec =
mca_pml_dr_param_register_int("wdog_timer_usec", 0);
mca_pml_dr.wdog_timer_multiplier =
mca_pml_dr_param_register_int("wdog_timer_multiplier", 1);
mca_pml_dr.wdog_retry_max =
mca_pml_dr_param_register_int("wdog_retry_max", 1);
mca_pml_dr.timer_ack_sec =
mca_pml_dr_param_register_int("timer_ack_sec", 10);
mca_pml_dr.timer_ack_usec =
mca_pml_dr_param_register_int("timer_ack_usec", 0);
mca_pml_dr.timer_ack_multiplier =
mca_pml_dr_param_register_int("timer_ack_multiplier", 2);
mca_pml_dr.timer_ack_max_count =
mca_pml_dr_param_register_int("timer_ack_max_count", 10);
mca_pml_dr.ack_timer.tv_sec =
mca_pml_dr_param_register_int("ack_timer_sec", 10);
mca_pml_dr.ack_timer.tv_usec =
mca_pml_dr_param_register_int("ack_timer_usec", 0);
mca_pml_dr.ack_timer_multiplier =
mca_pml_dr_param_register_int("ack_timer_multiplier", 1);
mca_pml_dr.ack_retry_max =
mca_pml_dr_param_register_int("ack_retry_max", 3);
/* default is to csum all data */
mca_pml_dr.enable_csum =

View File

@ -20,12 +20,26 @@
#include "pml_dr.h"
#include "pml_dr_endpoint.h"
static void mca_pml_dr_endpoint_copy(mca_pml_dr_endpoint_t* dst, mca_pml_dr_endpoint_t* src)
{
dst->local = src->local;
dst->src = src->src;
dst->dst = src->dst;
ompi_seq_tracker_copy(&dst->seq_sends, &src->seq_sends);
ompi_seq_tracker_copy(&dst->seq_recvs, &src->seq_recvs);
ompi_seq_tracker_copy(&dst->seq_recvs_matched, &src->seq_recvs_matched);
dst->vfrag_seq = src->vfrag_seq;
}
static void mca_pml_dr_endpoint_construct(mca_pml_dr_endpoint_t* ep)
{
OBJ_CONSTRUCT(&ep->seq_sends, ompi_seq_tracker_t);
OBJ_CONSTRUCT(&ep->seq_recvs, ompi_seq_tracker_t);
OBJ_CONSTRUCT(&ep->seq_recvs_matched, ompi_seq_tracker_t);
ep->vfrag_seq = 0;
ep->base.copy = (mca_bml_base_endpoint_copy_fn_t)mca_pml_dr_endpoint_copy;
}

View File

@ -54,6 +54,7 @@ do {
ep = ompi_pointer_array_get_item(&mca_pml_dr.procs, hdr->hdr_common.hdr_src); \
assert(ep != NULL); \
if(ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { \
OPAL_OUTPUT((0, "%s:%d: dropping duplicate ack", __FILE__, __LINE__)); \
return; \
} \
} while (0)

View File

@ -89,6 +89,51 @@ OBJ_CLASS_INSTANCE(
mca_pml_dr_send_request_construct,
mca_pml_dr_send_request_destruct);
/**
* Handle error status on local completion
*/
static void mca_pml_dr_error_completion(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
mca_pml_dr_vfrag_t* vfrag = descriptor->des_cbdata;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
switch(status) {
case OMPI_ERR_UNREACH:
case OMPI_ERR_CONNECTION_FAILED:
case OMPI_ERR_CONNECTION_REFUSED:
/**
* peer is no longer reachable through this btl
*/
mca_bml.bml_del_proc_btl(sendreq->req_proc->ompi_proc, btl);
break;
case OMPI_ERR_FATAL:
case OMPI_ERR_COMM_FAILURE:
/**
* btl is no longer available
*/
mca_bml.bml_del_btl(btl);
break;
default:
orte_errmgr.abort();
break;
}
/* update pending counts */
OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-1);
OPAL_THREAD_ADD64(&vfrag->vf_pending,-1);
/* reset vfrag state - select new BTL */
mca_pml_dr_vfrag_reset(vfrag);
/* reschedule vfrag */
mca_pml_dr_vfrag_reschedule(vfrag);
}
/**
* Completion of a short message - nothing left to schedule.
@ -100,34 +145,36 @@ static void mca_pml_dr_match_completion(
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
mca_pml_dr_send_request_t* sendreq = descriptor->des_cbdata;
mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0;
mca_pml_dr_vfrag_t* vfrag = descriptor->des_cbdata;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
/* kill pending wdog timer */
MCA_PML_DR_VFRAG_WDOG_STOP(vfrag);
/* free any descriptor used to retransmit */
if(descriptor != sendreq->descriptor) {
if(descriptor != sendreq->req_descriptor) {
mca_bml_base_free(descriptor->des_context, descriptor);
}
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
opal_output(0, "%s:%d FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
mca_pml_dr_error_completion(btl,ep,descriptor,status);
return;
}
/* wait for local completion */
OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-1);
if(OPAL_THREAD_ADD64(&vfrag->vf_pending,-1) > 0)
return;
/* wait for positive ack to complete request */
OPAL_THREAD_LOCK(&ompi_request_lock);
if(vfrag->vf_ack == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* return descriptor */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
if(NULL != sendreq->req_descriptor) {
mca_bml_base_free(sendreq->req_descriptor->des_context, sendreq->req_descriptor);
sendreq->req_descriptor = NULL;
}
/* update statistics and complete */
@ -137,8 +184,12 @@ static void mca_pml_dr_match_completion(
/* on negative ack need to retransmit */
} else if(vfrag->vf_state & MCA_PML_DR_VFRAG_NACKED) {
MCA_PML_DR_VFRAG_ACK_RESET(vfrag);
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
/* start ack timer */
} else {
MCA_PML_DR_VFRAG_ACK_START(vfrag);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
@ -157,33 +208,35 @@ static void mca_pml_dr_rndv_completion(
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)descriptor->des_cbdata;
mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0;
mca_pml_dr_vfrag_t* vfrag = descriptor->des_cbdata;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
bool schedule = false;
/* kill pending wdog timer */
MCA_PML_DR_VFRAG_WDOG_STOP(vfrag);
/* free any descriptor used to retransmit */
if(descriptor != sendreq->descriptor) {
if(descriptor != sendreq->req_descriptor) {
mca_bml_base_free(descriptor->des_context, descriptor);
}
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
opal_output(0, "%s:%d FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
mca_pml_dr_error_completion(btl,ep,descriptor,status);
return;
}
/* local completion */
OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-1);
if(OPAL_THREAD_ADD64(&vfrag->vf_pending,-1) > 0)
return;
/* wait for positive ack to complete request */
OPAL_THREAD_LOCK(&ompi_request_lock);
if(vfrag->vf_ack == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
if(sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
if(sendreq->req_descriptor) {
mca_bml_base_free(sendreq->req_descriptor->des_context, sendreq->req_descriptor);
sendreq->req_descriptor = NULL;
}
/* update statistics and complete */
@ -203,6 +256,11 @@ static void mca_pml_dr_rndv_completion(
MCA_PML_DR_VFRAG_ACK_RESET(vfrag);
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* setup ack timer */
} else {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
MCA_PML_DR_VFRAG_ACK_START(vfrag);
}
/* check for pending requests */
@ -228,9 +286,8 @@ static void mca_pml_dr_frag_completion(
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
opal_output(0, "%s:%d FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
mca_pml_dr_error_completion(btl,ep,descriptor,status);
return;
}
/* have all pending frags completed for this vfrag? */
@ -313,7 +370,7 @@ int mca_pml_dr_send_request_start_buffered(
if(NULL == descriptor) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
sendreq->descriptor = descriptor; /* hang on to this for later */
sendreq->req_descriptor = descriptor; /* hang on to this for later */
segment = descriptor->des_src;
/* pack the data into the BTL supplied buffer */
@ -342,7 +399,7 @@ int mca_pml_dr_send_request_start_buffered(
descriptor->des_cbfunc = mca_pml_dr_rndv_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_cbdata = sendreq;
descriptor->des_cbdata = &sendreq->req_vfrag0;
/* buffer the remainder of the message */
rc = mca_pml_base_bsend_request_alloc((ompi_request_t*)sendreq);
@ -396,7 +453,7 @@ int mca_pml_dr_send_request_start_buffered(
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* send */
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
MCA_PML_DR_VFRAG_WDOG_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, descriptor );
@ -429,7 +486,7 @@ int mca_pml_dr_send_request_start_copy(
if(NULL == descriptor) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
sendreq->descriptor = descriptor; /* hang on to this for later */
sendreq->req_descriptor = descriptor; /* hang on to this for later */
segment = descriptor->des_src;
/* pack the data into the supplied buffer */
@ -475,7 +532,7 @@ int mca_pml_dr_send_request_start_copy(
/* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_cbdata = sendreq;
descriptor->des_cbdata = &sendreq->req_vfrag0;
segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data;
/* signal request completion */
@ -484,7 +541,7 @@ int mca_pml_dr_send_request_start_copy(
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* send */
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
MCA_PML_DR_VFRAG_WDOG_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, descriptor );
@ -518,7 +575,7 @@ int mca_pml_dr_send_request_start_prepare(
if(NULL == descriptor) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
sendreq->descriptor = descriptor; /* hang on to this for later */
sendreq->req_descriptor = descriptor; /* hang on to this for later */
segment = descriptor->des_src;
/* build match header */
@ -542,7 +599,7 @@ int mca_pml_dr_send_request_start_prepare(
/* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_cbdata = sendreq;
descriptor->des_cbdata = &sendreq->req_vfrag0;
/* vfrag state */
sendreq->req_vfrag0.vf_size = size;
@ -550,7 +607,7 @@ int mca_pml_dr_send_request_start_prepare(
sendreq->req_vfrag0.vf_pending = 1;
/* send */
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
MCA_PML_DR_VFRAG_WDOG_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, descriptor );
@ -596,7 +653,7 @@ int mca_pml_dr_send_request_start_rndv(
if(NULL == des) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
sendreq->descriptor = des; /* hang on to this for later */
sendreq->req_descriptor = des; /* hang on to this for later */
segment = des->des_src;
/* build hdr */
@ -619,7 +676,7 @@ int mca_pml_dr_send_request_start_rndv(
/* first fragment of a long message */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbdata = sendreq;
des->des_cbdata = &sendreq->req_vfrag0;
des->des_cbfunc = mca_pml_dr_rndv_completion;
/* vfrag state */
@ -629,7 +686,7 @@ int mca_pml_dr_send_request_start_rndv(
sendreq->req_vfrag0.vf_pending = 1;
/* send */
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
MCA_PML_DR_VFRAG_WDOG_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, des );
@ -655,8 +712,117 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
assert(sendreq->req_vfrag0.vf_recv.pval != NULL);
if(OPAL_THREAD_ADD32(&sendreq->req_lock,1) == 1) {
do {
size_t bytes_remaining;
/*
* VFrags w/ nacks or that timed out
*/
while(opal_list_get_size(&sendreq->req_retrans) &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
mca_pml_dr_vfrag_t* vfrag;
OPAL_THREAD_LOCK(&ompi_request_lock);
vfrag = (mca_pml_dr_vfrag_t*)opal_list_get_first(&sendreq->req_retrans);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(NULL == vfrag) {
break;
}
/*
* Retransmit fragments that have not been acked.
*/
while(vfrag->vf_idx < vfrag->vf_len &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
if(((uint64_t)1 << vfrag->vf_idx) & ~vfrag->vf_ack) {
mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
mca_pml_dr_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des;
size_t offset_in_vfrag = vfrag->vf_max_send_size * vfrag->vf_idx;
size_t offset_in_msg = vfrag->vf_offset + offset_in_vfrag;
size_t size;
int rc;
if(vfrag->vf_idx == vfrag->vf_len - 1) {
size = vfrag->vf_size - offset_in_vfrag;
} else {
size = vfrag->vf_max_send_size;
}
/* pack into a descriptor */
ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset_in_msg);
mca_bml_base_prepare_src(
bml_btl,
NULL,
&sendreq->req_send.req_convertor,
sizeof(mca_pml_dr_frag_hdr_t),
&size,
&des
);
if(des == NULL) {
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;
}
des->des_cbfunc = mca_pml_dr_frag_completion;
des->des_cbdata = vfrag;
/* setup header */
hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst;
hdr->hdr_common.hdr_vid = vfrag->vf_id;
hdr->hdr_common.hdr_src = sendreq->req_endpoint->src;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_vlen = vfrag->vf_len;
hdr->hdr_frag_idx = vfrag->vf_idx;
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_frag_offset = offset_in_msg;
hdr->hdr_src_ptr.pval = vfrag;
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = (mca_pml_dr.enable_csum ?
opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)) :
OPAL_CSUM_ZERO);
/* adjust number of outstanding operations */
if(OPAL_THREAD_ADD64(&vfrag->vf_pending, 1) == 1) {
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
}
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
/* adjust send offset - may not have finished scheduling entire vfrag */
if(offset_in_msg + size > sendreq->req_send_offset) {
sendreq->req_send_offset = offset_in_msg + size;
}
/* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML);
if(rc == OMPI_SUCCESS) {
bytes_remaining -= size;
} else {
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
mca_bml_base_free(bml_btl,des);
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;
}
}
vfrag->vf_idx++;
}
/* remove from retrans list */
if(vfrag->vf_idx == vfrag->vf_len) {
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
}
/* allocate remaining bytes to BTLs */
size_t bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_send_offset;
bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_send_offset;
while(bytes_remaining > 0 &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
@ -682,7 +848,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
break;
}
MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,endpoint,bytes_remaining,vfrag);
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
vfrag->bml_btl = bml_btl;
bytes_sent = 0;
@ -740,7 +905,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
/* update state */
vfrag->vf_idx++;
OPAL_THREAD_ADD64(&vfrag->vf_pending,1);
if(OPAL_THREAD_ADD64(&vfrag->vf_pending,1) == 1) {
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
}
sendreq->req_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
@ -758,101 +925,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;
}
mca_pml_dr_progress();
}
/*
* VFrags w/ nacks or that timed out
*/
while(opal_list_get_size(&sendreq->req_retrans) &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*)opal_list_get_first(&sendreq->req_retrans);
/*
* Retransmit fragments that have not been acked.
*/
while(vfrag->vf_idx < vfrag->vf_len &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
if(((uint64_t)1 << vfrag->vf_idx) & ~vfrag->vf_ack) {
mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
mca_pml_dr_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des;
size_t offset_in_vfrag = vfrag->vf_max_send_size * vfrag->vf_idx;
size_t offset_in_msg = vfrag->vf_offset + offset_in_vfrag;
size_t size;
int rc;
vfrag->vf_retry_cnt ++;
if(vfrag->vf_idx == vfrag->vf_len - 1) {
size = vfrag->vf_size - offset_in_vfrag;
} else {
size = vfrag->vf_max_send_size;
}
/* pack into a descriptor */
ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset_in_msg);
mca_bml_base_prepare_src(
bml_btl,
NULL,
&sendreq->req_send.req_convertor,
sizeof(mca_pml_dr_frag_hdr_t),
&size,
&des
);
if(des == NULL) {
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;
}
des->des_cbfunc = mca_pml_dr_frag_completion;
des->des_cbdata = vfrag;
/* setup header */
hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst;
hdr->hdr_common.hdr_vid = vfrag->vf_id;
hdr->hdr_common.hdr_src = sendreq->req_endpoint->src;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_vlen = vfrag->vf_len;
hdr->hdr_frag_idx = vfrag->vf_idx;
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_frag_offset = offset_in_msg;
hdr->hdr_src_ptr.pval = vfrag;
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = (mca_pml_dr.enable_csum ?
opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)) :
OPAL_CSUM_ZERO);
OPAL_THREAD_ADD64(&vfrag->vf_pending, 1);
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
/* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML);
if(rc == OMPI_SUCCESS) {
bytes_remaining -= size;
} else {
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
mca_bml_base_free(bml_btl,des);
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;
}
}
vfrag->vf_idx++;
}
/* move from retrans to pending list */
if(vfrag->vf_idx == vfrag->vf_len) {
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
}
} while (OPAL_THREAD_ADD32(&sendreq->req_lock,-1) > 0);
}
@ -878,14 +950,14 @@ void mca_pml_dr_send_request_match_ack(
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_START(vfrag);
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
} else {
/* if already have local completion free descriptor and complete message */
/* return descriptor */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor );
sendreq->descriptor = NULL;
if(NULL != sendreq->req_descriptor) {
mca_bml_base_free(sendreq->req_descriptor->des_context, sendreq->req_descriptor );
sendreq->req_descriptor = NULL;
}
/* update statistics */
@ -930,14 +1002,14 @@ void mca_pml_dr_send_request_rndv_ack(
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
/* got a NACK, resend eager data! */
MCA_PML_DR_VFRAG_ACK_START(vfrag);
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} else {
/* return descriptor of first fragment */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
if(NULL != sendreq->req_descriptor) {
mca_bml_base_free(sendreq->req_descriptor->des_context, sendreq->req_descriptor);
sendreq->req_descriptor = NULL;
}
/* done? */
@ -1032,4 +1104,3 @@ void mca_pml_dr_send_request_frag_ack(
}

View File

@ -58,7 +58,7 @@ struct mca_pml_dr_send_request_t {
mca_pml_dr_vfrag_t* req_vfrag;
mca_pml_dr_vfrag_t req_vfrag0;
opal_list_t req_retrans;
mca_btl_base_descriptor_t* descriptor; /* descriptor for first frag, retransmission */
mca_btl_base_descriptor_t* req_descriptor; /* descriptor for first frag, retransmission */
};
typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t;
@ -159,12 +159,13 @@ do {
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \
MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&endpoint->vfrag_seq,1); \
sendreq->req_vfrag0.bml_btl = bml_btl; \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
sendreq->req_endpoint = endpoint; \
sendreq->req_proc = proc; \
\
sendreq->req_lock = 0; \
sendreq->req_pipeline_depth = 0; \
sendreq->req_pipeline_depth = 1; \
sendreq->req_bytes_delivered = 0; \
sendreq->req_state = 0; \
sendreq->req_send_offset = 0; \
@ -315,10 +316,12 @@ do {
#define MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq, vfrag) \
do { \
if(vfrag->vf_idx == vfrag->vf_len || \
(vfrag->vf_wdog_cnt == 0 && vfrag->vf_ack_cnt == 0)) { \
opal_list_append(&(sendreq)->req_retrans, (opal_list_item_t*)vfrag); \
} \
vfrag->vf_idx = 0; \
vfrag->vf_state = 0; \
opal_output(0, "queuing vfrag for retrans!\n"); \
opal_list_append(&(sendreq)->req_retrans, (opal_list_item_t*)vfrag); \
} while(0)
/*
@ -355,29 +358,25 @@ do { \
#define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \
do { \
mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; \
mca_bml_base_btl_t* bml_btl = \
mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \
mca_btl_base_descriptor_t *des_old, *des_new; \
\
if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \
opal_output(0, "%s:%d,%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__); \
orte_errmgr.abort(); \
} \
OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \
\
OPAL_OUTPUT((0, "%s:%d:%s, retransmitting eager\n", __FILE__, __LINE__, __func__)); \
assert(sendreq->descriptor->des_src != NULL); \
MCA_PML_DR_VFRAG_RESET(vfrag); \
des_old = sendreq->descriptor; \
mca_bml_base_alloc(bml_btl, &des_new, des_old->des_src->seg_len);\
assert(sendreq->req_descriptor->des_src != NULL); \
\
OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,1); \
OPAL_THREAD_ADD64(&(vfrag)->vf_pending,1); \
(vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \
\
des_old = sendreq->req_descriptor; \
mca_bml_base_alloc(vfrag->bml_btl, &des_new, des_old->des_src->seg_len);\
sendreq->req_descriptor = des_new; \
memcpy(des_new->des_src->seg_addr.pval, \
des_old->des_src->seg_addr.pval, \
des_old->des_src->seg_len); \
des_new->des_flags = des_old->des_flags; \
des_new->des_cbdata = des_old->des_cbdata; \
des_new->des_cbfunc = des_old->des_cbfunc; \
mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \
mca_bml_base_send(vfrag->bml_btl, des_new, MCA_BTL_TAG_PML); \
} while(0)
/*
@ -392,18 +391,15 @@ do { \
mca_btl_base_descriptor_t *des_old, *des_new; \
mca_pml_dr_hdr_t *hdr; \
\
if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \
opal_output(0, "%s:%d,%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__); \
orte_errmgr.abort(); \
} \
OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \
\
opal_output(0, "%s:%d:%s, (re)transmitting rndv probe\n", __FILE__, __LINE__, __func__); \
assert(sendreq->descriptor->des_src != NULL); \
MCA_PML_DR_VFRAG_RESET(vfrag); \
OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,1); \
OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \
(vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \
\
assert(sendreq->req_descriptor->des_src != NULL); \
mca_bml_base_alloc(bml_btl, &des_new, \
sizeof(mca_pml_dr_rendezvous_hdr_t)); \
des_old = sendreq->descriptor; \
des_old = sendreq->req_descriptor; \
/* build hdr */ \
hdr = (mca_pml_dr_hdr_t*)des_new->des_src->seg_addr.pval; \
hdr->hdr_common.hdr_flags = 0; \

View File

@ -19,10 +19,12 @@
#include "ompi_config.h"
#include "pml_dr_vfrag.h"
#include "pml_dr_sendreq.h"
#include "ompi/mca/bml/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* vfrag);
void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* vfrag);
static void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* vfrag);
static void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* vfrag);
static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
{
@ -36,14 +38,13 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
vfrag->vf_max_send_size = 0;
vfrag->vf_ack = 0;
vfrag->vf_mask = 1;
vfrag->vf_retry_cnt = 0;
vfrag->vf_state = 0;
vfrag->tv_wdog.tv_sec = mca_pml_dr.timer_wdog_sec;
vfrag->tv_wdog.tv_usec = mca_pml_dr.timer_wdog_usec;
vfrag->tv_ack.tv_sec = mca_pml_dr.timer_ack_usec;
vfrag->tv_ack.tv_usec = mca_pml_dr.timer_ack_usec;
opal_evtimer_set(&vfrag->ev_wdog, mca_pml_dr_vfrag_wdog_timeout, (void*) vfrag);
opal_evtimer_set(&vfrag->ev_ack, mca_pml_dr_vfrag_ack_timeout, (void*) vfrag);
vfrag->vf_wdog_tv = mca_pml_dr.wdog_timer;
vfrag->vf_ack_tv = mca_pml_dr.ack_timer;
vfrag->vf_wdog_cnt = 0;
vfrag->vf_ack_cnt = 0;
opal_evtimer_set(&vfrag->vf_wdog_ev, mca_pml_dr_vfrag_wdog_timeout, (void*) vfrag);
opal_evtimer_set(&vfrag->vf_ack_ev, mca_pml_dr_vfrag_ack_timeout, (void*) vfrag);
}
@ -64,47 +65,120 @@ OBJ_CLASS_INSTANCE(
/**
* The wdog timer expired, better do something about it, like resend the current part of the vfrag
*/
void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
static void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
{
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
OPAL_OUTPUT((0, "%s:%d:%s: wdog timeout!", __FILE__, __LINE__, __func__));
if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_wdog_max_count) {
opal_output(0, "%s:%d:%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__);
orte_errmgr.abort();
}
/* back off watchdog timer */
vfrag->tv_wdog.tv_sec =
mca_pml_dr.timer_wdog_sec +
mca_pml_dr.timer_wdog_sec * mca_pml_dr.timer_wdog_multiplier *
vfrag->vf_retry_cnt;
vfrag->tv_wdog.tv_usec =
mca_pml_dr.timer_wdog_usec +
mca_pml_dr.timer_wdog_usec * mca_pml_dr.timer_wdog_multiplier *
vfrag->vf_retry_cnt;
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
OPAL_OUTPUT((0, "%s:%d:%s: wdog timeout: 0x%08x", __FILE__, __LINE__, __func__, vfrag));
/* retransmit vfrag */
OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_schedule(sendreq);
/* update pending counts */
OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-vfrag->vf_pending);
OPAL_THREAD_ADD64(&vfrag->vf_pending,-vfrag->vf_pending);
/* check for hung btl */
if(++vfrag->vf_wdog_cnt == mca_pml_dr.wdog_retry_max) {
/* declare btl dead */
opal_output(0, "%s:%d:%s: failing BTL: %s", __FILE__, __LINE__, __func__,
vfrag->bml_btl->btl->btl_component->btl_version.mca_component_name);
mca_bml.bml_del_btl(vfrag->bml_btl->btl);
mca_pml_dr_vfrag_reset(vfrag);
}
/* back off watchdog timer */
vfrag->vf_wdog_tv.tv_sec =
mca_pml_dr.wdog_timer.tv_sec +
mca_pml_dr.wdog_timer.tv_sec * mca_pml_dr.wdog_timer_multiplier *
vfrag->vf_wdog_cnt;
vfrag->vf_wdog_tv.tv_usec =
mca_pml_dr.wdog_timer.tv_usec +
mca_pml_dr.wdog_timer.tv_usec * mca_pml_dr.wdog_timer_multiplier *
vfrag->vf_wdog_cnt;
/* reschedule vfrag */
mca_pml_dr_vfrag_reschedule(vfrag);
}
/**
* The ack timer expired, better do something about it, like resend the entire vfrag?
*/
void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
static void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data)
{
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
opal_output(0, "%s:%d:%s: ack timeout!", __FILE__, __LINE__, __func__);
OPAL_OUTPUT((0, "%s:%d:%s: ack timeout: 0x%08x", __FILE__, __LINE__, __func__, vfrag));
OPAL_THREAD_LOCK(&ompi_request_lock);
/* stop ack timer */
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* check for hung btl */
if(++vfrag->vf_ack_cnt == mca_pml_dr.ack_retry_max) {
/* declare btl dead */
opal_output(0, "%s:%d:%s: failing BTL: %s", __FILE__, __LINE__, __func__,
vfrag->bml_btl->btl->btl_component->btl_version.mca_component_name);
mca_bml.bml_del_btl(vfrag->bml_btl->btl);
mca_pml_dr_vfrag_reset(vfrag);
}
/* back off ack timer */
vfrag->vf_ack_tv.tv_sec =
mca_pml_dr.ack_timer.tv_sec +
mca_pml_dr.ack_timer.tv_sec * mca_pml_dr.ack_timer_multiplier *
vfrag->vf_ack_cnt;
vfrag->vf_ack_tv.tv_usec =
mca_pml_dr.ack_timer.tv_usec +
mca_pml_dr.ack_timer.tv_usec * mca_pml_dr.ack_timer_multiplier *
vfrag->vf_ack_cnt;
/* reschedule vfrag */
mca_pml_dr_vfrag_reschedule(vfrag);
}
/**
* Vfrag failure - declare btl dead and try to resend on an alternate btl
*/
void mca_pml_dr_vfrag_reset(mca_pml_dr_vfrag_t* vfrag)
{
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
/* update counters - give new BTL a fair chance :-) */
vfrag->vf_ack_cnt = 0;
vfrag->vf_wdog_cnt = 0;
/* lookup new bml_btl data structure */
sendreq->req_endpoint = (mca_pml_dr_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml;
/* make sure a path is available */
if(mca_bml_base_btl_array_get_size(&sendreq->req_endpoint->base.btl_eager) == 0 ||
mca_bml_base_btl_array_get_size(&sendreq->req_endpoint->base.btl_send) == 0) {
opal_output(0, "%s:%d:%s: no path to peer", __FILE__, __LINE__, __func__);
orte_errmgr.abort();
}
if(vfrag->vf_offset == 0) {
vfrag->bml_btl = mca_bml_base_btl_array_get_next(&sendreq->req_endpoint->base.btl_eager);
} else {
vfrag->bml_btl = mca_bml_base_btl_array_get_next(&sendreq->req_endpoint->base.btl_send);
}
opal_output(0, "%s:%d:%s: selected new BTL: %s", __FILE__, __LINE__, __func__,
vfrag->bml_btl->btl->btl_component->btl_version.mca_component_name);
}
/**
* Reschedule vfrag that has timed out
*/
void mca_pml_dr_vfrag_reschedule(mca_pml_dr_vfrag_t* vfrag)
{
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
/* start wdog timer */
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
/* first frag within send request */
OPAL_THREAD_LOCK(&ompi_request_lock);
if(vfrag == &sendreq->req_vfrag0) {
MCA_PML_DR_VFRAG_ACK_START(vfrag);
if(vfrag->vf_state & MCA_PML_DR_VFRAG_RNDV) {
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
} else {
@ -116,9 +190,7 @@ void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
} else {
MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
mca_pml_dr_send_request_schedule(sendreq);
}
}

View File

@ -39,7 +39,6 @@ struct mca_pml_dr_vfrag_t {
uint32_t vf_id;
uint16_t vf_idx;
uint16_t vf_len;
uint8_t vf_retry_cnt;
size_t vf_offset;
size_t vf_size;
size_t vf_max_send_size;
@ -54,13 +53,13 @@ struct mca_pml_dr_vfrag_t {
operation
2) a timeout for ACK of the VRAG
*/
struct timeval tv_wdog;
struct timeval tv_ack;
opal_event_t ev_ack;
opal_event_t ev_wdog;
uint8_t cnt_wdog;
uint8_t cnt_ack;
uint8_t cnt_nack;
struct timeval vf_wdog_tv;
opal_event_t vf_wdog_ev;
uint8_t vf_wdog_cnt;
struct timeval vf_ack_tv;
opal_event_t vf_ack_ev;
uint8_t vf_ack_cnt;
};
typedef struct mca_pml_dr_vfrag_t mca_pml_dr_vfrag_t;
@ -83,16 +82,13 @@ do { \
do { \
(vfrag)->vf_idx = 0; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_retry_cnt = 0; \
(vfrag)->vf_wdog_cnt = 0; \
(vfrag)->vf_ack_cnt = 0; \
(vfrag)->vf_recv.pval = NULL; \
(vfrag)->vf_state = 0; \
(vfrag)->vf_pending = 0; \
} while(0)
#define MCA_PML_DR_VFRAG_RESET(vfrag) \
do { \
(vfrag)->vf_idx = 0; \
(vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \
(vfrag)->vf_wdog_tv = mca_pml_dr.wdog_timer; \
(vfrag)->vf_ack_tv = mca_pml_dr.ack_timer; \
} while(0)
@ -102,18 +98,18 @@ do { \
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \
do { \
opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \
opal_event_add(&(vfrag)->vf_wdog_ev, &(vfrag)->vf_wdog_tv); \
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag) \
do { \
opal_event_del(&vfrag->ev_wdog); \
opal_event_del(&(vfrag)->vf_wdog_ev); \
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_RESET(vfrag) \
do { \
opal_event_del(&vfrag->ev_wdog); \
opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \
opal_event_del(&(vfrag)->vf_wdog_ev); \
opal_event_add(&(vfrag)->vf_wdog_ev, &vfrag->vf_wdog_tv); \
} while(0)
@ -123,20 +119,12 @@ do { \
#define MCA_PML_DR_VFRAG_ACK_START(vfrag) \
do { \
(vfrag)->tv_ack.tv_sec = \
mca_pml_dr.timer_ack_sec + \
mca_pml_dr.timer_ack_sec * mca_pml_dr.timer_ack_multiplier * \
(vfrag)->vf_retry_cnt; \
(vfrag)->tv_ack.tv_usec = \
mca_pml_dr.timer_ack_usec + \
mca_pml_dr.timer_ack_usec * mca_pml_dr.timer_ack_multiplier * \
(vfrag)->vf_retry_cnt; \
opal_event_add(&(vfrag)->ev_ack, &(vfrag)->tv_ack); \
opal_event_add(&(vfrag)->vf_ack_ev, &(vfrag)->vf_ack_tv); \
} while(0)
#define MCA_PML_DR_VFRAG_ACK_STOP(vfrag) \
do { \
opal_event_del(&vfrag->ev_ack); \
opal_event_del(&vfrag->vf_ack_ev); \
} while(0)
#define MCA_PML_DR_VFRAG_ACK_RESET(vfrag) \
@ -145,6 +133,19 @@ do { \
MCA_PML_DR_VFRAG_ACK_START(vfrag); \
} while(0)
/**
* Reset a VFRAG to use a new BTL
*/
void mca_pml_dr_vfrag_reset(mca_pml_dr_vfrag_t*);
/**
* Reschedule a vfrag that has timed out
*/
void mca_pml_dr_vfrag_reschedule(mca_pml_dr_vfrag_t*);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif