1
1
This commit was SVN r9340.
Этот коммит содержится в:
Galen Shipman 2006-03-20 21:57:30 +00:00
родитель 5600932c2f
Коммит ca13833e95
6 изменённых файлов: 137 добавлений и 114 удалений

Просмотреть файл

@ -107,7 +107,7 @@ void mca_pml_dr_recv_frag_callback(
if(false == duplicate) {
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
} else {
OPAL_OUTPUT((0, "%s:%d: dropping duplicate fragment\n"));
OPAL_OUTPUT((0, "%s:%d: dropping duplicate fragment\n", __FILE__, __LINE__));
}
break;
}
@ -163,6 +163,7 @@ void mca_pml_dr_recv_frag_callback(
break;
}
default:
OPAL_OUTPUT((0, "%s:%d: dropping unknown header type\n"));
return; /* drop it on the floor.. */
break;
}
@ -596,7 +597,7 @@ rematch:
hdr->hdr_src_ptr,
0);
opal_output(0, "%s:%d: corrupted data 0x%08x != 0x%08x\n",
__FILE__, __LINE__, csum, hdr->hdr_csum);
__FILE__, __LINE__, csum, hdr->hdr_csum);
MCA_PML_DR_RECV_FRAG_RETURN(frag);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return false;

Просмотреть файл

@ -345,7 +345,8 @@ do { \
(vfrag)->vf_len = (hdr)->hdr_vlen; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_mask_processed = 0; \
(vfrag)->vf_send_cnt = 1; \
(vfrag)->vf_retrans = 0; \
(vfrag)->vf_retry_cnt = 0; \
if((hdr)->hdr_vlen == 64) { \
(vfrag)->vf_mask = ~(uint64_t)0; \
} else { \

Просмотреть файл

@ -121,7 +121,7 @@ static void mca_pml_dr_match_completion(
/* been acked? */
if(vfrag->vf_ack == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* return descriptor */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
@ -131,8 +131,12 @@ static void mca_pml_dr_match_completion(
/* update statistics */
sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} else if(vfrag->vf_retrans) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/*
@ -164,7 +168,7 @@ static void mca_pml_dr_rndv_completion(
/* been acked? */
if(vfrag->vf_ack == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
if(sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
@ -175,11 +179,16 @@ static void mca_pml_dr_rndv_completion(
} else {
schedule = true;
}
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) {
mca_pml_dr_send_request_schedule(sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) {
mca_pml_dr_send_request_schedule(sendreq);
}
} else if(vfrag->vf_retrans) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
}
/* check for pending requests */
MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING();
@ -419,6 +428,7 @@ int mca_pml_dr_send_request_start_copy(
return rc;
}
}
assert(sendreq->req_send.req_convertor.checksum);
/* build match header */
hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
@ -724,7 +734,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
while(opal_list_get_size(&sendreq->req_retrans) &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*)opal_list_get_first(&sendreq->req_retrans);
vfrag->vf_send_cnt ++;
vfrag->vf_retry_cnt ++;
/*
* Retransmit fragments that have not been acked.
@ -832,38 +842,41 @@ void mca_pml_dr_send_request_match_ack(
{
mca_pml_dr_vfrag_t* vfrag = ack->hdr_src_ptr.pval;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
OPAL_THREAD_LOCK(&ompi_request_lock);
assert(vfrag->vf_ack == 0);
/* need to retransmit? */
if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
mca_bml_base_btl_t* bml_btl = sendreq->descriptor->des_context;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_bml_base_send(bml_btl, sendreq->descriptor, MCA_BTL_TAG_PML);
/* if already have local completion free descriptor and complete message */
} else if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
/* return descriptor */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor );
sendreq->descriptor = NULL;
if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* need to retransmit? */
if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
} else {
/* if already have local completion free descriptor and complete message */
/* return descriptor */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor );
sendreq->descriptor = NULL;
}
/* do NOT complete message until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
/* update statistics */
sendreq->req_bytes_delivered = vfrag->vf_size;
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/* do NOT complete message until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
/* update statistics */
sendreq->req_bytes_delivered = vfrag->vf_size;
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* wait for local completion */
} else {
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
/* need to retransmit? */
if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
vfrag->vf_retrans = vfrag->vf_mask;
} else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
@ -879,53 +892,55 @@ void mca_pml_dr_send_request_rndv_ack(
{
mca_pml_dr_vfrag_t* vfrag = ack->hdr_src_ptr.pval;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
OPAL_THREAD_LOCK(&ompi_request_lock);
/* need to retransmit? */
if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
mca_bml_base_btl_t* bml_btl = sendreq->descriptor->des_context;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_bml_base_send(bml_btl, sendreq->descriptor, MCA_BTL_TAG_PML);
/* acked and local completion */
} else if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
bool schedule = false;
/* return descriptor for the first fragment */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
}
/* do NOT schedule remainder of message until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
sendreq->req_bytes_delivered = vfrag->vf_size;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else {
vfrag->vf_recv = ack->hdr_dst_ptr;
schedule = true;
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* need to retransmit? */
if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
} else {
/* acked and local completion */
/* return descriptor for the first fragment */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
}
/* vfrag has been matched at peer */
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) {
mca_pml_dr_send_request_schedule(sendreq);
/* do NOT schedule remainder of message until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
sendreq->req_bytes_delivered = vfrag->vf_size;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else {
vfrag->vf_recv = ack->hdr_dst_ptr;
schedule = true;
}
/* vfrag has been matched at peer */
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) {
mca_pml_dr_send_request_schedule(sendreq);
}
}
/* wait for local completion */
} else {
/* may need this to schedule rest of the message */
vfrag->vf_recv = ack->hdr_dst_ptr;
/* dont set ack until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
/* need to retransmit? */
if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) {
vfrag->vf_retrans = vfrag->vf_mask;
} else {
/* may need this to schedule rest of the message */
vfrag->vf_recv = ack->hdr_dst_ptr;
/* dont set ack until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
}
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
@ -950,7 +965,7 @@ void mca_pml_dr_send_request_frag_ack(
/* need to retransmit? */
if((vfrag->vf_ack & vfrag->vf_mask) != vfrag->vf_mask) {
/* reset local completion flags to only those that have been successfully acked */
vfrag->vf_mask_processed = vfrag->vf_ack;
vfrag->vf_idx = 1;

Просмотреть файл

@ -167,6 +167,8 @@ do {
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
sendreq->req_vfrag0.vf_ack = 0; \
sendreq->req_vfrag0.vf_mask_processed = 0; \
sendreq->req_vfrag0.vf_retrans = 0; \
sendreq->req_vfrag0.vf_retry_cnt = 0; \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
\
/* select a btl */ \
@ -307,31 +309,13 @@ do {
vfrag->vf_ack = 0; \
vfrag->vf_idx = 0; \
vfrag->vf_mask_processed = 0; \
vfrag->vf_retrans = 0; \
vfrag->vf_retry_cnt = 0; \
vfrag->vf_max_send_size = max_send_size; \
vfrag->vf_send.pval = sendreq; \
sendreq->req_vfrag = vfrag; \
} while(0)
/*
 * Look up a vfrag on the request's retransmission list by its id.
 *
 * Walks (sendreq)->req_retrans from first to end and sets `vfrag` to the
 * first entry whose vf_id equals (hdr)->hdr_vid. `vfrag` is assigned NULL
 * before the walk, so it is NULL afterwards when no entry matches.
 * `vfrag` must therefore be an assignable lvalue of type mca_pml_dr_vfrag_t*.
 *
 * NOTE(review): the list is traversed without any locking inside the macro;
 * presumably the caller serializes access to req_retrans -- confirm.
 */
#define MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq,hdr,vfrag) \
do { \
opal_list_item_t* item; \
vfrag = NULL; \
for(item = opal_list_get_first(&(sendreq)->req_retrans); \
item != opal_list_get_end(&(sendreq)->req_retrans); \
item = opal_list_get_next(item)) { \
mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \
if(vf->vf_id == (hdr)->hdr_vid) { \
vfrag = vf; \
break; \
} \
} \
} while(0)
/*
* Update bytes delivered on request based on supplied descriptor
*/
@ -360,6 +344,23 @@ do { \
} while (0)
/*
 * Retransmit the first fragment of a message.
 *
 * Resets the vfrag's progress state (vf_idx, vf_mask_processed, vf_ack,
 * vf_retrans), releases ompi_request_lock, re-arms the vfrag's ack timer
 * and resends sendreq->descriptor on the BTL recorded in its des_context.
 *
 * sendreq  send request whose `descriptor` is resent; the descriptor and
 *          its des_src must be non-NULL (the latter is asserted).
 * vfrag    virtual fragment whose state is reset and whose ack timer is
 *          restarted.
 *
 * The macro UNLOCKS ompi_request_lock and does not re-acquire it, so the
 * caller must hold the lock on entry and must not unlock it again on the
 * path that invokes this macro.
 *
 * Both arguments are evaluated more than once; pass side-effect-free
 * expressions. The expansion declares a local named `bml_btl`, so an
 * argument expression must not refer to a caller variable of that name.
 */
#define MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag)                         \
do {                                                                          \
    /* capture the BTL while the request lock is still held */                \
    mca_bml_base_btl_t* bml_btl = (sendreq)->descriptor->des_context;         \
    opal_output(0, "%s:%d:%s, retransmitting\n", __FILE__, __LINE__, __func__); \
    assert((sendreq)->descriptor->des_src != NULL);                           \
    /* restart bookkeeping for this vfrag from scratch */                     \
    (vfrag)->vf_idx = 1;                                                      \
    (vfrag)->vf_mask_processed = 0;                                           \
    (vfrag)->vf_ack = 0;                                                      \
    (vfrag)->vf_retrans = 0;                                                  \
    /* drop the lock before arming the timer and handing off to the BTL */    \
    OPAL_THREAD_UNLOCK(&ompi_request_lock);                                   \
    MCA_PML_DR_VFRAG_ACK_START(vfrag);                                        \
    mca_bml_base_send(bml_btl, (sendreq)->descriptor, MCA_BTL_TAG_PML);       \
} while(0)
/**
* Start the specified request
*/

Просмотреть файл

@ -36,7 +36,8 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
vfrag->vf_max_send_size = 0;
vfrag->vf_ack = 0;
vfrag->vf_mask = 0;
vfrag->vf_send_cnt = 1;
vfrag->vf_retrans = 0;
vfrag->vf_retry_cnt = 0;
vfrag->tv_wdog.tv_sec = mca_pml_dr.timer_wdog_sec;
vfrag->tv_wdog.tv_usec = mca_pml_dr.timer_wdog_usec;
vfrag->tv_ack.tv_sec = mca_pml_dr.timer_ack_usec;
@ -68,14 +69,15 @@ void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
OPAL_THREAD_LOCK(&ompi_request_lock);
vfrag->vf_send_cnt++;
if(vfrag->vf_send_cnt > mca_pml_dr.timer_wdog_max_count) {
vfrag->vf_retry_cnt++;
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_wdog_max_count) {
opal_output(0, "wdog retry count exceeded! %s:%d FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
}
vfrag->vf_idx = 1;
vfrag->vf_mask_processed = 0;
vfrag->vf_ack = 0;
vfrag->vf_retrans = 0;
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_schedule(sendreq);
@ -88,21 +90,19 @@ void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
OPAL_THREAD_LOCK(&ompi_request_lock);
vfrag->vf_send_cnt++;
if(vfrag->vf_send_cnt > mca_pml_dr.timer_ack_max_count) {
vfrag->vf_retry_cnt++;
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) {
opal_output(0, "%s:%d: maximum ack retry count exceeded: FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
}
vfrag->vf_idx = 1;
vfrag->vf_mask_processed = 0;
vfrag->vf_ack = 0;
if(0 == vfrag->vf_offset) { /* this is the first part of the message
that we need to resend */
mca_bml_base_btl_t* bml_btl = sendreq->descriptor->des_context;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_bml_base_send(bml_btl, sendreq->descriptor, MCA_BTL_TAG_PML);
MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag);
} else {
vfrag->vf_idx = 1;
vfrag->vf_mask_processed = 0;
vfrag->vf_ack = 0;
vfrag->vf_retrans = 0;
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_schedule(sendreq);

Просмотреть файл

@ -37,13 +37,14 @@ struct mca_pml_dr_vfrag_t {
uint32_t vf_id;
uint16_t vf_idx;
uint16_t vf_len;
uint8_t vf_send_cnt;
uint8_t vf_retry_cnt;
size_t vf_offset;
size_t vf_size;
size_t vf_max_send_size;
uint64_t vf_ack;
uint64_t vf_mask;
uint64_t vf_mask_processed;
uint64_t vf_retrans;
struct mca_bml_base_btl_t* bml_btl;
/* we need a timer for the vfrag for:
@ -87,11 +88,13 @@ do { \
do { \
opal_event_del(&vfrag->ev_wdog); \
vfrag->tv_wdog.tv_sec = \
mca_pml_dr.timer_wdog_sec + \
mca_pml_dr.timer_wdog_sec * mca_pml_dr.timer_wdog_multiplier * \
vfrag->vf_send_cnt; \
vfrag->vf_retry_cnt; \
vfrag->tv_wdog.tv_usec = \
mca_pml_dr.timer_wdog_usec + \
mca_pml_dr.timer_wdog_usec * mca_pml_dr.timer_wdog_multiplier * \
vfrag->vf_send_cnt; \
vfrag->vf_retry_cnt; \
opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \
} while(0)
@ -103,13 +106,15 @@ do { \
#define MCA_PML_DR_VFRAG_ACK_START(vfrag) \
do { \
(vfrag)->tv_ack.tv_sec = \
(vfrag)->tv_ack.tv_sec = \
mca_pml_dr.timer_ack_sec + \
mca_pml_dr.timer_ack_sec * mca_pml_dr.timer_ack_multiplier * \
(vfrag)->vf_send_cnt; \
(vfrag)->tv_ack.tv_usec = \
(vfrag)->vf_retry_cnt; \
(vfrag)->tv_ack.tv_usec = \
mca_pml_dr.timer_ack_usec + \
mca_pml_dr.timer_ack_usec * mca_pml_dr.timer_ack_multiplier * \
(vfrag)->vf_send_cnt; \
opal_event_add(&(vfrag)->ev_ack, &(vfrag)->tv_ack); \
(vfrag)->vf_retry_cnt; \
opal_event_add(&(vfrag)->ev_ack, &(vfrag)->tv_ack); \
\
} while(0)