1
1
This commit was SVN r9369.
Этот коммит содержится в:
Galen Shipman 2006-03-22 23:06:18 +00:00
родитель 078cdcc9a8
Коммит 70cf1ce562
9 изменённых файлов: 149 добавлений и 77 удалений

Просмотреть файл

@ -27,6 +27,23 @@ OBJ_CLASS_INSTANCE(mca_pml_dr_range_t,
NULL,
NULL);
static void mca_pml_dr_seq_tracker_construct(mca_pml_dr_seq_tracker_t* seq_tracker) {
OBJ_CONSTRUCT(&seq_tracker->vfrag_ids, opal_list_t);
seq_tracker->vfrag_ids_current = NULL;
}
static void mca_pml_dr_seq_tracker_destruct(mca_pml_dr_seq_tracker_t* seq_tracker) {
OBJ_DESTRUCT(&seq_tracker->vfrag_ids);
}
OBJ_CLASS_INSTANCE(
mca_pml_dr_seq_tracker_t,
opal_object_t,
mca_pml_dr_seq_tracker_construct,
mca_pml_dr_seq_tracker_destruct);
static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc)
{
proc->expected_sequence = 1;
@ -36,8 +53,8 @@ static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc)
OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t);
OBJ_CONSTRUCT(&proc->matched_receives, opal_list_t);
OBJ_CONSTRUCT(&proc->unexpected_frags, opal_list_t);
OBJ_CONSTRUCT(&proc->vfrag_ids, opal_list_t);
proc->vfrag_ids_current = NULL;
OBJ_CONSTRUCT(&proc->seq_sends, mca_pml_dr_seq_tracker_t);
OBJ_CONSTRUCT(&proc->seq_recvs, mca_pml_dr_seq_tracker_t);
}
@ -47,6 +64,8 @@ static void mca_pml_dr_comm_proc_destruct(mca_pml_dr_comm_proc_t* proc)
OBJ_DESTRUCT(&proc->matched_receives);
OBJ_DESTRUCT(&proc->specific_receives);
OBJ_DESTRUCT(&proc->unexpected_frags);
OBJ_DESTRUCT(&proc->seq_sends);
OBJ_DESTRUCT(&proc->seq_recvs);
}
@ -56,7 +75,6 @@ static OBJ_CLASS_INSTANCE(
mca_pml_dr_comm_proc_construct,
mca_pml_dr_comm_proc_destruct);
static void mca_pml_dr_comm_construct(mca_pml_dr_comm_t* comm)
{
OBJ_CONSTRUCT(&comm->wild_receives, opal_list_t);

Просмотреть файл

@ -40,6 +40,14 @@ typedef struct mca_pml_dr_range_t mca_pml_dr_range_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_range_t);
struct mca_pml_dr_seq_tracker_t{
opal_list_t vfrag_ids; /**< list of vfrags id's that have been seen */
mca_pml_dr_range_t* vfrag_ids_current; /**< a pointer to the last place we were in the list */
};
typedef struct mca_pml_dr_seq_tracker_t mca_pml_dr_seq_tracker_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_seq_tracker_t);
struct mca_pml_dr_comm_proc_t {
opal_object_t super;
@ -55,12 +63,12 @@ struct mca_pml_dr_comm_proc_t {
opal_list_t unexpected_frags; /**< unexpected fragment queues */
opal_list_t matched_receives; /**< list of in-progress matched receives */
ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */
opal_list_t vfrag_ids; /**< list of vfrags id's that have been seen */
mca_pml_dr_range_t* vfrag_ids_current; /**< a pointer to the last place we were in the list */
mca_pml_dr_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */
mca_pml_dr_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */
};
typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t;
/**
/**
* Cached on ompi_communicator_t to hold queues/state
* used by the PML<->PTL interface for matching logic.
*/
@ -97,18 +105,18 @@ OMPI_DECLSPEC extern int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_c
*/
static inline bool mca_pml_dr_comm_proc_check_duplicate(
mca_pml_dr_comm_proc_t* dr_proc,
mca_pml_dr_seq_tracker_t* seq_tracker,
uint32_t vfrag_id)
{
mca_pml_dr_range_t* item;
int8_t direction = 0; /* 1 is next, -1 is previous */
item = dr_proc->vfrag_ids_current;
item = seq_tracker->vfrag_ids_current;
while(true) {
if(NULL == item) {
return false;
} else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id) {
dr_proc->vfrag_ids_current = (mca_pml_dr_range_t*) item;
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item;
return true;
} else if(vfrag_id > item->vfrag_id_high && direction != -1) {
direction = 1;
@ -126,10 +134,11 @@ static inline bool mca_pml_dr_comm_proc_check_duplicate(
/*
* Must be called w/ matching lock held
*/
static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_comm_proc_t* dr_comm_proc, uint32_t vfrag_id)
static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_seq_tracker_t* seq_tracker,
uint32_t vfrag_id)
{
opal_list_t* vfrag_ids = &dr_comm_proc->vfrag_ids;
mca_pml_dr_range_t* item = dr_comm_proc->vfrag_ids_current;
opal_list_t* vfrag_ids = &seq_tracker->vfrag_ids;
mca_pml_dr_range_t* item = seq_tracker->vfrag_ids_current;
int8_t direction = 0; /* 1 is next, -1 is previous */
mca_pml_dr_range_t *new_item, *next_item, *prev_item;
while(true) {
@ -137,18 +146,18 @@ static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_comm_proc_t* dr_comm_
new_item = OBJ_NEW(mca_pml_dr_range_t);
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
opal_list_append(vfrag_ids, (opal_list_item_t*) new_item);
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
return;
} else if( item == (mca_pml_dr_range_t*) &vfrag_ids->opal_list_head ) {
new_item = OBJ_NEW(mca_pml_dr_range_t);
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
opal_list_prepend(vfrag_ids, (opal_list_item_t*) new_item);
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
return;
} else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id ) {
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) item;
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item;
return;
} else if((item->vfrag_id_high + 1) == vfrag_id) {
@ -162,7 +171,7 @@ static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_comm_proc_t* dr_comm_
} else {
item->vfrag_id_high = vfrag_id;
}
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) item;
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item;
return;
} else if((item->vfrag_id_low - 1) == vfrag_id) {
@ -176,7 +185,7 @@ static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_comm_proc_t* dr_comm_
} else {
item->vfrag_id_low = vfrag_id;
}
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) item;
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item;
return;
} else if(vfrag_id > item->vfrag_id_high ) {
@ -188,7 +197,7 @@ static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_comm_proc_t* dr_comm_
opal_list_insert_pos(vfrag_ids,
(opal_list_item_t*) item,
(opal_list_item_t*) new_item);
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
return;
} else {
direction = 1;
@ -206,7 +215,7 @@ static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_comm_proc_t* dr_comm_
(opal_list_item_t*) next_item,
(opal_list_item_t*) new_item);
}
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
return;
} else {
direction = -1;

Просмотреть файл

@ -120,7 +120,7 @@ void mca_pml_dr_recv_frag_callback(
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
OPAL_THREAD_LOCK(&comm->c_matching_lock);
if(mca_pml_dr_comm_proc_check_duplicate(proc, hdr->hdr_common.hdr_vid)) {
if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
OPAL_OUTPUT((0, "%s:%d: acking duplicate match\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
@ -136,7 +136,10 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_MATCH_ACK:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t);
mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack);
}
break;
}
case MCA_PML_DR_HDR_TYPE_RNDV:
@ -145,15 +148,17 @@ void mca_pml_dr_recv_frag_callback(
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
OPAL_THREAD_LOCK(&comm->matching_lock);
if(mca_pml_dr_comm_proc_check_duplicate(proc, hdr->hdr_common.hdr_vid)) {
if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
/* ack only if this has been matched */
mca_pml_dr_recv_request_t* recvreq =
mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid);
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
if(NULL != recvreq) {
OPAL_OUTPUT((0, "%s:%d: acking duplicate rendezvous\n", __FILE__, __LINE__));
OPAL_OUTPUT((0, "%s:%d: acking duplicate matched rendezvous\n", __FILE__, __LINE__));
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr, recvreq->req_bytes_received, 1);
} else {
OPAL_OUTPUT((0, "%s:%d: droping duplicate unmatched rendezvous\n", __FILE__, __LINE__));
}
} else {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
@ -163,8 +168,12 @@ void mca_pml_dr_recv_frag_callback(
}
case MCA_PML_DR_HDR_TYPE_RNDV_ACK:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t);
mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack);
}
break;
}
case MCA_PML_DR_HDR_TYPE_FRAG:
@ -174,7 +183,7 @@ void mca_pml_dr_recv_frag_callback(
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
OPAL_THREAD_LOCK(&comm->matching_lock);
if(mca_pml_dr_comm_proc_check_duplicate(proc, hdr->hdr_common.hdr_vid)) {
if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
@ -192,7 +201,10 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_FRAG_ACK:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t);
mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack);
}
break;
}
default:
@ -627,7 +639,7 @@ rematch:
opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag);
}
mca_pml_dr_comm_proc_set_vid(proc, hdr->hdr_common.hdr_vid);
mca_pml_dr_comm_proc_set_vid(&proc->seq_recvs, hdr->hdr_common.hdr_vid);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
/* release matching lock before processing fragment */

Просмотреть файл

@ -255,34 +255,41 @@ void mca_pml_dr_recv_request_progress(
break;
case MCA_PML_DR_HDR_TYPE_FRAG:
bytes_received -= sizeof(mca_pml_dr_frag_hdr_t);
data_offset = hdr->hdr_frag.hdr_frag_offset;
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
segments,
num_segments,
sizeof(mca_pml_dr_frag_hdr_t),
data_offset,
bytes_received,
bytes_delivered,
csum);
bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx);
MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag);
if(vfrag->vf_ack & bit) {
/* duplicate, nothing to do */
return;
}
bytes_received -= sizeof(mca_pml_dr_frag_hdr_t);
data_offset = hdr->hdr_frag.hdr_frag_offset;
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
segments,
num_segments,
sizeof(mca_pml_dr_frag_hdr_t),
data_offset,
bytes_received,
bytes_delivered,
csum);
/* update the mask to show that this vfrag was received,
* note that it might still fail the checksum though
*/
vfrag->vf_mask_processed |= bit;
vfrag->vf_mask_pending |= bit;
if(csum == hdr->hdr_frag.hdr_frag_csum) {
/* this part of the vfrag passed the checksum,
mark it so that we ack it after receiving the
entire vfrag */
vfrag->vf_ack |= bit;
if((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
if((vfrag->vf_mask_pending & vfrag->vf_mask) == vfrag->vf_mask) {
/* we have received all the pieces of the vfrag, ack
everything that passed the checksum */
mca_pml_dr_comm_proc_set_vid(recvreq->req_proc, vfrag->vf_id);
mca_pml_dr_comm_proc_set_vid(&recvreq->req_proc->seq_recvs, vfrag->vf_id);
OPAL_OUTPUT((0, "%s:%d ACKING VFRAG vf_ack says %08x bytes_received %d\n",
__FILE__,__LINE__, vfrag->vf_ack, recvreq->req_bytes_received));
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask);
}

Просмотреть файл

@ -374,7 +374,7 @@ do { \
(vfrag)->vf_id = (hdr)->hdr_common.hdr_vid; \
(vfrag)->vf_len = (hdr)->hdr_vlen; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_mask_processed = 0; \
(vfrag)->vf_mask_pending = 0; \
(vfrag)->vf_retrans = 0; \
(vfrag)->vf_retry_cnt = 0; \
if((hdr)->hdr_vlen == 64) { \

Просмотреть файл

@ -72,7 +72,7 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
req->req_vfrag0.vf_idx = 0;
req->req_vfrag0.vf_ack = 0;
req->req_vfrag0.vf_mask = 1;
req->req_vfrag0.vf_mask_processed = 0;
req->req_vfrag0.vf_mask_pending = 0;
req->req_vfrag0.vf_send.pval = req;
req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free;
@ -122,8 +122,7 @@ static void mca_pml_dr_match_completion(
OPAL_THREAD_LOCK(&ompi_request_lock);
/* local completion */
assert(vfrag->vf_mask_processed == 0);
vfrag->vf_mask_processed |= 0x1;
vfrag->vf_mask_pending = 0;
/* been acked? */
if(vfrag->vf_ack == vfrag->vf_mask) {
@ -176,7 +175,7 @@ static void mca_pml_dr_rndv_completion(
OPAL_THREAD_LOCK(&ompi_request_lock);
/* local completion */
vfrag->vf_mask_processed |= 0x1;
vfrag->vf_mask_pending = 0;
/* positive ack? */
if(vfrag->vf_ack == vfrag->vf_mask) {
@ -188,7 +187,6 @@ static void mca_pml_dr_rndv_completion(
/* matched at peer? */
if(NULL != sendreq->req_vfrag0.vf_recv.pval) {
sendreq->req_bytes_delivered = vfrag->vf_size;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else {
@ -238,28 +236,30 @@ static void mca_pml_dr_frag_completion(
OPAL_THREAD_LOCK(&ompi_request_lock);
bit = ((uint64_t)1 << hdr->hdr_frag_idx);
vfrag->vf_mask_processed |= bit;
vfrag->vf_mask_pending &= ~bit;
/* update pipeline depth */
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
/* when we have local completion of the entire vfrag
we stop the local wdog timers and set our ack timer
as the peer should be sending us an ack for the vfrag
*/
if(vfrag->vf_mask_processed == vfrag->vf_mask) {
if(vfrag->vf_mask_pending == 0) {
MCA_PML_DR_VFRAG_WDOG_STOP(vfrag);
/* has the vfrag already been acked */
if(vfrag->vf_ack == vfrag->vf_mask) {
/* check to see if we need to schedule the remainder of the message */
sendreq->req_bytes_delivered += vfrag->vf_size;
assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed);
/* is this vfrag in the process of being retransmitted */
if(vfrag->vf_idx != vfrag->vf_len) {
opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
}
/* return this vfrag */
MCA_PML_DR_VFRAG_RETURN(vfrag);
} else {
} else if (vfrag->vf_idx == vfrag->vf_len) {
MCA_PML_DR_VFRAG_ACK_START(vfrag);
}
} else {
@ -267,7 +267,8 @@ static void mca_pml_dr_frag_completion(
}
/* are we done with this request ? */
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
if(OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, -1) == 0 &&
sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else if (sendreq->req_send_offset < sendreq->req_send.req_bytes_packed ||
opal_list_get_size(&sendreq->req_retrans)) {
@ -720,11 +721,16 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_src_ptr.pval = vfrag;
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
assert(hdr->hdr_frag_offset < sendreq->req_send.req_bytes_packed);
vfrag->vf_mask_pending |= (1 << vfrag->vf_idx);
/* update state */
vfrag->vf_idx++;
vfrag->vf_rndv = false;
vfrag->vf_rndv = false;
sendreq->req_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
@ -761,7 +767,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
*/
while(vfrag->vf_idx < vfrag->vf_len &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
if(((1 << vfrag->vf_idx) & vfrag->vf_mask_processed) == 0) {
if(((1 << vfrag->vf_idx) & vfrag->vf_mask) == 0) {
mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
mca_pml_dr_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des;
@ -808,12 +814,13 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_vlen = vfrag->vf_len;
hdr->hdr_frag_idx = vfrag->vf_idx;
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_frag_offset = sendreq->req_send_offset;
hdr->hdr_frag_offset = offset_in_msg;
hdr->hdr_src_ptr.pval = vfrag;
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
vfrag->vf_rndv = false;
vfrag->vf_mask_pending |= (1 << vfrag->vf_idx);
vfrag->vf_rndv = false;
/* update state */
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
@ -865,7 +872,7 @@ void mca_pml_dr_send_request_match_ack(
assert(vfrag->vf_ack == 0);
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
if (vfrag->vf_mask_pending == 0) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
@ -880,6 +887,8 @@ void mca_pml_dr_send_request_match_ack(
/* update statistics */
sendreq->req_bytes_delivered = vfrag->vf_size;
/* stash the vfid for duplicate acks.. */
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
@ -913,7 +922,7 @@ void mca_pml_dr_send_request_rndv_ack(
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
/* local completion? */
if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
if (vfrag->vf_mask_pending == 0) {
bool schedule = false;
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
@ -937,23 +946,31 @@ void mca_pml_dr_send_request_rndv_ack(
sendreq->req_send_offset = ack->hdr_vlen;
schedule = true;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* stash the vfid for duplicate acks.. */
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) {
mca_pml_dr_send_request_schedule(sendreq);
}
}
/* wait for local completion */
} else {
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
vfrag->vf_retrans = vfrag->vf_mask;
} else {
/* may need this to schedule rest of the message */
if(sendreq->req_send.req_bytes_packed > 0) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
printf("blah");
}
vfrag->vf_recv = ack->hdr_dst_ptr;
sendreq->req_send_offset = ack->hdr_vlen;
sendreq->req_bytes_delivered = ack->hdr_vlen;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
@ -980,18 +997,23 @@ void mca_pml_dr_send_request_frag_ack(
if((vfrag->vf_ack & vfrag->vf_mask) != vfrag->vf_mask) {
/* reset local completion flags to only those that have been successfully acked */
vfrag->vf_mask_processed = vfrag->vf_ack;
vfrag->vf_mask = ~vfrag->vf_ack;
vfrag->vf_idx = 0;
vfrag->vf_mask_pending = 0;
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
schedule = true;
/* acked and local completion */
} else if (vfrag->vf_mask_processed == vfrag->vf_mask) {
} else if (vfrag->vf_mask_pending == 0
&& vfrag->vf_idx == vfrag->vf_len) {
/* update statistics */
sendreq->req_bytes_delivered += vfrag->vf_size;
assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed);
/* stash the vfid for duplicate acks.. */
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
/* return vfrag */
MCA_PML_DR_VFRAG_RETURN(vfrag);

Просмотреть файл

@ -41,7 +41,7 @@ extern "C" {
struct mca_pml_dr_send_request_t {
mca_pml_base_send_request_t req_send;
/* ompi_proc_t* req_proc; */
mca_pml_dr_comm_proc_t* req_proc;
mca_bml_base_endpoint_t* req_endpoint;
#if OMPI_HAVE_THREAD_SUPPORT
volatile int32_t req_state;
@ -58,6 +58,7 @@ struct mca_pml_dr_send_request_t {
mca_pml_dr_vfrag_t req_vfrag0;
opal_list_t req_retrans;
mca_btl_base_descriptor_t* descriptor; /* descriptor for first frag, retransmission */
};
typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t;
@ -138,7 +139,7 @@ do {
/**
* Start a send request.
*/
#define MCA_PML_DR_SEND_REQUEST_START(sendreq, rc) \
do { \
mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \
@ -164,6 +165,7 @@ do {
sendreq->req_send.req_base.req_ompi.req_status._cancelled = 0; \
sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32(&proc->send_sequence,1); \
sendreq->req_endpoint = endpoint; \
sendreq->req_proc = proc; \
MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
@ -302,7 +304,7 @@ do {
vfrag->vf_offset = sendreq->req_send_offset; \
vfrag->vf_ack = 0; \
vfrag->vf_idx = 0; \
vfrag->vf_mask_processed = 0; \
vfrag->vf_mask_pending = 0; \
vfrag->vf_retrans = 0; \
vfrag->vf_retry_cnt = 0; \
vfrag->vf_max_send_size = max_send_size; \

Просмотреть файл

@ -68,6 +68,7 @@ void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
{
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
opal_output(0, "%s:%d:%s, wdog timeout!", __FILE__, __LINE__, __func__);
OPAL_THREAD_LOCK(&ompi_request_lock);
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_wdog_max_count) {
opal_output(0, "%s:%d:%s, wdog retry count exceeded! FATAL", __FILE__, __LINE__, __func__);
@ -85,6 +86,7 @@ void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
opal_output(0, "%s:%d:%s, ack timeout!", __FILE__, __LINE__, __func__);
OPAL_THREAD_LOCK(&ompi_request_lock);
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) {
opal_output(0, "%s:%d: maximum ack retry count exceeded: FATAL", __FILE__, __LINE__);
@ -94,9 +96,9 @@ void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
if(0 == vfrag->vf_offset) { /* this is the first part of the message
that we need to resend */
if(vfrag->vf_rndv) {
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
} else {
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
}
} else {

Просмотреть файл

@ -43,7 +43,7 @@ struct mca_pml_dr_vfrag_t {
size_t vf_max_send_size;
uint64_t vf_ack;
uint64_t vf_mask;
uint64_t vf_mask_processed;
uint64_t vf_mask_pending;
uint64_t vf_retrans;
bool vf_rndv;
struct mca_bml_base_btl_t* bml_btl;
@ -88,7 +88,7 @@ do { \
#define MCA_PML_DR_VFRAG_RESET(vfrag) \
do { \
(vfrag)->vf_idx = 0; \
(vfrag)->vf_mask_processed = 0; \
(vfrag)->vf_mask_pending = 0; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_retrans = 0; \
} while(0)