diff --git a/ompi/mca/pml/dr/pml_dr.h b/ompi/mca/pml/dr/pml_dr.h index 3d41e920fa..720c5b2267 100644 --- a/ompi/mca/pml/dr/pml_dr.h +++ b/ompi/mca/pml/dr/pml_dr.h @@ -79,15 +79,13 @@ struct mca_pml_dr_t { /* my 'global' rank */ int32_t my_rank; - int timer_wdog_sec; - int timer_wdog_usec; - int timer_wdog_multiplier; - int timer_wdog_max_count; + struct timeval wdog_timer; + int wdog_timer_multiplier; + int wdog_retry_max; - int timer_ack_sec; - int timer_ack_usec; - int timer_ack_multiplier; - int timer_ack_max_count; + struct timeval ack_timer; + int ack_timer_multiplier; + int ack_retry_max; /* enable/disable csum */ int enable_csum; diff --git a/ompi/mca/pml/dr/pml_dr_component.c b/ompi/mca/pml/dr/pml_dr_component.c index 1a8044d5f0..aac422bb37 100644 --- a/ompi/mca/pml/dr/pml_dr_component.c +++ b/ompi/mca/pml/dr/pml_dr_component.c @@ -92,23 +92,23 @@ int mca_pml_dr_component_open(void) mca_pml_dr_param_register_int("eager_limit", 128 * 1024); mca_pml_dr.send_pipeline_depth = mca_pml_dr_param_register_int("send_pipeline_depth", 3); - mca_pml_dr.timer_wdog_sec = - mca_pml_dr_param_register_int("timer_wdog_sec", 1); - mca_pml_dr.timer_wdog_usec = - mca_pml_dr_param_register_int("timer_wdog_usec", 0); - mca_pml_dr.timer_wdog_multiplier = - mca_pml_dr_param_register_int("timer_wdog_multiplier", 2); - mca_pml_dr.timer_wdog_max_count = - mca_pml_dr_param_register_int("timer_wdog_max_count", 10); + mca_pml_dr.wdog_timer.tv_sec = + mca_pml_dr_param_register_int("wdog_timer_sec", 5); + mca_pml_dr.wdog_timer.tv_usec = + mca_pml_dr_param_register_int("wdog_timer_usec", 0); + mca_pml_dr.wdog_timer_multiplier = + mca_pml_dr_param_register_int("wdog_timer_multiplier", 1); + mca_pml_dr.wdog_retry_max = + mca_pml_dr_param_register_int("wdog_retry_max", 1); - mca_pml_dr.timer_ack_sec = - mca_pml_dr_param_register_int("timer_ack_sec", 10); - mca_pml_dr.timer_ack_usec = - mca_pml_dr_param_register_int("timer_ack_usec", 0); - mca_pml_dr.timer_ack_multiplier = - mca_pml_dr_param_register_int("timer_ack_multiplier", 2); - mca_pml_dr.timer_ack_max_count = - mca_pml_dr_param_register_int("timer_ack_max_count", 10); + mca_pml_dr.ack_timer.tv_sec = + mca_pml_dr_param_register_int("ack_timer_sec", 10); + mca_pml_dr.ack_timer.tv_usec = + mca_pml_dr_param_register_int("ack_timer_usec", 0); + mca_pml_dr.ack_timer_multiplier = + mca_pml_dr_param_register_int("ack_timer_multiplier", 1); + mca_pml_dr.ack_retry_max = + mca_pml_dr_param_register_int("ack_retry_max", 3); /* default is to csum all data */ mca_pml_dr.enable_csum = diff --git a/ompi/mca/pml/dr/pml_dr_endpoint.c b/ompi/mca/pml/dr/pml_dr_endpoint.c index 58962ad093..4aec1bc97e 100644 --- a/ompi/mca/pml/dr/pml_dr_endpoint.c +++ b/ompi/mca/pml/dr/pml_dr_endpoint.c @@ -20,12 +20,26 @@ #include "pml_dr.h" #include "pml_dr_endpoint.h" + +static void mca_pml_dr_endpoint_copy(mca_pml_dr_endpoint_t* dst, mca_pml_dr_endpoint_t* src) +{ + dst->local = src->local; + dst->src = src->src; + dst->dst = src->dst; + ompi_seq_tracker_copy(&dst->seq_sends, &src->seq_sends); + ompi_seq_tracker_copy(&dst->seq_recvs, &src->seq_recvs); + ompi_seq_tracker_copy(&dst->seq_recvs_matched, &src->seq_recvs_matched); + dst->vfrag_seq = src->vfrag_seq; +} + + static void mca_pml_dr_endpoint_construct(mca_pml_dr_endpoint_t* ep) { OBJ_CONSTRUCT(&ep->seq_sends, ompi_seq_tracker_t); OBJ_CONSTRUCT(&ep->seq_recvs, ompi_seq_tracker_t); OBJ_CONSTRUCT(&ep->seq_recvs_matched, ompi_seq_tracker_t); ep->vfrag_seq = 0; + ep->base.copy = (mca_bml_base_endpoint_copy_fn_t)mca_pml_dr_endpoint_copy; } diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.c b/ompi/mca/pml/dr/pml_dr_recvfrag.c index 1155b75fbb..80097bfc8d 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.c +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.c @@ -54,6 +54,7 @@ do { ep = ompi_pointer_array_get_item(&mca_pml_dr.procs, hdr->hdr_common.hdr_src); \ assert(ep != NULL); \ if(ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { \ + OPAL_OUTPUT((0, "%s:%d: dropping duplicate ack", __FILE__, __LINE__)); \ return; \ } \ } while (0) diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c index 76d14e7946..164c1b33c7 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.c +++ b/ompi/mca/pml/dr/pml_dr_sendreq.c @@ -89,6 +89,51 @@ OBJ_CLASS_INSTANCE( mca_pml_dr_send_request_construct, mca_pml_dr_send_request_destruct); +/** + * Handle error status on local completion + */ + +static void mca_pml_dr_error_completion( + struct mca_btl_base_module_t* btl, + struct mca_btl_base_endpoint_t* ep, + struct mca_btl_base_descriptor_t* descriptor, + int status) +{ + mca_pml_dr_vfrag_t* vfrag = descriptor->des_cbdata; + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; + + switch(status) { + case OMPI_ERR_UNREACH: + case OMPI_ERR_CONNECTION_FAILED: + case OMPI_ERR_CONNECTION_REFUSED: + /** + * peer is no longer reachable through this btl + */ + mca_bml.bml_del_proc_btl(sendreq->req_proc->ompi_proc, btl); + break; + + case OMPI_ERR_FATAL: + case OMPI_ERR_COMM_FAILURE: + /** + * btl is no longer available + */ + mca_bml.bml_del_btl(btl); + break; + default: + orte_errmgr.abort(); + break; + } + + /* update pending counts */ + OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-1); + OPAL_THREAD_ADD64(&vfrag->vf_pending,-1); + + /* reset vfrag state - select new BTL */ + mca_pml_dr_vfrag_reset(vfrag); + + /* reschedule vfrag */ + mca_pml_dr_vfrag_reschedule(vfrag); +} /** * Completion of a short message - nothing left to schedule. @@ -100,34 +145,36 @@ static void mca_pml_dr_match_completion( struct mca_btl_base_descriptor_t* descriptor, int status) { - mca_pml_dr_send_request_t* sendreq = descriptor->des_cbdata; - mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0; + mca_pml_dr_vfrag_t* vfrag = descriptor->des_cbdata; + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; + + /* kill pending wdog timer */ + MCA_PML_DR_VFRAG_WDOG_STOP(vfrag); /* free any descriptor used to retransmit */ - if(descriptor != sendreq->descriptor) { + if(descriptor != sendreq->req_descriptor) { mca_bml_base_free(descriptor->des_context, descriptor); } /* check completion status */ if(OMPI_SUCCESS != status) { - /* TSW - FIX */ - opal_output(0, "%s:%d FATAL", __FILE__, __LINE__); - orte_errmgr.abort(); + mca_pml_dr_error_completion(btl,ep,descriptor,status); + return; } /* wait for local completion */ + OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-1); if(OPAL_THREAD_ADD64(&vfrag->vf_pending,-1) > 0) return; /* wait for positive ack to complete request */ OPAL_THREAD_LOCK(&ompi_request_lock); if(vfrag->vf_ack == vfrag->vf_mask) { - MCA_PML_DR_VFRAG_ACK_STOP(vfrag); /* return descriptor */ - if(NULL != sendreq->descriptor) { - mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor); - sendreq->descriptor = NULL; + if(NULL != sendreq->req_descriptor) { + mca_bml_base_free(sendreq->req_descriptor->des_context, sendreq->req_descriptor); + sendreq->req_descriptor = NULL; } /* update statistics and complete */ @@ -137,9 +184,13 @@ static void mca_pml_dr_match_completion( /* on negative ack need to retransmit */ } else if(vfrag->vf_state & MCA_PML_DR_VFRAG_NACKED) { - MCA_PML_DR_VFRAG_ACK_RESET(vfrag); + MCA_PML_DR_VFRAG_WDOG_START(vfrag); MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag); - } + + /* start ack timer */ + } else { + MCA_PML_DR_VFRAG_ACK_START(vfrag); + } OPAL_THREAD_UNLOCK(&ompi_request_lock); /* check for pending requests */ @@ -157,33 +208,35 @@ static void mca_pml_dr_rndv_completion( struct mca_btl_base_descriptor_t* descriptor, int status) { - mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)descriptor->des_cbdata; - mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0; + mca_pml_dr_vfrag_t* vfrag = descriptor->des_cbdata; + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; bool schedule = false; + /* kill pending wdog timer */ + MCA_PML_DR_VFRAG_WDOG_STOP(vfrag); + /* free any descriptor used to retransmit */ - if(descriptor != sendreq->descriptor) { + if(descriptor != sendreq->req_descriptor) { mca_bml_base_free(descriptor->des_context, descriptor); } /* check completion status */ if(OMPI_SUCCESS != status) { - /* TSW - FIX */ - opal_output(0, "%s:%d FATAL", __FILE__, __LINE__); - orte_errmgr.abort(); + mca_pml_dr_error_completion(btl,ep,descriptor,status); + return; } /* local completion */ + OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-1); if(OPAL_THREAD_ADD64(&vfrag->vf_pending,-1) > 0) return; /* wait for positive ack to complete request */ OPAL_THREAD_LOCK(&ompi_request_lock); if(vfrag->vf_ack == vfrag->vf_mask) { - MCA_PML_DR_VFRAG_ACK_STOP(vfrag); - if(sendreq->descriptor) { - mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor); - sendreq->descriptor = NULL; + if(sendreq->req_descriptor) { + mca_bml_base_free(sendreq->req_descriptor->des_context, sendreq->req_descriptor); + sendreq->req_descriptor = NULL; } /* update statistics and complete */ @@ -203,6 +256,11 @@ static void mca_pml_dr_rndv_completion( MCA_PML_DR_VFRAG_ACK_RESET(vfrag); MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag); OPAL_THREAD_UNLOCK(&ompi_request_lock); + + /* setup ack timer */ + } else { + OPAL_THREAD_UNLOCK(&ompi_request_lock); + MCA_PML_DR_VFRAG_ACK_START(vfrag); } /* check for pending requests */ @@ -228,9 +286,8 @@ static void mca_pml_dr_frag_completion( /* check completion status */ if(OMPI_SUCCESS != status) { - /* TSW - FIX */ - opal_output(0, "%s:%d FATAL", __FILE__, __LINE__); - orte_errmgr.abort(); + mca_pml_dr_error_completion(btl,ep,descriptor,status); + return; } /* have all pending frags completed for this vfrag? */ @@ -313,7 +370,7 @@ int mca_pml_dr_send_request_start_buffered( if(NULL == descriptor) { return OMPI_ERR_OUT_OF_RESOURCE; } - sendreq->descriptor = descriptor; /* hang on to this for later */ + sendreq->req_descriptor = descriptor; /* hang on to this for later */ segment = descriptor->des_src; /* pack the data into the BTL supplied buffer */ @@ -342,7 +399,7 @@ int mca_pml_dr_send_request_start_buffered( descriptor->des_cbfunc = mca_pml_dr_rndv_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; - descriptor->des_cbdata = sendreq; + descriptor->des_cbdata = &sendreq->req_vfrag0; /* buffer the remainder of the message */ rc = mca_pml_base_bsend_request_alloc((ompi_request_t*)sendreq); @@ -396,7 +453,7 @@ int mca_pml_dr_send_request_start_buffered( OPAL_THREAD_UNLOCK(&ompi_request_lock); /* send */ - MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0); + MCA_PML_DR_VFRAG_WDOG_START(&sendreq->req_vfrag0); rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML); if(OMPI_SUCCESS != rc) { mca_bml_base_free(bml_btl, descriptor ); @@ -429,7 +486,7 @@ int mca_pml_dr_send_request_start_copy( if(NULL == descriptor) { return OMPI_ERR_OUT_OF_RESOURCE; } - sendreq->descriptor = descriptor; /* hang on to this for later */ + sendreq->req_descriptor = descriptor; /* hang on to this for later */ segment = descriptor->des_src; /* pack the data into the supplied buffer */ @@ -475,7 +532,7 @@ int mca_pml_dr_send_request_start_copy( /* short message */ descriptor->des_cbfunc = mca_pml_dr_match_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; - descriptor->des_cbdata = sendreq; + descriptor->des_cbdata = &sendreq->req_vfrag0; segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data; /* signal request completion */ @@ -484,7 +541,7 @@ int mca_pml_dr_send_request_start_copy( OPAL_THREAD_UNLOCK(&ompi_request_lock); /* send */ - MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0); + MCA_PML_DR_VFRAG_WDOG_START(&sendreq->req_vfrag0); rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML); if(OMPI_SUCCESS != rc) { mca_bml_base_free(bml_btl, descriptor ); @@ -518,7 +575,7 @@ int mca_pml_dr_send_request_start_prepare( if(NULL == descriptor) { return OMPI_ERR_OUT_OF_RESOURCE; } - sendreq->descriptor = descriptor; /* hang on to this for later */ + sendreq->req_descriptor = descriptor; /* hang on to this for later */ segment = descriptor->des_src; /* build match header */ @@ -542,7 +599,7 @@ int mca_pml_dr_send_request_start_prepare( /* short message */ descriptor->des_cbfunc = mca_pml_dr_match_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; - descriptor->des_cbdata = sendreq; + descriptor->des_cbdata = &sendreq->req_vfrag0; /* vfrag state */ sendreq->req_vfrag0.vf_size = size; @@ -550,7 +607,7 @@ int mca_pml_dr_send_request_start_prepare( sendreq->req_vfrag0.vf_pending = 1; /* send */ - MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0); + MCA_PML_DR_VFRAG_WDOG_START(&sendreq->req_vfrag0); rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML); if(OMPI_SUCCESS != rc) { mca_bml_base_free(bml_btl, descriptor ); @@ -596,7 +653,7 @@ int mca_pml_dr_send_request_start_rndv( if(NULL == des) { return OMPI_ERR_OUT_OF_RESOURCE; } - sendreq->descriptor = des; /* hang on to this for later */ + sendreq->req_descriptor = des; /* hang on to this for later */ segment = des->des_src; /* build hdr */ @@ -619,7 +676,7 @@ int mca_pml_dr_send_request_start_rndv( /* first fragment of a long message */ des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; - des->des_cbdata = sendreq; + des->des_cbdata = &sendreq->req_vfrag0; des->des_cbfunc = mca_pml_dr_rndv_completion; /* vfrag state */ @@ -629,7 +686,7 @@ int mca_pml_dr_send_request_start_rndv( sendreq->req_vfrag0.vf_pending = 1; /* send */ - MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0); + MCA_PML_DR_VFRAG_WDOG_START(&sendreq->req_vfrag0); rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); if(OMPI_SUCCESS != rc) { mca_bml_base_free(bml_btl, des ); @@ -655,8 +712,117 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) assert(sendreq->req_vfrag0.vf_recv.pval != NULL); if(OPAL_THREAD_ADD32(&sendreq->req_lock,1) == 1) { do { + size_t bytes_remaining; + /* + * VFrags w/ nacks or that timed out + */ + while(opal_list_get_size(&sendreq->req_retrans) && + sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { + mca_pml_dr_vfrag_t* vfrag; + + OPAL_THREAD_LOCK(&ompi_request_lock); + vfrag = (mca_pml_dr_vfrag_t*)opal_list_get_first(&sendreq->req_retrans); + OPAL_THREAD_UNLOCK(&ompi_request_lock); + if(NULL == vfrag) { + break; + } + + /* + * Retransmit fragments that have not been acked. + */ + while(vfrag->vf_idx < vfrag->vf_len && + sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { + if(((uint64_t)1 << vfrag->vf_idx) & ~vfrag->vf_ack) { + mca_bml_base_btl_t* bml_btl = vfrag->bml_btl; + mca_pml_dr_frag_hdr_t* hdr; + mca_btl_base_descriptor_t* des; + size_t offset_in_vfrag = vfrag->vf_max_send_size * vfrag->vf_idx; + size_t offset_in_msg = vfrag->vf_offset + offset_in_vfrag; + size_t size; + int rc; + + if(vfrag->vf_idx == vfrag->vf_len - 1) { + size = vfrag->vf_size - offset_in_vfrag; + } else { + size = vfrag->vf_max_send_size; + } + + /* pack into a descriptor */ + ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset_in_msg); + mca_bml_base_prepare_src( + bml_btl, + NULL, + &sendreq->req_send.req_convertor, + sizeof(mca_pml_dr_frag_hdr_t), + &size, + &des + ); + if(des == NULL) { + OPAL_THREAD_LOCK(&ompi_request_lock); + opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); + OPAL_THREAD_UNLOCK(&ompi_request_lock); + break; + } + des->des_cbfunc = mca_pml_dr_frag_completion; + des->des_cbdata = vfrag; + + /* setup header */ + hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval; + hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_csum = 0; + hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; + hdr->hdr_common.hdr_vid = vfrag->vf_id; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; + hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; + hdr->hdr_vlen = vfrag->vf_len; + hdr->hdr_frag_idx = vfrag->vf_idx; + hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum; + hdr->hdr_frag_offset = offset_in_msg; + hdr->hdr_src_ptr.pval = vfrag; + hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv; + hdr->hdr_common.hdr_csum = (mca_pml_dr.enable_csum ? + opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)) : + OPAL_CSUM_ZERO); + + /* adjust number of outstanding operations */ + if(OPAL_THREAD_ADD64(&vfrag->vf_pending, 1) == 1) { + MCA_PML_DR_VFRAG_WDOG_START(vfrag); + } + OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); + + /* adjust send offset - may not have finished scheduling entire vfrag */ + if(offset_in_msg + size > sendreq->req_send_offset) { + sendreq->req_send_offset = offset_in_msg + size; + } + + /* initiate send - note that this may complete before the call returns */ + rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML); + + if(rc == OMPI_SUCCESS) { + bytes_remaining -= size; + } else { + OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1); + mca_bml_base_free(bml_btl,des); + OPAL_THREAD_LOCK(&ompi_request_lock); + opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); + OPAL_THREAD_UNLOCK(&ompi_request_lock); + break; + } + } + vfrag->vf_idx++; + } + + /* remove from retrans list */ + if(vfrag->vf_idx == vfrag->vf_len) { + OPAL_THREAD_LOCK(&ompi_request_lock); + opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag); + OPAL_THREAD_UNLOCK(&ompi_request_lock); + } + } + /* allocate remaining bytes to BTLs */ - size_t bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_send_offset; + bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_send_offset; while(bytes_remaining > 0 && sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { @@ -682,7 +848,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) break; } MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,endpoint,bytes_remaining,vfrag); - MCA_PML_DR_VFRAG_WDOG_START(vfrag); vfrag->bml_btl = bml_btl; bytes_sent = 0; @@ -740,7 +905,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) /* update state */ vfrag->vf_idx++; - OPAL_THREAD_ADD64(&vfrag->vf_pending,1); + if(OPAL_THREAD_ADD64(&vfrag->vf_pending,1) == 1) { + MCA_PML_DR_VFRAG_WDOG_START(vfrag); + } sendreq->req_send_offset += size; OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); @@ -758,101 +925,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) OPAL_THREAD_UNLOCK(&ompi_request_lock); break; } - mca_pml_dr_progress(); - } - - /* - * VFrags w/ nacks or that timed out - */ - while(opal_list_get_size(&sendreq->req_retrans) && - sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { - mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*)opal_list_get_first(&sendreq->req_retrans); - - /* - * Retransmit fragments that have not been acked. - */ - while(vfrag->vf_idx < vfrag->vf_len && - sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { - if(((uint64_t)1 << vfrag->vf_idx) & ~vfrag->vf_ack) { - mca_bml_base_btl_t* bml_btl = vfrag->bml_btl; - mca_pml_dr_frag_hdr_t* hdr; - mca_btl_base_descriptor_t* des; - size_t offset_in_vfrag = vfrag->vf_max_send_size * vfrag->vf_idx; - size_t offset_in_msg = vfrag->vf_offset + offset_in_vfrag; - size_t size; - int rc; - - vfrag->vf_retry_cnt ++; - if(vfrag->vf_idx == vfrag->vf_len - 1) { - size = vfrag->vf_size - offset_in_vfrag; - } else { - size = vfrag->vf_max_send_size; - } - - /* pack into a descriptor */ - ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset_in_msg); - mca_bml_base_prepare_src( - bml_btl, - NULL, - &sendreq->req_send.req_convertor, - sizeof(mca_pml_dr_frag_hdr_t), - &size, - &des - ); - if(des == NULL) { - OPAL_THREAD_LOCK(&ompi_request_lock); - opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); - OPAL_THREAD_UNLOCK(&ompi_request_lock); - break; - } - des->des_cbfunc = mca_pml_dr_frag_completion; - des->des_cbdata = vfrag; - - /* setup header */ - hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval; - hdr->hdr_common.hdr_flags = 0; - hdr->hdr_common.hdr_csum = 0; - hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG; - hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; - hdr->hdr_common.hdr_vid = vfrag->vf_id; - hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; - hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; - hdr->hdr_vlen = vfrag->vf_len; - hdr->hdr_frag_idx = vfrag->vf_idx; - hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum; - hdr->hdr_frag_offset = offset_in_msg; - hdr->hdr_src_ptr.pval = vfrag; - hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv; - hdr->hdr_common.hdr_csum = (mca_pml_dr.enable_csum ? - opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)) : - OPAL_CSUM_ZERO); - - OPAL_THREAD_ADD64(&vfrag->vf_pending, 1); - OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); - - /* initiate send - note that this may complete before the call returns */ - rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML); - - if(rc == OMPI_SUCCESS) { - bytes_remaining -= size; - } else { - OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1); - mca_bml_base_free(bml_btl,des); - OPAL_THREAD_LOCK(&ompi_request_lock); - opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); - OPAL_THREAD_UNLOCK(&ompi_request_lock); - break; - } - } - vfrag->vf_idx++; - } - - /* move from retrans to pending list */ - if(vfrag->vf_idx == vfrag->vf_len) { - OPAL_THREAD_LOCK(&ompi_request_lock); - opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag); - OPAL_THREAD_UNLOCK(&ompi_request_lock); - } } } while (OPAL_THREAD_ADD32(&sendreq->req_lock,-1) > 0); } @@ -878,14 +950,14 @@ void mca_pml_dr_send_request_match_ack( MCA_PML_DR_VFRAG_ACK_STOP(vfrag); /* need to retransmit? */ if(vfrag->vf_ack != vfrag->vf_mask) { - MCA_PML_DR_VFRAG_ACK_START(vfrag); + MCA_PML_DR_VFRAG_WDOG_START(vfrag); MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag); } else { /* if already have local completion free descriptor and complete message */ /* return descriptor */ - if(NULL != sendreq->descriptor) { - mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor ); - sendreq->descriptor = NULL; + if(NULL != sendreq->req_descriptor) { + mca_bml_base_free(sendreq->req_descriptor->des_context, sendreq->req_descriptor ); + sendreq->req_descriptor = NULL; } /* update statistics */ @@ -930,14 +1002,14 @@ void mca_pml_dr_send_request_rndv_ack( /* need to retransmit? */ if(vfrag->vf_ack != vfrag->vf_mask) { /* got a NACK, resend eager data! */ - MCA_PML_DR_VFRAG_ACK_START(vfrag); + MCA_PML_DR_VFRAG_WDOG_START(vfrag); MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag); OPAL_THREAD_UNLOCK(&ompi_request_lock); } else { /* return descriptor of first fragment */ - if(NULL != sendreq->descriptor) { - mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor); - sendreq->descriptor = NULL; + if(NULL != sendreq->req_descriptor) { + mca_bml_base_free(sendreq->req_descriptor->des_context, sendreq->req_descriptor); + sendreq->req_descriptor = NULL; } /* done? */ @@ -1032,4 +1104,3 @@ void mca_pml_dr_send_request_frag_ack( } - diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.h b/ompi/mca/pml/dr/pml_dr_sendreq.h index 67af885292..c809fe7b8a 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.h +++ b/ompi/mca/pml/dr/pml_dr_sendreq.h @@ -58,7 +58,7 @@ struct mca_pml_dr_send_request_t { mca_pml_dr_vfrag_t* req_vfrag; mca_pml_dr_vfrag_t req_vfrag0; opal_list_t req_retrans; - mca_btl_base_descriptor_t* descriptor; /* descriptor for first frag, retransmission */ + mca_btl_base_descriptor_t* req_descriptor; /* descriptor for first frag, retransmission */ }; typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t; @@ -159,12 +159,13 @@ do { bml_btl = mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \ MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \ sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&endpoint->vfrag_seq,1); \ + sendreq->req_vfrag0.bml_btl = bml_btl; \ sendreq->req_vfrag = &sendreq->req_vfrag0; \ sendreq->req_endpoint = endpoint; \ sendreq->req_proc = proc; \ \ sendreq->req_lock = 0; \ - sendreq->req_pipeline_depth = 0; \ + sendreq->req_pipeline_depth = 1; \ sendreq->req_bytes_delivered = 0; \ sendreq->req_state = 0; \ sendreq->req_send_offset = 0; \ @@ -315,10 +316,12 @@ do { #define MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq, vfrag) \ do { \ + if(vfrag->vf_idx == vfrag->vf_len || \ + (vfrag->vf_wdog_cnt == 0 && vfrag->vf_ack_cnt == 0)) { \ + opal_list_append(&(sendreq)->req_retrans, (opal_list_item_t*)vfrag); \ + } \ vfrag->vf_idx = 0; \ vfrag->vf_state = 0; \ - opal_output(0, "queuing vfrag for retrans!\n"); \ - opal_list_append(&(sendreq)->req_retrans, (opal_list_item_t*)vfrag); \ } while(0) /* @@ -355,29 +358,25 @@ do { \ #define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \ do { \ - mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; \ - mca_bml_base_btl_t* bml_btl = \ - mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \ mca_btl_base_descriptor_t *des_old, *des_new; \ - \ - if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \ - opal_output(0, "%s:%d,%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__); \ - orte_errmgr.abort(); \ - } \ OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \ - \ OPAL_OUTPUT((0, "%s:%d:%s, retransmitting eager\n", __FILE__, __LINE__, __func__)); \ - assert(sendreq->descriptor->des_src != NULL); \ - MCA_PML_DR_VFRAG_RESET(vfrag); \ - des_old = sendreq->descriptor; \ - mca_bml_base_alloc(bml_btl, &des_new, des_old->des_src->seg_len);\ + assert(sendreq->req_descriptor->des_src != NULL); \ + \ + OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,1); \ + OPAL_THREAD_ADD64(&(vfrag)->vf_pending,1); \ + (vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \ + \ + des_old = sendreq->req_descriptor; \ + mca_bml_base_alloc(vfrag->bml_btl, &des_new, des_old->des_src->seg_len);\ + sendreq->req_descriptor = des_new; \ memcpy(des_new->des_src->seg_addr.pval, \ des_old->des_src->seg_addr.pval, \ des_old->des_src->seg_len); \ des_new->des_flags = des_old->des_flags; \ des_new->des_cbdata = des_old->des_cbdata; \ des_new->des_cbfunc = des_old->des_cbfunc; \ - mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \ + mca_bml_base_send(vfrag->bml_btl, des_new, MCA_BTL_TAG_PML); \ } while(0) /* @@ -392,18 +391,15 @@ do { \ mca_btl_base_descriptor_t *des_old, *des_new; \ mca_pml_dr_hdr_t *hdr; \ \ - if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \ - opal_output(0, "%s:%d,%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__); \ - orte_errmgr.abort(); \ - } \ - OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \ - \ opal_output(0, "%s:%d:%s, (re)transmitting rndv probe\n", __FILE__, __LINE__, __func__); \ - assert(sendreq->descriptor->des_src != NULL); \ - MCA_PML_DR_VFRAG_RESET(vfrag); \ + OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,1); \ + OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \ + (vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \ + \ + assert(sendreq->req_descriptor->des_src != NULL); \ mca_bml_base_alloc(bml_btl, &des_new, \ sizeof(mca_pml_dr_rendezvous_hdr_t)); \ - des_old = sendreq->descriptor; \ + des_old = sendreq->req_descriptor; \ /* build hdr */ \ hdr = (mca_pml_dr_hdr_t*)des_new->des_src->seg_addr.pval; \ hdr->hdr_common.hdr_flags = 0; \ diff --git a/ompi/mca/pml/dr/pml_dr_vfrag.c b/ompi/mca/pml/dr/pml_dr_vfrag.c index 8d2f27637d..9dba2d71e9 100644 --- a/ompi/mca/pml/dr/pml_dr_vfrag.c +++ b/ompi/mca/pml/dr/pml_dr_vfrag.c @@ -19,10 +19,12 @@ #include "ompi_config.h" #include "pml_dr_vfrag.h" #include "pml_dr_sendreq.h" +#include "ompi/mca/bml/base/base.h" #include "orte/mca/errmgr/errmgr.h" -void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* vfrag); -void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* vfrag); +static void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* vfrag); +static void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* vfrag); + static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag) { @@ -36,14 +38,13 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag) vfrag->vf_max_send_size = 0; vfrag->vf_ack = 0; vfrag->vf_mask = 1; - vfrag->vf_retry_cnt = 0; vfrag->vf_state = 0; - vfrag->tv_wdog.tv_sec = mca_pml_dr.timer_wdog_sec; - vfrag->tv_wdog.tv_usec = mca_pml_dr.timer_wdog_usec; - vfrag->tv_ack.tv_sec = mca_pml_dr.timer_ack_usec; - vfrag->tv_ack.tv_usec = mca_pml_dr.timer_ack_usec; - opal_evtimer_set(&vfrag->ev_wdog, mca_pml_dr_vfrag_wdog_timeout, (void*) vfrag); - opal_evtimer_set(&vfrag->ev_ack, mca_pml_dr_vfrag_ack_timeout, (void*) vfrag); + vfrag->vf_wdog_tv = mca_pml_dr.wdog_timer; + vfrag->vf_ack_tv = mca_pml_dr.ack_timer; + vfrag->vf_wdog_cnt = 0; + vfrag->vf_ack_cnt = 0; + opal_evtimer_set(&vfrag->vf_wdog_ev, mca_pml_dr_vfrag_wdog_timeout, (void*) vfrag); + opal_evtimer_set(&vfrag->vf_ack_ev, mca_pml_dr_vfrag_ack_timeout, (void*) vfrag); } @@ -64,47 +65,120 @@ OBJ_CLASS_INSTANCE( /** * The wdog timer expired, better do something about it, like resend the current part of the vfrag */ -void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data) +static void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data) { mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data; mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; - OPAL_OUTPUT((0, "%s:%d:%s: wdog timeout!", __FILE__, __LINE__, __func__)); - if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_wdog_max_count) { - opal_output(0, "%s:%d:%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__); - orte_errmgr.abort(); - } - /* back off watchdog timer */ - vfrag->tv_wdog.tv_sec = - mca_pml_dr.timer_wdog_sec + - mca_pml_dr.timer_wdog_sec * mca_pml_dr.timer_wdog_multiplier * - vfrag->vf_retry_cnt; - vfrag->tv_wdog.tv_usec = - mca_pml_dr.timer_wdog_usec + - mca_pml_dr.timer_wdog_usec * mca_pml_dr.timer_wdog_multiplier * - vfrag->vf_retry_cnt; - MCA_PML_DR_VFRAG_WDOG_START(vfrag); + OPAL_OUTPUT((0, "%s:%d:%s: wdog timeout: 0x%08x", __FILE__, __LINE__, __func__, vfrag)); - /* retransmit vfrag */ - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq, vfrag); - OPAL_THREAD_UNLOCK(&ompi_request_lock); - mca_pml_dr_send_request_schedule(sendreq); + /* update pending counts */ + OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-vfrag->vf_pending); + OPAL_THREAD_ADD64(&vfrag->vf_pending,-vfrag->vf_pending); + + /* check for hung btl */ + if(++vfrag->vf_wdog_cnt == mca_pml_dr.wdog_retry_max) { + /* declare btl dead */ + opal_output(0, "%s:%d:%s: failing BTL: %s", __FILE__, __LINE__, __func__, + vfrag->bml_btl->btl->btl_component->btl_version.mca_component_name); + mca_bml.bml_del_btl(vfrag->bml_btl->btl); + mca_pml_dr_vfrag_reset(vfrag); + } + + /* back off watchdog timer */ + vfrag->vf_wdog_tv.tv_sec = + mca_pml_dr.wdog_timer.tv_sec + + mca_pml_dr.wdog_timer.tv_sec * mca_pml_dr.wdog_timer_multiplier * + vfrag->vf_wdog_cnt; + vfrag->vf_wdog_tv.tv_usec = + mca_pml_dr.wdog_timer.tv_usec + + mca_pml_dr.wdog_timer.tv_usec * mca_pml_dr.wdog_timer_multiplier * + vfrag->vf_wdog_cnt; + + /* reschedule vfrag */ + mca_pml_dr_vfrag_reschedule(vfrag); } + /** * The ack timer expired, better do something about it, like resend the entire vfrag? */ -void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) { +static void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) +{ mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data; - mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; - opal_output(0, "%s:%d:%s: ack timeout!", __FILE__, __LINE__, __func__); + OPAL_OUTPUT((0, "%s:%d:%s: ack timeout: %0x08x", __FILE__, __LINE__, __func__, vfrag)); - OPAL_THREAD_LOCK(&ompi_request_lock); + /* stop ack timer */ + MCA_PML_DR_VFRAG_ACK_STOP(vfrag); + + /* check for hung btl */ + if(++vfrag->vf_ack_cnt == mca_pml_dr.ack_retry_max) { + /* declare btl dead */ + opal_output(0, "%s:%d:%s: failing BTL: %s", __FILE__, __LINE__, __func__, + vfrag->bml_btl->btl->btl_component->btl_version.mca_component_name); + mca_bml.bml_del_btl(vfrag->bml_btl->btl); + mca_pml_dr_vfrag_reset(vfrag); + } + + /* back off ack timer */ + vfrag->vf_ack_tv.tv_sec = + mca_pml_dr.ack_timer.tv_sec + + mca_pml_dr.ack_timer.tv_sec * mca_pml_dr.ack_timer_multiplier * + vfrag->vf_ack_cnt; + vfrag->vf_ack_tv.tv_usec = + mca_pml_dr.ack_timer.tv_usec + + mca_pml_dr.ack_timer.tv_usec * mca_pml_dr.ack_timer_multiplier * + vfrag->vf_ack_cnt; + + /* reschedule vfrag */ + mca_pml_dr_vfrag_reschedule(vfrag); +} + +/** + * Vfrag failure - declare btl dead and try to resend on an alternate btl + */ + +void mca_pml_dr_vfrag_reset(mca_pml_dr_vfrag_t* vfrag) +{ + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; + + /* update counters - give new BTL a fair chance :-) */ + vfrag->vf_ack_cnt = 0; + vfrag->vf_wdog_cnt = 0; + + /* lookup new bml_btl data structure */ + sendreq->req_endpoint = (mca_pml_dr_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; + + /* make sure a path is available */ + if(mca_bml_base_btl_array_get_size(&sendreq->req_endpoint->base.btl_eager) == 0 || + mca_bml_base_btl_array_get_size(&sendreq->req_endpoint->base.btl_eager) == 0) { + opal_output(0, "%s:%d:%s: no path to peer", __FILE__, __LINE__, __func__); + orte_errmgr.abort(); + } + if(vfrag->vf_offset == 0) { + vfrag->bml_btl = mca_bml_base_btl_array_get_next(&sendreq->req_endpoint->base.btl_eager); + } else { + vfrag->bml_btl = mca_bml_base_btl_array_get_next(&sendreq->req_endpoint->base.btl_send); + } + opal_output(0, "%s:%d:%s: selected new BTL: %s", __FILE__, __LINE__, __func__, + vfrag->bml_btl->btl->btl_component->btl_version.mca_component_name); +} + + +/** + * Reschedule vfrag that has timed out + */ + +void mca_pml_dr_vfrag_reschedule(mca_pml_dr_vfrag_t* vfrag) +{ + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; + + /* start wdog timer */ + MCA_PML_DR_VFRAG_WDOG_START(vfrag); /* first frag within send request */ + OPAL_THREAD_LOCK(&ompi_request_lock); if(vfrag == &sendreq->req_vfrag0) { - MCA_PML_DR_VFRAG_ACK_START(vfrag); if(vfrag->vf_state & MCA_PML_DR_VFRAG_RNDV) { MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag); } else { @@ -116,9 +190,7 @@ void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) { } else { MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq, vfrag); OPAL_THREAD_UNLOCK(&ompi_request_lock); - MCA_PML_DR_VFRAG_WDOG_START(vfrag); mca_pml_dr_send_request_schedule(sendreq); } } - diff --git a/ompi/mca/pml/dr/pml_dr_vfrag.h b/ompi/mca/pml/dr/pml_dr_vfrag.h index 9c8a42ce9c..aa02afa4f4 100644 --- a/ompi/mca/pml/dr/pml_dr_vfrag.h +++ b/ompi/mca/pml/dr/pml_dr_vfrag.h @@ -39,7 +39,6 @@ struct mca_pml_dr_vfrag_t { uint32_t vf_id; uint16_t vf_idx; uint16_t vf_len; - uint8_t vf_retry_cnt; size_t vf_offset; size_t vf_size; size_t vf_max_send_size; @@ -54,13 +53,13 @@ struct mca_pml_dr_vfrag_t { operation 2) a timeout for ACK of the VRAG */ - struct timeval tv_wdog; - struct timeval tv_ack; - opal_event_t ev_ack; - opal_event_t ev_wdog; - uint8_t cnt_wdog; - uint8_t cnt_ack; - uint8_t cnt_nack; + struct timeval vf_wdog_tv; + opal_event_t vf_wdog_ev; + uint8_t vf_wdog_cnt; + + struct timeval vf_ack_tv; + opal_event_t vf_ack_ev; + uint8_t vf_ack_cnt; }; typedef struct mca_pml_dr_vfrag_t mca_pml_dr_vfrag_t; @@ -83,16 +82,13 @@ do { \ do { \ (vfrag)->vf_idx = 0; \ (vfrag)->vf_ack = 0; \ - (vfrag)->vf_retry_cnt = 0; \ + (vfrag)->vf_wdog_cnt = 0; \ + (vfrag)->vf_ack_cnt = 0; \ (vfrag)->vf_recv.pval = NULL; \ (vfrag)->vf_state = 0; \ (vfrag)->vf_pending = 0; \ -} while(0) - -#define MCA_PML_DR_VFRAG_RESET(vfrag) \ -do { \ - (vfrag)->vf_idx = 0; \ - (vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \ + (vfrag)->vf_wdog_tv = mca_pml_dr.wdog_timer; \ + (vfrag)->vf_ack_tv = mca_pml_dr.ack_timer; \ } while(0) @@ -102,18 +98,18 @@ do { \ #define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \ do { \ - opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \ + opal_event_add(&(vfrag)->vf_wdog_ev, &(vfrag)->vf_wdog_tv); \ } while(0) #define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag) \ do { \ - opal_event_del(&vfrag->ev_wdog); \ + opal_event_del(&(vfrag)->vf_wdog_ev); \ } while(0) #define MCA_PML_DR_VFRAG_WDOG_RESET(vfrag) \ do { \ - opal_event_del(&vfrag->ev_wdog); \ - opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \ + opal_event_del(&(vfrag)->vf_wdog_ev); \ + opal_event_add(&(vfrag)->vf_wdog_ev, &vfrag->vf_wdog_tv); \ } while(0) @@ -123,20 +119,12 @@ do { \ #define MCA_PML_DR_VFRAG_ACK_START(vfrag) \ do { \ - (vfrag)->tv_ack.tv_sec = \ - mca_pml_dr.timer_ack_sec + \ - mca_pml_dr.timer_ack_sec * mca_pml_dr.timer_ack_multiplier * \ - (vfrag)->vf_retry_cnt; \ - (vfrag)->tv_ack.tv_usec = \ - mca_pml_dr.timer_ack_usec + \ - mca_pml_dr.timer_ack_usec * mca_pml_dr.timer_ack_multiplier * \ - (vfrag)->vf_retry_cnt; \ - opal_event_add(&(vfrag)->ev_ack, &(vfrag)->tv_ack); \ + opal_event_add(&(vfrag)->vf_ack_ev, &(vfrag)->vf_ack_tv); \ } while(0) #define MCA_PML_DR_VFRAG_ACK_STOP(vfrag) \ do { \ - opal_event_del(&vfrag->ev_ack); \ + opal_event_del(&vfrag->vf_ack_ev); \ } while(0) #define MCA_PML_DR_VFRAG_ACK_RESET(vfrag) \ @@ -145,6 +133,19 @@ do { \ MCA_PML_DR_VFRAG_ACK_START(vfrag); \ } while(0) + +/** + * Reset a VFRAG to use a new BTL + */ + +void mca_pml_dr_vfrag_reset(mca_pml_dr_vfrag_t*); + +/** + * Reschedule a vfrag that has timed out + */ + +void mca_pml_dr_vfrag_reschedule(mca_pml_dr_vfrag_t*); + #if defined(c_plusplus) || defined(__cplusplus) } #endif