1
1

Fixing race condition between main thread and async event thread during openib finalization.

This commit was SVN r18895.
Этот коммит содержится в:
Pavel Shamis 2008-07-13 16:21:49 +00:00
родитель 9827f8ed91
Коммит 12379e7f3e
5 изменённых файлов: 54 добавлений и 0 удалений

Просмотреть файл

@ -1015,6 +1015,8 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
} }
close(mca_btl_openib_component.async_pipe[0]); close(mca_btl_openib_component.async_pipe[0]);
close(mca_btl_openib_component.async_pipe[1]); close(mca_btl_openib_component.async_pipe[1]);
close(mca_btl_openib_component.async_comp_pipe[0]);
close(mca_btl_openib_component.async_comp_pipe[1]);
} }
#endif #endif

Просмотреть файл

@ -196,6 +196,7 @@ struct mca_btl_openib_component_t {
#if OMPI_HAVE_THREADS #if OMPI_HAVE_THREADS
int32_t fatal_counter; /**< Counts number on fatal events that we got on all hcas */ int32_t fatal_counter; /**< Counts number on fatal events that we got on all hcas */
int async_pipe[2]; /**< Pipe for comunication with async event thread */ int async_pipe[2]; /**< Pipe for comunication with async event thread */
int async_comp_pipe[2]; /**< Pipe for async thread comunication with main thread */
pthread_t async_thread; /**< Async thread that will handle fatal errors */ pthread_t async_thread; /**< Async thread that will handle fatal errors */
uint32_t use_async_event_thread; /**< Use the async event handler */ uint32_t use_async_event_thread; /**< Use the async event handler */
#endif #endif

Просмотреть файл

@ -42,6 +42,7 @@ static int btl_openib_async_poll_init(struct mca_btl_openib_async_poll *hcas_pol
static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll); static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll);
static int btl_openib_async_hcah(struct mca_btl_openib_async_poll *hcas_poll, int index); static int btl_openib_async_hcah(struct mca_btl_openib_async_poll *hcas_poll, int index);
static const char *openib_event_to_str (enum ibv_event_type event); static const char *openib_event_to_str (enum ibv_event_type event);
static int send_command_comp(int in);
/* Function converts event to string (name) /* Function converts event to string (name)
* Open Fabris don't have function that do this job :( * Open Fabris don't have function that do this job :(
@ -139,6 +140,16 @@ static int btl_openib_async_poll_init(struct mca_btl_openib_async_poll *hcas_pol
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/*
 * Send command completion to main thread.
 *
 * Writes the integer `in` to the write end of the component's
 * async_comp_pipe. The write is retried when it is interrupted by a
 * signal (EINTR) and continued after a short write, so that either the
 * whole integer is delivered or an error is reported.
 *
 * Returns OMPI_SUCCESS when sizeof(int) bytes were written,
 * OMPI_ERROR otherwise.
 */
static int send_command_comp(int in)
{
    const char *cursor = (const char *)&in;
    size_t remaining = sizeof(int);
    ssize_t rc;

    while (remaining > 0) {
        rc = write(mca_btl_openib_component.async_comp_pipe[1],
                   cursor, remaining);
        if (rc > 0) {
            /* Short write: advance and send the rest */
            cursor += rc;
            remaining -= (size_t)rc;
            continue;
        }
        if (rc < 0 && EINTR == errno) {
            /* Interrupted before anything was written - just retry */
            continue;
        }
        /* Real failure (or a zero-length write that would never progress) */
        BTL_ERROR(("Write failed [%d]",errno));
        return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}
/* Function handle async thread commands */ /* Function handle async thread commands */
static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll) static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll)
{ {
@ -176,6 +187,9 @@ static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll
hcas_poll->async_pollfd[hcas_poll->active_poll_size].events = POLLIN; hcas_poll->async_pollfd[hcas_poll->active_poll_size].events = POLLIN;
hcas_poll->async_pollfd[hcas_poll->active_poll_size].revents = 0; hcas_poll->async_pollfd[hcas_poll->active_poll_size].revents = 0;
hcas_poll->active_poll_size++; hcas_poll->active_poll_size++;
if (OMPI_SUCCESS != send_command_comp(fd)) {
return OMPI_ERROR;
}
} else if (fd < 0) { } else if (fd < 0) {
bool fd_found = false; bool fd_found = false;
/* Removing HCA from poll */ /* Removing HCA from poll */
@ -200,6 +214,9 @@ static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll
} }
} }
hcas_poll->active_poll_size--; hcas_poll->active_poll_size--;
if (OMPI_SUCCESS != send_command_comp(-(fd))) {
return OMPI_ERROR;
}
} else { } else {
/* Got 0 - command to close the thread */ /* Got 0 - command to close the thread */
BTL_VERBOSE(("Async event thread exit")); BTL_VERBOSE(("Async event thread exit"));
@ -373,6 +390,22 @@ void* btl_openib_async_thread(void * async)
return PTHREAD_CANCELED; return PTHREAD_CANCELED;
} }
/*
 * Wait for a command completion and verify it.
 *
 * Blocks reading one integer from the read end of the component's
 * async_comp_pipe and compares it with the expected value.
 *
 * exp - completion value the caller expects to receive.
 *
 * Returns OMPI_SUCCESS when a complete integer equal to `exp` was read.
 * Returns OMPI_ERROR when the read fails, when the pipe reports
 * end-of-file before a whole integer arrived, or when the received
 * value differs from `exp`.
 */
int btl_openib_async_command_done(int exp)
{
    int comp = 0;
    char *cursor = (char *)&comp;
    size_t remaining = sizeof(int);
    ssize_t rc;

    while (remaining > 0) {
        rc = read(mca_btl_openib_component.async_comp_pipe[0],
                  cursor, remaining);
        if (rc > 0) {
            /* Short read: keep going until the whole int has arrived */
            cursor += rc;
            remaining -= (size_t)rc;
            continue;
        }
        if (rc < 0 && EINTR == errno) {
            /* Interrupted by a signal before any data was read - retry */
            continue;
        }
        /* rc < 0: read error; rc == 0: write end closed (EOF). In both
         * cases `comp` does not hold a valid completion, so never
         * compare it against `exp`. */
        BTL_ERROR(("Failed to read from pipe"));
        return OMPI_ERROR;
    }
    if (exp != comp){
        BTL_ERROR(("Get wrong completion on async command. Waiting for %d and got %d",
                    exp, comp));
        return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}
static void apm_update_attr(struct ibv_qp_attr *attr, enum ibv_qp_attr_mask *mask) static void apm_update_attr(struct ibv_qp_attr *attr, enum ibv_qp_attr_mask *mask)
{ {
*mask = IBV_QP_ALT_PATH|IBV_QP_PATH_MIG_STATE; *mask = IBV_QP_ALT_PATH|IBV_QP_PATH_MIG_STATE;

Просмотреть файл

@ -15,6 +15,7 @@
void* btl_openib_async_thread(void *one_hca); void* btl_openib_async_thread(void *one_hca);
void mca_btl_openib_load_apm(struct ibv_qp *qp, mca_btl_openib_endpoint_t *ep); void mca_btl_openib_load_apm(struct ibv_qp *qp, mca_btl_openib_endpoint_t *ep);
int btl_openib_async_command_done(int exp);
#if HAVE_XRC #if HAVE_XRC
void mca_btl_openib_load_apm_xrc_rcv(uint32_t qp_num, mca_btl_openib_endpoint_t *ep); void mca_btl_openib_load_apm_xrc_rcv(uint32_t qp_num, mca_btl_openib_endpoint_t *ep);
#endif #endif

Просмотреть файл

@ -474,6 +474,12 @@ static int start_async_event_thread(void)
return OMPI_ERROR; return OMPI_ERROR;
} }
if(pipe(mca_btl_openib_component.async_comp_pipe)) {
BTL_ERROR(("Failed to create comp pipe for communication with "
"main thread"));
return OMPI_ERROR;
}
/* Starting async event thread for the component */ /* Starting async event thread for the component */
if(pthread_create(&mca_btl_openib_component.async_thread, NULL, if(pthread_create(&mca_btl_openib_component.async_thread, NULL,
(void*(*)(void*))btl_openib_async_thread, NULL)) { (void*(*)(void*))btl_openib_async_thread, NULL)) {
@ -726,6 +732,8 @@ static void hca_construct(mca_btl_openib_hca_t *hca)
#if OMPI_HAVE_THREADS #if OMPI_HAVE_THREADS
mca_btl_openib_component.async_pipe[0] = mca_btl_openib_component.async_pipe[0] =
mca_btl_openib_component.async_pipe[1] = -1; mca_btl_openib_component.async_pipe[1] = -1;
mca_btl_openib_component.async_comp_pipe[0] =
mca_btl_openib_component.async_comp_pipe[1] = -1;
#endif #endif
OBJ_CONSTRUCT(&hca->hca_lock, opal_mutex_t); OBJ_CONSTRUCT(&hca->hca_lock, opal_mutex_t);
OBJ_CONSTRUCT(&hca->send_free_control, ompi_free_list_t); OBJ_CONSTRUCT(&hca->send_free_control, ompi_free_list_t);
@ -761,6 +769,10 @@ static void hca_destruct(mca_btl_openib_hca_t *hca)
BTL_ERROR(("Failed to write to pipe")); BTL_ERROR(("Failed to write to pipe"));
goto hca_error; goto hca_error;
} }
/* wait for ok from thread */
if (OMPI_SUCCESS != btl_openib_async_command_done(hca_to_remove)){
goto hca_error;
}
} }
#endif #endif
@ -854,6 +866,11 @@ static int prepare_hca_for_use(mca_btl_openib_hca_t *hca)
BTL_ERROR(("Failed to write to pipe [%d]",errno)); BTL_ERROR(("Failed to write to pipe [%d]",errno));
return OMPI_ERROR; return OMPI_ERROR;
} }
/* wait for ok from thread */
if (OMPI_SUCCESS !=
btl_openib_async_command_done(hca->ib_dev_context->async_fd)){
return OMPI_ERROR;
}
} }
#if OMPI_ENABLE_PROGRESS_THREADS == 1 #if OMPI_ENABLE_PROGRESS_THREADS == 1
/* Prepare data for thread, but not starting it */ /* Prepare data for thread, but not starting it */