- correction for sync send
- now passing all of the intel p2p list

This commit was SVN r6543.

This commit is contained in:
parent 19d58ee17e
commit 0423d414ef
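In summary, as read from the diff below: the send-side completion callbacks are split by role (mca_pml_ob1_short_completion becomes mca_pml_ob1_match_completion, mca_pml_ob1_send_completion becomes mca_pml_ob1_rndv_completion for the first fragment of a long message, and a new mca_pml_ob1_frag_completion handles subsequent fragments); the req_state field changes from an enum to a volatile int32_t counter so that MCA_PML_OB1_SEND_REQUEST_ADVANCE can complete a request only after both the initial fragment's local completion and the receiver's ACK have occurred, in either order; and per-descriptor byte accounting moves into the new MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED macro.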
@@ -77,13 +77,12 @@ void mca_pml_ob1_recv_frag_callback(
         {
             mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)
                 hdr->hdr_ack.hdr_src_req.pval;
-            sendreq->req_state = MCA_PML_OB1_SR_ACKED;
             sendreq->req_recv = hdr->hdr_ack.hdr_dst_req;
             sendreq->req_rdma_offset = hdr->hdr_ack.hdr_rdma_offset;
 #if MCA_PML_OB1_TIMESTAMPS
             sendreq->t_send1 = get_profiler_timestamp();
 #endif
-            mca_pml_ob1_send_request_schedule(sendreq);
+            MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq);
             break;
         }
         case MCA_PML_OB1_HDR_TYPE_FRAG:
@@ -184,10 +184,18 @@ static void mca_pml_ob1_recv_request_ack(
                 &recvreq->req_recv.req_convertor,
                 &recvreq->req_rdma_offset);
+            ack->hdr_rdma_offset = recvreq->req_rdma_offset;
+            if(recvreq->req_rdma_offset < hdr->hdr_frag_length) {
+                opal_output(0, "[%s:%d] rdma offset %lu frag length %lu\n",
+                    __FILE__, __LINE__, recvreq->req_rdma_offset, hdr->hdr_frag_length);
+                ack->hdr_rdma_offset = hdr->hdr_frag_length;
+                recvreq->req_rdma_offset = hdr->hdr_frag_length;
+            }
         } else {
             recvreq->req_rdma_offset = recvreq->req_recv.req_bytes_packed;
             ack->hdr_rdma_offset = recvreq->req_recv.req_bytes_packed;
         }
+
     /* start rdma at the current fragment offset */
     } else {
         recvreq->req_rdma_offset = hdr->hdr_frag_length;
         ack->hdr_rdma_offset = hdr->hdr_frag_length;
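For context, a minimal standalone sketch of the clamping rule the new block above enforces, with hypothetical values standing in for recvreq->req_rdma_offset and hdr->hdr_frag_length: the RDMA offset the receiver advertises in the ACK may never fall below the bytes the sender already shipped eagerly in the first fragment.

/* sketch: RDMA must begin at or after the eagerly-sent data */
#include <stdio.h>
#include <stddef.h>

int main(void)
{
    size_t frag_length = 4096;   /* bytes already sent eagerly (hdr_frag_length) */
    size_t rdma_offset = 1024;   /* offset proposed by the convertor/mpool */

    if (rdma_offset < frag_length) {
        /* the eager bytes are delivered through the recv convertor,
         * so the RDMA protocol starts after them */
        rdma_offset = frag_length;
    }
    printf("ack rdma offset: %zu\n", rdma_offset);   /* prints 4096 */
    return 0;
}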
@@ -196,8 +204,8 @@ static void mca_pml_ob1_recv_request_ack(
 
     /* zero byte message */
     else {
-        recvreq->req_rdma_offset = hdr->hdr_frag_length;
-        ack->hdr_rdma_offset = hdr->hdr_frag_length;
+        recvreq->req_rdma_offset = 0;
+        ack->hdr_rdma_offset = 0;
     }
 
     /* initialize descriptor */
@@ -75,7 +75,7 @@ OBJ_CLASS_INSTANCE(
  * Completion of a short message - nothing left to schedule.
  */
 
-static void mca_pml_ob1_short_completion(
+static void mca_pml_ob1_match_completion(
     mca_btl_base_module_t* btl,
     struct mca_btl_base_endpoint_t* ep,
     struct mca_btl_base_descriptor_t* descriptor,
@@ -101,12 +101,11 @@ static void mca_pml_ob1_short_completion(
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
 }
 
-/**
- * Completion of a long or synchronous message - may need to schedule
- * additional fragments.
+/*
+ * Completion of the first fragment of a long message that
+ * requires an acknowledgement
  */
 
-static void mca_pml_ob1_send_completion(
+static void mca_pml_ob1_rndv_completion(
     mca_btl_base_module_t* btl,
     struct mca_btl_base_endpoint_t* ep,
     struct mca_btl_base_descriptor_t* descriptor,
@@ -114,8 +113,6 @@ static void mca_pml_ob1_send_completion(
 {
     mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
     mca_pml_ob1_endpoint_t* btl_ep = sendreq->req_endpoint;
-    mca_btl_base_segment_t* segments = descriptor->des_src;
-    size_t i;
 
     /* check completion status */
     if(OMPI_SUCCESS != status) {
@@ -125,25 +122,7 @@ static void mca_pml_ob1_send_completion(
     }
 
     /* count bytes of user data actually delivered */
-    for(i=0; i<descriptor->des_src_cnt; i++) {
-        sendreq->req_bytes_delivered += segments[i].seg_len;
-    }
-
-    /* adjust for message header */
-    switch(((mca_pml_ob1_common_hdr_t*)segments->seg_addr.pval)->hdr_type) {
-    case MCA_PML_OB1_HDR_TYPE_MATCH:
-        sendreq->req_bytes_delivered -= sizeof(mca_pml_ob1_match_hdr_t);
-        break;
-    case MCA_PML_OB1_HDR_TYPE_RNDV:
-        sendreq->req_bytes_delivered -= sizeof(mca_pml_ob1_rendezvous_hdr_t);
-        break;
-    case MCA_PML_OB1_HDR_TYPE_FRAG:
-        sendreq->req_bytes_delivered -= sizeof(mca_pml_ob1_frag_hdr_t);
-        break;
-    default:
-        opal_output(0, "mca_pml_ob1_send_completion: invalid header type\n");
-        break;
-    }
+    MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,descriptor);
 
 #if MCA_PML_OB1_TIMESTAMPS
     if(sendreq->req_pipeline_depth == 1) {
@@ -151,31 +130,74 @@ static void mca_pml_ob1_send_completion(
     }
 #endif
 
-    /* check for request completion */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    if (OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-1) == 0 &&
-        sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
-        MCA_PML_OB1_SEND_REQUEST_COMPLETE(sendreq);
-    }
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
     /* return the descriptor */
     btl_ep->btl_free(btl_ep->btl, descriptor);
 
+    /* update pipeline depth */
+    OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-1);
+
+    /* advance the request */
+    MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq);
+}
+
+
+/**
+ * Completion of additional fragments of a large message - may need
+ * to schedule additional fragments.
+ */
+
+static void mca_pml_ob1_frag_completion(
+    mca_btl_base_module_t* btl,
+    struct mca_btl_base_endpoint_t* ep,
+    struct mca_btl_base_descriptor_t* descriptor,
+    int status)
+{
+    mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
+    mca_pml_ob1_endpoint_t* btl_ep = sendreq->req_endpoint;
+    bool schedule;
+
+    /* check completion status */
+    if(OMPI_SUCCESS != status) {
+        /* TSW - FIX */
+        opal_output(0, "%s:%d FATAL", __FILE__, __LINE__);
+        orte_errmgr.abort();
+    }
+
+    /* count bytes of user data actually delivered */
+    MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,descriptor);
+
+#if MCA_PML_OB1_TIMESTAMPS
+    if(sendreq->req_pipeline_depth == 1) {
+        sendreq->t_send2 = get_profiler_timestamp();
+    }
+#endif
+
+    /* return the descriptor */
+    btl_ep->btl_free(btl_ep->btl, descriptor);
+
+    /* check for request completion */
+    OPAL_THREAD_LOCK(&ompi_request_lock);
+    if (OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth,-1) == 0 &&
+        sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
+        MCA_PML_OB1_SEND_REQUEST_COMPLETE(sendreq);
+        schedule = false;
+    } else {
+        schedule = true;
+    }
+    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+    if(schedule) {
+        mca_pml_ob1_send_request_schedule(sendreq);
+    }
 
     /* advance pending requests */
-    while(NULL != sendreq) {
-        switch(sendreq->req_state) {
-        case MCA_PML_OB1_SR_ACKED:
-            mca_pml_ob1_send_request_schedule(sendreq);
-            break;
-        default:
-            break;
-        }
+    while(opal_list_get_size(&mca_pml_ob1.send_pending)) {
         OPAL_THREAD_LOCK(&mca_pml_ob1.ob1_lock);
         sendreq = (mca_pml_ob1_send_request_t*)opal_list_remove_first(&mca_pml_ob1.send_pending);
         OPAL_THREAD_UNLOCK(&mca_pml_ob1.ob1_lock);
-    }
+        if(NULL == sendreq)
+            break;
+        mca_pml_ob1_send_request_schedule(sendreq);
+    }
 }
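For reference, a simplified standalone sketch of the drain loop at the end of mca_pml_ob1_frag_completion above: requests are dequeued one at a time while holding the lock, but each one is scheduled outside the lock. The list and request types below are stand-ins, not the real opal_list_t API.

/* sketch: drain a pending-send queue, scheduling outside the lock */
#include <pthread.h>
#include <stdio.h>

struct req { struct req *next; int id; };
static struct req *pending_head;                 /* stand-in for mca_pml_ob1.send_pending */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

static void schedule(struct req *r) { printf("scheduling request %d\n", r->id); }

static void drain_pending(void)
{
    for (;;) {
        struct req *r;
        pthread_mutex_lock(&lock);               /* only the dequeue is locked */
        r = pending_head;
        if (r != NULL)
            pending_head = r->next;
        pthread_mutex_unlock(&lock);
        if (r == NULL)                           /* queue empty: done */
            break;
        schedule(r);                             /* progress it outside the lock */
    }
}

int main(void)
{
    struct req b = { NULL, 2 }, a = { &b, 1 };
    pending_head = &a;
    drain_pending();
    return 0;
}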
@@ -216,7 +238,7 @@ int mca_pml_ob1_send_request_start(
     hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence;
 
     /* short message */
-    descriptor->des_cbfunc = mca_pml_ob1_short_completion;
+    descriptor->des_cbfunc = mca_pml_ob1_match_completion;
 
     /* request is complete at mpi level */
     ompi_request_complete((ompi_request_t*)sendreq);
@@ -279,7 +301,7 @@ int mca_pml_ob1_send_request_start(
     sendreq->req_rdma_offset = max_data;
 
     /* short message */
-    descriptor->des_cbfunc = mca_pml_ob1_short_completion;
+    descriptor->des_cbfunc = mca_pml_ob1_match_completion;
 
     /* request is complete at mpi level */
     ompi_request_complete((ompi_request_t*)sendreq);
@@ -317,6 +339,9 @@ int mca_pml_ob1_send_request_start(
                 endpoint->btl_free(endpoint->btl, descriptor);
                 return rc;
             }
+            if(max_data != size) {
+                opal_output(0, "[%s:%d] max_data (%lu) != size (%lu)\n", __FILE__,__LINE__,max_data,size);
+            }
         }
         /* if the buffer is pinned or leave pinned is true we do not eagerly send
            any data */
@@ -341,8 +366,8 @@ int mca_pml_ob1_send_request_start(
             segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t) + max_data;
             sendreq->req_send_offset = max_data;
 
-            /* long message */
-            descriptor->des_cbfunc = mca_pml_ob1_send_completion;
+            /* first fragment of a long message */
+            descriptor->des_cbfunc = mca_pml_ob1_rndv_completion;
         }
     }
     descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@@ -429,7 +454,7 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
             OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
             break;
         }
-        des->des_cbfunc = mca_pml_ob1_send_completion;
+        des->des_cbfunc = mca_pml_ob1_frag_completion;
         des->des_cbdata = sendreq;
 
         /* setup header */
@@ -31,19 +31,12 @@
 extern "C" {
 #endif
 
-typedef enum {
-    MCA_PML_OB1_SR_INIT,
-    MCA_PML_OB1_SR_START,
-    MCA_PML_OB1_SR_ACKED,
-    MCA_PML_OB1_SR_COMPLETE
-} mca_pml_ob1_send_request_state_t;
-
 
 struct mca_pml_ob1_send_request_t {
     mca_pml_base_send_request_t req_send;
     mca_pml_ob1_proc_t* req_proc;
     mca_pml_ob1_endpoint_t* req_endpoint;
-    mca_pml_ob1_send_request_state_t req_state;
+    volatile int32_t req_state;
     struct mca_mpool_base_chunk_t* req_chunk;
     ompi_ptr_t req_recv;
     int32_t req_lock;
@@ -102,7 +95,6 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
         sendmode,                                               \
         persistent)                                             \
     {                                                           \
-        sendreq->req_state = MCA_PML_OB1_SR_INIT;               \
         MCA_PML_BASE_SEND_REQUEST_INIT(&sendreq->req_send,      \
             buf,                                                \
             count,                                              \
@@ -179,8 +171,8 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
     sendreq->req_pipeline_depth = 0;                                    \
     sendreq->req_bytes_delivered = 0;                                   \
     sendreq->req_chunk = NULL;                                          \
+    sendreq->req_state = 0;                                             \
     sendreq->req_send_offset = 0;                                       \
-    sendreq->req_state = MCA_PML_OB1_SR_START;                          \
     sendreq->req_send.req_base.req_pml_complete = false;                \
     sendreq->req_send.req_base.req_ompi.req_complete = false;           \
     sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
@@ -211,7 +203,6 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
     (sendreq)->req_send.req_base.req_ompi.req_status._count =           \
         (sendreq)->req_send.req_bytes_packed;                           \
     (sendreq)->req_send.req_base.req_ompi.req_complete = true;          \
-    (sendreq)->req_state = MCA_PML_OB1_SR_COMPLETE;                     \
     MCA_PML_OB1_SEND_REQUEST_TSTAMPS_DUMP(sendreq);                     \
     if(ompi_request_waiting) {                                          \
         opal_condition_broadcast(&ompi_request_cond);                   \
@@ -220,14 +211,40 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
         MCA_PML_OB1_FREE((ompi_request_t**)&sendreq);                   \
     } else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
         mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);      \
-        sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;                   \
     }                                                                   \
 }
 
 
 /*
- * Advance a request
+ * Advance a pending send request. Note that the initial descriptor must complete
+ * and the acknowledgment received before the request can complete or be scheduled.
+ * However, these events may occur in either order.
  */
 
+#define MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq)                       \
+do {                                                                    \
+    bool schedule = false;                                              \
+                                                                        \
+    /* has an acknowledgment been received */                           \
+    if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) {                \
+        OPAL_THREAD_LOCK(&ompi_request_lock);                           \
+        if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { \
+            MCA_PML_OB1_SEND_REQUEST_COMPLETE(sendreq);                 \
+        } else {                                                        \
+            schedule = true;                                            \
+        }                                                               \
+        OPAL_THREAD_UNLOCK(&ompi_request_lock);                         \
+    }                                                                   \
+                                                                        \
+    /* additional data to schedule */                                   \
+    if(schedule == true) {                                              \
+        mca_pml_ob1_send_request_schedule(sendreq);                     \
+    }                                                                   \
+} while (0)
+
 /*
  * Release resources associated with a request
  */
 
 #define MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq)                        \
 {                                                                       \
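A standalone sketch of the two-event protocol behind MCA_PML_OB1_SEND_REQUEST_ADVANCE: the rendezvous completion callback and the ACK handler each add 1 to req_state, and whichever event arrives second (the counter reaching 2) either completes the request or schedules the remaining fragments. Atomics are modeled here with C11 stdatomic in place of OPAL_THREAD_ADD32.

/* sketch: complete after both events, in either order */
#include <stdatomic.h>
#include <stddef.h>
#include <stdio.h>

struct send_request {
    atomic_int state;          /* counts completion events, not an enum */
    size_t bytes_delivered;
    size_t bytes_packed;
};

static void advance(struct send_request *req)
{
    /* +1 per event; only the second arrival proceeds */
    if (atomic_fetch_add(&req->state, 1) + 1 == 2) {
        if (req->bytes_delivered == req->bytes_packed)
            printf("request complete\n");
        else
            printf("scheduling remaining fragments\n");
    }
}

int main(void)
{
    struct send_request req = { 0, 0, 4096 };
    advance(&req);   /* e.g. local completion of the rendezvous fragment */
    advance(&req);   /* e.g. ACK arrives: second event advances the request */
    return 0;
}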
@@ -248,6 +265,38 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
         &mca_pml_ob1.send_requests, (opal_list_item_t*)sendreq);        \
 }
 
+
+/*
+ * Update bytes delivered on request based on supplied descriptor
+ */
+
+#define MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, descriptor) \
+do {                                                                    \
+    size_t i;                                                           \
+    mca_btl_base_segment_t* segments = descriptor->des_src;             \
+                                                                        \
+    for(i=0; i<descriptor->des_src_cnt; i++) {                          \
+        sendreq->req_bytes_delivered += segments[i].seg_len;            \
+    }                                                                   \
+                                                                        \
+    /* adjust for message header */                                     \
+    switch(((mca_pml_ob1_common_hdr_t*)segments->seg_addr.pval)->hdr_type) { \
+    case MCA_PML_OB1_HDR_TYPE_MATCH:                                    \
+        sendreq->req_bytes_delivered -= sizeof(mca_pml_ob1_match_hdr_t); \
+        break;                                                          \
+    case MCA_PML_OB1_HDR_TYPE_RNDV:                                     \
+        sendreq->req_bytes_delivered -= sizeof(mca_pml_ob1_rendezvous_hdr_t); \
+        break;                                                          \
+    case MCA_PML_OB1_HDR_TYPE_FRAG:                                     \
+        sendreq->req_bytes_delivered -= sizeof(mca_pml_ob1_frag_hdr_t); \
+        break;                                                          \
+    default:                                                            \
+        opal_output(0, "[%s:%d]: invalid header type\n", __FILE__,__LINE__); \
+        break;                                                          \
+    }                                                                   \
+} while(0)
+
 
 /**
  *
 */
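A minimal sketch of the accounting MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED performs: sum the segment lengths of a completed descriptor, then subtract the PML header carried in the first segment so only user payload is counted. The segment type and the 16-byte header size below are simplified stand-ins for the real BTL segment and mca_pml_ob1_*_hdr_t types.

/* sketch: count delivered user bytes, excluding the PML header */
#include <stdio.h>
#include <stddef.h>

struct segment { size_t seg_len; };

int main(void)
{
    /* hypothetical descriptor: 16-byte header + payload in segment 0 */
    struct segment segments[2] = { { 16 + 1000 }, { 3096 } };
    size_t header_size = 16;      /* e.g. sizeof(mca_pml_ob1_match_hdr_t) */
    size_t delivered = 0;
    size_t i;

    for (i = 0; i < 2; i++)
        delivered += segments[i].seg_len;
    delivered -= header_size;     /* adjust for the message header */

    printf("user bytes delivered: %zu\n", delivered);   /* prints 4096 */
    return 0;
}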