/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2007 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2008 Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2006-2007 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * Copyright (c) 2006-2007 Voltaire All rights reserved.
 * Copyright (c) 2006-2008 Mellanox Technologies, Inc.  All rights reserved.
 *
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

/*
 * NOTE(review): the header names of the four system includes below were
 * missing (only bare "#include" tokens remained).  <errno.h> and <string.h>
 * are required by the errno / strerror() / memset() uses in this file;
 * <sys/time.h> and <time.h> are a best guess -- confirm against the
 * project's history.
 */
#include <sys/time.h>
#include <time.h>
#include <errno.h>
#include <string.h>

#include "opal_stdint.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "ompi/types.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "ompi/class/ompi_free_list.h"

#include "btl_openib_endpoint.h"
#include "btl_openib_proc.h"
#include "btl_openib_xrc.h"
#include "btl_openib_async.h"
#include "connect/connect.h"

static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint);
static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint);

/*
 * Fill in the work request of a send fragment and hand it to
 * ibv_post_send().
 *
 * ep    endpoint the fragment is sent on
 * frag  fragment to post; its base.order selects the QP
 * rdma  true to write the fragment into the peer's eager RDMA buffer
 *       (IBV_WR_RDMA_WRITE), false to use a regular send
 *
 * Returns the value of ibv_post_send() (0 on success).  When ep->nbo is
 * set the header (and, for RDMA, the footer) are left converted by the
 * HTON macros even if the post fails; the caller converts them back.
 */
static int post_send(mca_btl_openib_endpoint_t *ep,
        mca_btl_openib_send_frag_t *frag, const bool rdma)
{
    mca_btl_openib_module_t *openib_btl = ep->endpoint_btl;
    mca_btl_base_segment_t *seg = &to_base_frag(frag)->segment;
    struct ibv_sge *sg = &to_com_frag(frag)->sg_entry;
    struct ibv_send_wr *sr_desc = &to_out_frag(frag)->sr_desc;
    struct ibv_send_wr *bad_wr;
    int qp = to_base_frag(frag)->base.order;

    /* payload + header (+ footer for eager RDMA) + coalesced data */
    sg->length = seg->seg_len + sizeof(mca_btl_openib_header_t) +
        (rdma ? sizeof(mca_btl_openib_footer_t) : 0) + frag->coalesced_length;

    sr_desc->send_flags = ib_send_flags(sg->length, &(ep->qps[qp]));

    if(ep->nbo) {
        BTL_OPENIB_HEADER_HTON(*frag->hdr);
    }

    if(rdma) {
        int32_t head;
        /* the footer occupies the last bytes of the message */
        mca_btl_openib_footer_t* ftr =
            (mca_btl_openib_footer_t*)(((char*)frag->hdr) + sg->length -
                    sizeof(mca_btl_openib_footer_t));

        sr_desc->opcode = IBV_WR_RDMA_WRITE;
        MCA_BTL_OPENIB_RDMA_FRAG_SET_SIZE(ftr, sg->length);
        MCA_BTL_OPENIB_RDMA_MAKE_LOCAL(ftr);
#if OMPI_ENABLE_DEBUG
        ftr->seq = ep->eager_rdma_remote.seq++;
#endif
        if(ep->nbo) {
            BTL_OPENIB_FOOTER_HTON(*ftr);
        }

        sr_desc->wr.rdma.rkey = ep->eager_rdma_remote.rkey;
        MCA_BTL_OPENIB_RDMA_MOVE_INDEX(ep->eager_rdma_remote.head, head);
        /*
         * Compute the end of remote slot 'head' and step back by the message
         * length, so that the message ends exactly at the end of the slot.
         */
        sr_desc->wr.rdma.remote_addr =
            ep->eager_rdma_remote.base.lval +
            head * openib_btl->eager_rdma_frag_size +
            sizeof(mca_btl_openib_header_t) +
            mca_btl_openib_component.eager_limit +
            sizeof(mca_btl_openib_footer_t);
        sr_desc->wr.rdma.remote_addr -= sg->length;
    } else {
        if(BTL_OPENIB_QP_TYPE_PP(qp)) {
            sr_desc->opcode = IBV_WR_SEND;
        } else {
            /* non per-peer QPs carry the remote index as immediate data */
            sr_desc->opcode = IBV_WR_SEND_WITH_IMM;
            sr_desc->imm_data = ep->rem_info.rem_index;
        }
    }

#if HAVE_XRC
    if(BTL_OPENIB_QP_TYPE_XRC(qp)) {
        sr_desc->xrc_remote_srq_num = ep->rem_info.rem_srqs[qp].rem_srq_num;
    }
#endif
    assert(sg->addr == (uint64_t)(uintptr_t)frag->hdr);

    return ibv_post_send(ep->qps[qp].qp->lcl_qp, sr_desc, &bad_wr);
}

/*
 * Reserve a send WQE on the fragment's QP (the name is a historical
 * misspelling of "acquire", kept because the identifier is referenced
 * elsewhere in this file).
 *
 * If no WQE is available the reservation is undone and the fragment is
 * appended, under the QP lock, to the QP's pending list: index 0 for
 * fragments flagged MCA_BTL_DES_FLAGS_PRIORITY, index 1 for all others.
 *
 * Returns OMPI_SUCCESS or OMPI_ERR_OUT_OF_RESOURCE (fragment queued).
 */
static inline int acruire_wqe(mca_btl_openib_endpoint_t *ep,
        mca_btl_openib_send_frag_t *frag)
{
    int qp = to_base_frag(frag)->base.order;
    int prio = !(to_base_frag(frag)->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY);

    if(qp_get_wqe(ep, qp) < 0) {
        qp_put_wqe(ep, qp);
        OPAL_THREAD_LOCK(&ep->qps[qp].qp->lock);
        opal_list_append(&ep->qps[qp].qp->pending_frags[prio],
                (opal_list_item_t *)frag);
        OPAL_THREAD_UNLOCK(&ep->qps[qp].qp->lock);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    return OMPI_SUCCESS;
}

/*
 * Take one eager RDMA token for the remote side.  The counter is
 * decremented first and the decrement is undone if it went negative.
 *
 * Returns OMPI_SUCCESS or OMPI_ERR_OUT_OF_RESOURCE.
 */
static inline int acquire_eager_rdma_send_credit(mca_btl_openib_endpoint_t *endpoint)
{
    if(OPAL_THREAD_ADD32(&endpoint->eager_rdma_remote.tokens, -1) < 0) {
        OPAL_THREAD_ADD32(&endpoint->eager_rdma_remote.tokens, 1);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    return OMPI_SUCCESS;
}

/*
 * Take one send credit for the fragment's QP.
 *
 * Per-peer QPs use the endpoint's sd_credits and, on failure, queue the
 * fragment on the endpoint's per-QP pending list.  All other QPs use the
 * module-wide SRQ sd_credits and queue on the module's SRQ pending list
 * under openib_btl->ib_lock.  The list index is 0 for priority fragments
 * and 1 otherwise.
 *
 * Returns OMPI_SUCCESS or OMPI_ERR_OUT_OF_RESOURCE (fragment queued).
 */
static int acquire_send_credit(mca_btl_openib_endpoint_t *endpoint,
        mca_btl_openib_send_frag_t *frag)
{
    mca_btl_openib_module_t *openib_btl = endpoint->endpoint_btl;
    int qp = to_base_frag(frag)->base.order;
    int prio = !(to_base_frag(frag)->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY);

    if(BTL_OPENIB_QP_TYPE_PP(qp)) {
        if(OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.sd_credits, -1) < 0) {
            OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.sd_credits, 1);
            opal_list_append(&endpoint->qps[qp].pending_frags[prio],
                    (opal_list_item_t *)frag);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
    } else {
        if(OPAL_THREAD_ADD32(&openib_btl->qps[qp].u.srq_qp.sd_credits, -1) < 0) {
            OPAL_THREAD_ADD32(&openib_btl->qps[qp].u.srq_qp.sd_credits, 1);
            OPAL_THREAD_LOCK(&openib_btl->ib_lock);
            opal_list_append(&openib_btl->qps[qp].u.srq_qp.pending_frags[prio],
                    (opal_list_item_t *)frag);
            OPAL_THREAD_UNLOCK(&openib_btl->ib_lock);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
    }

    return OMPI_SUCCESS;
}

/*
 * Move the whole value of counter FROM into TO and reset FROM to zero,
 * retrying until the compare-and-set succeeds.
 */
#define GET_CREDITS(FROM, TO)                                        \
    do {                                                             \
        TO = FROM;                                                   \
    } while(0 == OPAL_ATOMIC_CMPSET_32(&FROM, TO, 0))

/*
 * Acquire the resources needed for a send (WQE plus either an eager RDMA
 * token or a send credit), piggyback pending credits on the header and
 * post the fragment.
 *
 * This function is called with endpoint->endpoint_lock held.
 *
 * Returns OMPI_SUCCESS when the fragment was posted,
 * OMPI_ERR_RESOURCE_BUSY when a resource was unavailable (the fragment has
 * been queued by the acquire helper), or OMPI_ERROR when post_send()
 * failed, in which case the resources taken here are given back.
 */
int mca_btl_openib_endpoint_post_send(mca_btl_openib_endpoint_t *endpoint,
        mca_btl_openib_send_frag_t *frag)
{
    mca_btl_openib_header_t *hdr = frag->hdr;
    mca_btl_base_descriptor_t *des = &to_base_frag(frag)->base;
    int qp, ib_rc;
    int32_t cm_return;
    bool do_rdma = false;
    size_t eager_limit;

    if(OPAL_LIKELY(des->order == MCA_BTL_NO_ORDER)) {
        des->order = frag->qp_idx;
    }

    qp = des->order;

    if(acruire_wqe(endpoint, frag) != OMPI_SUCCESS) {
        return OMPI_ERR_RESOURCE_BUSY;
    }

    eager_limit = mca_btl_openib_component.eager_limit +
        sizeof(mca_btl_openib_header_coalesced_t) +
        sizeof(mca_btl_openib_control_header_t);
    if(des->des_src->seg_len + frag->coalesced_length <= eager_limit &&
            (des->des_flags & MCA_BTL_DES_FLAGS_PRIORITY)) {
        /* High priority frag. Try to send over eager RDMA */
        if(acquire_eager_rdma_send_credit(endpoint) == OMPI_SUCCESS) {
            do_rdma = true;
        }
    }

    if(!do_rdma && acquire_send_credit(endpoint, frag) != OMPI_SUCCESS) {
        qp_put_wqe(endpoint, qp);
        return OMPI_ERR_RESOURCE_BUSY;
    }

    /* eager RDMA credits take precedence and are marked with a flag bit */
    GET_CREDITS(endpoint->eager_rdma_local.credits, hdr->credits);
    if(hdr->credits) {
        hdr->credits |= BTL_OPENIB_RDMA_CREDITS_FLAG;
    }

    if(!do_rdma) {
        /* no RDMA credits to return: return per-peer receive credits */
        if(BTL_OPENIB_QP_TYPE_PP(qp) && 0 == hdr->credits) {
            GET_CREDITS(endpoint->qps[qp].u.pp_qp.rd_credits, hdr->credits);
        }
    } else {
        /* on the eager RDMA path the QP index is encoded in the credits */
        hdr->credits |= (qp << 11);
    }

    GET_CREDITS(endpoint->qps[qp].u.pp_qp.cm_return, cm_return);
    /* cm_seen is only 8 bits, but cm_return is 32 bits */
    if(cm_return > 255) {
        hdr->cm_seen = 255;
        cm_return -= 255;
        /* put back what does not fit into cm_seen */
        OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.cm_return, cm_return);
    } else {
        hdr->cm_seen = cm_return;
    }

    ib_rc = post_send(endpoint, frag, do_rdma);

    if(!ib_rc) {
        return OMPI_SUCCESS;
    }

    /* The post failed: undo the byte swap and give all resources back. */
    if(endpoint->nbo) {
        BTL_OPENIB_HEADER_NTOH(*hdr);
    }

    /*
     * NOTE(review): on the eager RDMA path hdr->credits also carries
     * (qp << 11) at this point; confirm that BTL_OPENIB_CREDITS() strips
     * those bits before the value is added back.
     */
    if(BTL_OPENIB_IS_RDMA_CREDITS(hdr->credits)) {
        OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits,
                BTL_OPENIB_CREDITS(hdr->credits));
    }

    qp_put_wqe(endpoint, qp);

    if(do_rdma) {
        OPAL_THREAD_ADD32(&endpoint->eager_rdma_remote.tokens, 1);
    } else {
        if(BTL_OPENIB_QP_TYPE_PP(qp)) {
            /*
             * rd_credits were only drained above when no eager RDMA credits
             * were piggybacked; if the RDMA flag is set, hdr->credits was
             * already returned to eager_rdma_local.credits and must not be
             * added to rd_credits as well.
             */
            if(!BTL_OPENIB_IS_RDMA_CREDITS(hdr->credits)) {
                OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.rd_credits,
                        hdr->credits);
            }
            OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.sd_credits, 1);
        } else if(BTL_OPENIB_QP_TYPE_SRQ(qp)) {
            mca_btl_openib_module_t *openib_btl = endpoint->endpoint_btl;
            OPAL_THREAD_ADD32(&openib_btl->qps[qp].u.srq_qp.sd_credits, 1);
        }
    }

    BTL_ERROR(("error posting send request error %d: %s\n",
               ib_rc, strerror(ib_rc)));
    return OMPI_ERROR;
}

OBJ_CLASS_INSTANCE(mca_btl_openib_endpoint_t,
                   opal_object_t, mca_btl_openib_endpoint_construct,
                   mca_btl_openib_endpoint_destruct);

/*
 * Initialize state of the endpoint instance.
* */ static mca_btl_openib_qp_t *endpoint_alloc_qp(void) { mca_btl_openib_qp_t *qp = calloc(1, sizeof(mca_btl_openib_qp_t)); if(!qp) { BTL_ERROR(("Failed to allocate memory for qp")); return NULL; } OBJ_CONSTRUCT(&qp->pending_frags[0], opal_list_t); OBJ_CONSTRUCT(&qp->pending_frags[1], opal_list_t); OBJ_CONSTRUCT(&qp->lock, opal_mutex_t); return qp; } static void endpoint_init_qp_pp(mca_btl_openib_endpoint_qp_t *ep_qp, const int qp) { mca_btl_openib_qp_info_t *qp_info = &mca_btl_openib_component.qp_infos[qp]; ep_qp->qp = endpoint_alloc_qp(); ep_qp->qp->users++; /* local credits are set here such that on initial posting * of the receive buffers we end up with zero credits to return * to our peer. The peer initializes his sd_credits to reflect this * below. Note that this may be a problem for iWARP as the sender * now has credits even if the receive buffers are not yet posted */ ep_qp->u.pp_qp.rd_credits = -qp_info->rd_num; ep_qp->u.pp_qp.rd_posted = 0; ep_qp->u.pp_qp.cm_sent = 0; ep_qp->u.pp_qp.cm_return = -qp_info->u.pp_qp.rd_rsv; ep_qp->u.pp_qp.cm_received = qp_info->u.pp_qp.rd_rsv; /* initialize the local view of credits */ ep_qp->u.pp_qp.sd_credits = qp_info->rd_num; /* number of available send WQEs */ ep_qp->qp->sd_wqe = qp_info->rd_num; } static void endpoint_init_qp_srq(mca_btl_openib_endpoint_qp_t *ep_qp, const int qp) { ep_qp->qp = endpoint_alloc_qp(); ep_qp->qp->users++; /* number of available send WQEs */ ep_qp->qp->sd_wqe = mca_btl_openib_component.qp_infos[qp].u.srq_qp.sd_max; } static void endpoint_init_qp_xrc(mca_btl_base_endpoint_t *ep, const int qp) { int max = ep->endpoint_btl->hca->ib_dev_attr.max_qp_wr - (mca_btl_openib_component.use_eager_rdma ? 
mca_btl_openib_component.max_eager_rdma : 0); mca_btl_openib_endpoint_qp_t *ep_qp = &ep->qps[qp]; ep_qp->qp = ep->ib_addr->qp; ep_qp->qp->sd_wqe += mca_btl_openib_component.qp_infos[qp].u.srq_qp.sd_max; /* make sure that we don't overrun maximum supported by hca */ if (ep_qp->qp->sd_wqe > max) ep_qp->qp->sd_wqe = max; ep_qp->qp->users++; } static void endpoint_init_qp(mca_btl_base_endpoint_t *ep, const int qp) { mca_btl_openib_endpoint_qp_t *ep_qp = &ep->qps[qp]; ep_qp->rd_credit_send_lock = 0; ep_qp->credit_frag = NULL; OBJ_CONSTRUCT(&ep_qp->pending_frags[0], opal_list_t); OBJ_CONSTRUCT(&ep_qp->pending_frags[1], opal_list_t); switch(BTL_OPENIB_QP_TYPE(qp)) { case MCA_BTL_OPENIB_PP_QP: endpoint_init_qp_pp(ep_qp, qp); break; case MCA_BTL_OPENIB_SRQ_QP: endpoint_init_qp_srq(ep_qp, qp); break; case MCA_BTL_OPENIB_XRC_QP: if (NULL == ep->ib_addr->qp) { ep->ib_addr->qp = endpoint_alloc_qp(); } endpoint_init_qp_xrc(ep, qp); break; default: BTL_ERROR(("Wrong QP type")); break; } } void mca_btl_openib_endpoint_init(mca_btl_openib_module_t *btl, mca_btl_base_endpoint_t *ep, ompi_btl_openib_connect_base_module_t *local_cpc, mca_btl_openib_proc_modex_t *remote_proc_info, ompi_btl_openib_connect_base_module_data_t *remote_cpc_data) { int qp; ep->endpoint_btl = btl; ep->use_eager_rdma = btl->hca->use_eager_rdma & mca_btl_openib_component.use_eager_rdma; ep->subnet_id = btl->port_info.subnet_id; ep->endpoint_local_cpc = local_cpc; ep->endpoint_remote_cpc_data = remote_cpc_data; ep->rem_info.rem_lid = remote_proc_info->pm_port_info.lid; ep->rem_info.rem_subnet_id = remote_proc_info->pm_port_info.subnet_id; ep->rem_info.rem_mtu = remote_proc_info->pm_port_info.mtu; opal_output(-1, "Got remote LID, subnet, MTU: %d, 0x%" PRIx64 ", %d", ep->rem_info.rem_lid, ep->rem_info.rem_subnet_id, ep->rem_info.rem_mtu); for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) { endpoint_init_qp(ep, qp); } } static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint) { /* 
setup qp structures */ endpoint->qps = (mca_btl_openib_endpoint_qp_t*) calloc(mca_btl_openib_component.num_qps, sizeof(mca_btl_openib_endpoint_qp_t)); if (MCA_BTL_XRC_ENABLED) { endpoint->rem_info.rem_qps = (mca_btl_openib_rem_qp_info_t*) calloc(1, sizeof(mca_btl_openib_rem_qp_info_t)); endpoint->rem_info.rem_srqs = (mca_btl_openib_rem_srq_info_t*) calloc(mca_btl_openib_component.num_xrc_qps, sizeof(mca_btl_openib_rem_srq_info_t)); } else { endpoint->rem_info.rem_qps = (mca_btl_openib_rem_qp_info_t*) calloc(mca_btl_openib_component.num_qps, sizeof(mca_btl_openib_rem_qp_info_t)); endpoint->rem_info.rem_srqs = NULL; } endpoint->ib_addr = NULL; endpoint->xrc_recv_qp_num = 0; endpoint->endpoint_btl = 0; endpoint->endpoint_proc = 0; endpoint->endpoint_local_cpc = NULL; endpoint->endpoint_remote_cpc_data = NULL; endpoint->endpoint_tstamp = 0.0; endpoint->endpoint_state = MCA_BTL_IB_CLOSED; endpoint->endpoint_retries = 0; OBJ_CONSTRUCT(&endpoint->endpoint_lock, opal_mutex_t); OBJ_CONSTRUCT(&endpoint->pending_lazy_frags, opal_list_t); OBJ_CONSTRUCT(&endpoint->pending_get_frags, opal_list_t); OBJ_CONSTRUCT(&endpoint->pending_put_frags, opal_list_t); endpoint->get_tokens = mca_btl_openib_component.ib_qp_ous_rd_atom; /* initialize RDMA eager related parts */ endpoint->eager_recv_count = 0; memset(&endpoint->eager_rdma_remote, 0, sizeof(mca_btl_openib_eager_rdma_remote_t)); memset(&endpoint->eager_rdma_local, 0, sizeof(mca_btl_openib_eager_rdma_local_t)); OBJ_CONSTRUCT(&endpoint->eager_rdma_local.lock, opal_mutex_t); endpoint->rem_info.rem_lid = 0; endpoint->rem_info.rem_subnet_id = 0; endpoint->rem_info.rem_mtu = 0; endpoint->nbo = false; endpoint->use_eager_rdma = false; endpoint->eager_rdma_remote.tokens = 0; endpoint->eager_rdma_local.credits = 0; } /* * Destroy a endpoint * */ static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint) { bool pval_clean = false; int qp; /* If the CPC has an endpoint_finalize function, call it */ if (NULL != 
endpoint->endpoint_local_cpc->cbm_endpoint_finalize) { endpoint->endpoint_local_cpc->cbm_endpoint_finalize(endpoint); } /* Release memory resources */ do { /* Make sure that mca_btl_openib_endpoint_connect_eager_rdma () * was not in "connect" or "bad" flow (failed to allocate memory) * and changed the pointer back to NULL */ if(!opal_atomic_cmpset_ptr(&endpoint->eager_rdma_local.base.pval, NULL, (void*)1)) { if ((void*)1 != endpoint->eager_rdma_local.base.pval && NULL != endpoint->eager_rdma_local.base.pval) { endpoint->endpoint_btl->super.btl_mpool->mpool_free(endpoint->endpoint_btl->super.btl_mpool, endpoint->eager_rdma_local.base.pval, (mca_mpool_base_registration_t*)endpoint->eager_rdma_local.reg); pval_clean=true; } } else { pval_clean=true; } } while (!pval_clean); /* Close opened QPs if we have them*/ for(qp = 0; qp < mca_btl_openib_component.num_qps; qp++) { MCA_BTL_OPENIB_CLEAN_PENDING_FRAGS(&endpoint->qps[qp].pending_frags[0]); MCA_BTL_OPENIB_CLEAN_PENDING_FRAGS(&endpoint->qps[qp].pending_frags[1]); OBJ_DESTRUCT(&endpoint->qps[qp].pending_frags[0]); OBJ_DESTRUCT(&endpoint->qps[qp].pending_frags[1]); if(--endpoint->qps[qp].qp->users != 0) continue; MCA_BTL_OPENIB_CLEAN_PENDING_FRAGS( &endpoint->qps[qp].qp->pending_frags[0]); MCA_BTL_OPENIB_CLEAN_PENDING_FRAGS( &endpoint->qps[qp].qp->pending_frags[1]); OBJ_DESTRUCT(&endpoint->qps[qp].qp->pending_frags[0]); OBJ_DESTRUCT(&endpoint->qps[qp].qp->pending_frags[1]); if(endpoint->qps[qp].qp->lcl_qp != NULL) if(ibv_destroy_qp(endpoint->qps[qp].qp->lcl_qp)) BTL_ERROR(("Failed to destroy QP:%d\n", qp)); free(endpoint->qps[qp].qp); } /* free the qps */ free(endpoint->qps); endpoint->qps = NULL; /* unregister xrc recv qp */ #if HAVE_XRC if (0 != endpoint->xrc_recv_qp_num) { if(ibv_unreg_xrc_rcv_qp(endpoint->endpoint_btl->hca->xrc_domain, endpoint->xrc_recv_qp_num)) { BTL_ERROR(("Failed to unregister XRC recv QP:%d\n", endpoint->xrc_recv_qp_num)); } } #endif OBJ_DESTRUCT(&endpoint->endpoint_lock); /* Clean pending lists 
*/ MCA_BTL_OPENIB_CLEAN_PENDING_FRAGS(&endpoint->pending_lazy_frags); OBJ_DESTRUCT(&endpoint->pending_lazy_frags); MCA_BTL_OPENIB_CLEAN_PENDING_FRAGS(&endpoint->pending_get_frags); OBJ_DESTRUCT(&endpoint->pending_get_frags); MCA_BTL_OPENIB_CLEAN_PENDING_FRAGS(&endpoint->pending_put_frags); OBJ_DESTRUCT(&endpoint->pending_put_frags); } /* * call when the connect module has created all the qp's on an * endpoint and needs to have some receive buffers posted */ int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t *endpoint) { int qp; for (qp = 0; qp < mca_btl_openib_component.num_qps; ++qp) { if (BTL_OPENIB_QP_TYPE_PP(qp)) { mca_btl_openib_endpoint_post_rr_nolock(endpoint, qp); } else { mca_btl_openib_post_srr(endpoint->endpoint_btl, qp); } } return OMPI_SUCCESS; } /* * called when the connect module has completed setup of an endpoint */ void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t *endpoint) { opal_list_item_t *frag_item, *ep_item; mca_btl_openib_send_frag_t *frag; mca_btl_openib_endpoint_t *ep; bool master = false; if (MCA_BTL_XRC_ENABLED) { OPAL_THREAD_LOCK(&endpoint->ib_addr->addr_lock); if (MCA_BTL_IB_ADDR_CONNECTED == endpoint->ib_addr->status) { /* We are not xrc master */ /* set our qp pointer to master qp */ master = false; } else { /* I'm master of XRC */ endpoint->ib_addr->status = MCA_BTL_IB_ADDR_CONNECTED; master = true; } } /* Run over all qps and load alternative path */ #if OMPI_HAVE_THREADS if (APM_ENABLED) { int i; if (MCA_BTL_XRC_ENABLED) { if (master) { mca_btl_openib_load_apm(endpoint->ib_addr->qp->lcl_qp, endpoint); } } else { for(i = 0; i < mca_btl_openib_component.num_qps; i++) { mca_btl_openib_load_apm(endpoint->qps[i].qp->lcl_qp, endpoint); } } } #endif endpoint->endpoint_state = MCA_BTL_IB_CONNECTED; endpoint->endpoint_btl->hca->non_eager_rdma_endpoints++; /* The connection is correctly setup. Now we can decrease the event trigger. 
*/ opal_progress_event_users_decrement(); /* While there are frags in the list, process them */ while (!opal_list_is_empty(&(endpoint->pending_lazy_frags))) { frag_item = opal_list_remove_first(&(endpoint->pending_lazy_frags)); frag = to_send_frag(frag_item); /* We need to post this one */ if(OMPI_ERROR == mca_btl_openib_endpoint_post_send(endpoint, frag)) BTL_ERROR(("Error posting send")); } /* if upper layer called put or get before connection moved to connected * state then we restart them here */ mca_btl_openib_frag_progress_pending_put_get(endpoint, mca_btl_openib_component.rdma_qp); if(MCA_BTL_XRC_ENABLED) { while(master && !opal_list_is_empty(&endpoint->ib_addr->pending_ep)) { ep_item = opal_list_remove_first(&endpoint->ib_addr->pending_ep); ep = (mca_btl_openib_endpoint_t *)ep_item; if (OMPI_SUCCESS != endpoint->endpoint_local_cpc->cbm_start_connect(endpoint->endpoint_local_cpc, ep)) { BTL_ERROR(("Failed to connect pending endpoint\n")); } } OPAL_THREAD_UNLOCK(&endpoint->ib_addr->addr_lock); } } /* * Attempt to send a fragment using a given endpoint. If the endpoint is not * connected, queue the fragment and start the connection as required. */ int mca_btl_openib_endpoint_send(mca_btl_base_endpoint_t* ep, mca_btl_openib_send_frag_t* frag) { int rc; OPAL_THREAD_LOCK(&ep->endpoint_lock); rc = check_endpoint_state(ep, &to_base_frag(frag)->base, &ep->pending_lazy_frags); if(OPAL_LIKELY(rc == OMPI_SUCCESS)) rc = mca_btl_openib_endpoint_post_send(ep, frag); OPAL_THREAD_UNLOCK(&ep->endpoint_lock); return rc; } /** * Return control fragment. */ static void mca_btl_openib_endpoint_credits( mca_btl_base_module_t* btl, struct mca_btl_base_endpoint_t* ep, struct mca_btl_base_descriptor_t* des, int status) { int qp; mca_btl_openib_send_control_frag_t *frag = to_send_control_frag(des); qp = frag->qp_idx; /* we don't acquire a WQE for credit message - so decrement. 
* Note: doing it for QP used for credit management */ qp_get_wqe(ep, des->order); if(check_send_credits(ep, qp) || check_eager_rdma_credits(ep)) mca_btl_openib_endpoint_send_credits(ep, qp); else { BTL_OPENIB_CREDITS_SEND_UNLOCK(ep, qp); /* check one more time if credits are available after unlock */ send_credits(ep, qp); } } /** * Return credits to peer */ void mca_btl_openib_endpoint_send_credits(mca_btl_openib_endpoint_t* endpoint, const int qp) { mca_btl_openib_module_t* openib_btl = endpoint->endpoint_btl; mca_btl_openib_send_control_frag_t* frag; mca_btl_openib_rdma_credits_header_t *credits_hdr; int rc; bool do_rdma = false; int32_t cm_return; frag = endpoint->qps[qp].credit_frag; if(OPAL_UNLIKELY(NULL == frag)) { frag = alloc_credit_frag(openib_btl); frag->qp_idx = qp; endpoint->qps[qp].credit_frag = frag; /* set those once and forever */ to_base_frag(frag)->base.order = mca_btl_openib_component.credits_qp; to_base_frag(frag)->base.des_cbfunc = mca_btl_openib_endpoint_credits; to_base_frag(frag)->base.des_cbdata = NULL; to_com_frag(frag)->endpoint = endpoint; frag->hdr->tag = MCA_BTL_TAG_BTL; to_base_frag(frag)->segment.seg_len = sizeof(mca_btl_openib_rdma_credits_header_t); } assert(frag->qp_idx == qp); credits_hdr = (mca_btl_openib_rdma_credits_header_t*) to_base_frag(frag)->segment.seg_addr.pval; if(acquire_eager_rdma_send_credit(endpoint) == MPI_SUCCESS) { do_rdma = true; } else { if(OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.cm_sent, 1) > (mca_btl_openib_component.qp_infos[qp].u.pp_qp.rd_rsv - 1)) { OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.cm_sent, -1); BTL_OPENIB_CREDITS_SEND_UNLOCK(endpoint, qp); return; } } GET_CREDITS(endpoint->qps[qp].u.pp_qp.rd_credits, frag->hdr->credits); frag->hdr->cm_seen = 0; GET_CREDITS(endpoint->qps[qp].u.pp_qp.cm_return, cm_return); if(cm_return > 255) { frag->hdr->cm_seen = 255; cm_return -= 255; OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.cm_return, cm_return); } else { frag->hdr->cm_seen = cm_return; } 
GET_CREDITS(endpoint->eager_rdma_local.credits, credits_hdr->rdma_credits); credits_hdr->qpn = qp; credits_hdr->control.type = MCA_BTL_OPENIB_CONTROL_CREDITS; if(endpoint->nbo) BTL_OPENIB_RDMA_CREDITS_HEADER_HTON(*credits_hdr); if((rc = post_send(endpoint, frag, do_rdma)) == 0) return; if(endpoint->nbo) { BTL_OPENIB_HEADER_NTOH(*frag->hdr); BTL_OPENIB_RDMA_CREDITS_HEADER_NTOH(*credits_hdr); } BTL_OPENIB_CREDITS_SEND_UNLOCK(endpoint, qp); OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.rd_credits, frag->hdr->credits); OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits, credits_hdr->rdma_credits); if(do_rdma) OPAL_THREAD_ADD32(&endpoint->eager_rdma_remote.tokens, 1); else OPAL_THREAD_ADD32(&endpoint->qps[qp].u.pp_qp.cm_sent, -1); BTL_ERROR(("error posting send request errno %d says %s", rc, strerror(errno))); } /* local callback function for completion of eager rdma connect */ static void mca_btl_openib_endpoint_eager_rdma_connect_cb( mca_btl_base_module_t* btl, struct mca_btl_base_endpoint_t* endpoint, struct mca_btl_base_descriptor_t* descriptor, int status) { mca_btl_openib_hca_t *hca = endpoint->endpoint_btl->hca; OPAL_THREAD_ADD32(&hca->non_eager_rdma_endpoints, -1); assert(hca->non_eager_rdma_endpoints >= 0); MCA_BTL_IB_FRAG_RETURN(descriptor); } /* send the eager rdma connect message to the remote endpoint */ static int mca_btl_openib_endpoint_send_eager_rdma( mca_btl_base_endpoint_t* endpoint) { mca_btl_openib_module_t* openib_btl = endpoint->endpoint_btl; mca_btl_openib_eager_rdma_header_t *rdma_hdr; mca_btl_openib_send_control_frag_t* frag; int rc; frag = alloc_credit_frag(openib_btl); if(NULL == frag) { return -1; } to_base_frag(frag)->base.des_cbfunc = mca_btl_openib_endpoint_eager_rdma_connect_cb; to_base_frag(frag)->base.des_cbdata = NULL; to_base_frag(frag)->base.des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; to_base_frag(frag)->base.order = mca_btl_openib_component.credits_qp; to_base_frag(frag)->segment.seg_len = sizeof(mca_btl_openib_eager_rdma_header_t); 
to_com_frag(frag)->endpoint = endpoint; frag->hdr->tag = MCA_BTL_TAG_BTL; rdma_hdr = (mca_btl_openib_eager_rdma_header_t*)to_base_frag(frag)->segment.seg_addr.pval; rdma_hdr->control.type = MCA_BTL_OPENIB_CONTROL_RDMA; rdma_hdr->rkey = endpoint->eager_rdma_local.reg->mr->rkey; rdma_hdr->rdma_start.lval = ompi_ptr_ptol(endpoint->eager_rdma_local.base.pval); BTL_VERBOSE(("sending rkey %lu, rdma_start.lval %llu, pval %p, ival %u type %d and sizeof(rdma_hdr) %d\n", rdma_hdr->rkey, rdma_hdr->rdma_start.lval, rdma_hdr->rdma_start.pval, rdma_hdr->rdma_start.ival, rdma_hdr->control.type, sizeof(mca_btl_openib_eager_rdma_header_t) )); if(endpoint->nbo) { BTL_OPENIB_EAGER_RDMA_CONTROL_HEADER_HTON((*rdma_hdr)); BTL_VERBOSE(("after HTON: sending rkey %lu, rdma_start.lval %llu, pval %p, ival %u\n", rdma_hdr->rkey, rdma_hdr->rdma_start.lval, rdma_hdr->rdma_start.pval, rdma_hdr->rdma_start.ival )); } rc = mca_btl_openib_endpoint_send(endpoint, frag); if (OMPI_SUCCESS == rc ||OMPI_ERR_RESOURCE_BUSY == rc) return OMPI_SUCCESS; MCA_BTL_IB_FRAG_RETURN(frag); BTL_ERROR(("Error sending RDMA buffer", strerror(errno))); return rc; } /* Setup eager RDMA buffers and notify the remote endpoint*/ void mca_btl_openib_endpoint_connect_eager_rdma( mca_btl_openib_endpoint_t* endpoint) { mca_btl_openib_module_t* openib_btl = endpoint->endpoint_btl; char *buf; mca_btl_openib_recv_frag_t *headers_buf; int i; /* Set local rdma pointer to 1 temporarily so other threads will not try * to enter the function */ if(!opal_atomic_cmpset_ptr(&endpoint->eager_rdma_local.base.pval, NULL, (void*)1)) return; headers_buf = (mca_btl_openib_recv_frag_t*) malloc(sizeof(mca_btl_openib_recv_frag_t) * mca_btl_openib_component.eager_rdma_num); if(NULL == headers_buf) goto unlock_rdma_local; buf = openib_btl->super.btl_mpool->mpool_alloc(openib_btl->super.btl_mpool, openib_btl->eager_rdma_frag_size * mca_btl_openib_component.eager_rdma_num, mca_btl_openib_component.buffer_alignment, MCA_MPOOL_FLAGS_CACHE_BYPASS, 
(mca_mpool_base_registration_t**)&endpoint->eager_rdma_local.reg); if(!buf) goto free_headers_buf; buf = buf + openib_btl->eager_rdma_frag_size - sizeof(mca_btl_openib_footer_t) - openib_btl->super.btl_eager_limit - sizeof(mca_btl_openib_header_t); for(i = 0; i < mca_btl_openib_component.eager_rdma_num; i++) { ompi_free_list_item_t *item; mca_btl_openib_recv_frag_t * frag; mca_btl_openib_frag_init_data_t init_data; item = (ompi_free_list_item_t*)&headers_buf[i]; item->registration = (void*)endpoint->eager_rdma_local.reg; item->ptr = buf + i * openib_btl->eager_rdma_frag_size; OBJ_CONSTRUCT(item, mca_btl_openib_recv_frag_t); init_data.order = MCA_BTL_NO_ORDER; init_data.list = NULL; mca_btl_openib_frag_init(item, &init_data); frag = to_recv_frag(item); to_base_frag(frag)->type = MCA_BTL_OPENIB_FRAG_EAGER_RDMA; to_com_frag(frag)->endpoint = endpoint; frag->ftr = (mca_btl_openib_footer_t*) ((char*)to_base_frag(frag)->segment.seg_addr.pval + mca_btl_openib_component.eager_limit); MCA_BTL_OPENIB_RDMA_MAKE_REMOTE(frag->ftr); } endpoint->eager_rdma_local.frags = headers_buf; endpoint->eager_rdma_local.rd_win = mca_btl_openib_component.eager_rdma_num >> 2; endpoint->eager_rdma_local.rd_win = endpoint->eager_rdma_local.rd_win?endpoint->eager_rdma_local.rd_win:1; /* set local rdma pointer to real value */ opal_atomic_cmpset_ptr(&endpoint->eager_rdma_local.base.pval, (void*)1, buf); if(mca_btl_openib_endpoint_send_eager_rdma(endpoint) == OMPI_SUCCESS) { mca_btl_openib_hca_t *hca = endpoint->endpoint_btl->hca; mca_btl_openib_endpoint_t **p; OBJ_RETAIN(endpoint); assert(((opal_object_t*)endpoint)->obj_reference_count == 2); do { p = &hca->eager_rdma_buffers[hca->eager_rdma_buffers_count]; } while(!opal_atomic_cmpset_ptr(p, NULL, endpoint)); OPAL_THREAD_ADD32(&openib_btl->eager_rdma_channels, 1); /* from this point progress function starts to poll new buffer */ OPAL_THREAD_ADD32(&hca->eager_rdma_buffers_count, 1); return; } 
openib_btl->super.btl_mpool->mpool_free(openib_btl->super.btl_mpool, buf, (mca_mpool_base_registration_t*)endpoint->eager_rdma_local.reg); free_headers_buf: free(headers_buf); unlock_rdma_local: /* set local rdma pointer back to zero. Will retry later */ opal_atomic_cmpset_ptr(&endpoint->eager_rdma_local.base.pval, endpoint->eager_rdma_local.base.pval, NULL); endpoint->eager_rdma_local.frags = NULL; }