Remove the macro that computes the length of the segments from the send header
and add a new macro that can be used for both sends and receives. Move to atomic
operations to manage the number of bytes sent or received. There is one instance
where the atomic operation is not required, as the code cannot be executed at the
same time by two different threads.

This commit was SVN r8933.
This commit is contained in:
parent 9f1357fb89
commit e9706e6db0
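In short, the old code incremented the byte counters while holding ompi_request_lock and then tested for completion; the new code performs one atomic add (OPAL_THREAD_ADD_SIZE_T), compares the returned value against the expected total, and takes the lock only around MCA_PML_BASE_REQUEST_MPI_COMPLETE. The snippet below is not the Open MPI code: it is a minimal, self-contained sketch of that add-then-compare pattern, using the GCC/Clang __atomic_add_fetch builtin as a stand-in for OPAL_THREAD_ADD_SIZE_T and a hypothetical request_t type in place of the PML request structures.

#include <stddef.h>
#include <stdio.h>

/* Hypothetical request type standing in for the PML ob1 request structures. */
typedef struct {
    size_t bytes_received;   /* updated concurrently by completion callbacks */
    size_t bytes_packed;     /* total expected length, known at match time   */
} request_t;

/*
 * Old pattern (simplified): lock, increment, test, unlock.
 *
 *   OPAL_THREAD_LOCK(&ompi_request_lock);
 *   req->bytes_received += n;
 *   if (req->bytes_received >= req->bytes_packed) { ... complete ... }
 *   OPAL_THREAD_UNLOCK(&ompi_request_lock);
 *
 * New pattern: a single atomic add returns the updated counter, and that
 * return value is compared against the expected total.  __atomic_add_fetch
 * stands in for OPAL_THREAD_ADD_SIZE_T here.
 */
static int add_and_check_complete(request_t *req, size_t n)
{
    size_t updated = __atomic_add_fetch(&req->bytes_received, n, __ATOMIC_SEQ_CST);
    return updated >= req->bytes_packed;
}

int main(void)
{
    request_t req = { .bytes_received = 0, .bytes_packed = 4096 };

    /* Two fragment completions of 2048 bytes each: only the second caller
     * observes the request as complete, even if the calls race. */
    printf("after first fragment:  complete = %d\n", add_and_check_complete(&req, 2048));
    printf("after second fragment: complete = %d\n", add_and_check_complete(&req, 2048));
    return 0;
}

The same reasoning explains the one non-atomic case in the diff below: the rndv completion callback runs in a single thread, so a plain update of req_bytes_delivered is already race-free there.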
@@ -27,6 +27,18 @@
 extern "C" {
 #endif

+/*
+ * Compute the total number of bytes on supplied descriptor
+ */
+#define MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH(segments, count, hdrlen, length)  \
+do {                                                                         \
+    size_t i;                                                                \
+                                                                             \
+    for( i = 0; i < count; i++ ) {                                           \
+        length += segments[i].seg_len;                                       \
+    }                                                                        \
+    length -= hdrlen;                                                        \
+} while(0)
+
 #if defined(c_plusplus) || defined(__cplusplus)
 }

@@ -135,30 +135,24 @@ static void mca_pml_ob1_put_completion(
     mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t*)des->des_cbdata;
     mca_btl_base_segment_t* segments = des->des_dst;
     size_t i, bytes_received = 0;
     bool schedule = false;

-    for(i=0; i<des->des_dst_cnt; i++)
-        bytes_received += segments[i].seg_len;
-
+    MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( des->des_dst, des->des_dst_cnt,
+                                        0, bytes_received );
     OPAL_THREAD_ADD_SIZE_T(&recvreq->req_pipeline_depth,-1);
     mca_bml_base_free(bml_btl, des);

     /* check completion status */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    recvreq->req_bytes_received += bytes_received;
-    recvreq->req_bytes_delivered += bytes_received;
-    if (recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed) {
+    if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received)
+        >= recvreq->req_recv.req_bytes_packed ) {
         /* initialize request status */
-        recvreq->req_recv.req_base.req_ompi.req_status._count =
-            recvreq->req_bytes_delivered;
         recvreq->req_recv.req_base.req_pml_complete = true;
+        recvreq->req_recv.req_base.req_ompi.req_status._count =
+            (recvreq->req_bytes_received < recvreq->req_bytes_delivered ?
+             recvreq->req_bytes_received : recvreq->req_bytes_delivered);
+        OPAL_THREAD_LOCK(&ompi_request_lock);
         MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
+        OPAL_THREAD_UNLOCK(&ompi_request_lock);
     } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) {
         schedule = true;
     }
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);

     if( true == schedule ) {
         /* schedule additional rdma operations */
         mca_pml_ob1_recv_request_schedule(recvreq);
     }
@@ -301,17 +295,18 @@ static void mca_pml_ob1_rget_completion(
     mca_pml_ob1_fin_hdr_t* hdr;
     mca_btl_base_descriptor_t *fin;
     int rc;


     /* is receive request complete */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    recvreq->req_bytes_received += frag->rdma_length;
-    recvreq->req_bytes_delivered += frag->rdma_length;
-    if(recvreq->req_bytes_received == recvreq->req_recv.req_bytes_packed) {
-        recvreq->req_recv.req_base.req_ompi.req_status._count = recvreq->req_bytes_delivered;
+    if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, frag->rdma_length)
+        == recvreq->req_recv.req_bytes_packed ) {
+        recvreq->req_recv.req_base.req_ompi.req_status._count =
+            (recvreq->req_bytes_received < recvreq->req_bytes_delivered ?
+             recvreq->req_bytes_received : recvreq->req_bytes_delivered);
         recvreq->req_recv.req_base.req_pml_complete = true;
+        OPAL_THREAD_LOCK(&ompi_request_lock);
         MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
+        OPAL_THREAD_UNLOCK(&ompi_request_lock);
     }
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);

     /* return descriptor */
     mca_bml_base_free(bml_btl, des);
@@ -436,16 +431,15 @@ void mca_pml_ob1_recv_request_progress(
     size_t data_offset = 0;
     mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
-    size_t i;
     bool schedule = false;

-    for(i=0; i<num_segments; i++)
-        bytes_received += segments[i].seg_len;
-
+    MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( segments, num_segments,
+                                        0, bytes_received );
     switch(hdr->hdr_common.hdr_type) {
         case MCA_PML_OB1_HDR_TYPE_MATCH:

             bytes_received -= sizeof(mca_pml_ob1_match_hdr_t);
             recvreq->req_recv.req_bytes_packed = bytes_received;
             recvreq->req_bytes_delivered = bytes_received;
             MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
             MCA_PML_OB1_RECV_REQUEST_UNPACK(
                 recvreq,
@@ -461,6 +455,7 @@ void mca_pml_ob1_recv_request_progress(

             bytes_received -= sizeof(mca_pml_ob1_rendezvous_hdr_t);
             recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
+            recvreq->req_bytes_delivered = hdr->hdr_rndv.hdr_msg_length;
             recvreq->req_send = hdr->hdr_rndv.hdr_src_req;
             MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
             mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received);
@@ -477,6 +472,7 @@ void mca_pml_ob1_recv_request_progress(
         case MCA_PML_OB1_HDR_TYPE_RGET:

             recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
+            recvreq->req_bytes_delivered = hdr->hdr_rndv.hdr_msg_length;
             MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
             mca_pml_ob1_recv_request_rget(recvreq, btl, &hdr->hdr_rget);
             return;
@@ -500,21 +496,17 @@ void mca_pml_ob1_recv_request_progress(
     }

     /* check completion status */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    recvreq->req_bytes_received += bytes_received;
-    recvreq->req_bytes_delivered += bytes_delivered;
-    if (recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed) {
+    if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received)
+        >= recvreq->req_recv.req_bytes_packed ) {
         /* initialize request status */
         recvreq->req_recv.req_base.req_ompi.req_status._count =
-            recvreq->req_bytes_delivered;
+            (recvreq->req_bytes_received < recvreq->req_bytes_delivered ?
+             recvreq->req_bytes_received : recvreq->req_bytes_delivered);
         recvreq->req_recv.req_base.req_pml_complete = true;
+        OPAL_THREAD_LOCK(&ompi_request_lock);
         MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
+        OPAL_THREAD_UNLOCK(&ompi_request_lock);
     } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) {
         schedule = true;
     }
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);

     if( true == schedule ) {
         /* schedule additional rdma operations */
         mca_pml_ob1_recv_request_schedule(recvreq);
     }
@@ -538,9 +530,9 @@ void mca_pml_ob1_recv_request_matched_probe(
     switch(hdr->hdr_common.hdr_type) {
         case MCA_PML_OB1_HDR_TYPE_MATCH:

-            for(i=0; i<num_segments; i++)
-                bytes_packed += segments[i].seg_len;
-            bytes_packed -= sizeof(mca_pml_ob1_match_hdr_t);
+            MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( segments, num_segments,
+                                                sizeof(mca_pml_ob1_match_hdr_t),
+                                                bytes_packed );
             break;

         case MCA_PML_OB1_HDR_TYPE_RNDV:

@@ -161,6 +161,7 @@ static void mca_pml_ob1_rndv_completion(
 {
     mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
+    mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*) descriptor->des_context;

     /* check completion status */
     if(OMPI_SUCCESS != status) {
         /* TSW - FIX */
@@ -168,10 +169,14 @@ static void mca_pml_ob1_rndv_completion(
         orte_errmgr.abort();
     }

-    /* count bytes of user data actually delivered */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,descriptor,sizeof(mca_pml_ob1_rendezvous_hdr_t));
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+    /* count bytes of user data actually delivered. As the rndv completion only
+     * happens in one thread, the increase of the req_bytes_delivered does not
+     * have to be atomic.
+     */
+    MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( descriptor->des_src,
+                                        descriptor->des_src_cnt,
+                                        sizeof(mca_pml_ob1_rendezvous_hdr_t),
+                                        sendreq->req_bytes_delivered );

     /* return the descriptor */
     mca_bml_base_free(bml_btl, descriptor);
@@ -195,14 +200,17 @@ static void mca_pml_ob1_rget_completion(
     int status)
 {
     mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)des->des_cbdata;
+    size_t req_bytes_delivered = 0;

     /* count bytes of user data actually delivered and check for request completion */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,des,0);
-    if (sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
+    MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( des->des_src, des->des_src_cnt,
+                                        0, req_bytes_delivered );
+    if( OPAL_THREAD_ADD_SIZE_T( &sendreq->req_bytes_delivered, req_bytes_delivered )
+        == sendreq->req_send.req_bytes_packed ) {
+        OPAL_THREAD_LOCK(&ompi_request_lock);
         MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
+        OPAL_THREAD_UNLOCK(&ompi_request_lock);
     }
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);

     /* release resources */
     btl->btl_free(btl,des);
@@ -237,7 +245,7 @@ static void mca_pml_ob1_frag_completion(
 {
     mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
     mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*) descriptor->des_context;
     bool schedule;
+    size_t req_bytes_delivered = 0;

     /* check completion status */
     if(OMPI_SUCCESS != status) {
@@ -246,20 +254,19 @@ static void mca_pml_ob1_frag_completion(
         orte_errmgr.abort();
     }

     /* check for request completion */
-    OPAL_THREAD_LOCK(&ompi_request_lock);

     /* count bytes of user data actually delivered */
-    MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,descriptor,sizeof(mca_pml_ob1_frag_hdr_t));
+    MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( descriptor->des_src,
+                                        descriptor->des_src_cnt,
+                                        sizeof(mca_pml_ob1_frag_hdr_t),
+                                        req_bytes_delivered );
+    req_bytes_delivered = OPAL_THREAD_ADD_SIZE_T( &sendreq->req_bytes_delivered,
+                                                  req_bytes_delivered );
     if (OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1) == 0 &&
-        sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
+        req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
+        OPAL_THREAD_LOCK(&ompi_request_lock);
         MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
         schedule = false;
+        OPAL_THREAD_UNLOCK(&ompi_request_lock);
     } else {
         schedule = true;
     }
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
     if(schedule) {
         mca_pml_ob1_send_request_schedule(sendreq);
     }

@@ -307,23 +307,7 @@
         OMPI_FREE_LIST_RETURN(                                                     \
             &mca_pml_ob1.send_requests, (opal_list_item_t*)sendreq);               \
     }

-
-/*
- * Update bytes delivered on request based on supplied descriptor
- */
-
-#define MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, descriptor, hdrlen)  \
-do {                                                                               \
-    size_t i, req_bytes_delivered = 0;                                             \
-    mca_btl_base_segment_t* segments = descriptor->des_src;                        \
-                                                                                   \
-    for(i=0; i<descriptor->des_src_cnt; i++) {                                     \
-        req_bytes_delivered += segments[i].seg_len;                                \
-    }                                                                              \
-    sendreq->req_bytes_delivered += (req_bytes_delivered - hdrlen);                \
-} while(0)
-
 /*
  * Attempt to process any pending requests
  */
