1
1
This commit was SVN r9399.
Этот коммит содержится в:
Tim Woodall 2006-03-23 22:08:59 +00:00
родитель c38fd90e63
Коммит 996a1b56df
6 изменённых файлов: 63 добавления и 80 удалений

Просмотреть файл

@ -278,13 +278,13 @@ void mca_pml_dr_recv_request_progress(
/* update the mask to show that this vfrag was received,
* note that it might still fail the checksum though
*/
vfrag->vf_mask_pending |= bit;
vfrag->vf_pending |= bit;
if(csum == hdr->hdr_frag.hdr_frag_csum) {
/* this part of the vfrag passed the checksum,
mark it so that we ack it after receiving the
entire vfrag */
vfrag->vf_ack |= bit;
if((vfrag->vf_mask_pending & vfrag->vf_mask) == vfrag->vf_mask) {
if((vfrag->vf_pending & vfrag->vf_mask) == vfrag->vf_mask) {
/* we have received all the pieces of the vfrag, ack
everything that passed the checksum */
ompi_seq_tracker_insert(&recvreq->req_proc->seq_recvs, vfrag->vf_id);

Просмотреть файл

@ -372,12 +372,9 @@ do { \
if(NULL == vfrag) { \
MCA_PML_DR_VFRAG_ALLOC(vfrag,rc); \
if(NULL != vfrag) { \
MCA_PML_DR_VFRAG_INIT(vfrag); \
(vfrag)->vf_id = (hdr)->hdr_common.hdr_vid; \
(vfrag)->vf_len = (hdr)->hdr_vlen; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_mask_pending = 0; \
(vfrag)->vf_retrans = 0; \
(vfrag)->vf_retry_cnt = 0; \
if((hdr)->hdr_vlen == 64) { \
(vfrag)->vf_mask = ~(uint64_t)0; \
} else { \

Просмотреть файл

@ -46,7 +46,6 @@ static inline int mca_pml_dr_send_request_free(struct ompi_request_t** request)
mca_pml_dr_send_request_t* sendreq = *(mca_pml_dr_send_request_t**)request;
assert( false == sendreq->req_send.req_base.req_free_called );
OPAL_THREAD_LOCK(&ompi_request_lock);
sendreq->req_send.req_base.req_free_called = true;
if( true == sendreq->req_send.req_base.req_pml_complete ) {
@ -73,7 +72,7 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
req->req_vfrag0.vf_idx = 0;
req->req_vfrag0.vf_ack = 0;
req->req_vfrag0.vf_mask = 1;
req->req_vfrag0.vf_mask_pending = 0;
req->req_vfrag0.vf_pending = 0;
req->req_vfrag0.vf_send.pval = req;
req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free;
@ -119,12 +118,12 @@ static void mca_pml_dr_match_completion(
orte_errmgr.abort();
}
OPAL_THREAD_LOCK(&ompi_request_lock);
/* local completion */
vfrag->vf_mask_pending = 0;
/* wait for local completion */
if(OPAL_THREAD_ADD64(&vfrag->vf_pending,-1) > 0)
return;
/* wait for positive ack to complete request */
OPAL_THREAD_LOCK(&ompi_request_lock);
if(vfrag->vf_ack == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
@ -140,10 +139,10 @@ static void mca_pml_dr_match_completion(
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
/* on negative ack need to retransmit */
} else if(vfrag->vf_retrans) {
} else if(vfrag->vf_state & MCA_PML_DR_VFRAG_NACKED) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
}
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* check for pending requests */
@ -176,12 +175,13 @@ static void mca_pml_dr_rndv_completion(
opal_output(0, "%s:%d FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
}
OPAL_THREAD_LOCK(&ompi_request_lock);
/* local completion */
vfrag->vf_mask_pending = 0;
if(OPAL_THREAD_ADD64(&vfrag->vf_pending,-1) > 0)
return;
/* wait for positive ack to complete request */
OPAL_THREAD_LOCK(&ompi_request_lock);
if(vfrag->vf_ack == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
if(sendreq->descriptor) {
@ -202,7 +202,7 @@ static void mca_pml_dr_rndv_completion(
}
/* on negative ack need to retransmit */
} else if(vfrag->vf_retrans) {
} else if(vfrag->vf_state & MCA_PML_DR_VFRAG_NACKED) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
@ -229,7 +229,6 @@ static void mca_pml_dr_frag_completion(
mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
mca_pml_dr_frag_hdr_t* hdr = (mca_pml_dr_frag_hdr_t*)descriptor->des_src->seg_addr.pval;
bool schedule = false;
uint64_t bit;
/* check completion status */
if(OMPI_SUCCESS != status) {
@ -238,15 +237,12 @@ static void mca_pml_dr_frag_completion(
orte_errmgr.abort();
}
OPAL_THREAD_LOCK(&ompi_request_lock);
bit = ((uint64_t)1 << hdr->hdr_frag_idx);
vfrag->vf_mask_pending &= ~bit;
/* have all pending frags completed for this vfrag? */
if(vfrag->vf_mask_pending == 0) {
if(OPAL_THREAD_ADD64(&vfrag->vf_pending,-1) == 0) {
MCA_PML_DR_VFRAG_WDOG_STOP(vfrag);
/* has the vfrag already been acked */
OPAL_THREAD_LOCK(&ompi_request_lock);
if (vfrag->vf_ack == vfrag->vf_mask) {
sendreq->req_bytes_delivered += vfrag->vf_size;
@ -345,8 +341,8 @@ int mca_pml_dr_send_request_start_buffered(
segment->seg_len = sizeof(mca_pml_dr_rendezvous_hdr_t) + max_data;
sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = true;
sendreq->req_vfrag0.vf_mask_pending = 1;
sendreq->req_vfrag0.vf_state |= MCA_PML_DR_VFRAG_RNDV;
sendreq->req_vfrag0.vf_pending = 1;
descriptor->des_cbfunc = mca_pml_dr_rndv_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -473,8 +469,7 @@ int mca_pml_dr_send_request_start_copy(
/* vfrag status */
sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = false;
sendreq->req_vfrag0.vf_mask_pending = 1;
sendreq->req_vfrag0.vf_pending = 1;
/* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion;
@ -493,7 +488,6 @@ int mca_pml_dr_send_request_start_copy(
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, descriptor );
}
return rc;
}
@ -549,8 +543,7 @@ int mca_pml_dr_send_request_start_prepare(
/* vfrag state */
sendreq->req_vfrag0.vf_size = size;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = false;
sendreq->req_vfrag0.vf_mask_pending = 1;
sendreq->req_vfrag0.vf_pending = 1;
/* send */
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
@ -625,12 +618,12 @@ int mca_pml_dr_send_request_start_rndv(
/* vfrag state */
sendreq->req_vfrag0.vf_size = size;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = true;
sendreq->req_vfrag0.vf_mask_pending = 1;
sendreq->req_vfrag0.vf_state |= MCA_PML_DR_VFRAG_RNDV;
sendreq->req_vfrag0.vf_pending = 1;
/* send */
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, des );
}
@ -734,13 +727,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
assert(hdr->hdr_frag_offset < sendreq->req_send.req_bytes_packed);
vfrag->vf_mask_pending |= ((uint64_t)1 << vfrag->vf_idx);
/* update state */
vfrag->vf_idx++;
vfrag->vf_rndv = false;
OPAL_THREAD_ADD64(&vfrag->vf_pending,1);
sendreq->req_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
@ -829,9 +818,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
vfrag->vf_mask_pending |= ((uint64_t)1 << vfrag->vf_idx);
vfrag->vf_rndv = false;
OPAL_THREAD_ADD64(&vfrag->vf_pending, 1);
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
/* initiate send - note that this may complete before the call returns */
@ -877,9 +864,8 @@ void mca_pml_dr_send_request_match_ack(
OPAL_THREAD_LOCK(&ompi_request_lock);
assert(vfrag->vf_ack == 0);
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
if (vfrag->vf_mask_pending == 0) {
if (vfrag->vf_pending == 0) {
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
@ -901,8 +887,8 @@ void mca_pml_dr_send_request_match_ack(
/* wait for local completion */
} else {
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
vfrag->vf_retrans = vfrag->vf_mask;
if (vfrag->vf_ack != vfrag->vf_mask) {
vfrag->vf_state |= MCA_PML_DR_VFRAG_NACKED;
} else {
vfrag->vf_recv = ack->hdr_dst_ptr;
}
@ -927,7 +913,7 @@ void mca_pml_dr_send_request_rndv_ack(
vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask;
/* local completion? */
if (vfrag->vf_mask_pending == 0) {
if (vfrag->vf_pending == 0) {
bool schedule = false;
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
@ -965,9 +951,9 @@ void mca_pml_dr_send_request_rndv_ack(
} else {
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
vfrag->vf_retrans = vfrag->vf_mask;
vfrag->vf_state |= MCA_PML_DR_VFRAG_NACKED;
} else {
/* may need this to schedule rest of the message */
/* will need this to schedule rest of the message */
vfrag->vf_recv = ack->hdr_dst_ptr;
sendreq->req_send_offset = ack->hdr_vlen;
sendreq->req_bytes_delivered = ack->hdr_vlen;
@ -995,13 +981,14 @@ void mca_pml_dr_send_request_frag_ack(
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
MCA_PML_DR_VFRAG_WDOG_RESET(vfrag);
vfrag->vf_idx = 0;
vfrag->vf_mask_pending = 0;
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
schedule = true;
/* acked and local completion */
} else if (vfrag->vf_mask_pending == 0 && vfrag->vf_idx == vfrag->vf_len) {
} else if (vfrag->vf_pending == 0 && vfrag->vf_idx == vfrag->vf_len) {
/* update statistics */
sendreq->req_bytes_delivered += vfrag->vf_size;

Просмотреть файл

@ -154,6 +154,15 @@ do {
break; \
} \
\
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
sendreq->req_vfrag0.vf_max_send_size = endpoint->btl_max_send_size - \
sizeof(mca_pml_dr_frag_hdr_t); \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
sendreq->req_endpoint = endpoint; \
sendreq->req_proc = proc; \
\
sendreq->req_lock = 0; \
sendreq->req_pipeline_depth = 0; \
sendreq->req_bytes_delivered = 0; \
@ -164,14 +173,8 @@ do {
sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
sendreq->req_send.req_base.req_ompi.req_status._cancelled = 0; \
sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32(&proc->send_sequence,1); \
sendreq->req_endpoint = endpoint; \
sendreq->req_proc = proc; \
MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
\
/* select a btl */ \
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
eager_limit = bml_btl->btl_eager_limit - sizeof(mca_pml_dr_hdr_t); \
if(size <= eager_limit) { \
switch(sendreq->req_send.req_send_mode) { \
@ -277,6 +280,7 @@ do {
size_t max_send_size = endpoint->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t); \
size_t div = size / max_send_size; \
\
MCA_PML_DR_VFRAG_INIT(vfrag); \
if(div == 0) { \
vfrag->vf_len = 1; \
vfrag->vf_size = size; \
@ -299,14 +303,8 @@ do {
else \
vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \
} \
\
vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
vfrag->vf_offset = sendreq->req_send_offset; \
vfrag->vf_ack = 0; \
vfrag->vf_idx = 0; \
vfrag->vf_mask_pending = 0; \
vfrag->vf_retrans = 0; \
vfrag->vf_retry_cnt = 0; \
vfrag->vf_max_send_size = max_send_size; \
vfrag->vf_send.pval = sendreq; \
sendreq->req_vfrag = vfrag; \
@ -349,20 +347,20 @@ do { \
mca_bml_base_endpoint_t* endpoint = sendreq->req_endpoint; \
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
mca_btl_base_descriptor_t *des_old, *des_new; \
vfrag->vf_retry_cnt ++; \
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \
\
if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \
opal_output(0, "%s:%d,%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__); \
orte_errmgr.abort(); \
} \
OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \
\
OPAL_OUTPUT((0, "%s:%d:%s, retransmitting eager\n", __FILE__, __LINE__, __func__)); \
assert(sendreq->descriptor->des_src != NULL); \
MCA_PML_DR_VFRAG_RESET(vfrag); \
vfrag->vf_mask_pending = 1; \
des_old = sendreq->descriptor; \
mca_bml_base_alloc(bml_btl, &des_new, des_old->des_src->seg_len);\
memcpy(des_new->des_src->seg_addr.pval, \
des_old->des_src->seg_addr.pval, \
memcpy(des_new->des_src->seg_addr.pval, \
des_old->des_src->seg_addr.pval, \
des_old->des_src->seg_len); \
des_new->des_flags = des_old->des_flags; \
des_new->des_cbdata = des_old->des_cbdata; \
@ -381,16 +379,16 @@ do { \
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
mca_btl_base_descriptor_t *des_old, *des_new; \
mca_pml_dr_hdr_t *hdr; \
vfrag->vf_retry_cnt ++; \
if(vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \
\
if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \
opal_output(0, "%s:%d,%s retry count exceeded! FATAL", __FILE__, __LINE__, __func__); \
orte_errmgr.abort(); \
} \
OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \
\
opal_output(0, "%s:%d:%s, (re)transmitting rndv probe\n", __FILE__, __LINE__, __func__); \
assert(sendreq->descriptor->des_src != NULL); \
MCA_PML_DR_VFRAG_RESET(vfrag); \
vfrag->vf_mask_pending = 1; \
mca_bml_base_alloc(bml_btl, &des_new, \
sizeof(mca_pml_dr_rendezvous_hdr_t)); \
des_old = sendreq->descriptor; \
@ -411,7 +409,6 @@ do { \
des_new->des_flags = des_old->des_flags; \
des_new->des_cbdata = des_old->des_cbdata; \
des_new->des_cbfunc = des_old->des_cbfunc; \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
MCA_PML_DR_VFRAG_ACK_START(vfrag); \
mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \
} while(0)

Просмотреть файл

@ -36,8 +36,8 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
vfrag->vf_max_send_size = 0;
vfrag->vf_ack = 0;
vfrag->vf_mask = 0;
vfrag->vf_retrans = 0;
vfrag->vf_retry_cnt = 0;
vfrag->vf_state = 0;
vfrag->tv_wdog.tv_sec = mca_pml_dr.timer_wdog_sec;
vfrag->tv_wdog.tv_usec = mca_pml_dr.timer_wdog_usec;
vfrag->tv_ack.tv_sec = mca_pml_dr.timer_ack_usec;
@ -95,7 +95,7 @@ void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
if(0 == vfrag->vf_offset) { /* this is the first part of the message
that we need to resend */
if(vfrag->vf_rndv) {
if(vfrag->vf_state & MCA_PML_DR_VFRAG_RNDV) {
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
} else {
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);

Просмотреть файл

@ -29,6 +29,8 @@
extern "C" {
#endif
#define MCA_PML_DR_VFRAG_NACKED 0x01
#define MCA_PML_DR_VFRAG_RNDV 0x02
struct mca_pml_dr_vfrag_t {
opal_list_item_t super;
@ -43,9 +45,8 @@ struct mca_pml_dr_vfrag_t {
size_t vf_max_send_size;
uint64_t vf_ack;
uint64_t vf_mask;
uint64_t vf_mask_pending;
uint64_t vf_retrans;
bool vf_rndv;
uint64_t vf_pending;
uint32_t vf_state;
struct mca_bml_base_btl_t* bml_btl;
/* we need a timer for the vfrag for:
@ -83,14 +84,15 @@ do { \
MCA_PML_DR_VFRAG_RESET(vfrag); \
(vfrag)->vf_retry_cnt = 0; \
(vfrag)->vf_recv.pval = NULL; \
(vfrag)->vf_state = 0; \
(vfrag)->vf_pending = 0; \
} while(0)
#define MCA_PML_DR_VFRAG_RESET(vfrag) \
do { \
(vfrag)->vf_idx = 0; \
(vfrag)->vf_mask_pending = 0; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_retrans = 0; \
(vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \