Fixes for DR as well as better diagnostic..
Successfully passing the intel test suite with/without induced errors/drops. This commit was SVN r10518.
Этот коммит содержится в:
родитель
b34768962c
Коммит
8855e5b73a
@ -86,6 +86,7 @@ bool ompi_seq_tracker_check_duplicate(
|
||||
/*
|
||||
* insert item into sequence tracking list,
|
||||
* compacts continuous regions into a single entry
|
||||
* GMS::: Use a free list for the items (don't do OBJ_NEW)!
|
||||
*/
|
||||
void ompi_seq_tracker_insert(ompi_seq_tracker_t* seq_tracker,
|
||||
uint32_t seq_id)
|
||||
@ -146,9 +147,10 @@ void ompi_seq_tracker_insert(ompi_seq_tracker_t* seq_tracker,
|
||||
/* we have gone back in the list, and we went one item too far */
|
||||
new_item = OBJ_NEW(ompi_seq_tracker_range_t);
|
||||
new_item->seq_id_low = new_item->seq_id_high = seq_id;
|
||||
next_item = (ompi_seq_tracker_range_t*) opal_list_get_next(item);
|
||||
/* insert new_item directly before item */
|
||||
opal_list_insert_pos(seq_ids,
|
||||
(opal_list_item_t*) item,
|
||||
(opal_list_item_t*) next_item,
|
||||
(opal_list_item_t*) new_item);
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item;
|
||||
return;
|
||||
@ -160,14 +162,11 @@ void ompi_seq_tracker_insert(ompi_seq_tracker_t* seq_tracker,
|
||||
if(direction == 1) {
|
||||
/* we have gone forward in the list, and we went one item too far */
|
||||
new_item = OBJ_NEW(ompi_seq_tracker_range_t);
|
||||
next_item = (ompi_seq_tracker_range_t*) opal_list_get_next(item);
|
||||
if(NULL == next_item) {
|
||||
opal_list_append(seq_ids, (opal_list_item_t*) new_item);
|
||||
} else {
|
||||
opal_list_insert_pos(seq_ids,
|
||||
(opal_list_item_t*) next_item,
|
||||
(opal_list_item_t*) new_item);
|
||||
}
|
||||
new_item->seq_id_low = new_item->seq_id_high = seq_id;
|
||||
opal_list_insert_pos(seq_ids,
|
||||
(opal_list_item_t*) item,
|
||||
(opal_list_item_t*) new_item);
|
||||
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item;
|
||||
return;
|
||||
} else {
|
||||
|
@ -96,7 +96,6 @@ int mca_bml_base_send(
|
||||
mca_btl_base_descriptor_t* des,
|
||||
mca_btl_base_tag_t tag)
|
||||
{
|
||||
static int count;
|
||||
des->des_context = bml_btl;
|
||||
if(mca_bml_base_error_count <= 0 && mca_bml_base_error_rate_ceiling > 0) {
|
||||
mca_bml_base_error_count = (int) ((mca_bml_base_error_rate_ceiling * rand())/(RAND_MAX+1.0));
|
||||
|
@ -145,7 +145,8 @@ typedef uint8_t mca_btl_base_tag_t;
|
||||
#define MCA_BTL_FLAGS_SEND_INPLACE 0x8
|
||||
|
||||
/* btl transport is reliable */
|
||||
#define MCA_BTL_FLAGS_RELIABLE 0x10
|
||||
#define MCA_BTL_FLAGS_NEED_ACK 0x10
|
||||
#define MCA_BTL_FLAGS_NEED_CSUM 0xB
|
||||
|
||||
/* Default exclusivity levels */
|
||||
#define MCA_BTL_EXCLUSIVITY_HIGH 64*1024 /* internal loopback */
|
||||
|
@ -222,8 +222,9 @@ int mca_btl_openib_component_open(void)
|
||||
1024*1024, (int*) &mca_btl_openib_module.super.btl_min_rdma_size);
|
||||
mca_btl_openib_param_register_int("max_rdma_size", "maximium rdma size",
|
||||
1024*1024, (int*) &mca_btl_openib_module.super.btl_max_rdma_size);
|
||||
mca_btl_openib_param_register_int("flags", "BTL flags, SEND=0, PUT=1, GET=2",
|
||||
MCA_BTL_FLAGS_PUT, (int*) &mca_btl_openib_module.super.btl_flags);
|
||||
mca_btl_openib_param_register_int("flags", "BTL flags, SEND=1, PUT=2, GET=4",
|
||||
MCA_BTL_FLAGS_PUT | MCA_BTL_FLAGS_NEED_ACK | MCA_BTL_FLAGS_NEED_CSUM,
|
||||
(int*) &mca_btl_openib_module.super.btl_flags);
|
||||
|
||||
mca_btl_openib_param_register_int("bandwidth", "Approximate maximum bandwidth of interconnect",
|
||||
800, (int*) &mca_btl_openib_module.super.btl_bandwidth);
|
||||
|
@ -109,7 +109,7 @@ int mca_btl_self_component_open(void)
|
||||
MCA_BTL_EXCLUSIVITY_HIGH, (int*)&mca_btl_self.btl_exclusivity );
|
||||
mca_base_param_reg_int( (mca_base_component_t*)&mca_btl_self_component, "flags",
|
||||
"Active behavior flags", false, false,
|
||||
MCA_BTL_FLAGS_PUT | MCA_BTL_FLAGS_SEND_INPLACE | MCA_BTL_FLAGS_RELIABLE,
|
||||
MCA_BTL_FLAGS_PUT | MCA_BTL_FLAGS_SEND_INPLACE,
|
||||
(int*)&mca_btl_self.btl_flags );
|
||||
|
||||
/* initialize objects */
|
||||
|
@ -241,3 +241,10 @@ MCA_BML_BASE_BTL_DES_ALLOC(bml_btl, des, \
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
#define MCA_PML_DR_DEBUG_LEVEL 0
|
||||
#define MCA_PML_DR_DEBUG(level,msg) \
|
||||
if(level <= MCA_PML_DR_DEBUG_LEVEL){ \
|
||||
OPAL_OUTPUT(msg); \
|
||||
}
|
||||
|
||||
|
@ -46,17 +46,20 @@ do {
|
||||
if(mca_pml_dr.enable_csum) { \
|
||||
uint16_t csum = opal_csum(hdr, sizeof(type)); \
|
||||
if(hdr->hdr_common.hdr_csum != csum) { \
|
||||
OPAL_OUTPUT((0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n", \
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum)); \
|
||||
MCA_PML_DR_DEBUG(0, (0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n", \
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum)); \
|
||||
return; \
|
||||
} \
|
||||
} \
|
||||
ep = ompi_pointer_array_get_item(&mca_pml_dr.endpoints, hdr->hdr_common.hdr_src); \
|
||||
ep = ompi_pointer_array_get_item(&mca_pml_dr.endpoints, hdr->hdr_common.hdr_src); \
|
||||
assert(ep != NULL); \
|
||||
if(ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { \
|
||||
OPAL_OUTPUT((0, "%s:%d: dropping duplicate ack", __FILE__, __LINE__)); \
|
||||
return; \
|
||||
MCA_PML_DR_DEBUG(0, (0, "%s:%d: dropping duplicate ack, vfrag ID %d", \
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_vid)); \
|
||||
return; \
|
||||
} \
|
||||
MCA_PML_DR_DEBUG(1, (0, "%s:%d: couldn't find vfrag ID %d \n", \
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_vid)); \
|
||||
} while (0)
|
||||
|
||||
|
||||
@ -117,19 +120,23 @@ void mca_pml_dr_recv_frag_callback(
|
||||
if(mca_pml_dr.enable_csum) {
|
||||
csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
|
||||
if(hdr->hdr_common.hdr_csum != csum) {
|
||||
OPAL_OUTPUT((0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum));
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum));
|
||||
return;
|
||||
}
|
||||
}
|
||||
if(hdr->hdr_common.hdr_dst != mca_pml_dr.my_rank ) {
|
||||
OPAL_OUTPUT((0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst));
|
||||
MCA_PML_DR_DEBUG(0, (0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst));
|
||||
return;
|
||||
}
|
||||
ep = ompi_pointer_array_get_item(&mca_pml_dr.endpoints, hdr->hdr_common.hdr_src);
|
||||
assert(ep != NULL);
|
||||
|
||||
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) {
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: got a duplicate vfrag vfrag id %d\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_vid));
|
||||
|
||||
mca_pml_dr_recv_frag_ack(&ep->base,
|
||||
&hdr->hdr_common,
|
||||
hdr->hdr_match.hdr_src_ptr.pval,
|
||||
@ -139,7 +146,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
ompi_comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
|
||||
if(NULL == ompi_comm ) {
|
||||
OPAL_OUTPUT((0, "%s:%d: invalid communicator %d\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_ctx));
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_ctx));
|
||||
return;
|
||||
}
|
||||
comm = (mca_pml_dr_comm_t*)ompi_comm->c_pml_comm;
|
||||
@ -163,14 +170,14 @@ void mca_pml_dr_recv_frag_callback(
|
||||
csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t));
|
||||
|
||||
if(hdr->hdr_common.hdr_csum != csum) {
|
||||
OPAL_OUTPUT((0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum));
|
||||
MCA_PML_DR_DEBUG(0, (0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum));
|
||||
return;
|
||||
}
|
||||
}
|
||||
if(hdr->hdr_common.hdr_dst != mca_pml_dr.my_rank ) {
|
||||
OPAL_OUTPUT((0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst));
|
||||
MCA_PML_DR_DEBUG(0, (0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst));
|
||||
return;
|
||||
}
|
||||
ep = ompi_pointer_array_get_item(&mca_pml_dr.endpoints, hdr->hdr_common.hdr_src);
|
||||
@ -182,7 +189,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
ompi_comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
|
||||
if(NULL == ompi_comm) {
|
||||
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) {
|
||||
OPAL_OUTPUT((0, "%s:%d: acking duplicate matched rendezvous from sequence tracker\n", __FILE__, __LINE__));
|
||||
MCA_PML_DR_DEBUG(0, (0, "%s:%d: acking duplicate matched rendezvous from sequence tracker\n", __FILE__, __LINE__));
|
||||
mca_pml_dr_recv_frag_ack(&ep->base,
|
||||
&hdr->hdr_common,
|
||||
hdr->hdr_match.hdr_src_ptr.pval,
|
||||
@ -203,18 +210,19 @@ void mca_pml_dr_recv_frag_callback(
|
||||
recvreq =
|
||||
mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid);
|
||||
if(NULL != recvreq) {
|
||||
OPAL_OUTPUT((0, "%s:%d: acking duplicate matched rendezvous from pending matched\n", __FILE__, __LINE__));
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: acking duplicate matched rendezvous from pending matched vfrag id %d\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_vid));
|
||||
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
|
||||
hdr->hdr_match.hdr_src_ptr, recvreq->req_bytes_received, 1);
|
||||
} else {
|
||||
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) {
|
||||
OPAL_OUTPUT((0, "%s:%d: acking duplicate matched rendezvous from sequence tracker\n", __FILE__, __LINE__));
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: acking duplicate matched rendezvous from sequence tracker\n", __FILE__, __LINE__));
|
||||
mca_pml_dr_recv_frag_ack(&ep->base,
|
||||
&hdr->hdr_common,
|
||||
hdr->hdr_match.hdr_src_ptr.pval,
|
||||
~(uint64_t) 0, hdr->hdr_rndv.hdr_msg_length);
|
||||
} else {
|
||||
OPAL_OUTPUT((0, "%s:%d: droping duplicate unmatched rendezvous\n", __FILE__, __LINE__));
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: droping duplicate unmatched rendezvous\n", __FILE__, __LINE__));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -246,13 +254,13 @@ void mca_pml_dr_recv_frag_callback(
|
||||
if(mca_pml_dr.enable_csum) {
|
||||
csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
|
||||
if(hdr->hdr_common.hdr_csum != csum) {
|
||||
OPAL_OUTPUT((0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n",
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum));
|
||||
return;
|
||||
}
|
||||
}
|
||||
if(hdr->hdr_common.hdr_dst != mca_pml_dr.my_rank ) {
|
||||
OPAL_OUTPUT((0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n",
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst));
|
||||
return;
|
||||
}
|
||||
@ -261,7 +269,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
|
||||
/* seq_recvs protected by matching lock */
|
||||
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) {
|
||||
OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__));
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__));
|
||||
mca_pml_dr_recv_frag_ack(&ep->base,
|
||||
&hdr->hdr_common,
|
||||
hdr->hdr_frag.hdr_src_ptr.pval,
|
||||
@ -269,7 +277,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
} else {
|
||||
ompi_comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
|
||||
if(NULL == ompi_comm) {
|
||||
OPAL_OUTPUT((0, "%s:%d: the world as we know it is bad\n", __FILE__, __LINE__));
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: the world as we know it is bad\n", __FILE__, __LINE__));
|
||||
orte_errmgr.abort();
|
||||
}
|
||||
comm = (mca_pml_dr_comm_t*)ompi_comm->c_pml_comm;
|
||||
@ -291,7 +299,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
break;
|
||||
}
|
||||
default:
|
||||
OPAL_OUTPUT((0, "%s:%d: dropping unknown header type\n", __FILE__,__LINE__));
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: dropping unknown header type\n", __FILE__,__LINE__));
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -679,7 +687,7 @@ rematch:
|
||||
if(mca_pml_dr.enable_csum && csum != hdr->hdr_csum) {
|
||||
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)ompi_proc->proc_pml,
|
||||
&hdr->hdr_common, hdr->hdr_src_ptr.pval, 0, 0);
|
||||
OPAL_OUTPUT((0, "%s:%d: received corrupted data 0x%08x != 0x%08x (segments %d length %d)\n",
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: received corrupted data 0x%08x != 0x%08x (segments %d length %d)\n",
|
||||
__FILE__, __LINE__, csum, hdr->hdr_csum, num_segments,
|
||||
segments[0].seg_len - mca_pml_dr_hdr_size(hdr->hdr_common.hdr_type)));
|
||||
MCA_PML_DR_RECV_FRAG_RETURN(frag);
|
||||
@ -715,7 +723,7 @@ rematch:
|
||||
if(mca_pml_dr.enable_csum && csum != hdr->hdr_csum) {
|
||||
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)ompi_proc->proc_pml,
|
||||
&hdr->hdr_common, hdr->hdr_src_ptr.pval, 0, 0);
|
||||
OPAL_OUTPUT((0, "%s:%d: received corrupted data 0x%08x != 0x%08x\n",
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: received corrupted data 0x%08x != 0x%08x\n",
|
||||
__FILE__, __LINE__, csum, hdr->hdr_csum));
|
||||
MCA_PML_DR_RECV_FRAG_RETURN(frag);
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
@ -724,6 +732,7 @@ rematch:
|
||||
opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag);
|
||||
}
|
||||
|
||||
|
||||
ompi_seq_tracker_insert(&ep->seq_recvs, hdr->hdr_common.hdr_vid);
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
|
||||
@ -734,6 +743,9 @@ rematch:
|
||||
|
||||
/* if buffered a short message - go ahead and ack */
|
||||
else if (hdr->hdr_common.hdr_type == MCA_PML_DR_HDR_TYPE_MATCH) {
|
||||
MCA_PML_DR_DEBUG(1,(0, "%s:%d: received short message, acking now vfrag id: %d\n",
|
||||
__FILE__, __LINE__, hdr->hdr_common.hdr_vid));
|
||||
|
||||
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)ompi_proc->proc_pml,
|
||||
&hdr->hdr_common, hdr->hdr_src_ptr.pval, 1, 0);
|
||||
}
|
||||
@ -792,11 +804,16 @@ void mca_pml_dr_recv_frag_ack(
|
||||
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
|
||||
des->des_cbfunc = mca_pml_dr_ctl_completion;
|
||||
|
||||
MCA_PML_DR_DEBUG(1,(0, "%s:%d: sending ack, vfrag ID %d",
|
||||
__FILE__, __LINE__, hdr->hdr_vid));
|
||||
|
||||
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
|
||||
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
mca_bml_base_free(bml_btl, des);
|
||||
goto retry;
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
/* queue request to retry later */
|
||||
|
@ -45,15 +45,17 @@ if(mca_pml_dr.enable_csum && csum != hdr->hdr_match.hdr_csum) { \
|
||||
&hdr->hdr_common, \
|
||||
hdr->hdr_match.hdr_src_ptr.pval, \
|
||||
0, 0); \
|
||||
OPAL_OUTPUT((0, "%s:%d: [rank %d -> rank %d] " \
|
||||
"data checksum failed 0x%08x != 0x%08x\n", \
|
||||
__FILE__, __LINE__, \
|
||||
hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst, \
|
||||
csum, hdr->hdr_match.hdr_csum)); \
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d: [rank %d -> rank %d] " \
|
||||
"data checksum failed 0x%08x != 0x%08x\n", \
|
||||
__FILE__, __LINE__, \
|
||||
hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst, \
|
||||
csum, hdr->hdr_match.hdr_csum)); \
|
||||
bytes_received = bytes_delivered = 0; \
|
||||
} else if (recvreq->req_acked == false) { \
|
||||
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, \
|
||||
hdr->hdr_match.hdr_src_ptr, bytes_received, 1); \
|
||||
MCA_PML_DR_DEBUG(1,(0, "%s:%d: sending ack, vfrag ID %d", \
|
||||
__FILE__, __LINE__, recvreq->req_vfrag0.vf_id)); \
|
||||
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, \
|
||||
hdr->hdr_match.hdr_src_ptr, bytes_received, 1); \
|
||||
}
|
||||
|
||||
|
||||
@ -241,6 +243,7 @@ void mca_pml_dr_recv_request_progress(
|
||||
bytes_delivered,
|
||||
csum);
|
||||
MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received);
|
||||
|
||||
break;
|
||||
|
||||
case MCA_PML_DR_HDR_TYPE_RNDV:
|
||||
@ -258,6 +261,7 @@ void mca_pml_dr_recv_request_progress(
|
||||
bytes_delivered,
|
||||
csum);
|
||||
MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received);
|
||||
|
||||
break;
|
||||
|
||||
case MCA_PML_DR_HDR_TYPE_FRAG:
|
||||
@ -265,6 +269,8 @@ void mca_pml_dr_recv_request_progress(
|
||||
MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag);
|
||||
if(vfrag->vf_ack & bit) {
|
||||
if(vfrag->vf_ack == vfrag->vf_mask) {
|
||||
MCA_PML_DR_DEBUG(1,(0, "%s:%d: sending ack, vfrag ID %d",
|
||||
__FILE__, __LINE__, vfrag->vf_id));
|
||||
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
|
||||
hdr->hdr_frag.hdr_src_ptr,
|
||||
vfrag->vf_size,
|
||||
@ -298,11 +304,14 @@ void mca_pml_dr_recv_request_progress(
|
||||
/* we have received all the pieces of the vfrag, ack
|
||||
everything that passed the checksum */
|
||||
ompi_seq_tracker_insert(&recvreq->req_endpoint->seq_recvs, vfrag->vf_id);
|
||||
MCA_PML_DR_DEBUG(1,(0, "%s:%d: sending ack, vfrag ID %d",
|
||||
__FILE__, __LINE__, vfrag->vf_id));
|
||||
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
|
||||
hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask);
|
||||
hdr->hdr_frag.hdr_src_ptr,
|
||||
vfrag->vf_size, vfrag->vf_mask);
|
||||
}
|
||||
} else {
|
||||
OPAL_OUTPUT((0, "%s:%d received corrupted fragment 0x%08x != 0x%08x\n",
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d received corrupted fragment 0x%08x != 0x%08x\n",
|
||||
__FILE__,__LINE__,csum,hdr->hdr_frag.hdr_frag_csum));
|
||||
bytes_received = bytes_delivered = 0;
|
||||
}
|
||||
|
@ -388,7 +388,7 @@ do { \
|
||||
(vfrag)->vf_mask = (((uint64_t)1 << (hdr)->hdr_vlen)-1); \
|
||||
} \
|
||||
opal_list_append(&(recvreq)->req_vfrags, (opal_list_item_t*)vfrag); \
|
||||
(recvreq)->req_vfrag = vfrag; \
|
||||
(recvreq)->req_vfrag = vfrag; \
|
||||
} \
|
||||
} \
|
||||
OPAL_THREAD_UNLOCK(recvreq->req_mutex); \
|
||||
|
@ -179,6 +179,9 @@ static void mca_pml_dr_match_completion(
|
||||
|
||||
/* update statistics and complete */
|
||||
sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
|
||||
MCA_PML_DR_DEBUG(1, (0, "%s:%d: inserting vid %d into sequence tracker\n",
|
||||
__FILE__, __LINE__, vfrag->vf_id));
|
||||
|
||||
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
|
||||
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
|
||||
|
||||
@ -240,6 +243,9 @@ static void mca_pml_dr_rndv_completion(
|
||||
}
|
||||
|
||||
/* update statistics and complete */
|
||||
MCA_PML_DR_DEBUG(1, (0, "%s:%d: inserting vid %d into sequence tracker\n",
|
||||
__FILE__, __LINE__, vfrag->vf_id));
|
||||
|
||||
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
|
||||
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
|
||||
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
|
||||
@ -308,6 +314,9 @@ static void mca_pml_dr_frag_completion(
|
||||
}
|
||||
|
||||
/* record vfrag id to drop duplicate acks */
|
||||
MCA_PML_DR_DEBUG(1, (0, "%s:%d: inserting vid %d into sequence tracker\n",
|
||||
__FILE__, __LINE__, vfrag->vf_id));
|
||||
|
||||
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
|
||||
|
||||
/* return this vfrag */
|
||||
@ -963,6 +972,8 @@ void mca_pml_dr_send_request_match_ack(
|
||||
|
||||
/* update statistics */
|
||||
sendreq->req_bytes_delivered = vfrag->vf_size;
|
||||
MCA_PML_DR_DEBUG(1, (0, "%s:%d: inserting vid %d into sequence tracker\n",
|
||||
__FILE__, __LINE__, vfrag->vf_id));
|
||||
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
|
||||
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
|
||||
}
|
||||
@ -1029,6 +1040,9 @@ void mca_pml_dr_send_request_rndv_ack(
|
||||
}
|
||||
|
||||
/* stash the vfrag id for duplicate acks.. */
|
||||
MCA_PML_DR_DEBUG(1, (0, "%s:%d: inserting vid %d into sequence tracker\n",
|
||||
__FILE__, __LINE__, vfrag->vf_id));
|
||||
|
||||
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
|
||||
@ -1074,7 +1088,7 @@ void mca_pml_dr_send_request_frag_ack(
|
||||
|
||||
/* need to retransmit? */
|
||||
if(vfrag->vf_ack != vfrag->vf_mask) {
|
||||
opal_output(0, "got a vfrag NACK, retransmitting %x\n", ~vfrag->vf_ack);
|
||||
MCA_PML_DR_DEBUG(0,(0, "got a vfrag NACK, retransmitting %x\n", ~vfrag->vf_ack));
|
||||
MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq,vfrag);
|
||||
schedule = true;
|
||||
|
||||
@ -1086,6 +1100,9 @@ void mca_pml_dr_send_request_frag_ack(
|
||||
assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed);
|
||||
|
||||
/* stash the vfid for duplicate acks.. */
|
||||
MCA_PML_DR_DEBUG(1,(0, "%s:%d: inserting vid %d into sequence tracker\n",
|
||||
__FILE__, __LINE__, vfrag->vf_id));
|
||||
|
||||
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
|
||||
/* return vfrag */
|
||||
MCA_PML_DR_VFRAG_RETURN(vfrag);
|
||||
|
@ -119,7 +119,7 @@ do {
|
||||
(sendreq)->req_send.req_base.req_ompi.req_status._cancelled = 0; \
|
||||
\
|
||||
/* initialize datatype convertor for this request */ \
|
||||
if(count > 0) { \
|
||||
/* if(count > 0) { */ \
|
||||
/* We will create a convertor specialized for the */ \
|
||||
/* remote architecture and prepared with the datatype. */ \
|
||||
ompi_convertor_copy_and_prepare_for_send( \
|
||||
@ -131,9 +131,9 @@ do {
|
||||
&(sendreq)->req_send.req_convertor ); \
|
||||
ompi_convertor_get_packed_size(&(sendreq)->req_send.req_convertor, \
|
||||
&((sendreq)->req_send.req_bytes_packed) ); \
|
||||
} else { \
|
||||
(sendreq)->req_send.req_bytes_packed = 0; \
|
||||
} \
|
||||
/* } else { */ \
|
||||
/* (sendreq)->req_send.req_bytes_packed = 0; */ \
|
||||
/* } */ \
|
||||
} while(0)
|
||||
|
||||
|
||||
@ -308,7 +308,7 @@ do {
|
||||
vfrag->vf_offset = sendreq->req_send_offset; \
|
||||
vfrag->vf_max_send_size = max_send_size; \
|
||||
vfrag->vf_send.pval = sendreq; \
|
||||
sendreq->req_vfrag = vfrag; \
|
||||
sendreq->req_vfrag = vfrag; \
|
||||
} while(0)
|
||||
|
||||
/*
|
||||
@ -357,28 +357,31 @@ do { \
|
||||
* Requeue first fragment of message for retransmission
|
||||
*/
|
||||
|
||||
#define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \
|
||||
do { \
|
||||
mca_btl_base_descriptor_t *des_old, *des_new; \
|
||||
OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \
|
||||
OPAL_OUTPUT((0, "%s:%d:%s: retransmitting eager\n", __FILE__, __LINE__, __func__)); \
|
||||
assert(sendreq->req_descriptor->des_src != NULL); \
|
||||
\
|
||||
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); \
|
||||
OPAL_THREAD_ADD64(&(vfrag)->vf_pending,1); \
|
||||
(vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \
|
||||
\
|
||||
des_old = sendreq->req_descriptor; \
|
||||
mca_bml_base_alloc(vfrag->bml_btl, &des_new, des_old->des_src->seg_len);\
|
||||
sendreq->req_descriptor = des_new; \
|
||||
memcpy(des_new->des_src->seg_addr.pval, \
|
||||
des_old->des_src->seg_addr.pval, \
|
||||
des_old->des_src->seg_len); \
|
||||
des_new->des_flags = des_old->des_flags; \
|
||||
des_new->des_cbdata = des_old->des_cbdata; \
|
||||
des_new->des_cbfunc = des_old->des_cbfunc; \
|
||||
mca_bml_base_send(vfrag->bml_btl, des_new, MCA_BTL_TAG_PML); \
|
||||
} while(0)
|
||||
#define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \
|
||||
do { \
|
||||
mca_btl_base_descriptor_t *des_old, *des_new; \
|
||||
OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \
|
||||
MCA_PML_DR_DEBUG(0, \
|
||||
(0, "%s:%d:%s: retransmitting eager\n", \
|
||||
__FILE__, __LINE__, __func__)); \
|
||||
assert(sendreq->req_descriptor->des_src != NULL); \
|
||||
\
|
||||
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); \
|
||||
OPAL_THREAD_ADD64(&(vfrag)->vf_pending,1); \
|
||||
(vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \
|
||||
\
|
||||
des_old = sendreq->req_descriptor; \
|
||||
mca_bml_base_alloc(vfrag->bml_btl, &des_new, \
|
||||
des_old->des_src->seg_len); \
|
||||
sendreq->req_descriptor = des_new; \
|
||||
memcpy(des_new->des_src->seg_addr.pval, \
|
||||
des_old->des_src->seg_addr.pval, \
|
||||
des_old->des_src->seg_len); \
|
||||
des_new->des_flags = des_old->des_flags; \
|
||||
des_new->des_cbdata = des_old->des_cbdata; \
|
||||
des_new->des_cbfunc = des_old->des_cbfunc; \
|
||||
mca_bml_base_send(vfrag->bml_btl, des_new, MCA_BTL_TAG_PML); \
|
||||
} while(0)
|
||||
|
||||
/*
|
||||
* Requeue first fragment of message for retransmission
|
||||
@ -392,7 +395,8 @@ do { \
|
||||
mca_btl_base_descriptor_t *des_old, *des_new; \
|
||||
mca_pml_dr_hdr_t *hdr; \
|
||||
\
|
||||
opal_output(0, "%s:%d:%s: (re)transmitting rndv probe\n", __FILE__, __LINE__, __func__); \
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d:%s: (re)transmitting rndv probe\n", \
|
||||
__FILE__, __LINE__, __func__)); \
|
||||
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); \
|
||||
OPAL_THREAD_ADD64(&vfrag->vf_pending,1); \
|
||||
(vfrag)->vf_state &= ~MCA_PML_DR_VFRAG_NACKED; \
|
||||
|
@ -70,7 +70,7 @@ static void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
|
||||
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
|
||||
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
|
||||
|
||||
OPAL_OUTPUT((0, "%s:%d:%s: wdog timeout: 0x%08x", __FILE__, __LINE__, __func__, vfrag));
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d:%s: wdog timeout: 0x%08x", __FILE__, __LINE__, __func__, vfrag));
|
||||
|
||||
/* update pending counts */
|
||||
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-vfrag->vf_pending);
|
||||
@ -80,7 +80,7 @@ static void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
|
||||
if(++vfrag->vf_wdog_cnt == mca_pml_dr.wdog_retry_max) {
|
||||
/* declare btl dead */
|
||||
opal_output(0, "%s:%d:%s: failing BTL: %s", __FILE__, __LINE__, __func__,
|
||||
vfrag->bml_btl->btl->btl_component->btl_version.mca_component_name);
|
||||
vfrag->bml_btl->btl->btl_component->btl_version.mca_component_name);
|
||||
mca_bml.bml_del_btl(vfrag->bml_btl->btl);
|
||||
mca_pml_dr_vfrag_reset(vfrag);
|
||||
}
|
||||
@ -106,7 +106,8 @@ static void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
|
||||
static void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data)
|
||||
{
|
||||
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
|
||||
OPAL_OUTPUT((0, "%s:%d:%s: ack timeout: %0x08x", __FILE__, __LINE__, __func__, vfrag));
|
||||
MCA_PML_DR_DEBUG(0,(0, "%s:%d:%s: ack timeout: %0x08x",
|
||||
__FILE__, __LINE__, __func__, vfrag));
|
||||
|
||||
/* stop ack timer */
|
||||
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user