1
1
This commit was SVN r9410.
Этот коммит содержится в:
Tim Woodall 2006-03-24 06:49:45 +00:00
родитель dec87e2cea
Коммит 2e376e0ee8
7 изменённых файлов: 98 добавлений и 74 удалений

Просмотреть файл

@ -877,6 +877,7 @@ rematch:
match_made = true;
OBJ_CONSTRUCT(additional_matches, opal_list_t);
}
MCA_PML_DR_RECV_REQUEST_MATCHED(match,comm,proc,&frag->hdr.hdr_match);
opal_list_append(additional_matches, (opal_list_item_t *)frag);
}

Просмотреть файл

@ -111,11 +111,15 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i
static void mca_pml_dr_recv_request_construct(mca_pml_dr_recv_request_t* request)
{
OBJ_CONSTRUCT(&request->req_vfrag0, mca_pml_dr_vfrag_t);
OBJ_CONSTRUCT(&request->req_vfrags, opal_list_t);
request->req_vfrag0.vf_len = 1;
request->req_vfrag0.vf_mask = 1;
request->req_vfrag0.vf_recv.pval = request;
request->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV;
request->req_recv.req_base.req_ompi.req_free = mca_pml_dr_recv_request_free;
request->req_recv.req_base.req_ompi.req_cancel = mca_pml_dr_recv_request_cancel;
OBJ_CONSTRUCT(&request->req_vfrag0, mca_pml_dr_vfrag_t);
OBJ_CONSTRUCT(&request->req_vfrags, opal_list_t);
}
static void mca_pml_dr_recv_request_destruct(mca_pml_dr_recv_request_t* request)
@ -274,7 +278,6 @@ void mca_pml_dr_recv_request_progress(
bytes_delivered,
csum);
/* update the mask to show that this vfrag was received,
* note that it might still fail the checksum though
*/

Просмотреть файл

@ -353,8 +353,6 @@ static inline struct mca_pml_dr_recv_request_t* mca_pml_dr_comm_proc_check_match
do { \
if((recvreq)->req_vfrag->vf_id == (hdr)->hdr_common.hdr_vid) { \
vfrag = (recvreq)->req_vfrag; \
} else if ((hdr)->hdr_frag_offset == 0) { \
vfrag = &(recvreq)->req_vfrag0; \
} else { \
opal_list_item_t* item; \
int rc; \

Просмотреть файл

@ -69,10 +69,7 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
OBJ_CONSTRUCT(&req->req_retrans, opal_list_t);
req->req_vfrag0.vf_len = 1;
req->req_vfrag0.vf_idx = 0;
req->req_vfrag0.vf_ack = 0;
req->req_vfrag0.vf_mask = 1;
req->req_vfrag0.vf_pending = 0;
req->req_vfrag0.vf_send.pval = req;
req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free;
@ -140,7 +137,7 @@ static void mca_pml_dr_match_completion(
/* on negative ack need to retransmit */
} else if(vfrag->vf_state & MCA_PML_DR_VFRAG_NACKED) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
MCA_PML_DR_VFRAG_ACK_RESET(vfrag);
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
@ -203,7 +200,7 @@ static void mca_pml_dr_rndv_completion(
/* on negative ack need to retransmit */
} else if(vfrag->vf_state & MCA_PML_DR_VFRAG_NACKED) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
MCA_PML_DR_VFRAG_ACK_RESET(vfrag);
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
@ -227,7 +224,6 @@ static void mca_pml_dr_frag_completion(
mca_pml_dr_vfrag_t* vfrag = descriptor->des_cbdata;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
mca_pml_dr_frag_hdr_t* hdr = (mca_pml_dr_frag_hdr_t*)descriptor->des_src->seg_addr.pval;
bool schedule = false;
/* check completion status */
@ -398,8 +394,8 @@ int mca_pml_dr_send_request_start_buffered(
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* send */
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, descriptor );
}
@ -483,8 +479,8 @@ int mca_pml_dr_send_request_start_copy(
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* send */
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, descriptor );
}
@ -546,8 +542,8 @@ int mca_pml_dr_send_request_start_prepare(
sendreq->req_vfrag0.vf_pending = 1;
/* send */
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, descriptor );
}
@ -622,8 +618,8 @@ int mca_pml_dr_send_request_start_rndv(
sendreq->req_vfrag0.vf_pending = 1;
/* send */
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, des );
}
@ -675,6 +671,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
break;
}
MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag);
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
vfrag->bml_btl = bml_btl;
offset = 0;
@ -734,10 +731,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
sendreq->req_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
/* start vfrag watchdog timer if this is the first part of the vfrag*/
if(vfrag->vf_idx == 1) {
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
}
/* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML);
@ -870,6 +863,7 @@ void mca_pml_dr_send_request_match_ack(
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_START(vfrag);
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
} else {
/* if already have local completion free descriptor and complete message */
@ -921,6 +915,7 @@ void mca_pml_dr_send_request_rndv_ack(
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
/* got a NACK, resend eager data! */
MCA_PML_DR_VFRAG_ACK_START(vfrag);
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} else {
@ -931,15 +926,18 @@ void mca_pml_dr_send_request_rndv_ack(
}
/* done? */
sendreq->req_send_offset = ack->hdr_vlen;
sendreq->req_bytes_delivered = ack->hdr_vlen;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else {
} else {
/* start scheduling with a new vfrag */
vfrag->vf_recv = ack->hdr_dst_ptr;
vfrag->vf_state = 0;
sendreq->req_send_offset = ack->hdr_vlen;
vfrag->vf_size = ack->hdr_vlen;
schedule = true;
}
/* stash the vfrag id for duplicate acks.. */
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
@ -958,6 +956,7 @@ void mca_pml_dr_send_request_rndv_ack(
/* will need this to schedule rest of the message */
vfrag->vf_recv = ack->hdr_dst_ptr;
vfrag->vf_state = 0;
vfrag->vf_size = ack->hdr_vlen;
sendreq->req_send_offset = ack->hdr_vlen;
sendreq->req_bytes_delivered = ack->hdr_vlen;
}
@ -976,6 +975,7 @@ void mca_pml_dr_send_request_frag_ack(
mca_pml_dr_vfrag_t* vfrag = ack->hdr_src_ptr.pval;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
bool schedule = false;
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
OPAL_THREAD_LOCK(&ompi_request_lock);
@ -985,9 +985,7 @@ void mca_pml_dr_send_request_frag_ack(
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
MCA_PML_DR_VFRAG_WDOG_RESET(vfrag);
vfrag->vf_idx = 0;
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq,vfrag);
schedule = true;
/* acked and local completion */

Просмотреть файл

@ -157,8 +157,6 @@ do {
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
sendreq->req_vfrag0.vf_max_send_size = endpoint->btl_max_send_size - \
sizeof(mca_pml_dr_frag_hdr_t); \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
sendreq->req_endpoint = endpoint; \
sendreq->req_proc = proc; \
@ -310,13 +308,24 @@ do {
sendreq->req_vfrag = vfrag; \
} while(0)
/*
* Reschedule unacked fragments
*/
#define MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq, vfrag) \
do { \
vfrag->vf_idx = 0; \
vfrag->vf_state = 0; \
opal_list_append(&(sendreq)->req_retrans, (opal_list_item_t*)vfrag); \
} while(0)
/*
* Update bytes delivered on request based on supplied descriptor
*/
#define MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, vfrag, hdrlen) \
do { \
sendreq->req_bytes_delivered += vfrag->vf_size; \
#define MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, vfrag, hdrlen) \
do { \
sendreq->req_bytes_delivered += vfrag->vf_size; \
} while(0)
/*
* Attempt to process any pending requests
@ -365,7 +374,6 @@ do { \
des_new->des_flags = des_old->des_flags; \
des_new->des_cbdata = des_old->des_cbdata; \
des_new->des_cbfunc = des_old->des_cbfunc; \
MCA_PML_DR_VFRAG_ACK_START(vfrag); \
mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \
} while(0)
@ -409,7 +417,6 @@ do { \
des_new->des_flags = des_old->des_flags; \
des_new->des_cbdata = des_old->des_cbdata; \
des_new->des_cbfunc = des_old->des_cbfunc; \
MCA_PML_DR_VFRAG_ACK_START(vfrag); \
mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \
} while(0)

Просмотреть файл

@ -35,7 +35,7 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
vfrag->vf_size = 0;
vfrag->vf_max_send_size = 0;
vfrag->vf_ack = 0;
vfrag->vf_mask = 0;
vfrag->vf_mask = 1;
vfrag->vf_retry_cnt = 0;
vfrag->vf_state = 0;
vfrag->tv_wdog.tv_sec = mca_pml_dr.timer_wdog_sec;
@ -68,14 +68,26 @@ void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
{
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
opal_output(0, "%s:%d:%s, wdog timeout!", __FILE__, __LINE__, __func__);
OPAL_THREAD_LOCK(&ompi_request_lock);
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_wdog_max_count) {
opal_output(0, "%s:%d:%s, wdog retry count exceeded! FATAL", __FILE__, __LINE__, __func__);
OPAL_OUTPUT((0, "%s:%d:%s: wdog timeout!", __FILE__, __LINE__, __func__));
if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_wdog_max_count) {
opal_output(0, "%s:%d:%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__);
orte_errmgr.abort();
}
MCA_PML_DR_VFRAG_RESET(vfrag);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
/* back off watchdog timer */
vfrag->tv_wdog.tv_sec =
mca_pml_dr.timer_wdog_sec +
mca_pml_dr.timer_wdog_sec * mca_pml_dr.timer_wdog_multiplier *
vfrag->vf_retry_cnt;
vfrag->tv_wdog.tv_usec =
mca_pml_dr.timer_wdog_usec +
mca_pml_dr.timer_wdog_usec * mca_pml_dr.timer_wdog_multiplier *
vfrag->vf_retry_cnt;
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
/* retransmit vfrag */
OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_schedule(sendreq);
}
@ -86,25 +98,25 @@ void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
opal_output(0, "%s:%d:%s, ack timeout!", __FILE__, __LINE__, __func__);
OPAL_THREAD_LOCK(&ompi_request_lock);
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) {
opal_output(0, "%s:%d: maximum ack retry count exceeded: FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
}
opal_output(0, "%s:%d:%s: ack timeout!", __FILE__, __LINE__, __func__);
if(0 == vfrag->vf_offset) { /* this is the first part of the message
that we need to resend */
if(vfrag->vf_state & MCA_PML_DR_VFRAG_RNDV) {
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
} else {
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
}
} else {
MCA_PML_DR_VFRAG_RESET(vfrag);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_LOCK(&ompi_request_lock);
/* first frag within send request */
if(vfrag == &sendreq->req_vfrag0) {
MCA_PML_DR_VFRAG_ACK_START(vfrag);
if(vfrag->vf_state & MCA_PML_DR_VFRAG_RNDV) {
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
} else {
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* reschedule unacked portion of vfrag */
} else {
MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
mca_pml_dr_send_request_schedule(sendreq);
}
}

Просмотреть файл

@ -81,7 +81,8 @@ do { \
#define MCA_PML_DR_VFRAG_INIT(vfrag) \
do { \
MCA_PML_DR_VFRAG_RESET(vfrag); \
(vfrag)->vf_idx = 0; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_retry_cnt = 0; \
(vfrag)->vf_recv.pval = NULL; \
(vfrag)->vf_state = 0; \
@ -91,35 +92,35 @@ do { \
#define MCA_PML_DR_VFRAG_RESET(vfrag) \
do { \
(vfrag)->vf_idx = 0; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \
} while(0)
/*
* Watchdog Timer
*/
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \
do { \
opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_RESET(vfrag) \
do { \
opal_event_del(&vfrag->ev_wdog); \
vfrag->tv_wdog.tv_sec = \
mca_pml_dr.timer_wdog_sec + \
mca_pml_dr.timer_wdog_sec * mca_pml_dr.timer_wdog_multiplier * \
vfrag->vf_retry_cnt; \
vfrag->tv_wdog.tv_usec = \
mca_pml_dr.timer_wdog_usec + \
mca_pml_dr.timer_wdog_usec * mca_pml_dr.timer_wdog_multiplier * \
vfrag->vf_retry_cnt; \
opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag) \
do { \
opal_event_del(&vfrag->ev_wdog); \
\
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_RESET(vfrag) \
do { \
opal_event_del(&vfrag->ev_wdog); \
opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \
} while(0)
/*
* Ack Timer
*/
#define MCA_PML_DR_VFRAG_ACK_START(vfrag) \
do { \
(vfrag)->tv_ack.tv_sec = \
@ -131,15 +132,19 @@ do { \
mca_pml_dr.timer_ack_usec * mca_pml_dr.timer_ack_multiplier * \
(vfrag)->vf_retry_cnt; \
opal_event_add(&(vfrag)->ev_ack, &(vfrag)->tv_ack); \
\
} while(0)
#define MCA_PML_DR_VFRAG_ACK_STOP(vfrag) \
do { \
opal_event_del(&vfrag->ev_ack); \
\
} while(0)
#define MCA_PML_DR_VFRAG_ACK_RESET(vfrag) \
do { \
MCA_PML_DR_VFRAG_ACK_STOP(vfrag); \
MCA_PML_DR_VFRAG_ACK_START(vfrag); \
} while(0)
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif