1
1
This commit was SVN r9366.
Этот коммит содержится в:
Tim Woodall 2006-03-22 15:02:36 +00:00
родитель bcb23dc762
Коммит 0f6161c6da
13 изменённых файлов: 341 добавлений и 411 удалений

Просмотреть файл

@ -228,36 +228,6 @@ extern int mca_pml_dr_start(
#endif
#define MCA_PML_DR_FREE(request) \
{ \
mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)(request); \
pml_request->req_free_called = true; \
if( pml_request->req_pml_complete == true) \
{ \
switch(pml_request->req_type) { \
case MCA_PML_REQUEST_SEND: \
{ \
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)pml_request; \
if(sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) { \
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
} \
MCA_PML_DR_SEND_REQUEST_RETURN(sendreq); \
break; \
} \
case MCA_PML_REQUEST_RECV: \
{ \
mca_pml_dr_recv_request_t* recvreq = (mca_pml_dr_recv_request_t*)pml_request; \
MCA_PML_DR_RECV_REQUEST_RETURN(recvreq); \
break; \
} \
default: \
break; \
} \
} \
*(request) = MPI_REQUEST_NULL; \
}
#define MCA_PML_DR_DES_ALLOC(bml_btl, des, size) \
MCA_BML_BASE_BTL_DES_ALLOC(bml_btl, des, \
sizeof(mca_pml_dr_hdr_t) + (sizeof(mca_btl_base_segment_t) << 4), size)

Просмотреть файл

@ -22,7 +22,7 @@
#include "pml_dr.h"
#include "pml_dr_comm.h"
OBJ_CLASS_INSTANCE(mca_pml_dr_acked_item_t,
OBJ_CLASS_INSTANCE(mca_pml_dr_range_t,
opal_list_item_t,
NULL,
NULL);
@ -34,15 +34,17 @@ static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc)
proc->send_sequence = 0;
OBJ_CONSTRUCT(&proc->frags_cant_match, opal_list_t);
OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t);
OBJ_CONSTRUCT(&proc->matched_receives, opal_list_t);
OBJ_CONSTRUCT(&proc->unexpected_frags, opal_list_t);
OBJ_CONSTRUCT(&proc->acked_vfrags, opal_list_t);
proc->acked_vfrags_current = NULL;
OBJ_CONSTRUCT(&proc->vfrag_ids, opal_list_t);
proc->vfrag_ids_current = NULL;
}
static void mca_pml_dr_comm_proc_destruct(mca_pml_dr_comm_proc_t* proc)
{
OBJ_DESTRUCT(&proc->frags_cant_match);
OBJ_DESTRUCT(&proc->matched_receives);
OBJ_DESTRUCT(&proc->specific_receives);
OBJ_DESTRUCT(&proc->unexpected_frags);
}

Просмотреть файл

@ -31,20 +31,20 @@ extern "C" {
#endif
struct mca_pml_dr_acked_item_t{
struct mca_pml_dr_range_t{
opal_list_item_t super;
int32_t vfrag_id_high;
int32_t vfrag_id_low;
uint32_t vfrag_id_high;
uint32_t vfrag_id_low;
};
typedef struct mca_pml_dr_acked_item_t mca_pml_dr_acked_item_t;
typedef struct mca_pml_dr_range_t mca_pml_dr_range_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_acked_item_t);
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_range_t);
struct mca_pml_dr_comm_proc_t {
opal_object_t super;
uint16_t expected_sequence; /**< send message sequence number - receiver side */
int32_t vfrag_id; /**< virtual fragment identifier */
uint32_t vfrag_id; /**< virtual fragment identifier */
#if OMPI_HAVE_THREAD_SUPPORT
volatile int32_t send_sequence; /**< send side sequence number */
#else
@ -53,9 +53,10 @@ struct mca_pml_dr_comm_proc_t {
opal_list_t frags_cant_match; /**< out-of-order fragment queues */
opal_list_t specific_receives; /**< queues of unmatched specific receives */
opal_list_t unexpected_frags; /**< unexpected fragment queues */
opal_list_t matched_receives; /**< list of in-progress matched receives */
ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */
opal_list_t acked_vfrags; /**< list of vfrags id's that have been acked */
mca_pml_dr_acked_item_t* acked_vfrags_current; /**< a pointer to the last place we were in the list */
opal_list_t vfrag_ids; /**< list of vfrags id's that have been seen */
mca_pml_dr_range_t* vfrag_ids_current; /**< a pointer to the last place we were in the list */
};
typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t;
@ -90,112 +91,126 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_comm_t);
OMPI_DECLSPEC extern int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_comm);
static inline bool mca_pml_dr_comm_proc_check_acked(mca_pml_dr_comm_proc_t* dr_comm_proc, int32_t vfrag_id) {
mca_pml_dr_acked_item_t* item = dr_comm_proc->acked_vfrags_current;
/**
* Look for duplicate sequence number in current range.
* Must be called w/ matching lock held.
*/
static inline bool mca_pml_dr_comm_proc_check_duplicate(
mca_pml_dr_comm_proc_t* dr_proc,
uint32_t vfrag_id)
{
mca_pml_dr_range_t* item;
int8_t direction = 0; /* 1 is next, -1 is previous */
item = dr_proc->vfrag_ids_current;
while(true) {
if(NULL == item) {
return false;
} else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id) {
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) item;
dr_proc->vfrag_ids_current = (mca_pml_dr_range_t*) item;
return true;
} else if(vfrag_id > item->vfrag_id_high && direction != -1) {
direction = 1;
item = (mca_pml_dr_acked_item_t*) opal_list_get_next(item);
item = (mca_pml_dr_range_t*) opal_list_get_next(item);
} else if(vfrag_id < item->vfrag_id_low && direction != 1) {
direction = -1;
item = (mca_pml_dr_acked_item_t*) opal_list_get_prev(item);
item = (mca_pml_dr_range_t*) opal_list_get_prev(item);
} else {
return false;
}
}
}
static inline void mca_pml_dr_comm_proc_set_acked(mca_pml_dr_comm_proc_t* dr_comm_proc,
int32_t vfrag_id) {
opal_list_t* acked_vfrags = &dr_comm_proc->acked_vfrags;
mca_pml_dr_acked_item_t* item = dr_comm_proc->acked_vfrags_current;
/*
* Must be called w/ matching lock held
*/
static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_comm_proc_t* dr_comm_proc, uint32_t vfrag_id)
{
opal_list_t* vfrag_ids = &dr_comm_proc->vfrag_ids;
mca_pml_dr_range_t* item = dr_comm_proc->vfrag_ids_current;
int8_t direction = 0; /* 1 is next, -1 is previous */
mca_pml_dr_acked_item_t *new_item, *next_item, *prev_item;
mca_pml_dr_range_t *new_item, *next_item, *prev_item;
while(true) {
if( item == NULL || item == (mca_pml_dr_acked_item_t*) &acked_vfrags->opal_list_tail ) {
new_item = OBJ_NEW(mca_pml_dr_acked_item_t);
if( item == NULL || item == (mca_pml_dr_range_t*) &vfrag_ids->opal_list_tail ) {
new_item = OBJ_NEW(mca_pml_dr_range_t);
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
opal_list_append(acked_vfrags, (opal_list_item_t*) new_item);
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) new_item;
opal_list_append(vfrag_ids, (opal_list_item_t*) new_item);
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
return;
} else if( item == (mca_pml_dr_acked_item_t*) &acked_vfrags->opal_list_head ) {
new_item = OBJ_NEW(mca_pml_dr_acked_item_t);
} else if( item == (mca_pml_dr_range_t*) &vfrag_ids->opal_list_head ) {
new_item = OBJ_NEW(mca_pml_dr_range_t);
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
opal_list_prepend(acked_vfrags, (opal_list_item_t*) new_item);
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) new_item;
opal_list_prepend(vfrag_ids, (opal_list_item_t*) new_item);
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
return;
} else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id ) {
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) item;
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) item;
return;
} else if((item->vfrag_id_high + 1) == vfrag_id) {
next_item = (mca_pml_dr_acked_item_t*) opal_list_get_next(item);
next_item = (mca_pml_dr_range_t*) opal_list_get_next(item);
/* try to consolidate */
if(next_item && next_item->vfrag_id_low == (vfrag_id+1)) {
item->vfrag_id_high = next_item->vfrag_id_high;
opal_list_remove_item(acked_vfrags, (opal_list_item_t*) next_item);
opal_list_remove_item(vfrag_ids, (opal_list_item_t*) next_item);
OBJ_RELEASE(next_item);
} else {
item->vfrag_id_high = vfrag_id;
}
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) item;
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) item;
return;
} else if((item->vfrag_id_low - 1) == vfrag_id) {
prev_item = (mca_pml_dr_acked_item_t*) opal_list_get_prev(item);
prev_item = (mca_pml_dr_range_t*) opal_list_get_prev(item);
/* try to consolidate */
if(prev_item && prev_item->vfrag_id_high == (vfrag_id-1)) {
item->vfrag_id_low = prev_item->vfrag_id_low;
opal_list_remove_item(acked_vfrags, (opal_list_item_t*) prev_item);
opal_list_remove_item(vfrag_ids, (opal_list_item_t*) prev_item);
OBJ_RELEASE(prev_item);
} else {
item->vfrag_id_low = vfrag_id;
}
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) item;
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) item;
return;
} else if(vfrag_id > item->vfrag_id_high ) {
if(direction == -1) {
/* we have gone back in the list, and we went one item too far */
new_item = OBJ_NEW(mca_pml_dr_acked_item_t);
new_item = OBJ_NEW(mca_pml_dr_range_t);
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
/* insert new_item directly before item */
opal_list_insert_pos(acked_vfrags,
opal_list_insert_pos(vfrag_ids,
(opal_list_item_t*) item,
(opal_list_item_t*) new_item);
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) new_item;
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
return;
} else {
direction = 1;
item = (mca_pml_dr_acked_item_t*) opal_list_get_next(item);
item = (mca_pml_dr_range_t*) opal_list_get_next(item);
}
} else if(vfrag_id < item->vfrag_id_low) {
if(direction == 1) {
/* we have gone forward in the list, and we went one item too far */
new_item = OBJ_NEW(mca_pml_dr_acked_item_t);
next_item = (mca_pml_dr_acked_item_t*) opal_list_get_next(item);
new_item = OBJ_NEW(mca_pml_dr_range_t);
next_item = (mca_pml_dr_range_t*) opal_list_get_next(item);
if(NULL == next_item) {
opal_list_append(acked_vfrags, (opal_list_item_t*) new_item);
opal_list_append(vfrag_ids, (opal_list_item_t*) new_item);
} else {
opal_list_insert_pos(acked_vfrags,
opal_list_insert_pos(vfrag_ids,
(opal_list_item_t*) next_item,
(opal_list_item_t*) new_item);
}
dr_comm_proc->acked_vfrags_current = (mca_pml_dr_acked_item_t*) new_item;
dr_comm_proc->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
return;
} else {
direction = -1;
item = (mca_pml_dr_acked_item_t*) opal_list_get_prev(item);
item = (mca_pml_dr_range_t*) opal_list_get_prev(item);
}
} else {
return;

Просмотреть файл

@ -41,7 +41,6 @@
#define MCA_PML_DR_HDR_TYPE_FRAG_ACK (MCA_PML_DR_HDR_TYPE_ACK | MCA_PML_DR_HDR_TYPE_FRAG)
#define MCA_PML_DR_HDR_FLAGS_NBO 1 /* is the hdr in network byte order */
#define MCA_PML_DR_HDR_FLAGS_MATCHED 2
/**
* Common hdr attributes - must be first element in each hdr type
@ -149,6 +148,7 @@ typedef struct mca_pml_dr_frag_hdr_t mca_pml_dr_frag_hdr_t;
struct mca_pml_dr_ack_hdr_t {
mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */
uint32_t hdr_vlen; /**< total bytes received in vfrag */
uint64_t hdr_vmask; /**< acknowledged frags */
ompi_ptr_t hdr_src_ptr; /**< source vfrag */
ompi_ptr_t hdr_dst_ptr; /**< matched receive request */

Просмотреть файл

@ -102,7 +102,7 @@ int mca_pml_dr_send(void *buf,
MCA_PML_DR_SEND_REQUEST_START(sendreq, rc);
if (rc != OMPI_SUCCESS) {
MCA_PML_DR_FREE((ompi_request_t **) & sendreq);
ompi_request_free((ompi_request_t **) & sendreq);
return rc;
}

Просмотреть файл

@ -41,18 +41,31 @@
do { \
uint16_t csum = opal_csum(hdr, sizeof(type)); \
if(hdr->hdr_common.hdr_csum != csum) { \
opal_output(0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n", \
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum); \
OPAL_OUTPUT((0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n", \
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum)); \
return; \
} \
if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) { \
opal_output(0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n", \
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst); \
OPAL_OUTPUT((0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n", \
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst)); \
return; \
} \
} while (0)
#define MCA_PML_DR_COMM_PROC_LOOKUP(hdr, comm, proc) \
do { \
ompi_communicator_t* comm_ptr=ompi_comm_lookup(hdr->hdr_common.hdr_ctx); \
if(NULL == comm_ptr) { \
OPAL_OUTPUT((0, "%s:%d: invalid communicator: %d\n", \
__FILE__,__LINE__,hdr->hdr_common.hdr_ctx)); \
return; \
} \
comm = (mca_pml_dr_comm_t*)comm_ptr->c_pml_comm; \
proc = comm->procs + hdr->hdr_common.hdr_src; \
} while (0)
OBJ_CLASS_INSTANCE(
mca_pml_dr_buffer_t,
opal_list_item_t,
@ -94,7 +107,8 @@ void mca_pml_dr_recv_frag_callback(
{
mca_btl_base_segment_t* segments = des->des_dst;
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
bool duplicate = false;
mca_pml_dr_comm_t *comm;
mca_pml_dr_comm_proc_t *proc;
if(segments->seg_len < sizeof(mca_pml_dr_common_hdr_t)) {
return;
}
@ -103,11 +117,19 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_MATCH:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_match_hdr_t);
MCA_PML_DR_RECV_FRAG_CHECK_DUP(hdr, duplicate);
if(false == duplicate) {
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
OPAL_THREAD_LOCK(&comm->c_matching_lock);
if(mca_pml_dr_comm_proc_check_duplicate(proc, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
OPAL_OUTPUT((0, "%s:%d: acking duplicate match\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
&hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr.pval,
1);
} else {
OPAL_OUTPUT((0, "%s:%d: dropping duplicate fragment\n", __FILE__, __LINE__));
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
mca_pml_dr_recv_frag_match(comm,proc,btl,&hdr->hdr_match,segments,des->des_dst_cnt);
}
break;
}
@ -120,26 +142,22 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_RNDV:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_rendezvous_hdr_t);
MCA_PML_DR_RECV_FRAG_CHECK_DUP(hdr, duplicate);
if(!duplicate) {
if(hdr->hdr_rndv.hdr_msg_length && segments->seg_len) {
/* no eager data on nonzero length message, this is a probe!
we haven't seen the eager data, so nack and force retransmission*/
ompi_communicator_t *comm_ptr = ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
mca_pml_dr_comm_t *comm = (mca_pml_dr_comm_t*) comm_ptr->c_pml_comm;
mca_pml_dr_comm_proc_t *proc = comm->procs + hdr->hdr_common.hdr_src;
mca_pml_dr_recv_frag_send_ack(proc->ompi_proc,
&hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr,
0);
OPAL_OUTPUT((0, "%s:%d: nacking PROBE, haven't seen EAGER data yet!\n", __FILE__, __LINE__));
} else {
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
OPAL_THREAD_LOCK(&comm->matching_lock);
if(mca_pml_dr_comm_proc_check_duplicate(proc, hdr->hdr_common.hdr_vid)) {
/* ack only if this has been matched */
mca_pml_dr_recv_request_t* recvreq =
mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid);
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
if(NULL != recvreq) {
OPAL_OUTPUT((0, "%s:%d: acking duplicate rendezvous\n", __FILE__, __LINE__));
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr, recvreq->req_bytes_received, 1);
}
} else {
/* must check the the pending receive list! */
OPAL_OUTPUT((0, "%s:%d: dropping duplicate fragment\n", __FILE__, __LINE__));
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
mca_pml_dr_recv_frag_match(comm,proc,btl,&hdr->hdr_match,segments,des->des_dst_cnt);
}
break;
}
@ -152,20 +170,20 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_FRAG:
{
mca_pml_dr_recv_request_t* recvreq;
mca_pml_dr_comm_proc_t* comm_proc;
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_frag_hdr_t);
recvreq = hdr->hdr_frag.hdr_dst_ptr.pval;
comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs +
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
if(mca_pml_dr_comm_proc_check_acked(comm_proc,
hdr->hdr_common.hdr_vid)) {
mca_pml_dr_recv_frag_send_ack(comm_proc->ompi_proc,
&hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr,
~(uint64_t) 0);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
OPAL_THREAD_LOCK(&comm->matching_lock);
if(mca_pml_dr_comm_proc_check_duplicate(proc, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
&hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr.pval,
~(uint64_t) 0);
} else {
OPAL_THREAD_UNLOCK(&comm->c_matching_lock);
recvreq = hdr->hdr_frag.hdr_dst_ptr.pval;
mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt);
}
break;
@ -178,8 +196,7 @@ void mca_pml_dr_recv_frag_callback(
break;
}
default:
OPAL_OUTPUT((0, "%s:%d: dropping unknown header type\n"));
return; /* drop it on the floor.. */
OPAL_OUTPUT((0, "%s:%d: dropping unknown header type\n", __FILE__,__LINE__));
break;
}
}
@ -472,31 +489,24 @@ static bool mca_pml_dr_check_cantmatch_for_match(
* - this routine may be called simultaneously by more than one thread
*/
bool mca_pml_dr_recv_frag_match(
mca_btl_base_module_t *btl,
mca_pml_dr_match_hdr_t *hdr,
mca_btl_base_segment_t* segments,
size_t num_segments)
mca_pml_dr_comm_t* comm,
mca_pml_dr_comm_proc_t *proc,
mca_btl_base_module_t *btl,
mca_pml_dr_match_hdr_t *hdr,
mca_btl_base_segment_t* segments,
size_t num_segments)
{
/* local variables */
uint16_t next_msg_seq_expected, frag_msg_seq;
ompi_communicator_t *comm_ptr;
mca_pml_dr_recv_request_t *match = NULL;
mca_pml_dr_comm_t *comm;
mca_pml_dr_comm_proc_t *proc;
bool additional_match=false;
opal_list_t additional_matches;
ompi_proc_t* ompi_proc;
ompi_proc_t* ompi_proc = proc->ompi_proc;
int rc;
uint32_t csum = OPAL_CSUM_ZERO;
/* communicator pointer */
comm_ptr=ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
comm=(mca_pml_dr_comm_t *)comm_ptr->c_pml_comm;
/* source sequence number */
frag_msg_seq = hdr->hdr_seq;
proc = comm->procs + hdr->hdr_common.hdr_src;
ompi_proc = proc->ompi_proc;
/* get next expected message sequence number - if threaded
* run, lock to make sure that if another thread is processing
@ -550,6 +560,7 @@ rematch:
/*
* update delivered sequence number information, if needed.
*/
MCA_PML_DR_RECV_REQUEST_MATCHED(match,comm,proc,hdr);
if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) {
/* complete the probe */
@ -568,14 +579,12 @@ rematch:
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return rc;
}
MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl,csum);
MCA_PML_DR_RECV_FRAG_INIT(frag,ompi_proc,hdr,segments,num_segments,btl,csum);
if(csum != hdr->hdr_csum) {
mca_pml_dr_recv_frag_send_ack(ompi_proc,
&hdr->hdr_common,
hdr->hdr_src_ptr,
0);
opal_output(0, "%s:%d: corrupted data 0x%08x != 0x%08x\n",
__FILE__, __LINE__, csum, hdr->hdr_csum);
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)ompi_proc->proc_pml,
&hdr->hdr_common, hdr->hdr_src_ptr.pval, 0);
OPAL_OUTPUT((0, "%s:%d: received corrupted data 0x%08x != 0x%08x\n",
__FILE__, __LINE__, csum, hdr->hdr_csum));
MCA_PML_DR_RECV_FRAG_RETURN(frag);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return false;
@ -605,27 +614,33 @@ rematch:
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return rc;
}
MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl,csum);
MCA_PML_DR_RECV_FRAG_INIT(frag,ompi_proc,hdr,segments,num_segments,btl,csum);
if(csum != hdr->hdr_csum) {
mca_pml_dr_recv_frag_send_ack(ompi_proc,
&hdr->hdr_common,
hdr->hdr_src_ptr,
0);
opal_output(0, "%s:%d: corrupted data 0x%08x != 0x%08x\n",
__FILE__, __LINE__, csum, hdr->hdr_csum);
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)ompi_proc->proc_pml,
&hdr->hdr_common, hdr->hdr_src_ptr.pval, 0);
OPAL_OUTPUT((0, "%s:%d: received corrupted data 0x%08x != 0x%08x\n",
__FILE__, __LINE__, csum, hdr->hdr_csum));
MCA_PML_DR_RECV_FRAG_RETURN(frag);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return false;
}
opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag);
}
OPAL_THREAD_UNLOCK(&comm->matching_lock);
mca_pml_dr_comm_proc_set_vid(proc, hdr->hdr_common.hdr_vid);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
/* release matching lock before processing fragment */
if(match != NULL) {
mca_pml_dr_recv_request_progress(match,btl,segments,num_segments);
}
/* if buffered a short message - go ahead and ack */
else if (hdr->hdr_common.hdr_type == MCA_PML_DR_HDR_TYPE_MATCH) {
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)ompi_proc->proc_pml,
&hdr->hdr_common, hdr->hdr_src_ptr.pval, 1);
}
if(additional_match) {
opal_list_item_t* item;
while(NULL != (item = opal_list_remove_first(&additional_matches))) {
@ -639,21 +654,18 @@ rematch:
void mca_pml_dr_recv_frag_send_ack(
ompi_proc_t* ompi_proc,
mca_pml_dr_common_hdr_t* hdr,
ompi_ptr_t src_ptr,
uint64_t mask)
void mca_pml_dr_recv_frag_ack(
mca_bml_base_endpoint_t* endpoint,
mca_pml_dr_common_hdr_t* hdr,
void *src_ptr,
uint64_t mask)
{
mca_bml_base_endpoint_t* bml_endpoint = NULL;
mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl;
mca_pml_dr_recv_frag_t* frag;
mca_pml_dr_ack_hdr_t* ack;
int rc;
bml_endpoint = (mca_bml_base_endpoint_t*) ompi_proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager);
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
/* allocate descriptor */
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t));
@ -668,8 +680,9 @@ void mca_pml_dr_recv_frag_send_ack(
ack->hdr_common.hdr_dst = hdr->hdr_src;
ack->hdr_common.hdr_vid = hdr->hdr_vid;
ack->hdr_common.hdr_ctx = hdr->hdr_ctx;
ack->hdr_vlen = 0;
ack->hdr_vmask = mask;
ack->hdr_src_ptr = src_ptr;
ack->hdr_src_ptr.pval = src_ptr;
assert(ack->hdr_src_ptr.pval);
ack->hdr_dst_ptr.pval = NULL;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
@ -810,6 +823,8 @@ rematch:
/* associate the receive descriptor with the fragment
* descriptor */
frag->request=match;
match->req_proc = proc;
match->req_endpoint = (mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml;
/* add this fragment descriptor to the list of
* descriptors to be processed later

Просмотреть файл

@ -49,36 +49,8 @@ struct mca_pml_dr_recv_frag_t {
};
typedef struct mca_pml_dr_recv_frag_t mca_pml_dr_recv_frag_t;
void mca_pml_dr_recv_frag_send_ack(ompi_proc_t* ompi_proc,
mca_pml_dr_common_hdr_t* hdr,
ompi_ptr_t src_ptr,
uint64_t mask);
OBJ_CLASS_DECLARATION(mca_pml_dr_recv_frag_t);
#define MCA_PML_DR_RECV_FRAG_CHECK_DUP(hdr, dup) \
do { \
ompi_communicator_t *comm_ptr; \
mca_pml_dr_comm_t *comm; \
mca_pml_dr_comm_proc_t *proc; \
comm_ptr = ompi_comm_lookup(hdr->hdr_common.hdr_ctx); \
comm = (mca_pml_dr_comm_t*) comm_ptr->c_pml_comm; \
proc = comm->procs + hdr->hdr_common.hdr_src; \
if(mca_pml_dr_comm_proc_check_acked(proc, \
hdr->hdr_common.hdr_vid)) { \
mca_pml_dr_recv_frag_send_ack(proc->ompi_proc, \
&hdr->hdr_common, \
hdr->hdr_match.hdr_src_ptr, \
1); \
dup = true; \
} else { \
dup = false; \
} \
} while (0)
#define MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc) \
do { \
@ -155,6 +127,16 @@ do { \
} while(0)
/**
* Generate an ack to the peer.
*/
void mca_pml_dr_recv_frag_ack(
mca_bml_base_endpoint_t* endpoint,
mca_pml_dr_common_hdr_t* hdr,
void* src_ptr,
uint64_t mask);
/**
* Callback from BTL on receipt of a recv_frag.
*/
@ -164,7 +146,7 @@ OMPI_DECLSPEC void mca_pml_dr_recv_frag_callback(
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t* descriptor,
void* cbdata);
/**
* Match incoming recv_frags against posted receives.
* Supports out of order delivery.
@ -176,6 +158,8 @@ OMPI_DECLSPEC void mca_pml_dr_recv_frag_callback(
* @return OMPI_SUCCESS or error status on failure.
*/
OMPI_DECLSPEC bool mca_pml_dr_recv_frag_match(
mca_pml_dr_comm_t* comm,
mca_pml_dr_comm_proc_t* proc,
mca_btl_base_module_t* btl,
mca_pml_dr_match_hdr_t *hdr,
mca_btl_base_segment_t* segments,

Просмотреть файл

@ -31,7 +31,7 @@
#include "orte/mca/errmgr/errmgr.h"
#define MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum) \
#define MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received) \
if(csum != hdr->hdr_match.hdr_csum) { \
/* failed the csum, put the request back on the list for \
* matching later on retransmission \
@ -41,18 +41,19 @@ if(csum != hdr->hdr_match.hdr_csum) { \
} else { \
mca_pml_dr_recv_request_match_specific(recvreq); \
} \
mca_pml_dr_recv_frag_send_ack(comm_proc->ompi_proc, \
mca_pml_dr_recv_frag_ack(recvreq->req_endpoint, \
&hdr->hdr_common, \
hdr->hdr_match.hdr_src_ptr, \
hdr->hdr_match.hdr_src_ptr.pval, \
0); \
opal_output(0, "%s:%d: [rank %d -> rank %d] " \
OPAL_OUTPUT((0, "%s:%d: [rank %d -> rank %d] " \
"data checksum failed 0x%08x != 0x%08x\n", \
__FILE__, __LINE__, \
hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst, \
csum, hdr->hdr_match.hdr_csum); \
csum, hdr->hdr_match.hdr_csum)); \
bytes_received = bytes_delivered = 0; \
} else { \
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_match, 1); \
} else if (recvreq->req_acked == false) { \
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, \
hdr->hdr_match.hdr_src_ptr, bytes_received, 1); \
}
@ -146,101 +147,23 @@ static void mca_pml_dr_ctl_completion(
}
/*
* Generate an ack to the peer after first fragment is matched.
* Generate an ack to the peer.
*/
static void mca_pml_dr_recv_request_ack(
void mca_pml_dr_recv_request_ack(
mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_match_hdr_t* hdr,
uint8_t mask)
mca_pml_dr_common_hdr_t* hdr,
ompi_ptr_t src_ptr,
size_t vlen,
uint64_t mask)
{
ompi_proc_t* proc = recvreq->req_proc;
mca_bml_base_endpoint_t* bml_endpoint = NULL;
mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl;
mca_pml_dr_recv_frag_t* frag;
mca_pml_dr_ack_hdr_t* ack;
int rc;
mca_pml_dr_comm_proc_t* comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs +
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
/* if this hasn't been initialized yet - this is a synchronous send */
if(NULL == proc) {
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(
recvreq->req_recv.req_base.req_comm, hdr->hdr_common.hdr_src);
proc = recvreq->req_proc = ompi_proc;
}
bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager);
/* allocate descriptor */
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t));
if(NULL == des) {
goto retry;
}
/* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_common.hdr_type;
ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCHED;
ack->hdr_common.hdr_dst = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;;
ack->hdr_common.hdr_src = recvreq->req_recv.req_base.req_comm->c_my_rank;
ack->hdr_common.hdr_ctx = recvreq->req_recv.req_base.req_comm->c_contextid;
ack->hdr_common.hdr_vid = hdr->hdr_common.hdr_vid;
ack->hdr_vmask = mask;
ack->hdr_src_ptr = hdr->hdr_src_ptr;
assert(ack->hdr_src_ptr.pval);
ack->hdr_dst_ptr.pval = recvreq;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbfunc = mca_pml_dr_ctl_completion;
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(rc != OMPI_SUCCESS) {
mca_bml_base_free(bml_btl, des);
goto retry;
}
mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_common.hdr_vid);
return;
/* queue request to retry later */
retry:
MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc);
frag->hdr.hdr_match = *hdr;
frag->num_segments = 0;
frag->request = recvreq;
opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag);
}
/*
* Generate an ack w/ the current vfrag status.
*/
static void mca_pml_dr_recv_request_vfrag_ack(
mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_vfrag_t* vfrag,
mca_pml_dr_frag_hdr_t* hdr)
{
ompi_proc_t* proc = recvreq->req_proc;
mca_bml_base_endpoint_t* bml_endpoint = NULL;
mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl;
mca_pml_dr_ack_hdr_t* ack;
int rc;
mca_pml_dr_comm_proc_t* comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs +
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager);
/* allocate descriptor */
bml_btl = mca_bml_base_btl_array_get_next(&recvreq->req_endpoint->btl_eager);
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t));
if(NULL == des) {
return;
@ -248,14 +171,15 @@ static void mca_pml_dr_recv_request_vfrag_ack(
/* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG_ACK;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type;
ack->hdr_common.hdr_flags = 0;
ack->hdr_common.hdr_vid = vfrag->vf_id;
ack->hdr_common.hdr_dst = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;;
ack->hdr_common.hdr_src = recvreq->req_recv.req_base.req_comm->c_my_rank;
ack->hdr_common.hdr_ctx = recvreq->req_recv.req_base.req_comm->c_contextid;
ack->hdr_vmask = vfrag->vf_ack;
ack->hdr_src_ptr = hdr->hdr_src_ptr;
ack->hdr_common.hdr_vid = hdr->hdr_vid;
ack->hdr_vlen = vlen;
ack->hdr_vmask = mask;
ack->hdr_src_ptr = src_ptr;
ack->hdr_dst_ptr.pval = recvreq;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
@ -266,33 +190,31 @@ static void mca_pml_dr_recv_request_vfrag_ack(
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(rc != OMPI_SUCCESS) {
mca_bml_base_free(bml_btl, des);
} else {
recvreq->req_acked = true;
}
mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_common.hdr_vid);
}
}
/*
* Update the recv request status to reflect the number of bytes
* received and actually delivered to the application.
*/
void mca_pml_dr_recv_request_progress(
void mca_pml_dr_recv_request_progress(
mca_pml_dr_recv_request_t* recvreq,
mca_btl_base_module_t* btl,
mca_btl_base_segment_t* segments,
size_t num_segments)
{
{
size_t bytes_received = 0;
size_t bytes_delivered = 0;
size_t data_offset = 0;
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
size_t i;
uint32_t csum = 1;
uint32_t csum = OPAL_CSUM_ZERO;
uint64_t bit;
mca_pml_dr_vfrag_t* vfrag;
mca_pml_dr_comm_proc_t* comm_proc;
comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs +
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
for(i=0; i<num_segments; i++)
bytes_received += segments[i].seg_len;
@ -301,9 +223,8 @@ static void mca_pml_dr_recv_request_vfrag_ack(
case MCA_PML_DR_HDR_TYPE_MATCH:
bytes_received -= sizeof(mca_pml_dr_match_hdr_t);
recvreq->req_recv.req_bytes_packed = bytes_received;
recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr;
MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
MCA_PML_DR_RECV_REQUEST_BYTES_PACKED(recvreq, bytes_received);
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
segments,
@ -313,15 +234,14 @@ static void mca_pml_dr_recv_request_vfrag_ack(
bytes_received,
bytes_delivered,
csum);
MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum);
MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received);
break;
case MCA_PML_DR_HDR_TYPE_RNDV:
bytes_received -= sizeof(mca_pml_dr_rendezvous_hdr_t);
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr;
MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
MCA_PML_DR_RECV_REQUEST_BYTES_PACKED(recvreq, hdr->hdr_rndv.hdr_msg_length);
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
segments,
@ -331,7 +251,7 @@ static void mca_pml_dr_recv_request_vfrag_ack(
bytes_received,
bytes_delivered,
csum);
MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum);
MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received);
break;
case MCA_PML_DR_HDR_TYPE_FRAG:
@ -348,7 +268,6 @@ static void mca_pml_dr_recv_request_vfrag_ack(
csum);
bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx);
MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag);
/* update the mask to show that this vfrag was received,
@ -363,9 +282,13 @@ static void mca_pml_dr_recv_request_vfrag_ack(
if((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
/* we have received all the pieces of the vfrag, ack
everything that passed the checksum */
mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag, &hdr->hdr_frag);
mca_pml_dr_comm_proc_set_vid(recvreq->req_proc, vfrag->vf_id);
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask);
}
} else {
OPAL_OUTPUT((0, "%s:%d received corrupted fragment 0x%08x != 0x%08x\n",
__FILE__,__LINE__,csum,hdr->hdr_frag.hdr_frag_csum));
bytes_received = bytes_delivered = 0;
}
break;
@ -378,7 +301,7 @@ static void mca_pml_dr_recv_request_vfrag_ack(
OPAL_THREAD_LOCK(&ompi_request_lock);
recvreq->req_bytes_received += bytes_received;
recvreq->req_bytes_delivered += bytes_delivered;
if (recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed) {
if (recvreq->req_bytes_received == recvreq->req_recv.req_bytes_packed) {
MCA_PML_DR_RECV_REQUEST_PML_COMPLETE(recvreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
@ -425,10 +348,10 @@ void mca_pml_dr_recv_request_matched_probe(
if(ompi_request_waiting) {
opal_condition_broadcast(&ompi_request_cond);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* mark probe request completed */
MCA_PML_DR_RECV_REQUEST_PML_COMPLETE(recvreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/*
@ -450,6 +373,10 @@ void mca_pml_dr_recv_request_match_specific(mca_pml_dr_recv_request_t* request)
if (opal_list_get_size(&proc->unexpected_frags) > 0 &&
(frag = mca_pml_dr_recv_request_match_specific_proc(request, proc)) != NULL) {
/* has the request already been matched */
if(frag->hdr.hdr_common.hdr_type == MCA_PML_DR_HDR_TYPE_MATCH)
request->req_acked = true;
MCA_PML_DR_RECV_REQUEST_MATCHED(request,comm,proc,&frag->hdr.hdr_match);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
@ -506,8 +433,11 @@ void mca_pml_dr_recv_request_match_wild(mca_pml_dr_recv_request_t* request)
/* loop over messages from the current proc */
if ((frag = mca_pml_dr_recv_request_match_specific_proc(request, proc)) != NULL) {
/* has the request already been matched */
if(frag->hdr.hdr_common.hdr_type == MCA_PML_DR_HDR_TYPE_MATCH)
request->req_acked = true;
MCA_PML_DR_RECV_REQUEST_MATCHED(request,comm,proc,&frag->hdr.hdr_match);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
mca_pml_dr_recv_request_progress(request,frag->btl,frag->segments,frag->num_segments);

Просмотреть файл

@ -26,8 +26,10 @@
#include "ompi/mca/pml/base/pml_base_recvreq.h"
#include "pml_dr.h"
#include "pml_dr_hdr.h"
#include "pml_dr_proc.h"
#include "pml_dr_vfrag.h"
#include "pml_dr_comm.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
@ -36,20 +38,19 @@ extern "C" {
struct mca_pml_dr_recv_request_t {
mca_pml_base_recv_request_t req_recv;
struct ompi_proc_t *req_proc;
#if OMPI_HAVE_THREAD_SUPPORT
volatile int32_t req_lock;
#else
int32_t req_lock;
#endif
size_t req_pipeline_depth;
size_t req_bytes_received;
size_t req_bytes_delivered;
size_t req_bytes_received;
size_t req_bytes_delivered;
bool req_acked;
/* filled in after match */
struct mca_pml_dr_comm_proc_t* req_proc;
struct mca_bml_base_endpoint_t* req_endpoint;
opal_mutex_t* req_mutex;
/* vfrag state */
mca_pml_dr_vfrag_t *req_vfrag;
mca_pml_dr_vfrag_t req_vfrag0;
opal_list_t req_vfrags;
opal_mutex_t req_mutex;
};
typedef struct mca_pml_dr_recv_request_t mca_pml_dr_recv_request_t;
@ -125,12 +126,6 @@ do {
\
if( true == recvreq->req_recv.req_base.req_free_called ) { \
MCA_PML_DR_RECV_REQUEST_RETURN( recvreq ); \
} else { \
if(recvreq->req_recv.req_base.req_ompi.req_persistent) { \
if( !recvreq->req_recv.req_base.req_free_called ) { \
recvreq->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE; \
} \
} \
} \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
} while(0)
@ -144,13 +139,12 @@ do {
#define MCA_PML_DR_RECV_REQUEST_RETURN(recvreq) \
do { \
opal_list_item_t* item; \
\
/* return vfrags */ \
OPAL_THREAD_LOCK(&(recvreq)->req_mutex); \
OPAL_THREAD_LOCK((recvreq)->req_mutex); \
opal_list_remove_item(&(recvreq)->req_proc->matched_receives, (opal_list_item_t*)(recvreq)); \
while(NULL != (item = opal_list_remove_first(&(recvreq)->req_vfrags))) { \
OMPI_FREE_LIST_RETURN(&mca_pml_dr.vfrags, item); \
} \
OPAL_THREAD_UNLOCK(&(recvreq)->req_mutex); \
OPAL_THREAD_UNLOCK((recvreq)->req_mutex); \
\
/* decrement reference counts */ \
MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \
@ -173,6 +167,17 @@ void mca_pml_dr_recv_request_match_wild(mca_pml_dr_recv_request_t* request);
*/
void mca_pml_dr_recv_request_match_specific(mca_pml_dr_recv_request_t* request);
/**
* Ack a matched request.
*/
void mca_pml_dr_recv_request_ack(
mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_common_hdr_t* hdr,
ompi_ptr_t src_ptr,
size_t vlen,
uint64_t vmask);
/**
* Start an initialized request.
*
@ -184,8 +189,7 @@ do {
/* init/re-init the request */ \
(request)->req_bytes_received = 0; \
(request)->req_bytes_delivered = 0; \
(request)->req_lock = 0; \
(request)->req_pipeline_depth = 0; \
(request)->req_acked = false; \
(request)->req_recv.req_base.req_pml_complete = false; \
(request)->req_recv.req_base.req_ompi.req_complete = false; \
(request)->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
@ -209,30 +213,36 @@ do {
/**
*
* Initialize request when match is made
*/
#define MCA_PML_DR_RECV_REQUEST_MATCHED( \
request, \
hdr) \
#define MCA_PML_DR_RECV_REQUEST_MATCHED(request,comm,proc,hdr) \
do { \
(request)->req_mutex = &comm->matching_lock; \
(request)->req_proc = proc; \
(request)->req_endpoint = (mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml; \
(request)->req_recv.req_base.req_ompi.req_status.MPI_TAG = (hdr)->hdr_tag; \
(request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = \
(hdr)->hdr_common.hdr_src; \
opal_list_append(&proc->matched_receives, (opal_list_item_t*)request); \
} while(0)
/**
* Setup convertor if message length is non-zero
*/
#define MCA_PML_DR_RECV_REQUEST_BYTES_PACKED(request, bytes_packed) \
do { \
(request)->req_recv.req_bytes_packed = bytes_packed; \
if((request)->req_recv.req_bytes_packed != 0) { \
ompi_proc_t *proc = \
ompi_comm_peer_lookup( \
(request)->req_recv.req_base.req_comm, (hdr)->hdr_common.hdr_src); \
\
(request)->req_proc = proc; \
ompi_proc_t *proc = (request)->req_proc->ompi_proc; \
ompi_convertor_copy_and_prepare_for_recv( proc->proc_convertor, \
(request)->req_recv.req_base.req_datatype, \
(request)->req_recv.req_base.req_count, \
(request)->req_recv.req_base.req_addr, \
CONVERTOR_WITH_CHECKSUM, \
&(request)->req_recv.req_convertor ); \
} else { \
(request)->req_proc = NULL; \
} \
} while (0)
@ -313,6 +323,26 @@ void mca_pml_dr_recv_request_matched_probe(
void mca_pml_dr_recv_request_schedule(
mca_pml_dr_recv_request_t* req);
/**
* Look for matched receive.
* Must be called w/ matching lock held.
*/
static inline struct mca_pml_dr_recv_request_t* mca_pml_dr_comm_proc_check_matched(
mca_pml_dr_comm_proc_t* dr_proc,
uint32_t vfrag_id)
{
opal_list_item_t* item;
for(item = opal_list_get_first(&dr_proc->matched_receives);
item != opal_list_get_end(&dr_proc->matched_receives);
item = opal_list_get_next(item)) {
struct mca_pml_dr_recv_request_t* recvreq = (struct mca_pml_dr_recv_request_t*)item;
if(recvreq->req_vfrag->vf_id == vfrag_id)
return recvreq;
}
return NULL;
}
/*
*
*/

Просмотреть файл

@ -187,7 +187,7 @@ static void mca_pml_dr_rndv_completion(
}
/* matched at peer? */
if(sendreq->req_matched) {
if(NULL != sendreq->req_vfrag0.vf_recv.pval) {
sendreq->req_bytes_delivered = vfrag->vf_size;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
@ -335,9 +335,9 @@ int mca_pml_dr_send_request_start_buffered(
/* update lengths */
segment->seg_len = sizeof(mca_pml_dr_rendezvous_hdr_t) + max_data;
sendreq->req_send_offset = max_data;
sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = true;
descriptor->des_cbfunc = mca_pml_dr_rndv_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -350,9 +350,10 @@ int mca_pml_dr_send_request_start_buffered(
return rc;
}
iov.iov_base = (void*)(((unsigned char*)sendreq->req_send.req_addr) + sendreq->req_send_offset);
iov.iov_len = max_data = sendreq->req_send.req_bytes_packed - sendreq->req_send_offset;
iov.iov_base = (void*)(((unsigned char*)sendreq->req_send.req_addr) + max_data);
iov.iov_len = sendreq->req_send.req_bytes_packed - max_data;
max_data = iov.iov_len;
if((rc = ompi_convertor_pack(
&sendreq->req_send.req_convertor,
&iov,
@ -463,10 +464,10 @@ int mca_pml_dr_send_request_start_copy(
/* update lengths */
segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data;
sendreq->req_send_offset = max_data;
sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = false;
/* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -537,9 +538,10 @@ int mca_pml_dr_send_request_start_prepare(
descriptor->des_cbdata = sendreq;
/* update lengths */
sendreq->req_send_offset = size;
sendreq->req_vfrag0.vf_size = size;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = false;
/* send */
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
@ -609,13 +611,13 @@ int mca_pml_dr_send_request_start_rndv(
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbdata = sendreq;
des->des_cbfunc = mca_pml_dr_rndv_completion;
sendreq->req_send_offset = size;
sendreq->req_vfrag0.vf_size = size;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_rndv = true;
/* send */
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
MCA_PML_DR_VFRAG_ACK_START(&sendreq->req_vfrag0);
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
mca_bml_base_free(bml_btl, des );
}
@ -721,6 +723,8 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
/* update state */
vfrag->vf_idx++;
vfrag->vf_rndv = false;
sendreq->req_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
@ -808,7 +812,8 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_src_ptr.pval = vfrag;
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
vfrag->vf_rndv = false;
/* update state */
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
@ -873,13 +878,9 @@ void mca_pml_dr_send_request_match_ack(
sendreq->descriptor = NULL;
}
/* do NOT complete message until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
/* update statistics */
sendreq->req_matched = true;
sendreq->req_bytes_delivered = vfrag->vf_size;
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
/* update statistics */
sendreq->req_bytes_delivered = vfrag->vf_size;
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
@ -888,8 +889,8 @@ void mca_pml_dr_send_request_match_ack(
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
vfrag->vf_retrans = vfrag->vf_mask;
} else if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
sendreq->req_matched = true;
} else {
vfrag->vf_recv = ack->hdr_dst_ptr;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
@ -916,28 +917,25 @@ void mca_pml_dr_send_request_rndv_ack(
bool schedule = false;
MCA_PML_DR_VFRAG_ACK_STOP(vfrag);
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
/* got a NACK, resend eager data! */
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
} else {
} else {
/* return descriptor of first fragment */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
}
/* matched at peer? */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
sendreq->req_matched = true;
sendreq->req_bytes_delivered = vfrag->vf_size;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else {
vfrag->vf_recv = ack->hdr_dst_ptr;
schedule = true;
}
/* done? */
sendreq->req_bytes_delivered = ack->hdr_vlen;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else {
vfrag->vf_recv = ack->hdr_dst_ptr;
sendreq->req_send_offset = ack->hdr_vlen;
schedule = true;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
@ -955,11 +953,7 @@ void mca_pml_dr_send_request_rndv_ack(
} else {
/* may need this to schedule rest of the message */
vfrag->vf_recv = ack->hdr_dst_ptr;
/* matched at peer? */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
sendreq->req_matched = true;
}
sendreq->req_send_offset = ack->hdr_vlen;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}

Просмотреть файл

@ -53,7 +53,6 @@ struct mca_pml_dr_send_request_t {
size_t req_pipeline_depth;
size_t req_bytes_delivered;
size_t req_send_offset;
bool req_matched;
mca_pml_dr_vfrag_t* req_vfrag;
mca_pml_dr_vfrag_t req_vfrag0;
@ -159,18 +158,14 @@ do {
sendreq->req_bytes_delivered = 0; \
sendreq->req_state = 0; \
sendreq->req_send_offset = 0; \
sendreq->req_matched = false; \
sendreq->req_send.req_base.req_pml_complete = false; \
sendreq->req_send.req_base.req_ompi.req_complete = false; \
sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
sendreq->req_send.req_base.req_ompi.req_status._cancelled = 0; \
sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32(&proc->send_sequence,1); \
sendreq->req_endpoint = endpoint; \
MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
sendreq->req_vfrag0.vf_ack = 0; \
sendreq->req_vfrag0.vf_mask_processed = 0; \
sendreq->req_vfrag0.vf_retrans = 0; \
sendreq->req_vfrag0.vf_retry_cnt = 0; \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
\
/* select a btl */ \
@ -219,9 +214,6 @@ do {
(sendreq)->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \
(sendreq)->req_send.req_base.req_ompi.req_status._count = \
(sendreq)->req_send.req_bytes_packed; \
if( (sendreq)->req_send.req_base.req_ompi.req_persistent ) { \
(sendreq)->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE; \
} \
MCA_PML_BASE_REQUEST_MPI_COMPLETE( &((sendreq)->req_send.req_base.req_ompi) ); \
} while(0)
@ -362,7 +354,7 @@ do { \
orte_errmgr.abort(); \
} \
\
opal_output(0, "%s:%d:%s, retransmitting eager\n", __FILE__, __LINE__, __func__); \
OPAL_OUTPUT((0, "%s:%d:%s, retransmitting eager\n", __FILE__, __LINE__, __func__)); \
assert(sendreq->descriptor->des_src != NULL); \
MCA_PML_DR_VFRAG_RESET(vfrag); \
des_old = sendreq->descriptor; \
@ -400,6 +392,7 @@ do { \
MCA_PML_DR_VFRAG_RESET(vfrag); \
mca_bml_base_alloc(bml_btl, &des_new, \
sizeof(mca_pml_dr_rendezvous_hdr_t)); \
des_old = sendreq->descriptor; \
/* build hdr */ \
hdr = (mca_pml_dr_hdr_t*)des_new->des_src->seg_addr.pval; \
hdr->hdr_common.hdr_flags = 0; \
@ -414,12 +407,12 @@ do { \
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id; \
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; \
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t)); \
des_new->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; \
des_new->des_cbdata = sendreq; \
des_new->des_cbfunc = mca_pml_dr_rndv_completion; \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
MCA_PML_DR_VFRAG_ACK_START(vfrag); \
mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \
des_new->des_flags = des_old->des_flags; \
des_new->des_cbdata = des_old->des_cbdata; \
des_new->des_cbfunc = des_old->des_cbfunc; \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
MCA_PML_DR_VFRAG_ACK_START(vfrag); \
mca_bml_base_send(bml_btl, des_new, MCA_BTL_TAG_PML); \
} while(0)
/**

Просмотреть файл

@ -73,10 +73,7 @@ void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
opal_output(0, "%s:%d:%s, wdog retry count exceeded! FATAL", __FILE__, __LINE__, __func__);
orte_errmgr.abort();
}
vfrag->vf_idx = 0;
vfrag->vf_mask_processed = 0;
vfrag->vf_ack = 0;
vfrag->vf_retrans = 0;
MCA_PML_DR_VFRAG_RESET(vfrag);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_schedule(sendreq);
@ -96,7 +93,12 @@ void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
if(0 == vfrag->vf_offset) { /* this is the first part of the message
that we need to resend */
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
if(vfrag->vf_rndv) {
MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag);
} else {
MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag);
}
} else {
MCA_PML_DR_VFRAG_RESET(vfrag);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);

Просмотреть файл

@ -45,6 +45,7 @@ struct mca_pml_dr_vfrag_t {
uint64_t vf_mask;
uint64_t vf_mask_processed;
uint64_t vf_retrans;
bool vf_rndv;
struct mca_bml_base_btl_t* bml_btl;
/* we need a timer for the vfrag for:
@ -77,14 +78,20 @@ do { \
OMPI_FREE_LIST_RETURN(&mca_pml_dr.vfrags, (opal_list_item_t*)vfrag); \
} while(0)
#define MCA_PML_DR_VFRAG_INIT(vfrag) \
do { \
MCA_PML_DR_VFRAG_RESET(vfrag); \
(vfrag)->vf_retry_cnt = 0; \
(vfrag)->vf_recv.pval = NULL; \
} while(0)
#define MCA_PML_DR_VFRAG_RESET(vfrag) \
do { \
vfrag->vf_idx = 0; \
vfrag->vf_mask_processed = 0; \
vfrag->vf_ack = 0; \
vfrag->vf_retrans = 0; \
(vfrag)->vf_idx = 0; \
(vfrag)->vf_mask_processed = 0; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_retrans = 0; \
} while(0)
#if 1
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \
do { \
@ -131,18 +138,6 @@ do { \
\
} while(0)
#endif
#if 0
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag)
#define MCA_PML_DR_VFRAG_WDOG_RESET(vfrag)
#define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag)
#define MCA_PML_DR_VFRAG_ACK_START(vfrag)
#define MCA_PML_DR_VFRAG_ACK_STOP(vfrag)
#endif
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif