Adding progress thread support to OpenIB BTL.
Reviewed by Gleb. This commit was SVN r12411.
Этот коммит содержится в:
родитель
f4e81176fe
Коммит
566667ac61
@ -144,10 +144,6 @@ AC_DEFUN([OMPI_CHECK_OPENIB],[
|
||||
LIBS="$ompi_check_openib_$1_save_LIBS"],
|
||||
[ompi_check_openib_happy="no"])
|
||||
|
||||
AS_IF([test "$ompi_check_openib_happy" = "yes" -a "$enable_progress_threads" = "yes"],
|
||||
[AC_MSG_WARN([OpenIB driver does not currently support progress threads. Disabling BTL.])
|
||||
ompi_check_openib_happy="no"])
|
||||
|
||||
AS_IF([test "$ompi_check_openib_happy" = "yes"],
|
||||
[$2],
|
||||
[AS_IF([test ! -z "$with_openib" -a "$with_openib" != "no"],
|
||||
|
@ -618,6 +618,14 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
|
||||
}
|
||||
#endif
|
||||
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
if(openib_btl->hca->progress) {
|
||||
openib_btl->hca->progress = false;
|
||||
if (pthread_cancel(openib_btl->hca->thread.t_handle))
|
||||
BTL_ERROR(("Failed to cancel OpenIB progress thread"));
|
||||
opal_thread_join(&openib_btl->hca->thread, NULL);
|
||||
}
|
||||
#endif
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -763,6 +771,8 @@ int mca_btl_openib_get( mca_btl_base_module_t* btl,
|
||||
*/
|
||||
int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* Allocate Protection Domain */
|
||||
openib_btl->poll_cq = false;
|
||||
|
||||
@ -796,6 +806,17 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl)
|
||||
}
|
||||
|
||||
/* Create the low and high priority queue pairs */
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3
|
||||
openib_btl->ib_cq[BTL_OPENIB_LP_QP] =
|
||||
ibv_create_cq(openib_btl->hca->ib_dev_context,
|
||||
mca_btl_openib_component.ib_cq_size, openib_btl->hca->ib_channel);
|
||||
#else
|
||||
openib_btl->ib_cq[BTL_OPENIB_LP_QP] =
|
||||
ibv_create_cq(openib_btl->hca->ib_dev_context,
|
||||
mca_btl_openib_component.ib_cq_size, openib_btl, openib_btl->hca->ib_channel, 0);
|
||||
#endif
|
||||
#else /* OMPI_ENABLE_PROGRESS_THREADS DISABLED */
|
||||
#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3
|
||||
openib_btl->ib_cq[BTL_OPENIB_LP_QP] =
|
||||
ibv_create_cq(openib_btl->hca->ib_dev_context,
|
||||
@ -805,6 +826,7 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl)
|
||||
ibv_create_cq(openib_btl->hca->ib_dev_context,
|
||||
mca_btl_openib_component.ib_cq_size, NULL, NULL, 0);
|
||||
#endif
|
||||
#endif /* OMPI_ENABLE_PROGRESS_THREADS */
|
||||
|
||||
if(NULL == openib_btl->ib_cq[BTL_OPENIB_LP_QP]) {
|
||||
BTL_ERROR(("error creating low priority cq for %s errno says %s\n",
|
||||
@ -813,6 +835,25 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl)
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
if(ibv_req_notify_cq(openib_btl->ib_cq[BTL_OPENIB_LP_QP], 0)) {
|
||||
BTL_ERROR(("error requesting low priority cq notification for %s"
|
||||
" errno says %s\n",
|
||||
ibv_get_device_name(openib_btl->hca->ib_dev),
|
||||
strerror(errno)));
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3
|
||||
openib_btl->ib_cq[BTL_OPENIB_HP_QP] =
|
||||
ibv_create_cq(openib_btl->hca->ib_dev_context,
|
||||
mca_btl_openib_component.ib_cq_size, openib_btl->hca->ib_channel);
|
||||
#else
|
||||
openib_btl->ib_cq[BTL_OPENIB_HP_QP] =
|
||||
ibv_create_cq(openib_btl->hca->ib_dev_context,
|
||||
mca_btl_openib_component.ib_cq_size, openib_btl, openib_btl->hca->ib_channel, 0);
|
||||
#endif
|
||||
#else /* OMPI_ENABLE_PROGRESS_THREADS DISABLED */
|
||||
#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3
|
||||
openib_btl->ib_cq[BTL_OPENIB_HP_QP] =
|
||||
ibv_create_cq(openib_btl->hca->ib_dev_context,
|
||||
@ -822,6 +863,7 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl)
|
||||
ibv_create_cq(openib_btl->hca->ib_dev_context,
|
||||
mca_btl_openib_component.ib_cq_size, NULL, NULL, 0);
|
||||
#endif
|
||||
#endif /* OMPI_ENABLE_PROGRESS_THREADS */
|
||||
|
||||
if(NULL == openib_btl->ib_cq[BTL_OPENIB_HP_QP]) {
|
||||
BTL_ERROR(("error creating high priority cq for %s errno says %s\n",
|
||||
@ -829,7 +871,25 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl)
|
||||
strerror(errno)));
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
if(ibv_req_notify_cq(openib_btl->ib_cq[BTL_OPENIB_HP_QP], 0)) {
|
||||
BTL_ERROR(("error requesting high priority cq notification for %s"
|
||||
" errno says %s\n",
|
||||
ibv_get_device_name(openib_btl->hca->ib_dev),
|
||||
strerror(errno)));
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
OPAL_THREAD_LOCK(&openib_btl->hca->hca_lock);
|
||||
if (!openib_btl->hca->progress){
|
||||
openib_btl->hca->progress = true;
|
||||
if(OPAL_SUCCESS != (rc = opal_thread_start(&openib_btl->hca->thread))) {
|
||||
BTL_ERROR(("Unable to create progress thread, retval=%d", rc));
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&openib_btl->hca->hca_lock);
|
||||
#endif
|
||||
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -151,6 +151,12 @@ typedef struct mca_btl_openib_port_info_t mca_btl_openib_port_info_t;
|
||||
|
||||
struct mca_btl_openib_hca_t {
|
||||
struct ibv_device *ib_dev; /* the ib device */
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
struct ibv_comp_channel *ib_channel; /* Channel event for the HCA */
|
||||
opal_thread_t thread; /* Progress thread */
|
||||
volatile bool progress; /* Progress status */
|
||||
#endif
|
||||
opal_mutex_t hca_lock; /* hca level lock */
|
||||
struct ibv_context *ib_dev_context;
|
||||
struct ibv_device_attr ib_dev_attr;
|
||||
struct ibv_pd *ib_pd;
|
||||
@ -212,6 +218,9 @@ struct mca_btl_openib_module_t {
|
||||
|
||||
extern mca_btl_openib_module_t mca_btl_openib_module;
|
||||
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
extern void* mca_btl_openib_progress_thread(opal_object_t*);
|
||||
#endif
|
||||
/**
|
||||
* Register a callback function that is called on receipt
|
||||
* of a fragment.
|
||||
|
@ -77,6 +77,7 @@ static int btl_openib_handle_incoming(mca_btl_openib_module_t *openib_btl,
|
||||
size_t byte_len, const int prio);
|
||||
static char* btl_openib_component_status_to_string(enum ibv_wc_status status);
|
||||
static int btl_openib_component_progress(void);
|
||||
static int btl_openib_module_progress(mca_btl_openib_module_t *openib_btl);
|
||||
static void btl_openib_frag_progress_pending(
|
||||
mca_btl_openib_module_t* openib_btl, mca_btl_base_endpoint_t *endpoint,
|
||||
const int prio);
|
||||
@ -314,6 +315,7 @@ static int init_one_hca(opal_list_t *btl_list, struct ibv_device* ib_dev)
|
||||
hca->ib_dev = ib_dev;
|
||||
hca->ib_dev_context = ibv_open_device(ib_dev);
|
||||
hca->btls = 0;
|
||||
OBJ_CONSTRUCT(&hca->hca_lock, opal_mutex_t);
|
||||
if(NULL == hca->ib_dev_context){
|
||||
BTL_ERROR(("error obtaining device context for %s errno says %s\n",
|
||||
ibv_get_device_name(ib_dev), strerror(errno)));
|
||||
@ -402,7 +404,18 @@ static int init_one_hca(opal_list_t *btl_list, struct ibv_device* ib_dev)
|
||||
goto dealloc_pd;
|
||||
}
|
||||
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
hca->ib_channel = ibv_create_comp_channel(hca->ib_dev_context);
|
||||
if (NULL == hca->ib_channel) {
|
||||
BTL_ERROR(("error creating channel for %s errno says %s\n",
|
||||
ibv_get_device_name(hca->ib_dev),
|
||||
strerror(errno)));
|
||||
goto mpool_destroy;
|
||||
}
|
||||
#endif
|
||||
|
||||
ret = OMPI_SUCCESS;
|
||||
|
||||
/* Note ports are 1 based hence j = 1 */
|
||||
for(i = 1; i <= hca->ib_dev_attr.phys_port_cnt; i++){
|
||||
struct ibv_port_attr ib_port_attr;
|
||||
@ -427,9 +440,21 @@ static int init_one_hca(opal_list_t *btl_list, struct ibv_device* ib_dev)
|
||||
}
|
||||
}
|
||||
|
||||
if (hca->btls != 0)
|
||||
if (hca->btls != 0){
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
/* Prepare data for thread, but not starting it */
|
||||
OBJ_CONSTRUCT(&hca->thread, opal_thread_t);
|
||||
hca->thread.t_run = mca_btl_openib_progress_thread;
|
||||
hca->thread.t_arg = hca;
|
||||
hca->progress = false;
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
ibv_destroy_comp_channel(hca->ib_channel);
|
||||
#endif
|
||||
mpool_destroy:
|
||||
mca_mpool_base_module_destroy(hca->mpool);
|
||||
dealloc_pd:
|
||||
ibv_dealloc_pd(hca->ib_pd);
|
||||
@ -470,15 +495,6 @@ btl_openib_component_init(int *num_btl_modules,
|
||||
*num_btl_modules = 0;
|
||||
num_devs = 0;
|
||||
|
||||
/* openib BTL does not currently support progress threads, so
|
||||
disable the component if they were requested */
|
||||
if (enable_progress_threads) {
|
||||
mca_btl_base_error_no_nics("OpenIB", "HCA");
|
||||
mca_btl_openib_component.ib_num_btls = 0;
|
||||
btl_openib_modex_send();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
seedv[0] = orte_process_info.my_name->vpid;
|
||||
seedv[1] = opal_sys_timer_get_cycles();
|
||||
seedv[2] = opal_sys_timer_get_cycles();
|
||||
@ -974,6 +990,49 @@ static void btl_openib_frag_progress_pending(
|
||||
}
|
||||
}
|
||||
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
void* mca_btl_openib_progress_thread(opal_object_t* arg)
|
||||
{
|
||||
mca_btl_openib_module_t* openib_btl;
|
||||
opal_thread_t* thread = (opal_thread_t*)arg;
|
||||
mca_btl_openib_hca_t* hca = thread->t_arg;
|
||||
unsigned int ev_lp, ev_hp;
|
||||
struct ibv_cq *ev_cq;
|
||||
void *ev_ctx;
|
||||
int qp;
|
||||
|
||||
/* This thread enter in a cancel enabled state */
|
||||
pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
|
||||
pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL );
|
||||
|
||||
while (hca->progress) {
|
||||
while(opal_progress_threads()) {
|
||||
while(opal_progress_threads())
|
||||
sched_yield();
|
||||
usleep(100); /* give app a chance to re-enter library */
|
||||
}
|
||||
|
||||
if(ibv_get_cq_event(hca->ib_channel, &ev_cq, &ev_ctx))
|
||||
BTL_ERROR(("Failed to get CQ event with error %s",
|
||||
strerror(errno)));
|
||||
if(ibv_req_notify_cq(ev_cq, 0)) {
|
||||
BTL_ERROR(("Couldn't request CQ notification with error %s",
|
||||
strerror(errno)));
|
||||
}
|
||||
openib_btl=(mca_btl_openib_module_t*)ev_ctx;
|
||||
|
||||
if (ev_cq == openib_btl->ib_cq[BTL_OPENIB_LP_QP])
|
||||
ibv_ack_cq_events (openib_btl->ib_cq[BTL_OPENIB_LP_QP], 1);
|
||||
else
|
||||
ibv_ack_cq_events (openib_btl->ib_cq[BTL_OPENIB_HP_QP], 1);
|
||||
|
||||
while(btl_openib_module_progress(openib_btl));
|
||||
}
|
||||
|
||||
return PTHREAD_CANCELED;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* IB component progress.
|
||||
*/
|
||||
@ -1042,23 +1101,32 @@ static int btl_openib_component_progress(void)
|
||||
if(count) return count;
|
||||
|
||||
for(i = 0; i < mca_btl_openib_component.ib_num_btls; i++) {
|
||||
openib_btl = &mca_btl_openib_component.openib_btls[i];
|
||||
|
||||
/* We have two completion queues, one for "high" priority and one for
|
||||
* "low". Check high priority before low priority */
|
||||
for(qp = 0; qp < 2; qp++) {
|
||||
ne = ibv_poll_cq(openib_btl->ib_cq[qp], 1, &wc);
|
||||
return btl_openib_module_progress(&mca_btl_openib_component.openib_btls[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if(0 == ne)
|
||||
continue;
|
||||
static int btl_openib_module_progress(mca_btl_openib_module_t* openib_btl)
|
||||
{
|
||||
static char *qp_name[] = {"HP", "LP"};
|
||||
int i, j, c, qp;
|
||||
int count = 0,ne = 0, ret;
|
||||
mca_btl_openib_frag_t* frag;
|
||||
mca_btl_openib_endpoint_t* endpoint;
|
||||
struct ibv_wc wc;
|
||||
|
||||
if(ne < 0 || wc.status != IBV_WC_SUCCESS)
|
||||
goto error;
|
||||
for(qp = 0; qp < 2; qp++) {
|
||||
ne = ibv_poll_cq(openib_btl->ib_cq[qp], 1, &wc);
|
||||
|
||||
frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id;
|
||||
endpoint = frag->endpoint;
|
||||
/* Handle work completions */
|
||||
switch(wc.opcode) {
|
||||
if(0 == ne)
|
||||
continue;
|
||||
|
||||
if(ne < 0 || wc.status != IBV_WC_SUCCESS)
|
||||
goto error;
|
||||
|
||||
frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id;
|
||||
endpoint = frag->endpoint;
|
||||
/* Handle work completions */
|
||||
switch(wc.opcode) {
|
||||
case IBV_WC_RDMA_READ:
|
||||
assert(BTL_OPENIB_LP_QP == qp);
|
||||
OPAL_THREAD_ADD32(&endpoint->get_tokens, 1);
|
||||
@ -1124,7 +1192,6 @@ static int btl_openib_component_progress(void)
|
||||
openib_btl->error_cb(&openib_btl->super,
|
||||
MCA_BTL_ERROR_FLAGS_FATAL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return count;
|
||||
@ -1139,8 +1206,8 @@ error:
|
||||
if(frag) {
|
||||
endpoint = (mca_btl_openib_endpoint_t*) frag->endpoint;
|
||||
if(endpoint &&
|
||||
endpoint->endpoint_proc &&
|
||||
endpoint->endpoint_proc->proc_ompi) {
|
||||
endpoint->endpoint_proc &&
|
||||
endpoint->endpoint_proc->proc_ompi) {
|
||||
remote_proc = endpoint->endpoint_proc->proc_ompi;
|
||||
}
|
||||
}
|
||||
@ -1149,7 +1216,7 @@ error:
|
||||
"status number %d for wr_id %llu opcode %d",
|
||||
qp_name[qp],
|
||||
btl_openib_component_status_to_string(wc.status),
|
||||
wc.status, wc.wr_id, wc.opcode));
|
||||
wc.status, wc.wr_id, wc.opcode));
|
||||
if(wc.status == IBV_WC_RETRY_EXC_ERR) {
|
||||
opal_show_help("help-mpi-btl-openib.txt",
|
||||
"btl_openib:retry-exceeded", true);
|
||||
|
@ -297,6 +297,10 @@ int btl_openib_register_mca_params(void)
|
||||
CHECK(reg_int("use_eager_rdma", "Use RDMA for eager messages ",
|
||||
1, &ival, 0));
|
||||
mca_btl_openib_component.use_eager_rdma = (uint32_t) (ival != 0);
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
/* Fast rdma path isn't supported by PROGRESS_THREAD */
|
||||
mca_btl_openib_component.use_eager_rdma = 0;
|
||||
#endif
|
||||
|
||||
CHECK(reg_int("eager_rdma_threshold",
|
||||
"Use RDMA for short messages after this number of "
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user