1
1

Adds synchronisation between main thread and service thread in

btl_openib_connect_udcm when notifying it not to listen to an fd, to ensure
that the main thread does not continue until the service thread has
processed the message.

Adds the ability to send a message to the openib async thread to tell it to
ignore the ERR state on a specific QP. Adds this call to udcm_module_finalize
so that when we set the error state on the QP it doesn't cause the
openib async thread to abort the MPI program prematurely.

Fixes trac:3161

This commit was SVN r27064.

The following Trac tickets were found above:
  Ticket 3161 --> https://svn.open-mpi.org/trac/ompi/ticket/3161
Этот коммит содержится в:
Christopher Yeoh 2012-08-16 03:56:21 +00:00
родитель 7867330dcc
Коммит cc091f4979
4 изменённых файлов: 137 добавлений и 22 удалений

Просмотреть файл

@ -36,11 +36,19 @@ struct mca_btl_openib_async_poll {
};
typedef struct mca_btl_openib_async_poll mca_btl_openib_async_poll;
/*
 * List element wrapping a single queue pair pointer.
 *
 * The async command handler allocates one of these for every
 * OPENIB_ASYNC_IGNORE_QP_ERR command and appends it to the async thread's
 * ignore list; the device event handler compares the QP of each
 * IBV_EVENT_QP_FATAL event against the entries of that list.
 */
typedef struct {
/* list linkage; must stay first because items are cast to opal_list_item_t * */
opal_list_item_t super;
/* QP whose fatal-error events should be ignored */
struct ibv_qp *qp;
} mca_btl_openib_qp_list;
/* NOTE(review): the two NULL arguments are presumably the constructor and
 * destructor slots (i.e. none needed) -- confirm against the OPAL class macros */
OBJ_CLASS_INSTANCE(mca_btl_openib_qp_list, opal_list_item_t, NULL, NULL);
static int return_status = OMPI_ERROR;
static int btl_openib_async_poll_init(struct mca_btl_openib_async_poll *hcas_poll);
static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll);
static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *hcas_poll, int index);
static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll, opal_list_t *ignore_qp_err_list);
static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *hcas_poll, int index,
opal_list_t *ignore_qp_err_list);
static const char *openib_event_to_str (enum ibv_event_type event);
static int send_command_comp(int in);
@ -151,17 +159,21 @@ static int send_command_comp(int in)
}
/* Function handle async thread commands */
static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *devices_poll)
static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *devices_poll, opal_list_t *ignore_qp_err_list)
{
struct pollfd *async_pollfd_tmp;
mca_btl_openib_async_cmd_t cmd;
int fd,flags,j;
/* Got command from main thread */
if (read(devices_poll->async_pollfd[0].fd, &fd, sizeof(int)) < 0) {
if (read(devices_poll->async_pollfd[0].fd, &cmd, sizeof(mca_btl_openib_async_cmd_t)) < 0) {
BTL_ERROR(("Read failed [%d]",errno));
return OMPI_ERROR;
}
BTL_VERBOSE(("GOT event from -> %d",fd));
if (fd > 0) {
BTL_VERBOSE(("Got cmd %d", cmd.a_cmd));
if (OPENIB_ASYNC_CMD_FD_ADD == cmd.a_cmd) {
fd = cmd.fd;
BTL_VERBOSE(("Got fd %d", fd));
BTL_VERBOSE(("Adding device [%d] to async event poll[%d]",
fd, devices_poll->active_poll_size));
flags = fcntl(fd, F_GETFL);
@ -190,10 +202,13 @@ static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *devices_p
if (OMPI_SUCCESS != send_command_comp(fd)) {
return OMPI_ERROR;
}
} else if (fd < 0) {
} else if (OPENIB_ASYNC_CMD_FD_REMOVE == cmd.a_cmd) {
bool fd_found = false;
fd = cmd.fd;
BTL_VERBOSE(("Got fd %d", fd));
/* Removing device from poll */
fd = -(fd);
BTL_VERBOSE(("Removing device [%d] from async event poll [%d]",
fd, devices_poll->active_poll_size));
if (devices_poll->active_poll_size > 1) {
@ -214,14 +229,29 @@ static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *devices_p
}
}
devices_poll->active_poll_size--;
if (OMPI_SUCCESS != send_command_comp(-(fd))) {
if (OMPI_SUCCESS != send_command_comp(fd)) {
return OMPI_ERROR;
}
} else {
} else if (OPENIB_ASYNC_IGNORE_QP_ERR == cmd.a_cmd) {
mca_btl_openib_qp_list *new_qp;
new_qp = OBJ_NEW(mca_btl_openib_qp_list);
BTL_VERBOSE(("Ignore errors on QP %p", (void *)cmd.qp));
new_qp->qp = cmd.qp;
opal_list_append(ignore_qp_err_list, (opal_list_item_t *)new_qp);
send_command_comp(OPENIB_ASYNC_IGNORE_QP_ERR);
} else if (OPENIB_ASYNC_THREAD_EXIT == cmd.a_cmd) {
/* Got 0 - command to close the thread */
opal_list_item_t *item;
BTL_VERBOSE(("Async event thread exit"));
free(devices_poll->async_pollfd);
return_status = OMPI_SUCCESS;
while ((item = opal_list_remove_first(ignore_qp_err_list))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(ignore_qp_err_list);
pthread_exit(&return_status);
}
return OMPI_SUCCESS;
@ -285,7 +315,8 @@ srq_limit_event_exit:
}
/* Function handle async device events */
static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *devices_poll, int index)
static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *devices_poll, int index,
opal_list_t *ignore_qp_err_list)
{
int j;
mca_btl_openib_device_t *device = NULL;
@ -344,6 +375,29 @@ static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *devices_po
OPAL_THREAD_ADD32(&mca_btl_openib_component.error_counter, 1);
case IBV_EVENT_CQ_ERR:
case IBV_EVENT_QP_FATAL:
if (event_type == IBV_EVENT_QP_FATAL) {
opal_list_item_t *item;
mca_btl_openib_qp_list *qp_item;
bool in_ignore_list = false;
BTL_VERBOSE(("QP is in err state %p", (void *)event.element.qp));
/* look through ignore list */
for (item = opal_list_get_first(ignore_qp_err_list);
item != opal_list_get_end(ignore_qp_err_list);
item = opal_list_get_next(item)) {
qp_item = (mca_btl_openib_qp_list *)item;
if (qp_item->qp == event.element.qp) {
BTL_VERBOSE(("QP %p is in error ignore list",
(void *)event.element.qp));
in_ignore_list = true;
break;
}
}
if (in_ignore_list)
break;
}
case IBV_EVENT_QP_REQ_ERR:
case IBV_EVENT_QP_ACCESS_ERR:
case IBV_EVENT_PATH_MIG_ERR:
@ -409,6 +463,9 @@ void* btl_openib_async_thread(void * async)
int rc;
int i;
struct mca_btl_openib_async_poll devices_poll;
opal_list_t ignore_qp_err_list;
OBJ_CONSTRUCT(&ignore_qp_err_list, opal_list_t);
if (OMPI_SUCCESS != btl_openib_async_poll_init(&devices_poll)) {
BTL_ERROR(("Fatal error, stoping asynch event thread"));
@ -442,7 +499,8 @@ void* btl_openib_async_thread(void * async)
/* Processing our event */
if (0 == i) {
/* 0 poll we use for comunication with main thread */
if (OMPI_SUCCESS != btl_openib_async_commandh(&devices_poll)) {
if (OMPI_SUCCESS != btl_openib_async_commandh(&devices_poll,
&ignore_qp_err_list)) {
free(devices_poll.async_pollfd);
BTL_ERROR(("Failed to process async thread process. "
"Fatal error, stoping asynch event thread"));
@ -450,7 +508,8 @@ void* btl_openib_async_thread(void * async)
}
} else {
/* We get device event */
if (btl_openib_async_deviceh(&devices_poll, i)) {
if (btl_openib_async_deviceh(&devices_poll, i,
&ignore_qp_err_list)) {
free(devices_poll.async_pollfd);
BTL_ERROR(("Failed to process async thread process. "
"Fatal error, stoping asynch event thread"));

Просмотреть файл

@ -22,4 +22,20 @@ void mca_btl_openib_load_apm_xrc_rcv(uint32_t qp_num, mca_btl_openib_endpo
#define APM_ENABLED (0 != mca_btl_openib_component.apm_lmc || 0 != mca_btl_openib_component.apm_ports)
/*
* Command types for communicating with the async thread
*/
typedef enum {
/* start polling the descriptor carried in the command's fd field */
OPENIB_ASYNC_CMD_FD_ADD,
/* stop polling the descriptor carried in the command's fd field */
OPENIB_ASYNC_CMD_FD_REMOVE,
/* add the command's qp to the list of QPs whose fatal-error events are ignored */
OPENIB_ASYNC_IGNORE_QP_ERR,
/* release the poll array and the ignore list, then terminate the async thread */
OPENIB_ASYNC_THREAD_EXIT
} btl_openib_async_cmd_type_t;
/*
 * Message written to the async pipe by the main thread and read back, as a
 * whole struct of sizeof(mca_btl_openib_async_cmd_t) bytes, by the async
 * thread's command handler. Senders only fill in the fields that the chosen
 * command uses; the remaining fields are left unset.
 */
typedef struct {
/* which command this message carries */
btl_openib_async_cmd_type_t a_cmd;
/* descriptor to add/remove; set for OPENIB_ASYNC_CMD_FD_ADD / _FD_REMOVE */
int fd;
/* queue pair to ignore errors on; set for OPENIB_ASYNC_IGNORE_QP_ERR */
struct ibv_qp *qp;
} mca_btl_openib_async_cmd_t;
#endif

Просмотреть файл

@ -236,9 +236,10 @@ static int btl_openib_component_close(void)
/* Tell the async thread to shutdown */
if (mca_btl_openib_component.use_async_event_thread &&
0 != mca_btl_openib_component.async_thread) {
int async_command = 0;
mca_btl_openib_async_cmd_t async_command;
async_command.a_cmd = OPENIB_ASYNC_THREAD_EXIT;
if (write(mca_btl_openib_component.async_pipe[1], &async_command,
sizeof(int)) < 0) {
sizeof(mca_btl_openib_async_cmd_t)) < 0) {
BTL_ERROR(("Failed to communicate with async event thread"));
rc = OMPI_ERROR;
} else {
@ -1009,15 +1010,16 @@ static void device_destruct(mca_btl_openib_device_t *device)
/* signaling to async_tread to stop poll for this device */
if (mca_btl_openib_component.use_async_event_thread &&
-1 != mca_btl_openib_component.async_pipe[1]) {
int device_to_remove;
device_to_remove = -(device->ib_dev_context->async_fd);
if (write(mca_btl_openib_component.async_pipe[1], &device_to_remove,
sizeof(int)) < 0){
mca_btl_openib_async_cmd_t async_command;
async_command.a_cmd = OPENIB_ASYNC_CMD_FD_REMOVE;
async_command.fd = device->ib_dev_context->async_fd;
if (write(mca_btl_openib_component.async_pipe[1], &async_command,
sizeof(mca_btl_openib_async_cmd_t)) < 0){
BTL_ERROR(("Failed to write to pipe"));
goto device_error;
}
/* wait for ok from thread */
if (OMPI_SUCCESS != btl_openib_async_command_done(device_to_remove)){
if (OMPI_SUCCESS != btl_openib_async_command_done(device->ib_dev_context->async_fd)){
goto device_error;
}
}
@ -1102,6 +1104,7 @@ static int prepare_device_for_use(mca_btl_openib_device_t *device)
#if OPAL_HAVE_THREADS
if(mca_btl_openib_component.use_async_event_thread) {
mca_btl_openib_async_cmd_t async_command;
if(0 == mca_btl_openib_component.async_thread) {
/* async thread is not yet started, so start it here */
if(start_async_event_thread() != OMPI_SUCCESS)
@ -1109,8 +1112,10 @@ static int prepare_device_for_use(mca_btl_openib_device_t *device)
}
device->got_fatal_event = false;
device->got_port_event = false;
async_command.a_cmd = OPENIB_ASYNC_CMD_FD_ADD;
async_command.fd = device->ib_dev_context->async_fd;
if (write(mca_btl_openib_component.async_pipe[1],
&device->ib_dev_context->async_fd, sizeof(int))<0){
&async_command, sizeof(mca_btl_openib_async_cmd_t))<0){
BTL_ERROR(("Failed to write to pipe [%d]",errno));
return OMPI_ERROR;
}

Просмотреть файл

@ -75,6 +75,7 @@
#include "btl_openib_endpoint.h"
#include "btl_openib_proc.h"
#include "btl_openib_fd.h"
#include "btl_openib_async.h"
#include "connect/connect.h"
#include "ompi/mca/mpool/grdma/mpool_grdma.h"
@ -652,11 +653,21 @@ udcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
return rc;
}
/*
 * Completion callback passed to ompi_btl_openib_fd_unmonitor().
 *
 * Stores 1 into the int that context points to; udcm_module_finalize passes
 * the address of its local "barrier" flag and spins until it becomes
 * non-zero. The fd and flags arguments are not used.
 *
 * Always returns NULL.
 */
static void *udcm_unmonitor(int fd, int flags, void *context)
{
    (void) fd;
    (void) flags;

    /* write through a volatile lvalue, exactly as the waiting side reads it */
    *(volatile int *) context = 1;

    return NULL;
}
static int udcm_module_finalize(mca_btl_openib_module_t *btl,
ompi_btl_openib_connect_base_module_t *cpc)
{
udcm_module_t *m = (udcm_module_t *) cpc;
opal_list_item_t *item;
volatile int barrier = 0;
if (NULL == m) {
return OMPI_SUCCESS;
@ -694,7 +705,13 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl,
BTL_VERBOSE(("destroying listing thread"));
/* stop monitoring the channel's fd before destroying the listen qp */
ompi_btl_openib_fd_unmonitor(m->cm_channel->fd, NULL, NULL);
ompi_btl_openib_fd_unmonitor(m->cm_channel->fd, udcm_unmonitor, (void *)&barrier);
while (0 == barrier) {
#ifndef __WINDOWS__
sched_yield();
#endif
}
/* destroy the listen queue pair. this will cause ibv_get_cq_event to
return. */
@ -815,11 +832,27 @@ static void udcm_module_destroy_listen_qp (udcm_module_t *m)
enum ibv_qp_attr_mask attr_mask;
struct ibv_qp_attr attr;
struct ibv_wc wc;
mca_btl_openib_async_cmd_t async_command;
if (NULL == m->listen_qp) {
return;
}
/* Tell the openib async thread to ignore ERR state on the QP
we are about to manually set the ERR state on */
async_command.a_cmd = OPENIB_ASYNC_IGNORE_QP_ERR;
async_command.qp = m->listen_qp;
if (write(mca_btl_openib_component.async_pipe[1],
&async_command, sizeof(mca_btl_openib_async_cmd_t))<0){
BTL_ERROR(("Failed to write to pipe [%d]",errno));
return;
}
/* wait for ok from thread */
if (OMPI_SUCCESS !=
btl_openib_async_command_done(OPENIB_ASYNC_IGNORE_QP_ERR)) {
BTL_ERROR(("Command to openib async thread to ignore QP ERR state failed"));
}
do {
/* Move listen QP into the ERR state to cancel all outstanding
work requests */
@ -828,6 +861,8 @@ static void udcm_module_destroy_listen_qp (udcm_module_t *m)
attr.sq_psn = 0;
attr_mask = IBV_QP_STATE | IBV_QP_SQ_PSN;
BTL_VERBOSE(("Setting qp to err state %p", (void *)m->listen_qp));
if (0 != ibv_modify_qp(m->listen_qp, &attr, IBV_QP_STATE)) {
BTL_VERBOSE(("error modifying qp to ERR. errno = %d",
errno));