1
1

more dr work, add destination check on all receives, misc

This commit was SVN r9317.
Этот коммит содержится в:
Galen Shipman 2006-03-16 19:38:21 +00:00
родитель 66edc64be0
Коммит ff75de8c52
9 изменённых файлов: 334 добавлений и 155 удалений

Просмотреть файл

@ -36,7 +36,7 @@ static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc)
OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t);
OBJ_CONSTRUCT(&proc->unexpected_frags, opal_list_t);
OBJ_CONSTRUCT(&proc->acked_vfrags, opal_list_t);
proc->acked_vfrags_ptr = NULL;
proc->acked_vfrags_current = NULL;
}
@ -99,6 +99,7 @@ int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_c
dr_comm->procs[i].ompi_proc = ompi_comm->c_remote_group->grp_proc_pointers[i];
}
dr_comm->num_procs = size;
return OMPI_SUCCESS;
}

Просмотреть файл

@ -31,6 +31,16 @@ extern "C" {
#endif
/**
 * One entry of a peer's acked_vfrags list.
 *
 * Each entry describes an inclusive range [vfrag_id_low, vfrag_id_high]
 * of vfrag ids that have already been acked. A new entry starts out with
 * low == high; mca_pml_dr_comm_proc_set_acked() widens an existing entry
 * when the newly acked id is adjacent to one of its bounds.
 */
struct mca_pml_dr_acked_item_t{
opal_list_item_t super; /**< list linkage - must be first so the item can be cast to opal_list_item_t */
int32_t vfrag_id_high; /**< highest acked vfrag id covered by this entry (inclusive) */
int32_t vfrag_id_low; /**< lowest acked vfrag id covered by this entry (inclusive) */
};
typedef struct mca_pml_dr_acked_item_t mca_pml_dr_acked_item_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_acked_item_t);
struct mca_pml_dr_comm_proc_t {
opal_object_t super;
uint16_t expected_sequence; /**< send message sequence number - receiver side */
@ -45,19 +55,10 @@ struct mca_pml_dr_comm_proc_t {
opal_list_t unexpected_frags; /**< unexpected fragment queues */
ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */
opal_list_t acked_vfrags; /**< list of vfrags id's that have been acked */
opal_list_item_t* acked_vfrags_ptr; /**< a pointer to the last place we were in the list */
mca_pml_dr_acked_item_t* acked_vfrags_current; /**< a pointer to the last place we were in the list */
};
typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t;
struct mca_pml_dr_acked_item_t{
opal_list_item_t super;
int32_t vfrag_id_high;
int32_t vfrag_id_low;
};
typedef struct mca_pml_dr_acked_item_t mca_pml_dr_acked_item_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_acked_item_t);
/**
* Cached on ompi_communicator_t to hold queues/state
* used by the PML<->PTL interface for matching logic.
@ -89,14 +90,14 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_comm_t);
OMPI_DECLSPEC extern int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_comm);
static inline bool mca_pml_dr_comm_proc_check_acked(mca_pml_dr_acked_item_t** current, int32_t vfrag_id) {
mca_pml_dr_acked_item_t* item = *current;
static inline bool mca_pml_dr_comm_proc_check_acked(mca_pml_dr_comm_proc_t* dr_comm_proc, int32_t vfrag_id) {
mca_pml_dr_acked_item_t* item = dr_comm_proc->acked_vfrags_current;
int8_t direction = 0; /* 1 is next, -1 is previous */
while(true) {
if(NULL == item) {
return false;
} else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id) {
*current = (mca_pml_dr_acked_item_t*) item;
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) item;
return true;
} else if(vfrag_id > item->vfrag_id_high && direction != -1) {
direction = 1;
@ -110,10 +111,10 @@ static inline bool mca_pml_dr_comm_proc_check_acked(mca_pml_dr_acked_item_t** cu
}
}
static inline void mca_pml_dr_comm_proc_set_acked(opal_list_t* acked_vfrags,
mca_pml_dr_acked_item_t** current,
static inline void mca_pml_dr_comm_proc_set_acked(mca_pml_dr_comm_proc_t* dr_comm_proc,
int32_t vfrag_id) {
mca_pml_dr_acked_item_t* item = *current;
opal_list_t* acked_vfrags = &dr_comm_proc->acked_vfrags;
mca_pml_dr_acked_item_t* item = dr_comm_proc->acked_vfrags_current;
int8_t direction = 0; /* 1 is next, -1 is previous */
mca_pml_dr_acked_item_t *new_item, *next_item, *prev_item;
while(true) {
@ -121,18 +122,18 @@ static inline void mca_pml_dr_comm_proc_set_acked(opal_list_t* acked_vfrags,
new_item = OBJ_NEW(mca_pml_dr_acked_item_t);
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
opal_list_append(acked_vfrags, (opal_list_item_t*) new_item);
*current = (mca_pml_dr_acked_item_t*) new_item;
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) new_item;
return;
} else if( item == (mca_pml_dr_acked_item_t*) &acked_vfrags->opal_list_head ) {
new_item = OBJ_NEW(mca_pml_dr_acked_item_t);
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
opal_list_prepend(acked_vfrags, (opal_list_item_t*) new_item);
*current = (mca_pml_dr_acked_item_t*) new_item;
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) new_item;
return;
} else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id ) {
*current = (mca_pml_dr_acked_item_t*) item;
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) item;
return;
} else if((item->vfrag_id_high + 1) == vfrag_id) {
@ -146,7 +147,7 @@ static inline void mca_pml_dr_comm_proc_set_acked(opal_list_t* acked_vfrags,
} else {
item->vfrag_id_high = vfrag_id;
}
*current = (mca_pml_dr_acked_item_t*) item;
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) item;
return;
} else if((item->vfrag_id_low - 1) == vfrag_id) {
@ -160,7 +161,7 @@ static inline void mca_pml_dr_comm_proc_set_acked(opal_list_t* acked_vfrags,
} else {
item->vfrag_id_low = vfrag_id;
}
*current = (mca_pml_dr_acked_item_t*) item;
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) item;
return;
} else if(vfrag_id > item->vfrag_id_high ) {
@ -172,7 +173,7 @@ static inline void mca_pml_dr_comm_proc_set_acked(opal_list_t* acked_vfrags,
opal_list_insert_pos(acked_vfrags,
(opal_list_item_t*) item,
(opal_list_item_t*) new_item);
*current = (mca_pml_dr_acked_item_t*) new_item;
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) new_item;
return;
} else {
direction = 1;
@ -190,7 +191,7 @@ static inline void mca_pml_dr_comm_proc_set_acked(opal_list_t* acked_vfrags,
(opal_list_item_t*) next_item,
(opal_list_item_t*) new_item);
}
*current = (mca_pml_dr_acked_item_t*) new_item;
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) new_item;
return;
} else {
direction = -1;

Просмотреть файл

@ -51,6 +51,10 @@ struct mca_pml_dr_common_hdr_t {
uint8_t hdr_type; /**< type of envelope */
uint8_t hdr_flags; /**< flags indicating how fragment should be processed */
uint16_t hdr_csum; /**< checksum over header */
int32_t hdr_dst; /**< destination rank */
int32_t hdr_src; /**< rank of sender */
uint16_t hdr_ctx; /**< communicator index */
uint32_t hdr_vid; /**< vfrag id */
};
typedef struct mca_pml_dr_common_hdr_t mca_pml_dr_common_hdr_t;
@ -63,10 +67,8 @@ typedef struct mca_pml_dr_common_hdr_t mca_pml_dr_common_hdr_t;
*/
struct mca_pml_dr_match_hdr_t {
mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */
uint32_t hdr_vid; /**< vfrag id */
uint16_t hdr_ctx; /**< communicator index */
uint16_t hdr_seq; /**< message sequence number */
int32_t hdr_src; /**< source rank */
int32_t hdr_tag; /**< user tag */
uint32_t hdr_csum; /**< checksum over data */
ompi_ptr_t hdr_src_ptr; /**< pointer to source vfrag - returned in ack */
@ -119,7 +121,6 @@ typedef struct mca_pml_dr_rendezvous_hdr_t mca_pml_dr_rendezvous_hdr_t;
*/
struct mca_pml_dr_frag_hdr_t {
mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */
uint32_t hdr_vid; /**< virtual frag id */
uint16_t hdr_vlen; /**< length of entire vfrag */
uint16_t hdr_frag_idx; /**< bit index of this frag w/in vfrag */
uint32_t hdr_frag_csum; /**< checksum over data */
@ -148,7 +149,6 @@ typedef struct mca_pml_dr_frag_hdr_t mca_pml_dr_frag_hdr_t;
struct mca_pml_dr_ack_hdr_t {
mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */
uint32_t hdr_vid; /**< virtual fragment id */
uint64_t hdr_vmask; /**< acknowledged frags */
ompi_ptr_t hdr_src_ptr; /**< source vfrag */
ompi_ptr_t hdr_dst_ptr; /**< matched receive request */

Просмотреть файл

@ -53,12 +53,6 @@ OBJ_CLASS_INSTANCE(
NULL
);
static void mca_pml_dr_recv_frag_unmatched_ack(
mca_pml_dr_match_hdr_t* hdr,
ompi_proc_t* ompi_proc,
uint8_t flags,
uint8_t mask);
/*
* Release resources.
*/
@ -86,7 +80,9 @@ void mca_pml_dr_recv_frag_callback(
{
mca_btl_base_segment_t* segments = des->des_dst;
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
uint32_t csum = 1;
size_t hdr_size;
bool duplicate = false;
if(segments->seg_len < sizeof(mca_pml_dr_common_hdr_t)) {
return;
}
@ -98,7 +94,18 @@ void mca_pml_dr_recv_frag_callback(
assert(0);
return;
}
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) {
assert(0);
return;
}
MCA_PML_DR_RECV_FRAG_CHECK_DUP(hdr, duplicate);
if(duplicate) {
assert(0);
return;
} else {
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
}
break;
}
case MCA_PML_DR_HDR_TYPE_MATCH_ACK:
@ -107,6 +114,10 @@ void mca_pml_dr_recv_frag_callback(
assert(0);
return;
}
if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) {
assert(0);
return;
}
mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack);
break;
}
@ -116,7 +127,26 @@ void mca_pml_dr_recv_frag_callback(
assert(0);
return;
}
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) {
assert(0);
return;
}
hdr_size = mca_pml_dr_hdr_size(hdr->hdr_common.hdr_type);
COMPUTE_SPECIFIC_CHECKSUM((void*)((unsigned char*)segments->seg_addr.pval + hdr_size),
segments->seg_len - hdr_size, csum);
if(csum != hdr->hdr_match.hdr_csum) {
/* drop it on the floor */
assert(0);
return;
}
MCA_PML_DR_RECV_FRAG_CHECK_DUP(hdr, duplicate);
if(duplicate) {
assert(0);
return;
} else {
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
}
break;
}
case MCA_PML_DR_HDR_TYPE_RNDV_ACK:
@ -125,19 +155,43 @@ void mca_pml_dr_recv_frag_callback(
assert(0);
return;
}
if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) {
assert(0);
return;
}
mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack);
break;
}
case MCA_PML_DR_HDR_TYPE_FRAG:
{
mca_pml_dr_recv_request_t* recvreq;
mca_pml_dr_recv_request_t* recvreq;
mca_pml_dr_comm_proc_t* comm_proc;
if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t))) {
assert(0);
return;
}
if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) {
assert(0);
return;
}
recvreq = hdr->hdr_frag.hdr_dst_ptr.pval;
mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt);
comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs +
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
if(mca_pml_dr_comm_proc_check_acked(comm_proc,
hdr->hdr_common.hdr_vid)) {
mca_pml_dr_recv_frag_send_ack(comm_proc->ompi_proc,
&hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr,
~(uint64_t) 0);
assert(0);
return;
} else {
mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt);
}
break;
}
case MCA_PML_DR_HDR_TYPE_FRAG_ACK:
{
@ -145,6 +199,10 @@ void mca_pml_dr_recv_frag_callback(
assert(0);
return;
}
if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) {
assert(0);
return;
}
mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack);
break;
}
@ -458,13 +516,14 @@ bool mca_pml_dr_recv_frag_match(
ompi_proc_t* ompi_proc;
int rc;
uint32_t csum = 1;
size_t hdr_size;
/* communicator pointer */
comm_ptr=ompi_comm_lookup(hdr->hdr_ctx);
comm_ptr=ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
comm=(mca_pml_dr_comm_t *)comm_ptr->c_pml_comm;
/* source sequence number */
frag_msg_seq = hdr->hdr_seq;
proc = comm->procs + hdr->hdr_src;
proc = comm->procs + hdr->hdr_common.hdr_src;
ompi_proc = proc->ompi_proc;
/* get next expected message sequence number - if threaded
@ -530,8 +589,21 @@ rematch:
}
} else {
/* if no match found, place on unexpected queue */
/* if no match found, verify csum, if pass place on unexpected queue */
mca_pml_dr_recv_frag_t* frag;
/* nack immediately if need be */
hdr_size = mca_pml_dr_hdr_size(hdr->hdr_common.hdr_type);
COMPUTE_SPECIFIC_CHECKSUM((void*)((unsigned char*)segments->seg_addr.pval + hdr_size),
segments->seg_len - hdr_size, csum);
if(csum != hdr->hdr_csum) {
mca_pml_dr_recv_frag_send_ack(ompi_proc,
&hdr->hdr_common,
hdr->hdr_src_ptr,
0);
assert(0);
return false;
}
MCA_PML_DR_RECV_FRAG_ALLOC(frag, rc);
if(OMPI_SUCCESS != rc) {
OPAL_THREAD_UNLOCK(&comm->matching_lock);
@ -554,10 +626,23 @@ rematch:
/*
* This message comes after the next expected, so it
* is ahead of sequence. Save it for later.
* is ahead of sequence. If passes csum save it for later.
*/
mca_pml_dr_recv_frag_t* frag;
hdr_size = mca_pml_dr_hdr_size(hdr->hdr_common.hdr_type);
COMPUTE_SPECIFIC_CHECKSUM((void*)((unsigned char*)segments->seg_addr.pval + hdr_size),
segments->seg_len - hdr_size, csum);
if(csum != hdr->hdr_csum) {
mca_pml_dr_recv_frag_send_ack(ompi_proc,
&hdr->hdr_common,
hdr->hdr_src_ptr,
0);
assert(0);
return false;
}
MCA_PML_DR_RECV_FRAG_ALLOC(frag, rc);
if(OMPI_SUCCESS != rc) {
OPAL_THREAD_UNLOCK(&comm->matching_lock);
@ -572,16 +657,15 @@ rematch:
/* release matching lock before processing fragment */
if(match != NULL) {
mca_pml_dr_recv_request_progress(match,btl,segments,num_segments);
} else {
size_t hdr_size = mca_pml_dr_hdr_size(hdr->hdr_common.hdr_type);
csum = OMPI_CSUM((void*)((unsigned char*)segments->seg_addr.pval + hdr_size), segments->seg_len - hdr_size);
assert(csum == hdr->hdr_csum);
mca_pml_dr_recv_frag_unmatched_ack(hdr,
ompi_proc,
hdr->hdr_common.hdr_type,
csum == hdr->hdr_csum ? 1 : 0);
}
else {
/* no need to csum, if it wasn't matched and
csum failed, we already nack'd it */
mca_pml_dr_recv_frag_send_ack(ompi_proc,
&hdr->hdr_common,
hdr->hdr_src_ptr,
1);
}
if(additional_match) {
opal_list_item_t* item;
while(NULL != (item = opal_list_remove_first(&additional_matches))) {
@ -595,11 +679,11 @@ rematch:
static void mca_pml_dr_recv_frag_unmatched_ack(
mca_pml_dr_match_hdr_t* hdr,
ompi_proc_t* ompi_proc,
uint8_t type,
uint8_t mask)
void mca_pml_dr_recv_frag_send_ack(
ompi_proc_t* ompi_proc,
mca_pml_dr_common_hdr_t* hdr,
ompi_ptr_t src_ptr,
uint64_t mask)
{
mca_bml_base_endpoint_t* bml_endpoint = NULL;
mca_btl_base_descriptor_t* des;
@ -619,11 +703,13 @@ static void mca_pml_dr_recv_frag_unmatched_ack(
/* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | type;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type;
ack->hdr_common.hdr_flags = 0;
ack->hdr_vid = hdr->hdr_vid;
ack->hdr_common.hdr_dst = hdr->hdr_src;
ack->hdr_common.hdr_vid = hdr->hdr_vid;
ack->hdr_common.hdr_ctx = hdr->hdr_ctx;
ack->hdr_vmask = mask;
ack->hdr_src_ptr = hdr->hdr_src_ptr;
ack->hdr_src_ptr = src_ptr;
assert(ack->hdr_src_ptr.pval);
ack->hdr_dst_ptr.pval = NULL;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
@ -638,14 +724,12 @@ static void mca_pml_dr_recv_frag_unmatched_ack(
goto retry;
}
/* mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); */
return;
/* queue request to retry later */
retry:
MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc);
frag->hdr.hdr_match = *hdr;
/* frag->hdr.hdr_match = *hdr; */
frag->num_segments = 0;
opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag);
}
@ -728,7 +812,7 @@ rematch:
* look only at "specific" receives, or "wild" receives,
* or if we need to traverse both sets at the same time.
*/
proc = comm->procs + hdr->hdr_src;
proc = comm->procs + hdr->hdr_common.hdr_src;
if (opal_list_get_size(&proc->specific_receives) == 0 ) {
/*
* There are only wild irecvs, so specialize the algorithm.

Просмотреть файл

@ -49,8 +49,36 @@ struct mca_pml_dr_recv_frag_t {
};
typedef struct mca_pml_dr_recv_frag_t mca_pml_dr_recv_frag_t;
/**
 * Send an acknowledgment (or negative acknowledgment) for a received
 * fragment back to its sender.
 *
 * The ack header is built from the received common header: its type is
 * MCA_PML_DR_HDR_TYPE_ACK or'ed with hdr->hdr_type, its destination is
 * hdr->hdr_src, and the vfrag id and communicator index are copied
 * from hdr.
 *
 * @param ompi_proc  peer process the ack is sent to
 *                   (NOTE(review): presumably selects the BML endpoint -
 *                   confirm against the definition)
 * @param hdr        common header of the fragment being acknowledged
 * @param src_ptr    sender-side vfrag pointer, echoed back in the ack's
 *                   hdr_src_ptr; the definition asserts it is non-NULL
 * @param mask       value stored in the ack's hdr_vmask; callers pass 0
 *                   when the data checksum failed (nack), 1 for a
 *                   match/rendezvous fragment, and all bits set when
 *                   re-acking an already acked vfrag
 */
void mca_pml_dr_recv_frag_send_ack(ompi_proc_t* ompi_proc,
mca_pml_dr_common_hdr_t* hdr,
ompi_ptr_t src_ptr,
uint64_t mask);
OBJ_CLASS_DECLARATION(mca_pml_dr_recv_frag_t);
/**
 * Detect a duplicate match/rendezvous fragment.
 *
 * Looks up the communicator named by the fragment's hdr_ctx, selects the
 * per-peer state for hdr_src, and asks mca_pml_dr_comm_proc_check_acked()
 * whether the fragment's vfrag id has already been acked. If it has, the
 * ack is re-sent (mask 1) using the sender's hdr_src_ptr and dup is set
 * to true; otherwise dup is set to false and nothing is sent.
 *
 * @param hdr  pointer to the received header; must expose both the
 *             hdr_common and hdr_match views. The expression is
 *             evaluated more than once, so it must be free of side
 *             effects.
 * @param dup  (OUT) bool lvalue receiving the result.
 *
 * Arguments are parenthesized at every use and the block-local variables
 * carry a dup_ prefix so they cannot capture identifiers that appear in
 * the caller's argument expressions.
 */
#define MCA_PML_DR_RECV_FRAG_CHECK_DUP(hdr, dup)                              \
do {                                                                          \
    ompi_communicator_t *dup_comm_ptr =                                       \
        ompi_comm_lookup((hdr)->hdr_common.hdr_ctx);                          \
    mca_pml_dr_comm_t *dup_comm =                                             \
        (mca_pml_dr_comm_t*) dup_comm_ptr->c_pml_comm;                        \
    mca_pml_dr_comm_proc_t *dup_proc =                                        \
        dup_comm->procs + (hdr)->hdr_common.hdr_src;                          \
    if(mca_pml_dr_comm_proc_check_acked(dup_proc,                             \
                                        (hdr)->hdr_common.hdr_vid)) {         \
        /* already acked once: the sender must have missed it, ack again */   \
        mca_pml_dr_recv_frag_send_ack(dup_proc->ompi_proc,                    \
                                      &(hdr)->hdr_common,                     \
                                      (hdr)->hdr_match.hdr_src_ptr,           \
                                      1);                                     \
        (dup) = true;                                                         \
    } else {                                                                  \
        (dup) = false;                                                        \
    }                                                                         \
} while (0)
#define MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc) \
do { \

Просмотреть файл

@ -136,10 +136,13 @@ static void mca_pml_dr_recv_request_ack(
mca_pml_dr_ack_hdr_t* ack;
int rc;
mca_pml_dr_comm_proc_t* comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs +
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
/* if this hasn't been initialized yet - this is a synchronous send */
if(NULL == proc) {
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(
recvreq->req_recv.req_base.req_comm, hdr->hdr_src);
recvreq->req_recv.req_base.req_comm, hdr->hdr_common.hdr_src);
proc = recvreq->req_proc = ompi_proc;
}
bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml;
@ -155,7 +158,10 @@ static void mca_pml_dr_recv_request_ack(
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_common.hdr_type;
ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCHED;
ack->hdr_vid = hdr->hdr_vid;
ack->hdr_common.hdr_dst = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;;
ack->hdr_common.hdr_src = recvreq->req_recv.req_base.req_comm->c_my_rank;
ack->hdr_common.hdr_ctx = recvreq->req_recv.req_base.req_comm->c_contextid;
ack->hdr_common.hdr_vid = hdr->hdr_common.hdr_vid;
ack->hdr_vmask = mask;
ack->hdr_src_ptr = hdr->hdr_src_ptr;
assert(ack->hdr_src_ptr.pval);
@ -171,8 +177,9 @@ static void mca_pml_dr_recv_request_ack(
mca_bml_base_free(bml_btl, des);
goto retry;
}
/* mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); */
mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_common.hdr_vid);
return;
/* queue request to retry later */
@ -201,10 +208,13 @@ static void mca_pml_dr_recv_request_vfrag_ack(
mca_bml_base_btl_t* bml_btl;
mca_pml_dr_ack_hdr_t* ack;
int rc;
mca_pml_dr_comm_proc_t* comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs +
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager);
/* allocate descriptor */
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t));
if(NULL == des) {
@ -215,7 +225,10 @@ static void mca_pml_dr_recv_request_vfrag_ack(
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG_ACK;
ack->hdr_common.hdr_flags = 0;
ack->hdr_vid = vfrag->vf_id;
ack->hdr_common.hdr_vid = vfrag->vf_id;
ack->hdr_common.hdr_dst = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;;
ack->hdr_common.hdr_src = recvreq->req_recv.req_base.req_comm->c_my_rank;
ack->hdr_common.hdr_ctx = recvreq->req_recv.req_base.req_comm->c_contextid;
ack->hdr_vmask = vfrag->vf_ack;
ack->hdr_src_ptr = hdr->hdr_src_ptr;
ack->hdr_dst_ptr.pval = recvreq;
@ -229,77 +242,103 @@ static void mca_pml_dr_recv_request_vfrag_ack(
if(rc != OMPI_SUCCESS) {
mca_bml_base_free(bml_btl, des);
}
/* mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); */
}
mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_common.hdr_vid);
}
/*
* Update the recv request status to reflect the number of bytes
* received and actually delivered to the application.
*/
void mca_pml_dr_recv_request_progress(
mca_pml_dr_recv_request_t* recvreq,
mca_btl_base_module_t* btl,
mca_btl_base_segment_t* segments,
size_t num_segments)
{
size_t bytes_received = 0;
size_t bytes_delivered = 0;
size_t data_offset = 0;
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
size_t i;
uint32_t csum = 1;
uint64_t bit;
mca_pml_dr_vfrag_t* vfrag;
mca_pml_dr_comm_proc_t* comm_proc;
/*
* Update the recv request status to reflect the number of bytes
* received and actually delivered to the application.
*/
comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs +
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
void mca_pml_dr_recv_request_progress(
mca_pml_dr_recv_request_t* recvreq,
mca_btl_base_module_t* btl,
mca_btl_base_segment_t* segments,
size_t num_segments)
{
size_t bytes_received = 0;
size_t bytes_delivered = 0;
size_t data_offset = 0;
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
size_t i;
uint32_t csum = 1;
uint64_t bit;
mca_pml_dr_vfrag_t* vfrag;
for(i=0; i<num_segments; i++)
bytes_received += segments[i].seg_len;
for(i=0; i<num_segments; i++)
bytes_received += segments[i].seg_len;
switch(hdr->hdr_common.hdr_type) {
case MCA_PML_DR_HDR_TYPE_MATCH:
switch(hdr->hdr_common.hdr_type) {
case MCA_PML_DR_HDR_TYPE_MATCH:
bytes_received -= sizeof(mca_pml_dr_match_hdr_t);
recvreq->req_recv.req_bytes_packed = bytes_received;
recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr;
MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
segments,
num_segments,
sizeof(mca_pml_dr_match_hdr_t),
data_offset,
bytes_received,
bytes_delivered,
csum);
if(csum != hdr->hdr_match.hdr_csum) {
assert(0);
bytes_received -= sizeof(mca_pml_dr_match_hdr_t);
recvreq->req_recv.req_bytes_packed = bytes_received;
recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr;
MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
segments,
num_segments,
sizeof(mca_pml_dr_match_hdr_t),
data_offset,
bytes_received,
bytes_delivered,
csum);
if(csum != hdr->hdr_match.hdr_csum) {
/* failed the csum, put the request
back on the list for matching later
at retransmission */
if(recvreq->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) {
mca_pml_dr_recv_request_match_wild(recvreq);
} else {
mca_pml_dr_recv_request_match_specific(recvreq);
}
mca_pml_dr_recv_frag_send_ack(comm_proc->ompi_proc,
&hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr,
0);
assert(0);
return;
}
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_match, 1);
break;
case MCA_PML_DR_HDR_TYPE_RNDV:
bytes_received -= sizeof(mca_pml_dr_rendezvous_hdr_t);
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr;
MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
segments,
num_segments,
sizeof(mca_pml_dr_rendezvous_hdr_t),
data_offset,
bytes_received,
bytes_delivered,
csum);
if(csum != hdr->hdr_match.hdr_csum) {
/* failed the csum, put the request
back on the list for matching later
at retransmission */
if(recvreq->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) {
mca_pml_dr_recv_request_match_wild(recvreq);
} else {
mca_pml_dr_recv_request_match_specific(recvreq);
}
mca_pml_dr_recv_frag_send_ack(comm_proc->ompi_proc,
&hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr,
0);
assert(0);
return;
}
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_match,
csum == hdr->hdr_match.hdr_csum ? 1 : 0);
break;
case MCA_PML_DR_HDR_TYPE_RNDV:
bytes_received -= sizeof(mca_pml_dr_rendezvous_hdr_t);
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr;
MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
segments,
num_segments,
sizeof(mca_pml_dr_rendezvous_hdr_t),
data_offset,
bytes_received,
bytes_delivered,
csum);
if(csum != hdr->hdr_match.hdr_csum) {
assert(0);
}
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_match,
csum == hdr->hdr_match.hdr_csum ? 1 : 0);
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_match, 1);
break;
case MCA_PML_DR_HDR_TYPE_FRAG:
@ -381,6 +420,20 @@ void mca_pml_dr_recv_request_matched_probe(
break;
}
/* check completion status */
OPAL_THREAD_LOCK(&ompi_request_lock);
recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag;
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_common.hdr_src;
recvreq->req_recv.req_base.req_ompi.req_status._count = bytes_packed;
recvreq->req_recv.req_base.req_pml_complete = true;
recvreq->req_recv.req_base.req_ompi.req_complete = true;
ompi_request_completed++;
if(ompi_request_waiting) {
opal_condition_broadcast(&ompi_request_cond);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* mark probe request completed */
MCA_PML_DR_RECV_REQUEST_PML_COMPLETE(recvreq);
}

Просмотреть файл

@ -199,7 +199,7 @@ do {
(request)->req_recv.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \
(request)->req_recv.req_base.req_ompi.req_status._cancelled = 0; \
\
/* attempt to match posted recv */ \
/* attempt to match unexpected recv */ \
if((request)->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) { \
mca_pml_dr_recv_request_match_wild(request); \
} else { \
@ -217,11 +217,12 @@ do {
hdr) \
do { \
(request)->req_recv.req_base.req_ompi.req_status.MPI_TAG = (hdr)->hdr_tag; \
(request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = (hdr)->hdr_src; \
(request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = \
(hdr)->hdr_common.hdr_src; \
if((request)->req_recv.req_bytes_packed != 0) { \
ompi_proc_t *proc = \
ompi_comm_peer_lookup( \
(request)->req_recv.req_base.req_comm, (hdr)->hdr_src); \
(request)->req_recv.req_base.req_comm, (hdr)->hdr_common.hdr_src); \
\
(request)->req_proc = proc; \
ompi_convertor_copy_and_prepare_for_recv( proc->proc_convertor, \
@ -317,7 +318,7 @@ void mca_pml_dr_recv_request_schedule(
#define MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq,hdr,vfrag) \
do { \
if((recvreq)->req_vfrag->vf_id == (hdr)->hdr_vid) { \
if((recvreq)->req_vfrag->vf_id == (hdr)->hdr_common.hdr_vid) { \
vfrag = (recvreq)->req_vfrag; \
} else if ((hdr)->hdr_frag_offset == 0) { \
vfrag = &(recvreq)->req_vfrag0; \
@ -331,7 +332,7 @@ do { \
item != opal_list_get_end(&(recvreq)->req_vfrags); \
item = opal_list_get_next(item)) { \
mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \
if(vf->vf_id == (hdr)->hdr_vid) { \
if(vf->vf_id == (hdr)->hdr_common.hdr_vid) { \
vfrag = vf; \
break; \
} \
@ -339,7 +340,7 @@ do { \
if(NULL == vfrag) { \
MCA_PML_DR_VFRAG_ALLOC(vfrag,rc); \
if(NULL != vfrag) { \
(vfrag)->vf_id = (hdr)->hdr_vid; \
(vfrag)->vf_id = (hdr)->hdr_common.hdr_vid; \
(vfrag)->vf_len = (hdr)->hdr_vlen; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_mask_processed = 0; \

Просмотреть файл

@ -342,9 +342,10 @@ int mca_pml_dr_send_request_start_buffered(
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = csum;
@ -422,13 +423,14 @@ int mca_pml_dr_send_request_start_copy(
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH;
hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OMPI_CSUM_ZERO;
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
/* update lengths */
@ -490,13 +492,14 @@ int mca_pml_dr_send_request_start_prepare(
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH;
hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OMPI_CSUM_ZERO;
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
/* short message */
@ -561,13 +564,14 @@ int mca_pml_dr_send_request_start_rndv(
hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV;
hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OMPI_CSUM_ZERO;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t));
@ -672,13 +676,15 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG;
hdr->hdr_vid = vfrag->vf_id;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_vid = vfrag->vf_id;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_vlen = vfrag->vf_len;
hdr->hdr_frag_idx = vfrag->vf_idx;
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_frag_offset = sendreq->req_send_offset;
hdr->hdr_src_ptr.pval = vfrag;
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
@ -760,7 +766,10 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG;
hdr->hdr_vid = vfrag->vf_id;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_vid = vfrag->vf_id;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_vlen = vfrag->vf_len;
hdr->hdr_frag_idx = vfrag->vf_idx;
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;

Просмотреть файл

@ -76,6 +76,7 @@ do { \
} while(0)
#if 0
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \
do { \
opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \
@ -104,6 +105,7 @@ do { \
\
} while(0)
#endif
#if 1