1
1

Implement new function mca_pml_ob1_send_requst_copy_in_out(req, offset, len)

that allows to send any range of a request by send/recv instaed of RDMA
and use it to send data from the end of a request in pipeline protocol. 

This commit was SVN r14841.
Этот коммит содержится в:
Gleb Natapov 2007-06-03 08:30:07 +00:00
родитель 60571567a4
Коммит a25e1e7b15
9 изменённых файлов: 125 добавлений и 75 удалений

Просмотреть файл

@ -339,13 +339,13 @@ void mca_pml_ob1_process_pending_packets(mca_bml_base_btl_t* bml_btl)
send_dst, send_dst,
pckt->hdr.hdr_ack.hdr_src_req.lval, pckt->hdr.hdr_ack.hdr_src_req.lval,
pckt->hdr.hdr_ack.hdr_dst_req.pval, pckt->hdr.hdr_ack.hdr_dst_req.pval,
pckt->hdr.hdr_ack.hdr_rdma_offset); pckt->hdr.hdr_ack.hdr_send_offset);
MCA_PML_OB1_PCKT_PENDING_RETURN(pckt); MCA_PML_OB1_PCKT_PENDING_RETURN(pckt);
if(OMPI_ERR_OUT_OF_RESOURCE == rc) { if(OMPI_ERR_OUT_OF_RESOURCE == rc) {
MCA_PML_OB1_ADD_ACK_TO_PENDING(pckt->proc, MCA_PML_OB1_ADD_ACK_TO_PENDING(pckt->proc,
pckt->hdr.hdr_ack.hdr_src_req.lval, pckt->hdr.hdr_ack.hdr_src_req.lval,
pckt->hdr.hdr_ack.hdr_dst_req.pval, pckt->hdr.hdr_ack.hdr_dst_req.pval,
pckt->hdr.hdr_ack.hdr_rdma_offset); pckt->hdr.hdr_ack.hdr_send_offset);
return; return;
} }
break; break;

Просмотреть файл

@ -67,6 +67,7 @@ struct mca_pml_ob1_t {
ompi_free_list_t recv_frags; ompi_free_list_t recv_frags;
ompi_free_list_t pending_pckts; ompi_free_list_t pending_pckts;
ompi_free_list_t buffers; ompi_free_list_t buffers;
ompi_free_list_t send_ranges;
/* list of pending operations */ /* list of pending operations */
opal_list_t pckt_pending; opal_list_t pckt_pending;

Просмотреть файл

@ -197,7 +197,16 @@ int mca_pml_ob1_component_open(void)
OBJ_CONSTRUCT(&mca_pml_ob1.buffers, ompi_free_list_t); OBJ_CONSTRUCT(&mca_pml_ob1.buffers, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_ob1.send_ranges, ompi_free_list_t);
ompi_free_list_init(
&mca_pml_ob1.send_ranges,
sizeof(mca_pml_ob1_send_range_t),
OBJ_CLASS(mca_pml_ob1_send_range_t),
mca_pml_ob1.free_list_num,
mca_pml_ob1.free_list_max,
mca_pml_ob1.free_list_inc,
NULL);
/* pending operations */ /* pending operations */
OBJ_CONSTRUCT(&mca_pml_ob1.send_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_ob1.send_pending, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_ob1.recv_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_ob1.recv_pending, opal_list_t);

Просмотреть файл

@ -172,7 +172,7 @@ struct mca_pml_ob1_ack_hdr_t {
#endif #endif
ompi_ptr_t hdr_src_req; /**< source request */ ompi_ptr_t hdr_src_req; /**< source request */
ompi_ptr_t hdr_dst_req; /**< matched receive request */ ompi_ptr_t hdr_dst_req; /**< matched receive request */
uint64_t hdr_rdma_offset; /**< starting point rdma protocol */ uint64_t hdr_send_offset; /**< starting point of copy in/out */
}; };
typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t; typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t;
@ -183,13 +183,13 @@ typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t;
#define MCA_PML_OB1_ACK_HDR_NTOH(h) \ #define MCA_PML_OB1_ACK_HDR_NTOH(h) \
do { \ do { \
MCA_PML_OB1_COMMON_HDR_NTOH(h.hdr_common); \ MCA_PML_OB1_COMMON_HDR_NTOH(h.hdr_common); \
(h).hdr_rdma_offset = ntoh64((h).hdr_rdma_offset); \ (h).hdr_send_offset = ntoh64((h).hdr_send_offset); \
} while (0) } while (0)
#define MCA_PML_OB1_ACK_HDR_HTON(h) \ #define MCA_PML_OB1_ACK_HDR_HTON(h) \
do { \ do { \
MCA_PML_OB1_COMMON_HDR_HTON((h).hdr_common); \ MCA_PML_OB1_COMMON_HDR_HTON((h).hdr_common); \
(h).hdr_rdma_offset = hton64((h).hdr_rdma_offset); \ (h).hdr_send_offset = hton64((h).hdr_send_offset); \
} while (0) } while (0)
/** /**

Просмотреть файл

@ -128,7 +128,16 @@ void mca_pml_ob1_recv_frag_callback(
#endif #endif
sendreq = (mca_pml_ob1_send_request_t*)hdr->hdr_ack.hdr_src_req.pval; sendreq = (mca_pml_ob1_send_request_t*)hdr->hdr_ack.hdr_src_req.pval;
sendreq->req_recv = hdr->hdr_ack.hdr_dst_req; sendreq->req_recv = hdr->hdr_ack.hdr_dst_req;
sendreq->req_rdma_offset = (size_t)hdr->hdr_ack.hdr_rdma_offset;
/* if the request should be delivered entirely by copy in/out
* then throttle sends */
if(sendreq->req_bytes_delivered == hdr->hdr_ack.hdr_send_offset)
sendreq->req_throttle_sends = true;
mca_pml_ob1_send_requst_copy_in_out(sendreq,
hdr->hdr_ack.hdr_send_offset,
sendreq->req_send.req_bytes_packed -
hdr->hdr_ack.hdr_send_offset);
if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2 && if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2 &&
sendreq->req_bytes_delivered >= sendreq->req_bytes_delivered >=
sendreq->req_send.req_bytes_packed) { sendreq->req_send.req_bytes_packed) {

Просмотреть файл

@ -166,7 +166,7 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl,
if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received) if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received)
>= recvreq->req_recv.req_bytes_packed ) { >= recvreq->req_recv.req_bytes_packed ) {
MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
} else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) { } else if (recvreq->req_rdma_offset < recvreq->req_send_offset) {
/* schedule additional rdma operations */ /* schedule additional rdma operations */
mca_pml_ob1_recv_request_schedule(recvreq); mca_pml_ob1_recv_request_schedule(recvreq);
} }
@ -179,7 +179,7 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl,
int mca_pml_ob1_recv_request_ack_send_btl( int mca_pml_ob1_recv_request_ack_send_btl(
ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl, ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl,
uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_rdma_offset) uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_send_offset)
{ {
mca_btl_base_descriptor_t* des; mca_btl_base_descriptor_t* des;
mca_pml_ob1_ack_hdr_t* ack; mca_pml_ob1_ack_hdr_t* ack;
@ -197,7 +197,7 @@ int mca_pml_ob1_recv_request_ack_send_btl(
ack->hdr_common.hdr_flags = 0; ack->hdr_common.hdr_flags = 0;
ack->hdr_src_req.lval = hdr_src_req; ack->hdr_src_req.lval = hdr_src_req;
ack->hdr_dst_req.pval = hdr_dst_req; ack->hdr_dst_req.pval = hdr_dst_req;
ack->hdr_rdma_offset = hdr_rdma_offset; ack->hdr_send_offset = hdr_send_offset;
#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT #if OMPI_ENABLE_HETEROGENEOUS_SUPPORT
#ifdef WORDS_BIGENDIAN #ifdef WORDS_BIGENDIAN
@ -235,11 +235,9 @@ static int mca_pml_ob1_recv_request_ack(
bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml; bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml;
/* by default copy */ /* by default copy everything */
recvreq->req_rdma_offset = hdr->hdr_msg_length; recvreq->req_send_offset = bytes_received;
if(hdr->hdr_msg_length > bytes_received) { if(hdr->hdr_msg_length > bytes_received) {
/* /*
* lookup request buffer to determine if memory is already * lookup request buffer to determine if memory is already
* registered. * registered.
@ -259,29 +257,30 @@ static int mca_pml_ob1_recv_request_ack(
if (hdr->hdr_match.hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_PIN && if (hdr->hdr_match.hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_PIN &&
recvreq->req_rdma_cnt != 0) { recvreq->req_rdma_cnt != 0) {
recvreq->req_rdma_offset = bytes_received; recvreq->req_send_offset = hdr->hdr_msg_length;
/* are rdma devices available for long rdma protocol */ /* are rdma devices available for long rdma protocol */
} else if (bml_endpoint->btl_send_limit < hdr->hdr_msg_length && } else if (bml_endpoint->btl_send_limit < hdr->hdr_msg_length &&
bml_endpoint->btl_rdma_offset < hdr->hdr_msg_length && bml_endpoint->btl_rdma_offset < hdr->hdr_msg_length &&
mca_bml_base_btl_array_get_size(&bml_endpoint->btl_rdma)) { mca_bml_base_btl_array_get_size(&bml_endpoint->btl_rdma)) {
/* use convertor to figure out the rdma offset for this request */ /* use convertor to figure out the rdma offset for this request */
recvreq->req_rdma_offset = bml_endpoint->btl_rdma_offset; recvreq->req_send_offset = hdr->hdr_msg_length -
if(recvreq->req_rdma_offset < bytes_received) { bml_endpoint->btl_rdma_offset;
recvreq->req_rdma_offset = bytes_received; if(recvreq->req_send_offset < bytes_received) {
recvreq->req_send_offset = bytes_received;
} }
ompi_convertor_set_position( &recvreq->req_recv.req_convertor, ompi_convertor_set_position( &recvreq->req_recv.req_convertor,
&recvreq->req_rdma_offset ); &recvreq->req_send_offset );
} }
} }
/* start rdma at current fragment offset - no need to ack */ /* nothing to send by copy in/out - no need to ack */
if(recvreq->req_rdma_offset == bytes_received) if(recvreq->req_send_offset == hdr->hdr_msg_length)
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/* let know to shedule function there is no need to put ACK flag */ /* let know to shedule function there is no need to put ACK flag */
recvreq->req_ack_sent = true; recvreq->req_ack_sent = true;
return mca_pml_ob1_recv_request_ack_send(proc, hdr->hdr_src_req.lval, return mca_pml_ob1_recv_request_ack_send(proc, hdr->hdr_src_req.lval,
recvreq, recvreq->req_rdma_offset); recvreq, recvreq->req_send_offset);
} }
@ -473,6 +472,7 @@ void mca_pml_ob1_recv_request_progress(
bytes_received -= sizeof(mca_pml_ob1_rendezvous_hdr_t); bytes_received -= sizeof(mca_pml_ob1_rendezvous_hdr_t);
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length; recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
recvreq->req_send = hdr->hdr_rndv.hdr_src_req; recvreq->req_send = hdr->hdr_rndv.hdr_src_req;
recvreq->req_rdma_offset = bytes_received;
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match); MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received); mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received);
if(recvreq->req_recv.req_base.req_pml_complete) { if(recvreq->req_recv.req_base.req_pml_complete) {
@ -537,7 +537,7 @@ void mca_pml_ob1_recv_request_progress(
if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received) if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received)
>= recvreq->req_recv.req_bytes_packed ) { >= recvreq->req_recv.req_bytes_packed ) {
MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
} else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) { } else if (recvreq->req_rdma_offset < recvreq->req_send_offset) {
/* schedule additional rdma operations */ /* schedule additional rdma operations */
mca_pml_ob1_recv_request_schedule(recvreq); mca_pml_ob1_recv_request_schedule(recvreq);
} }
@ -599,7 +599,7 @@ int mca_pml_ob1_recv_request_schedule_exclusive( mca_pml_ob1_recv_request_t* rec
num_tries = recvreq->req_rdma_cnt; num_tries = recvreq->req_rdma_cnt;
do { do {
size_t bytes_remaining = recvreq->req_recv.req_bytes_packed - size_t bytes_remaining = recvreq->req_send_offset -
recvreq->req_rdma_offset; recvreq->req_rdma_offset;
size_t prev_bytes_remaining = 0; size_t prev_bytes_remaining = 0;
int num_fail = 0; int num_fail = 0;

Просмотреть файл

@ -47,6 +47,7 @@ struct mca_pml_ob1_recv_request_t {
size_t req_bytes_received; size_t req_bytes_received;
size_t req_bytes_delivered; size_t req_bytes_delivered;
size_t req_rdma_offset; size_t req_rdma_offset;
size_t req_send_offset;
mca_pml_ob1_rdma_btl_t req_rdma[MCA_PML_OB1_MAX_RDMA_PER_REQUEST]; mca_pml_ob1_rdma_btl_t req_rdma[MCA_PML_OB1_MAX_RDMA_PER_REQUEST];
uint32_t req_rdma_cnt; uint32_t req_rdma_cnt;
uint32_t req_rdma_idx; uint32_t req_rdma_idx;
@ -350,7 +351,7 @@ static inline void mca_pml_ob1_recv_request_schedule(
_pckt->hdr.hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_ACK; \ _pckt->hdr.hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_ACK; \
_pckt->hdr.hdr_ack.hdr_src_req.lval = (S); \ _pckt->hdr.hdr_ack.hdr_src_req.lval = (S); \
_pckt->hdr.hdr_ack.hdr_dst_req.pval = (D); \ _pckt->hdr.hdr_ack.hdr_dst_req.pval = (D); \
_pckt->hdr.hdr_ack.hdr_rdma_offset = (O); \ _pckt->hdr.hdr_ack.hdr_send_offset = (O); \
_pckt->proc = (P); \ _pckt->proc = (P); \
_pckt->bml_btl = NULL; \ _pckt->bml_btl = NULL; \
OPAL_THREAD_LOCK(&mca_pml_ob1.lock); \ OPAL_THREAD_LOCK(&mca_pml_ob1.lock); \
@ -364,7 +365,7 @@ int mca_pml_ob1_recv_request_ack_send_btl(ompi_proc_t* proc,
uint64_t hdr_rdma_offset); uint64_t hdr_rdma_offset);
static inline int mca_pml_ob1_recv_request_ack_send(ompi_proc_t* proc, static inline int mca_pml_ob1_recv_request_ack_send(ompi_proc_t* proc,
uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_rdma_offset) uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_send_offset)
{ {
size_t i; size_t i;
mca_bml_base_btl_t* bml_btl; mca_bml_base_btl_t* bml_btl;
@ -374,12 +375,12 @@ static inline int mca_pml_ob1_recv_request_ack_send(ompi_proc_t* proc,
for(i = 0; i < mca_bml_base_btl_array_get_size(&endpoint->btl_eager); i++) { for(i = 0; i < mca_bml_base_btl_array_get_size(&endpoint->btl_eager); i++) {
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
if(mca_pml_ob1_recv_request_ack_send_btl(proc, bml_btl, hdr_src_req, if(mca_pml_ob1_recv_request_ack_send_btl(proc, bml_btl, hdr_src_req,
hdr_dst_req, hdr_rdma_offset) == OMPI_SUCCESS) hdr_dst_req, hdr_send_offset) == OMPI_SUCCESS)
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
MCA_PML_OB1_ADD_ACK_TO_PENDING(proc, hdr_src_req, hdr_dst_req, MCA_PML_OB1_ADD_ACK_TO_PENDING(proc, hdr_src_req, hdr_dst_req,
hdr_rdma_offset); hdr_send_offset);
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }

Просмотреть файл

@ -33,6 +33,8 @@
#include "ompi/mca/bml/base/base.h" #include "ompi/mca/bml/base/base.h"
#include "ompi/datatype/dt_arch.h" #include "ompi/datatype/dt_arch.h"
OBJ_CLASS_INSTANCE(mca_pml_ob1_send_range_t, ompi_free_list_item_t,
NULL, NULL);
void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl) void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl)
{ {
@ -131,6 +133,8 @@ static void mca_pml_ob1_send_request_construct(mca_pml_ob1_send_request_t* req)
req->req_send.req_base.req_ompi.req_free = mca_pml_ob1_send_request_free; req->req_send.req_base.req_ompi.req_free = mca_pml_ob1_send_request_free;
req->req_send.req_base.req_ompi.req_cancel = mca_pml_ob1_send_request_cancel; req->req_send.req_base.req_ompi.req_cancel = mca_pml_ob1_send_request_cancel;
req->req_rdma_cnt = 0; req->req_rdma_cnt = 0;
req->req_throttle_sends = false;
OBJ_CONSTRUCT(&req->req_send_ranges, opal_list_t);
} }
OBJ_CLASS_INSTANCE( OBJ_CLASS_INSTANCE(
@ -435,7 +439,6 @@ int mca_pml_ob1_send_request_start_buffered(
/* update lengths */ /* update lengths */
segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t) + max_data; segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t) + max_data;
sendreq->req_send_offset = max_data;
descriptor->des_cbfunc = mca_pml_ob1_rndv_completion; descriptor->des_cbfunc = mca_pml_ob1_rndv_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -448,8 +451,8 @@ int mca_pml_ob1_send_request_start_buffered(
return rc; return rc;
} }
iov.iov_base = (IOVBASE_TYPE*)(((unsigned char*)sendreq->req_send.req_addr) + sendreq->req_send_offset); iov.iov_base = (IOVBASE_TYPE*)(((unsigned char*)sendreq->req_send.req_addr) + max_data);
iov.iov_len = max_data = sendreq->req_send.req_bytes_packed - sendreq->req_send_offset; iov.iov_len = max_data = sendreq->req_send.req_bytes_packed - max_data;
if((rc = ompi_convertor_pack( &sendreq->req_send.req_convertor, if((rc = ompi_convertor_pack( &sendreq->req_send.req_convertor,
&iov, &iov,
@ -549,8 +552,6 @@ int mca_pml_ob1_send_request_start_copy( mca_pml_ob1_send_request_t* sendreq,
/* update lengths */ /* update lengths */
segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data; segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data;
sendreq->req_send_offset = max_data;
sendreq->req_rdma_offset = max_data;
/* short message */ /* short message */
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -625,10 +626,6 @@ int mca_pml_ob1_send_request_start_prepare( mca_pml_ob1_send_request_t* sendreq,
/* short message */ /* short message */
descriptor->des_cbfunc = mca_pml_ob1_match_completion_free; descriptor->des_cbfunc = mca_pml_ob1_match_completion_free;
/* update lengths */
sendreq->req_send_offset = size;
sendreq->req_rdma_offset = size;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_cbdata = sendreq; descriptor->des_cbdata = sendreq;
@ -786,7 +783,6 @@ int mca_pml_ob1_send_request_start_rdma(
/* update lengths with number of bytes actually packed */ /* update lengths with number of bytes actually packed */
segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t); segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t);
sendreq->req_rdma_offset = 0;
/* first fragment of a long message */ /* first fragment of a long message */
des->des_cbfunc = mca_pml_ob1_rndv_completion; des->des_cbfunc = mca_pml_ob1_rndv_completion;
@ -873,8 +869,6 @@ int mca_pml_ob1_send_request_start_rndv(
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbdata = sendreq; des->des_cbdata = sendreq;
des->des_cbfunc = mca_pml_ob1_rndv_completion; des->des_cbfunc = mca_pml_ob1_rndv_completion;
sendreq->req_send_offset = size;
sendreq->req_rdma_offset = size;
/* send */ /* send */
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
@ -884,6 +878,26 @@ int mca_pml_ob1_send_request_start_rndv(
return rc; return rc;
} }
void mca_pml_ob1_send_requst_copy_in_out(mca_pml_ob1_send_request_t *sendreq,
uint64_t send_offset, uint64_t send_length)
{
mca_pml_ob1_send_range_t *sr;
ompi_free_list_item_t *i;
int rc = OMPI_SUCCESS;
if(0 == send_length)
return;
OMPI_FREE_LIST_WAIT(&mca_pml_ob1.send_ranges, i, rc);
sr = (mca_pml_ob1_send_range_t*)i;
sr->range_send_offset = send_offset;
sr->range_send_length = send_length;
OPAL_THREAD_LOCK(&sendreq->req_send_range_lock);
opal_list_append(&sendreq->req_send_ranges, (opal_list_item_t*)sr);
OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock);
}
/** /**
* Schedule pipeline of send descriptors for the given request. * Schedule pipeline of send descriptors for the given request.
@ -901,33 +915,47 @@ int mca_pml_ob1_send_request_schedule_exclusive(
mca_bml_base_btl_array_get_size(&bml_endpoint->btl_send); mca_bml_base_btl_array_get_size(&bml_endpoint->btl_send);
do { do {
/* allocate remaining bytes to BTLs */
size_t bytes_remaining = sendreq->req_rdma_offset -
sendreq->req_send_offset;
size_t prev_bytes_remaining = 0, num_fail = 0; size_t prev_bytes_remaining = 0, num_fail = 0;
mca_pml_ob1_send_range_t *range = NULL;
if(bytes_remaining == 0) { while(true) {
OPAL_THREAD_ADD32(&sendreq->req_lock, -sendreq->req_lock);
return OMPI_SUCCESS;
}
while((int32_t)bytes_remaining > 0 &&
(sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth
||
sendreq->req_rdma_offset < sendreq->req_send.req_bytes_packed))
{
mca_pml_ob1_frag_hdr_t* hdr; mca_pml_ob1_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des; mca_btl_base_descriptor_t* des;
int rc; int rc;
size_t size; size_t size;
opal_list_item_t *item;
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_t* bml_btl =
mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send);
if(NULL == range || 0 == range->range_send_length) {
OPAL_THREAD_LOCK(&sendreq->req_send_range_lock);
if(range) {
opal_list_remove_first(&sendreq->req_send_ranges);
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.send_ranges,
&range->base);
}
item = opal_list_get_first(&sendreq->req_send_ranges);
OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock);
if(opal_list_get_end(&sendreq->req_send_ranges) == item)
break;
range = (mca_pml_ob1_send_range_t*)item;
prev_bytes_remaining = 0;
}
if(prev_bytes_remaining == bytes_remaining) if(true == sendreq->req_throttle_sends &&
sendreq->req_pipeline_depth >=
mca_pml_ob1.send_pipeline_depth)
break;
if(prev_bytes_remaining == range->range_send_length)
num_fail++; num_fail++;
else else
num_fail = 0; num_fail = 0;
prev_bytes_remaining = bytes_remaining; prev_bytes_remaining = range->range_send_length;
if (num_fail == num_btl_avail) { if (num_fail == num_btl_avail) {
assert(sendreq->req_pending == MCA_PML_OB1_SEND_PENDING_NONE); assert(sendreq->req_pending == MCA_PML_OB1_SEND_PENDING_NONE);
@ -940,15 +968,15 @@ int mca_pml_ob1_send_request_schedule_exclusive(
} }
if(num_btl_avail == 1 || if(num_btl_avail == 1 ||
bytes_remaining < bml_btl->btl_min_send_size) { range->range_send_length < bml_btl->btl_min_send_size) {
size = bytes_remaining; size = range->range_send_length;
} else { } else {
/* otherwise attempt to give the BTL a percentage of the message /* otherwise attempt to give the BTL a percentage of the message
* based on a weighting factor. for simplicity calculate this as * based on a weighting factor. for simplicity calculate this as
* a percentage of the overall message length (regardless of * a percentage of the overall message length (regardless of
* amount previously assigned) * amount previously assigned)
*/ */
size = (size_t)(bml_btl->btl_weight * bytes_remaining); size = (size_t)(bml_btl->btl_weight * range->range_send_length);
} }
/* makes sure that we don't exceed BTL max send size */ /* makes sure that we don't exceed BTL max send size */
@ -959,7 +987,7 @@ int mca_pml_ob1_send_request_schedule_exclusive(
/* pack into a descriptor */ /* pack into a descriptor */
ompi_convertor_set_position(&sendreq->req_send.req_convertor, ompi_convertor_set_position(&sendreq->req_send.req_convertor,
&sendreq->req_send_offset); &range->range_send_offset);
mca_bml_base_prepare_src(bml_btl, NULL, mca_bml_base_prepare_src(bml_btl, NULL,
&sendreq->req_send.req_convertor, &sendreq->req_send.req_convertor,
@ -975,7 +1003,7 @@ int mca_pml_ob1_send_request_schedule_exclusive(
hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval; hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FRAG; hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FRAG;
hdr->hdr_frag_offset = sendreq->req_send_offset; hdr->hdr_frag_offset = range->range_send_offset;
hdr->hdr_src_req.pval = sendreq; hdr->hdr_src_req.pval = sendreq;
hdr->hdr_dst_req = sendreq->req_recv; hdr->hdr_dst_req = sendreq->req_recv;
@ -996,25 +1024,18 @@ int mca_pml_ob1_send_request_schedule_exclusive(
#endif #endif
#endif #endif
/*
* The if-clause should be optimized away, in case the macro
* extends to ;
*/
#if OMPI_WANT_PERUSE #if OMPI_WANT_PERUSE
if( 0 != sendreq->req_send_offset ) { PERUSE_TRACE_COMM_OMPI_EVENT(PERUSE_COMM_REQ_XFER_CONTINUE,
PERUSE_TRACE_COMM_OMPI_EVENT(PERUSE_COMM_REQ_XFER_CONTINUE, &(sendreq->req_send.req_base), size, PERUSE_SEND);
&(sendreq->req_send.req_base),
size, PERUSE_SEND);
}
#endif /* OMPI_WANT_PERUSE */ #endif /* OMPI_WANT_PERUSE */
/* initiate send - note that this may complete before the call returns */ /* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(rc == OMPI_SUCCESS) { if(rc == OMPI_SUCCESS) {
bytes_remaining -= size; range->range_send_length -= size;
/* update state */ /* update state */
sendreq->req_send_offset += size; range->range_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1); OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1);
} else { } else {
mca_bml_base_free(bml_btl,des); mca_bml_base_free(bml_btl,des);

Просмотреть файл

@ -51,18 +51,26 @@ struct mca_pml_ob1_send_request_t {
int32_t req_state; int32_t req_state;
int32_t req_lock; int32_t req_lock;
#endif #endif
bool req_throttle_sends;
size_t req_pipeline_depth; size_t req_pipeline_depth;
size_t req_bytes_delivered; size_t req_bytes_delivered;
size_t req_send_offset;
size_t req_rdma_offset;
mca_pml_ob1_rdma_btl_t req_rdma[MCA_PML_OB1_MAX_RDMA_PER_REQUEST]; mca_pml_ob1_rdma_btl_t req_rdma[MCA_PML_OB1_MAX_RDMA_PER_REQUEST];
uint32_t req_rdma_cnt; uint32_t req_rdma_cnt;
mca_pml_ob1_send_pending_t req_pending; mca_pml_ob1_send_pending_t req_pending;
opal_mutex_t req_send_range_lock;
opal_list_t req_send_ranges;
}; };
typedef struct mca_pml_ob1_send_request_t mca_pml_ob1_send_request_t; typedef struct mca_pml_ob1_send_request_t mca_pml_ob1_send_request_t;
OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
struct mca_pml_ob1_send_range_t {
ompi_free_list_item_t base;
uint64_t range_send_offset;
uint64_t range_send_length;
};
typedef struct mca_pml_ob1_send_range_t mca_pml_ob1_send_range_t;
OBJ_CLASS_DECLARATION(mca_pml_ob1_send_range_t);
#define MCA_PML_OB1_SEND_REQUEST_ALLOC( \ #define MCA_PML_OB1_SEND_REQUEST_ALLOC( \
comm, \ comm, \
@ -328,7 +336,6 @@ static inline int mca_pml_ob1_send_request_start(
sendreq->req_lock = 0; sendreq->req_lock = 0;
sendreq->req_pipeline_depth = 0; sendreq->req_pipeline_depth = 0;
sendreq->req_bytes_delivered = 0; sendreq->req_bytes_delivered = 0;
sendreq->req_send_offset = 0;
sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_NONE; sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_NONE;
sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32( sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32(
&comm->procs[sendreq->req_send.req_base.req_peer].send_sequence,1); &comm->procs[sendreq->req_send.req_base.req_peer].send_sequence,1);
@ -394,6 +401,8 @@ int mca_pml_ob1_send_request_put_frag(mca_pml_ob1_rdma_frag_t* frag);
* should be considered for sending packets */ * should be considered for sending packets */
void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl); void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl);
void mca_pml_ob1_send_requst_copy_in_out(mca_pml_ob1_send_request_t *sendreq,
uint64_t send_offset, uint64_t send_length);
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
} }
#endif #endif