corrections to scheduling logic
This commit was SVN r9354.
Parent: 0750a8a118
Commit: 7a1ad5b6fb
@@ -124,7 +124,7 @@ void mca_pml_dr_recv_frag_callback(
             if(false == duplicate) {
                 mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
             } else {
-                OPAL_OUTPUT((0, "%s:%d: dropping duplicate fragment\n"));
+                OPAL_OUTPUT((0, "%s:%d: dropping duplicate fragment\n", __FILE__, __LINE__));
             }
             break;
         }
@@ -676,7 +676,6 @@ void mca_pml_dr_recv_frag_send_ack(
         mca_bml_base_free(bml_btl, des);
         goto retry;
     }

     return;

     /* queue request to retry later */
@@ -205,7 +205,6 @@ static void mca_pml_dr_recv_request_ack(
     }

     mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_common.hdr_vid);

     return;

     /* queue request to retry later */
@@ -69,7 +69,7 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
     OBJ_CONSTRUCT(&req->req_retrans, opal_list_t);

     req->req_vfrag0.vf_len = 1;
-    req->req_vfrag0.vf_idx = 1;
+    req->req_vfrag0.vf_idx = 0;
     req->req_vfrag0.vf_ack = 0;
     req->req_vfrag0.vf_mask = 1;
     req->req_vfrag0.vf_mask_processed = 0;
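Note: the first-fragment index is now zero-based, consistent with the retransmit and timeout paths below. As a rough, self-contained illustration (the struct and helper here are stand-ins, not code from this commit), the per-vfrag bookkeeping looks like this:

    /* Illustration only -- simplified stand-in for mca_pml_dr_vfrag_t. */
    #include <stdint.h>
    #include <stddef.h>

    struct vfrag_sketch {
        size_t   vf_len;            /* number of fragments in the vfrag          */
        size_t   vf_idx;            /* next fragment to hand to the BTL, 0-based */
        uint64_t vf_mask;           /* one bit per fragment expected             */
        uint64_t vf_ack;            /* bits set as positive acks arrive          */
        uint64_t vf_mask_processed; /* bits set on local (send-side) completion  */
        uint64_t vf_retrans;        /* bits flagged for retransmission           */
    };

    static void vfrag_sketch_init(struct vfrag_sketch *vf, size_t len)
    {
        vf->vf_len  = len;
        vf->vf_idx  = 0;   /* was initialized to 1 before this commit */
        vf->vf_mask = (len >= 64) ? ~UINT64_C(0) : ((UINT64_C(1) << len) - 1);
        vf->vf_ack  = 0;
        vf->vf_mask_processed = 0;
        vf->vf_retrans = 0;
    }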
@@ -122,13 +122,14 @@ static void mca_pml_dr_match_completion(
     /* been acked? */
     if(vfrag->vf_ack == vfrag->vf_mask) {
         MCA_PML_DR_VFRAG_ACK_STOP(vfrag);

         /* return descriptor */
         if(NULL != sendreq->descriptor) {
             mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
             sendreq->descriptor = NULL;
         }

-        /* update statistics */
+        /* update statistics and complete */
         sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
+        MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
         OPAL_THREAD_UNLOCK(&ompi_request_lock);
@@ -136,7 +137,6 @@ static void mca_pml_dr_match_completion(
         MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
         MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
     }

 }

 /*
@@ -166,29 +166,33 @@ static void mca_pml_dr_rndv_completion(
     /* local completion */
     vfrag->vf_mask_processed |= 0x1;

-    /* been acked? */
+    /* positive ack? */
     if(vfrag->vf_ack == vfrag->vf_mask) {
         MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
         if(sendreq->descriptor) {
             mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
             sendreq->descriptor = NULL;
         }
-        sendreq->req_bytes_delivered = vfrag->vf_size;
-        if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
-            MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
-        } else {
-            schedule = true;

+        /* matched at peer? */
+        if(sendreq->req_matched) {
+            sendreq->req_bytes_delivered = vfrag->vf_size;
+            if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
+                MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
+            } else {
+                schedule = true;
+            }
         }
         OPAL_THREAD_UNLOCK(&ompi_request_lock);
         if(schedule) {
             mca_pml_dr_send_request_schedule(sendreq);
         }

+    /* negative ack - need to retransmit? */
     } else if(vfrag->vf_retrans) {
         MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
         MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
     }

     /* check for pending requests */
     MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING();
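Note: the corrected rendezvous-completion logic only counts the vfrag's bytes as delivered, and only completes or schedules the request, once the peer has both acked the fragment and matched the message. A condensed restatement (hypothetical helper, not part of the commit):

    /* Sketch only: the decision taken in mca_pml_dr_rndv_completion, condensed. */
    #include <stdbool.h>
    #include <stddef.h>

    enum rndv_action { RNDV_COMPLETE, RNDV_SCHEDULE, RNDV_RETRY, RNDV_WAIT };

    static enum rndv_action rndv_decision(bool acked, bool matched, bool retrans,
                                          size_t vf_size, size_t bytes_packed)
    {
        if (acked) {                      /* vf_ack == vf_mask                     */
            if (!matched)
                return RNDV_WAIT;         /* wait for a MATCHED ack from the peer  */
            return (vf_size == bytes_packed) ? RNDV_COMPLETE   /* whole message    */
                                             : RNDV_SCHEDULE;  /* send remainder   */
        }
        if (retrans)                      /* negative ack recorded in vf_retrans   */
            return RNDV_RETRY;
        return RNDV_WAIT;                 /* no ack yet                            */
    }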
@@ -238,6 +242,7 @@ static void mca_pml_dr_frag_completion(

     /* check to see if we need to schedule the remainder of the message */
     sendreq->req_bytes_delivered += vfrag->vf_size;
+    assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed);

     /* return this vfrag */
     MCA_PML_DR_VFRAG_RETURN(vfrag);
@@ -734,7 +739,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
         while(opal_list_get_size(&sendreq->req_retrans) &&
               sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
             mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*)opal_list_get_first(&sendreq->req_retrans);
-            vfrag->vf_retry_cnt ++;

             /*
              * Retransmit fragments that have not been acked.
@@ -745,20 +749,20 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
             mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
             mca_pml_dr_frag_hdr_t* hdr;
             mca_btl_base_descriptor_t* des;
-            size_t offset = vfrag->vf_offset + (vfrag->vf_max_send_size * vfrag->vf_idx);
+            size_t offset_in_vfrag = vfrag->vf_max_send_size * vfrag->vf_idx;
+            size_t offset_in_msg = vfrag->vf_offset + offset_in_vfrag;
             size_t size;
             int rc;

+            vfrag->vf_retry_cnt ++;
             if(vfrag->vf_idx == vfrag->vf_len - 1) {
-                size = vfrag->vf_size - offset;
+                size = vfrag->vf_size - offset_in_vfrag;
             } else {
                 size = vfrag->vf_max_send_size;
             }

             /* pack into a descriptor */
-            ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset);
+            ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset_in_msg);
             mca_bml_base_prepare_src(
                 bml_btl,
                 NULL,
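Note: the retransmit path previously reused a single "offset" both to size the last fragment and to position the convertor, conflating the vfrag-relative and message-relative offsets. A small worked example with invented values:

    /* Worked example (hypothetical numbers): retransmitting fragment index 2 of a
     * vfrag that starts 64 KiB into the message, with 32 KiB fragments. */
    #include <stdio.h>
    #include <stddef.h>

    int main(void)
    {
        size_t vf_offset        = 65536;                        /* vfrag start in the message  */
        size_t vf_max_send_size = 32768;                        /* per-fragment size           */
        size_t vf_idx           = 2;                            /* fragment being resent       */

        size_t offset_in_vfrag  = vf_max_send_size * vf_idx;    /* 65536: sizes the last frag  */
        size_t offset_in_msg    = vf_offset + offset_in_vfrag;  /* 131072: positions convertor */

        printf("offset_in_vfrag=%zu offset_in_msg=%zu\n", offset_in_vfrag, offset_in_msg);
        return 0;
    }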
@@ -794,8 +798,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
             hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));

             /* update state */
             vfrag->vf_idx++;
             sendreq->req_send_offset += size;
             OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);

             /* reset the vfrag watchdog timer due to retransmission */
@@ -807,7 +809,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
             if(rc == OMPI_SUCCESS) {
                 bytes_remaining -= size;
             } else {
                 vfrag->vf_idx--;
                 OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
                 mca_bml_base_free(bml_btl,des);
                 OPAL_THREAD_LOCK(&mca_pml_dr.lock);
@@ -846,10 +847,11 @@ void mca_pml_dr_send_request_match_ack(
     OPAL_THREAD_LOCK(&ompi_request_lock);
     assert(vfrag->vf_ack == 0);

+    vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
     if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
         MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
         /* need to retransmit? */
-        if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
+        if(vfrag->vf_ack != vfrag->vf_mask) {
             MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
         } else {
             /* if already have local completion free descriptor and complete message */
@@ -862,21 +864,20 @@ void mca_pml_dr_send_request_match_ack(
             /* do NOT complete message until matched at peer */
             if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
                 /* update statistics */
+                sendreq->req_matched = true;
                 sendreq->req_bytes_delivered = vfrag->vf_size;
                 MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
             }
             OPAL_THREAD_UNLOCK(&ompi_request_lock);
         }

     /* wait for local completion */
     } else {
         /* need to retransmit? */
-        if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
+        if(vfrag->vf_ack != vfrag->vf_mask) {
             vfrag->vf_retrans = vfrag->vf_mask;
-        } else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
-            vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
+        } else if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
+            sendreq->req_matched = true;
         }
         OPAL_THREAD_UNLOCK(&ompi_request_lock);
     }
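Note: both ack handlers now distinguish two independent conditions: local completion (every fragment's bit set in vf_mask_processed) and a full positive ack (vf_ack equal to vf_mask). A condensed restatement with stand-in types:

    /* Sketch only: the two bitmap tests used by the ack handlers above. */
    #include <stdint.h>
    #include <stdbool.h>

    static bool locally_complete(uint64_t mask_processed, uint64_t mask)
    {
        return (mask_processed & mask) == mask;  /* all fragments completed locally */
    }

    static bool fully_acked(uint64_t ack, uint64_t mask)
    {
        return ack == mask;                      /* all fragments positively acked  */
    }

    /* Outcomes, as coded above:
     *   complete + not acked  -> MCA_PML_DR_SEND_REQUEST_RETRY
     *   complete + acked      -> free descriptor; complete/schedule once matched
     *   pending  + not acked  -> record vf_retrans, retry on local completion
     *   pending  + acked      -> record ack/req_matched, wait for local completion */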
@@ -895,22 +896,28 @@ void mca_pml_dr_send_request_rndv_ack(

     OPAL_THREAD_LOCK(&ompi_request_lock);

+    /* set acked bits */
+    vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
+
+    /* local completion? */
+    if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
+        bool schedule = false;
         MCA_PML_DR_VFRAG_ACK_STOP(vfrag);

         /* need to retransmit? */
-        if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
+        if(vfrag->vf_ack != vfrag->vf_mask) {
             MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
         } else {
-            /* return descriptor for the first fragment */
+            /* acked and local completion */
+            /* return descriptor of first fragment */
             if(NULL != sendreq->descriptor) {
                 mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
                 sendreq->descriptor = NULL;
             }

-            /* do NOT schedule remainder of message until matched at peer */
+            /* matched at peer? */
             if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
+                sendreq->req_matched = true;
                 sendreq->req_bytes_delivered = vfrag->vf_size;
                 if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
                     MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
@@ -918,9 +925,6 @@ void mca_pml_dr_send_request_rndv_ack(
                     vfrag->vf_recv = ack->hdr_dst_ptr;
                     schedule = true;
                 }
-
-                /* vfrag has been matched at peer */
-                vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
             }
             OPAL_THREAD_UNLOCK(&ompi_request_lock);
@@ -928,18 +932,20 @@ void mca_pml_dr_send_request_rndv_ack(
             mca_pml_dr_send_request_schedule(sendreq);
         }
     }

     /* wait for local completion */
     } else {

         /* need to retransmit? */
-        if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
+        if(vfrag->vf_ack != vfrag->vf_mask) {
             vfrag->vf_retrans = vfrag->vf_mask;
         } else {
+            /* may need this to schedule rest of the message */
+            vfrag->vf_recv = ack->hdr_dst_ptr;

-            /* dont set ack until matched at peer */
+            /* matched at peer? */
             if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
-                vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
+                sendreq->req_matched = true;
             }
         }
         OPAL_THREAD_UNLOCK(&ompi_request_lock);
@@ -968,7 +974,7 @@ void mca_pml_dr_send_request_frag_ack(

         /* reset local completion flags to only those that have been successfully acked */
         vfrag->vf_mask_processed = vfrag->vf_ack;
-        vfrag->vf_idx = 1;
+        vfrag->vf_idx = 0;
         opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
         schedule = true;
@@ -977,6 +983,7 @@ void mca_pml_dr_send_request_frag_ack(

         /* update statistics */
         sendreq->req_bytes_delivered += vfrag->vf_size;
+        assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed);

         /* return vfrag */
         MCA_PML_DR_VFRAG_RETURN(vfrag);
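Note: on a negative fragment ack the vfrag keeps credit only for the bits the peer actually acked, rewinds its fragment index to 0, and is queued on req_retrans for the scheduler. A stand-alone sketch of that reset (simplified types, hypothetical helper):

    /* Sketch only: rewinding a vfrag before requeueing it for retransmission. */
    #include <stdint.h>
    #include <stddef.h>

    struct vfrag_retrans_sketch {
        uint64_t vf_ack;             /* fragments the peer acknowledged       */
        uint64_t vf_mask_processed;  /* fragments considered complete locally */
        size_t   vf_idx;             /* scheduler's next fragment index       */
    };

    static void vfrag_rewind_for_retrans(struct vfrag_retrans_sketch *vf)
    {
        /* keep local-completion credit only for what was actually acked ...  */
        vf->vf_mask_processed = vf->vf_ack;
        /* ... and restart the fragment index before requeueing on req_retrans */
        vf->vf_idx = 0;
    }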
@@ -53,6 +53,7 @@ struct mca_pml_dr_send_request_t {
     size_t req_pipeline_depth;
     size_t req_bytes_delivered;
     size_t req_send_offset;
+    bool req_matched;

     mca_pml_dr_vfrag_t* req_vfrag;
     mca_pml_dr_vfrag_t req_vfrag0;
@@ -158,6 +159,7 @@ do {
     sendreq->req_bytes_delivered = 0; \
     sendreq->req_state = 0; \
     sendreq->req_send_offset = 0; \
+    sendreq->req_matched = false; \
     sendreq->req_send.req_base.req_pml_complete = false; \
     sendreq->req_send.req_base.req_ompi.req_complete = false; \
     sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
@@ -168,7 +170,7 @@ do {
     sendreq->req_vfrag0.vf_ack = 0; \
     sendreq->req_vfrag0.vf_mask_processed = 0; \
     sendreq->req_vfrag0.vf_retrans = 0; \
     sendreq->req_vfrag0.vf_retry_cnt = 0; \
     sendreq->req_vfrag = &sendreq->req_vfrag0; \
     \
     /* select a btl */ \
@@ -359,7 +361,7 @@ do { \
     \
     opal_output(0, "%s:%d:%s, retransmitting\n", __FILE__, __LINE__, __func__); \
     assert(sendreq->descriptor->des_src != NULL); \
-    vfrag->vf_idx = 1; \
+    vfrag->vf_idx = 0; \
     vfrag->vf_mask_processed = 0; \
     vfrag->vf_ack = 0; \
     vfrag->vf_retrans = 0; \
@@ -73,7 +73,7 @@ void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
         opal_output(0, "%s:%d:%s, wdog retry count exceeded! FATAL", __FILE__, __LINE__, __func__);
         orte_errmgr.abort();
     }
-    vfrag->vf_idx = 1;
+    vfrag->vf_idx = 0;
     vfrag->vf_mask_processed = 0;
     vfrag->vf_ack = 0;
     vfrag->vf_retrans = 0;
@@ -93,11 +93,12 @@ void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
         opal_output(0, "%s:%d: maximum ack retry count exceeded: FATAL", __FILE__, __LINE__);
         orte_errmgr.abort();
     }

     if(0 == vfrag->vf_offset) { /* this is the first part of the message
                                    that we need to resend */
         MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
     } else {
-        vfrag->vf_idx = 1;
+        vfrag->vf_idx = 0;
         vfrag->vf_mask_processed = 0;
         vfrag->vf_ack = 0;
         vfrag->vf_retrans = 0;