From 7999c0810757403df526157a869250dd3d4fbfd9 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 12 Sep 2006 09:17:59 +0000 Subject: [PATCH] consolidate credit management and CQ polling code. This commit was SVN r11622. --- ompi/mca/btl/openib/btl_openib.c | 17 +- ompi/mca/btl/openib/btl_openib.h | 6 +- ompi/mca/btl/openib/btl_openib_component.c | 247 ++++++---------- ompi/mca/btl/openib/btl_openib_endpoint.c | 319 ++++++++------------- ompi/mca/btl/openib/btl_openib_endpoint.h | 26 +- ompi/mca/btl/openib/btl_openib_frag.c | 2 +- 6 files changed, 227 insertions(+), 390 deletions(-) diff --git a/ompi/mca/btl/openib/btl_openib.c b/ompi/mca/btl/openib/btl_openib.c index c41cb3122a..2f172a3048 100644 --- a/ompi/mca/btl/openib/btl_openib.c +++ b/ompi/mca/btl/openib/btl_openib.c @@ -177,12 +177,13 @@ int mca_btl_openib_size_queues( struct mca_btl_openib_module_t* openib_btl, size openib_btl->hca->ib_dev_attr.max_cq : min_cq_size; #if OMPI_MCA_BTL_OPENIB_HAVE_RESIZE_CQ if(!first_time) { - rc = ibv_resize_cq(openib_btl->ib_cq_lp, mca_btl_openib_component.ib_cq_size); + rc = ibv_resize_cq(openib_btl->ib_cq[BTL_OPENIB_LP_QP], mca_btl_openib_component.ib_cq_size); if(rc) { BTL_ERROR(("cannot resize low priority completion queue, error: %d", rc)); return OMPI_ERROR; } - rc = ibv_resize_cq(openib_btl->ib_cq_hp, mca_btl_openib_component.ib_cq_size); + rc = ibv_resize_cq(openib_btl->ib_cq[BTL_OPENIB_HP_QP], + mca_btl_openib_component.ib_cq_size); if(rc) { BTL_ERROR(("cannot resize high priority completion queue, error: %d", rc)); return OMPI_ERROR; @@ -797,16 +798,16 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl) /* Create the low and high priority queue pairs */ #if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3 - openib_btl->ib_cq_lp = + openib_btl->ib_cq[BTL_OPENIB_LP_QP] = ibv_create_cq(openib_btl->hca->ib_dev_context, mca_btl_openib_component.ib_cq_size, NULL); #else - openib_btl->ib_cq_lp = + openib_btl->ib_cq[BTL_OPENIB_LP_QP] = ibv_create_cq(openib_btl->hca->ib_dev_context, mca_btl_openib_component.ib_cq_size, NULL, NULL, 0); #endif - if(NULL == openib_btl->ib_cq_lp) { + if(NULL == openib_btl->ib_cq[BTL_OPENIB_LP_QP]) { BTL_ERROR(("error creating low priority cq for %s errno says %s\n", ibv_get_device_name(openib_btl->hca->ib_dev), strerror(errno))); @@ -814,16 +815,16 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl) } #if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3 - openib_btl->ib_cq_hp = + openib_btl->ib_cq[BTL_OPENIB_HP_QP] = ibv_create_cq(openib_btl->hca->ib_dev_context, mca_btl_openib_component.ib_cq_size, NULL); #else - openib_btl->ib_cq_hp = + openib_btl->ib_cq[BTL_OPENIB_HP_QP] = ibv_create_cq(openib_btl->hca->ib_dev_context, mca_btl_openib_component.ib_cq_size, NULL, NULL, 0); #endif - if(NULL == openib_btl->ib_cq_hp) { + if(NULL == openib_btl->ib_cq[BTL_OPENIB_HP_QP]) { BTL_ERROR(("error creating high priority cq for %s errno says %s\n", ibv_get_device_name(openib_btl->hca->ib_dev), strerror(errno))); diff --git a/ompi/mca/btl/openib/btl_openib.h b/ompi/mca/btl/openib/btl_openib.h index edf48c7535..7759d665d6 100644 --- a/ompi/mca/btl/openib/btl_openib.h +++ b/ompi/mca/btl/openib/btl_openib.h @@ -166,8 +166,7 @@ struct mca_btl_openib_module_t { mca_btl_openib_port_info_t port_info; /* contains only the subnet right now */ mca_btl_openib_hca_t *hca; uint8_t port_num; /**< ID of the PORT */ - struct ibv_cq *ib_cq_hp; - struct ibv_cq *ib_cq_lp; + struct ibv_cq *ib_cq[2]; struct ibv_port_attr ib_port_attr; uint16_t lid; /**< lid that is actually used (for LMC) */ uint8_t src_path_bits; /**< offset from base lid (for LMC) */ @@ -433,9 +432,6 @@ static inline int mca_btl_openib_post_srr(mca_btl_openib_module_t* openib_btl, for(i = 0; i < num_post; i++) { OMPI_FREE_LIST_WAIT(free_list, item, rc); frag = (mca_btl_openib_frag_t*)item; - frag->sg_entry.length = frag->size + - ((unsigned char*)frag->segment.seg_addr.pval - - (unsigned char*)frag->hdr); if(ibv_post_srq_recv(openib_btl->srq[prio], &frag->wr_desc.rd_desc, &bad_wr)) { BTL_ERROR(("error posting receive descriptors to shared " diff --git a/ompi/mca/btl/openib/btl_openib_component.c b/ompi/mca/btl/openib/btl_openib_component.c index cf22b76d0e..480e289613 100644 --- a/ompi/mca/btl/openib/btl_openib_component.c +++ b/ompi/mca/btl/openib/btl_openib_component.c @@ -70,10 +70,10 @@ static mca_btl_base_module_t **btl_openib_component_init( bool enable_mpi_threads); static void merge_values(ompi_btl_openib_ini_values_t *target, ompi_btl_openib_ini_values_t *src); -static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl, +static int btl_openib_handle_incoming(mca_btl_openib_module_t *openib_btl, mca_btl_openib_endpoint_t *endpoint, mca_btl_openib_frag_t *frag, - size_t byte_len); + size_t byte_len, const int prio); static char* btl_openib_component_status_to_string(enum ibv_wc_status status); static int btl_openib_component_progress(void); static void btl_openib_frag_progress_pending( @@ -691,15 +691,21 @@ static void merge_values(ompi_btl_openib_ini_values_t *target, } -static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl, +static int btl_openib_handle_incoming(mca_btl_openib_module_t *openib_btl, mca_btl_openib_endpoint_t *endpoint, mca_btl_openib_frag_t *frag, - size_t byte_len) + size_t byte_len, const int prio) { - /* advance the segment address past the header and subtract from the length..*/ - frag->segment.seg_len = byte_len - - ((unsigned char*)frag->segment.seg_addr.pval - - (unsigned char*) frag->hdr); + ompi_free_list_t *free_list; + + if(BTL_OPENIB_HP_QP == prio) + free_list = &openib_btl->recv_free_eager; + else + free_list = &openib_btl->recv_free_max; + + /* advance the segment address past the header and subtract from the + * length..*/ + frag->segment.seg_len = byte_len - sizeof(mca_btl_openib_header_t); /* call registered callback */ openib_btl->ib_reg[frag->hdr->tag].cbfunc(&openib_btl->super, @@ -712,12 +718,11 @@ static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl, BTL_OPENIB_CREDITS(frag->hdr->credits)); else if(!mca_btl_openib_component.use_srq && frag->hdr->credits > 0) - OPAL_THREAD_ADD32(&endpoint->sd_tokens[BTL_OPENIB_HP_QP], + OPAL_THREAD_ADD32(&endpoint->sd_tokens[prio], frag->hdr->credits); if (!MCA_BTL_OPENIB_RDMA_FRAG(frag)) { - OMPI_FREE_LIST_RETURN(&(openib_btl->recv_free_eager), - (ompi_free_list_item_t*) frag); + OMPI_FREE_LIST_RETURN(free_list, (ompi_free_list_item_t*) frag); } else { mca_btl_openib_frag_t *tf; OPAL_THREAD_LOCK(&endpoint->eager_rdma_local.lock); @@ -734,8 +739,9 @@ static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl, OPAL_THREAD_UNLOCK(&endpoint->eager_rdma_local.lock); } - if (mca_btl_openib_component.use_eager_rdma && - !endpoint->eager_rdma_local.base.pval && + if (!endpoint->eager_rdma_local.base.pval && + mca_btl_openib_component.use_eager_rdma && + BTL_OPENIB_HP_QP == prio && openib_btl->eager_rdma_buffers_count < mca_btl_openib_component.max_eager_rdma && OPAL_THREAD_ADD32(&endpoint->eager_recv_count, 1) == @@ -743,32 +749,25 @@ static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl, mca_btl_openib_endpoint_connect_eager_rdma(endpoint); } - - /* check to see if we need to return credits */ - if((endpoint->rd_credits[BTL_OPENIB_HP_QP] >= - mca_btl_openib_component.rd_win || - endpoint->eager_rdma_local.credits >= - mca_btl_openib_component.rd_win) && - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP],1) == 1) { - mca_btl_openib_endpoint_send_credits_hp(endpoint); - } - /* repost receive descriptors if receive not by RDMA */ if(!MCA_BTL_OPENIB_RDMA_FRAG(frag)) { if(mca_btl_openib_component.use_srq) { - OPAL_THREAD_ADD32((int32_t*)&openib_btl->srd_posted[BTL_OPENIB_HP_QP], -1); - mca_btl_openib_post_srr(openib_btl, 0, BTL_OPENIB_HP_QP); + OPAL_THREAD_ADD32((int32_t*)&openib_btl->srd_posted[prio], -1); + mca_btl_openib_post_srr(openib_btl, 0, prio); } else { - OPAL_THREAD_ADD32((int32_t*)&endpoint->rd_posted[BTL_OPENIB_HP_QP], - -1); - btl_openib_endpoint_post_rr(endpoint, 0, BTL_OPENIB_HP_QP); + OPAL_THREAD_ADD32((int32_t*)&endpoint->rd_posted[prio], -1); + btl_openib_endpoint_post_rr(endpoint, 0, prio); } } /* nothing to progress for SRQ case */ if(!mca_btl_openib_component.use_srq) { - btl_openib_frag_progress_pending(openib_btl, endpoint, - BTL_OPENIB_HP_QP); + btl_openib_frag_progress_pending(openib_btl, endpoint, prio); + } + + /* check to see if we need to return credits */ + if(btl_openib_check_send_credits(endpoint, prio)) { + mca_btl_openib_endpoint_send_credits(endpoint, prio); } return OMPI_SUCCESS; @@ -927,15 +926,16 @@ static void btl_openib_frag_progress_pending( static int btl_openib_component_progress(void) { static char *qp_name[] = {"HP", "LP"}; - int i, j, c, qp = 0; + int i, j, c, qp; int count = 0,ne = 0, ret; - int32_t credits; mca_btl_openib_frag_t* frag; mca_btl_openib_endpoint_t* endpoint; struct ibv_wc wc; mca_btl_openib_module_t* openib_btl; - /* Poll for RDMA completions - if any succeed, we don't process the slower queues */ + /* Poll for RDMA completions - if any succeed, we don't process the slower + * queues. + */ for(i = 0; i < mca_btl_openib_component.ib_num_btls; i++) { mca_btl_openib_module_t* openib_btl = &mca_btl_openib_component.openib_btls[i]; @@ -971,9 +971,10 @@ static int btl_openib_component_progress(void) frag->segment.seg_addr.pval = ((unsigned char* )frag->hdr) + sizeof(mca_btl_openib_header_t); - ret = btl_openib_handle_incoming_hp(openib_btl, + ret = btl_openib_handle_incoming(openib_btl, frag->endpoint, frag, - size - sizeof(mca_btl_openib_footer_t)); + size - sizeof(mca_btl_openib_footer_t), + BTL_OPENIB_HP_QP); if (ret != MPI_SUCCESS) { openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL); @@ -989,45 +990,57 @@ static int btl_openib_component_progress(void) for(i = 0; i < mca_btl_openib_component.ib_num_btls; i++) { openib_btl = &mca_btl_openib_component.openib_btls[i]; - /* we have two completion queues, one for "high" priority and one for "low". - * we will check the high priority and process them until there are none left. - * note that low priority messages are only processed one per progress call. - */ + /* We have two completion queues, one for "high" priority and one for + * "low". Check high priority before low priority */ + for(qp = 0; qp < 2; qp++) { + ne = ibv_poll_cq(openib_btl->ib_cq[qp], 1, &wc); - ne=ibv_poll_cq(openib_btl->ib_cq_hp, 1, &wc ); + if(0 == ne) + continue; - if(ne != 0) { if(ne < 0 || wc.status != IBV_WC_SUCCESS) - goto error_hp; + goto error; frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id; endpoint = frag->endpoint; /* Handle work completions */ switch(wc.opcode) { + case IBV_WC_RDMA_READ: + assert(BTL_OPENIB_LP_QP == qp); + OPAL_THREAD_ADD32(&frag->endpoint->get_tokens, 1); + /* fall through */ + case IBV_WC_RDMA_WRITE: + if(BTL_OPENIB_LP_QP == qp) { + /* process a completed write */ + frag->base.des_cbfunc(&openib_btl->super, endpoint, + &frag->base, OMPI_SUCCESS); + + /* return send wqe */ + OPAL_THREAD_ADD32(&endpoint->sd_wqe[qp], 1); + + /* check for pending frags */ + btl_openib_frag_progress_pending(openib_btl, endpoint, qp); + + count++; + break; + } + /* fall through for high prio QP */ case IBV_WC_SEND: /* Process a completed send */ frag->base.des_cbfunc(&openib_btl->super, endpoint, &frag->base, OMPI_SUCCESS); /* return send wqe */ - OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_HP_QP], 1); + OPAL_THREAD_ADD32(&endpoint->sd_wqe[qp], 1); if(mca_btl_openib_component.use_srq) - OPAL_THREAD_ADD32(&openib_btl->sd_tokens[BTL_OPENIB_HP_QP], 1); + OPAL_THREAD_ADD32(&openib_btl->sd_tokens[qp], 1); /* check to see if we need to progress any pending desciptors */ - btl_openib_frag_progress_pending(openib_btl, endpoint, - BTL_OPENIB_HP_QP); - - if(!mca_btl_openib_component.use_srq) { - /* check to see if we need to return credits */ - if((endpoint->rd_credits[BTL_OPENIB_HP_QP] >= - mca_btl_openib_component.rd_win || - endpoint->eager_rdma_local.credits >= - mca_btl_openib_component.rd_win) && - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP], 1) == 1) { - mca_btl_openib_endpoint_send_credits_hp(endpoint); - } + btl_openib_frag_progress_pending(openib_btl, endpoint, qp); + /* check to see if we need to return credits */ + if(btl_openib_check_send_credits(endpoint, qp)) { + mca_btl_openib_endpoint_send_credits(endpoint, qp); } count++; @@ -1035,131 +1048,33 @@ static int btl_openib_component_progress(void) case IBV_WC_RECV: if(wc.wc_flags & IBV_WC_WITH_IMM) { - endpoint = (mca_btl_openib_endpoint_t*)orte_pointer_array_get_item(openib_btl->endpoints, wc.imm_data); + endpoint = (mca_btl_openib_endpoint_t*) + orte_pointer_array_get_item(openib_btl->endpoints, + wc.imm_data); frag->endpoint = endpoint; } /* Process a RECV */ - ret = btl_openib_handle_incoming_hp(openib_btl, endpoint, frag, - wc.byte_len); + ret = btl_openib_handle_incoming(openib_btl, endpoint, frag, + wc.byte_len, qp); if (ret != OMPI_SUCCESS) { - openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL); + openib_btl->error_cb(&openib_btl->super, + MCA_BTL_ERROR_FLAGS_FATAL); return 0; } count++; break; default: - BTL_ERROR(("Unhandled work completion opcode is %d", wc.opcode)); - openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL); - break; - } - } - - - ne=ibv_poll_cq(openib_btl->ib_cq_lp, 1, &wc); - - if(ne != 0) { - if(ne < 0 || wc.status != IBV_WC_SUCCESS) - goto error_lp; - frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id; - endpoint = frag->endpoint; - /* Handle n/w completions */ - switch(wc.opcode) { - case IBV_WC_SEND: - /* Process a completed send - receiver must return tokens */ - frag->base.des_cbfunc(&openib_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS); - - /* return send wqe */ - OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_LP_QP], 1); - if(mca_btl_openib_component.use_srq) - OPAL_THREAD_ADD32(&openib_btl->sd_tokens[BTL_OPENIB_LP_QP], 1); - - /* check to see if we need to progress any pending desciptors */ - btl_openib_frag_progress_pending(openib_btl, endpoint, - BTL_OPENIB_LP_QP); - - if(!mca_btl_openib_component.use_srq) { - /* check to see if we need to return credits */ - if( endpoint->rd_credits[BTL_OPENIB_LP_QP] >= - mca_btl_openib_component.rd_win && - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP], 1) == 1) { - mca_btl_openib_endpoint_send_credits_lp(endpoint); - } - } - count++; - break; - - case IBV_WC_RDMA_READ: - - OPAL_THREAD_ADD32(&frag->endpoint->get_tokens, 1); - /* fall through */ - - case IBV_WC_RDMA_WRITE: - /* process a completed write */ - frag->base.des_cbfunc(&openib_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS); - - /* return send wqe */ - OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_LP_QP], 1); - - /* check for pending frags */ - btl_openib_frag_progress_pending(openib_btl, endpoint, - BTL_OPENIB_LP_QP); - - count++; - break; - - case IBV_WC_RECV: - /* Process a RECV */ - credits = frag->hdr->credits; - - /* advance the segment address past the header and subtract from the length..*/ - frag->segment.seg_len = wc.byte_len- - ((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr); - - /* call registered callback */ - openib_btl->ib_reg[frag->hdr->tag].cbfunc(&openib_btl->super, - frag->hdr->tag, - &frag->base, - openib_btl->ib_reg[frag->hdr->tag].cbdata); - OMPI_FREE_LIST_RETURN(&(openib_btl->recv_free_max), (ompi_free_list_item_t*) frag); - - if(mca_btl_openib_component.use_srq) { - /* repost receive descriptors */ - OPAL_THREAD_ADD32((int32_t*)&openib_btl->srd_posted[BTL_OPENIB_LP_QP], -1); - mca_btl_openib_post_srr(openib_btl, 0, BTL_OPENIB_LP_QP); - } else { - /* repost receive descriptors */ - OPAL_THREAD_ADD32((int32_t*) - &endpoint->rd_posted[BTL_OPENIB_LP_QP], -1); - btl_openib_endpoint_post_rr(endpoint, 0, BTL_OPENIB_LP_QP); - - OPAL_THREAD_ADD32(&endpoint->sd_tokens[BTL_OPENIB_LP_QP], - credits); - - /* check to see if we need to progress any pending desciptors */ - btl_openib_frag_progress_pending(openib_btl, endpoint, - BTL_OPENIB_LP_QP); - - /* check to see if we need to return credits */ - if(endpoint->rd_credits[BTL_OPENIB_LP_QP] >= - mca_btl_openib_component.rd_win && - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP], 1) == 1) { - mca_btl_openib_endpoint_send_credits_lp(endpoint); - } - } - count++; - break; - default: - BTL_ERROR(("Unhandled work completion opcode is %d", wc.opcode)); - openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL); + BTL_ERROR(("Unhandled work completion opcode is %d", + wc.opcode)); + openib_btl->error_cb(&openib_btl->super, + MCA_BTL_ERROR_FLAGS_FATAL); break; } } } return count; -error_lp: - qp = 1; -error_hp: +error: if(ne < 0){ BTL_ERROR(("error polling %s CQ with %d errno says %s\n", qp_name[qp], ne, strerror(errno))); diff --git a/ompi/mca/btl/openib/btl_openib_endpoint.c b/ompi/mca/btl/openib/btl_openib_endpoint.c index 7fa221d38e..ef91b40f8a 100644 --- a/ompi/mca/btl/openib/btl_openib_endpoint.c +++ b/ompi/mca/btl/openib/btl_openib_endpoint.c @@ -114,45 +114,30 @@ static inline int mca_btl_openib_endpoint_post_send(mca_btl_openib_module_t* ope mca_btl_openib_endpoint_t * endpoint, mca_btl_openib_frag_t * frag) { - int do_rdma = 0; - struct ibv_qp* ib_qp; + int do_rdma = 0, prio; struct ibv_send_wr* bad_wr; + frag->sg_entry.addr = (unsigned long) frag->hdr; - if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY) { - assert(frag->size <= openib_btl->super.btl_eager_limit); - if(btl_openib_acquire_send_resources(openib_btl, endpoint, frag, - BTL_OPENIB_HP_QP, &do_rdma) == OMPI_ERR_OUT_OF_RESOURCE) - return MPI_SUCCESS; + prio = (frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY) ? + BTL_OPENIB_HP_QP : BTL_OPENIB_LP_QP; - if(endpoint->eager_rdma_local.credits > 0) { - frag->hdr->credits = endpoint->eager_rdma_local.credits; - OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits, - -frag->hdr->credits); - frag->hdr->credits |= BTL_OPENIB_RDMA_CREDITS_FLAG; - } else if(endpoint->rd_credits[BTL_OPENIB_HP_QP] > 0) { - frag->hdr->credits = endpoint->rd_credits[BTL_OPENIB_HP_QP]; - OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_HP_QP], - -frag->hdr->credits); - } else { - frag->hdr->credits = 0; - } - ib_qp = endpoint->lcl_qp[BTL_OPENIB_HP_QP]; + if(btl_openib_acquire_send_resources(openib_btl, endpoint, frag, + prio, &do_rdma) == OMPI_ERR_OUT_OF_RESOURCE) + return MPI_SUCCESS; + + if(BTL_OPENIB_HP_QP == prio && endpoint->eager_rdma_local.credits > 0) { + frag->hdr->credits = endpoint->eager_rdma_local.credits; + OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits, + -frag->hdr->credits); + frag->hdr->credits |= BTL_OPENIB_RDMA_CREDITS_FLAG; + } else if(endpoint->rd_credits[prio] > 0) { + frag->hdr->credits = endpoint->rd_credits[prio]; + OPAL_THREAD_ADD32(&endpoint->rd_credits[prio], -frag->hdr->credits); } else { - if(btl_openib_acquire_send_resources(openib_btl, endpoint, frag, - BTL_OPENIB_LP_QP, NULL) == OMPI_ERR_OUT_OF_RESOURCE) - return MPI_SUCCESS; + frag->hdr->credits = 0; + } - if(endpoint->rd_credits[BTL_OPENIB_LP_QP] > 0) { - frag->hdr->credits = endpoint->rd_credits[BTL_OPENIB_LP_QP]; - OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_LP_QP], - -frag->hdr->credits); - } else { - frag->hdr->credits = 0; - } - ib_qp = endpoint->lcl_qp[BTL_OPENIB_LP_QP]; - } - frag->sg_entry.length = frag->segment.seg_len + sizeof(mca_btl_openib_header_t) + (do_rdma ? sizeof(mca_btl_openib_footer_t) : 0); @@ -184,7 +169,7 @@ static inline int mca_btl_openib_endpoint_post_send(mca_btl_openib_module_t* ope mca_btl_openib_component.eager_limit + sizeof(mca_btl_openib_footer_t); frag->wr_desc.sr_desc.wr.rdma.remote_addr -= frag->sg_entry.length; - MCA_BTL_OPENIB_RDMA_NEXT_INDEX (endpoint->eager_rdma_remote.head); + MCA_BTL_OPENIB_RDMA_NEXT_INDEX(endpoint->eager_rdma_remote.head); } else { if(mca_btl_openib_component.use_srq) { frag->wr_desc.sr_desc.opcode = IBV_WR_SEND_WITH_IMM; @@ -194,9 +179,24 @@ static inline int mca_btl_openib_endpoint_post_send(mca_btl_openib_module_t* ope } } - if(ibv_post_send(ib_qp, - &frag->wr_desc.sr_desc, - &bad_wr)) { + if(ibv_post_send(endpoint->lcl_qp[prio], &frag->wr_desc.sr_desc, + &bad_wr)) { + if(BTL_OPENIB_IS_RDMA_CREDITS(frag->hdr->credits)) { + OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits, + BTL_OPENIB_CREDITS(frag->hdr->credits)); + } else { + OPAL_THREAD_ADD32(&endpoint->rd_credits[prio], frag->hdr->credits); + } + OPAL_THREAD_ADD32(&endpoint->sd_wqe[prio], 1); + if(do_rdma) { + OPAL_THREAD_ADD32(&endpoint->eager_rdma_remote.tokens, 1); + } else { + if(mca_btl_openib_component.use_srq) { + OPAL_THREAD_ADD32(&openib_btl->sd_tokens[prio], 1); + } else { + OPAL_THREAD_ADD32(&endpoint->sd_tokens[prio], 1); + } + } BTL_ERROR(("error posting send request errno says %s\n", strerror(errno))); return OMPI_ERROR; @@ -416,12 +416,12 @@ static int mca_btl_openib_endpoint_start_connect(mca_btl_base_endpoint_t* endpoi /* Create the High Priority Queue Pair */ - if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl, - openib_btl->hca->ib_pd, - openib_btl->ib_cq_hp, - openib_btl->srq[BTL_OPENIB_HP_QP], - endpoint->lcl_qp_attr_hp, - &endpoint->lcl_qp[BTL_OPENIB_HP_QP]))) { + if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl, + openib_btl->hca->ib_pd, + openib_btl->ib_cq[BTL_OPENIB_HP_QP], + openib_btl->srq[BTL_OPENIB_HP_QP], + endpoint->lcl_qp_attr_hp, + &endpoint->lcl_qp[BTL_OPENIB_HP_QP]))) { BTL_ERROR(("error creating queue pair, error code %d", rc)); return rc; } @@ -429,12 +429,12 @@ static int mca_btl_openib_endpoint_start_connect(mca_btl_base_endpoint_t* endpoi endpoint->lcl_psn_hp = lrand48() & 0xffffff; /* Create the Low Priority Queue Pair */ - if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl, - openib_btl->hca->ib_pd, - openib_btl->ib_cq_lp, - openib_btl->srq[BTL_OPENIB_LP_QP], - endpoint->lcl_qp_attr_lp, - &endpoint->lcl_qp[BTL_OPENIB_LP_QP]))) { + if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl, + openib_btl->hca->ib_pd, + openib_btl->ib_cq[BTL_OPENIB_LP_QP], + openib_btl->srq[BTL_OPENIB_LP_QP], + endpoint->lcl_qp_attr_lp, + &endpoint->lcl_qp[BTL_OPENIB_LP_QP]))) { BTL_ERROR(("error creating queue pair, error code %d", rc)); return rc; } @@ -466,13 +466,12 @@ static int mca_btl_openib_endpoint_reply_start_connect(mca_btl_openib_endpoint_t /* Create the High Priority Queue Pair */ - if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl, - openib_btl->hca->ib_pd, - openib_btl->ib_cq_hp, - openib_btl->srq[BTL_OPENIB_HP_QP], - - endpoint->lcl_qp_attr_hp, - &endpoint->lcl_qp[BTL_OPENIB_HP_QP]))) { + if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl, + openib_btl->hca->ib_pd, + openib_btl->ib_cq[BTL_OPENIB_HP_QP], + openib_btl->srq[BTL_OPENIB_HP_QP], + endpoint->lcl_qp_attr_hp, + &endpoint->lcl_qp[BTL_OPENIB_HP_QP]))) { BTL_ERROR(("error creating queue pair, error code %d", rc)); return rc; } @@ -480,13 +479,12 @@ static int mca_btl_openib_endpoint_reply_start_connect(mca_btl_openib_endpoint_t endpoint->lcl_psn_hp = lrand48() & 0xffffff; /* Create the Low Priority Queue Pair */ - if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl, - openib_btl->hca->ib_pd, - openib_btl->ib_cq_lp, - openib_btl->srq[BTL_OPENIB_LP_QP], - - endpoint->lcl_qp_attr_lp, - &endpoint->lcl_qp[BTL_OPENIB_LP_QP]))) { + if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl, + openib_btl->hca->ib_pd, + openib_btl->ib_cq[BTL_OPENIB_LP_QP], + openib_btl->srq[BTL_OPENIB_LP_QP], + endpoint->lcl_qp_attr_lp, + &endpoint->lcl_qp[BTL_OPENIB_LP_QP]))) { BTL_ERROR(("error creating queue pair, error code %d", rc)); return rc; } @@ -870,34 +868,30 @@ int mca_btl_openib_endpoint_connect( mca_btl_openib_module_t* openib_btl = (mca_btl_openib_module_t*) endpoint->endpoint_btl; /* Connection establishment RC */ - rc = mca_btl_openib_endpoint_qp_init_query( - openib_btl, - endpoint->lcl_qp[BTL_OPENIB_HP_QP], - endpoint->lcl_qp_attr_hp, - endpoint->lcl_psn_hp, - endpoint->rem_info.rem_qp_num_hp, - endpoint->rem_info.rem_psn_hp, - endpoint->rem_info.rem_lid, - endpoint->rem_info.rem_mtu, - openib_btl->port_num - ); + rc = mca_btl_openib_endpoint_qp_init_query(openib_btl, + endpoint->lcl_qp[BTL_OPENIB_HP_QP], + endpoint->lcl_qp_attr_hp, + endpoint->lcl_psn_hp, + endpoint->rem_info.rem_qp_num_hp, + endpoint->rem_info.rem_psn_hp, + endpoint->rem_info.rem_lid, + endpoint->rem_info.rem_mtu, + openib_btl->port_num); if(rc != OMPI_SUCCESS) { return rc; } - rc = mca_btl_openib_endpoint_qp_init_query( - openib_btl, - endpoint->lcl_qp[BTL_OPENIB_LP_QP], - endpoint->lcl_qp_attr_lp, - endpoint->lcl_psn_lp, - endpoint->rem_info.rem_qp_num_lp, - endpoint->rem_info.rem_psn_lp, - endpoint->rem_info.rem_lid, - endpoint->rem_info.rem_mtu, - openib_btl->port_num - ); + rc = mca_btl_openib_endpoint_qp_init_query(openib_btl, + endpoint->lcl_qp[BTL_OPENIB_LP_QP], + endpoint->lcl_qp_attr_lp, + endpoint->lcl_psn_lp, + endpoint->rem_info.rem_qp_num_lp, + endpoint->rem_info.rem_psn_lp, + endpoint->rem_info.rem_lid, + endpoint->rem_info.rem_mtu, + openib_btl->port_num); @@ -905,8 +899,10 @@ int mca_btl_openib_endpoint_connect( return rc; } - MCA_BTL_IB_FRAG_ALLOC_CREDIT_WAIT(openib_btl, endpoint->hp_credit_frag, rc); - MCA_BTL_IB_FRAG_ALLOC_CREDIT_WAIT(openib_btl, endpoint->lp_credit_frag, rc); + MCA_BTL_IB_FRAG_ALLOC_CREDIT_WAIT(openib_btl, + endpoint->credit_frag[BTL_OPENIB_HP_QP], rc); + MCA_BTL_IB_FRAG_ALLOC_CREDIT_WAIT(openib_btl, + endpoint->credit_frag[BTL_OPENIB_LP_QP], rc); if(mca_btl_openib_component.use_srq) { mca_btl_openib_post_srr(openib_btl, 1, BTL_OPENIB_HP_QP); @@ -1055,24 +1051,27 @@ int mca_btl_openib_endpoint_qp_init_query( * Return control fragment. */ -static void mca_btl_openib_endpoint_credits_lp( +static void mca_btl_openib_endpoint_credits( mca_btl_base_module_t* btl, struct mca_btl_base_endpoint_t* endpoint, struct mca_btl_base_descriptor_t* descriptor, int status) { - int32_t credits; + int32_t credits, prio; + + if((void*)descriptor == (void*)endpoint->credit_frag[BTL_OPENIB_LP_QP]) + prio = BTL_OPENIB_LP_QP; + else + prio = BTL_OPENIB_HP_QP; /* we don't acquire a wqe or token for credit message - so decrement */ - OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_LP_QP],-1); + OPAL_THREAD_ADD32(&endpoint->sd_wqe[prio],-1); /* check to see if there are addditional credits to return */ - if ((credits = OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP],-1)) > 0) { - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP],-credits); - if (endpoint->rd_credits[BTL_OPENIB_LP_QP] >= - mca_btl_openib_component.rd_win && - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP],1) == 1) { - mca_btl_openib_endpoint_send_credits_lp(endpoint); + if((credits = OPAL_THREAD_ADD32(&endpoint->sd_credits[prio],-1)) > 0) { + OPAL_THREAD_ADD32(&endpoint->sd_credits[prio], -credits); + if(btl_openib_check_send_credits(endpoint, prio)) { + mca_btl_openib_endpoint_send_credits(endpoint, prio); } } } @@ -1081,120 +1080,38 @@ static void mca_btl_openib_endpoint_credits_lp( * Return credits to peer */ -void mca_btl_openib_endpoint_send_credits_lp( - mca_btl_openib_endpoint_t* endpoint) +void mca_btl_openib_endpoint_send_credits(mca_btl_openib_endpoint_t* endpoint, + const int prio) { mca_btl_openib_module_t* openib_btl = endpoint->endpoint_btl; mca_btl_openib_frag_t* frag; struct ibv_send_wr* bad_wr; mca_btl_openib_rdma_credits_header_t *credits_hdr; - frag = endpoint->lp_credit_frag; - credits_hdr = (mca_btl_openib_rdma_credits_header_t*)frag->segment.seg_addr.pval; + frag = endpoint->credit_frag[prio]; + credits_hdr = + (mca_btl_openib_rdma_credits_header_t*)frag->segment.seg_addr.pval; - frag->base.des_cbfunc = mca_btl_openib_endpoint_credits_lp; + frag->base.des_cbfunc = mca_btl_openib_endpoint_credits; frag->base.des_cbdata = NULL; frag->endpoint = endpoint; frag->hdr->tag = MCA_BTL_TAG_BTL; - if(endpoint->rd_credits[BTL_OPENIB_LP_QP] > 0) { - frag->hdr->credits = endpoint->rd_credits[BTL_OPENIB_LP_QP]; - OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_LP_QP], - -frag->hdr->credits); + /* send credits for high/low prios */ + if(endpoint->rd_credits[prio] > 0) { + frag->hdr->credits = endpoint->rd_credits[prio]; + OPAL_THREAD_ADD32(&endpoint->rd_credits[prio], -frag->hdr->credits); } else { frag->hdr->credits = 0; } - credits_hdr->control.type = MCA_BTL_OPENIB_CONTROL_CREDITS; - credits_hdr->rdma_credits = 0; - - if(mca_btl_openib_component.use_srq) { - frag->wr_desc.sr_desc.opcode = IBV_WR_SEND_WITH_IMM; - frag->wr_desc.sr_desc.imm_data = endpoint->rem_info.rem_index; - } else { - frag->wr_desc.sr_desc.opcode = IBV_WR_SEND; - } - frag->sg_entry.length = sizeof(mca_btl_openib_header_t) + - sizeof(mca_btl_openib_rdma_credits_header_t); - frag->sg_entry.addr = (unsigned long) frag->hdr; - - if(frag->sg_entry.length <= openib_btl->ib_inline_max) { - frag->wr_desc.sr_desc.send_flags = IBV_SEND_INLINE | IBV_SEND_SIGNALED; - } else { - frag->wr_desc.sr_desc.send_flags = IBV_SEND_SIGNALED; - } - - if(ibv_post_send(endpoint->lcl_qp[BTL_OPENIB_LP_QP], - &frag->wr_desc.sr_desc, - &bad_wr)) { - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP], -1); - OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_LP_QP], - frag->hdr->credits); - MCA_BTL_IB_FRAG_RETURN(openib_btl, frag); - BTL_ERROR(("error posting send request errno %d says %s", strerror(errno))); - return; - } -} - - -/** - * Return control fragment. - */ - -static void mca_btl_openib_endpoint_credits_hp( - mca_btl_base_module_t* btl, - struct mca_btl_base_endpoint_t* endpoint, - struct mca_btl_base_descriptor_t* descriptor, - int status) -{ - int32_t credits; - - /* we don't acquire a wqe or token for credit message - so decrement */ - OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_HP_QP],-1); - - /* check to see if there are addditional credits to return */ - if ((credits = OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP],-1)) > 0) { - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP],-credits); - if ((endpoint->rd_credits[BTL_OPENIB_HP_QP] >= mca_btl_openib_component.rd_win || - endpoint->eager_rdma_local.credits >= mca_btl_openib_component.rd_win) && - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP],1) == 1) { - mca_btl_openib_endpoint_send_credits_hp(endpoint); - } - } -} - -/** - * Return credits to peer - */ - -void mca_btl_openib_endpoint_send_credits_hp( - mca_btl_openib_endpoint_t* endpoint) -{ - mca_btl_openib_module_t* openib_btl = endpoint->endpoint_btl; - mca_btl_openib_frag_t* frag; - struct ibv_send_wr* bad_wr; - mca_btl_openib_rdma_credits_header_t *credits_hdr; - - frag = endpoint->hp_credit_frag; - - credits_hdr = (mca_btl_openib_rdma_credits_header_t*)frag->segment.seg_addr.pval; - frag->base.des_cbfunc = mca_btl_openib_endpoint_credits_hp; - frag->base.des_cbdata = NULL; - frag->endpoint = endpoint; - - frag->hdr->tag = MCA_BTL_TAG_BTL; - if(endpoint->rd_credits[BTL_OPENIB_HP_QP] > 0) { - frag->hdr->credits = endpoint->rd_credits[BTL_OPENIB_HP_QP]; - OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_HP_QP], - -frag->hdr->credits); - } else - frag->hdr->credits = 0; - if(endpoint->eager_rdma_local.credits > 0) { + /* send eager RDMA credits only for high prio */ + if(BTL_OPENIB_HP_QP == prio && endpoint->eager_rdma_local.credits > 0) { credits_hdr->rdma_credits = endpoint->eager_rdma_local.credits; OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits, -credits_hdr->rdma_credits); - } else + } else { credits_hdr->rdma_credits = 0; - + } credits_hdr->control.type = MCA_BTL_OPENIB_CONTROL_CREDITS; if(mca_btl_openib_component.use_srq) { @@ -1213,16 +1130,13 @@ void mca_btl_openib_endpoint_send_credits_hp( frag->wr_desc.sr_desc.send_flags = IBV_SEND_SIGNALED; } - if(ibv_post_send(endpoint->lcl_qp[BTL_OPENIB_HP_QP], - &frag->wr_desc.sr_desc, - &bad_wr)) { - OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP], -1); - OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_HP_QP], - frag->hdr->credits); - MCA_BTL_IB_FRAG_RETURN(openib_btl, frag); - BTL_ERROR(("error posting send request errno %d says %s", errno, + if(ibv_post_send(endpoint->lcl_qp[prio], &frag->wr_desc.sr_desc, &bad_wr)) { + OPAL_THREAD_ADD32(&endpoint->sd_credits[prio], -1); + OPAL_THREAD_ADD32(&endpoint->rd_credits[prio], frag->hdr->credits); + OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits, + credits_hdr->rdma_credits); + BTL_ERROR(("error posting send request errno %d says %s", strerror(errno))); - return; } } @@ -1260,8 +1174,7 @@ static int mca_btl_openib_endpoint_send_eager_rdma( rdma_hdr->rkey = endpoint->eager_rdma_local.reg->mr->rkey; rdma_hdr->rdma_start.pval = endpoint->eager_rdma_local.base.pval; frag->segment.seg_len = sizeof(mca_btl_openib_eager_rdma_header_t); - if (mca_btl_openib_endpoint_send(endpoint, frag) != - OMPI_SUCCESS) { + if (mca_btl_openib_endpoint_send(endpoint, frag) != OMPI_SUCCESS) { MCA_BTL_IB_FRAG_RETURN(openib_btl, frag); BTL_ERROR(("Error sending RDMA buffer", strerror(errno))); return -1; diff --git a/ompi/mca/btl/openib/btl_openib_endpoint.h b/ompi/mca/btl/openib/btl_openib_endpoint.h index 2d5920f57b..67c742bcaf 100644 --- a/ompi/mca/btl/openib/btl_openib_endpoint.h +++ b/ompi/mca/btl/openib/btl_openib_endpoint.h @@ -154,8 +154,7 @@ struct mca_btl_base_endpoint_t { /**< info about local RDMA buffer */ int32_t eager_rdma_index; /**< index into RDMA buffers pointer array */ uint32_t index; /**< index of the endpoint in endpoints array */ - struct mca_btl_openib_frag_t *hp_credit_frag; /**< frag for sending explicit high priority credits */ - struct mca_btl_openib_frag_t *lp_credit_frag; /**< frag for sending explicit low priority credits */ + struct mca_btl_openib_frag_t *credit_frag[2]; /**< frags for sending explicit high priority credits */ }; typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t; @@ -166,8 +165,7 @@ OBJ_CLASS_DECLARATION(mca_btl_openib_endpoint_t); int mca_btl_openib_endpoint_send(mca_btl_base_endpoint_t* endpoint, struct mca_btl_openib_frag_t* frag); int mca_btl_openib_endpoint_connect(mca_btl_base_endpoint_t*); void mca_btl_openib_post_recv(void); -void mca_btl_openib_endpoint_send_credits_hp(mca_btl_base_endpoint_t*); -void mca_btl_openib_endpoint_send_credits_lp(mca_btl_base_endpoint_t*); +void mca_btl_openib_endpoint_send_credits(mca_btl_base_endpoint_t*, const int); void mca_btl_openib_endpoint_connect_eager_rdma(mca_btl_openib_endpoint_t*); static inline int btl_openib_endpoint_post_rr(mca_btl_base_endpoint_t *endpoint, @@ -195,9 +193,6 @@ static inline int btl_openib_endpoint_post_rr(mca_btl_base_endpoint_t *endpoint, OMPI_FREE_LIST_WAIT(free_list, item, rc); frag = (mca_btl_openib_frag_t*)item; frag->endpoint = endpoint; - frag->sg_entry.length = frag->size + - ((unsigned char*)frag->segment.seg_addr.pval - - (unsigned char*)frag->hdr); if(ibv_post_recv(endpoint->lcl_qp[prio], &frag->wr_desc.rd_desc, &bad_wr)) { BTL_ERROR(("error posting receive errno says %s\n", @@ -212,6 +207,23 @@ static inline int btl_openib_endpoint_post_rr(mca_btl_base_endpoint_t *endpoint, return OMPI_SUCCESS; } +static inline int btl_openib_check_send_credits( + mca_btl_openib_endpoint_t *endpoint, const int prio) +{ + if(!mca_btl_openib_component.use_srq && + endpoint->rd_credits[prio] >= mca_btl_openib_component.rd_win) + return OPAL_THREAD_ADD32(&endpoint->sd_credits[prio], 1) == 1; + + if(BTL_OPENIB_LP_QP == prio) /* nothing more for low prio QP */ + return 0; + + /* for high prio check eager RDMA credits */ + if(endpoint->eager_rdma_local.credits >= mca_btl_openib_component.rd_win) + return OPAL_THREAD_ADD32(&endpoint->sd_credits[prio], 1) == 1; + + return 0; +} + #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/ompi/mca/btl/openib/btl_openib_frag.c b/ompi/mca/btl/openib/btl_openib_frag.c index 18a6b82670..5ba626b6af 100644 --- a/ompi/mca/btl/openib/btl_openib_frag.c +++ b/ompi/mca/btl/openib/btl_openib_frag.c @@ -38,7 +38,7 @@ static void mca_btl_openib_frag_common_constructor( mca_btl_openib_frag_t* frag) } frag->segment.seg_len = frag->size; frag->sg_entry.addr = (unsigned long) frag->hdr; - frag->sg_entry.length = frag->size; + frag->sg_entry.length = frag->size + sizeof(mca_btl_openib_header_t); frag->base.des_flags = 0; }