From 923666b75cbb5fbb4cdbc50664ca5c2f961b0a6a Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 28 Nov 2007 07:16:52 +0000 Subject: [PATCH] Process pending put/get frags on endpoint connection establishment. This commit was SVN r16785. --- ompi/mca/btl/openib/btl_openib.c | 72 +++++++++++++-------- ompi/mca/btl/openib/btl_openib.h | 3 + ompi/mca/btl/openib/btl_openib_component.c | 38 ++++++----- ompi/mca/btl/openib/btl_openib_endpoint.c | 75 +++++----------------- ompi/mca/btl/openib/btl_openib_endpoint.h | 33 ++++++++++ ompi/mca/btl/openib/connect/connect.h | 4 +- 6 files changed, 119 insertions(+), 106 deletions(-) diff --git a/ompi/mca/btl/openib/btl_openib.c b/ompi/mca/btl/openib/btl_openib.c index 831f8765c7..736cd83da6 100644 --- a/ompi/mca/btl/openib/btl_openib.c +++ b/ompi/mca/btl/openib/btl_openib.c @@ -913,7 +913,7 @@ int mca_btl_openib_send( */ int mca_btl_openib_put( mca_btl_base_module_t* btl, - mca_btl_base_endpoint_t* endpoint, + mca_btl_base_endpoint_t* ep, mca_btl_base_descriptor_t* descriptor) { struct ibv_send_wr* bad_wr; @@ -925,20 +925,31 @@ int mca_btl_openib_put( mca_btl_base_module_t* btl, assert(openib_frag_type(frag) == MCA_BTL_OPENIB_FRAG_SEND_USER || openib_frag_type(frag) == MCA_BTL_OPENIB_FRAG_SEND); + if(ep->endpoint_state != MCA_BTL_IB_CONNECTED) { + int rc; + OPAL_THREAD_LOCK(&ep->endpoint_lock); + rc = check_endpoint_state(ep, descriptor, &ep->pending_put_frags); + OPAL_THREAD_UNLOCK(&ep->ep_lock); + if(OMPI_ERR_TEMP_OUT_OF_RESOURCE == rc) + return OMPI_SUCCESS; + if(OMPI_SUCCESS != rc) + return rc; + } + if(MCA_BTL_NO_ORDER == qp) qp = mca_btl_openib_component.rdma_qp; /* check for a send wqe */ - if (qp_get_wqe(endpoint, qp) < 0) { - qp_put_wqe(endpoint, qp); - OPAL_THREAD_LOCK(&endpoint->endpoint_lock); - opal_list_append(&endpoint->pending_put_frags, (opal_list_item_t*)frag); - OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + if (qp_get_wqe(ep, qp) < 0) { + qp_put_wqe(ep, qp); + OPAL_THREAD_LOCK(&ep->endpoint_lock); + opal_list_append(&ep->pending_put_frags, (opal_list_item_t*)frag); + OPAL_THREAD_UNLOCK(&ep->endpoint_lock); return OMPI_SUCCESS; } /* post descriptor */ #if OMPI_ENABLE_HETEROGENEOUS_SUPPORT - if((endpoint->endpoint_proc->proc_ompi->proc_arch & OMPI_ARCH_ISBIGENDIAN) + if((ep->endpoint_proc->proc_ompi->proc_arch & OMPI_ARCH_ISBIGENDIAN) != (ompi_proc_local()->proc_arch & OMPI_ARCH_ISBIGENDIAN)) { rem_addr = opal_swap_bytes8(rem_addr); rkey = opal_swap_bytes4(rkey); @@ -950,13 +961,13 @@ int mca_btl_openib_put( mca_btl_base_module_t* btl, to_com_frag(frag)->sg_entry.addr = (uint64_t)descriptor->des_src->seg_addr.pval; to_com_frag(frag)->sg_entry.length = descriptor->des_src->seg_len; - to_com_frag(frag)->endpoint = endpoint; + to_com_frag(frag)->endpoint = ep; descriptor->order = qp; /* Setting opcode on a frag constructor isn't enough since prepare_src * may return send_frag instead of put_frag */ frag->sr_desc.opcode = IBV_WR_RDMA_WRITE; - if(ibv_post_send(endpoint->qps[qp].qp->lcl_qp, &frag->sr_desc, &bad_wr)) + if(ibv_post_send(ep->qps[qp].qp->lcl_qp, &frag->sr_desc, &bad_wr)) return OMPI_ERROR; return OMPI_SUCCESS; @@ -967,8 +978,8 @@ int mca_btl_openib_put( mca_btl_base_module_t* btl, * RDMA READ remote buffer to local buffer address. */ -int mca_btl_openib_get( mca_btl_base_module_t* btl, - mca_btl_base_endpoint_t* endpoint, +int mca_btl_openib_get(mca_btl_base_module_t* btl, + mca_btl_base_endpoint_t* ep, mca_btl_base_descriptor_t* descriptor) { struct ibv_send_wr* bad_wr; @@ -979,30 +990,41 @@ int mca_btl_openib_get( mca_btl_base_module_t* btl, assert(openib_frag_type(frag) == MCA_BTL_OPENIB_FRAG_RECV_USER); + if(ep->endpoint_state != MCA_BTL_IB_CONNECTED) { + int rc; + OPAL_THREAD_LOCK(&ep->endpoint_lock); + rc = check_endpoint_state(ep, descriptor, &ep->pending_get_frags); + OPAL_THREAD_UNLOCK(&ep->endpoint_lock); + if(OMPI_ERR_TEMP_OUT_OF_RESOURCE == rc) + return OMPI_SUCCESS; + if(OMPI_SUCCESS != rc) + return rc; + } + if(MCA_BTL_NO_ORDER == qp) qp = mca_btl_openib_component.rdma_qp; /* check for a send wqe */ - if (qp_get_wqe(endpoint, qp) < 0) { - qp_put_wqe(endpoint, qp); - OPAL_THREAD_LOCK(&endpoint->endpoint_lock); - opal_list_append(&endpoint->pending_get_frags, (opal_list_item_t*)frag); - OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + if (qp_get_wqe(ep, qp) < 0) { + qp_put_wqe(ep, qp); + OPAL_THREAD_LOCK(&ep->endpoint_lock); + opal_list_append(&ep->pending_get_frags, (opal_list_item_t*)frag); + OPAL_THREAD_UNLOCK(&ep->endpoint_lock); return OMPI_SUCCESS; } /* check for a get token */ - if(OPAL_THREAD_ADD32(&endpoint->get_tokens,-1) < 0) { - qp_put_wqe(endpoint, qp); - OPAL_THREAD_ADD32(&endpoint->get_tokens,1); - OPAL_THREAD_LOCK(&endpoint->endpoint_lock); - opal_list_append(&endpoint->pending_get_frags, (opal_list_item_t*)frag); - OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + if(OPAL_THREAD_ADD32(&ep->get_tokens,-1) < 0) { + qp_put_wqe(ep, qp); + OPAL_THREAD_ADD32(&ep->get_tokens,1); + OPAL_THREAD_LOCK(&ep->endpoint_lock); + opal_list_append(&ep->pending_get_frags, (opal_list_item_t*)frag); + OPAL_THREAD_UNLOCK(&ep->endpoint_lock); return OMPI_SUCCESS; } #if OMPI_ENABLE_HETEROGENEOUS_SUPPORT - if((endpoint->endpoint_proc->proc_ompi->proc_arch & OMPI_ARCH_ISBIGENDIAN) + if((ep->endpoint_proc->proc_ompi->proc_arch & OMPI_ARCH_ISBIGENDIAN) != (ompi_proc_local()->proc_arch & OMPI_ARCH_ISBIGENDIAN)) { rem_addr = opal_swap_bytes8(rem_addr); rkey = opal_swap_bytes4(rkey); @@ -1014,10 +1036,10 @@ int mca_btl_openib_get( mca_btl_base_module_t* btl, to_com_frag(frag)->sg_entry.addr = (uint64_t)descriptor->des_dst->seg_addr.pval; to_com_frag(frag)->sg_entry.length = descriptor->des_dst->seg_len; - to_com_frag(frag)->endpoint = endpoint; + to_com_frag(frag)->endpoint = ep; descriptor->order = qp; - if(ibv_post_send(endpoint->qps[qp].qp->lcl_qp, &frag->sr_desc, &bad_wr)) + if(ibv_post_send(ep->qps[qp].qp->lcl_qp, &frag->sr_desc, &bad_wr)) return OMPI_ERROR; return OMPI_SUCCESS; diff --git a/ompi/mca/btl/openib/btl_openib.h b/ompi/mca/btl/openib/btl_openib.h index 3b6197e496..7d08f4c15a 100644 --- a/ompi/mca/btl/openib/btl_openib.h +++ b/ompi/mca/btl/openib/btl_openib.h @@ -501,6 +501,9 @@ extern mca_btl_base_descriptor_t* mca_btl_openib_prepare_dst( size_t reserve, size_t* size); +extern void mca_btl_openib_frag_progress_pending_put_get( + struct mca_btl_base_endpoint_t*, const int); + /** * Fault Tolerance Event Notification Function * diff --git a/ompi/mca/btl/openib/btl_openib_component.c b/ompi/mca/btl/openib/btl_openib_component.c index e15fc0a6b8..a0b2e769e8 100644 --- a/ompi/mca/btl/openib/btl_openib_component.c +++ b/ompi/mca/btl/openib/btl_openib_component.c @@ -86,9 +86,6 @@ static int btl_openib_module_progress(mca_btl_openib_hca_t *hca); static void progress_pending_frags_pp(mca_btl_base_endpoint_t *, const int); static void progress_pending_frags_srq( mca_btl_openib_module_t* , const int); static void progress_pending_eager_rdma(mca_btl_base_endpoint_t*); -static void btl_openib_frag_progress_pending_put_get( - mca_btl_openib_module_t* openib_btl, mca_btl_base_endpoint_t *endpoint, - const int qp); static int openib_reg_mr(void *reg_data, void *base, size_t size, mca_mpool_base_registration_t *reg); @@ -1314,32 +1311,33 @@ static void progress_pending_frags_pp(mca_btl_base_endpoint_t *ep, const int qp) OPAL_THREAD_UNLOCK(&ep->endpoint_lock); } -static void btl_openib_frag_progress_pending_put_get( - mca_btl_openib_module_t* openib_btl, mca_btl_base_endpoint_t *endpoint, - const int qp) { +void mca_btl_openib_frag_progress_pending_put_get(mca_btl_base_endpoint_t *ep, + const int qp) +{ + mca_btl_openib_module_t* openib_btl = ep->endpoint_btl; opal_list_item_t *frag; - size_t i, len = opal_list_get_size(&endpoint->pending_get_frags); + size_t i, len = opal_list_get_size(&ep->pending_get_frags); - for(i = 0; i < len && endpoint->qps[qp].qp->sd_wqe > 0 && - endpoint->get_tokens > 0; i++) { - OPAL_THREAD_LOCK(&endpoint->endpoint_lock); - frag = opal_list_remove_first(&(endpoint->pending_get_frags)); - OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + for(i = 0; i < len && ep->qps[qp].qp->sd_wqe > 0 && ep->get_tokens > 0; i++) + { + OPAL_THREAD_LOCK(&ep->endpoint_lock); + frag = opal_list_remove_first(&(ep->pending_get_frags)); + OPAL_THREAD_UNLOCK(&ep->endpoint_lock); if(NULL == frag) break; - if(mca_btl_openib_get((mca_btl_base_module_t *)openib_btl, endpoint, + if(mca_btl_openib_get((mca_btl_base_module_t *)openib_btl, ep, &to_base_frag(frag)->base) == OMPI_ERR_OUT_OF_RESOURCE) break; } - len = opal_list_get_size(&endpoint->pending_put_frags); - for(i = 0; i < len && endpoint->qps[qp].qp->sd_wqe > 0; i++) { - OPAL_THREAD_LOCK(&endpoint->endpoint_lock); - frag = opal_list_remove_first(&(endpoint->pending_put_frags)); - OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + len = opal_list_get_size(&ep->pending_put_frags); + for(i = 0; i < len && ep->qps[qp].qp->sd_wqe > 0; i++) { + OPAL_THREAD_LOCK(&ep->endpoint_lock); + frag = opal_list_remove_first(&(ep->pending_put_frags)); + OPAL_THREAD_UNLOCK(&ep->endpoint_lock); if(NULL == frag) break; - if(mca_btl_openib_put((mca_btl_base_module_t*)openib_btl, endpoint, + if(mca_btl_openib_put((mca_btl_base_module_t*)openib_btl, ep, &to_base_frag(frag)->base) == OMPI_ERR_OUT_OF_RESOURCE) break; } @@ -1561,7 +1559,7 @@ static int btl_openib_module_progress(mca_btl_openib_hca_t* hca) } /* new wqe or/and get token available. Try to progress pending frags */ progress_pending_frags_wqe(endpoint->qps[qp].qp); - btl_openib_frag_progress_pending_put_get(openib_btl, endpoint, qp); + mca_btl_openib_frag_progress_pending_put_get(endpoint, qp); count++; break; diff --git a/ompi/mca/btl/openib/btl_openib_endpoint.c b/ompi/mca/btl/openib/btl_openib_endpoint.c index 6d87178fc3..f221916a3a 100644 --- a/ompi/mca/btl/openib/btl_openib_endpoint.c +++ b/ompi/mca/btl/openib/btl_openib_endpoint.c @@ -42,7 +42,6 @@ #include "btl_openib_endpoint.h" #include "btl_openib_proc.h" #include "btl_openib_frag.h" -#include "connect/base.h" static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint); static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint); @@ -498,75 +497,35 @@ void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t *endpoint) if(OMPI_ERROR == mca_btl_openib_endpoint_post_send(endpoint, frag)) BTL_ERROR(("Error posting send")); } + + /* if upper layer called put or get before connection moved to connected + * state then we restart them here */ + mca_btl_openib_frag_progress_pending_put_get(endpoint, + mca_btl_openib_component.rdma_qp); } /* * Attempt to send a fragment using a given endpoint. If the endpoint is not * connected, queue the fragment and start the connection as required. */ -int mca_btl_openib_endpoint_send(mca_btl_base_endpoint_t* endpoint, +int mca_btl_openib_endpoint_send(mca_btl_base_endpoint_t* ep, mca_btl_openib_send_frag_t* frag) { int rc; - bool call_progress = false; - - OPAL_THREAD_LOCK(&endpoint->endpoint_lock); - switch(endpoint->endpoint_state) { - case MCA_BTL_IB_CONNECTING: - BTL_VERBOSE(("Queing because state is connecting")); - - opal_list_append(&endpoint->pending_lazy_frags, - (opal_list_item_t *)frag); - call_progress = true; + OPAL_THREAD_LOCK(&ep->endpoint_lock); + rc = check_endpoint_state(ep, &to_base_frag(frag)->base, + &ep->pending_lazy_frags); + + if(OPAL_LIKELY(rc == OMPI_SUCCESS)) { + rc = mca_btl_openib_endpoint_post_send(ep, frag); + if(OMPI_ERR_OUT_OF_RESOURCE == rc) rc = OMPI_SUCCESS; - break; - - case MCA_BTL_IB_CONNECT_ACK: - case MCA_BTL_IB_WAITING_ACK: - BTL_VERBOSE(("Queuing because waiting for ack")); - - opal_list_append(&endpoint->pending_lazy_frags, - (opal_list_item_t *)frag); - call_progress = true; - rc = OMPI_SUCCESS; - break; - - case MCA_BTL_IB_CLOSED: - - BTL_VERBOSE(("Connection to endpoint closed ... connecting ...")); - opal_list_append(&endpoint->pending_lazy_frags, - (opal_list_item_t *)frag); - rc = ompi_btl_openib_connect.bcf_start_connect(endpoint); - /* - * As long as we expect a message from the peer (in order - * to setup the connection) let the event engine pool the - * OOB events. Note: we increment it once peer active - * connection. - */ - opal_progress_event_users_increment(); - call_progress = true; - break; - - case MCA_BTL_IB_FAILED: - - rc = OMPI_ERR_UNREACH; - break; - - case MCA_BTL_IB_CONNECTED: - BTL_VERBOSE(("Send to : %d, len : %lu, frag : %p", - endpoint->endpoint_proc->proc_guid.vpid, - frag->sg_entry.length, frag)); - rc = mca_btl_openib_endpoint_post_send(endpoint, frag); - if(rc == OMPI_ERR_OUT_OF_RESOURCE ) - rc = OMPI_SUCCESS; - break; - default: - rc = OMPI_ERR_UNREACH; - break; + } else if(OMPI_ERR_TEMP_OUT_OF_RESOURCE == rc) { + rc = OMPI_SUCCESS; } - OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); - if(call_progress) opal_progress(); + OPAL_THREAD_UNLOCK(&ep->endpoint_lock); + return rc; } diff --git a/ompi/mca/btl/openib/btl_openib_endpoint.h b/ompi/mca/btl/openib/btl_openib_endpoint.h index 96bfbdeeb6..8cfb25c6e3 100644 --- a/ompi/mca/btl/openib/btl_openib_endpoint.h +++ b/ompi/mca/btl/openib/btl_openib_endpoint.h @@ -33,6 +33,7 @@ #include #include #include "ompi/mca/btl/base/btl_base_error.h" +#include "connect/base.h" BEGIN_C_DECLS @@ -325,6 +326,38 @@ try_send: mca_btl_openib_endpoint_send_credits(ep, qp); } +static inline int check_endpoint_state(mca_btl_openib_endpoint_t *ep, + mca_btl_base_descriptor_t *des, opal_list_t *pending_list) +{ + int rc = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + + switch(ep->endpoint_state) { + case MCA_BTL_IB_CLOSED: + rc = ompi_btl_openib_connect.bcf_start_connect(ep); + if(rc == OMPI_SUCCESS) + rc = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + /* + * As long as we expect a message from the peer (in order + * to setup the connection) let the event engine pool the + * OOB events. Note: we increment it once peer active + * connection. + */ + opal_progress_event_users_increment(); + /* fall through */ + default: + opal_list_append(pending_list, (opal_list_item_t *)des); + break; + case MCA_BTL_IB_FAILED: + rc = OMPI_ERR_UNREACH; + break; + case MCA_BTL_IB_CONNECTED: + rc = OMPI_SUCCESS; + break; + } + + return rc; +} + END_C_DECLS #endif diff --git a/ompi/mca/btl/openib/connect/connect.h b/ompi/mca/btl/openib/connect/connect.h index 5a5f523369..e0fc8945fe 100644 --- a/ompi/mca/btl/openib/connect/connect.h +++ b/ompi/mca/btl/openib/connect/connect.h @@ -66,8 +66,6 @@ #ifndef BTL_OPENIB_CONNECT_H #define BTL_OPENIB_CONNECT_H -#include "btl_openib_endpoint.h" - BEGIN_C_DECLS /** @@ -85,7 +83,7 @@ typedef int (*ompi_btl_openib_connect_base_func_init_t)(void); * Function to initiate a connection to a remote process */ typedef int (*ompi_btl_openib_connect_base_func_start_connect_t) - (mca_btl_base_endpoint_t *e); + (struct mca_btl_base_endpoint_t *e); /** * Function to finalize the connection functions