1
1
This commit was SVN r9380.
Этот коммит содержится в:
Tim Woodall 2006-03-23 15:11:06 +00:00
родитель 0dd4af919d
Коммит dc125cf7d5
6 изменённых файлов: 95 добавлений и 87 удалений

Просмотреть файл

@ -58,9 +58,6 @@ struct mca_pml_dr_t {
time_t tout_ack; time_t tout_ack;
time_t tout_watch_dog; time_t tout_watch_dog;
/* lock queue access */
opal_mutex_t lock;
/* pending lists */ /* pending lists */
opal_list_t send_pending; opal_list_t send_pending;
opal_list_t acks_pending; opal_list_t acks_pending;

Просмотреть файл

@ -108,8 +108,6 @@ int mca_pml_dr_component_open(void)
mca_pml_dr_param_register_int("timer_ack_multiplier", 2); mca_pml_dr_param_register_int("timer_ack_multiplier", 2);
mca_pml_dr.timer_ack_max_count = mca_pml_dr.timer_ack_max_count =
mca_pml_dr_param_register_int("timer_ack_max_count", 10); mca_pml_dr_param_register_int("timer_ack_max_count", 10);
OBJ_CONSTRUCT(&mca_pml_dr.lock, opal_mutex_t);
/* requests */ /* requests */
OBJ_CONSTRUCT(&mca_pml_dr.send_requests, ompi_free_list_t); OBJ_CONSTRUCT(&mca_pml_dr.send_requests, ompi_free_list_t);
@ -177,7 +175,6 @@ int mca_pml_dr_component_close(void)
OBJ_DESTRUCT(&mca_pml_dr.recv_requests); OBJ_DESTRUCT(&mca_pml_dr.recv_requests);
OBJ_DESTRUCT(&mca_pml_dr.recv_frags); OBJ_DESTRUCT(&mca_pml_dr.recv_frags);
OBJ_DESTRUCT(&mca_pml_dr.buffers); OBJ_DESTRUCT(&mca_pml_dr.buffers);
OBJ_DESTRUCT(&mca_pml_dr.lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }

Просмотреть файл

@ -119,16 +119,17 @@ void mca_pml_dr_recv_frag_callback(
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_match_hdr_t); MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_match_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
OPAL_THREAD_LOCK(&comm->c_matching_lock); /* seq_recvs protected by matching lock */
OPAL_THREAD_LOCK(&comm->matching_lock);
if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock); OPAL_THREAD_UNLOCK(&comm->matching_lock);
OPAL_OUTPUT((0, "%s:%d: acking duplicate match\n", __FILE__, __LINE__)); OPAL_OUTPUT((0, "%s:%d: acking duplicate match\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml, mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
&hdr->hdr_common, &hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr.pval, hdr->hdr_match.hdr_src_ptr.pval,
1); 1);
} else { } else {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock); OPAL_THREAD_UNLOCK(&comm->matching_lock);
mca_pml_dr_recv_frag_match(comm,proc,btl,&hdr->hdr_match,segments,des->des_dst_cnt); mca_pml_dr_recv_frag_match(comm,proc,btl,&hdr->hdr_match,segments,des->des_dst_cnt);
} }
break; break;
@ -137,8 +138,14 @@ void mca_pml_dr_recv_frag_callback(
{ {
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t); MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
/* seq_sends protected by ompi_request lock*/
OPAL_THREAD_LOCK(&ompi_request_lock);
if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack); mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack);
} else {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} }
break; break;
} }
@ -147,12 +154,13 @@ void mca_pml_dr_recv_frag_callback(
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_rendezvous_hdr_t); MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_rendezvous_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
/* seq_recvs protected by matching lock */
OPAL_THREAD_LOCK(&comm->matching_lock); OPAL_THREAD_LOCK(&comm->matching_lock);
if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
/* ack only if this has been matched */ /* ack only if the vfrag has been matched */
mca_pml_dr_recv_request_t* recvreq = mca_pml_dr_recv_request_t* recvreq =
mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid); mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid);
OPAL_THREAD_UNLOCK(&comm->c_matching_lock); OPAL_THREAD_UNLOCK(&comm->matching_lock);
if(NULL != recvreq) { if(NULL != recvreq) {
OPAL_OUTPUT((0, "%s:%d: acking duplicate matched rendezvous\n", __FILE__, __LINE__)); OPAL_OUTPUT((0, "%s:%d: acking duplicate matched rendezvous\n", __FILE__, __LINE__));
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
@ -161,19 +169,24 @@ void mca_pml_dr_recv_frag_callback(
OPAL_OUTPUT((0, "%s:%d: droping duplicate unmatched rendezvous\n", __FILE__, __LINE__)); OPAL_OUTPUT((0, "%s:%d: droping duplicate unmatched rendezvous\n", __FILE__, __LINE__));
} }
} else { } else {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock); OPAL_THREAD_UNLOCK(&comm->matching_lock);
mca_pml_dr_recv_frag_match(comm,proc,btl,&hdr->hdr_match,segments,des->des_dst_cnt); mca_pml_dr_recv_frag_match(comm,proc,btl,&hdr->hdr_match,segments,des->des_dst_cnt);
} }
break; break;
} }
case MCA_PML_DR_HDR_TYPE_RNDV_ACK: case MCA_PML_DR_HDR_TYPE_RNDV_ACK:
{ {
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t); MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
/* seq_sends protected by ompi_request lock*/
OPAL_THREAD_LOCK(&ompi_request_lock);
if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack); mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack);
} } else {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
break; break;
} }
case MCA_PML_DR_HDR_TYPE_FRAG: case MCA_PML_DR_HDR_TYPE_FRAG:
@ -182,16 +195,17 @@ void mca_pml_dr_recv_frag_callback(
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_frag_hdr_t); MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_frag_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
/* seq_recvs protected by matching lock */
OPAL_THREAD_LOCK(&comm->matching_lock); OPAL_THREAD_LOCK(&comm->matching_lock);
if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock); OPAL_THREAD_UNLOCK(&comm->matching_lock);
OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__)); OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml, mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
&hdr->hdr_common, &hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr.pval, hdr->hdr_frag.hdr_src_ptr.pval,
~(uint64_t) 0); ~(uint64_t) 0);
} else { } else {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock); OPAL_THREAD_UNLOCK(&comm->matching_lock);
recvreq = hdr->hdr_frag.hdr_dst_ptr.pval; recvreq = hdr->hdr_frag.hdr_dst_ptr.pval;
mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt); mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt);
} }
@ -202,8 +216,14 @@ void mca_pml_dr_recv_frag_callback(
{ {
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t); MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
/* seq_sends protected by ompi_request lock*/
OPAL_THREAD_LOCK(&ompi_request_lock);
if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack); mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack);
} else {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} }
break; break;
} }
@ -672,6 +692,7 @@ void mca_pml_dr_recv_frag_ack(
void *src_ptr, void *src_ptr,
uint64_t mask) uint64_t mask)
{ {
ompi_communicator_t* comm = ompi_comm_lookup(hdr->hdr_ctx);
mca_btl_base_descriptor_t* des; mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl; mca_bml_base_btl_t* bml_btl;
mca_pml_dr_recv_frag_t* frag; mca_pml_dr_recv_frag_t* frag;
@ -689,6 +710,7 @@ void mca_pml_dr_recv_frag_ack(
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type; ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type;
ack->hdr_common.hdr_flags = 0; ack->hdr_common.hdr_flags = 0;
ack->hdr_common.hdr_src = comm->c_my_rank;
ack->hdr_common.hdr_dst = hdr->hdr_src; ack->hdr_common.hdr_dst = hdr->hdr_src;
ack->hdr_common.hdr_vid = hdr->hdr_vid; ack->hdr_common.hdr_vid = hdr->hdr_vid;
ack->hdr_common.hdr_ctx = hdr->hdr_ctx; ack->hdr_common.hdr_ctx = hdr->hdr_ctx;

Просмотреть файл

@ -288,8 +288,6 @@ void mca_pml_dr_recv_request_progress(
/* we have received all the pieces of the vfrag, ack /* we have received all the pieces of the vfrag, ack
everything that passed the checksum */ everything that passed the checksum */
mca_pml_dr_comm_proc_set_vid(&recvreq->req_proc->seq_recvs, vfrag->vf_id); mca_pml_dr_comm_proc_set_vid(&recvreq->req_proc->seq_recvs, vfrag->vf_id);
OPAL_OUTPUT((0, "%s:%d ACKING VFRAG vf_ack says %08x bytes_received %d\n",
__FILE__,__LINE__, vfrag->vf_ack, recvreq->req_bytes_received));
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask); hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask);
} }

Просмотреть файл

@ -106,9 +106,8 @@ static void mca_pml_dr_match_completion(
mca_pml_dr_send_request_t* sendreq = descriptor->des_cbdata; mca_pml_dr_send_request_t* sendreq = descriptor->des_cbdata;
mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0; mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0;
/* free any descriptor used to retransmit */
if(descriptor != sendreq->descriptor) { if(descriptor != sendreq->descriptor) {
/* only the cached descriptor needs to hang around
for retransmission */
mca_bml_base_free(descriptor->des_context, descriptor); mca_bml_base_free(descriptor->des_context, descriptor);
} }
@ -124,7 +123,7 @@ static void mca_pml_dr_match_completion(
/* local completion */ /* local completion */
vfrag->vf_mask_pending = 0; vfrag->vf_mask_pending = 0;
/* been acked? */ /* wait for positive ack to complete request */
if(vfrag->vf_ack == vfrag->vf_mask) { if(vfrag->vf_ack == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag); MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
@ -136,12 +135,18 @@ static void mca_pml_dr_match_completion(
/* update statistics and complete */ /* update statistics and complete */
sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed; sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* on negative ack need to retransmit */
} else if(vfrag->vf_retrans) { } else if(vfrag->vf_retrans) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag); MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag); MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
} }
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* check for pending requests */
MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING();
} }
/* /*
@ -159,9 +164,8 @@ static void mca_pml_dr_rndv_completion(
mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0; mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0;
bool schedule = false; bool schedule = false;
/* free any descriptor used to retransmit */
if(descriptor != sendreq->descriptor) { if(descriptor != sendreq->descriptor) {
/* only the cached descriptor needs to hang around
for retransmission */
mca_bml_base_free(descriptor->des_context, descriptor); mca_bml_base_free(descriptor->des_context, descriptor);
} }
@ -171,13 +175,12 @@ static void mca_pml_dr_rndv_completion(
opal_output(0, "%s:%d FATAL", __FILE__, __LINE__); opal_output(0, "%s:%d FATAL", __FILE__, __LINE__);
orte_errmgr.abort(); orte_errmgr.abort();
} }
OPAL_THREAD_LOCK(&ompi_request_lock); OPAL_THREAD_LOCK(&ompi_request_lock);
/* local completion */ /* local completion */
vfrag->vf_mask_pending = 0; vfrag->vf_mask_pending = 0;
/* positive ack? */ /* wait for positive ack to complete request */
if(vfrag->vf_ack == vfrag->vf_mask) { if(vfrag->vf_ack == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag); MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
if(sendreq->descriptor) { if(sendreq->descriptor) {
@ -185,23 +188,23 @@ static void mca_pml_dr_rndv_completion(
sendreq->descriptor = NULL; sendreq->descriptor = NULL;
} }
/* matched at peer? */ /* update statistics and complete */
if(NULL != sendreq->req_vfrag0.vf_recv.pval) { mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){ if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else { } else {
schedule = true; schedule = true;
}
} }
OPAL_THREAD_UNLOCK(&ompi_request_lock); OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) { if(schedule) {
mca_pml_dr_send_request_schedule(sendreq); mca_pml_dr_send_request_schedule(sendreq);
} }
/* negative ack - need to retransmit? */ /* on negative ack need to retransmit */
} else if(vfrag->vf_retrans) { } else if(vfrag->vf_retrans) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag); MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag); MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} }
/* check for pending requests */ /* check for pending requests */
@ -238,15 +241,12 @@ static void mca_pml_dr_frag_completion(
bit = ((uint64_t)1 << hdr->hdr_frag_idx); bit = ((uint64_t)1 << hdr->hdr_frag_idx);
vfrag->vf_mask_pending &= ~bit; vfrag->vf_mask_pending &= ~bit;
/* when we have local completion of the entire vfrag /* have all pending frags completed for this vfrag? */
we stop the local wdog timers and set our ack timer
as the peer should be sending us an ack for the vfrag
*/
if(vfrag->vf_mask_pending == 0) { if(vfrag->vf_mask_pending == 0) {
MCA_PML_DR_VFRAG_WDOG_STOP(vfrag); MCA_PML_DR_VFRAG_WDOG_STOP(vfrag);
/* has the vfrag already been acked */ /* has the vfrag already been acked */
if(vfrag->vf_ack == vfrag->vf_mask) { if (vfrag->vf_ack == vfrag->vf_mask) {
sendreq->req_bytes_delivered += vfrag->vf_size; sendreq->req_bytes_delivered += vfrag->vf_size;
assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed); assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed);
@ -255,13 +255,19 @@ static void mca_pml_dr_frag_completion(
if(vfrag->vf_idx != vfrag->vf_len) { if(vfrag->vf_idx != vfrag->vf_len) {
opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag); opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
} }
/* record vfrag id to drop duplicate acks */
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
/* return this vfrag */ /* return this vfrag */
MCA_PML_DR_VFRAG_RETURN(vfrag); MCA_PML_DR_VFRAG_RETURN(vfrag);
/* waiting on ack */
} else if (vfrag->vf_idx == vfrag->vf_len) { } else if (vfrag->vf_idx == vfrag->vf_len) {
MCA_PML_DR_VFRAG_ACK_START(vfrag); MCA_PML_DR_VFRAG_ACK_START(vfrag);
} }
/* if not reset the watchdog timer */
} else { } else {
MCA_PML_DR_VFRAG_WDOG_RESET(vfrag); MCA_PML_DR_VFRAG_WDOG_RESET(vfrag);
} }
@ -339,6 +345,7 @@ int mca_pml_dr_send_request_start_buffered(
sendreq->req_vfrag0.vf_size = max_data; sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl; sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = true; sendreq->req_vfrag0.vf_rndv = true;
sendreq->req_vfrag0.vf_mask_pending = 1;
descriptor->des_cbfunc = mca_pml_dr_rndv_completion; descriptor->des_cbfunc = mca_pml_dr_rndv_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -422,7 +429,6 @@ int mca_pml_dr_send_request_start_copy(
int32_t free_after; int32_t free_after;
int rc; int rc;
/* allocate descriptor */ /* allocate descriptor */
mca_bml_base_alloc(bml_btl, &descriptor, sizeof(mca_pml_dr_match_hdr_t) + size); mca_bml_base_alloc(bml_btl, &descriptor, sizeof(mca_pml_dr_match_hdr_t) + size);
if(NULL == descriptor) { if(NULL == descriptor) {
@ -463,16 +469,17 @@ int mca_pml_dr_send_request_start_copy(
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
/* update lengths */ /* vfrag status */
segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data;
sendreq->req_vfrag0.vf_size = max_data; sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl; sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = false; sendreq->req_vfrag0.vf_rndv = false;
sendreq->req_vfrag0.vf_mask_pending = 1;
/* short message */ /* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion; descriptor->des_cbfunc = mca_pml_dr_match_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_cbdata = sendreq; descriptor->des_cbdata = sendreq;
segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data;
/* signal request completion */ /* signal request completion */
OPAL_THREAD_LOCK(&ompi_request_lock); OPAL_THREAD_LOCK(&ompi_request_lock);
@ -538,10 +545,11 @@ int mca_pml_dr_send_request_start_prepare(
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_cbdata = sendreq; descriptor->des_cbdata = sendreq;
/* update lengths */ /* vfrag state */
sendreq->req_vfrag0.vf_size = size; sendreq->req_vfrag0.vf_size = size;
sendreq->req_vfrag0.bml_btl = bml_btl; sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = false; sendreq->req_vfrag0.vf_rndv = false;
sendreq->req_vfrag0.vf_mask_pending = 1;
/* send */ /* send */
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML); rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
@ -598,23 +606,26 @@ int mca_pml_dr_send_request_start_rndv(
hdr->hdr_common.hdr_flags = flags; hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0; hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OPAL_CSUM_ZERO; hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OPAL_CSUM_ZERO;
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t)); hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t));
/* first fragment of a long message */ /* first fragment of a long message */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbdata = sendreq; des->des_cbdata = sendreq;
des->des_cbfunc = mca_pml_dr_rndv_completion; des->des_cbfunc = mca_pml_dr_rndv_completion;
/* vfrag state */
sendreq->req_vfrag0.vf_size = size; sendreq->req_vfrag0.vf_size = size;
sendreq->req_vfrag0.bml_btl = bml_btl; sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = true; sendreq->req_vfrag0.vf_rndv = true;
sendreq->req_vfrag0.vf_mask_pending = 1;
/* send */ /* send */
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0); MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
@ -625,8 +636,6 @@ int mca_pml_dr_send_request_start_rndv(
return rc; return rc;
} }
/** /**
* Schedule pipeline of send descriptors for the given request, * Schedule pipeline of send descriptors for the given request,
* using send protocol. * using send protocol.
@ -724,7 +733,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
assert(hdr->hdr_frag_offset < sendreq->req_send.req_bytes_packed); assert(hdr->hdr_frag_offset < sendreq->req_send.req_bytes_packed);
vfrag->vf_mask_pending |= (1 << vfrag->vf_idx); vfrag->vf_mask_pending |= ((uint64_t)1 << vfrag->vf_idx);
/* update state */ /* update state */
vfrag->vf_idx++; vfrag->vf_idx++;
@ -747,9 +756,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
sendreq->req_send_offset -= size; sendreq->req_send_offset -= size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1); OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
mca_bml_base_free(bml_btl,des); mca_bml_base_free(bml_btl,des);
OPAL_THREAD_LOCK(&mca_pml_dr.lock); OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); OPAL_THREAD_UNLOCK(&ompi_request_lock);
break; break;
} }
mca_pml_dr_progress(); mca_pml_dr_progress();
@ -767,7 +776,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
*/ */
while(vfrag->vf_idx < vfrag->vf_len && while(vfrag->vf_idx < vfrag->vf_len &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
if(((1 << vfrag->vf_idx) & vfrag->vf_mask) == 0) { if(((uint64_t)1 << vfrag->vf_idx) & vfrag->vf_retrans) {
mca_bml_base_btl_t* bml_btl = vfrag->bml_btl; mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
mca_pml_dr_frag_hdr_t* hdr; mca_pml_dr_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des; mca_btl_base_descriptor_t* des;
@ -794,9 +803,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
&des &des
); );
if(des == NULL) { if(des == NULL) {
OPAL_THREAD_LOCK(&mca_pml_dr.lock); OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); OPAL_THREAD_UNLOCK(&ompi_request_lock);
break; break;
} }
des->des_cbfunc = mca_pml_dr_frag_completion; des->des_cbfunc = mca_pml_dr_frag_completion;
@ -819,14 +828,11 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv; hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)); hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
vfrag->vf_mask_pending |= (1 << vfrag->vf_idx); vfrag->vf_mask_pending |= ((uint64_t)1 << vfrag->vf_idx);
vfrag->vf_rndv = false; vfrag->vf_rndv = false;
/* update state */
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
/* reset the vfrag watchdog timer due to retransmission */
MCA_PML_DR_VFRAG_WDOG_RESET(vfrag);
/* initiate send - note that this may complete before the call returns */ /* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML); rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML);
@ -835,9 +841,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
} else { } else {
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1); OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
mca_bml_base_free(bml_btl,des); mca_bml_base_free(bml_btl,des);
OPAL_THREAD_LOCK(&mca_pml_dr.lock); OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); OPAL_THREAD_UNLOCK(&ompi_request_lock);
break; break;
} }
} }
@ -869,10 +875,10 @@ void mca_pml_dr_send_request_match_ack(
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
OPAL_THREAD_LOCK(&ompi_request_lock); OPAL_THREAD_LOCK(&ompi_request_lock);
assert(vfrag->vf_ack == 0);
assert(vfrag->vf_ack == 0);
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask; vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
if (vfrag->vf_mask_pending == 0) { if (vfrag->vf_mask_pending == 0) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag); MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* need to retransmit? */ /* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) { if(vfrag->vf_ack != vfrag->vf_mask) {
@ -887,10 +893,8 @@ void mca_pml_dr_send_request_match_ack(
/* update statistics */ /* update statistics */
sendreq->req_bytes_delivered = vfrag->vf_size; sendreq->req_bytes_delivered = vfrag->vf_size;
/* stash the vfid for duplicate acks.. */
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id); mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} }
/* wait for local completion */ /* wait for local completion */
@ -901,12 +905,12 @@ void mca_pml_dr_send_request_match_ack(
} else { } else {
vfrag->vf_recv = ack->hdr_dst_ptr; vfrag->vf_recv = ack->hdr_dst_ptr;
} }
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} }
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} }
/** /**
* Acknowledgment of rndv vfrag. * Acknowledgment of rendezvous vfrag.
*/ */
void mca_pml_dr_send_request_rndv_ack( void mca_pml_dr_send_request_rndv_ack(
@ -930,6 +934,7 @@ void mca_pml_dr_send_request_rndv_ack(
if(vfrag->vf_ack != vfrag->vf_mask) { if(vfrag->vf_ack != vfrag->vf_mask) {
/* got a NACK, resend eager data! */ /* got a NACK, resend eager data! */
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag); MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} else { } else {
/* return descriptor of first fragment */ /* return descriptor of first fragment */
if(NULL != sendreq->descriptor) { if(NULL != sendreq->descriptor) {
@ -946,15 +951,13 @@ void mca_pml_dr_send_request_rndv_ack(
sendreq->req_send_offset = ack->hdr_vlen; sendreq->req_send_offset = ack->hdr_vlen;
schedule = true; schedule = true;
} }
/* stash the vfid for duplicate acks.. */ /* stash the vfrag id for duplicate acks.. */
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id); mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
OPAL_THREAD_UNLOCK(&ompi_request_lock); OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) { if(schedule) {
mca_pml_dr_send_request_schedule(sendreq); mca_pml_dr_send_request_schedule(sendreq);
} }
} }
/* wait for local completion */ /* wait for local completion */
@ -964,10 +967,6 @@ void mca_pml_dr_send_request_rndv_ack(
vfrag->vf_retrans = vfrag->vf_mask; vfrag->vf_retrans = vfrag->vf_mask;
} else { } else {
/* may need this to schedule rest of the message */ /* may need this to schedule rest of the message */
if(sendreq->req_send.req_bytes_packed > 0) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
printf("blah");
}
vfrag->vf_recv = ack->hdr_dst_ptr; vfrag->vf_recv = ack->hdr_dst_ptr;
sendreq->req_send_offset = ack->hdr_vlen; sendreq->req_send_offset = ack->hdr_vlen;
sendreq->req_bytes_delivered = ack->hdr_vlen; sendreq->req_bytes_delivered = ack->hdr_vlen;
@ -994,18 +993,17 @@ void mca_pml_dr_send_request_frag_ack(
vfrag->vf_ack |= (ack->hdr_vmask & vfrag->vf_mask); vfrag->vf_ack |= (ack->hdr_vmask & vfrag->vf_mask);
/* need to retransmit? */ /* need to retransmit? */
if((vfrag->vf_ack & vfrag->vf_mask) != vfrag->vf_mask) { if(vfrag->vf_ack != vfrag->vf_mask) {
/* reset local completion flags to only those that have been successfully acked */ /* reset local completion flags to only those that have been successfully acked */
vfrag->vf_mask = ~vfrag->vf_ack; vfrag->vf_retrans = ~vfrag->vf_ack;
vfrag->vf_idx = 0; vfrag->vf_idx = 0;
vfrag->vf_mask_pending = 0; vfrag->vf_mask_pending = 0;
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
schedule = true; schedule = true;
/* acked and local completion */ /* acked and local completion */
} else if (vfrag->vf_mask_pending == 0 } else if (vfrag->vf_mask_pending == 0 && vfrag->vf_idx == vfrag->vf_len) {
&& vfrag->vf_idx == vfrag->vf_len) {
/* update statistics */ /* update statistics */
sendreq->req_bytes_delivered += vfrag->vf_size; sendreq->req_bytes_delivered += vfrag->vf_size;
@ -1013,7 +1011,6 @@ void mca_pml_dr_send_request_frag_ack(
/* stash the vfid for duplicate acks.. */ /* stash the vfid for duplicate acks.. */
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id); mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
/* return vfrag */ /* return vfrag */
MCA_PML_DR_VFRAG_RETURN(vfrag); MCA_PML_DR_VFRAG_RETURN(vfrag);

Просмотреть файл

@ -329,10 +329,10 @@ do { \
/* advance pending requests */ \ /* advance pending requests */ \
while(opal_list_get_size(&mca_pml_dr.send_pending)) { \ while(opal_list_get_size(&mca_pml_dr.send_pending)) { \
mca_pml_dr_send_request_t* sendreq; \ mca_pml_dr_send_request_t* sendreq; \
OPAL_THREAD_LOCK(&mca_pml_dr.lock); \ OPAL_THREAD_LOCK(&ompi_request_lock); \
sendreq = (mca_pml_dr_send_request_t*) \ sendreq = (mca_pml_dr_send_request_t*) \
opal_list_remove_first(&mca_pml_dr.send_pending); \ opal_list_remove_first(&mca_pml_dr.send_pending); \
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); \ OPAL_THREAD_UNLOCK(&ompi_request_lock); \
if(NULL == sendreq) \ if(NULL == sendreq) \
break; \ break; \
mca_pml_dr_send_request_schedule(sendreq); \ mca_pml_dr_send_request_schedule(sendreq); \
@ -346,8 +346,7 @@ do { \
#define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \ #define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \
do { \ do { \
mca_bml_base_endpoint_t* endpoint = \ mca_bml_base_endpoint_t* endpoint = sendreq->req_endpoint; \
(mca_bml_base_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
mca_btl_base_descriptor_t *des_old, *des_new; \ mca_btl_base_descriptor_t *des_old, *des_new; \
vfrag->vf_retry_cnt ++; \ vfrag->vf_retry_cnt ++; \
@ -367,7 +366,6 @@ do { \
des_new->des_flags = des_old->des_flags; \ des_new->des_flags = des_old->des_flags; \
des_new->des_cbdata = des_old->des_cbdata; \ des_new->des_cbdata = des_old->des_cbdata; \
des_new->des_cbfunc = des_old->des_cbfunc; \ des_new->des_cbfunc = des_old->des_cbfunc; \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
MCA_PML_DR_VFRAG_ACK_START(vfrag); \ MCA_PML_DR_VFRAG_ACK_START(vfrag); \
mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \ mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \
} while(0) } while(0)
@ -378,8 +376,7 @@ do { \
#define MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag) \ #define MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag) \
do { \ do { \
mca_bml_base_endpoint_t* endpoint = \ mca_bml_base_endpoint_t* endpoint = sendreq->req_endpoint; \
(mca_bml_base_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
mca_btl_base_descriptor_t *des_old, *des_new; \ mca_btl_base_descriptor_t *des_old, *des_new; \
mca_pml_dr_hdr_t *hdr; \ mca_pml_dr_hdr_t *hdr; \