From 566667ac6195a6c7761577a49cfb02735516ae97 Mon Sep 17 00:00:00 2001 From: Pavel Shamis Date: Thu, 2 Nov 2006 16:15:21 +0000 Subject: [PATCH] Adding progress thread support to OpenIB BTL. Reviewed by Gleb. This commit was SVN r12411. --- config/ompi_check_openib.m4 | 4 - ompi/mca/btl/openib/btl_openib.c | 62 ++++++++++- ompi/mca/btl/openib/btl_openib.h | 9 ++ ompi/mca/btl/openib/btl_openib_component.c | 123 ++++++++++++++++----- ompi/mca/btl/openib/btl_openib_mca.c | 4 + 5 files changed, 169 insertions(+), 33 deletions(-) diff --git a/config/ompi_check_openib.m4 b/config/ompi_check_openib.m4 index 9e75f172e5..c7de70f6e3 100644 --- a/config/ompi_check_openib.m4 +++ b/config/ompi_check_openib.m4 @@ -144,10 +144,6 @@ AC_DEFUN([OMPI_CHECK_OPENIB],[ LIBS="$ompi_check_openib_$1_save_LIBS"], [ompi_check_openib_happy="no"]) - AS_IF([test "$ompi_check_openib_happy" = "yes" -a "$enable_progress_threads" = "yes"], - [AC_MSG_WARN([OpenIB driver does not currently support progress threads. Disabling BTL.]) - ompi_check_openib_happy="no"]) - AS_IF([test "$ompi_check_openib_happy" = "yes"], [$2], [AS_IF([test ! -z "$with_openib" -a "$with_openib" != "no"], diff --git a/ompi/mca/btl/openib/btl_openib.c b/ompi/mca/btl/openib/btl_openib.c index 61be9502e4..c4c4180e3b 100644 --- a/ompi/mca/btl/openib/btl_openib.c +++ b/ompi/mca/btl/openib/btl_openib.c @@ -618,6 +618,14 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl) } #endif +#if OMPI_ENABLE_PROGRESS_THREADS == 1 + if(openib_btl->hca->progress) { + openib_btl->hca->progress = false; + if (pthread_cancel(openib_btl->hca->thread.t_handle)) + BTL_ERROR(("Failed to cancel OpenIB progress thread")); + opal_thread_join(&openib_btl->hca->thread, NULL); + } +#endif return OMPI_SUCCESS; } @@ -763,6 +771,8 @@ int mca_btl_openib_get( mca_btl_base_module_t* btl, */ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl) { + int rc; + /* Allocate Protection Domain */ openib_btl->poll_cq = false; @@ -796,6 +806,17 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl) } /* Create the low and high priority queue pairs */ +#if OMPI_ENABLE_PROGRESS_THREADS == 1 +#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3 + openib_btl->ib_cq[BTL_OPENIB_LP_QP] = + ibv_create_cq(openib_btl->hca->ib_dev_context, + mca_btl_openib_component.ib_cq_size, openib_btl->hca->ib_channel); +#else + openib_btl->ib_cq[BTL_OPENIB_LP_QP] = + ibv_create_cq(openib_btl->hca->ib_dev_context, + mca_btl_openib_component.ib_cq_size, openib_btl, openib_btl->hca->ib_channel, 0); +#endif +#else /* OMPI_ENABLE_PROGRESS_THREADS DISABLED */ #if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3 openib_btl->ib_cq[BTL_OPENIB_LP_QP] = ibv_create_cq(openib_btl->hca->ib_dev_context, @@ -805,6 +826,7 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl) ibv_create_cq(openib_btl->hca->ib_dev_context, mca_btl_openib_component.ib_cq_size, NULL, NULL, 0); #endif +#endif /* OMPI_ENABLE_PROGRESS_THREADS */ if(NULL == openib_btl->ib_cq[BTL_OPENIB_LP_QP]) { BTL_ERROR(("error creating low priority cq for %s errno says %s\n", @@ -813,6 +835,25 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl) return OMPI_ERROR; } +#if OMPI_ENABLE_PROGRESS_THREADS == 1 + if(ibv_req_notify_cq(openib_btl->ib_cq[BTL_OPENIB_LP_QP], 0)) { + BTL_ERROR(("error requesting low priority cq notification for %s" + " errno says %s\n", + ibv_get_device_name(openib_btl->hca->ib_dev), + strerror(errno))); + return OMPI_ERROR; + } + +#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3 + openib_btl->ib_cq[BTL_OPENIB_HP_QP] = + ibv_create_cq(openib_btl->hca->ib_dev_context, + mca_btl_openib_component.ib_cq_size, openib_btl->hca->ib_channel); +#else + openib_btl->ib_cq[BTL_OPENIB_HP_QP] = + ibv_create_cq(openib_btl->hca->ib_dev_context, + mca_btl_openib_component.ib_cq_size, openib_btl, openib_btl->hca->ib_channel, 0); +#endif +#else /* OMPI_ENABLE_PROGRESS_THREADS DISABLED */ #if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3 openib_btl->ib_cq[BTL_OPENIB_HP_QP] = ibv_create_cq(openib_btl->hca->ib_dev_context, @@ -822,6 +863,7 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl) ibv_create_cq(openib_btl->hca->ib_dev_context, mca_btl_openib_component.ib_cq_size, NULL, NULL, 0); #endif +#endif /* OMPI_ENABLE_PROGRESS_THREADS */ if(NULL == openib_btl->ib_cq[BTL_OPENIB_HP_QP]) { BTL_ERROR(("error creating high priority cq for %s errno says %s\n", @@ -829,7 +871,25 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl) strerror(errno))); return OMPI_ERROR; } + +#if OMPI_ENABLE_PROGRESS_THREADS == 1 + if(ibv_req_notify_cq(openib_btl->ib_cq[BTL_OPENIB_HP_QP], 0)) { + BTL_ERROR(("error requesting high priority cq notification for %s" + " errno says %s\n", + ibv_get_device_name(openib_btl->hca->ib_dev), + strerror(errno))); + return OMPI_ERROR; + } + OPAL_THREAD_LOCK(&openib_btl->hca->hca_lock); + if (!openib_btl->hca->progress){ + openib_btl->hca->progress = true; + if(OPAL_SUCCESS != (rc = opal_thread_start(&openib_btl->hca->thread))) { + BTL_ERROR(("Unable to create progress thread, retval=%d", rc)); + return rc; + } + } + OPAL_THREAD_UNLOCK(&openib_btl->hca->hca_lock); +#endif - return OMPI_SUCCESS; } diff --git a/ompi/mca/btl/openib/btl_openib.h b/ompi/mca/btl/openib/btl_openib.h index 05d1522981..d66e9b81e4 100644 --- a/ompi/mca/btl/openib/btl_openib.h +++ b/ompi/mca/btl/openib/btl_openib.h @@ -151,6 +151,12 @@ typedef struct mca_btl_openib_port_info_t mca_btl_openib_port_info_t; struct mca_btl_openib_hca_t { struct ibv_device *ib_dev; /* the ib device */ +#if OMPI_ENABLE_PROGRESS_THREADS == 1 + struct ibv_comp_channel *ib_channel; /* Channel event for the HCA */ + opal_thread_t thread; /* Progress thread */ + volatile bool progress; /* Progress status */ +#endif + opal_mutex_t hca_lock; /* hca level lock */ struct ibv_context *ib_dev_context; struct ibv_device_attr ib_dev_attr; struct ibv_pd *ib_pd; @@ -212,6 +218,9 @@ struct mca_btl_openib_module_t { extern mca_btl_openib_module_t mca_btl_openib_module; +#if OMPI_ENABLE_PROGRESS_THREADS == 1 +extern void* mca_btl_openib_progress_thread(opal_object_t*); +#endif /** * Register a callback function that is called on receipt * of a fragment. diff --git a/ompi/mca/btl/openib/btl_openib_component.c b/ompi/mca/btl/openib/btl_openib_component.c index 39d48f6cb5..7b0fee7c43 100644 --- a/ompi/mca/btl/openib/btl_openib_component.c +++ b/ompi/mca/btl/openib/btl_openib_component.c @@ -77,6 +77,7 @@ static int btl_openib_handle_incoming(mca_btl_openib_module_t *openib_btl, size_t byte_len, const int prio); static char* btl_openib_component_status_to_string(enum ibv_wc_status status); static int btl_openib_component_progress(void); +static int btl_openib_module_progress(mca_btl_openib_module_t *openib_btl); static void btl_openib_frag_progress_pending( mca_btl_openib_module_t* openib_btl, mca_btl_base_endpoint_t *endpoint, const int prio); @@ -314,6 +315,7 @@ static int init_one_hca(opal_list_t *btl_list, struct ibv_device* ib_dev) hca->ib_dev = ib_dev; hca->ib_dev_context = ibv_open_device(ib_dev); hca->btls = 0; + OBJ_CONSTRUCT(&hca->hca_lock, opal_mutex_t); if(NULL == hca->ib_dev_context){ BTL_ERROR(("error obtaining device context for %s errno says %s\n", ibv_get_device_name(ib_dev), strerror(errno))); @@ -402,7 +404,18 @@ static int init_one_hca(opal_list_t *btl_list, struct ibv_device* ib_dev) goto dealloc_pd; } +#if OMPI_ENABLE_PROGRESS_THREADS == 1 + hca->ib_channel = ibv_create_comp_channel(hca->ib_dev_context); + if (NULL == hca->ib_channel) { + BTL_ERROR(("error creating channel for %s errno says %s\n", + ibv_get_device_name(hca->ib_dev), + strerror(errno))); + goto mpool_destroy; + } +#endif + ret = OMPI_SUCCESS; + /* Note ports are 1 based hence j = 1 */ for(i = 1; i <= hca->ib_dev_attr.phys_port_cnt; i++){ struct ibv_port_attr ib_port_attr; @@ -427,9 +440,21 @@ static int init_one_hca(opal_list_t *btl_list, struct ibv_device* ib_dev) } } - if (hca->btls != 0) + if (hca->btls != 0){ +#if OMPI_ENABLE_PROGRESS_THREADS == 1 + /* Prepare data for thread, but not starting it */ + OBJ_CONSTRUCT(&hca->thread, opal_thread_t); + hca->thread.t_run = mca_btl_openib_progress_thread; + hca->thread.t_arg = hca; + hca->progress = false; +#endif return ret; + } +#if OMPI_ENABLE_PROGRESS_THREADS == 1 + ibv_destroy_comp_channel(hca->ib_channel); +#endif +mpool_destroy: mca_mpool_base_module_destroy(hca->mpool); dealloc_pd: ibv_dealloc_pd(hca->ib_pd); @@ -470,15 +495,6 @@ btl_openib_component_init(int *num_btl_modules, *num_btl_modules = 0; num_devs = 0; - /* openib BTL does not currently support progress threads, so - disable the component if they were requested */ - if (enable_progress_threads) { - mca_btl_base_error_no_nics("OpenIB", "HCA"); - mca_btl_openib_component.ib_num_btls = 0; - btl_openib_modex_send(); - return NULL; - } - seedv[0] = orte_process_info.my_name->vpid; seedv[1] = opal_sys_timer_get_cycles(); seedv[2] = opal_sys_timer_get_cycles(); @@ -974,6 +990,49 @@ static void btl_openib_frag_progress_pending( } } +#if OMPI_ENABLE_PROGRESS_THREADS == 1 +void* mca_btl_openib_progress_thread(opal_object_t* arg) +{ + mca_btl_openib_module_t* openib_btl; + opal_thread_t* thread = (opal_thread_t*)arg; + mca_btl_openib_hca_t* hca = thread->t_arg; + unsigned int ev_lp, ev_hp; + struct ibv_cq *ev_cq; + void *ev_ctx; + int qp; + + /* This thread enter in a cancel enabled state */ + pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL ); + pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL ); + + while (hca->progress) { + while(opal_progress_threads()) { + while(opal_progress_threads()) + sched_yield(); + usleep(100); /* give app a chance to re-enter library */ + } + + if(ibv_get_cq_event(hca->ib_channel, &ev_cq, &ev_ctx)) + BTL_ERROR(("Failed to get CQ event with error %s", + strerror(errno))); + if(ibv_req_notify_cq(ev_cq, 0)) { + BTL_ERROR(("Couldn't request CQ notification with error %s", + strerror(errno))); + } + openib_btl=(mca_btl_openib_module_t*)ev_ctx; + + if (ev_cq == openib_btl->ib_cq[BTL_OPENIB_LP_QP]) + ibv_ack_cq_events (openib_btl->ib_cq[BTL_OPENIB_LP_QP], 1); + else + ibv_ack_cq_events (openib_btl->ib_cq[BTL_OPENIB_HP_QP], 1); + + while(btl_openib_module_progress(openib_btl)); + } + + return PTHREAD_CANCELED; +} +#endif + /* * IB component progress. */ @@ -1042,23 +1101,32 @@ static int btl_openib_component_progress(void) if(count) return count; for(i = 0; i < mca_btl_openib_component.ib_num_btls; i++) { - openib_btl = &mca_btl_openib_component.openib_btls[i]; - - /* We have two completion queues, one for "high" priority and one for - * "low". Check high priority before low priority */ - for(qp = 0; qp < 2; qp++) { - ne = ibv_poll_cq(openib_btl->ib_cq[qp], 1, &wc); + return btl_openib_module_progress(&mca_btl_openib_component.openib_btls[i]); + } +} - if(0 == ne) - continue; +static int btl_openib_module_progress(mca_btl_openib_module_t* openib_btl) +{ + static char *qp_name[] = {"HP", "LP"}; + int i, j, c, qp; + int count = 0,ne = 0, ret; + mca_btl_openib_frag_t* frag; + mca_btl_openib_endpoint_t* endpoint; + struct ibv_wc wc; - if(ne < 0 || wc.status != IBV_WC_SUCCESS) - goto error; + for(qp = 0; qp < 2; qp++) { + ne = ibv_poll_cq(openib_btl->ib_cq[qp], 1, &wc); - frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id; - endpoint = frag->endpoint; - /* Handle work completions */ - switch(wc.opcode) { + if(0 == ne) + continue; + + if(ne < 0 || wc.status != IBV_WC_SUCCESS) + goto error; + + frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id; + endpoint = frag->endpoint; + /* Handle work completions */ + switch(wc.opcode) { case IBV_WC_RDMA_READ: assert(BTL_OPENIB_LP_QP == qp); OPAL_THREAD_ADD32(&endpoint->get_tokens, 1); @@ -1124,7 +1192,6 @@ static int btl_openib_component_progress(void) openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL); break; - } } } return count; @@ -1139,8 +1206,8 @@ error: if(frag) { endpoint = (mca_btl_openib_endpoint_t*) frag->endpoint; if(endpoint && - endpoint->endpoint_proc && - endpoint->endpoint_proc->proc_ompi) { + endpoint->endpoint_proc && + endpoint->endpoint_proc->proc_ompi) { remote_proc = endpoint->endpoint_proc->proc_ompi; } } @@ -1149,7 +1216,7 @@ error: "status number %d for wr_id %llu opcode %d", qp_name[qp], btl_openib_component_status_to_string(wc.status), - wc.status, wc.wr_id, wc.opcode)); + wc.status, wc.wr_id, wc.opcode)); if(wc.status == IBV_WC_RETRY_EXC_ERR) { opal_show_help("help-mpi-btl-openib.txt", "btl_openib:retry-exceeded", true); diff --git a/ompi/mca/btl/openib/btl_openib_mca.c b/ompi/mca/btl/openib/btl_openib_mca.c index 5c6b45ad11..484c005e30 100644 --- a/ompi/mca/btl/openib/btl_openib_mca.c +++ b/ompi/mca/btl/openib/btl_openib_mca.c @@ -297,6 +297,10 @@ int btl_openib_register_mca_params(void) CHECK(reg_int("use_eager_rdma", "Use RDMA for eager messages ", 1, &ival, 0)); mca_btl_openib_component.use_eager_rdma = (uint32_t) (ival != 0); +#if OMPI_ENABLE_PROGRESS_THREADS == 1 + /* Fast rdma path isn't supported by PROGRESS_THREAD */ + mca_btl_openib_component.use_eager_rdma = 0; +#endif CHECK(reg_int("eager_rdma_threshold", "Use RDMA for short messages after this number of "