Clean up send scheduling code.
This commit was SVN r16014.
This commit is contained in:
Parent: 0b0f9d14aa
Commit: 690fb95bda
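Summary of the cleanup shown in the diff below: open-coded manipulation of the global mca_pml_ob1.send_pending list and of sendreq->req_lock is replaced by small static inline helpers (add_request_to_send_pending(), get_request_from_send_pending(), lock_send_request(), unlock_send_request(), plus get_send_range()/get_next_send_range() for the per-request range list), and mca_pml_ob1_send_request_process_pending(), mca_pml_ob1_send_request_schedule_exclusive() and mca_pml_ob1_send_request_start() are rewritten on top of them. The sketch below is a minimal, self-contained model of the pending-list helpers only; the request type, the linked list, and the pthread mutex are simplified stand-ins for the real opal_list_t / OPAL_THREAD_LOCK machinery, not Open MPI APIs.

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Simplified stand-ins for the ob1 structures; names and layout are
 * illustrative only, not the real Open MPI definitions. */
typedef enum { PENDING_NONE, PENDING_START, PENDING_SCHEDULE } pending_t;

typedef struct send_request {
    struct send_request *next;      /* plays the role of opal_list_item_t */
    pending_t            req_pending;
} send_request_t;

static pthread_mutex_t pending_lock = PTHREAD_MUTEX_INITIALIZER;
static send_request_t *pending_head = NULL, *pending_tail = NULL;

/* Mirrors add_request_to_send_pending(): tag the request with why it is
 * pending, then append or prepend it to the pending list under the lock. */
static void add_request_to_pending(send_request_t *req, pending_t type, bool append)
{
    pthread_mutex_lock(&pending_lock);
    req->req_pending = type;
    req->next = NULL;
    if (append) {
        if (pending_tail) pending_tail->next = req; else pending_head = req;
        pending_tail = req;
    } else {
        req->next = pending_head;
        pending_head = req;
        if (NULL == pending_tail) pending_tail = req;
    }
    pthread_mutex_unlock(&pending_lock);
}

/* Mirrors get_request_from_send_pending(): pop the first request, report why
 * it was queued, and clear the tag before returning it. */
static send_request_t *get_request_from_pending(pending_t *type)
{
    pthread_mutex_lock(&pending_lock);
    send_request_t *req = pending_head;
    if (req) {
        pending_head = req->next;
        if (NULL == pending_head) pending_tail = NULL;
        *type = req->req_pending;
        req->req_pending = PENDING_NONE;
    }
    pthread_mutex_unlock(&pending_lock);
    return req;
}

int main(void)
{
    send_request_t a = {0}, b = {0};
    add_request_to_pending(&a, PENDING_START, true);     /* ran out of resources at start   */
    add_request_to_pending(&b, PENDING_SCHEDULE, false); /* prepend: retry scheduling first */

    pending_t why;
    send_request_t *req;
    while ((req = get_request_from_pending(&why)) != NULL)
        printf("request %p was pending with type %d\n", (void *)req, (int)why);
    return 0;
}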
@@ -46,14 +46,10 @@ void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl)
mca_pml_ob1_send_request_t* sendreq;
mca_bml_base_btl_t *send_dst;

OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
sendreq = (mca_pml_ob1_send_request_t*)
opal_list_remove_first(&mca_pml_ob1.send_pending);
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
if( OPAL_UNLIKELY(NULL == sendreq) )
sendreq = get_request_from_send_pending(&pending_type);
if(OPAL_UNLIKELY(NULL == sendreq))
break;
pending_type = sendreq->req_pending;
sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_NONE;

switch(pending_type) {
case MCA_PML_OB1_SEND_PENDING_SCHEDULE:
if(mca_pml_ob1_send_request_schedule_exclusive(sendreq) ==
@@ -67,23 +63,14 @@ void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl)
if(NULL == send_dst ||
mca_pml_ob1_send_request_start_btl(sendreq, send_dst) ==
OMPI_ERR_OUT_OF_RESOURCE) {
/* if dst of this sendreq cannot be reached through the
* endpoint or no resources put request back on the list */
OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_START;
if( OPAL_UNLIKELY(NULL == send_dst) ) {
opal_list_append(&mca_pml_ob1.send_pending,
(opal_list_item_t*)sendreq);
} else {
/* prepend to the pending list to minimize reordering */
opal_list_prepend(&mca_pml_ob1.send_pending,
(opal_list_item_t*)sendreq);
}
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
/* if no destination try next request otherwise give up,
* no more resources on this btl */
if(send_dst != NULL)
return;
/* prepend to the pending list to minimize reordering in case
* send_dst != 0 */
add_request_to_send_pending(sendreq,
MCA_PML_OB1_SEND_PENDING_START, NULL == send_dst);
/* if no destination try next request otherwise give up,
* no more resources on this btl */
if(send_dst != NULL)
return;
}
break;
default:
@@ -904,6 +891,44 @@ void mca_pml_ob1_send_request_copy_in_out( mca_pml_ob1_send_request_t *sendreq,
OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock);
}

static inline mca_pml_ob1_send_range_t *
get_send_range_nolock(mca_pml_ob1_send_request_t* sendreq)
{
opal_list_item_t *item;

item = opal_list_get_first(&sendreq->req_send_ranges);

if(opal_list_get_end(&sendreq->req_send_ranges) == item)
return NULL;

return (mca_pml_ob1_send_range_t*)item;
}

static inline mca_pml_ob1_send_range_t *
get_send_range(mca_pml_ob1_send_request_t* sendreq)
{
mca_pml_ob1_send_range_t *range;

OPAL_THREAD_LOCK(&sendreq->req_send_range_lock);
range = get_send_range_nolock(sendreq);
OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock);

return range;
}

static inline mca_pml_ob1_send_range_t *
get_next_send_range(mca_pml_ob1_send_request_t* sendreq,
mca_pml_ob1_send_range_t *range)
{
OPAL_THREAD_LOCK(&sendreq->req_send_range_lock);
opal_list_remove_item(&sendreq->req_send_ranges, (opal_list_item_t *)range);
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.send_ranges, &range->base);
range = get_send_range_nolock(sendreq);
OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock);

return range;
}

/**
* Schedule pipeline of send descriptors for the given request.
* Up to the rdma threshold. If this is a send based protocol,
@@ -912,145 +937,135 @@ void mca_pml_ob1_send_request_copy_in_out( mca_pml_ob1_send_request_t *sendreq,
* costs of the rdma. Only one thread can be inside this function.
*/

int mca_pml_ob1_send_request_schedule_exclusive(
mca_pml_ob1_send_request_t* sendreq)
int
mca_pml_ob1_send_request_schedule_exclusive(mca_pml_ob1_send_request_t* sendreq)
{
do {
size_t prev_bytes_remaining = 0;
mca_pml_ob1_send_range_t *range = NULL;
int num_fail = 0;
size_t prev_bytes_remaining = 0;
mca_pml_ob1_send_range_t *range;
int num_fail = 0;

while(true) {
mca_pml_ob1_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des;
int rc, btl_idx;
size_t size, offset;
opal_list_item_t *item;
mca_bml_base_btl_t* bml_btl;
/* check pipeline_depth here before attempting to get any locks */
if(true == sendreq->req_throttle_sends &&
sendreq->req_pipeline_depth >= mca_pml_ob1.send_pipeline_depth)
return OMPI_SUCCESS;

if( OPAL_UNLIKELY(NULL == range || 0 == range->range_send_length) ) {
OPAL_THREAD_LOCK(&sendreq->req_send_range_lock);
if(range) {
opal_list_remove_first(&sendreq->req_send_ranges);
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.send_ranges,
&range->base);
}
range = get_send_range(sendreq);

item = opal_list_get_first(&sendreq->req_send_ranges);
while(range && (false == sendreq->req_throttle_sends ||
sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth)) {
mca_pml_ob1_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des;
int rc, btl_idx;
size_t size, offset;
mca_bml_base_btl_t* bml_btl;

if(opal_list_get_end(&sendreq->req_send_ranges) == item) {
/* nothing to schedule any more. Exit the outer loop ASAP */
OPAL_ATOMIC_CMPSET_32(&sendreq->req_lock,
sendreq->req_lock, 1);
OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock);
break;
}
OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock);
assert(range->range_send_length != 0);
if(range->range_send_length <= 0) {
opal_output(0, "range->range_send_length <= 0!\n");
while(1);
}

range = (mca_pml_ob1_send_range_t*)item;
prev_bytes_remaining = 0;
if(prev_bytes_remaining == range->range_send_length)
num_fail++;
else
num_fail = 0;

prev_bytes_remaining = range->range_send_length;

if( OPAL_UNLIKELY(num_fail == range->range_btl_cnt) ) {
assert(sendreq->req_pending == MCA_PML_OB1_SEND_PENDING_NONE);
add_request_to_send_pending(sendreq,
MCA_PML_OB1_SEND_PENDING_SCHEDULE, true);
/* Note that request remains locked. send_request_process_pending()
* function will call schedule_exclusive() directly without taking
* the lock */
return OMPI_ERR_OUT_OF_RESOURCE;
}

do {
btl_idx = range->range_btl_idx;
bml_btl = range->range_btls[btl_idx].bml_btl;
size = range->range_btls[btl_idx].length;
if(++range->range_btl_idx == range->range_btl_cnt)
range->range_btl_idx = 0;
} while(!size);

/* makes sure that we don't exceed BTL max send size */
if(bml_btl->btl_max_send_size != 0)
{
size_t max_send_size = bml_btl->btl_max_send_size -
sizeof(mca_pml_ob1_frag_hdr_t);

if (size > max_send_size) {
size = max_send_size;
}

if(true == sendreq->req_throttle_sends &&
sendreq->req_pipeline_depth >=
mca_pml_ob1.send_pipeline_depth)
break;
}

/* pack into a descriptor */
offset = (size_t)range->range_send_offset;
ompi_convertor_set_position(&sendreq->req_send.req_base.req_convertor,
&offset);
range->range_send_offset = (uint64_t)offset;

if(prev_bytes_remaining == range->range_send_length)
num_fail++;
else
num_fail = 0;
mca_bml_base_prepare_src(bml_btl, NULL,
&sendreq->req_send.req_base.req_convertor,
MCA_BTL_NO_ORDER,
sizeof(mca_pml_ob1_frag_hdr_t), &size, &des);
if( OPAL_UNLIKELY(des == NULL) ) {
continue;
}
des->des_cbfunc = mca_pml_ob1_frag_completion;
des->des_cbdata = sendreq;

prev_bytes_remaining = range->range_send_length;

if( OPAL_UNLIKELY(num_fail == range->range_btl_cnt) ) {
assert(sendreq->req_pending == MCA_PML_OB1_SEND_PENDING_NONE);
OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_SCHEDULE;
opal_list_append(&mca_pml_ob1.send_pending,
(opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}

do {
btl_idx = range->range_btl_idx;
bml_btl = range->range_btls[btl_idx].bml_btl;
size = range->range_btls[btl_idx].length;
if(++range->range_btl_idx == range->range_btl_cnt)
range->range_btl_idx = 0;
} while(!size);

/* makes sure that we don't exceed BTL max send size */
if (bml_btl->btl_max_send_size != 0 &&
size > (bml_btl->btl_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t))) {
size = bml_btl->btl_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t);
}

/* pack into a descriptor */
offset = (size_t)range->range_send_offset;
ompi_convertor_set_position(&sendreq->req_send.req_base.req_convertor,
&offset);
range->range_send_offset = (uint64_t)offset;

mca_bml_base_prepare_src(bml_btl, NULL,
&sendreq->req_send.req_base.req_convertor,
MCA_BTL_NO_ORDER,
sizeof(mca_pml_ob1_frag_hdr_t), &size, &des);
if( OPAL_UNLIKELY(des == NULL) ) {
continue;
}
des->des_cbfunc = mca_pml_ob1_frag_completion;
des->des_cbdata = sendreq;

/* setup header */
hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FRAG;
hdr->hdr_frag_offset = range->range_send_offset;
hdr->hdr_src_req.pval = sendreq;
hdr->hdr_dst_req = sendreq->req_recv;
/* setup header */
hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FRAG;
hdr->hdr_frag_offset = range->range_send_offset;
hdr->hdr_src_req.pval = sendreq;
hdr->hdr_dst_req = sendreq->req_recv;

#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT
#ifdef WORDS_BIGENDIAN
hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO;
hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO;
#else
/*
* if we are little endian and the remote side is big endian,
* we're responsible for making sure the data is in network byte
* order
*/
if(sendreq->req_send.req_base.req_proc->proc_arch &
OMPI_ARCH_ISBIGENDIAN) {
hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO;
MCA_PML_OB1_FRAG_HDR_HTON(*hdr);
}
/*
* if we are little endian and the remote side is big endian,
* we're responsible for making sure the data is in network byte
* order
*/
if(sendreq->req_send.req_base.req_proc->proc_arch &
OMPI_ARCH_ISBIGENDIAN) {
hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO;
MCA_PML_OB1_FRAG_HDR_HTON(*hdr);
}
#endif
#endif

#if OMPI_WANT_PERUSE
PERUSE_TRACE_COMM_OMPI_EVENT(PERUSE_COMM_REQ_XFER_CONTINUE,
&(sendreq->req_send.req_base), size, PERUSE_SEND);
PERUSE_TRACE_COMM_OMPI_EVENT(PERUSE_COMM_REQ_XFER_CONTINUE,
&(sendreq->req_send.req_base), size, PERUSE_SEND);
#endif /* OMPI_WANT_PERUSE */

/* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);

if( OPAL_LIKELY(rc == OMPI_SUCCESS) ) {
/* update state */
range->range_btls[btl_idx].length -= size;
range->range_send_length -= size;
range->range_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1);
} else {
mca_bml_base_free(bml_btl,des);
continue;
/* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);

if( OPAL_LIKELY(rc == OMPI_SUCCESS) ) {
/* update state */
range->range_btls[btl_idx].length -= size;
range->range_send_length -= size;
range->range_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1);
if(range->range_send_length == 0) {
range = get_next_send_range(sendreq, range);
prev_bytes_remaining = 0;
}
mca_bml.bml_progress();
} else {
mca_bml_base_free(bml_btl,des);
continue;
}
} while (OPAL_THREAD_ADD32(&sendreq->req_lock, -1) > 0);

send_request_pml_complete_check(sendreq);
mca_bml.bml_progress();
}

return OMPI_SUCCESS;
}
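A note on the req_lock convention used above and in the inline helpers that follow: req_lock is a counter, not a mutex. lock_send_request() increments it and returns true only for the thread that moves it from 0 to 1; that thread runs the scheduling loop, and unlock_send_request() (the ADD32(-1) in the do/while above) makes it rerun the loop until the counter drops back to 0, so scheduling requests made by other threads in the meantime are not lost. Below is a minimal, self-contained sketch of that pattern using C11 atomics; the names (try_become_scheduler, done_scheduling, schedule_one_pass) are hypothetical, and the sketch deliberately omits the out-of-resource path, where the real code returns with the request still locked and queued on send_pending.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for sendreq->req_lock. */
static atomic_int sched_lock = 0;

/* Like lock_send_request(): only the caller that moves the counter 0 -> 1
 * becomes the (single) scheduler; everyone else just records interest. */
static bool try_become_scheduler(void) {
    return atomic_fetch_add(&sched_lock, 1) + 1 == 1;
}

/* Like unlock_send_request(): returns true when no other thread asked for
 * scheduling while we were running, i.e. the counter dropped back to 0. */
static bool done_scheduling(void) {
    return atomic_fetch_add(&sched_lock, -1) - 1 == 0;
}

/* Hypothetical body; the real code runs one pass of the fragment
 * scheduling loop here. */
static void schedule_one_pass(void) { puts("scheduling pass"); }

static void schedule(void) {
    if (!try_become_scheduler())
        return;                   /* another thread already owns the loop */
    do {
        schedule_one_pass();
    } while (!done_scheduling()); /* rerun if someone raced with us */
}

int main(void) {
    schedule();
    return 0;
}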
@@ -68,6 +68,49 @@ struct mca_pml_ob1_send_range_t {
typedef struct mca_pml_ob1_send_range_t mca_pml_ob1_send_range_t;
OBJ_CLASS_DECLARATION(mca_pml_ob1_send_range_t);

static inline bool lock_send_request(mca_pml_ob1_send_request_t *sendreq)
{
return OPAL_THREAD_ADD32(&sendreq->req_lock, 1) == 1;
}

static inline bool unlock_send_request(mca_pml_ob1_send_request_t *sendreq)
{
return OPAL_THREAD_ADD32(&sendreq->req_lock, -1) == 0;
}

static inline void
add_request_to_send_pending(mca_pml_ob1_send_request_t* sendreq,
const mca_pml_ob1_send_pending_t type, const bool append)
{
opal_list_item_t *item = (opal_list_item_t*)sendreq;

OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
sendreq->req_pending = type;
if(append)
opal_list_append(&mca_pml_ob1.send_pending, item);
else
opal_list_prepend(&mca_pml_ob1.send_pending, item);

OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
}

static inline mca_pml_ob1_send_request_t*
get_request_from_send_pending(mca_pml_ob1_send_pending_t *type)
{
mca_pml_ob1_send_request_t *sendreq;

OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
sendreq = (mca_pml_ob1_send_request_t*)
opal_list_remove_first(&mca_pml_ob1.send_pending);
if(sendreq) {
*type = sendreq->req_pending;
sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_NONE;
}
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);

return sendreq;
}

#define MCA_PML_OB1_SEND_REQUEST_ALLOC( comm, \
dst, \
sendreq, \
@@ -217,7 +260,7 @@ send_request_pml_complete_check(mca_pml_ob1_send_request_t *sendreq)
* another request or if the request is persistent it can be restarted */
if(sendreq->req_state == 0 &&
sendreq->req_bytes_delivered >= sendreq->req_send.req_bytes_packed
&& OPAL_THREAD_ADD32(&sendreq->req_lock, 1) == 1) {
&& lock_send_request(sendreq)) {
send_request_pml_complete(sendreq);
return true;
}
@@ -225,15 +268,14 @@ send_request_pml_complete_check(mca_pml_ob1_send_request_t *sendreq)
return false;
}

/**
* Schedule additional fragments
*/
int mca_pml_ob1_send_request_schedule_exclusive(
mca_pml_ob1_send_request_t* sendreq);
int
mca_pml_ob1_send_request_schedule_exclusive(mca_pml_ob1_send_request_t*);

static inline void mca_pml_ob1_send_request_schedule(
mca_pml_ob1_send_request_t* sendreq)
static inline void
mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
{
/*
* Only allow one thread in this routine for a given request.
@@ -242,8 +284,16 @@ static inline void mca_pml_ob1_send_request_schedule(
* the scheduling logic once for every call.
*/

if(OPAL_THREAD_ADD32(&sendreq->req_lock, 1) == 1)
mca_pml_ob1_send_request_schedule_exclusive(sendreq);
if(!lock_send_request(sendreq))
return;

do {
int rc;
rc = mca_pml_ob1_send_request_schedule_exclusive(sendreq);
if(rc == OMPI_ERR_OUT_OF_RESOURCE)
return;
} while(!unlock_send_request(sendreq));
send_request_pml_complete_check(sendreq);
}

/**
@@ -371,10 +421,7 @@ mca_pml_ob1_send_request_start( mca_pml_ob1_send_request_t* sendreq )
if( OPAL_LIKELY(OMPI_ERR_OUT_OF_RESOURCE != rc) )
return rc;
}
OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_START;
opal_list_append(&mca_pml_ob1.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
add_request_to_send_pending(sendreq, MCA_PML_OB1_SEND_PENDING_START, true);

return OMPI_SUCCESS;
}