From 2d9c509add2187866acea820559e6171972183c2 Mon Sep 17 00:00:00 2001
From: Tim Woodall
Date: Tue, 8 Nov 2005 16:50:07 +0000
Subject: [PATCH] flow control

Credit-based flow control for the mvapi BTL: receive-descriptor credits
are piggybacked on the fragment header, an explicit credit message is
generated once rd_win credits accumulate, and send/RDMA-get tokens are
tracked per endpoint (or per SRQ) so that fragments queue locally rather
than overrun the peer.

This commit was SVN r8039.
---
 ompi/mca/btl/mvapi/btl_mvapi.c           |  89 ++++---
 ompi/mca/btl/mvapi/btl_mvapi.h           |  83 +++----
 ompi/mca/btl/mvapi/btl_mvapi_component.c | 301 +++++++++++++----------
 ompi/mca/btl/mvapi/btl_mvapi_endpoint.c  | 199 ++++++++++-----
 ompi/mca/btl/mvapi/btl_mvapi_endpoint.h  | 123 +++++----
 ompi/mca/btl/mvapi/btl_mvapi_frag.c      |  21 +-
 ompi/mca/btl/mvapi/btl_mvapi_frag.h      |  33 ++-
 7 files changed, 490 insertions(+), 359 deletions(-)

diff --git a/ompi/mca/btl/mvapi/btl_mvapi.c b/ompi/mca/btl/mvapi/btl_mvapi.c index b72b270954..524115efe3 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi.c +++ b/ompi/mca/btl/mvapi/btl_mvapi.c @@ -128,10 +128,10 @@ int mca_btl_mvapi_add_procs( if( 0 == mvapi_btl->num_peers ) { mvapi_btl->num_peers += nprocs; if(mca_btl_mvapi_component.use_srq) { - mvapi_btl->rd_buf_max = mca_btl_mvapi_component.ib_rr_buf_max + log2(nprocs) * mca_btl_mvapi_component.rd_per_peer; + mvapi_btl->rd_num = mca_btl_mvapi_component.rd_num + log2(nprocs) * mca_btl_mvapi_component.rd_per_peer; free(mvapi_btl->rr_desc_post); - mvapi_btl->rr_desc_post = (VAPI_rr_desc_t*) malloc((mvapi_btl->rd_buf_max * sizeof(VAPI_rr_desc_t))); - mvapi_btl->rd_buf_min = mvapi_btl->rd_buf_max / 2; + mvapi_btl->rr_desc_post = (VAPI_rr_desc_t*) malloc((mvapi_btl->rd_num * sizeof(VAPI_rr_desc_t))); + mvapi_btl->rd_low = mvapi_btl->rd_num / 2; } } return OMPI_SUCCESS; } @@ -506,7 +506,7 @@ int mca_btl_mvapi_finalize(struct mca_btl_base_module_t* btl) mca_btl_mvapi_module_t* mvapi_btl; mvapi_btl = (mca_btl_mvapi_module_t*) btl; -#if 0 +#if 1 if(mvapi_btl->send_free_eager.fl_num_allocated != mvapi_btl->send_free_eager.super.opal_list_length){ opal_output(0, "btl ib send_free_eager frags: %d allocated %d returned \n", @@ -579,30 +579,33 @@ int mca_btl_mvapi_put( mca_btl_base_module_t* btl, frag->sr_desc.opcode = VAPI_RDMA_WRITE; /* atomically test and acquire a token */ if(!mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) { - BTL_VERBOSE(("Queing because no rdma write tokens \n")); - BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_lp, - endpoint->wr_sq_tokens_lp, endpoint->endpoint_lock, rc); + OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0) { + + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); + opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t*)frag); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,1); rc = OMPI_SUCCESS; + } else if(mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_lp,-1) < 0) { + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,-1) < 0) { + + OPAL_THREAD_LOCK(&mvapi_btl->btl_lock); opal_list_append(&mvapi_btl->pending_frags_lp, (opal_list_item_t *)frag); - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_lp,1); + OPAL_THREAD_UNLOCK(&mvapi_btl->btl_lock); + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,1); rc = OMPI_SUCCESS; } else { - - - - frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_low; + frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_lp; frag->sr_desc.remote_addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_dst->seg_addr.pval; frag->sr_desc.r_key = frag->base.des_dst->seg_key.key32[0]; frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_src->seg_addr.pval; frag->sg_entry.len = frag->base.des_src->seg_len; frag->ret = VAPI_post_sr(mvapi_btl->nic, -
endpoint->lcl_qp_hndl_low, + endpoint->lcl_qp_hndl_lp, &frag->sr_desc); if(VAPI_OK != frag->ret){ rc = OMPI_ERROR; @@ -640,27 +643,41 @@ int mca_btl_mvapi_get( mca_btl_base_module_t* btl, frag->endpoint = endpoint; /* atomically test and acquire a token */ if(!mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) { - BTL_VERBOSE(("Queing because no rdma write tokens \n")); - BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_lp, - endpoint->wr_sq_tokens_lp, endpoint->endpoint_lock, rc); + OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0) { + + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); + opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t*)frag); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,1); rc = OMPI_SUCCESS; + } else if(mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_lp,-1) < 0) { + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,-1) < 0) { + + OPAL_THREAD_LOCK(&mvapi_btl->btl_lock); opal_list_append(&mvapi_btl->pending_frags_lp, (opal_list_item_t *)frag); - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_lp,1); + OPAL_THREAD_UNLOCK(&mvapi_btl->btl_lock); + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,1); rc = OMPI_SUCCESS; - } else { + } else if(OPAL_THREAD_ADD32(&endpoint->get_tokens,-1) < 0) { + + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); + opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t*)frag); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + OPAL_THREAD_ADD32(&endpoint->get_tokens,1); + rc = OMPI_SUCCESS; + + } else { - frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_low; + frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_lp; frag->sr_desc.remote_addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_src->seg_addr.pval; frag->sr_desc.r_key = frag->base.des_src->seg_key.key32[0]; frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_dst->seg_addr.pval; frag->sg_entry.len = frag->base.des_dst->seg_len; frag->ret = VAPI_post_sr(mvapi_btl->nic, - endpoint->lcl_qp_hndl_low, + endpoint->lcl_qp_hndl_lp, &frag->sr_desc); if(VAPI_OK != frag->ret){ rc = OMPI_ERROR; @@ -720,7 +737,7 @@ static void async_event_handler(VAPI_hca_hndl_t hca_hndl, case VAPI_SRQ_LIMIT_REACHED: { - int i; + size_t i; BTL_ERROR(("SRQ limit is reached, posting more buffers %s\n", VAPI_event_record_sym(event_p->type))); for(i = 0; i < mca_btl_mvapi_component.ib_num_btls; i++) { mca_btl_mvapi_module_t* mvapi_btl = &mca_btl_mvapi_component.mvapi_btls[i]; @@ -731,7 +748,7 @@ static void async_event_handler(VAPI_hca_hndl_t hca_hndl, } case VAPI_RECEIVE_QUEUE_DRAINED: { - + fprintf(stderr, "VAPI_RECEIVE_QUEUE_DRAINEDD\n"); } default: @@ -763,8 +780,8 @@ int mca_btl_mvapi_module_init(mca_btl_mvapi_module_t *mvapi_btl) } if(mca_btl_mvapi_component.use_srq) { - mvapi_btl->srr_posted_high = 0; - mvapi_btl->srr_posted_low = 0; + mvapi_btl->srd_posted_hp = 0; + mvapi_btl->srd_posted_lp = 0; srq_attr.pd_hndl = mvapi_btl->ptag; srq_attr.max_outs_wr = mca_btl_mvapi_component.ib_wq_size; srq_attr.max_sentries = mca_btl_mvapi_component.ib_sg_list_size; @@ -772,7 +789,7 @@ int mca_btl_mvapi_module_init(mca_btl_mvapi_module_t *mvapi_btl) srq_attr_mod.srq_limit = 16;/* mca_btl_mvapi_component.ib_wq_size; */ ret = VAPI_create_srq(mvapi_btl->nic, &srq_attr, - &mvapi_btl->srq_hndl_high, + &mvapi_btl->srq_hndl_hp, &srq_attr_out); if(ret != VAPI_OK) { BTL_ERROR(("error in VAPI_create_srq: %s", VAPI_strerror(ret))); @@ -785,7 +802,7 @@ int 
mca_btl_mvapi_module_init(mca_btl_mvapi_module_t *mvapi_btl) ret = VAPI_modify_srq ( mvapi_btl->nic, - mvapi_btl->srq_hndl_high, + mvapi_btl->srq_hndl_hp, &srq_attr_mod, srq_attr_mask, &max_outs_wr @@ -798,7 +815,7 @@ int mca_btl_mvapi_module_init(mca_btl_mvapi_module_t *mvapi_btl) ret = VAPI_create_srq(mvapi_btl->nic, &srq_attr, - &mvapi_btl->srq_hndl_low, + &mvapi_btl->srq_hndl_lp, &srq_attr_out); if(ret != VAPI_OK) { BTL_ERROR(("error in VAPI_create_srq: %s", VAPI_strerror(ret))); @@ -811,7 +828,7 @@ int mca_btl_mvapi_module_init(mca_btl_mvapi_module_t *mvapi_btl) ret = VAPI_modify_srq ( mvapi_btl->nic, - mvapi_btl->srq_hndl_low, + mvapi_btl->srq_hndl_lp, &srq_attr_mod, srq_attr_mask, &max_outs_wr @@ -824,11 +841,11 @@ int mca_btl_mvapi_module_init(mca_btl_mvapi_module_t *mvapi_btl) } else { - mvapi_btl->srq_hndl_high = VAPI_INVAL_SRQ_HNDL; - mvapi_btl->srq_hndl_low = VAPI_INVAL_SRQ_HNDL; + mvapi_btl->srq_hndl_hp = VAPI_INVAL_SRQ_HNDL; + mvapi_btl->srq_hndl_lp = VAPI_INVAL_SRQ_HNDL; } ret = VAPI_create_cq(mvapi_btl->nic, mca_btl_mvapi_component.ib_cq_size, - &mvapi_btl->cq_hndl_low, &cqe_cnt); + &mvapi_btl->cq_hndl_lp, &cqe_cnt); if( VAPI_OK != ret) { @@ -837,7 +854,7 @@ int mca_btl_mvapi_module_init(mca_btl_mvapi_module_t *mvapi_btl) } ret = VAPI_create_cq(mvapi_btl->nic, mca_btl_mvapi_component.ib_cq_size, - &mvapi_btl->cq_hndl_high, &cqe_cnt); + &mvapi_btl->cq_hndl_hp, &cqe_cnt); if( VAPI_OK != ret) { diff --git a/ompi/mca/btl/mvapi/btl_mvapi.h b/ompi/mca/btl/mvapi/btl_mvapi.h index 55759d8c35..4a52efa071 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi.h +++ b/ompi/mca/btl/mvapi/btl_mvapi.h @@ -86,14 +86,13 @@ struct mca_btl_mvapi_component_t { char* ib_mpool_name; /**< name of ib memory pool */ - - uint32_t ib_rr_buf_max; - /**< the maximum number of posted rr */ - - uint32_t ib_rr_buf_min; - /**< the minimum number of posted rr */ + + int32_t rd_num; /**< the number of receive descriptors to post to each queue pair */ + int32_t rd_low; /**< low water mark to reach before posting additional receive descriptors */ + int32_t rd_win; /**< ack credits when window size exceeded */ + int32_t rd_rsv; /**< descriptors held in reserve for control messages */ - uint32_t rd_per_peer; + int32_t rd_per_peer; /**< the number of recv desc posted per log(peer) in SRQ mode */ size_t eager_limit; @@ -144,8 +143,8 @@ struct mca_btl_mvapi_module_t { VAPI_hca_hndl_t nic; /**< NIC handle */ VAPI_pd_hndl_t ptag; /**< Protection Domain tag */ - VAPI_cq_hndl_t cq_hndl_high; /**< High Priority Completion Queue handle */ - VAPI_cq_hndl_t cq_hndl_low; /**< Low Priority Completion Queue handle */ + VAPI_cq_hndl_t cq_hndl_hp; /**< High Priority Completion Queue handle */ + VAPI_cq_hndl_t cq_hndl_lp; /**< Low Priority Completion Queue handle */ EVAPI_async_handler_hndl_t async_handler; /**< Async event handler used to detect weird/unknown events */ @@ -163,32 +162,26 @@ struct mca_btl_mvapi_module_t { opal_list_t repost; /**< list of buffers to repost */ opal_mutex_t ib_lock; /**< module level lock */ - VAPI_rr_desc_t* rr_desc_post; - VAPI_srq_hndl_t srq_hndl_high; /**< A high priority shared receive queue + VAPI_rr_desc_t* rr_desc_post; /**< an array to allow posting of rr in one swoop */ + VAPI_srq_hndl_t srq_hndl_hp; /**< A high priority shared receive queue runtime optional, can also use a receive queue per queue pair.. 
*/ - VAPI_srq_hndl_t srq_hndl_low; /**< A low priority shared receive queue */ + VAPI_srq_hndl_t srq_hndl_lp; /**< A low priority shared receive queue */ - uint32_t srr_posted_high; /**< number of high priority shared rr posted to the nic*/ - uint32_t srr_posted_low; /**< number of low priority shared rr posted to the nic*/ - - /**< an array to allow posting of rr in one swoop */ size_t ib_inline_max; /**< max size of inline send*/ - - uint32_t num_peers; - uint32_t rd_buf_max; - uint32_t rd_buf_min; - - int32_t wr_sq_tokens_hp; - /**< number of high priority frags that can be outstanding (down counter) */ - int32_t wr_sq_tokens_lp; - /**< number of low priority frags that can be outstanding (down counter) */ - - opal_list_t pending_frags_hp; - /**< list of pending high priority frags */ + int32_t num_peers; - opal_list_t pending_frags_lp; - /**< list of pending low priority frags */ + int32_t srd_posted_hp; /**< number of high priority shared receive descriptors posted to the nic*/ + int32_t srd_posted_lp; /**< number of low priority shared receive descriptors posted to the nic*/ + + int32_t rd_num; /**< number of receive descriptors to post to srq */ + int32_t rd_low; /**< low water mark before reposting descriptors to srq */ + + int32_t sd_tokens_hp; /**< number of send tokens available on high priority srq */ + int32_t sd_tokens_lp; /**< number of send tokens available on low priority srq */ + + opal_list_t pending_frags_hp; /**< list of pending high priority frags */ + opal_list_t pending_frags_lp; /**< list of pending low priority frags */ }; typedef struct mca_btl_mvapi_module_t mca_btl_mvapi_module_t; @@ -200,15 +193,15 @@ struct mca_btl_mvapi_module_t { { \ do { \ OPAL_THREAD_LOCK(&mvapi_btl->ib_lock); \ - if(mvapi_btl->srr_posted_high <= mvapi_btl->rd_buf_min+additional && \ - mvapi_btl->srr_posted_high < mvapi_btl->rd_buf_max){ \ - MCA_BTL_MVAPI_POST_SRR_SUB(mvapi_btl->rd_buf_max - \ - mvapi_btl->srr_posted_high, \ + if(mvapi_btl->srd_posted_hp <= mvapi_btl->rd_low+additional && \ + mvapi_btl->srd_posted_hp < mvapi_btl->rd_num){ \ + MCA_BTL_MVAPI_POST_SRR_SUB(mvapi_btl->rd_num - \ + mvapi_btl->srd_posted_hp, \ mvapi_btl, \ &mvapi_btl->recv_free_eager, \ - &mvapi_btl->srr_posted_high, \ + &mvapi_btl->srd_posted_hp, \ mvapi_btl->nic, \ - mvapi_btl->srq_hndl_high); \ + mvapi_btl->srq_hndl_hp); \ } \ OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); \ }while(0);\ @@ -219,15 +212,15 @@ struct mca_btl_mvapi_module_t { { \ do { \ OPAL_THREAD_LOCK(&mvapi_btl->ib_lock); \ - if(mvapi_btl->srr_posted_low <= mvapi_btl->rd_buf_min+additional && \ - mvapi_btl->srr_posted_low < mvapi_btl->rd_buf_max){ \ - MCA_BTL_MVAPI_POST_SRR_SUB(mvapi_btl->rd_buf_max - \ - mvapi_btl->srr_posted_low, \ + if(mvapi_btl->srd_posted_lp <= mvapi_btl->rd_low+additional && \ + mvapi_btl->srd_posted_lp < mvapi_btl->rd_num){ \ + MCA_BTL_MVAPI_POST_SRR_SUB(mvapi_btl->rd_num - \ + mvapi_btl->srd_posted_lp, \ mvapi_btl, \ &mvapi_btl->recv_free_max, \ - &mvapi_btl->srr_posted_low, \ + &mvapi_btl->srd_posted_lp, \ mvapi_btl->nic, \ - mvapi_btl->srq_hndl_low); \ + mvapi_btl->srq_hndl_lp); \ } \ OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); \ } while(0); \ @@ -237,12 +230,12 @@ struct mca_btl_mvapi_module_t { #define MCA_BTL_MVAPI_POST_SRR_SUB(cnt, \ mvapi_btl, \ frag_list, \ - srr_posted, \ + srd_posted, \ nic, \ srq_hndl) \ {\ do { \ - uint32_t i; \ + int32_t i; \ VAPI_ret_t ret; \ uint32_t rwqe_posted = 0; \ int rc; \ @@ -268,7 +261,7 @@ struct mca_btl_mvapi_module_t { } else if(rwqe_posted < 1) { \ BTL_ERROR(("error posting receive 
descriptors to shared receive queue, number of entries posted is %d", rwqe_posted)); \ } else {\ - OPAL_THREAD_ADD32(srr_posted, cnt); \ + OPAL_THREAD_ADD32(srd_posted, cnt); \ }\ } while(0);\ } diff --git a/ompi/mca/btl/mvapi/btl_mvapi_component.c b/ompi/mca/btl/mvapi/btl_mvapi_component.c index f7f1250cfc..8d2134be2f 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi_component.c +++ b/ompi/mca/btl/mvapi/btl_mvapi_component.c @@ -21,6 +21,7 @@ #include #include "ompi/include/constants.h" #include "opal/event/event.h" +#include "opal/include/sys/timer.h" #include "opal/util/if.h" #include "opal/util/argv.h" #include "opal/util/output.h" @@ -42,6 +43,8 @@ #include "mca/pml/base/pml_base_module_exchange.h" #include + + mca_btl_mvapi_component_t mca_btl_mvapi_component = { { /* First, the mca_base_component_t struct containing meta information @@ -130,15 +133,11 @@ int mca_btl_mvapi_component_open(void) mca_btl_mvapi_param_register_int ("free_list_num", "intial size of free lists", 8, &mca_btl_mvapi_component.ib_free_list_num); mca_btl_mvapi_param_register_int ("free_list_max", "maximum size of free lists", - 1024, &mca_btl_mvapi_component.ib_free_list_max); + -1, &mca_btl_mvapi_component.ib_free_list_max); mca_btl_mvapi_param_register_int ("free_list_inc", "increment size of free lists", 32, &mca_btl_mvapi_component.ib_free_list_inc); mca_btl_mvapi_param_register_string("mpool", "name of the memory pool to be used", "mvapi", &mca_btl_mvapi_component.ib_mpool_name); - mca_btl_mvapi_param_register_int("rd_max", "maximum number of receive descriptors to post to a QP", - 16, (int*) &mca_btl_mvapi_component.ib_rr_buf_max); - mca_btl_mvapi_param_register_int("rd_min", "minimum number of receive descriptors before reposting occurs", - 8, (int*) &mca_btl_mvapi_component.ib_rr_buf_min); mca_btl_mvapi_param_register_int("reg_mru_len", "length of the registration cache most recently used list", 16, (int*) &mca_btl_mvapi_component.reg_mru_len); mca_btl_mvapi_param_register_int("use_srq", "if 1 use the IB shared receive queue to post receive descriptors", @@ -175,10 +174,16 @@ int mca_btl_mvapi_component_open(void) mca_btl_mvapi_param_register_int("ib_src_path_bits", "IB source path bits", 0, (int*) &mca_btl_mvapi_component.ib_src_path_bits); - + mca_btl_mvapi_param_register_int("rd_num", "number of receive descriptors to post to a QP", + 16, (int*) &mca_btl_mvapi_component.rd_num); + mca_btl_mvapi_param_register_int("rd_low", "low water mark before reposting occurs", + 12, (int*) &mca_btl_mvapi_component.rd_low); + mca_btl_mvapi_param_register_int("rd_win", "window size at which an explicit credit message is generated", + 8, (int*) &mca_btl_mvapi_component.rd_win); + mca_btl_mvapi_component.rd_rsv = ((mca_btl_mvapi_component.rd_num<<1)-1) / mca_btl_mvapi_component.rd_win; mca_btl_mvapi_param_register_int("rd_per_peer", "receive descriptors posted per peer, SRQ mode only", - 16, (int*) &mca_btl_mvapi_component.rd_per_peer); + 16, (int*) &mca_btl_mvapi_component.rd_per_peer); mca_btl_mvapi_param_register_int ("exclusivity", "BTL exclusivity", MCA_BTL_EXCLUSIVITY_DEFAULT, (int*) &mca_btl_mvapi_module.super.btl_exclusivity); mca_btl_mvapi_param_register_int ("eager_limit", "eager send limit", @@ -256,6 +261,27 @@ mca_btl_mvapi_modex_send(void) return rc; } +/* + * Callback function on control message.
+ */ + +static void mca_btl_mvapi_control( + struct mca_btl_base_module_t* btl, + mca_btl_base_tag_t tag, + mca_btl_base_descriptor_t* descriptor, + void* cbdata) +{ + /* dont return credits used for control messages */ + mca_btl_mvapi_frag_t* frag = (mca_btl_mvapi_frag_t*)descriptor; + mca_btl_mvapi_endpoint_t* endpoint = frag->endpoint; + if(frag->size == mca_btl_mvapi_component.eager_limit) { + OPAL_THREAD_ADD32(&endpoint->rd_credits_hp, -1); + } else { + OPAL_THREAD_ADD32(&endpoint->rd_credits_lp, -1); + } +} + + /* * IB component initialization: @@ -284,9 +310,12 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules, mca_btl_base_selected_module_t* ib_selected; opal_list_item_t* item; +#if 0 /* ugly HACK!! */ - /* mallopt(M_TRIM_THRESHOLD, -1); */ -/* mallopt(M_MMAP_MAX, 0); */ + mallopt(M_TRIM_THRESHOLD, -1); + mallopt(M_MMAP_MAX, 0); +#endif + /* initialization */ *num_btl_modules = 0; @@ -356,6 +385,8 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules, mvapi_btl->port_id = (IB_port_t) j; mvapi_btl->port = hca_port; mvapi_btl->port_info.subnet = hca_port.sm_lid; + mvapi_btl->ib_reg[MCA_BTL_TAG_BTL].cbfunc = mca_btl_mvapi_control; + mvapi_btl->ib_reg[MCA_BTL_TAG_BTL].cbdata = NULL; opal_list_append(&btl_list, (opal_list_item_t*) ib_selected); mca_btl_mvapi_component.ib_num_btls ++; @@ -397,11 +428,10 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules, free(mvapi_btl); mvapi_btl = &mca_btl_mvapi_component.mvapi_btls[i]; - mvapi_btl->rd_buf_max = mca_btl_mvapi_component.ib_rr_buf_max; - mvapi_btl->rd_buf_min = mca_btl_mvapi_component.ib_rr_buf_min; + mvapi_btl->rd_num = mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_rsv; + mvapi_btl->rd_low = mca_btl_mvapi_component.rd_low; mvapi_btl->num_peers = 0; - mvapi_btl->wr_sq_tokens_hp = - mvapi_btl->wr_sq_tokens_lp = mca_btl_mvapi_component.max_total_wr_sq_tokens; + mvapi_btl->sd_tokens_hp = mvapi_btl->sd_tokens_lp = mca_btl_mvapi_component.max_wr_sq_tokens; /* Initialize module state */ @@ -450,7 +480,7 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules, ompi_free_list_init(&mvapi_btl->send_free_eager, length, OBJ_CLASS(mca_btl_mvapi_send_frag_eager_t), - 2*mvapi_btl->rd_buf_max, + 2*mvapi_btl->rd_num, mca_btl_mvapi_component.ib_free_list_max, mca_btl_mvapi_component.ib_free_list_inc, mvapi_btl->super.btl_mpool); @@ -458,7 +488,7 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules, ompi_free_list_init(&mvapi_btl->recv_free_eager, length, OBJ_CLASS(mca_btl_mvapi_recv_frag_eager_t), - 2*mvapi_btl->rd_buf_max, + 2*mvapi_btl->rd_num, mca_btl_mvapi_component.ib_free_list_max, mca_btl_mvapi_component.ib_free_list_inc, mvapi_btl->super.btl_mpool); @@ -474,7 +504,7 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules, ompi_free_list_init(&mvapi_btl->send_free_max, length, OBJ_CLASS(mca_btl_mvapi_send_frag_max_t), - 2*mvapi_btl->rd_buf_max, + 2*mvapi_btl->rd_num, mca_btl_mvapi_component.ib_free_list_max, mca_btl_mvapi_component.ib_free_list_inc, mvapi_btl->super.btl_mpool); @@ -485,7 +515,7 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules, ompi_free_list_init (&mvapi_btl->recv_free_max, length, OBJ_CLASS (mca_btl_mvapi_recv_frag_max_t), - 2*mvapi_btl->rd_buf_max, + 2*mvapi_btl->rd_num, mca_btl_mvapi_component.ib_free_list_max, mca_btl_mvapi_component.ib_free_list_inc, mvapi_btl->super.btl_mpool); @@ -509,7 +539,7 @@ mca_btl_base_module_t** 
mca_btl_mvapi_component_init(int *num_btl_modules, /* Initialize the rr_desc_post array for posting of rr*/ - mvapi_btl->rr_desc_post = (VAPI_rr_desc_t*) malloc((mvapi_btl->rd_buf_max * sizeof(VAPI_rr_desc_t))); + mvapi_btl->rr_desc_post = (VAPI_rr_desc_t*) malloc((mvapi_btl->rd_num * sizeof(VAPI_rr_desc_t))); btls[i] = &mvapi_btl->super; } @@ -532,6 +562,8 @@ int mca_btl_mvapi_component_progress() { uint32_t i; int count = 0; + int32_t credits; + mca_btl_mvapi_frag_t* frag; mca_btl_mvapi_endpoint_t* endpoint; /* Poll for completions */ @@ -545,7 +577,7 @@ int mca_btl_mvapi_component_progress() * we will check the high priority and process them until there are none left. * note that low priority messages are only processed one per progress call. */ - ret = VAPI_poll_cq(mvapi_btl->nic, mvapi_btl->cq_hndl_high, &comp); + ret = VAPI_poll_cq(mvapi_btl->nic, mvapi_btl->cq_hndl_hp, &comp); if(VAPI_OK == ret) { if(comp.status != VAPI_SUCCESS) { BTL_ERROR(("Got error : %s, Vendor code : %d Frag : %p", @@ -561,73 +593,87 @@ int mca_btl_mvapi_component_progress() return OMPI_ERROR; case VAPI_CQE_SQ_SEND_DATA : - case VAPI_CQE_SQ_RDMA_READ: - case VAPI_CQE_SQ_RDMA_WRITE: - frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id; - /* Process a completed send or an rdma write */ - frag->rc = OMPI_SUCCESS; - frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, frag->rc); - count++; - /* check and see if we need to progress pending sends */ - if( !mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_hp, 1) > 0 - && !opal_list_is_empty(&(frag->endpoint->pending_frags_hp))) { - opal_list_item_t *frag_item; - OPAL_THREAD_LOCK(&frag->endpoint->endpoint_lock); - frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_hp)); - OPAL_THREAD_UNLOCK(&frag->endpoint->endpoint_lock); - frag = (mca_btl_mvapi_frag_t *) frag_item; - if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) { - BTL_ERROR(("error in posting pending send\n")); - } - } + /* Process a completed send */ + frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id; + endpoint = (mca_btl_mvapi_endpoint_t*) frag->endpoint; + + frag->rc = OMPI_SUCCESS; + frag->base.des_cbfunc(&mvapi_btl->super, endpoint, &frag->base, frag->rc); + count++; + + /* check and see if we need to progress pending sends */ if( mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_hp, 1) > 0 + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_hp, 1) > 0 && !opal_list_is_empty(&mvapi_btl->pending_frags_hp)) { opal_list_item_t *frag_item; frag_item = opal_list_remove_first(&mvapi_btl->pending_frags_hp); frag = (mca_btl_mvapi_frag_t *) frag_item; - if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) { + if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(endpoint, frag)) { BTL_ERROR(("error in posting pending send\n")); } } - - break; case VAPI_CQE_RQ_SEND_DATA: - /* Process a RECV */ - BTL_VERBOSE(("Got a recv completion")); + /* process a RECV */ frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id; endpoint = (mca_btl_mvapi_endpoint_t*) frag->endpoint; + credits = frag->hdr->credits; - frag->rc=OMPI_SUCCESS; - frag->segment.seg_len = comp.byte_len-((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr); - /* advance the segment address past the header and subtract from the length..*/ - mvapi_btl->ib_reg[frag->hdr->tag].cbfunc(&mvapi_btl->super, frag->hdr->tag, &frag->base, mvapi_btl->ib_reg[frag->hdr->tag].cbdata); - - 
OMPI_FREE_LIST_RETURN(&(mvapi_btl->recv_free_eager), (opal_list_item_t*) frag); - - + /* repost receive descriptors */ if(mca_btl_mvapi_component.use_srq) { - OPAL_THREAD_ADD32(&mvapi_btl->srr_posted_high, -1); + OPAL_THREAD_ADD32(&mvapi_btl->srd_posted_hp, -1); MCA_BTL_MVAPI_POST_SRR_HIGH(mvapi_btl, 0); } else { - OPAL_THREAD_ADD32(&endpoint->rr_posted_high, -1); - MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(((mca_btl_mvapi_frag_t*) (unsigned long) comp.id)->endpoint, 0); + OPAL_THREAD_ADD32(&endpoint->rd_posted_hp, -1); + MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 0); + } + + /* advance the segment address past the header and subtract from the length..*/ + frag->rc=OMPI_SUCCESS; + frag->segment.seg_len = comp.byte_len-((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr); + /* call registered callback */ + mvapi_btl->ib_reg[frag->hdr->tag].cbfunc(&mvapi_btl->super, frag->hdr->tag, &frag->base, mvapi_btl->ib_reg[frag->hdr->tag].cbdata); + OMPI_FREE_LIST_RETURN(&(mvapi_btl->recv_free_eager), (opal_list_item_t*) frag); + + /* check to see if we need to progress any pending desciptors */ + if( !mca_btl_mvapi_component.use_srq && + OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp, credits) > 0 + && !opal_list_is_empty(&(endpoint->pending_frags_hp))) { + + do { + opal_list_item_t *frag_item; + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); + frag_item = opal_list_remove_first(&(endpoint->pending_frags_hp)); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + if(NULL == (frag = (mca_btl_mvapi_frag_t *) frag_item)) + break; + if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) { + BTL_ERROR(("error in posting pending send\n")); + break; + } + } while(endpoint->sd_tokens_hp > 0); + } + + /* check to see if we need to return credits */ + if( !mca_btl_mvapi_component.use_srq && + endpoint->rd_credits_hp >= mca_btl_mvapi_component.rd_win) { + mca_btl_mvapi_endpoint_send_credits(endpoint, endpoint->lcl_qp_hndl_hp, endpoint->rem_info.rem_qp_num_hp, &endpoint->rd_credits_hp); } count++; break; + case VAPI_CQE_SQ_RDMA_READ: + case VAPI_CQE_SQ_RDMA_WRITE: default: BTL_ERROR(("Unhandled work completion opcode is %d", comp.opcode)); break; } } - ret = VAPI_poll_cq(mvapi_btl->nic, mvapi_btl->cq_hndl_low, &comp); + ret = VAPI_poll_cq(mvapi_btl->nic, mvapi_btl->cq_hndl_lp, &comp); if(VAPI_OK == ret) { if(comp.status != VAPI_SUCCESS) { BTL_ERROR(("Got error : %s, Vendor code : %d Frag : %p", @@ -642,100 +688,101 @@ int mca_btl_mvapi_component_progress() BTL_ERROR(("Got an RDMA with Immediate data!, not supported!")); return OMPI_ERROR; - case VAPI_CQE_SQ_RDMA_READ: - case VAPI_CQE_SQ_RDMA_WRITE: case VAPI_CQE_SQ_SEND_DATA : - /* Process a completed send */ + /* Process a completed send - receiver must return tokens */ frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id; frag->rc = OMPI_SUCCESS; frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, frag->rc); count++; - /* check and see if we need to progress pending sends */ + + /* if we have tokens, process pending sends */ + if(mca_btl_mvapi_component.use_srq && + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp, 1) > 0 + && !opal_list_is_empty(&mvapi_btl->pending_frags_lp)) { + opal_list_item_t *frag_item; + frag_item = opal_list_remove_first(&mvapi_btl->pending_frags_lp); + frag = (mca_btl_mvapi_frag_t *) frag_item; + MCA_BTL_IB_FRAG_PROGRESS(frag); + } + break; + + case VAPI_CQE_SQ_RDMA_READ: + + frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id; + OPAL_THREAD_ADD32(&frag->endpoint->get_tokens, 1); + /* fall 
through */ + + case VAPI_CQE_SQ_RDMA_WRITE: + + /* Process a completed write - returns send tokens immediately */ + frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id; + endpoint = frag->endpoint; + frag->rc = OMPI_SUCCESS; + frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, frag->rc); + count++; + + if(mca_btl_mvapi_component.use_srq && + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp, 1) > 0 + && !opal_list_is_empty(&mvapi_btl->pending_frags_lp)) { + opal_list_item_t *frag_item; + frag_item = opal_list_remove_first(&mvapi_btl->pending_frags_lp); + frag = (mca_btl_mvapi_frag_t *) frag_item; + MCA_BTL_IB_FRAG_PROGRESS(frag); + } if(!mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_lp, 1) > 0 && - !opal_list_is_empty(&(frag->endpoint->pending_frags_lp))) { + OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp, 1) > 0 && + !opal_list_is_empty(&(endpoint->pending_frags_lp))) { opal_list_item_t *frag_item; OPAL_THREAD_LOCK(&frag->endpoint->endpoint_lock); frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_lp)); OPAL_THREAD_UNLOCK(&frag->endpoint->endpoint_lock); frag = (mca_btl_mvapi_frag_t *) frag_item; - switch(frag->sr_desc.opcode){ - case VAPI_SEND: - if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) { - BTL_ERROR(("error in posting pending send\n")); - } - break; - case VAPI_RDMA_WRITE: - if(OMPI_SUCCESS != mca_btl_mvapi_put((mca_btl_base_module_t*) mvapi_btl, - frag->endpoint, - (mca_btl_base_descriptor_t*) frag)) { - BTL_ERROR(("error in posting pending rdma write\n")); - } - break; - case VAPI_RDMA_READ: - if(OMPI_SUCCESS != mca_btl_mvapi_put((mca_btl_base_module_t *) mvapi_btl, - frag->endpoint, - (mca_btl_base_descriptor_t*) frag)) { - BTL_ERROR(("error in posting pending rdma read\n")); - } - break; - default: - BTL_ERROR(("error in posting pending operation, invalide opcode %d\n", frag->sr_desc.opcode)); - } + MCA_BTL_IB_FRAG_PROGRESS(frag); } - if(mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_lp, 1) > 0 - && !opal_list_is_empty(&mvapi_btl->pending_frags_lp)) { - opal_list_item_t *frag_item; - frag_item = opal_list_remove_first(&mvapi_btl->pending_frags_lp); - frag = (mca_btl_mvapi_frag_t *) frag_item; - switch(frag->sr_desc.opcode){ - case VAPI_SEND: - if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) { - BTL_ERROR(("error in posting pending send\n")); - } - break; - case VAPI_RDMA_WRITE: - if(OMPI_SUCCESS != mca_btl_mvapi_put((mca_btl_base_module_t*) mvapi_btl, - frag->endpoint, - (mca_btl_base_descriptor_t*) frag)) { - BTL_ERROR(("error in posting pending rdma write\n")); - } - break; - case VAPI_RDMA_READ: - if(OMPI_SUCCESS != mca_btl_mvapi_put((mca_btl_base_module_t *) mvapi_btl, - frag->endpoint, - (mca_btl_base_descriptor_t*) frag)) { - BTL_ERROR(("error in posting pending rdma read\n")); - } - break; - default: - BTL_ERROR(("error in posting pending operation, invalide opcode %d\n", frag->sr_desc.opcode)); - } - } - break; case VAPI_CQE_RQ_SEND_DATA: - BTL_VERBOSE(("Got a recv completion")); frag = (mca_btl_mvapi_frag_t*) (unsigned long) comp.id; endpoint = (mca_btl_mvapi_endpoint_t*) frag->endpoint; + credits = frag->hdr->credits; + + /* post descriptors before processing receive */ + if(mca_btl_mvapi_component.use_srq) { + OPAL_THREAD_ADD32(&mvapi_btl->srd_posted_lp, -1); + MCA_BTL_MVAPI_POST_SRR_LOW(mvapi_btl, 0); + } else { + OPAL_THREAD_ADD32(&endpoint->rd_posted_lp, -1); + MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 0); + } + + /* 
process received frag */ frag->rc=OMPI_SUCCESS; frag->segment.seg_len = comp.byte_len-((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr); /* advance the segment address past the header and subtract from the length..*/ mvapi_btl->ib_reg[frag->hdr->tag].cbfunc(&mvapi_btl->super, frag->hdr->tag, &frag->base, mvapi_btl->ib_reg[frag->hdr->tag].cbdata); - OMPI_FREE_LIST_RETURN(&(mvapi_btl->recv_free_max), (opal_list_item_t*) frag); - - - if(mca_btl_mvapi_component.use_srq) { - OPAL_THREAD_ADD32(&mvapi_btl->srr_posted_low, -1); - MCA_BTL_MVAPI_POST_SRR_LOW(mvapi_btl, 0); - } else { - OPAL_THREAD_ADD32(&endpoint->rr_posted_low, -1); - MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(((mca_btl_mvapi_frag_t*) (unsigned long) comp.id)->endpoint, 0); + + /* check to see if we need to progress pending descriptors */ + if(!mca_btl_mvapi_component.use_srq && + OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp, credits) > 0 && + !opal_list_is_empty(&(endpoint->pending_frags_lp))) { + do { + opal_list_item_t *frag_item; + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); + frag_item = opal_list_remove_first(&(endpoint->pending_frags_lp)); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + if(NULL == (frag = (mca_btl_mvapi_frag_t *) frag_item)) + break; + MCA_BTL_IB_FRAG_PROGRESS(frag); + } while(endpoint->sd_tokens_lp > 0); + } + + /* check to see if we need to return credits */ + if( !mca_btl_mvapi_component.use_srq && + endpoint->rd_credits_lp >= mca_btl_mvapi_component.rd_win) { + mca_btl_mvapi_endpoint_send_credits(endpoint, endpoint->lcl_qp_hndl_lp, endpoint->rem_info.rem_qp_num_lp, &endpoint->rd_credits_lp); } count++; break; diff --git a/ompi/mca/btl/mvapi/btl_mvapi_endpoint.c b/ompi/mca/btl/mvapi/btl_mvapi_endpoint.c index 8b01c21e6e..cc492b58a6 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi_endpoint.c +++ b/ompi/mca/btl/mvapi/btl_mvapi_endpoint.c @@ -74,51 +74,48 @@ static inline int mca_btl_mvapi_endpoint_post_send( /* atomically test and acquire a token */ if(!mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_hp,-1) < 0) { + OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp,-1) < 0) { BTL_VERBOSE(("Queing because no send tokens \n")); opal_list_append(&endpoint->pending_frags_hp, (opal_list_item_t *)frag); - OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_hp,1); - - /* OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); */ -/* mca_btl_mvapi_component_progress(); */ -/* OPAL_THREAD_LOCK(&endpoint->endpoint_lock); */ - + OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp,1); return OMPI_SUCCESS; } else if( mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_hp,-1) < 0) { - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_hp,1); + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_hp,-1) < 0) { + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_hp,1); opal_list_append(&mvapi_btl->pending_frags_hp, (opal_list_item_t *)frag); return OMPI_SUCCESS; - }else { - frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_high; - qp_hndl = endpoint->lcl_qp_hndl_high; + } else { + frag->hdr->credits = endpoint->rd_credits_hp; + OPAL_THREAD_ADD32(&endpoint->rd_credits_hp, - frag->hdr->credits); + frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_hp; + qp_hndl = endpoint->lcl_qp_hndl_hp; } } else { /* atomically test and acquire a token */ if(!mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0 ) { + OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0 ) { BTL_VERBOSE(("Queing because no send tokens \n")); opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t *)frag); 
- OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,1); + OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,1); return OMPI_SUCCESS; } else if(mca_btl_mvapi_component.use_srq && - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_lp,-1) < 0) { - OPAL_THREAD_ADD32(&mvapi_btl->wr_sq_tokens_lp,1); + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,-1) < 0) { + OPAL_THREAD_ADD32(&mvapi_btl->sd_tokens_lp,1); opal_list_append(&mvapi_btl->pending_frags_lp, (opal_list_item_t *)frag); return OMPI_SUCCESS; } else { - frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_low; - qp_hndl = endpoint->lcl_qp_hndl_low; + frag->hdr->credits = endpoint->rd_credits_lp; + OPAL_THREAD_ADD32(&endpoint->rd_credits_lp, - frag->hdr->credits); + frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_lp; + qp_hndl = endpoint->lcl_qp_hndl_lp; } } - frag->sg_entry.len = frag->segment.seg_len + - ((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr); - + frag->sg_entry.len = frag->segment.seg_len + sizeof(mca_btl_mvapi_header_t); if(frag->sg_entry.len <= mvapi_btl->ib_inline_max) { frag->ret = EVAPI_post_inline_sr(mvapi_btl->nic, qp_hndl, @@ -165,15 +162,23 @@ static void mca_btl_mvapi_endpoint_construct(mca_btl_base_endpoint_t* endpoint) OBJ_CONSTRUCT(&endpoint->pending_send_frags, opal_list_t); OBJ_CONSTRUCT(&endpoint->pending_frags_hp, opal_list_t); OBJ_CONSTRUCT(&endpoint->pending_frags_lp, opal_list_t); + OBJ_CONSTRUCT(&endpoint->pending_frags_get, opal_list_t); - - endpoint->rr_posted_high = 0; - endpoint->rr_posted_low = 0; + endpoint->rd_posted_hp = 0; + endpoint->rd_posted_lp = 0; + + /* zero these out w/ initial posting, so that we start out w/ + * zero credits to return to peer + */ + endpoint->rd_credits_hp = -(mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_rsv); + endpoint->rd_credits_lp = -(mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_rsv); + /* initialize the high and low priority tokens */ - endpoint->wr_sq_tokens_hp = mca_btl_mvapi_component.max_wr_sq_tokens; - endpoint->wr_sq_tokens_lp = mca_btl_mvapi_component.max_wr_sq_tokens; - endpoint->rem_info.rem_qp_num_high = 0; - endpoint->rem_info.rem_qp_num_low = 0; + endpoint->sd_tokens_hp = mca_btl_mvapi_component.rd_num; + endpoint->sd_tokens_lp = mca_btl_mvapi_component.rd_num; + endpoint->get_tokens = mca_btl_mvapi_component.ib_qp_ous_rd_atom; + endpoint->rem_info.rem_qp_num_hp = 0; + endpoint->rem_info.rem_qp_num_lp = 0; endpoint->rem_info.rem_lid = 0; endpoint->rem_info.rem_subnet = 0; } @@ -185,6 +190,11 @@ static void mca_btl_mvapi_endpoint_construct(mca_btl_base_endpoint_t* endpoint) static void mca_btl_mvapi_endpoint_destruct(mca_btl_base_endpoint_t* endpoint) { + OBJ_DESTRUCT(&endpoint->endpoint_lock); + OBJ_DESTRUCT(&endpoint->pending_send_frags); + OBJ_DESTRUCT(&endpoint->pending_frags_hp); + OBJ_DESTRUCT(&endpoint->pending_frags_lp); + OBJ_DESTRUCT(&endpoint->pending_frags_get); } /* @@ -214,13 +224,13 @@ static int mca_btl_mvapi_endpoint_send_connect_data(mca_btl_base_endpoint_t* end /* pack the info in the send buffer */ - rc = orte_dps.pack(buffer, &endpoint->lcl_qp_prop_high.qp_num, 1, ORTE_UINT32); + rc = orte_dps.pack(buffer, &endpoint->lcl_qp_prop_hp.qp_num, 1, ORTE_UINT32); if(rc != ORTE_SUCCESS) { ORTE_ERROR_LOG(rc); return rc; } - rc = orte_dps.pack(buffer, &endpoint->lcl_qp_prop_low.qp_num, 1, ORTE_UINT32); + rc = orte_dps.pack(buffer, &endpoint->lcl_qp_prop_lp.qp_num, 1, ORTE_UINT32); if(rc != ORTE_SUCCESS) { ORTE_ERROR_LOG(rc); return rc; @@ -269,8 +279,8 @@ static int 
mca_btl_mvapi_endpoint_send_connect_data(mca_btl_base_endpoint_t* end BTL_VERBOSE(("Sending High Priority QP num = %d, Low Priority QP num = %d, LID = %d", - endpoint->lcl_qp_prop_high.qp_num, - endpoint->lcl_qp_prop_low.qp_num, + endpoint->lcl_qp_prop_hp.qp_num, + endpoint->lcl_qp_prop_lp.qp_num, endpoint->endpoint_btl->port.lid)); if(rc < 0) { @@ -294,8 +304,8 @@ static int mca_btl_mvapi_endpoint_set_remote_info(mca_btl_base_endpoint_t* endpo memcpy(&((mca_btl_mvapi_endpoint_t*) endpoint)->rem_info, rem_info, sizeof(mca_btl_mvapi_rem_info_t)); BTL_VERBOSE(("Setting High Priority QP num = %d, Low Priority QP num %d, LID = %d", - endpoint->rem_info.rem_qp_num_high, - endpoint->rem_info.rem_qp_num_low, + endpoint->rem_info.rem_qp_num_hp, + endpoint->rem_info.rem_qp_num_lp, endpoint->rem_info.rem_lid)); return ORTE_SUCCESS; @@ -321,10 +331,10 @@ static int mca_btl_mvapi_endpoint_start_connect(mca_btl_base_endpoint_t* endpoin if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_create_qp(endpoint->endpoint_btl, endpoint->endpoint_btl->nic, endpoint->endpoint_btl->ptag, - endpoint->endpoint_btl->cq_hndl_high, - endpoint->endpoint_btl->srq_hndl_high, - &endpoint->lcl_qp_hndl_high, - &endpoint->lcl_qp_prop_high, + endpoint->endpoint_btl->cq_hndl_hp, + endpoint->endpoint_btl->srq_hndl_hp, + &endpoint->lcl_qp_hndl_hp, + &endpoint->lcl_qp_prop_hp, VAPI_TS_RC))) { BTL_ERROR(("error creating queue pair, error code %d", rc)); return rc; @@ -335,10 +345,10 @@ static int mca_btl_mvapi_endpoint_start_connect(mca_btl_base_endpoint_t* endpoin if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_create_qp(endpoint->endpoint_btl, endpoint->endpoint_btl->nic, endpoint->endpoint_btl->ptag, - endpoint->endpoint_btl->cq_hndl_low, - endpoint->endpoint_btl->srq_hndl_low, - &endpoint->lcl_qp_hndl_low, - &endpoint->lcl_qp_prop_low, + endpoint->endpoint_btl->cq_hndl_lp, + endpoint->endpoint_btl->srq_hndl_lp, + &endpoint->lcl_qp_hndl_lp, + &endpoint->lcl_qp_prop_lp, VAPI_TS_RC))) { BTL_ERROR(("error creating queue pair, error code %d", rc)); @@ -356,8 +366,8 @@ static int mca_btl_mvapi_endpoint_start_connect(mca_btl_base_endpoint_t* endpoin #endif BTL_VERBOSE(("Initialized High Priority QP num = %d, Low Priority QP num = %d, LID = %d", - endpoint->lcl_qp_prop_high.qp_num, - endpoint->lcl_qp_prop_low.qp_num, + endpoint->lcl_qp_prop_hp.qp_num, + endpoint->lcl_qp_prop_lp.qp_num, endpoint->endpoint_btl->port.lid)); /* Send connection info over to remote endpoint */ @@ -383,10 +393,10 @@ static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t * if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_create_qp(endpoint->endpoint_btl, endpoint->endpoint_btl->nic, endpoint->endpoint_btl->ptag, - endpoint->endpoint_btl->cq_hndl_high, - endpoint->endpoint_btl->srq_hndl_high, - &endpoint->lcl_qp_hndl_high, - &endpoint->lcl_qp_prop_high, + endpoint->endpoint_btl->cq_hndl_hp, + endpoint->endpoint_btl->srq_hndl_hp, + &endpoint->lcl_qp_hndl_hp, + &endpoint->lcl_qp_prop_hp, VAPI_TS_RC))) { BTL_ERROR(("error creating queue pair, error code %d", rc)); return rc; @@ -397,10 +407,10 @@ static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t * if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_create_qp(endpoint->endpoint_btl, endpoint->endpoint_btl->nic, endpoint->endpoint_btl->ptag, - endpoint->endpoint_btl->cq_hndl_low, - endpoint->endpoint_btl->srq_hndl_low, - &endpoint->lcl_qp_hndl_low, - &endpoint->lcl_qp_prop_low, + endpoint->endpoint_btl->cq_hndl_lp, + endpoint->endpoint_btl->srq_hndl_lp, + 
&endpoint->lcl_qp_hndl_lp, + &endpoint->lcl_qp_prop_lp, VAPI_TS_RC))) { BTL_ERROR(("error creating queue pair, error code %d", rc)); return rc; @@ -416,8 +426,8 @@ static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t * #endif BTL_VERBOSE(("Initialized High Priority QP num = %d, Low Priority QP num = %d, LID = %d", - endpoint->lcl_qp_prop_high.qp_num, - endpoint->lcl_qp_prop_low.qp_num, + endpoint->lcl_qp_prop_hp.qp_num, + endpoint->lcl_qp_prop_lp.qp_num, endpoint->endpoint_btl->port.lid)); @@ -497,12 +507,12 @@ static void mca_btl_mvapi_endpoint_recv( /* start by unpacking data first so we know who is knocking at our door */ - rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_high, &cnt, ORTE_UINT32); + rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_hp, &cnt, ORTE_UINT32); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); return; } - rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_low, &cnt, ORTE_UINT32); + rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_lp, &cnt, ORTE_UINT32); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); return; @@ -545,8 +555,8 @@ static void mca_btl_mvapi_endpoint_recv( BTL_VERBOSE(("Received High Priority QP num = %d, Low Priority QP num %d, LID = %d", - rem_info.rem_qp_num_high, - rem_info.rem_qp_num_low, + rem_info.rem_qp_num_hp, + rem_info.rem_qp_num_lp, rem_info.rem_lid)); @@ -763,15 +773,15 @@ int mca_btl_mvapi_endpoint_connect( /* Connection establishment RC */ rc = mca_btl_mvapi_endpoint_qp_init_query(endpoint->endpoint_btl, endpoint->endpoint_btl->nic, - endpoint->lcl_qp_hndl_high, - endpoint->rem_info.rem_qp_num_high, + endpoint->lcl_qp_hndl_hp, + endpoint->rem_info.rem_qp_num_hp, endpoint->rem_info.rem_lid, endpoint->endpoint_btl->port_id); rc = mca_btl_mvapi_endpoint_qp_init_query(endpoint->endpoint_btl, endpoint->endpoint_btl->nic, - endpoint->lcl_qp_hndl_low, - endpoint->rem_info.rem_qp_num_low, + endpoint->lcl_qp_hndl_lp, + endpoint->rem_info.rem_qp_num_lp, endpoint->rem_info.rem_lid, endpoint->endpoint_btl->port_id); @@ -844,13 +854,14 @@ int mca_btl_mvapi_endpoint_create_qp( VAPI_ret_t ret; VAPI_qp_init_attr_t qp_init_attr; - VAPI_qp_init_attr_ext_t qp_init_attr_ext; - + VAPI_qp_init_attr_ext_t qp_init_attr_ext; + + /* worst case number of credit messages could be queued */ switch(transport_type) { case VAPI_TS_RC: /* Set up RC qp parameters */ - qp_init_attr.cap.max_oust_wr_rq = mca_btl_mvapi_component.ib_rr_buf_max; - qp_init_attr.cap.max_oust_wr_sq = mca_btl_mvapi_component.max_wr_sq_tokens; + qp_init_attr.cap.max_oust_wr_rq = mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_rsv; + qp_init_attr.cap.max_oust_wr_sq = mca_btl_mvapi_component.rd_num + mca_btl_mvapi_component.rd_rsv; qp_init_attr.cap.max_sg_size_rq = mca_btl_mvapi_component.ib_sg_list_size; qp_init_attr.cap.max_sg_size_sq = mca_btl_mvapi_component.ib_sg_list_size; qp_init_attr.pd_hndl = ptag; @@ -1013,3 +1024,59 @@ int mca_btl_mvapi_endpoint_qp_init_query( return OMPI_SUCCESS; } + +/** + * Return control fragment. 
+ */ + +static void mca_btl_mvapi_endpoint_control_cb( + mca_btl_base_module_t* btl, + struct mca_btl_base_endpoint_t* ep, + struct mca_btl_base_descriptor_t* descriptor, + int status) +{ + MCA_BTL_IB_FRAG_RETURN_EAGER((mca_btl_mvapi_module_t*)btl, (mca_btl_mvapi_frag_t*)descriptor); +} + +/** + * + */ + +void mca_btl_mvapi_endpoint_send_credits( + mca_btl_mvapi_endpoint_t* endpoint, + VAPI_qp_hndl_t local_qp, + VAPI_qp_num_t remote_qp, + int32_t* credits) +{ + mca_btl_mvapi_module_t* btl = endpoint->endpoint_btl; + mca_btl_mvapi_frag_t* frag; + int rc; + + /* fprintf(stderr, "sending credits %d\n", *credits); */ + MCA_BTL_IB_FRAG_ALLOC_EAGER(btl, frag, rc); + if(NULL == frag) { + BTL_ERROR(("error allocating fragment")); + return; + } + + frag->base.des_cbfunc = mca_btl_mvapi_endpoint_control_cb; + frag->base.des_cbdata = NULL; + + frag->hdr->tag = MCA_BTL_TAG_BTL; + frag->hdr->credits = *credits; + OPAL_THREAD_ADD32(credits, -frag->hdr->credits); + + frag->sr_desc.remote_qkey = 0; + frag->sr_desc.opcode = VAPI_SEND; + frag->sr_desc.remote_qp = remote_qp; + + frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->hdr; + frag->sg_entry.len = sizeof(mca_btl_mvapi_header_t); + + rc = EVAPI_post_inline_sr(btl->nic, endpoint->lcl_qp_hndl_hp, &frag->sr_desc); + if(VAPI_SUCCESS != rc) { + BTL_ERROR(("error calling EVAPI_post_inline_sr: %s\n", VAPI_strerror(rc))); + MCA_BTL_IB_FRAG_RETURN_EAGER(btl, frag); + } +} + diff --git a/ompi/mca/btl/mvapi/btl_mvapi_endpoint.h b/ompi/mca/btl/mvapi/btl_mvapi_endpoint.h index 8994af51e4..f9b925e4c8 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi_endpoint.h +++ b/ompi/mca/btl/mvapi/btl_mvapi_endpoint.h @@ -34,7 +34,6 @@ #if defined(c_plusplus) || defined(__cplusplus) extern "C" { #endif -#define MAX_POST_RR (16) OBJ_CLASS_DECLARATION(mca_btl_mvapi_endpoint_t); @@ -77,10 +76,10 @@ typedef enum { struct mca_btl_mvapi_rem_info_t { - VAPI_qp_num_t rem_qp_num_high; + VAPI_qp_num_t rem_qp_num_hp; /* High priority remote side QP number */ - VAPI_qp_num_t rem_qp_num_low; + VAPI_qp_num_t rem_qp_num_lp; /* Low prioirty remote size QP number */ IB_lid_t rem_lid; @@ -124,36 +123,27 @@ struct mca_btl_base_endpoint_t { opal_list_t pending_send_frags; /**< list of pending send frags for this endpoint */ - opal_list_t pending_frags_hp; - /**< list of pending high priority frags */ + opal_list_t pending_frags_hp; /**< list of pending high priority frags */ + opal_list_t pending_frags_lp; /**< list of pending low priority frags */ + opal_list_t pending_frags_get; /**< list of pending get operations */ - opal_list_t pending_frags_lp; - /**< list of pending low priority frags */ - - int32_t wr_sq_tokens_hp; - /**< number of high priority frags that can be outstanding (down counter) */ - - int32_t wr_sq_tokens_lp; - /**< number of low priority frags that can be outstanding (down counter) */ - mca_btl_mvapi_rem_info_t rem_info; - VAPI_qp_hndl_t lcl_qp_hndl_high; - /* High priority local QP handle */ + VAPI_qp_hndl_t lcl_qp_hndl_hp; /* High priority local QP handle */ + VAPI_qp_hndl_t lcl_qp_hndl_lp; /* Low priority local QP handle */ + + VAPI_qp_prop_t lcl_qp_prop_hp; /* High priority local QP properties */ + VAPI_qp_prop_t lcl_qp_prop_lp; /* Low priority local QP properties */ + + int32_t sd_tokens_hp; /**< number of high priority send tokens */ + int32_t sd_tokens_lp; /**< number of low priority send tokens */ + int32_t get_tokens; /**< number of available get tokens */ - VAPI_qp_hndl_t lcl_qp_hndl_low; - /* Low priority local QP handle */ - - VAPI_qp_prop_t lcl_qp_prop_high; 
- /* High priority local QP properties */ - - VAPI_qp_prop_t lcl_qp_prop_low; - /* Low priority local QP properties */ - + int32_t rd_posted_hp; /**< number of high priority descriptors posted to the nic*/ + int32_t rd_posted_lp; /**< number of low priority descriptors posted to the nic*/ + int32_t rd_credits_hp; /**< number of high priority credits to return to peer */ + int32_t rd_credits_lp; /**< number of low priority credits to return to peer */ - uint32_t rr_posted_high; /**< number of high priority rr posted to the nic*/ - uint32_t rr_posted_low; /**< number of low priority rr posted to the nic*/ - uint32_t subnet; #if 0 mca_btl_mvapi_rdma_buf_t *rdma_buf; @@ -164,6 +154,11 @@ typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t; typedef mca_btl_base_endpoint_t mca_btl_mvapi_endpoint_t; int mca_btl_mvapi_endpoint_send(mca_btl_base_endpoint_t* endpoint, struct mca_btl_mvapi_frag_t* frag); int mca_btl_mvapi_endpoint_connect(mca_btl_base_endpoint_t*); +void mca_btl_mvapi_endpoint_send_credits( + mca_btl_base_endpoint_t*, + VAPI_qp_hndl_t local, + VAPI_qp_num_t rem, + int32_t* credits); void mca_btl_mvapi_post_recv(void); @@ -173,15 +168,16 @@ void mca_btl_mvapi_post_recv(void); do { \ mca_btl_mvapi_module_t * mvapi_btl = endpoint->endpoint_btl; \ OPAL_THREAD_LOCK(&mvapi_btl->ib_lock); \ - if(endpoint->rr_posted_high <= mca_btl_mvapi_component.ib_rr_buf_min+additional && \ - endpoint->rr_posted_high < mvapi_btl->rd_buf_max){ \ - MCA_BTL_MVAPI_ENDPOINT_POST_RR_SUB(mvapi_btl->rd_buf_max - \ - endpoint->rr_posted_high, \ + if(endpoint->rd_posted_hp <= mca_btl_mvapi_component.rd_low+additional && \ + endpoint->rd_posted_hp < mvapi_btl->rd_num){ \ + MCA_BTL_MVAPI_ENDPOINT_POST_RR_SUB(mvapi_btl->rd_num - \ + endpoint->rd_posted_hp, \ endpoint, \ &mvapi_btl->recv_free_eager, \ - &endpoint->rr_posted_high, \ + endpoint->rd_posted_hp, \ + endpoint->rd_credits_hp, \ mvapi_btl->nic, \ - endpoint->lcl_qp_hndl_high); \ + endpoint->lcl_qp_hndl_hp); \ } \ OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); \ } while(0); \ @@ -193,15 +189,16 @@ void mca_btl_mvapi_post_recv(void); do { \ mca_btl_mvapi_module_t * mvapi_btl = endpoint->endpoint_btl; \ OPAL_THREAD_LOCK(&mvapi_btl->ib_lock); \ - if(endpoint->rr_posted_low <= mca_btl_mvapi_component.ib_rr_buf_min+additional && \ - endpoint->rr_posted_low < mvapi_btl->rd_buf_max){ \ - MCA_BTL_MVAPI_ENDPOINT_POST_RR_SUB(mvapi_btl->rd_buf_max - \ - endpoint->rr_posted_low, \ + if(endpoint->rd_posted_lp <= mca_btl_mvapi_component.rd_low+additional && \ + endpoint->rd_posted_lp < mvapi_btl->rd_num){ \ + MCA_BTL_MVAPI_ENDPOINT_POST_RR_SUB(mvapi_btl->rd_num - \ + endpoint->rd_posted_lp, \ endpoint, \ &mvapi_btl->recv_free_max, \ - &endpoint->rr_posted_low, \ + endpoint->rd_posted_lp, \ + endpoint->rd_credits_lp, \ mvapi_btl->nic, \ - endpoint->lcl_qp_hndl_low); \ + endpoint->lcl_qp_hndl_lp); \ } \ OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); \ } while(0); \ @@ -211,18 +208,20 @@ void mca_btl_mvapi_post_recv(void); #define MCA_BTL_MVAPI_ENDPOINT_POST_RR_SUB(cnt, \ my_endpoint, \ frag_list, \ - rr_posted, \ + rd_posted, \ + rd_credits, \ nic, \ qp ) \ -{\ - do { \ - uint32_t i; \ +{ \ +do { \ + int32_t i; \ int rc; \ - opal_list_item_t* item; \ - mca_btl_mvapi_frag_t* frag = NULL; \ + int32_t num_post = cnt; \ mca_btl_mvapi_module_t *mvapi_btl = my_endpoint->endpoint_btl; \ VAPI_rr_desc_t* desc_post = mvapi_btl->rr_desc_post; \ - for(i = 0; i < cnt; i++) { \ + for(i = 0; i < num_post; i++) { \ + opal_list_item_t* item; \ + mca_btl_mvapi_frag_t* frag = NULL; \ 
OMPI_FREE_LIST_WAIT(frag_list, item, rc); \ frag = (mca_btl_mvapi_frag_t*) item; \ frag->endpoint = my_endpoint; \ @@ -231,30 +230,22 @@ void mca_btl_mvapi_post_recv(void); (unsigned char*) frag->hdr); \ desc_post[i] = frag->rr_desc; \ }\ - frag->ret = EVAPI_post_rr_list( nic, \ - qp, \ - cnt, \ - desc_post); \ - if(NULL != frag && VAPI_OK != frag->ret) { \ + rc = EVAPI_post_rr_list( nic, \ + qp, \ + num_post, \ + desc_post); \ + if(VAPI_OK != rc) { \ BTL_ERROR(("error posting receive descriptors: %s",\ - VAPI_strerror(frag->ret))); \ - } else if (NULL != frag){\ - OPAL_THREAD_ADD32(rr_posted, cnt); \ + VAPI_strerror(rc))); \ + } else { \ + /* fprintf(stderr, "posting: %d to %d\n", num_post, rd_posted); */ \ + OPAL_THREAD_ADD32(&(rd_posted), num_post); \ + /* fprintf(stderr, "credits: %d to %d\n", num_post, rd_credits); */ \ + OPAL_THREAD_ADD32(&(rd_credits), num_post); \ }\ } while(0); \ } -#define BTL_MVAPI_INSERT_PENDING(frag, frag_list, tokens, lock, rc) \ -{ \ - do{ \ - OPAL_THREAD_LOCK(&lock); \ - opal_list_append(&frag_list, (opal_list_item_t *)frag); \ - OPAL_THREAD_UNLOCK(&lock); \ - OPAL_THREAD_ADD32(&tokens, 1); \ - rc = OMPI_SUCCESS; \ - } while(0); \ -} - #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/ompi/mca/btl/mvapi/btl_mvapi_frag.c b/ompi/mca/btl/mvapi/btl_mvapi_frag.c index 3fc4789ae8..f6675f3809 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi_frag.c +++ b/ompi/mca/btl/mvapi/btl_mvapi_frag.c @@ -25,23 +25,9 @@ static void mca_btl_mvapi_frag_common_constructor( mca_btl_mvapi_frag_t* frag) { mca_mpool_mvapi_registration_t* mem_hndl = (mca_mpool_mvapi_registration_t*) frag->base.super.user_data; - frag->hdr = (mca_btl_mvapi_header_t*) (frag+1); /* initialize the btl header to point to start at end of frag */ -#if 0 - mod = (unsigned long) frag->hdr % MCA_BTL_IB_FRAG_ALIGN; - - if(mod != 0) { - frag->hdr = (mca_btl_mvapi_header_t*) ((unsigned char*) frag->hdr + (MCA_BTL_IB_FRAG_ALIGN - mod)); - } -#endif - - frag->segment.seg_addr.pval = ((unsigned char* )frag->hdr) + sizeof(mca_btl_mvapi_header_t); /* init the segment address to start after the btl header */ - -#if 0 - mod = (frag->segment.seg_addr.lval) % MCA_BTL_IB_FRAG_ALIGN; - if(mod != 0) { - frag->segment.seg_addr.lval += (MCA_BTL_IB_FRAG_ALIGN - mod); - } -#endif + frag->hdr = (mca_btl_mvapi_header_t*) (frag+1); /* initialize btl header to point to start at end of frag */ + frag->segment.seg_addr.pval = ((unsigned char* )frag->hdr) + sizeof(mca_btl_mvapi_header_t); + /* init the segment address to start after the btl header */ /* frag->mem_hndl = mem_hndl->hndl; */ frag->segment.seg_len = frag->size; @@ -49,7 +35,6 @@ static void mca_btl_mvapi_frag_common_constructor( mca_btl_mvapi_frag_t* frag) frag->sg_entry.lkey = mem_hndl->l_key; frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->hdr; frag->base.des_flags = 0; - } diff --git a/ompi/mca/btl/mvapi/btl_mvapi_frag.h b/ompi/mca/btl/mvapi/btl_mvapi_frag.h index de1f7da430..9e30a6c05b 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi_frag.h +++ b/ompi/mca/btl/mvapi/btl_mvapi_frag.h @@ -34,7 +34,11 @@ extern "C" { #endif OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_mvapi_frag_t); -typedef mca_btl_base_header_t mca_btl_mvapi_header_t; +struct mca_btl_mvapi_header_t { + mca_btl_base_tag_t tag; + int16_t credits; +}; +typedef struct mca_btl_mvapi_header_t mca_btl_mvapi_header_t; typedef enum { @@ -140,6 +144,33 @@ OBJ_CLASS_DECLARATION(mca_btl_mvapi_recv_frag_max_t); +#define MCA_BTL_IB_FRAG_PROGRESS(frag) \ +do { \ + switch(frag->sr_desc.opcode) { \ + 
case VAPI_SEND: \ + if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) { \ + BTL_ERROR(("error in posting pending send\n")); \ + } \ + break; \ + case VAPI_RDMA_WRITE: \ + if(OMPI_SUCCESS != mca_btl_mvapi_put((mca_btl_base_module_t*) mvapi_btl, \ + frag->endpoint, \ + (mca_btl_base_descriptor_t*) frag)) { \ + BTL_ERROR(("error in posting pending rdma write\n")); \ + } \ + break; \ + case VAPI_RDMA_READ: \ + if(OMPI_SUCCESS != mca_btl_mvapi_get((mca_btl_base_module_t *) mvapi_btl, \ + frag->endpoint, \ + (mca_btl_base_descriptor_t*) frag)) { \ + BTL_ERROR(("error in posting pending rdma read\n")); \ + } \ + break; \ + default: \ + BTL_ERROR(("error in posting pending operation, invalid opcode %d\n", frag->sr_desc.opcode)); \ + break; \ + } \ +} while (0) struct mca_btl_mvapi_module_t;
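
The flow-control scheme this patch implements, in brief: each endpoint
starts with rd_num send tokens per priority level and queues fragments on
a pending list when no token is available; every reposted receive
descriptor accumulates a credit in rd_credits; credits travel back to the
peer piggybacked in the new hdr->credits field, or in an explicit
MCA_BTL_TAG_BTL control message once rd_win of them pile up. The sketch
below restates that protocol in isolation, under stated assumptions: a
single priority level, C11 atomics standing in for OPAL_THREAD_ADD32, and
invented names (flow_t, acquire_send_token, and so on) that are not part
of the BTL.

    #include <stdatomic.h>
    #include <stdbool.h>

    enum { RD_NUM = 16, RD_WIN = 8 };  /* cf. the rd_num / rd_win params */

    typedef struct {
        atomic_int sd_tokens;   /* sends we may post; starts at RD_NUM */
        atomic_int rd_credits;  /* buffers reposted but not yet advertised */
    } flow_t;

    /* Sender: take a send token or ask the caller to queue the fragment.
     * fetch_sub returns the old value, so old <= 0 here is the same test
     * as the patch's OPAL_THREAD_ADD32(...,-1) < 0 on the new value. */
    static bool acquire_send_token(flow_t *f)
    {
        if (atomic_fetch_sub(&f->sd_tokens, 1) <= 0) {
            atomic_fetch_add(&f->sd_tokens, 1); /* give it back and queue */
            return false;
        }
        return true;
    }

    /* Sender: drain owed credits into an outgoing header (hdr->credits). */
    static int take_credits(flow_t *f)
    {
        return atomic_exchange(&f->rd_credits, 0);
    }

    /* Receiver: apply credits carried by an incoming header; the caller
     * then retries its pending list while sd_tokens stays positive. */
    static void credits_received(flow_t *f, int credits)
    {
        atomic_fetch_add(&f->sd_tokens, credits);
    }

    /* Receiver: account one reposted receive descriptor; true means the
     * window is full and a credit-only control message should be sent. */
    static bool descriptor_reposted(flow_t *f)
    {
        return atomic_fetch_add(&f->rd_credits, 1) + 1 >= RD_WIN;
    }

Two details of the patch are worth reading against the sketch: rd_credits
starts at -(rd_num + rd_rsv) so that the initial posting of receive
descriptors nets out to zero owed credits, and rd_rsv descriptors are held
in reserve for control traffic, which is presumably why the new
mca_btl_mvapi_control callback does not return the credits consumed by
credit messages themselves.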