rework rndv and eager data timeout/retrans
This commit was SVN r9358.
Этот коммит содержится в:
родитель
c7ee5e13bc
Коммит
bcb23dc762
@ -121,9 +121,24 @@ void mca_pml_dr_recv_frag_callback(
|
||||
{
|
||||
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_rendezvous_hdr_t);
|
||||
MCA_PML_DR_RECV_FRAG_CHECK_DUP(hdr, duplicate);
|
||||
if(false == duplicate) {
|
||||
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
|
||||
|
||||
if(!duplicate) {
|
||||
if(hdr->hdr_rndv.hdr_msg_length && segments->seg_len) {
|
||||
/* no eager data on nonzero length message, this is a probe!
|
||||
we haven't seen the eager data, so nack and force retransmission*/
|
||||
ompi_communicator_t *comm_ptr = ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
|
||||
mca_pml_dr_comm_t *comm = (mca_pml_dr_comm_t*) comm_ptr->c_pml_comm;
|
||||
mca_pml_dr_comm_proc_t *proc = comm->procs + hdr->hdr_common.hdr_src;
|
||||
mca_pml_dr_recv_frag_send_ack(proc->ompi_proc,
|
||||
&hdr->hdr_common,
|
||||
hdr->hdr_match.hdr_src_ptr,
|
||||
0);
|
||||
OPAL_OUTPUT((0, "%s:%d: nacking PROBE, haven't seen EAGER data yet!\n", __FILE__, __LINE__));
|
||||
} else {
|
||||
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
|
||||
}
|
||||
} else {
|
||||
/* must check the the pending receive list! */
|
||||
OPAL_OUTPUT((0, "%s:%d: dropping duplicate fragment\n", __FILE__, __LINE__));
|
||||
}
|
||||
break;
|
||||
@ -611,14 +626,6 @@ rematch:
|
||||
if(match != NULL) {
|
||||
mca_pml_dr_recv_request_progress(match,btl,segments,num_segments);
|
||||
}
|
||||
else {
|
||||
/* no need to csum, if it wasn't matched and
|
||||
csum failed, we already nack'd it */
|
||||
mca_pml_dr_recv_frag_send_ack(ompi_proc,
|
||||
&hdr->hdr_common,
|
||||
hdr->hdr_src_ptr,
|
||||
1);
|
||||
}
|
||||
if(additional_match) {
|
||||
opal_list_item_t* item;
|
||||
while(NULL != (item = opal_list_remove_first(&additional_matches))) {
|
||||
|
@ -106,6 +106,12 @@ static void mca_pml_dr_match_completion(
|
||||
mca_pml_dr_send_request_t* sendreq = descriptor->des_cbdata;
|
||||
mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0;
|
||||
|
||||
if(descriptor != sendreq->descriptor) {
|
||||
/* only the cached descriptor needs to hang around
|
||||
for retransmission */
|
||||
mca_bml_base_free(descriptor->des_context, descriptor);
|
||||
}
|
||||
|
||||
/* check completion status */
|
||||
if(OMPI_SUCCESS != status) {
|
||||
/* TSW - FIX */
|
||||
@ -135,7 +141,7 @@ static void mca_pml_dr_match_completion(
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
} else if(vfrag->vf_retrans) {
|
||||
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
|
||||
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
|
||||
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
|
||||
}
|
||||
}
|
||||
|
||||
@ -154,6 +160,12 @@ static void mca_pml_dr_rndv_completion(
|
||||
mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0;
|
||||
bool schedule = false;
|
||||
|
||||
if(descriptor != sendreq->descriptor) {
|
||||
/* only the cached descriptor needs to hang around
|
||||
for retransmission */
|
||||
mca_bml_base_free(descriptor->des_context, descriptor);
|
||||
}
|
||||
|
||||
/* check completion status */
|
||||
if(OMPI_SUCCESS != status) {
|
||||
/* TSW - FIX */
|
||||
@ -191,7 +203,7 @@ static void mca_pml_dr_rndv_completion(
|
||||
/* negative ack - need to retransmit? */
|
||||
} else if(vfrag->vf_retrans) {
|
||||
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
|
||||
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
|
||||
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
|
||||
}
|
||||
|
||||
/* check for pending requests */
|
||||
@ -852,7 +864,7 @@ void mca_pml_dr_send_request_match_ack(
|
||||
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
|
||||
/* need to retransmit? */
|
||||
if(vfrag->vf_ack != vfrag->vf_mask) {
|
||||
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
|
||||
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
|
||||
} else {
|
||||
/* if already have local completion free descriptor and complete message */
|
||||
/* return descriptor */
|
||||
@ -907,7 +919,8 @@ void mca_pml_dr_send_request_rndv_ack(
|
||||
|
||||
/* need to retransmit? */
|
||||
if(vfrag->vf_ack != vfrag->vf_mask) {
|
||||
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
|
||||
/* got a NACK, resend eager data! */
|
||||
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
|
||||
} else {
|
||||
/* return descriptor of first fragment */
|
||||
if(NULL != sendreq->descriptor) {
|
||||
|
@ -350,25 +350,78 @@ do { \
|
||||
* Requeue first fragment of message for retransmission
|
||||
*/
|
||||
|
||||
#define MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag) \
|
||||
#define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \
|
||||
do { \
|
||||
mca_bml_base_btl_t* bml_btl = sendreq->descriptor->des_context; \
|
||||
mca_bml_base_endpoint_t* endpoint = \
|
||||
(mca_bml_base_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \
|
||||
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
|
||||
mca_btl_base_descriptor_t *des_old, *des_new; \
|
||||
vfrag->vf_retry_cnt ++; \
|
||||
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_wdog_max_count) { \
|
||||
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \
|
||||
opal_output(0, "%s:%d,%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__); \
|
||||
orte_errmgr.abort(); \
|
||||
} \
|
||||
\
|
||||
opal_output(0, "%s:%d:%s, retransmitting\n", __FILE__, __LINE__, __func__); \
|
||||
opal_output(0, "%s:%d:%s, retransmitting eager\n", __FILE__, __LINE__, __func__); \
|
||||
assert(sendreq->descriptor->des_src != NULL); \
|
||||
vfrag->vf_idx = 0; \
|
||||
vfrag->vf_mask_processed = 0; \
|
||||
vfrag->vf_ack = 0; \
|
||||
vfrag->vf_retrans = 0; \
|
||||
MCA_PML_DR_VFRAG_RESET(vfrag); \
|
||||
des_old = sendreq->descriptor; \
|
||||
mca_bml_base_alloc(bml_btl, &des_new, des_old->des_src->seg_len);\
|
||||
memcpy(des_new->des_src->seg_addr.pval, \
|
||||
des_old->des_src->seg_addr.pval, \
|
||||
des_old->des_src->seg_len); \
|
||||
des_new->des_flags = des_old->des_flags; \
|
||||
des_new->des_cbdata = des_old->des_cbdata; \
|
||||
des_new->des_cbfunc = des_old->des_cbfunc; \
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
|
||||
MCA_PML_DR_VFRAG_ACK_START(vfrag); \
|
||||
mca_bml_base_send(bml_btl, sendreq->descriptor, MCA_BTL_TAG_PML);\
|
||||
mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \
|
||||
} while(0)
|
||||
|
||||
/*
|
||||
* Requeue first fragment of message for retransmission
|
||||
*/
|
||||
|
||||
#define MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag) \
|
||||
do { \
|
||||
mca_bml_base_endpoint_t* endpoint = \
|
||||
(mca_bml_base_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \
|
||||
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
|
||||
mca_btl_base_descriptor_t *des_old, *des_new; \
|
||||
mca_pml_dr_hdr_t *hdr; \
|
||||
vfrag->vf_retry_cnt ++; \
|
||||
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \
|
||||
opal_output(0, "%s:%d,%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__); \
|
||||
orte_errmgr.abort(); \
|
||||
} \
|
||||
\
|
||||
opal_output(0, "%s:%d:%s, (re)transmitting rndv probe\n", __FILE__, __LINE__, __func__); \
|
||||
assert(sendreq->descriptor->des_src != NULL); \
|
||||
MCA_PML_DR_VFRAG_RESET(vfrag); \
|
||||
mca_bml_base_alloc(bml_btl, &des_new, \
|
||||
sizeof(mca_pml_dr_rendezvous_hdr_t)); \
|
||||
/* build hdr */ \
|
||||
hdr = (mca_pml_dr_hdr_t*)des_new->des_src->seg_addr.pval; \
|
||||
hdr->hdr_common.hdr_flags = 0; \
|
||||
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; \
|
||||
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; \
|
||||
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; \
|
||||
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; \
|
||||
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \
|
||||
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \
|
||||
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0; \
|
||||
hdr->hdr_match.hdr_csum = OPAL_CSUM_ZERO; \
|
||||
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id; \
|
||||
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; \
|
||||
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t)); \
|
||||
des_new->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; \
|
||||
des_new->des_cbdata = sendreq; \
|
||||
des_new->des_cbfunc = mca_pml_dr_rndv_completion; \
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
|
||||
MCA_PML_DR_VFRAG_ACK_START(vfrag); \
|
||||
mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \
|
||||
} while(0)
|
||||
|
||||
/**
|
||||
* Start the specified request
|
||||
*/
|
||||
|
@ -96,12 +96,9 @@ void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
|
||||
|
||||
if(0 == vfrag->vf_offset) { /* this is the first part of the message
|
||||
that we need to resend */
|
||||
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
|
||||
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
|
||||
} else {
|
||||
vfrag->vf_idx = 0;
|
||||
vfrag->vf_mask_processed = 0;
|
||||
vfrag->vf_ack = 0;
|
||||
vfrag->vf_retrans = 0;
|
||||
MCA_PML_DR_VFRAG_RESET(vfrag);
|
||||
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
mca_pml_dr_send_request_schedule(sendreq);
|
||||
|
@ -77,6 +77,13 @@ do { \
|
||||
OMPI_FREE_LIST_RETURN(&mca_pml_dr.vfrags, (opal_list_item_t*)vfrag); \
|
||||
} while(0)
|
||||
|
||||
#define MCA_PML_DR_VFRAG_RESET(vfrag) \
|
||||
do { \
|
||||
vfrag->vf_idx = 0; \
|
||||
vfrag->vf_mask_processed = 0; \
|
||||
vfrag->vf_ack = 0; \
|
||||
vfrag->vf_retrans = 0; \
|
||||
} while(0)
|
||||
#if 1
|
||||
|
||||
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user