1
1

Fix async thread creation and destruction. Create the async thread only when it
is needed instead of creating it and then canceling it if it is not needed. Change
the error handling during finalize so that it does not skip async thread
destruction; otherwise the async thread may segfault during openib module unloading.

This commit was SVN r16782.
Этот коммит содержится в:
Gleb Natapov 2007-11-28 07:14:34 +00:00
родитель 5463eb892c
Коммит b123696d57
2 изменённых файлов: 63 добавлений и 70 удалений

Просмотреть файл

@ -729,10 +729,10 @@ static int mca_btl_finalize_hca(struct mca_btl_openib_hca_t *hca)
}
#endif
/* signaling to async_tread to stop poll for this hca */
if (mca_btl_openib_component.use_async_event_thread) {
hca_to_remove=-(hca->ib_dev_context->async_fd);
if (write(mca_btl_openib_component.async_pipe[1],
&hca_to_remove,sizeof(int))<0){
if(mca_btl_openib_component.use_async_event_thread) {
hca_to_remove = -(hca->ib_dev_context->async_fd);
if (write(mca_btl_openib_component.async_pipe[1], &hca_to_remove,
sizeof(int)) < 0){
BTL_ERROR(("Failed to write to pipe"));
return OMPI_ERROR;
}
@ -780,9 +780,7 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
mca_btl_openib_module_t* openib_btl;
mca_btl_openib_endpoint_t* endpoint;
int ep_index, rdma_index, i;
int qp;
/* return OMPI_SUCCESS; */
int qp, rc = OMPI_SUCCESS;
openib_btl = (mca_btl_openib_module_t*) btl;
@ -796,6 +794,7 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
}
}
}
mca_btl_openib_component.ib_num_btls--;
/* Release eager RDMAs */
@ -824,7 +823,7 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
}
/* Release SRQ resources */
for(qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
if(BTL_OPENIB_QP_TYPE_SRQ(qp)){
if(BTL_OPENIB_QP_TYPE_SRQ(qp)){
MCA_BTL_OPENIB_CLEAN_PENDING_FRAGS(
&openib_btl->qps[qp].u.srq_qp.pending_frags[0]);
MCA_BTL_OPENIB_CLEAN_PENDING_FRAGS(
@ -832,26 +831,22 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
if (ibv_destroy_srq(openib_btl->qps[qp].u.srq_qp.srq)){
BTL_VERBOSE(("Failed to close SRQ %d", qp));
return OMPI_ERROR;
rc = OMPI_ERROR;
}
/* Destroy free lists */
OBJ_DESTRUCT(&openib_btl->qps[qp].u.srq_qp.pending_frags[0]);
OBJ_DESTRUCT(&openib_btl->qps[qp].u.srq_qp.pending_frags[1]);
OBJ_DESTRUCT(&openib_btl->qps[qp].send_free);
OBJ_DESTRUCT(&openib_btl->qps[qp].recv_free);
} else {
/* Destroy free lists */
OBJ_DESTRUCT(&openib_btl->qps[qp].send_free);
OBJ_DESTRUCT(&openib_btl->qps[qp].recv_free);
}
/* Destroy free lists */
OBJ_DESTRUCT(&openib_btl->qps[qp].send_free);
OBJ_DESTRUCT(&openib_btl->qps[qp].recv_free);
}
OBJ_DESTRUCT(&openib_btl->send_free_control);
OBJ_DESTRUCT(&openib_btl->send_user_free);
OBJ_DESTRUCT(&openib_btl->recv_user_free);
/* Release pending lists */
if (!(--openib_btl->hca->btls)) {
/* All btls for the HCA were closed
@ -859,31 +854,37 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
*/
if (OMPI_SUCCESS != mca_btl_finalize_hca(openib_btl->hca)) {
BTL_VERBOSE(("Failed to close HCA"));
return OMPI_ERROR;
rc = OMPI_ERROR;
}
}
#if OMPI_HAVE_THREADS
if (mca_btl_openib_component.use_async_event_thread &&
! mca_btl_openib_component.ib_num_btls) {
0 == mca_btl_openib_component.ib_num_btls &&
mca_btl_openib_component.async_thread != 0) {
/* signaling to async_tread to stop */
int async_command=0;
if (write(mca_btl_openib_component.async_pipe[1],
&async_command,sizeof(int))<0){
BTL_ERROR(("Failed to write to pipe"));
return OMPI_ERROR;
}
if (pthread_join(mca_btl_openib_component.async_thread, NULL)) {
BTL_ERROR(("Failed to stop OpenIB async event thread"));
return OMPI_ERROR;
if(write(mca_btl_openib_component.async_pipe[1], &async_command,
sizeof(int)) < 0) {
BTL_ERROR(("Failed to communicate with async event thread"));
rc = OMPI_ERROR;
} else {
if(pthread_join(mca_btl_openib_component.async_thread, NULL)) {
BTL_ERROR(("Failed to stop OpenIB async event thread"));
rc = OMPI_ERROR;
}
}
close(mca_btl_openib_component.async_pipe[0]);
close(mca_btl_openib_component.async_pipe[1]);
}
#endif
OBJ_DESTRUCT(&openib_btl->ib_lock);
free(openib_btl);
BTL_VERBOSE(("Success in closing BTL resources"));
return OMPI_SUCCESS;
return rc;
}
/*

Просмотреть файл

@ -297,6 +297,30 @@ static inline int param_register_int(const char* param_name, int default_value)
return param_value;
}
#if OMPI_HAVE_THREADS
/**
 * Create the communication pipe and start the component-wide async
 * event thread.
 *
 * Resets mca_btl_openib_component.fatal_counter, opens
 * mca_btl_openib_component.async_pipe and spawns
 * btl_openib_async_thread into mca_btl_openib_component.async_thread.
 *
 * If the thread cannot be created, both pipe descriptors are closed and
 * async_thread is reset to 0, so nothing is leaked and code that tests
 * "0 == async_thread" still sees the thread as not started (POSIX leaves
 * the pthread_t contents undefined after a failed pthread_create()).
 *
 * @return OMPI_SUCCESS when both the pipe and the thread were created,
 *         OMPI_ERROR otherwise.
 */
static int start_async_event_thread(void)
{
    /* Set the fatal counter to zero */
    mca_btl_openib_component.fatal_counter = 0;

    /* Create pipe for communication with async event thread */
    if (pipe(mca_btl_openib_component.async_pipe)) {
        BTL_ERROR(("Failed to create pipe for communication with "
                   "async event thread"));
        return OMPI_ERROR;
    }

    /* Starting async event thread for the component */
    if (pthread_create(&mca_btl_openib_component.async_thread, NULL,
                       (void*(*)(void*))btl_openib_async_thread, NULL)) {
        BTL_ERROR(("Failed to create async event thread"));
        /* Nobody will ever read from the pipe: release both ends */
        close(mca_btl_openib_component.async_pipe[0]);
        close(mca_btl_openib_component.async_pipe[1]);
        /* Keep the "not started" sentinel intact */
        mca_btl_openib_component.async_thread = 0;
        return OMPI_ERROR;
    }

    return OMPI_SUCCESS;
}
#endif
static int init_one_port(opal_list_t *btl_list, mca_btl_openib_hca_t *hca,
uint8_t port_num, uint16_t pkey_index,
struct ibv_port_attr *ib_port_attr)
@ -627,10 +651,14 @@ static int init_one_hca(opal_list_t *btl_list, struct ibv_device* ib_dev)
if (hca->btls > 0) {
#if OMPI_HAVE_THREADS
if (mca_btl_openib_component.use_async_event_thread) {
if(0 == mca_btl_openib_component.async_thread) {
/* async thread is not yet started, so start it here */
if(start_async_event_thread() != OMPI_SUCCESS)
goto comp_channel;
}
hca->got_fatal_event = false;
if (write(mca_btl_openib_component.async_pipe[1],
&hca->ib_dev_context->async_fd,
sizeof(int))<0){
&hca->ib_dev_context->async_fd, sizeof(int))<0){
BTL_ERROR(("Failed to write to pipe [%d]",errno));
goto comp_channel;
}
@ -850,17 +878,6 @@ btl_openib_component_init(int *num_btl_modules,
if (OMPI_SUCCESS != (ret = ompi_btl_openib_ini_init())) {
goto no_btls;
}
#if OMPI_HAVE_THREADS
/* Set the fatal counter to zero */
mca_btl_openib_component.fatal_counter = 0;
/* Create pipe for comunication with async event thread */
if (mca_btl_openib_component.use_async_event_thread) {
if (pipe (mca_btl_openib_component.async_pipe)) {
BTL_ERROR(("Failed to create pipe for comunication with async event thread"));
return NULL;
}
}
#endif
/* If we want fork support, try to enable it */
#ifdef HAVE_IBV_FORK_INIT
@ -948,24 +965,14 @@ btl_openib_component_init(int *num_btl_modules,
OBJ_CONSTRUCT(&btl_list, opal_list_t);
OBJ_CONSTRUCT(&mca_btl_openib_component.ib_lock, opal_mutex_t);
for (i = 0; i < num_devs &&
(-1 == mca_btl_openib_component.ib_max_btls ||
mca_btl_openib_component.ib_num_btls <
mca_btl_openib_component.ib_max_btls); i++){
#if OMPI_HAVE_THREADS
if (mca_btl_openib_component.use_async_event_thread &&
0 == i) {
/* Starting async event thread for the component */
if (pthread_create(&mca_btl_openib_component.async_thread,NULL,
(void*(*)(void*))btl_openib_async_thread,NULL)) {
BTL_ERROR(("Failed to create async event thread for openib"));
return NULL;
}
}
mca_btl_openib_component.async_thread = 0;
#endif
if (OMPI_SUCCESS != (ret = init_one_hca(&btl_list, ib_devs[i]))) {
for(i = 0; i < num_devs && (-1 == mca_btl_openib_component.ib_max_btls ||
mca_btl_openib_component.ib_num_btls <
mca_btl_openib_component.ib_max_btls); i++) {
if(OMPI_SUCCESS != (ret = init_one_hca(&btl_list, ib_devs[i])))
break;
}
}
if(ret != OMPI_SUCCESS) {
@ -989,21 +996,6 @@ btl_openib_component_init(int *num_btl_modules,
}
if(0 == mca_btl_openib_component.ib_num_btls) {
#if OMPI_HAVE_THREADS
if (mca_btl_openib_component.use_async_event_thread) {
int async_command = 0;
/* signaling to async_tread to stop poll for this hca*/
if (write(mca_btl_openib_component.async_pipe[1],
&async_command,sizeof(int))<0){
BTL_ERROR(("Failed to write to pipe"));
return NULL;
}
if (pthread_join(mca_btl_openib_component.async_thread, NULL)) {
BTL_ERROR(("Failed to stop OpenIB async event thread"));
return NULL;
}
}
#endif
opal_show_help("help-mpi-btl-openib.txt",
"no active ports found", true, orte_system_info.nodename);
return NULL;