
port of revised flow control from openib

This commit was SVN r8799.
This commit is contained in:
Tim Woodall 2006-01-24 23:44:30 +00:00
parent e861158fcd
commit 51ec050647
5 changed files with 399 additions and 396 deletions
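Every hunk below applies the same discipline: a send must atomically acquire a flow-control token (and, in this revision, a free send WQE) before it is posted, and the fragment is queued on the endpoint or BTL pending list when either resource is exhausted. The following is a minimal standalone sketch of that token pattern, assuming C11 atomics in place of OPAL_THREAD_ADD32; the names (endpoint_t, endpoint_send, hw_post_send) are illustrative and not the Open MPI API.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct { const char *payload; } frag_t;       /* illustrative fragment         */

typedef struct {
    _Atomic int32_t sd_tokens;                         /* sends the receiver can absorb */
    frag_t *pending[64];                               /* simplistic pending-frag queue */
    int     pending_len;
} endpoint_t;

/* OPAL_THREAD_ADD32 returns the *new* value; emulate that here. */
static int32_t add32(_Atomic int32_t *v, int32_t d) { return atomic_fetch_add(v, d) + d; }

static bool hw_post_send(endpoint_t *ep, frag_t *frag) { (void)ep; (void)frag; return true; }

/* Atomically take a token; on exhaustion give it back and queue the fragment. */
static int endpoint_send(endpoint_t *ep, frag_t *frag)
{
    if (add32(&ep->sd_tokens, -1) < 0) {
        add32(&ep->sd_tokens, 1);                      /* restore the token           */
        ep->pending[ep->pending_len++] = frag;         /* retried when credits return */
        return 0;
    }
    return hw_post_send(ep, frag) ? 0 : -1;
}

The pending list is drained from the completion handlers in the component progress loop once the peer returns credits.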

View File

@@ -521,42 +521,6 @@ int mca_btl_mvapi_finalize(struct mca_btl_base_module_t* btl)
{
mca_btl_mvapi_module_t* mvapi_btl;
mvapi_btl = (mca_btl_mvapi_module_t*) btl;
#if 0
if(mvapi_btl->send_free_eager.fl_num_allocated !=
mvapi_btl->send_free_eager.super.opal_list_length){
opal_output(0, "btl ib send_free_eager frags: %d allocated %d returned \n",
mvapi_btl->send_free_eager.fl_num_allocated,
mvapi_btl->send_free_eager.super.opal_list_length);
}
if(mvapi_btl->send_free_max.fl_num_allocated !=
mvapi_btl->send_free_max.super.opal_list_length){
opal_output(0, "btl ib send_free_max frags: %d allocated %d returned \n",
mvapi_btl->send_free_max.fl_num_allocated,
mvapi_btl->send_free_max.super.opal_list_length);
}
if(mvapi_btl->send_free_frag.fl_num_allocated !=
mvapi_btl->send_free_frag.super.opal_list_length){
opal_output(0, "btl ib send_free_frag frags: %d allocated %d returned \n",
mvapi_btl->send_free_frag.fl_num_allocated,
mvapi_btl->send_free_frag.super.opal_list_length);
}
if(mvapi_btl->recv_free_eager.fl_num_allocated !=
mvapi_btl->recv_free_eager.super.opal_list_length){
opal_output(0, "btl ib recv_free_eager frags: %d allocated %d returned \n",
mvapi_btl->recv_free_eager.fl_num_allocated,
mvapi_btl->recv_free_eager.super.opal_list_length);
}
if(mvapi_btl->recv_free_max.fl_num_allocated !=
mvapi_btl->recv_free_max.super.opal_list_length){
opal_output(0, "btl ib recv_free_max frags: %d allocated %d returned \n",
mvapi_btl->recv_free_max.fl_num_allocated,
mvapi_btl->recv_free_max.super.opal_list_length);
}
#endif
return OMPI_SUCCESS;
}
@@ -574,6 +538,7 @@ int mca_btl_mvapi_send(
mca_btl_mvapi_frag_t* frag = (mca_btl_mvapi_frag_t*)descriptor;
frag->endpoint = endpoint;
frag->hdr->tag = tag;
frag->sr_desc.opcode = VAPI_SEND;
return mca_btl_mvapi_endpoint_send(endpoint, frag);
}
@@ -588,41 +553,29 @@ int mca_btl_mvapi_put( mca_btl_base_module_t* btl,
int rc;
mca_btl_mvapi_module_t* mvapi_btl = (mca_btl_mvapi_module_t*) btl;
mca_btl_mvapi_frag_t* frag = (mca_btl_mvapi_frag_t*) descriptor;
/* setup for queued requests */
frag->endpoint = endpoint;
assert(endpoint->endpoint_state == MCA_BTL_IB_CONNECTED ||
endpoint->endpoint_state == MCA_BTL_IB_WAITING_ACK);
frag->sr_desc.opcode = VAPI_RDMA_WRITE;
/* atomically test and acquire a token */
if(!mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,1);
rc = OMPI_SUCCESS;
} else if(mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_LOCK(&mvapi_btl->ib_lock);
opal_list_append(&mvapi_btl->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock);
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,1);
rc = OMPI_SUCCESS;
} else {
/* check for a send wqe */
if (OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,-1) < 0) {
OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,1);
OPAL_THREAD_LOCK(&endpoint->ib_lock);
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&endpoint->ib_lock);
return OMPI_SUCCESS;
/* post descriptor */
} else {
frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_lp;
frag->sr_desc.remote_addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_dst->seg_addr.pval;
frag->sr_desc.r_key = frag->base.des_dst->seg_key.key32[0];
frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_src->seg_addr.pval;
frag->sg_entry.len = frag->base.des_src->seg_len;
frag->ret = VAPI_post_sr(mvapi_btl->nic,
endpoint->lcl_qp_hndl_lp,
&frag->sr_desc);
if(VAPI_OK != frag->ret){
if(VAPI_OK != VAPI_post_sr(mvapi_btl->nic, endpoint->lcl_qp_hndl_lp, &frag->sr_desc)) {
rc = OMPI_ERROR;
} else {
rc = OMPI_SUCCESS;
@@ -638,9 +591,7 @@ int mca_btl_mvapi_put( mca_btl_base_module_t* btl,
MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 1);
}
}
return rc;
}
/*
@@ -655,36 +606,28 @@ int mca_btl_mvapi_get( mca_btl_base_module_t* btl,
mca_btl_mvapi_module_t* mvapi_btl = (mca_btl_mvapi_module_t*) btl;
mca_btl_mvapi_frag_t* frag = (mca_btl_mvapi_frag_t*) descriptor;
assert(endpoint->endpoint_state == MCA_BTL_IB_CONNECTED ||
endpoint->endpoint_state == MCA_BTL_IB_WAITING_ACK);
frag->sr_desc.opcode = VAPI_RDMA_READ;
frag->endpoint = endpoint;
/* atomically test and acquire a token */
if(!mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,1);
rc = OMPI_SUCCESS;
} else if(mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,-1) < 0) {
frag->sr_desc.opcode = VAPI_RDMA_READ;
/* check for a send wqe */
if (OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,-1) < 0) {
OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,1);
OPAL_THREAD_LOCK(&mvapi_btl->ib_lock);
opal_list_append(&mvapi_btl->pending_frags_lp, (opal_list_item_t *)frag);
opal_list_append(&mvapi_btl->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock);
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,1);
rc = OMPI_SUCCESS;
return OMPI_SUCCESS;
/* check for a get token */
} else if(OPAL_THREAD_ADD32(&endpoint->get_tokens,-1) < 0) {
OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,1);
OPAL_THREAD_ADD32(&endpoint->get_tokens,1);
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
OPAL_THREAD_ADD32(&endpoint->get_tokens,1);
rc = OMPI_SUCCESS;
return OMPI_SUCCESS;
} else {
@@ -694,10 +637,7 @@ int mca_btl_mvapi_get( mca_btl_base_module_t* btl,
frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_dst->seg_addr.pval;
frag->sg_entry.len = frag->base.des_dst->seg_len;
frag->ret = VAPI_post_sr(mvapi_btl->nic,
endpoint->lcl_qp_hndl_lp,
&frag->sr_desc);
if(VAPI_OK != frag->ret){
if(VAPI_OK != VAPI_post_sr(mvapi_btl->nic, endpoint->lcl_qp_hndl_lp, &frag->sr_desc)) {
rc = OMPI_ERROR;
} else {
rc = OMPI_SUCCESS;

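The revised put/get paths above gate on two resources: a free send WQE on the low-priority QP and, when no SRQ is used, a receiver-granted token; whichever is missing causes the fragment to be queued and every counter already decremented to be restored. A compressed sketch of that double gate, with hypothetical names and C11 atomics standing in for OPAL_THREAD_ADD32 (not the actual mca_btl_mvapi_put implementation):

#include <stdatomic.h>
#include <stdint.h>

typedef struct { int unused; } frag_t;

typedef struct {
    _Atomic int32_t sd_wqe;       /* free send work-queue entries on the QP */
    _Atomic int32_t sd_tokens;    /* credits granted by the receiving side  */
} endpoint_t;

static int32_t add32(_Atomic int32_t *v, int32_t d) { return atomic_fetch_add(v, d) + d; }
static void queue_pending(endpoint_t *ep, frag_t *f)   { (void)ep; (void)f; }
static int  post_rdma_write(endpoint_t *ep, frag_t *f) { (void)ep; (void)f; return 0; }

static int rdma_put(endpoint_t *ep, frag_t *frag)
{
    if (add32(&ep->sd_tokens, -1) < 0) {      /* no credit: restore and defer          */
        add32(&ep->sd_tokens, 1);
        queue_pending(ep, frag);
        return 0;
    }
    if (add32(&ep->sd_wqe, -1) < 0) {         /* no send WQE: give both resources back */
        add32(&ep->sd_wqe, 1);
        add32(&ep->sd_tokens, 1);
        queue_pending(ep, frag);
        return 0;
    }
    return post_rdma_write(ep, frag);         /* both resources held: post to the HCA  */
}

Queued fragments are later re-driven through MCA_BTL_IB_FRAG_PROGRESS when a WQE or token is returned.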
View File

@@ -183,11 +183,11 @@ int mca_btl_mvapi_component_open(void)
0, (int*) &mca_btl_mvapi_component.ib_src_path_bits);
mca_btl_mvapi_param_register_int("rd_num", "number of receive descriptors to post to a QP",
16, (int*) &mca_btl_mvapi_component.rd_num);
8, (int*) &mca_btl_mvapi_component.rd_num);
mca_btl_mvapi_param_register_int("rd_low", "low water mark before reposting occurs",
12, (int*) &mca_btl_mvapi_component.rd_low);
6, (int*) &mca_btl_mvapi_component.rd_low);
mca_btl_mvapi_param_register_int("rd_win", "window size at which to generate an explicit credit message",
8, (int*) &mca_btl_mvapi_component.rd_win);
4, (int*) &mca_btl_mvapi_component.rd_win);
mca_btl_mvapi_component.rd_rsv = ((mca_btl_mvapi_component.rd_num<<1)-1) / mca_btl_mvapi_component.rd_win;
mca_btl_mvapi_param_register_int("srq_rd_max", "Maximum number of receive descriptors posted per SRQ.\n",
@@ -532,9 +532,9 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules,
/* Initialize the rr_desc_post array for posting of rr*/
mvapi_btl->rr_desc_post = (VAPI_rr_desc_t*) malloc((mvapi_btl->rd_num * sizeof(VAPI_rr_desc_t)));
mvapi_btl->rr_desc_post = (VAPI_rr_desc_t*) malloc(
((mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_rsv) * sizeof(VAPI_rr_desc_t)));
btls[i] = &mvapi_btl->super;
}
/* Post OOB receive to support dynamic connection setup */
@@ -565,7 +565,6 @@ int mca_btl_mvapi_component_progress( void )
VAPI_wc_desc_t comp;
mca_btl_mvapi_module_t* mvapi_btl = &mca_btl_mvapi_component.mvapi_btls[i];
/* we have two completion queues, one for "high" priority and one for "low".
* we will check the high priority and process them until there are none left.
* note that only one low priority message is processed per progress call.
@@ -590,22 +589,49 @@ int mca_btl_mvapi_component_progress( void )
/* Process a completed send */
frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id;
endpoint = (mca_btl_mvapi_endpoint_t*) frag->endpoint;
/* Process a completed send */
frag->base.des_cbfunc(&mvapi_btl->super, endpoint, &frag->base, OMPI_SUCCESS);
/* check and see if we need to progress pending sends */
if( mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_hp, 1) > 0
&& !opal_list_is_empty(&mvapi_btl->pending_frags_hp)) {
/* return send wqe */
OPAL_THREAD_ADD32(&endpoint->sd_wqe_hp, 1);
/* check to see if we need to progress any pending descriptors */
while (!opal_list_is_empty(&endpoint->pending_frags_hp) &&
endpoint->sd_wqe_hp > 0 && endpoint->sd_tokens_hp > 0) {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(endpoint->pending_frags_hp));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if(NULL == (frag = (mca_btl_mvapi_frag_t *) frag_item))
break;
if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
break;
}
}
if(!mca_btl_mvapi_component.use_srq) {
/* check to see if we need to return credits */
if( endpoint->rd_credits_hp >= mca_btl_mvapi_component.rd_win &&
OPAL_THREAD_ADD32(&endpoint->sd_credits_hp, 1) == 1) {
mca_btl_mvapi_endpoint_send_credits_hp(endpoint);
}
} else if(OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_hp, 1) > 0
&& !opal_list_is_empty(&mvapi_btl->pending_frags_hp)) {
/* dequeue resources due to global flow control */
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&mvapi_btl->ib_lock);
frag_item = opal_list_remove_first(&mvapi_btl->pending_frags_hp);
frag = (mca_btl_mvapi_frag_t *) frag_item;
if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(endpoint, frag)) {
OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock);
if(NULL != (frag = (mca_btl_mvapi_frag_t *) frag_item) &&
OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
}
}
}
count++;
break;
case VAPI_CQE_RQ_SEND_DATA:
/* process a RECV */
@@ -621,47 +647,42 @@ int mca_btl_mvapi_component_progress( void )
/* repost receive descriptors */
#ifdef VAPI_FEATURE_SRQ
if(mca_btl_mvapi_component.use_srq) {
OPAL_THREAD_ADD32(&mvapi_btl->srd_posted_hp, -1);
MCA_BTL_MVAPI_POST_SRR_HIGH(mvapi_btl, 0);
} else
if(mca_btl_mvapi_component.use_srq) {
OPAL_THREAD_ADD32((int32_t*) &mvapi_btl->srd_posted_hp, -1);
MCA_BTL_MVAPI_POST_SRR_HIGH(mvapi_btl, 0);
} else {
#endif
{
OPAL_THREAD_ADD32(&endpoint->rd_posted_hp, -1);
MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 0);
}
/* check to see if we need to progress any pending descriptors */
if( !mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp, credits) > 0
&& !opal_list_is_empty(&(endpoint->pending_frags_hp))) {
do {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(endpoint->pending_frags_hp));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if(NULL == (frag = (mca_btl_mvapi_frag_t *) frag_item))
break;
if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
break;
OPAL_THREAD_ADD32((int32_t*) &endpoint->rd_posted_hp, -1);
MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 0);
/* check to see if we need to progress any pending descriptors */
if( OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp, credits) > 0) {
while(!opal_list_is_empty(&endpoint->pending_frags_hp) &&
endpoint->sd_wqe_hp > 0 && endpoint->sd_tokens_hp > 0) {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(endpoint->pending_frags_hp));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if(NULL == (frag = (mca_btl_mvapi_frag_t *) frag_item))
break;
if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
break;
}
}
} while(endpoint->sd_tokens_hp > 0);
}
}
/* check to see if we need to return credits */
if( !mca_btl_mvapi_component.use_srq &&
endpoint->rd_credits_hp >= mca_btl_mvapi_component.rd_win) {
mca_btl_mvapi_endpoint_send_credits(
endpoint,
endpoint->lcl_qp_hndl_hp,
endpoint->rem_info.rem_qp_num_hp,
&endpoint->rd_credits_hp);
/* check to see if we need to return credits */
if( endpoint->rd_credits_hp >= mca_btl_mvapi_component.rd_win &&
OPAL_THREAD_ADD32(&endpoint->sd_credits_hp, 1) == 1) {
mca_btl_mvapi_endpoint_send_credits_hp(endpoint);
}
#ifdef VAPI_FEATURE_SRQ
}
count++;
#endif
count++;
break;
case VAPI_CQE_SQ_RDMA_READ:
case VAPI_CQE_SQ_RDMA_WRITE:
default:
@@ -687,20 +708,46 @@ int mca_btl_mvapi_component_progress( void )
case VAPI_CQE_SQ_SEND_DATA :
/* Process a completed send - receiver must return tokens */
frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id;
frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
count++;
endpoint = frag->endpoint;
/* if we have tokens, process pending sends */
if(mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp, 1) > 0
&& !opal_list_is_empty(&mvapi_btl->pending_frags_lp)) {
opal_list_item_t *frag_item;
frag_item = opal_list_remove_first(&mvapi_btl->pending_frags_lp);
frag = (mca_btl_mvapi_frag_t *) frag_item;
/* Process a completed send - receiver must return tokens */
frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
/* return send wqe */
OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp, 1);
/* check to see if we need to progress any pending descriptors */
while (!opal_list_is_empty(&endpoint->pending_frags_lp) &&
endpoint->sd_wqe_lp > 0 && endpoint->sd_tokens_lp > 0) {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(endpoint->pending_frags_lp));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if(NULL == (frag = (mca_btl_mvapi_frag_t *) frag_item))
break;
MCA_BTL_IB_FRAG_PROGRESS(frag);
}
if( !mca_btl_mvapi_component.use_srq) {
/* check to see if we need to return credits */
if( endpoint->rd_credits_lp >= mca_btl_mvapi_component.rd_win &&
OPAL_THREAD_ADD32(&endpoint->sd_credits_lp, 1) == 1) {
mca_btl_mvapi_endpoint_send_credits_lp(endpoint);
}
/* SRQ case */
} else if(OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp, 1) > 0
&& !opal_list_is_empty(&mvapi_btl->pending_frags_lp)) {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&mvapi_btl->ib_lock);
frag_item = opal_list_remove_first(&mvapi_btl->pending_frags_lp);
OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock);
if(NULL != (frag = (mca_btl_mvapi_frag_t *) frag_item)) {
MCA_BTL_IB_FRAG_PROGRESS(frag);
}
}
count++;
break;
case VAPI_CQE_SQ_RDMA_READ:
@@ -711,84 +758,95 @@ int mca_btl_mvapi_component_progress( void )
case VAPI_CQE_SQ_RDMA_WRITE:
/* Process a completed write - returns tokens immediately */
frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id;
endpoint = frag->endpoint;
frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
if(mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp, 1) > 0
&& !opal_list_is_empty(&mvapi_btl->pending_frags_lp)) {
/* process a completed write */
frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
/* return send wqe */
OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp, 1);
/* check for pending frags */
if(!opal_list_is_empty(&endpoint->pending_frags_lp)) {
opal_list_item_t *frag_item;
frag_item = opal_list_remove_first(&mvapi_btl->pending_frags_lp);
frag = (mca_btl_mvapi_frag_t *) frag_item;
MCA_BTL_IB_FRAG_PROGRESS(frag);
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&endpoint->pending_frags_lp);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if(NULL != (frag = (mca_btl_mvapi_frag_t *) frag_item)) {
MCA_BTL_IB_FRAG_PROGRESS(frag);
}
}
if(!mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp, 1) > 0 &&
!opal_list_is_empty(&(endpoint->pending_frags_lp))) {
if (mca_btl_mvapi_component.use_srq &&
endpoint->sd_wqe_lp > 0 &&
!opal_list_is_empty(&mvapi_btl->pending_frags_lp)) {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&frag->endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_lp));
OPAL_THREAD_UNLOCK(&frag->endpoint->endpoint_lock);
frag = (mca_btl_mvapi_frag_t *) frag_item;
MCA_BTL_IB_FRAG_PROGRESS(frag);
OPAL_THREAD_LOCK(&mvapi_btl->ib_lock);
frag_item = opal_list_remove_first(&mvapi_btl->pending_frags_lp);
OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock);
if(NULL != (frag = (mca_btl_mvapi_frag_t *) frag_item)) {
MCA_BTL_IB_FRAG_PROGRESS(frag);
}
}
count++;
break;
case VAPI_CQE_RQ_SEND_DATA:
/* Process a RECV */
frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id;
endpoint = (mca_btl_mvapi_endpoint_t*) frag->endpoint;
endpoint = (mca_btl_mvapi_endpoint_t*) frag->endpoint;
credits = frag->hdr->credits;
/* process received frag */
frag->rc=OMPI_SUCCESS;
frag->segment.seg_len = comp.byte_len-((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr);
/* advance the segment address past the header and subtract from the length..*/
frag->segment.seg_len = comp.byte_len-
((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr);
/* call registered callback */
mvapi_btl->ib_reg[frag->hdr->tag].cbfunc(&mvapi_btl->super, frag->hdr->tag, &frag->base, mvapi_btl->ib_reg[frag->hdr->tag].cbdata);
OMPI_FREE_LIST_RETURN(&(mvapi_btl->recv_free_max), (opal_list_item_t*) frag);
/* post descriptors */
mvapi_btl->ib_reg[frag->hdr->tag].cbfunc(&mvapi_btl->super,
frag->hdr->tag,
&frag->base,
mvapi_btl->ib_reg[frag->hdr->tag].cbdata);
OMPI_FREE_LIST_RETURN(&(mvapi_btl->recv_free_max), (opal_list_item_t*) frag);
#ifdef VAPI_FEATURE_SRQ
if(mca_btl_mvapi_component.use_srq) {
OPAL_THREAD_ADD32(&mvapi_btl->srd_posted_lp, -1);
MCA_BTL_MVAPI_POST_SRR_LOW(mvapi_btl, 0);
} else
if(mca_btl_mvapi_component.use_srq) {
/* repost receive descriptors */
OPAL_THREAD_ADD32((int32_t*) &mvapi_btl->srd_posted_lp, -1);
MCA_BTL_MVAPI_POST_SRR_LOW(mvapi_btl, 0);
} else {
#endif
{
OPAL_THREAD_ADD32(&endpoint->rd_posted_lp, -1);
MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 0);
/* repost receive descriptors */
OPAL_THREAD_ADD32((int32_t*) &endpoint->rd_posted_lp, -1);
MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 0);
/* check to see if we need to progress any pending descriptors */
if( OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp, credits) > 0) {
while(!opal_list_is_empty(&endpoint->pending_frags_lp) &&
endpoint->sd_wqe_lp > 0 && endpoint->sd_tokens_lp > 0) {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(endpoint->pending_frags_lp));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if(NULL == (frag = (mca_btl_mvapi_frag_t *) frag_item))
break;
MCA_BTL_IB_FRAG_PROGRESS(frag);
}
}
/* check to see if we need to return credits */
if( endpoint->rd_credits_lp >= mca_btl_mvapi_component.rd_win &&
OPAL_THREAD_ADD32(&endpoint->sd_credits_lp, 1) == 1) {
mca_btl_mvapi_endpoint_send_credits_lp(endpoint);
}
#ifdef VAPI_FEATURE_SRQ
}
/* check to see if we need to progress pending descriptors */
if(!mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp, credits) > 0 &&
!opal_list_is_empty(&(endpoint->pending_frags_lp))) {
do {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(endpoint->pending_frags_lp));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if(NULL == (frag = (mca_btl_mvapi_frag_t *) frag_item))
break;
MCA_BTL_IB_FRAG_PROGRESS(frag);
} while(endpoint->sd_tokens_lp > 0);
}
/* check to see if we need to return credits */
if( !mca_btl_mvapi_component.use_srq &&
endpoint->rd_credits_lp >= mca_btl_mvapi_component.rd_win) {
mca_btl_mvapi_endpoint_send_credits(
endpoint,
endpoint->lcl_qp_hndl_lp,
endpoint->rem_info.rem_qp_num_lp,
&endpoint->rd_credits_lp);
}
count++;
#endif
count++;
break;
default:
BTL_ERROR(("Erroneous network completion"));
break;

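Two recurring idioms in the progress loop above deserve a note: credits are returned explicitly only after rd_win receive descriptors have been consumed, and the OPAL_THREAD_ADD32(&sd_credits, 1) == 1 test ensures at most one credit message is in flight per endpoint and priority level. A sketch of that protocol under hypothetical names, again using C11 atomics; it mirrors the shape of the new code rather than reproducing it:

#include <stdatomic.h>
#include <stdint.h>

#define RD_WIN 4                              /* matches the new rd_win default */

typedef struct {
    _Atomic int32_t rd_credits;   /* receives consumed, owed back to the peer        */
    _Atomic int32_t sd_credits;   /* pending credit-message requests (send when ==1) */
} endpoint_t;

static int32_t add32(_Atomic int32_t *v, int32_t d) { return atomic_fetch_add(v, d) + d; }
static void post_credit_message(endpoint_t *ep) { (void)ep; /* header-only explicit send */ }

/* Called from the progress loop after receive/send completions. */
static void maybe_return_credits(endpoint_t *ep)
{
    if (atomic_load(&ep->rd_credits) >= RD_WIN &&
        add32(&ep->sd_credits, 1) == 1)       /* we won the race: nobody else is sending */
        post_credit_message(ep);
}

/* Completion callback of the credit message: collapse requests that piled up
 * while it was in flight and resend once if we are still over the window. */
static void credit_message_done(endpoint_t *ep)
{
    int32_t extra = add32(&ep->sd_credits, -1);
    if (extra > 0) {
        add32(&ep->sd_credits, -extra);
        maybe_return_credits(ep);
    }
}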
View File

@@ -68,69 +68,102 @@ static inline int mca_btl_mvapi_endpoint_post_send(
mca_btl_mvapi_frag_t * frag)
{
VAPI_qp_hndl_t qp_hndl;
frag->sr_desc.remote_qkey = 0;
frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->hdr;
frag->sr_desc.opcode = VAPI_SEND;
int ret;
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY && frag->size <= mvapi_btl->super.btl_eager_limit){
/* atomically test and acquire a token */
if(!mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp,-1) < 0) {
BTL_VERBOSE(("Queuing because no send tokens \n"));
/* check for a send wqe */
if (OPAL_THREAD_ADD32(&endpoint->sd_wqe_hp,-1) < 0) {
OPAL_THREAD_ADD32(&endpoint->sd_wqe_hp,1);
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_hp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
return OMPI_SUCCESS;
/* check for a token */
} else if(!mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp,-1) < 0) {
OPAL_THREAD_ADD32(&endpoint->sd_wqe_hp,1);
OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp,1);
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_hp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
return OMPI_SUCCESS;
} else if( mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_hp,-1) < 0) {
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_hp,-1) < 0) {
OPAL_THREAD_ADD32(&endpoint->sd_wqe_hp,1);
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_hp,1);
OPAL_THREAD_LOCK(&mvapi_btl->ib_lock);
opal_list_append(&mvapi_btl->pending_frags_hp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock);
return OMPI_SUCCESS;
} else {
frag->hdr->credits = endpoint->rd_credits_hp;
OPAL_THREAD_ADD32(&endpoint->rd_credits_hp, - frag->hdr->credits);
frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_hp;
qp_hndl = endpoint->lcl_qp_hndl_hp;
/* queue the request */
} else {
frag->hdr->credits = (endpoint->rd_credits_hp > 0) ? endpoint->rd_credits_hp : 0;
OPAL_THREAD_ADD32(&endpoint->rd_credits_hp, -frag->hdr->credits);
qp_hndl = endpoint->lcl_qp_hndl_hp;
}
} else {
/* atomically test and acquire a token */
if(!mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0 ) {
BTL_VERBOSE(("Queuing because no send tokens \n"));
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t *)frag);
/* check for a send wqe */
if (OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,-1) < 0) {
OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,1);
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
return OMPI_SUCCESS;
/* check for a token */
} else if(!mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0 ) {
OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,1);
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,1);
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
return OMPI_SUCCESS;
} else if(mca_btl_mvapi_component.use_srq &&
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,1);
OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,1);
opal_list_append(&mvapi_btl->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_LOCK(&mvapi_btl->ib_lock);
opal_list_append(&mvapi_btl->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock);
return OMPI_SUCCESS;
} else {
frag->hdr->credits = endpoint->rd_credits_lp;
OPAL_THREAD_ADD32(&endpoint->rd_credits_lp, - frag->hdr->credits);
frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_lp;
qp_hndl = endpoint->lcl_qp_hndl_lp;
/* queue the request */
} else {
frag->hdr->credits = (endpoint->rd_credits_lp > 0) ? endpoint->rd_credits_lp : 0;
OPAL_THREAD_ADD32(&endpoint->rd_credits_lp, -frag->hdr->credits);
qp_hndl = endpoint->lcl_qp_hndl_lp;
}
}
frag->sr_desc.opcode = VAPI_SEND;
frag->sr_desc.remote_qkey = 0;
frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->hdr;
frag->sg_entry.len = frag->segment.seg_len + sizeof(mca_btl_mvapi_header_t);
if(frag->sg_entry.len <= mvapi_btl->ib_inline_max) {
frag->ret = EVAPI_post_inline_sr(mvapi_btl->nic,
qp_hndl,
&frag->sr_desc);
}else {
frag->ret = VAPI_post_sr(mvapi_btl->nic,
qp_hndl,
&frag->sr_desc);
ret = EVAPI_post_inline_sr(mvapi_btl->nic, qp_hndl, &frag->sr_desc);
} else {
ret = VAPI_post_sr(mvapi_btl->nic, qp_hndl, &frag->sr_desc);
}
if(VAPI_OK != frag->ret) {
BTL_ERROR(("VAPI_post_sr: %s\n", VAPI_strerror(frag->ret)));
if(VAPI_OK != ret) {
BTL_ERROR(("VAPI_post_sr: %s\n", VAPI_strerror(ret)));
return OMPI_ERROR;
}
#ifdef VAPI_FEATURE_SRQ
@@ -143,7 +176,6 @@ static inline int mca_btl_mvapi_endpoint_post_send(
MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 1);
MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 1);
}
return OMPI_SUCCESS;
}
@@ -173,11 +205,17 @@ static void mca_btl_mvapi_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
endpoint->rd_posted_hp = 0;
endpoint->rd_posted_lp = 0;
/* number of available send wqes */
endpoint->sd_wqe_hp = mca_btl_mvapi_component.rd_num;
endpoint->sd_wqe_lp = mca_btl_mvapi_component.rd_num;
/* zero these out w/ initial posting, so that we start out w/
* zero credits to return to peer
*/
endpoint->rd_credits_hp = -(mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_rsv);
endpoint->rd_credits_lp = -(mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_rsv);
endpoint->sd_credits_hp = 0;
endpoint->sd_credits_lp = 0;
/* initialize the high and low priority tokens */
endpoint->sd_tokens_hp = mca_btl_mvapi_component.rd_num;
@@ -252,32 +290,6 @@ static int mca_btl_mvapi_endpoint_send_connect_data(mca_btl_base_endpoint_t* end
ORTE_ERROR_LOG(rc);
return rc;
}
#if 0
rc = orte_dps.pack(buffer, &((mva_btl_mvapi_endpoint_t*)endpoint)->rdma_buf->reg->r_key, 1, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, &((mva_btl_mvapi_endpoint_t*)endpoint)->rdma_buf->base, 1, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, &((mva_btl_mvapi_endpoint_t*)endpoint)->rdma_buf->entry_size, 1, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, &((mva_btl_mvapi_endpoint_t*)endpoint)->rdma_buf->entry_cnt, 1, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
#endif
/* send to endpoint */
rc = orte_rml.send_buffer_nb(&endpoint->endpoint_proc->proc_guid, buffer, ORTE_RML_TAG_DYNAMIC-1, 0,
@@ -365,16 +377,6 @@ static int mca_btl_mvapi_endpoint_start_connect(mca_btl_base_endpoint_t* endpoin
return rc;
}
#if 0
/* Create the RDMA buffer's for small messages */
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_create_rdma_buf(endpoint->endpoint_btl,
(mca_btl_mvapi_endpoint_t*) endpoint))) {
BTL_ERROR(("error creating rdma_buf for small messages error code %d", rc));
return rc;
}
#endif
BTL_VERBOSE(("Initialized High Priority QP num = %d, Low Priority QP num = %d, LID = %d",
endpoint->lcl_qp_prop_hp.qp_num,
endpoint->lcl_qp_prop_lp.qp_num,
@@ -430,15 +432,6 @@ static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t *
return rc;
}
#if 0
/* Create the RDMA buffer's for small messages */
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_create_rdma_buf(endpoint->endpoint_btl,
(mca_btl_mvapi_endpoint_t*) endpoint))) {
BTL_ERROR(("error creating rdma_buf for small messages error code %d", rc));
return rc;
}
#endif
BTL_VERBOSE(("Initialized High Priority QP num = %d, Low Priority QP num = %d, LID = %d",
endpoint->lcl_qp_prop_hp.qp_num,
endpoint->lcl_qp_prop_lp.qp_num,
@@ -541,41 +534,12 @@ static void mca_btl_mvapi_endpoint_recv(
ORTE_ERROR_LOG(rc);
return;
}
#if 0
rc = orte_dps.unpack(buffer, &ib_endpoint->rdma_buf->r_key, &cnt, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.unpack(buffer, &ib_endpoint->rdma_buf->rem_base, &cnt, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.unpack(buffer, &ib_endpoint->rdma_buf->rem_size, &cnt, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.unpack(buffer, &ib_endpoint->rdma_buf->rem_cnt, &cnt, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
#endif
BTL_VERBOSE(("Received High Priority QP num = %d, Low Priority QP num %d, LID = %d",
rem_info.rem_qp_num_hp,
rem_info.rem_qp_num_lp,
rem_info.rem_lid));
for(ib_proc = (mca_btl_mvapi_proc_t*)
opal_list_get_first(&mca_btl_mvapi_component.ib_procs);
ib_proc != (mca_btl_mvapi_proc_t*)
@@ -812,40 +776,6 @@ int mca_btl_mvapi_endpoint_connect(
return OMPI_SUCCESS;
}
#if 0
/*
* Create the small message RDMA buffer
*/
int mca_btl_mvapi_endpoint_create_rdma_buf(
mca_btl_mvapi_module_t* mvapi_btl,
mca_btl_mvapi_endpoint_t* endpoint
)
{
endpoint->rdma_buf = (mca_btl_mvapi_rdma_buf_t*)
malloc(sizeof(mca_btl_mvapi_rdma_buf_t));
if(NULL == endpoint->rdma_buf) {
return OMPI_ERROR;
}
endpoint->entry_size = 8196;
endpoint->entry_cnt = 64;
endpoint->rdma_buf->base = mvapi_btl->btl_mpool->mpool_alloc(mvapi_btl->btl_mpool,
endpoint->rdma_buf->entry_size *
endpoint->rdma_buf->entry_cnt,
0,
0,
endpoint->rdma_buf->reg);
if(NULL == endpoint->rdma_buf->base) {
return OMPI_ERROR;
} else {
return OMPI_SUCCESS;
}
}
#endif
/*
* Create the queue pair note that this is just the initial
* queue pair creation and we need to get the remote queue pair
@@ -875,10 +805,10 @@ int mca_btl_mvapi_endpoint_create_qp(
switch(transport_type) {
case VAPI_TS_RC: /* Set up RC qp parameters */
qp_init_attr.cap.max_oust_wr_rq = mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_num;
qp_init_attr.cap.max_oust_wr_sq = mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_num;
qp_init_attr.cap.max_sg_size_rq = mca_btl_mvapi_component.ib_sg_list_size;
qp_init_attr.cap.max_oust_wr_sq = mca_btl_mvapi_component.rd_num + 1;
qp_init_attr.cap.max_oust_wr_rq = mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_rsv;
qp_init_attr.cap.max_sg_size_sq = mca_btl_mvapi_component.ib_sg_list_size;
qp_init_attr.cap.max_sg_size_rq = mca_btl_mvapi_component.ib_sg_list_size;
qp_init_attr.pd_hndl = ptag;
/* We don't have Reliable Datagram Handle right now */
qp_init_attr.rdd_hndl = 0;
@@ -1041,17 +971,30 @@ int mca_btl_mvapi_endpoint_qp_init_query(
return OMPI_SUCCESS;
}
/**
* Return control fragment.
*/
static void mca_btl_mvapi_endpoint_control_cb(
static void mca_btl_mvapi_endpoint_credits_lp(
mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
int32_t credits;
/* we don't acquire a wqe or token for credit message - so decrement */
OPAL_THREAD_ADD32(&endpoint->sd_wqe_lp,-1);
/* check to see if there are additional credits to return */
if ((credits = OPAL_THREAD_ADD32(&endpoint->sd_credits_lp,-1)) > 0) {
OPAL_THREAD_ADD32(&endpoint->sd_credits_lp,-credits);
if (endpoint->rd_credits_lp >= mca_btl_mvapi_component.rd_win &&
OPAL_THREAD_ADD32(&endpoint->sd_credits_lp,1) == 1) {
mca_btl_mvapi_endpoint_send_credits_lp(endpoint);
}
}
MCA_BTL_IB_FRAG_RETURN_EAGER((mca_btl_mvapi_module_t*)btl, (mca_btl_mvapi_frag_t*)descriptor);
}
@@ -1059,40 +1002,111 @@ static void mca_btl_mvapi_endpoint_control_cb(
* Return credits to peer
*/
void mca_btl_mvapi_endpoint_send_credits(
mca_btl_mvapi_endpoint_t* endpoint,
VAPI_qp_hndl_t local_qp,
VAPI_qp_num_t remote_qp,
int32_t* credits)
void mca_btl_mvapi_endpoint_send_credits_lp(
mca_btl_mvapi_endpoint_t* endpoint)
{
mca_btl_mvapi_module_t* btl = endpoint->endpoint_btl;
mca_btl_mvapi_module_t* mvapi_btl = endpoint->endpoint_btl;
mca_btl_mvapi_frag_t* frag;
int rc;
int ret;
MCA_BTL_IB_FRAG_ALLOC_EAGER(btl, frag, rc);
MCA_BTL_IB_FRAG_ALLOC_EAGER(mvapi_btl, frag, ret);
if(NULL == frag) {
BTL_ERROR(("error allocating fragment"));
return;
}
frag->base.des_cbfunc = mca_btl_mvapi_endpoint_control_cb;
frag->base.des_cbfunc = mca_btl_mvapi_endpoint_credits_lp;
frag->base.des_cbdata = NULL;
frag->endpoint = endpoint;
frag->hdr->tag = MCA_BTL_TAG_BTL;
frag->hdr->credits = *credits;
OPAL_THREAD_ADD32(credits, -frag->hdr->credits);
frag->hdr->credits = endpoint->rd_credits_lp;
OPAL_THREAD_ADD32(&endpoint->rd_credits_lp, -frag->hdr->credits);
frag->sr_desc.remote_qkey = 0;
frag->sr_desc.opcode = VAPI_SEND;
frag->sr_desc.remote_qp = remote_qp;
frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->hdr;
frag->sg_entry.len = sizeof(mca_btl_mvapi_header_t);
rc = EVAPI_post_inline_sr(btl->nic, local_qp, &frag->sr_desc);
if(VAPI_SUCCESS != rc) {
BTL_ERROR(("error calling EVAPI_post_inline_sr: %s\n", VAPI_strerror(rc)));
MCA_BTL_IB_FRAG_RETURN_EAGER(btl, frag);
if(sizeof(mca_btl_mvapi_header_t) <= mvapi_btl->ib_inline_max) {
ret = EVAPI_post_inline_sr(mvapi_btl->nic, endpoint->lcl_qp_hndl_lp, &frag->sr_desc);
} else {
ret = VAPI_post_sr(mvapi_btl->nic, endpoint->lcl_qp_hndl_lp, &frag->sr_desc);
}
if(ret != VAPI_SUCCESS) {
OPAL_THREAD_ADD32(&endpoint->sd_credits_lp, -1);
OPAL_THREAD_ADD32(&endpoint->rd_credits_lp, frag->hdr->credits);
MCA_BTL_IB_FRAG_RETURN_EAGER(mvapi_btl, frag);
BTL_ERROR(("error posting send request errno %d says %s", errno, strerror(errno)));
return;
}
}
/**
* Return control fragment.
*/
static void mca_btl_mvapi_endpoint_credits_hp(
mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
int32_t credits;
/* we don't acquire a wqe or token for credit message - so decrement */
OPAL_THREAD_ADD32(&endpoint->sd_wqe_hp,-1);
/* check to see if there are additional credits to return */
if ((credits = OPAL_THREAD_ADD32(&endpoint->sd_credits_hp,-1)) > 0) {
OPAL_THREAD_ADD32(&endpoint->sd_credits_hp,-credits);
if (endpoint->rd_credits_hp >= mca_btl_mvapi_component.rd_win &&
OPAL_THREAD_ADD32(&endpoint->sd_credits_hp,1) == 1) {
mca_btl_mvapi_endpoint_send_credits_hp(endpoint);
}
}
MCA_BTL_IB_FRAG_RETURN_EAGER((mca_btl_mvapi_module_t*)btl, (mca_btl_mvapi_frag_t*)descriptor);
}
/**
* Return credits to peer
*/
void mca_btl_mvapi_endpoint_send_credits_hp(
mca_btl_mvapi_endpoint_t* endpoint)
{
mca_btl_mvapi_module_t* mvapi_btl = endpoint->endpoint_btl;
mca_btl_mvapi_frag_t* frag;
int ret;
MCA_BTL_IB_FRAG_ALLOC_EAGER(mvapi_btl, frag, ret);
if(NULL == frag) {
BTL_ERROR(("error allocating fragment"));
return;
}
frag->base.des_cbfunc = mca_btl_mvapi_endpoint_credits_hp;
frag->base.des_cbdata = NULL;
frag->endpoint = endpoint;
frag->hdr->tag = MCA_BTL_TAG_BTL;
frag->hdr->credits = endpoint->rd_credits_hp;
OPAL_THREAD_ADD32(&endpoint->rd_credits_hp, -frag->hdr->credits);
frag->sr_desc.opcode = VAPI_SEND;
frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->hdr;
frag->sg_entry.len = sizeof(mca_btl_mvapi_header_t);
if(sizeof(mca_btl_mvapi_header_t) <= mvapi_btl->ib_inline_max) {
ret = EVAPI_post_inline_sr(mvapi_btl->nic, endpoint->lcl_qp_hndl_hp, &frag->sr_desc);
} else {
ret = VAPI_post_sr(mvapi_btl->nic, endpoint->lcl_qp_hndl_hp, &frag->sr_desc);
}
if(ret != VAPI_SUCCESS) {
OPAL_THREAD_ADD32(&endpoint->sd_credits_hp, -1);
OPAL_THREAD_ADD32(&endpoint->rd_credits_hp, frag->hdr->credits);
MCA_BTL_IB_FRAG_RETURN_EAGER(mvapi_btl, frag);
BTL_ERROR(("error posting send request errno %d says %s", errno, strerror(errno)));
return;
}
}

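One detail of the constructor changes above is easy to miss: rd_credits_hp/lp start at -(rd_num + rd_rsv) so that the initial bulk pre-post of receive descriptors, which the peer already accounts for in its starting sd_tokens, is not reported back as returnable credits. A small worked check of that arithmetic with the defaults this commit sets (rd_num = 8, rd_win = 4), assuming the post macros bump rd_credits once per posted descriptor, as the "zero these out w/ initial posting" comment implies:

#include <assert.h>

int main(void)
{
    int rd_num = 8, rd_win = 4;
    int rd_rsv = ((rd_num << 1) - 1) / rd_win;   /* 15 / 4 = 3 slots reserved for credit msgs */
    int rd_credits = -(rd_num + rd_rsv);         /* constructor value: -11                    */

    /* initial posting of rd_num + rd_rsv receive descriptors */
    for (int i = 0; i < rd_num + rd_rsv; ++i)
        rd_credits += 1;

    assert(rd_credits == 0);                     /* startup owes the peer zero credits        */
    return 0;
}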
View File

@@ -142,6 +142,10 @@ struct mca_btl_base_endpoint_t {
int32_t rd_posted_lp; /**< number of low priority descriptors posted to the nic*/
int32_t rd_credits_hp; /**< number of high priority credits to return to peer */
int32_t rd_credits_lp; /**< number of low priority credits to return to peer */
int32_t sd_credits_hp; /**< number of send wqe entries being used to return credits */
int32_t sd_credits_lp; /**< number of send wqe entries being used to return credits */
int32_t sd_wqe_hp; /**< number of available send wqe entries */
int32_t sd_wqe_lp; /**< number of available send wqe entries */
uint32_t subnet;
#if 0
@@ -153,11 +157,8 @@ typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
typedef mca_btl_base_endpoint_t mca_btl_mvapi_endpoint_t;
int mca_btl_mvapi_endpoint_send(mca_btl_base_endpoint_t* endpoint, struct mca_btl_mvapi_frag_t* frag);
int mca_btl_mvapi_endpoint_connect(mca_btl_base_endpoint_t*);
void mca_btl_mvapi_endpoint_send_credits(
mca_btl_base_endpoint_t*,
VAPI_qp_hndl_t local,
VAPI_qp_num_t rem,
int32_t* credits);
void mca_btl_mvapi_endpoint_send_credits_hp(mca_btl_base_endpoint_t*);
void mca_btl_mvapi_endpoint_send_credits_lp(mca_btl_base_endpoint_t*);
void mca_btl_mvapi_post_recv(void);

View File

@@ -41,14 +41,6 @@ struct mca_btl_mvapi_header_t {
typedef struct mca_btl_mvapi_header_t mca_btl_mvapi_header_t;
typedef enum {
MCA_BTL_IB_FRAG_SEND,
MCA_BTL_IB_FRAG_PUT,
MCA_BTL_IB_FRAG_GET,
MCA_BTL_IB_FRAG_ACK
} mca_btl_mvapi_frag_type_t;
/**
* IB send fragment derived type.
*/
@@ -64,8 +56,6 @@ struct mca_btl_mvapi_frag_t {
VAPI_sr_desc_t sr_desc;
};
VAPI_sg_lst_entry_t sg_entry;
/* VAPI_mr_hndl_t mem_hndl; */
VAPI_ret_t ret;
mca_btl_mvapi_header_t *hdr;
mca_mpool_mvapi_registration_t * vapi_reg;
};