
btl/openib: remove extra threads

This commit removes the service and async event threads from the
openib btl. Both threads are replaced by opal progress thread
support. The run_in_main function is now supported by allocating an
event and adding it to the sync event base. This ensures that the
requested function is called as part of opal_progress.

Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
This commit is contained in:
Nathan Hjelm 2015-10-06 12:47:09 -06:00
parent e0d9e6553f
commit b8af310efa
10 changed files with 396 additions and 1409 deletions
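Before the per-file diffs, a minimal sketch (illustrative only, not part of the commit) of the libevent pattern the btl switches to: one shared OPAL progress thread owns an event base, and each device's async fd is registered as a persistent read event instead of being poll()ed by a dedicated thread. It mirrors mca_btl_openib_async_add_device() in the diff below, with error handling trimmed.

#include "opal/runtime/opal_progress_threads.h"
#include "opal/mca/event/event.h"

static opal_event_base_t *async_evbase;
static opal_event_t async_event;

/* runs on the shared progress thread whenever the fd is readable */
static void on_async_fd (int fd, short flags, void *arg)
{
    /* drain ibv_get_async_event()/ibv_ack_async_event() here */
}

static int watch_async_fd (int async_fd, void *device)
{
    /* returns the event base of the shared progress thread */
    async_evbase = opal_progress_thread_init (NULL);
    if (NULL == async_evbase) {
        return OPAL_ERROR;
    }
    /* persistent read event: fires for every async event on the device */
    opal_event_set (async_evbase, &async_event, async_fd,
                    OPAL_EV_READ | OPAL_EV_PERSIST, on_async_fd, device);
    opal_event_add (&async_event, 0);
    return OPAL_SUCCESS;
}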

View file

@@ -55,8 +55,6 @@ sources = \
btl_openib_async.h \
btl_openib_xrc.c \
btl_openib_xrc.h \
btl_openib_fd.h \
btl_openib_fd.c \
btl_openib_ip.h \
btl_openib_ip.c \
btl_openib_put.c \

View file

@@ -638,28 +638,10 @@ static int prepare_device_for_use (mca_btl_openib_device_t *device)
OBJ_CONSTRUCT(&device->qps[qp_index].recv_free, opal_free_list_t);
}
if(mca_btl_openib_component.use_async_event_thread) {
mca_btl_openib_async_cmd_t async_command = {.a_cmd = OPENIB_ASYNC_CMD_FD_ADD,
.fd = device->ib_dev_context->async_fd,
.qp = NULL};
device->got_fatal_event = false;
device->got_port_event = false;
mca_btl_openib_async_add_device (device);
/* start the async event thread if it is not already started */
if (start_async_event_thread() != OPAL_SUCCESS)
return OPAL_ERROR;
device->got_fatal_event = false;
device->got_port_event = false;
if (write(mca_btl_openib_component.async_pipe[1],
&async_command, sizeof(mca_btl_openib_async_cmd_t))<0){
BTL_ERROR(("Failed to write to pipe [%d]",errno));
return OPAL_ERROR;
}
/* wait for ok from thread */
if (OPAL_SUCCESS !=
btl_openib_async_command_done(device->ib_dev_context->async_fd)) {
return OPAL_ERROR;
}
}
#if OPAL_ENABLE_PROGRESS_THREADS == 1
/* Prepare data for the thread, but do not start it yet */
OBJ_CONSTRUCT(&device->thread, opal_thread_t);

View file

@@ -48,6 +48,7 @@
#include "opal/mca/mpool/mpool.h"
#include "opal/mca/btl/base/btl_base_error.h"
#include "opal/mca/btl/base/base.h"
#include "opal/runtime/opal_progress_threads.h"
#include "connect/connect.h"
@@ -227,9 +228,7 @@ struct mca_btl_openib_component_t {
int apm_ports;
unsigned int buffer_alignment; /**< Preferred communication buffer alignment in Bytes (must be power of two) */
int32_t error_counter; /**< Counts number on error events that we got on all devices */
int async_pipe[2]; /**< Pipe for communication with async event thread */
int async_comp_pipe[2]; /**< Pipe for async thread communication with main thread */
pthread_t async_thread; /**< Async thread that will handle fatal errors */
opal_event_base_t *async_evbase; /**< Async event base */
bool use_async_event_thread; /**< Use the async event handler */
mca_btl_openib_srq_manager_t srq_manager; /**< Hash table for all BTL SRQs */
#if BTL_OPENIB_FAILOVER_ENABLED
@@ -410,6 +409,8 @@ typedef struct mca_btl_openib_device_t {
uint64_t mem_reg_max, mem_reg_active;
/* Device is ready for use */
bool ready_for_use;
/* Async event */
opal_event_t async_event;
} mca_btl_openib_device_t;
OBJ_CLASS_DECLARATION(mca_btl_openib_device_t);
@@ -907,6 +908,15 @@ static inline int qp_cq_prio(const int qp)
#define BTL_OPENIB_RDMA_QP(QP) \
((QP) == mca_btl_openib_component.rdma_qp)
/**
* Run function as part of opal_progress()
*
* @param[in] fn function to run
* @param[in] arg function data
*/
int mca_btl_openib_run_in_main (void *(*fn)(void *), void *arg);
END_C_DECLS
#endif /* MCA_BTL_IB_H */
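As a usage sketch for the declaration above (the callback and variable names here are hypothetical, not from the commit): any helper thread can defer a function to the main thread, and it runs the next time opal_progress() dispatches the sync event base.

/* hypothetical callback: runs on the main thread under opal_progress() */
static void *invoke_endpoint_error (void *arg)
{
    mca_btl_base_endpoint_t *endpoint = (mca_btl_base_endpoint_t *) arg;
    /* ... touch state that only the main thread may modify ... */
    return NULL;   /* the return value is discarded by the trampoline */
}

/* called from an event/progress thread */
if (OPAL_SUCCESS != mca_btl_openib_run_in_main (invoke_endpoint_error, endpoint)) {
    /* allocation of the one-shot event failed */
}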

View file

@@ -36,6 +36,10 @@
#include "btl_openib_proc.h"
#include "btl_openib_endpoint.h"
static opal_list_t ignore_qp_err_list;
static opal_mutex_t ignore_qp_err_list_lock;
static int32_t btl_openib_async_device_count = 0;
struct mca_btl_openib_async_poll {
int active_poll_size;
int poll_size;
@@ -50,14 +54,7 @@ typedef struct {
OBJ_CLASS_INSTANCE(mca_btl_openib_qp_list, opal_list_item_t, NULL, NULL);
static int return_status = OPAL_ERROR;
static int btl_openib_async_poll_init(struct mca_btl_openib_async_poll *hcas_poll);
static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll, opal_list_t *ignore_qp_err_list);
static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *hcas_poll, int index,
opal_list_t *ignore_qp_err_list);
static const char *openib_event_to_str (enum ibv_event_type event);
static int send_command_comp(int in);
/* Function converts event to string (name)
* Open Fabrics doesn't have a function that does this job :(
@@ -138,132 +135,6 @@ static mca_btl_openib_endpoint_t * xrc_qp2endpoint(uint32_t qp_num, mca_btl_open
#endif
/* Function inits mca_btl_openib_async_poll */
static int btl_openib_async_poll_init(struct mca_btl_openib_async_poll *devices_poll)
{
devices_poll->active_poll_size = 1;
devices_poll->poll_size = 4;
devices_poll->async_pollfd = malloc(sizeof(struct pollfd) * devices_poll->poll_size);
if (NULL == devices_poll->async_pollfd) {
BTL_ERROR(("Failed malloc: %s:%d"
, __FILE__, __LINE__));
return OPAL_ERROR;
}
/* Creating communication channel with the main thread */
devices_poll->async_pollfd[0].fd = mca_btl_openib_component.async_pipe[0];
devices_poll->async_pollfd[0].events = POLLIN;
devices_poll->async_pollfd[0].revents = 0;
return OPAL_SUCCESS;
}
/* Send command completion to main thread */
static int send_command_comp(int in)
{
if (write(mca_btl_openib_component.async_comp_pipe[1], &in, sizeof(int)) < 0) {
BTL_ERROR(("Write failed [%d]",errno));
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
/* Function handles async thread commands */
static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *devices_poll, opal_list_t *ignore_qp_err_list)
{
struct pollfd *async_pollfd_tmp;
mca_btl_openib_async_cmd_t cmd;
int fd,flags,j,ret;
/* Got command from main thread */
ret = read(devices_poll->async_pollfd[0].fd, &cmd, sizeof(mca_btl_openib_async_cmd_t));
if (sizeof(mca_btl_openib_async_cmd_t) != ret) {
BTL_ERROR(("Read failed [%d]",errno));
return OPAL_ERROR;
}
BTL_VERBOSE(("Got cmd %d", cmd.a_cmd));
if (OPENIB_ASYNC_CMD_FD_ADD == cmd.a_cmd) {
fd = cmd.fd;
BTL_VERBOSE(("Got fd %d", fd));
BTL_VERBOSE(("Adding device [%d] to async event poll[%d]",
fd, devices_poll->active_poll_size));
flags = fcntl(fd, F_GETFL);
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
BTL_ERROR(("Failed to change file descriptor of async event"));
return OPAL_ERROR;
}
if ((devices_poll->active_poll_size + 1) > devices_poll->poll_size) {
devices_poll->poll_size+=devices_poll->poll_size;
async_pollfd_tmp = malloc(sizeof(struct pollfd) * devices_poll->poll_size);
if (NULL == async_pollfd_tmp) {
BTL_ERROR(("Failed malloc: %s:%d. "
"Fatal error, stoping asynch event thread"
, __FILE__, __LINE__));
return OPAL_ERROR;
}
memcpy (async_pollfd_tmp,devices_poll->async_pollfd,
sizeof(struct pollfd) * (devices_poll->active_poll_size));
free(devices_poll->async_pollfd);
devices_poll->async_pollfd = async_pollfd_tmp;
}
devices_poll->async_pollfd[devices_poll->active_poll_size].fd = fd;
devices_poll->async_pollfd[devices_poll->active_poll_size].events = POLLIN;
devices_poll->async_pollfd[devices_poll->active_poll_size].revents = 0;
devices_poll->active_poll_size++;
if (OPAL_SUCCESS != send_command_comp(fd)) {
return OPAL_ERROR;
}
} else if (OPENIB_ASYNC_CMD_FD_REMOVE == cmd.a_cmd) {
bool fd_found = false;
fd = cmd.fd;
BTL_VERBOSE(("Got fd %d", fd));
/* Removing device from poll */
BTL_VERBOSE(("Removing device [%d] from async event poll [%d]",
fd, devices_poll->active_poll_size));
if (devices_poll->active_poll_size > 1) {
for (j=0; (j < devices_poll->active_poll_size || !fd_found); j++) {
if (devices_poll->async_pollfd[j].fd == fd) {
devices_poll->async_pollfd[j].fd =
devices_poll->async_pollfd[devices_poll->active_poll_size-1].fd;
devices_poll->async_pollfd[j].events =
devices_poll->async_pollfd[devices_poll->active_poll_size-1].events;
devices_poll->async_pollfd[j].revents =
devices_poll->async_pollfd[devices_poll->active_poll_size-1].revents;
fd_found = true;
}
}
if (!fd_found) {
BTL_ERROR(("Requested FD[%d] was not found in poll array",fd));
return OPAL_ERROR;
}
}
devices_poll->active_poll_size--;
if (OPAL_SUCCESS != send_command_comp(fd)) {
return OPAL_ERROR;
}
} else if (OPENIB_ASYNC_IGNORE_QP_ERR == cmd.a_cmd) {
mca_btl_openib_qp_list *new_qp;
new_qp = OBJ_NEW(mca_btl_openib_qp_list);
BTL_VERBOSE(("Ignore errors on QP %p", (void *)cmd.qp));
new_qp->qp = cmd.qp;
opal_list_append(ignore_qp_err_list, (opal_list_item_t *)new_qp);
send_command_comp(OPENIB_ASYNC_IGNORE_QP_ERR);
} else if (OPENIB_ASYNC_THREAD_EXIT == cmd.a_cmd) {
/* Got 0 - command to close the thread */
opal_list_item_t *item;
BTL_VERBOSE(("Async event thread exit"));
free(devices_poll->async_pollfd);
return_status = OPAL_SUCCESS;
while ((item = opal_list_remove_first(ignore_qp_err_list))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(ignore_qp_err_list);
pthread_exit(&return_status);
}
return OPAL_SUCCESS;
}
/* The main idea of resizing SRQ algorithm -
We create a SRQ with size = rd_num, but for efficient usage of resources
@@ -323,235 +194,118 @@ srq_limit_event_exit:
}
/* Function handles async device events */
static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *devices_poll, int index,
opal_list_t *ignore_qp_err_list)
static void btl_openib_async_device (int fd, short flags, void *arg)
{
int j;
mca_btl_openib_device_t *device = NULL;
mca_btl_openib_device_t *device = (mca_btl_openib_device_t *) arg;
struct ibv_async_event event;
int event_type;
/* We need to find correct device and process this event */
for (j=0; j < mca_btl_openib_component.ib_num_btls; j++) {
if (mca_btl_openib_component.openib_btls[j]->device->ib_dev_context->async_fd ==
devices_poll->async_pollfd[index].fd ) {
device = mca_btl_openib_component.openib_btls[j]->device;
break;
if (ibv_get_async_event((struct ibv_context *)device->ib_dev_context,&event) < 0) {
if (EWOULDBLOCK != errno) {
BTL_ERROR(("Failed to get async event"));
}
return;
}
if (NULL != device) {
if (ibv_get_async_event((struct ibv_context *)device->ib_dev_context,&event) < 0) {
if (EWOULDBLOCK == errno) {
/* No event found?
* It was handled by somebody else */
return OPAL_SUCCESS;
} else {
BTL_ERROR(("Failed to get async event"));
return OPAL_ERROR;
event_type = event.event_type;
#if OPAL_HAVE_CONNECTX_XRC
/* is it an XRC event? */
bool xrc_event = false;
if (IBV_XRC_QP_EVENT_FLAG & event.event_type) {
xrc_event = true;
/* Clear the bit and handle as usual */
event_type ^= IBV_XRC_QP_EVENT_FLAG;
}
#endif
switch(event_type) {
case IBV_EVENT_PATH_MIG:
BTL_ERROR(("Alternative path migration event reported"));
if (APM_ENABLED) {
BTL_ERROR(("Trying to find additional path..."));
#if OPAL_HAVE_CONNECTX_XRC
if (xrc_event)
mca_btl_openib_load_apm_xrc_rcv(event.element.xrc_qp_num,
xrc_qp2endpoint(event.element.xrc_qp_num, device));
else
#endif
mca_btl_openib_load_apm(event.element.qp,
qp2endpoint(event.element.qp, device));
}
break;
case IBV_EVENT_DEVICE_FATAL:
/* Set the flag to fatal */
device->got_fatal_event = true;
/* It is not critical to protect the counter */
OPAL_THREAD_ADD32(&mca_btl_openib_component.error_counter, 1);
/* fall through */
case IBV_EVENT_CQ_ERR:
case IBV_EVENT_QP_FATAL:
if (event_type == IBV_EVENT_QP_FATAL) {
mca_btl_openib_qp_list *qp_item;
bool in_ignore_list = false;
BTL_VERBOSE(("QP is in err state %p", (void *)event.element.qp));
/* look through ignore list */
opal_mutex_lock (&ignore_qp_err_list_lock);
OPAL_LIST_FOREACH(qp_item, &ignore_qp_err_list, mca_btl_openib_qp_list) {
if (qp_item->qp == event.element.qp) {
BTL_VERBOSE(("QP %p is in error ignore list",
(void *)event.element.qp));
in_ignore_list = true;
break;
}
}
opal_mutex_unlock (&ignore_qp_err_list_lock);
if (in_ignore_list) {
break;
}
}
event_type = event.event_type;
#if OPAL_HAVE_CONNECTX_XRC
/* is it an XRC event? */
bool xrc_event = false;
if (IBV_XRC_QP_EVENT_FLAG & event.event_type) {
xrc_event = true;
/* Clear the bit and handle as usual */
event_type ^= IBV_XRC_QP_EVENT_FLAG;
}
#endif
switch(event_type) {
case IBV_EVENT_PATH_MIG:
BTL_ERROR(("Alternative path migration event reported"));
if (APM_ENABLED) {
BTL_ERROR(("Trying to find additional path..."));
#if OPAL_HAVE_CONNECTX_XRC
if (xrc_event)
mca_btl_openib_load_apm_xrc_rcv(event.element.xrc_qp_num,
xrc_qp2endpoint(event.element.xrc_qp_num, device));
else
#endif
mca_btl_openib_load_apm(event.element.qp,
qp2endpoint(event.element.qp, device));
}
break;
case IBV_EVENT_DEVICE_FATAL:
/* Set the flag to fatal */
device->got_fatal_event = true;
/* It is not critical to protect the counter */
OPAL_THREAD_ADD32(&mca_btl_openib_component.error_counter, 1);
/* fall through */
case IBV_EVENT_CQ_ERR:
case IBV_EVENT_QP_FATAL:
if (event_type == IBV_EVENT_QP_FATAL) {
opal_list_item_t *item;
mca_btl_openib_qp_list *qp_item;
bool in_ignore_list = false;
BTL_VERBOSE(("QP is in err state %p", (void *)event.element.qp));
/* look through ignore list */
for (item = opal_list_get_first(ignore_qp_err_list);
item != opal_list_get_end(ignore_qp_err_list);
item = opal_list_get_next(item)) {
qp_item = (mca_btl_openib_qp_list *)item;
if (qp_item->qp == event.element.qp) {
BTL_VERBOSE(("QP %p is in error ignore list",
(void *)event.element.qp));
in_ignore_list = true;
break;
}
}
if (in_ignore_list)
break;
}
case IBV_EVENT_QP_REQ_ERR:
case IBV_EVENT_QP_ACCESS_ERR:
case IBV_EVENT_PATH_MIG_ERR:
case IBV_EVENT_SRQ_ERR:
opal_show_help("help-mpi-btl-openib.txt", "of error event",
true,opal_process_info.nodename, (int)getpid(),
event_type,
openib_event_to_str((enum ibv_event_type)event_type));
break;
case IBV_EVENT_PORT_ERR:
opal_show_help("help-mpi-btl-openib.txt", "of error event",
true,opal_process_info.nodename, (int)getpid(),
event_type,
openib_event_to_str((enum ibv_event_type)event_type));
/* Set the flag to indicate port error */
device->got_port_event = true;
OPAL_THREAD_ADD32(&mca_btl_openib_component.error_counter, 1);
break;
case IBV_EVENT_COMM_EST:
case IBV_EVENT_PORT_ACTIVE:
case IBV_EVENT_SQ_DRAINED:
case IBV_EVENT_LID_CHANGE:
case IBV_EVENT_PKEY_CHANGE:
case IBV_EVENT_SM_CHANGE:
case IBV_EVENT_QP_LAST_WQE_REACHED:
/* fall through */
case IBV_EVENT_QP_REQ_ERR:
case IBV_EVENT_QP_ACCESS_ERR:
case IBV_EVENT_PATH_MIG_ERR:
case IBV_EVENT_SRQ_ERR:
opal_show_help("help-mpi-btl-openib.txt", "of error event",
true,opal_process_info.nodename, (int)getpid(),
event_type,
openib_event_to_str((enum ibv_event_type)event_type));
break;
case IBV_EVENT_PORT_ERR:
opal_show_help("help-mpi-btl-openib.txt", "of error event",
true,opal_process_info.nodename, (int)getpid(),
event_type,
openib_event_to_str((enum ibv_event_type)event_type));
/* Set the flag to indicate port error */
device->got_port_event = true;
OPAL_THREAD_ADD32(&mca_btl_openib_component.error_counter, 1);
break;
case IBV_EVENT_COMM_EST:
case IBV_EVENT_PORT_ACTIVE:
case IBV_EVENT_SQ_DRAINED:
case IBV_EVENT_LID_CHANGE:
case IBV_EVENT_PKEY_CHANGE:
case IBV_EVENT_SM_CHANGE:
case IBV_EVENT_QP_LAST_WQE_REACHED:
#if HAVE_DECL_IBV_EVENT_CLIENT_REREGISTER
case IBV_EVENT_CLIENT_REREGISTER:
case IBV_EVENT_CLIENT_REREGISTER:
#endif
break;
/* The event is signaled when the number of preposted receive WQEs drops
below the predefined threshold - srq_limit */
case IBV_EVENT_SRQ_LIMIT_REACHED:
if(OPAL_SUCCESS !=
btl_openib_async_srq_limit_event(event.element.srq)) {
return OPAL_ERROR;
}
break;
/* The event is signaled when the number of preposted receive WQEs drops
below the predefined threshold - srq_limit */
case IBV_EVENT_SRQ_LIMIT_REACHED:
(void) btl_openib_async_srq_limit_event (event.element.srq);
break;
default:
opal_show_help("help-mpi-btl-openib.txt", "of unknown event",
true,opal_process_info.nodename, (int)getpid(),
event_type);
}
ibv_ack_async_event(&event);
} else {
/* if (device == NULL), then we failed to locate the device!
This should never happen... */
BTL_ERROR(("Failed to find device with FD %d. "
"Fatal error, stoping asynch event thread",
devices_poll->async_pollfd[index].fd));
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
/* This async event thread handles all async events of
* all btls/devices in the openib component
*/
static void* btl_openib_async_thread(void * async)
{
int rc;
int i;
struct mca_btl_openib_async_poll devices_poll;
opal_list_t ignore_qp_err_list;
OBJ_CONSTRUCT(&ignore_qp_err_list, opal_list_t);
if (OPAL_SUCCESS != btl_openib_async_poll_init(&devices_poll)) {
BTL_ERROR(("Fatal error, stoping asynch event thread"));
pthread_exit(&return_status);
break;
default:
opal_show_help("help-mpi-btl-openib.txt", "of unknown event",
true,opal_process_info.nodename, (int)getpid(),
event_type);
}
while(1) {
rc = poll(devices_poll.async_pollfd, devices_poll.active_poll_size, -1);
if (rc < 0) {
if (errno != EINTR) {
BTL_ERROR(("Poll failed. Fatal error, stoping asynch event thread"));
pthread_exit(&return_status);
} else {
/* EINTR - we were interrupted */
continue;
}
}
for(i = 0; i < devices_poll.active_poll_size; i++) {
switch (devices_poll.async_pollfd[i].revents) {
case 0:
/* no events */
break;
case POLLIN:
#if defined(__SVR4) && defined(__sun)
/*
* Need workaround for Solaris IB user verbs since
* "Poll on IB async fd returns POLLRDNORM revent even though it is masked out"
*/
case POLLIN | POLLRDNORM:
#endif
/* Processing our event */
if (0 == i) {
/* poll entry 0 is used for communication with the main thread */
if (OPAL_SUCCESS != btl_openib_async_commandh(&devices_poll,
&ignore_qp_err_list)) {
free(devices_poll.async_pollfd);
BTL_ERROR(("Failed to process async thread process. "
"Fatal error, stoping asynch event thread"));
pthread_exit(&return_status);
}
} else {
/* We get device event */
if (btl_openib_async_deviceh(&devices_poll, i,
&ignore_qp_err_list)) {
free(devices_poll.async_pollfd);
BTL_ERROR(("Failed to process async thread process. "
"Fatal error, stoping asynch event thread"));
pthread_exit(&return_status);
}
}
break;
default:
/* Got an event other than POLLIN;
* this case should never happen */
BTL_ERROR(("Got unexpected event %d. "
"Fatal error, stoping asynch event thread",
devices_poll.async_pollfd[i].revents));
free(devices_poll.async_pollfd);
pthread_exit(&return_status);
}
}
}
return PTHREAD_CANCELED;
}
int btl_openib_async_command_done(int exp)
{
int comp;
if (read(mca_btl_openib_component.async_comp_pipe[0], &comp,
sizeof(int)) < (int) sizeof (int)){
BTL_ERROR(("Failed to read from pipe"));
return OPAL_ERROR;
}
if (exp != comp){
BTL_ERROR(("Get wrong completion on async command. Waiting for %d and got %d",
exp, comp));
return OPAL_ERROR;
}
return OPAL_SUCCESS;
ibv_ack_async_event(&event);
}
static void apm_update_attr(struct ibv_qp_attr *attr, enum ibv_qp_attr_mask *mask)
@@ -685,34 +439,70 @@ void mca_btl_openib_load_apm_xrc_rcv(uint32_t qp_num, mca_btl_openib_endpoint_t
}
#endif
int start_async_event_thread(void)
int mca_btl_openib_async_init (void)
{
if (0 != mca_btl_openib_component.async_thread) {
if (!mca_btl_openib_component.use_async_event_thread ||
mca_btl_openib_component.async_evbase) {
return OPAL_SUCCESS;
}
mca_btl_openib_component.async_evbase = opal_progress_thread_init (NULL);
OBJ_CONSTRUCT(&ignore_qp_err_list, opal_list_t);
OBJ_CONSTRUCT(&ignore_qp_err_list_lock, opal_mutex_t);
/* Set the error counter to zero */
mca_btl_openib_component.error_counter = 0;
/* Create pipe for communication with async event thread */
if (pipe(mca_btl_openib_component.async_pipe)) {
BTL_ERROR(("Failed to create pipe for communication with "
"async event thread"));
return OPAL_ERROR;
}
if (pipe(mca_btl_openib_component.async_comp_pipe)) {
BTL_ERROR(("Failed to create comp pipe for communication with "
"main thread"));
return OPAL_ERROR;
}
/* Starting async event thread for the component */
if (pthread_create(&mca_btl_openib_component.async_thread, NULL,
(void*(*)(void*)) btl_openib_async_thread, NULL)) {
BTL_ERROR(("Failed to create async event thread"));
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
void mca_btl_openib_async_fini (void)
{
if (mca_btl_openib_component.async_evbase) {
OPAL_LIST_DESTRUCT(&ignore_qp_err_list);
OBJ_DESTRUCT(&ignore_qp_err_list_lock);
opal_progress_thread_finalize (NULL);
mca_btl_openib_component.async_evbase = NULL;
}
}
void mca_btl_openib_async_add_device (mca_btl_openib_device_t *device)
{
if (mca_btl_openib_component.async_evbase) {
if (1 == OPAL_THREAD_ADD32 (&btl_openib_async_device_count, 1)) {
mca_btl_openib_async_init ();
}
opal_event_set (mca_btl_openib_component.async_evbase, &device->async_event,
device->ib_dev_context->async_fd, OPAL_EV_READ | OPAL_EV_PERSIST,
btl_openib_async_device, device);
opal_event_add (&device->async_event, 0);
}
}
void mca_btl_openib_async_rem_device (mca_btl_openib_device_t *device)
{
if (mca_btl_openib_component.async_evbase) {
opal_event_del (&device->async_event);
if (0 == OPAL_THREAD_ADD32 (&btl_openib_async_device_count, -1)) {
mca_btl_openib_async_fini ();
}
}
}
void mca_btl_openib_async_add_qp_ignore (struct ibv_qp *qp)
{
if (mca_btl_openib_component.async_evbase) {
mca_btl_openib_qp_list *new_qp = OBJ_NEW(mca_btl_openib_qp_list);
if (OPAL_UNLIKELY(NULL == new_qp)) {
/* couldn't allocate a small object; not much more can be done */
return;
}
BTL_VERBOSE(("Ignoring errors on QP %p", (void *) qp));
new_qp->qp = qp;
opal_mutex_lock (&ignore_qp_err_list_lock);
opal_list_append (&ignore_qp_err_list, (opal_list_item_t *) new_qp);
opal_mutex_unlock (&ignore_qp_err_list_lock);
}
}
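Two design points in the replacement above, plus a sketch of the intended pairing (an assumption drawn from the function bodies, not stated in the diff): the atomic btl_openib_async_device_count starts the progress machinery with the first registered device and finalizes it with the last, and ignore_qp_err_list is now mutex-protected because the event callback runs on the progress thread concurrently with main-thread calls.

/* assumed per-device lifecycle */
mca_btl_openib_async_add_device (device);  /* first add sets up the event base */
/* ... device in service; selected QPs may have errors ignored ... */
mca_btl_openib_async_add_qp_ignore (qp);   /* thread-safe via the new lock */
mca_btl_openib_async_rem_device (device);  /* last remove finalizes the base */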

View file

@@ -3,6 +3,8 @@
* Copyright (c) 2014 Bull SAS. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@@ -16,29 +18,42 @@
#define MCA_BTL_OPENIB_ASYNC_H
#include "btl_openib_endpoint.h"
int start_async_event_thread(void);
void mca_btl_openib_load_apm(struct ibv_qp *qp, mca_btl_openib_endpoint_t *ep);
int btl_openib_async_command_done(int exp);
#if OPAL_HAVE_CONNECTX_XRC
void mca_btl_openib_load_apm_xrc_rcv(uint32_t qp_num, mca_btl_openib_endpoint_t *ep);
#endif
#define APM_ENABLED (0 != mca_btl_openib_component.apm_lmc || 0 != mca_btl_openib_component.apm_ports)
/*
* Command types for communicating with the async thread
/**
* Initialize the async event base
*/
typedef enum {
OPENIB_ASYNC_CMD_FD_ADD,
OPENIB_ASYNC_CMD_FD_REMOVE,
OPENIB_ASYNC_IGNORE_QP_ERR,
OPENIB_ASYNC_THREAD_EXIT
} btl_openib_async_cmd_type_t;
int mca_btl_openib_async_init (void);
typedef struct {
btl_openib_async_cmd_type_t a_cmd;
int fd;
struct ibv_qp *qp;
} mca_btl_openib_async_cmd_t;
/**
* Finalize the async event base
*/
void mca_btl_openib_async_fini (void);
/**
* Register a device with the async event base
*
* @param[in] device device to register
*/
void mca_btl_openib_async_add_device (mca_btl_openib_device_t *device);
/**
* Deregister a device with the async event base
*
* @param[in] device device to deregister
*/
void mca_btl_openib_async_rem_device (mca_btl_openib_device_t *device);
/**
* Ignore error events on a queue pair
*
* @param[in] qp queue pair to ignore
*/
void mca_btl_openib_async_add_qp_ignore (struct ibv_qp *qp);
#endif

View file

@@ -97,7 +97,6 @@
#include "btl_openib_ini.h"
#include "btl_openib_mca.h"
#include "btl_openib_xrc.h"
#include "btl_openib_fd.h"
#if BTL_OPENIB_FAILOVER_ENABLED
#include "btl_openib_failover.h"
#endif
@@ -245,32 +244,13 @@ static int btl_openib_component_close(void)
{
int rc = OPAL_SUCCESS;
/* Tell the async thread to shutdown */
if (mca_btl_openib_component.use_async_event_thread &&
0 != mca_btl_openib_component.async_thread) {
mca_btl_openib_async_cmd_t async_command = {.a_cmd = OPENIB_ASYNC_THREAD_EXIT,
.fd = -1, .qp = NULL};
if (write(mca_btl_openib_component.async_pipe[1], &async_command,
sizeof(mca_btl_openib_async_cmd_t)) < 0) {
BTL_ERROR(("Failed to communicate with async event thread"));
rc = OPAL_ERROR;
} else {
if (pthread_join(mca_btl_openib_component.async_thread, NULL)) {
BTL_ERROR(("Failed to stop OpenIB async event thread"));
rc = OPAL_ERROR;
}
}
close(mca_btl_openib_component.async_pipe[0]);
close(mca_btl_openib_component.async_pipe[1]);
close(mca_btl_openib_component.async_comp_pipe[0]);
close(mca_btl_openib_component.async_comp_pipe[1]);
}
/* remove the async event from the event base */
mca_btl_openib_async_fini ();
OBJ_DESTRUCT(&mca_btl_openib_component.srq_manager.lock);
OBJ_DESTRUCT(&mca_btl_openib_component.srq_manager.srq_addr_table);
opal_btl_openib_connect_base_finalize();
opal_btl_openib_fd_finalize();
opal_btl_openib_ini_finalize();
if (NULL != mca_btl_openib_component.default_recv_qps) {
@@ -906,7 +886,7 @@ static void device_construct(mca_btl_openib_device_t *device)
device->ib_dev_context = NULL;
device->ib_pd = NULL;
device->mpool = NULL;
#if OPAL_ENABLE_PROGRESS_THREADS
#if OPAL_ENABLE_PROGRESS_THREADS == 1
device->ib_channel = NULL;
#endif
device->btls = 0;
@@ -926,10 +906,6 @@ static void device_construct(mca_btl_openib_device_t *device)
device->xrc_fd = -1;
#endif
device->qps = NULL;
mca_btl_openib_component.async_pipe[0] =
mca_btl_openib_component.async_pipe[1] = -1;
mca_btl_openib_component.async_comp_pipe[0] =
mca_btl_openib_component.async_comp_pipe[1] = -1;
OBJ_CONSTRUCT(&device->device_lock, opal_mutex_t);
OBJ_CONSTRUCT(&device->send_free_control, opal_free_list_t);
device->max_inline_data = 0;
@@ -940,8 +916,8 @@ static void device_destruct(mca_btl_openib_device_t *device)
{
int i;
#if OPAL_ENABLE_PROGRESS_THREADS
if(device->progress) {
#if OPAL_ENABLE_PROGRESS_THREADS == 1
if (device->progress) {
device->progress = false;
if (pthread_cancel(device->thread.t_handle)) {
BTL_ERROR(("Failed to cancel OpenIB progress thread"));
@@ -949,27 +925,15 @@ static void device_destruct(mca_btl_openib_device_t *device)
}
opal_thread_join(&device->thread, NULL);
}
if (ibv_destroy_comp_channel(device->ib_channel)) {
BTL_VERBOSE(("Failed to close comp_channel"));
goto device_error;
}
#endif
/* signaling to async_tread to stop poll for this device */
if (mca_btl_openib_component.use_async_event_thread &&
-1 != mca_btl_openib_component.async_pipe[1]) {
mca_btl_openib_async_cmd_t async_command = {.a_cmd = OPENIB_ASYNC_CMD_FD_REMOVE,
.fd = device->ib_dev_context->async_fd,
.qp = NULL};
if (write(mca_btl_openib_component.async_pipe[1], &async_command,
sizeof(mca_btl_openib_async_cmd_t)) < 0){
BTL_ERROR(("Failed to write to pipe"));
goto device_error;
}
/* wait for ok from thread */
if (OPAL_SUCCESS != btl_openib_async_command_done(device->ib_dev_context->async_fd)){
goto device_error;
}
}
mca_btl_openib_async_rem_device (device);
if(device->eager_rdma_buffers) {
int i;
@@ -2562,11 +2526,6 @@ btl_openib_component_init(int *num_btl_modules,
goto no_btls;
}
/* Initialize FD listening */
if (OPAL_SUCCESS != opal_btl_openib_fd_init()) {
goto no_btls;
}
/* If we are using ptmalloc2 and there are no posix threads
available, this will cause memory corruption. Refuse to run.
Right now, ptmalloc2 is the only memory manager that we have on
@@ -2741,7 +2700,7 @@ btl_openib_component_init(int *num_btl_modules,
OBJ_CONSTRUCT(&btl_list, opal_list_t);
OBJ_CONSTRUCT(&mca_btl_openib_component.ib_lock, opal_mutex_t);
mca_btl_openib_component.async_thread = 0;
distance = dev_sorted[0].distance;
for (found = false, i = 0;
i < num_devs && (-1 == mca_btl_openib_component.ib_max_btls ||
@@ -3925,3 +3884,42 @@ int mca_btl_openib_post_srr(mca_btl_openib_module_t* openib_btl, const int qp)
return OPAL_ERROR;
}
struct mca_btl_openib_event_t {
opal_event_t super;
void *(*fn)(void *);
void *arg;
opal_event_t *event;
};
typedef struct mca_btl_openib_event_t mca_btl_openib_event_t;
static void *mca_btl_openib_run_once_cb (int fd, int flags, void *context)
{
mca_btl_openib_event_t *event = (mca_btl_openib_event_t *) context;
void *ret;
ret = event->fn (event->arg);
opal_event_del (&event->super);
free (event);
return ret;
}
int mca_btl_openib_run_in_main (void *(*fn)(void *), void *arg)
{
mca_btl_openib_event_t *event = malloc (sizeof (mca_btl_openib_event_t));
if (OPAL_UNLIKELY(NULL == event)) {
return OPAL_ERR_OUT_OF_RESOURCE;
}
event->fn = fn;
event->arg = arg;
opal_event_set (opal_sync_event_base, &event->super, -1, OPAL_EV_READ,
mca_btl_openib_run_once_cb, event);
opal_event_active (&event->super, OPAL_EV_READ, 1);
return OPAL_SUCCESS;
}
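A note on the design of mca_btl_openib_run_in_main(): the event is created with fd -1, so it can never fire from I/O; opal_event_active() marks it pending immediately, and the next pass through opal_progress() dispatches it on the sync event base. The trampoline then deletes the one-shot event and frees its wrapper, so the call is fire-and-forget and the callback's return value is discarded.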

View file

@@ -1,693 +0,0 @@
/*
* Copyright (c) 2008-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2009 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* Note: this file is a little fast-n-loose --
* it uses this value in run-time "if" conditionals (vs. compile-time
* #if conditionals). We also don't protect including <pthread.h>.
* That's because this component currently only compiles on Linux and
* Solaris, and both of these OS's have pthreads. Using the run-time
* conditionals gives us better compile-time checking, even of code
* that isn't activated.
*
* Note, too, that the functionality in this file does *not* require
* all the heavyweight OMPI thread infrastructure (e.g., from
* --enable-mpi-thread-multiple or --enable-progress-threads). All work that
* is done in a separate progress thread is very carefully segregated
* from that of the main thread, and communication back to the main
* thread
*/
#include "opal_config.h"
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include "opal/class/opal_list.h"
#include "opal/mca/event/event.h"
#include "opal/util/output.h"
#include "opal/util/fd.h"
#include "opal/threads/threads.h"
#include "btl_openib_fd.h"
typedef union {
opal_btl_openib_fd_event_callback_fn_t *event;
opal_btl_openib_fd_main_callback_fn_t *main;
} callback_u_t;
/*
* Data for each registered item
*/
typedef struct {
opal_list_item_t super;
bool ri_event_used;
opal_event_t ri_event;
int ri_fd;
int ri_flags;
callback_u_t ri_callback;
void *ri_context;
} registered_item_t;
static OBJ_CLASS_INSTANCE(registered_item_t, opal_list_item_t, NULL, NULL);
/*
* Command types
*/
typedef enum {
/* Read by service thread */
CMD_TIME_TO_QUIT,
CMD_ADD_FD,
CMD_REMOVE_FD,
ACK_RAN_FUNCTION,
/* Read by service and main threads */
CMD_CALL_FUNCTION,
CMD_MAX
} cmd_type_t;
/*
* Commands. Fields ordered to avoid memory holes (and valgrind warnings).
*/
typedef struct {
callback_u_t pc_fn;
void *pc_context;
int pc_fd;
int pc_flags;
cmd_type_t pc_cmd;
char end;
} cmd_t;
/*
* Queued up list of commands to send to the main thread
*/
typedef struct {
opal_list_item_t super;
cmd_t cli_cmd;
} cmd_list_item_t;
static OBJ_CLASS_INSTANCE(cmd_list_item_t, opal_list_item_t, NULL, NULL);
static bool initialized = false;
static int cmd_size = 0;
static fd_set read_fds, write_fds;
static int max_fd;
static opal_list_t registered_items;
/* These items are only used in the threaded version */
/* Owned by the main thread */
static pthread_t thread;
static opal_event_t main_thread_event;
static int pipe_to_service_thread[2] = { -1, -1 };
/* Owned by the service thread */
static int pipe_to_main_thread[2] = { -1, -1 };
static const size_t max_outstanding_to_main_thread = 32;
static size_t waiting_for_ack_from_main_thread = 0;
static opal_list_t pending_to_main_thread;
/*
* Write a command to the main thread, or queue it up if the pipe is full
*/
static int write_to_main_thread(cmd_t *cmd)
{
/* Note that if we write too much to the main thread pipe and the
main thread doesn't check it often, we could fill up the pipe
and cause this thread to block. Bad! So we do some simple
counting here and ensure that we don't fill the pipe. If we
are in danger of that, then queue up the commands here in the
service thread. The main thread will ACK every CALL_FUNCTION
command, so we have a built-in mechanism to wake up the service
thread to drain any queued-up commands. */
if (opal_list_get_size(&pending_to_main_thread) > 0 ||
waiting_for_ack_from_main_thread >= max_outstanding_to_main_thread) {
cmd_list_item_t *cli = OBJ_NEW(cmd_list_item_t);
if (NULL == cli) {
return OPAL_ERR_OUT_OF_RESOURCE;
}
memcpy(&cli->cli_cmd, cmd, cmd_size);
opal_list_append(&pending_to_main_thread, &(cli->super));
} else {
OPAL_OUTPUT((-1, "fd: writing to main thread"));
opal_fd_write(pipe_to_main_thread[1], cmd_size, cmd);
++waiting_for_ack_from_main_thread;
}
return OPAL_SUCCESS;
}
static void service_fd_callback(int fd, short event, void *context)
{
registered_item_t *ri = (registered_item_t*) context;
ri->ri_callback.event(fd, event, ri->ri_context);
}
/*
* Add an fd to the listening set
*/
static int service_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd)
{
registered_item_t *ri = OBJ_NEW(registered_item_t);
if (NULL == ri) {
return OPAL_ERR_OUT_OF_RESOURCE;
}
ri->ri_event_used = false;
ri->ri_fd = cmd->pc_fd;
ri->ri_flags = cmd->pc_flags;
ri->ri_callback.event = cmd->pc_fn.event;
ri->ri_context = cmd->pc_context;
if (use_libevent) {
/* Make an event for this fd */
ri->ri_event_used = true;
opal_event_set(opal_sync_event_base, &ri->ri_event, ri->ri_fd,
ri->ri_flags | OPAL_EV_PERSIST, service_fd_callback,
ri);
opal_event_add(&ri->ri_event, 0);
} else {
/* Add the fd to the relevant fd local sets and update max_fd */
if (OPAL_EV_READ & ri->ri_flags) {
FD_SET(ri->ri_fd, &read_fds);
}
if (OPAL_EV_WRITE & cmd->pc_flags) {
FD_SET(ri->ri_fd, &write_fds);
}
max_fd = (max_fd > ri->ri_fd) ? max_fd : ri->ri_fd + 1;
}
opal_list_append(&registered_items, &ri->super);
return OPAL_SUCCESS;
}
/*
* Run a function
*/
static int service_pipe_cmd_call_function(cmd_t *cmd)
{
cmd_t local_cmd;
OPAL_OUTPUT((-1, "fd service thread: calling function!"));
/* Call the function */
if (NULL != cmd->pc_fn.main) {
cmd->pc_fn.main(cmd->pc_context);
}
/* Now ACK that we ran the function */
memset(&local_cmd, 0, cmd_size);
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
opal_fd_write(pipe_to_main_thread[1], cmd_size, &local_cmd);
/* Done */
return OPAL_SUCCESS;
}
/*
* Remove an fd from the listening set
*/
static int service_pipe_cmd_remove_fd(cmd_t *cmd)
{
int i;
opal_list_item_t *item;
registered_item_t *ri;
OPAL_OUTPUT((-1, "service thread got unmonitor fd %d", cmd->pc_fd));
/* Go through the list of registered fd's and find the fd to
remove */
for (item = opal_list_get_first(&registered_items);
NULL != opal_list_get_end(&registered_items);
item = opal_list_get_next(item)) {
ri = (registered_item_t*) item;
if (cmd->pc_fd == ri->ri_fd) {
/* Found it. The item knows if it was used as a libevent
event or an entry in the local fd sets. */
if (ri->ri_event_used) {
/* Remove this event from libevent */
opal_event_del(&ri->ri_event);
} else {
/* Remove this item from the fd_sets and recalculate
MAX_FD */
FD_CLR(cmd->pc_fd, &read_fds);
FD_CLR(cmd->pc_fd, &write_fds);
for (max_fd = i = pipe_to_service_thread[0]; i < FD_SETSIZE; ++i) {
if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) {
max_fd = i + 1;
}
}
}
/* Let the caller know that we have stopped monitoring
this fd (if they care) */
if (NULL != cmd->pc_fn.event) {
cmd->pc_fn.event(cmd->pc_fd, 0, cmd->pc_context);
}
/* Remove this item from the list of registered items and
release it */
opal_list_remove_item(&registered_items, item);
OBJ_RELEASE(item);
return OPAL_SUCCESS;
}
}
/* This shouldn't happen */
return OPAL_ERR_NOT_FOUND;
}
/*
* Call a function and ACK that we ran it
*/
static int main_pipe_cmd_call_function(cmd_t *cmd)
{
cmd_t local_cmd;
OPAL_OUTPUT((-1, "fd main thread: calling function!"));
/* Call the function */
if (NULL != cmd->pc_fn.main) {
cmd->pc_fn.main(cmd->pc_context);
}
/* Now ACK that we ran the function */
memset(&local_cmd, 0, cmd_size);
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
opal_fd_write(pipe_to_service_thread[1], cmd_size, &local_cmd);
/* Done */
return OPAL_SUCCESS;
}
/*
* Act on pipe commands
*/
static bool service_pipe_cmd(void)
{
bool ret = false;
cmd_t cmd;
cmd_list_item_t *cli;
opal_fd_read(pipe_to_service_thread[0], cmd_size, &cmd);
switch (cmd.pc_cmd) {
case CMD_ADD_FD:
OPAL_OUTPUT((-1, "fd service thread: CMD_ADD_FD"));
if (OPAL_SUCCESS != service_pipe_cmd_add_fd(false, &cmd)) {
ret = true;
}
break;
case CMD_REMOVE_FD:
OPAL_OUTPUT((-1, "fd service thread: CMD_REMOVE_FD"));
if (OPAL_SUCCESS != service_pipe_cmd_remove_fd(&cmd)) {
ret = true;
}
break;
case CMD_CALL_FUNCTION:
OPAL_OUTPUT((-1, "fd service thread: CMD_RUN_FUNCTION"));
if (OPAL_SUCCESS != service_pipe_cmd_call_function(&cmd)) {
ret = true;
}
break;
case CMD_TIME_TO_QUIT:
OPAL_OUTPUT((-1, "fd service thread: CMD_TIME_TO_QUIT"));
ret = true;
break;
case ACK_RAN_FUNCTION:
/* We don't have a guarantee that the main thread will check
its pipe frequently, so we do some simple counting to
ensure we just don't have too many outstanding commands to
the main thread at any given time. The main thread will
ACK every CALL_FUNCTION command, so this thread will always
wake up and continue to drain any queued up functions. */
cli = (cmd_list_item_t*) opal_list_remove_first(&pending_to_main_thread);
if (NULL != cli) {
OPAL_OUTPUT((-1, "sending queued up cmd function to main thread"));
opal_fd_write(pipe_to_main_thread[1], cmd_size, &(cli->cli_cmd));
OBJ_RELEASE(cli);
} else {
--waiting_for_ack_from_main_thread;
}
break;
default:
OPAL_OUTPUT((-1, "fd service thread: unknown pipe command!"));
break;
}
return ret;
}
/*
* Main thread logic
*/
static void *service_thread_start(void *context)
{
int rc, flags;
fd_set read_fds_copy, write_fds_copy;
opal_list_item_t *item;
registered_item_t *ri;
/* Make an fd set that we can select() on */
FD_ZERO(&write_fds);
FD_ZERO(&read_fds);
FD_SET(pipe_to_service_thread[0], &read_fds);
max_fd = pipe_to_service_thread[0] + 1;
OPAL_OUTPUT((-1, "fd service thread running"));
/* Main loop waiting for commands over the fd's */
while (1) {
memcpy(&read_fds_copy, &read_fds, sizeof(read_fds));
memcpy(&write_fds_copy, &write_fds, sizeof(write_fds));
OPAL_OUTPUT((-1, "fd service thread blocking on select..."));
rc = select(max_fd, &read_fds_copy, &write_fds_copy, NULL, NULL);
if (0 != rc && EAGAIN == errno) {
continue;
}
OPAL_OUTPUT((-1, "fd service thread woke up!"));
if (0 > rc) {
if (EBADF == errno) {
/* We are assuming we lost a socket so set rc to 1 so we'll
* try to read a command off the service pipe to receive a
* rm command (corresponding to the socket that went away).
* If the EBADF is from the service pipe then the error
* condition will be handled by the service_pipe_cmd().
*/
OPAL_OUTPUT((-1,"fd service thread: non-EAGAIN from select %d", errno));
rc = 1;
}
}
if (rc > 0) {
if (FD_ISSET(pipe_to_service_thread[0], &read_fds_copy)) {
OPAL_OUTPUT((-1, "fd service thread: pipe command"));
if (service_pipe_cmd()) {
break;
}
OPAL_OUTPUT((-1, "fd service thread: back from pipe command"));
/* Continue to the top of the loop to see if there are more
* commands on the pipe. This is done to reset the fds
* list just in case the last select incurred an EBADF.
* Please do not remove this continue, thinking one is trying
* to enforce fairness in reading the sockets, or we'll
* end up with segv's below when select incurs an EBADF.
*/
continue;
}
/* Go through all the registered events and see who had
activity */
if (!opal_list_is_empty(&registered_items)) {
for (item = opal_list_get_first(&registered_items);
item != opal_list_get_end(&registered_items);
item = opal_list_get_next(item)) {
ri = (registered_item_t*) item;
flags = 0;
/* See if this fd was ready for reading or writing
(fd's will only be in the read_fds or write_fds
set depending on what they registered for) */
if (FD_ISSET(ri->ri_fd, &read_fds_copy)) {
flags |= OPAL_EV_READ;
}
if (FD_ISSET(ri->ri_fd, &write_fds_copy)) {
flags |= OPAL_EV_WRITE;
}
/* If either was ready, invoke the callback */
if (0 != flags) {
OPAL_OUTPUT((-1, "fd service thread: invoking callback for registered fd %d", ri->ri_fd));
ri->ri_callback.event(ri->ri_fd, flags,
ri->ri_context);
OPAL_OUTPUT((-1, "fd service thread: back from callback for registered fd %d", ri->ri_fd));
}
}
}
}
}
/* All done */
OPAL_OUTPUT((-1, "fd service thread: exiting"));
opal_atomic_wmb();
return NULL;
}
static void main_thread_event_callback(int fd, short event, void *context)
{
cmd_t cmd;
OPAL_OUTPUT((-1, "main thread -- reading command"));
opal_fd_read(pipe_to_main_thread[0], cmd_size, &cmd);
switch (cmd.pc_cmd) {
case CMD_CALL_FUNCTION:
OPAL_OUTPUT((-1, "fd main thread: calling command"));
main_pipe_cmd_call_function(&cmd);
break;
default:
OPAL_OUTPUT((-1, "fd main thread: unknown pipe command: %d",
cmd.pc_cmd));
break;
}
}
/******************************************************************
* Main interface calls
******************************************************************/
/*
* Initialize
* Called by main thread
*/
int opal_btl_openib_fd_init(void)
{
if (!initialized) {
cmd_t bogus;
OBJ_CONSTRUCT(&registered_items, opal_list_t);
/* Calculate the real size of the cmd struct */
cmd_size = (int) (&(bogus.end) - ((char*) &bogus));
OBJ_CONSTRUCT(&pending_to_main_thread, opal_list_t);
/* Create pipes to communicate between the two threads */
if (0 != pipe(pipe_to_service_thread)) {
return OPAL_ERR_IN_ERRNO;
}
if (0 != pipe(pipe_to_main_thread)) {
return OPAL_ERR_IN_ERRNO;
}
/* Create a libevent event that is used in the main thread
to watch its pipe */
opal_event_set(opal_sync_event_base, &main_thread_event, pipe_to_main_thread[0],
OPAL_EV_READ | OPAL_EV_PERSIST,
main_thread_event_callback, NULL);
opal_event_add(&main_thread_event, 0);
/* Start the service thread */
if (0 != pthread_create(&thread, NULL, service_thread_start,
NULL)) {
int errno_save = errno;
opal_event_del(&main_thread_event);
close(pipe_to_service_thread[0]);
close(pipe_to_service_thread[1]);
close(pipe_to_main_thread[0]);
close(pipe_to_main_thread[1]);
errno = errno_save;
return OPAL_ERR_IN_ERRNO;
}
initialized = true;
}
return OPAL_SUCCESS;
}
/*
* Start monitoring an fd
* Called by main or service thread; callback will be in service thread
*/
int opal_btl_openib_fd_monitor(int fd, int flags,
opal_btl_openib_fd_event_callback_fn_t *callback,
void *context)
{
cmd_t cmd;
/* Sanity check */
if (fd < 0 || 0 == flags || NULL == callback) {
return OPAL_ERR_BAD_PARAM;
}
cmd.pc_cmd = CMD_ADD_FD;
cmd.pc_fd = fd;
cmd.pc_flags = flags;
cmd.pc_fn.event = callback;
cmd.pc_context = context;
/* For the threaded version, write a command down the pipe */
OPAL_OUTPUT((-1, "main thread sending monitor fd %d", fd));
opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd);
return OPAL_SUCCESS;
}
/*
* Stop monitoring an fd
* Called by main or service thread; callback will be in service thread
*/
int opal_btl_openib_fd_unmonitor(int fd,
opal_btl_openib_fd_event_callback_fn_t *callback,
void *context)
{
cmd_t cmd;
/* Sanity check */
if (fd < 0) {
return OPAL_ERR_BAD_PARAM;
}
cmd.pc_cmd = CMD_REMOVE_FD;
cmd.pc_fd = fd;
cmd.pc_flags = 0;
cmd.pc_fn.event = callback;
cmd.pc_context = context;
/* For the threaded version, write a command down the pipe */
OPAL_OUTPUT((-1, "main thread sending unmonitor fd %d", fd));
opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd);
return OPAL_SUCCESS;
}
/*
* Run in the service thread
* Called by main thread; callback will be in service thread
*/
int opal_btl_openib_fd_run_in_service(opal_btl_openib_fd_main_callback_fn_t *callback,
void *context)
{
cmd_t cmd;
cmd.pc_cmd = CMD_CALL_FUNCTION;
cmd.pc_fd = -1;
cmd.pc_flags = 0;
cmd.pc_fn.main = callback;
cmd.pc_context = context;
/* For the threaded version, write a command down the pipe */
OPAL_OUTPUT((-1, "main thread sending 'run in service'"));
opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd);
return OPAL_SUCCESS;
}
/*
* Run a function in the main thread
* Called by service thread
*/
int opal_btl_openib_fd_run_in_main(opal_btl_openib_fd_main_callback_fn_t *callback,
void *context)
{
cmd_t cmd;
OPAL_OUTPUT((-1, "run in main -- sending command"));
/* For the threaded version, write a command down the pipe */
cmd.pc_cmd = CMD_CALL_FUNCTION;
cmd.pc_fd = -1;
cmd.pc_flags = 0;
cmd.pc_fn.main = callback;
cmd.pc_context = context;
write_to_main_thread(&cmd);
return OPAL_SUCCESS;
}
int
opal_btl_openib_fd_main_thread_drain(void)
{
int nfds, ret;
fd_set rfds;
struct timeval tv;
while (1) {
FD_ZERO(&rfds);
FD_SET(pipe_to_main_thread[0], &rfds);
nfds = pipe_to_main_thread[0] + 1;
tv.tv_sec = 0;
tv.tv_usec = 0;
ret = select(nfds, &rfds, NULL, NULL, &tv);
if (ret > 0) {
main_thread_event_callback(pipe_to_main_thread[0], 0, NULL);
return 0;
} else {
return ret;
}
}
}
/*
* Finalize
* Called by main thread
*/
int opal_btl_openib_fd_finalize(void)
{
if (initialized) {
/* For the threaded version, send a command down the pipe */
cmd_t cmd;
OPAL_OUTPUT((-1, "shutting down openib fd"));
/* Check if the thread exists before asking it to quit */
if (ESRCH != pthread_kill(thread, 0)) {
memset(&cmd, 0, cmd_size);
cmd.pc_cmd = CMD_TIME_TO_QUIT;
if (OPAL_SUCCESS != opal_fd_write(pipe_to_service_thread[1],
cmd_size, &cmd)) {
/* We cancel the thread if there's an error
* sending the "quit" cmd. This only ever happens on
* a "restart" which could result in dangling
* fds. OMPI must not rely on the checkpointer to
* save/restore any fds or connections
*/
pthread_cancel(thread);
}
pthread_join(thread, NULL);
opal_atomic_rmb();
}
opal_event_del(&main_thread_event);
close(pipe_to_service_thread[0]);
close(pipe_to_service_thread[1]);
close(pipe_to_main_thread[0]);
close(pipe_to_main_thread[1]);
OBJ_DESTRUCT(&pending_to_main_thread);
OBJ_DESTRUCT(&registered_items);
}
initialized = false;
return OPAL_SUCCESS;
}

View file

@@ -1,81 +0,0 @@
/*
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2009 Sandia National Laboratories. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OPAL_BTL_OPENIB_FD_H_
#define OPAL_BTL_OPENIB_FD_H_
#include "opal_config.h"
BEGIN_C_DECLS
/**
* Typedef for fd callback function
*/
typedef void *(opal_btl_openib_fd_event_callback_fn_t)(int fd, int flags,
void *context);
/**
* Typedef for generic callback function
*/
typedef void *(opal_btl_openib_fd_main_callback_fn_t)(void *context);
/**
* Initialize fd monitoring.
* Called by the main thread.
*/
int opal_btl_openib_fd_init(void);
/**
* Start monitoring an fd.
* Called by main or service thread; callback will be in service thread.
*/
int opal_btl_openib_fd_monitor(int fd, int flags,
opal_btl_openib_fd_event_callback_fn_t *callback,
void *context);
/**
* Stop monitoring an fd.
* Called by main or service thread; callback will be in service thread.
*/
int opal_btl_openib_fd_unmonitor(int fd,
opal_btl_openib_fd_event_callback_fn_t *callback,
void *context);
/**
* Run a function in the service thread.
* Called by the main thread.
*/
int opal_btl_openib_fd_run_in_service(opal_btl_openib_fd_main_callback_fn_t callback,
void *context);
/**
* Run a function in the main thread.
* Called by the service thread.
*/
int opal_btl_openib_fd_run_in_main(opal_btl_openib_fd_main_callback_fn_t callback,
void *context);
/**
* Drain all pending messages from the main thread's pipe.
* Likely only useful during finalize, when the event library
* won't fire callbacks.
*/
int opal_btl_openib_fd_main_thread_drain(void);
/**
* Finalize fd monitoring.
* Called by the main thread.
*/
int opal_btl_openib_fd_finalize(void);
END_C_DECLS
#endif /* OPAL_BTL_OPENIB_FD_H_ */

View file

@@ -1,10 +1,11 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2007-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2007-2008 Chelsio, Inc. All rights reserved.
* Copyright (c) 2008 Mellanox Technologies. All rights reserved.
* Copyright (c) 2009 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
* Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
* Copyright (c) 2014 The University of Tennessee and The University
@@ -54,8 +55,8 @@
#include "opal/util/error.h"
#include "opal/util/show_help.h"
#include "opal/util/proc.h"
#include "opal/runtime/opal_progress_threads.h"
#include "btl_openib_fd.h"
#include "btl_openib_proc.h"
#include "btl_openib_endpoint.h"
#include "connect/connect.h"
@@ -211,8 +212,12 @@ static uint32_t rdmacm_addr = 0;
static int rdmacm_resolve_timeout = 30000;
static int rdmacm_resolve_max_retry_count = 20;
static bool rdmacm_reject_causes_connect_error = false;
static pthread_cond_t rdmacm_disconnect_cond;
static pthread_mutex_t rdmacm_disconnect_lock;
static volatile int disconnect_callbacks = 0;
static bool rdmacm_component_initialized = false;
static opal_event_base_t *rdmacm_event_base = NULL;
static opal_event_t rdmacm_event;
/* Calculate the *real* length of the message (not aligned/rounded
up) */
@@ -1011,7 +1016,7 @@ static int handle_connect_request(struct rdma_cm_event *event)
((struct sockaddr_in *)peeraddr)->sin_addr.s_addr;
c->peer_tcp_port = rdma_get_dst_port(event->id);
}
opal_btl_openib_fd_run_in_main(show_help_cant_find_endpoint, c);
show_help_cant_find_endpoint (c);
#else
BTL_ERROR(("Cannot find endpoint."));
#endif
@@ -1160,19 +1165,6 @@ out:
return OPAL_ERROR;
}
/*
* Invoked by service thread
*/
static void *rdmacm_unmonitor(int fd, int flags, void *context)
{
volatile int *barrier = (volatile int *) context;
OPAL_OUTPUT((-1, "SERVICE rdmacm unlocking main thread"));
*barrier = 1;
return NULL;
}
/*
* Runs in service thread
*
@@ -1181,24 +1173,36 @@ static void *rdmacm_unmonitor(int fd, int flags, void *context)
* in the service thread while rdma_disconnect() is still running in
* the main thread (which causes all manner of Bad Things to occur).
*/
static void *call_disconnect_callback(void *v)
static void *call_disconnect_callback(int fd, int flags, void *v)
{
rdmacm_contents_t *contents = (rdmacm_contents_t *) v;
void *tmp = NULL;
id_context_t *context = (id_context_t*) v;
OPAL_OUTPUT((-1, "SERVICE Service thread calling disconnect on ID %p",
(void*) context->id));
opal_list_item_t *item;
if (!context->already_disconnected) {
tmp = context->id;
rdma_disconnect(context->id);
context->already_disconnected = true;
pthread_mutex_lock (&rdmacm_disconnect_lock);
while (NULL != (item = opal_list_remove_first(&contents->ids))) {
context = (id_context_t *) item;
OPAL_OUTPUT((-1, "RDMACM Event thread calling disconnect on ID %p",
(void*) context->id));
if (!context->already_disconnected) {
tmp = context->id;
rdma_disconnect(context->id);
context->already_disconnected = true;
}
OBJ_RELEASE(context);
OPAL_OUTPUT((-1, "RDMACM Event thread disconnect on ID %p done",
(void*) tmp));
}
OBJ_RELEASE(context);
/* Tell the main thread that we're done */
(void)opal_atomic_add(&disconnect_callbacks, 1);
OPAL_OUTPUT((-1, "SERVICE Service thread disconnect on ID %p done; count=%d",
(void*) tmp, disconnect_callbacks));
pthread_cond_signal(&rdmacm_disconnect_cond);
pthread_mutex_unlock(&rdmacm_disconnect_lock);
return NULL;
}
@@ -1212,8 +1216,8 @@ static void *call_disconnect_callback(void *v)
*/
static int rdmacm_endpoint_finalize(struct mca_btl_base_endpoint_t *endpoint)
{
int num_to_wait_for;
opal_list_item_t *item, *item2;
rdmacm_contents_t *contents;
opal_event_t event;
BTL_VERBOSE(("Start disconnecting..."));
OPAL_OUTPUT((-1, "MAIN Endpoint finalizing"));
@@ -1232,35 +1236,28 @@ static int rdmacm_endpoint_finalize(struct mca_btl_base_endpoint_t *endpoint)
* main thread and service thread.
*/
opal_mutex_lock(&client_list_lock);
num_to_wait_for = disconnect_callbacks = 0;
for (item = opal_list_get_first(&client_list);
item != opal_list_get_end(&client_list);
item = opal_list_get_next(item)) {
rdmacm_contents_t *contents = (rdmacm_contents_t *) item;
OPAL_LIST_FOREACH(contents, &client_list, rdmacm_contents_t) {
if (endpoint == contents->endpoint) {
while (NULL !=
(item2 = opal_list_remove_first(&(contents->ids)))) {
/* Fun race condition: we cannot call
rdma_disconnect() here in the main thread, because
if we do, there is a nonzero chance that the
DISCONNECT event will be delivered and get executed
in the service thread immediately. If this all
happens before rdma_disconnect() returns, all
manner of Bad Things can/will occur. So just
invoke rdma_disconnect() in the service thread
where we guarantee that we won't be processing an
event when it is called. */
OPAL_OUTPUT((-1, "MAIN Main thread calling disconnect on ID %p",
(void*) ((id_context_t*) item2)->id));
++num_to_wait_for;
opal_btl_openib_fd_run_in_service(call_disconnect_callback,
item2);
}
opal_list_remove_item(&client_list, (opal_list_item_t *) contents);
contents->on_client_list = false;
/* Fun race condition: we cannot call
rdma_disconnect() in this thread, because
if we do, there is a nonzero chance that the
DISCONNECT event will be delivered and get executed
in the rdmacm event thread immediately. If this all
happens before rdma_disconnect() returns, all
manner of Bad Things can/will occur. So just
invoke rdma_disconnect() in the rdmacm event thread
where we guarantee that we won't be processing an
event when it is called. */
opal_event_set (rdmacm_event_base, &event, -1, OPAL_EV_READ,
call_disconnect_callback, contents);
opal_event_active (&event, OPAL_EV_READ, 1);
/* remove_item returns the item before the item removed,
meaning that the for list is still safe */
item = opal_list_remove_item(&client_list, item);
contents->on_client_list = false;
break;
}
}
@@ -1270,10 +1267,11 @@ static int rdmacm_endpoint_finalize(struct mca_btl_base_endpoint_t *endpoint)
opal_mutex_unlock(&client_list_lock);
/* Now wait for all the disconnect callbacks to occur */
while (num_to_wait_for != disconnect_callbacks) {
opal_btl_openib_fd_main_thread_drain();
sched_yield();
pthread_mutex_lock(&rdmacm_disconnect_lock);
while (opal_list_get_size (&contents->ids)) {
pthread_cond_wait (&rdmacm_disconnect_cond, &rdmacm_disconnect_lock);
}
pthread_mutex_unlock(&rdmacm_disconnect_lock);
OPAL_OUTPUT((-1, "MAIN Endpoint finished finalizing"));
return OPAL_SUCCESS;
@@ -1355,7 +1353,7 @@ static int rdmacm_connect_endpoint(id_context_t *context,
/* Ensure that all the writes back to the endpoint and associated
data structures have completed */
opal_atomic_wmb();
opal_btl_openib_fd_run_in_main(local_endpoint_cpc_complete, endpoint);
mca_btl_openib_run_in_main (local_endpoint_cpc_complete, endpoint);
return OPAL_SUCCESS;
}
@@ -1668,9 +1666,8 @@ out:
/*
* Runs in main thread
*/
static void *show_help_rdmacm_event_error(void *c)
static void *show_help_rdmacm_event_error (struct rdma_cm_event *event)
{
struct rdma_cm_event *event = (struct rdma_cm_event*) c;
id_context_t *context = (id_context_t*) event->id->context;
if (RDMA_CM_EVENT_DEVICE_REMOVAL == event->event) {
@@ -1802,7 +1799,7 @@ static int event_handler(struct rdma_cm_event *event)
case RDMA_CM_EVENT_CONNECT_RESPONSE:
case RDMA_CM_EVENT_ADDR_ERROR:
case RDMA_CM_EVENT_DEVICE_REMOVAL:
opal_btl_openib_fd_run_in_main(show_help_rdmacm_event_error, event);
show_help_rdmacm_event_error (event);
rc = OPAL_ERROR;
break;
@@ -1817,7 +1814,7 @@ static int event_handler(struct rdma_cm_event *event)
rc = resolve_route(context);
break;
}
opal_btl_openib_fd_run_in_main(show_help_rdmacm_event_error, event);
show_help_rdmacm_event_error (event);
rc = OPAL_ERROR;
break;
@@ -1833,7 +1830,7 @@ static int event_handler(struct rdma_cm_event *event)
}
/*
* Runs in service thread
* Runs in event thread
*/
static inline void rdmamcm_event_error(struct rdma_cm_event *event)
{
@@ -1843,12 +1840,12 @@ static inline void rdmamcm_event_error(struct rdma_cm_event *event)
endpoint = ((id_context_t *)event->id->context)->contents->endpoint;
}
opal_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
mca_btl_openib_run_in_main (mca_btl_openib_endpoint_invoke_error,
endpoint);
}
/*
* Runs in service thread
* Runs in event thread
*/
static void *rdmacm_event_dispatch(int fd, int flags, void *context)
{
@ -2051,6 +2048,11 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, opal_btl_
rc = OPAL_ERR_NOT_SUPPORTED;
goto out;
}
if (!BTL_OPENIB_QP_TYPE_PP(0)) {
BTL_VERBOSE(("rdmacm CPC only supported when the first QP is a PP QP; skipped"));
rc = OPAL_ERR_NOT_SUPPORTED;
goto out;
}
BTL_VERBOSE(("rdmacm_component_query"));
@@ -2072,6 +2074,7 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, opal_btl_
selected if QP 0 is PP */
(*cpc)->cbm_uses_cts = true;
/* Start monitoring the fd associated with the cm_device */
server = OBJ_NEW(rdmacm_contents_t);
if (NULL == server) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
@@ -2220,9 +2223,7 @@ out:
*/
static int rdmacm_component_finalize(void)
{
volatile int barrier = 0;
opal_list_item_t *item, *item2;
int rc;
BTL_VERBOSE(("rdmacm_component_finalize"));
@@ -2232,36 +2233,20 @@ static int rdmacm_component_finalize(void)
return OPAL_SUCCESS;
}
if (NULL != event_channel) {
rc = opal_btl_openib_fd_unmonitor(event_channel->fd,
rdmacm_unmonitor, (void*) &barrier);
if (OPAL_SUCCESS != rc) {
BTL_ERROR(("Error disabling fd monitor"));
}
/* Wait for the service thread to stop monitoring the fd */
OPAL_OUTPUT((-1, "MAIN rdmacm_component_finalize: waiting for thread to finish"));
while (0 == barrier) {
sched_yield();
}
OPAL_OUTPUT((-1, "MAIN rdmacm_component_finalize: thread finished"));
if (rdmacm_event_base) {
opal_event_del (&rdmacm_event);
opal_progress_thread_finalize (NULL);
rdmacm_event_base = NULL;
}
/* The service thread is no longer running; no need to lock access
/* The event thread is no longer running; no need to lock access
to the client_list */
for (item = opal_list_remove_first(&client_list);
NULL != item;
item = opal_list_remove_first(&client_list)) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&client_list);
OPAL_LIST_DESTRUCT(&client_list);
/* For each of the items in the server list, there's only one item
in the "ids" list -- the server listener. So explicitly
destroy its RDMA ID context. */
for (item = opal_list_remove_first(&server_listener_list);
NULL != item;
item = opal_list_remove_first(&server_listener_list)) {
while (NULL != (item = opal_list_remove_first(&server_listener_list))) {
rdmacm_contents_t *contents = (rdmacm_contents_t*) item;
item2 = opal_list_remove_first(&(contents->ids));
OBJ_RELEASE(item2);
@ -2277,6 +2262,9 @@ static int rdmacm_component_finalize(void)
mca_btl_openib_free_rdma_addr_list();
pthread_cond_destroy (&rdmacm_disconnect_cond);
pthread_mutex_destroy (&rdmacm_disconnect_lock);
return OPAL_SUCCESS;
}
@ -2326,10 +2314,22 @@ static int rdmacm_component_init(void)
return OPAL_ERR_UNREACH;
}
/* Start monitoring the fd associated with the cm_device */
opal_btl_openib_fd_monitor(event_channel->fd, OPAL_EV_READ,
rdmacm_event_dispatch, NULL);
rdmacm_event_base = opal_progress_thread_init (NULL);
if (NULL == rdmacm_event_base) {
opal_output_verbose (5, opal_btl_base_framework.framework_output,
"openib BTL: could not create rdmacm event thread");
return OPAL_ERR_UNREACH;
}
opal_event_set (rdmacm_event_base, &rdmacm_event, event_channel->fd,
OPAL_EV_READ | OPAL_EV_PERSIST, rdmacm_event_dispatch, NULL);
opal_event_add (&rdmacm_event, 0);
pthread_cond_init (&rdmacm_disconnect_cond, NULL);
pthread_mutex_init (&rdmacm_disconnect_lock, NULL);
rdmacm_component_initialized = true;
return OPAL_SUCCESS;
}
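
The old fd-monitor registration with the service thread becomes a standard libevent pattern: opal_progress_thread_init(NULL) returns (and presumably reference-counts) the default progress thread's event base, a persistent read event is hung off it, and finalize tears both down in reverse order. A condensed sketch assuming that init/finalize pairing; my_fd and my_dispatch are placeholders:

static opal_event_base_t *evbase;
static opal_event_t read_event;

static void *my_dispatch (int fd, int flags, void *context)
{
    /* drain whatever is readable on fd */
    return NULL;
}

static int monitor_start (int my_fd)
{
    evbase = opal_progress_thread_init (NULL);
    if (NULL == evbase) {
        return OPAL_ERR_UNREACH;
    }
    /* OPAL_EV_PERSIST keeps the event armed across callbacks, so the
       dispatch routine never has to re-add itself */
    opal_event_set (evbase, &read_event, my_fd,
                    OPAL_EV_READ | OPAL_EV_PERSIST, my_dispatch, NULL);
    opal_event_add (&read_event, 0);
    return OPAL_SUCCESS;
}

static void monitor_stop (void)
{
    /* remove the event before releasing the base it is registered on */
    opal_event_del (&read_event);
    opal_progress_thread_finalize (NULL);
    evbase = NULL;
}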

View file

@ -66,10 +66,10 @@
#include "opal/util/error.h"
#include "opal/util/alfg.h"
#include "opal_stdint.h"
#include "opal/class/opal_fifo.h"
#include "btl_openib_endpoint.h"
#include "btl_openib_proc.h"
#include "btl_openib_fd.h"
#include "btl_openib_async.h"
#include "connect/connect.h"
@ -149,9 +149,7 @@ typedef struct udcm_module {
opal_mutex_t cm_send_lock;
/* Receive queue */
opal_mutex_t cm_recv_msg_queue_lock;
opal_list_t cm_recv_msg_queue;
bool cm_message_event_active;
opal_fifo_t cm_recv_msg_fifo;
/* The associated BTL */
struct mca_btl_openib_module_t *btl;
@ -159,8 +157,20 @@ typedef struct udcm_module {
/* This module's modex message */
modex_msg_t modex;
/** The channel is being monitored */
bool channel_monitored;
/* channel monitoring */
/** channel event base */
opal_event_base_t *channel_evbase;
/** channel monitoring event */
opal_event_t channel_event;
/* message processing */
/** message event is active */
int32_t cm_message_event_active;
/** message event */
opal_event_t cm_message_event;
} udcm_module_t;
/*
@ -303,7 +313,7 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl,
opal_btl_openib_connect_base_module_t *cpc);
static void *udcm_cq_event_dispatch(int fd, int flags, void *context);
static void *udcm_message_callback (void *context);
static void *udcm_message_callback (int fd, int flags, void *context);
static void udcm_set_message_timeout (udcm_message_sent_t *message);
static void udcm_cancel_message_timeout (udcm_message_sent_t *message);
@ -660,8 +670,7 @@ static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl)
OBJ_CONSTRUCT(&m->cm_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->cm_send_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->cm_recv_msg_queue, opal_list_t);
OBJ_CONSTRUCT(&m->cm_recv_msg_queue_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->cm_recv_msg_fifo, opal_fifo_t);
OBJ_CONSTRUCT(&m->flying_messages, opal_list_t);
OBJ_CONSTRUCT(&m->cm_timeout_lock, opal_mutex_t);
@ -733,15 +742,23 @@ static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl)
m->cm_exiting = false;
/* Monitor the fd associated with the completion channel */
opal_btl_openib_fd_monitor(m->cm_channel->fd, OPAL_EV_READ,
udcm_cq_event_dispatch, m);
m->channel_monitored = true;
m->channel_evbase = opal_progress_thread_init (NULL);
opal_event_set (m->channel_evbase, &m->channel_event,
m->cm_channel->fd, OPAL_EV_READ | OPAL_EV_PERSIST,
udcm_cq_event_dispatch, m);
opal_event_add (&m->channel_event, 0);
udcm_timeout_tv.tv_sec = udcm_timeout / 1000000;
udcm_timeout_tv.tv_usec = udcm_timeout - 1000000 *
udcm_timeout_tv.tv_sec;
m->cm_message_event_active = false;
m->cm_message_event_active = 0;
/* set up the message event */
opal_event_set (opal_sync_event_base, &m->cm_message_event, -1,
OPAL_EV_READ, udcm_message_callback, m);
/* Finally, request CQ notification */
if (0 != ibv_req_notify_cq (m->cm_recv_cq, 0)) {
@ -804,21 +821,11 @@ udcm_module_start_connect(opal_btl_openib_connect_base_module_t *cpc,
return rc;
}
static void *udcm_unmonitor(int fd, int flags, void *context)
{
volatile int *barrier = (volatile int *)context;
*barrier = 1;
return NULL;
}
static int udcm_module_finalize(mca_btl_openib_module_t *btl,
opal_btl_openib_connect_base_module_t *cpc)
{
udcm_module_t *m = (udcm_module_t *) cpc;
opal_list_item_t *item;
volatile int barrier = 0;
if (NULL == m) {
return OPAL_SUCCESS;
@ -826,27 +833,19 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl,
m->cm_exiting = true;
if (m->channel_monitored) {
/* stop monitoring the channel's fd before destroying the listen qp */
opal_btl_openib_fd_unmonitor(m->cm_channel->fd, udcm_unmonitor, (void *)&barrier);
while (0 == barrier) {
sched_yield();
}
if (m->channel_evbase) {
opal_event_del (&m->channel_event);
opal_progress_thread_finalize (NULL);
}
opal_mutex_lock (&m->cm_lock);
opal_mutex_lock (&m->cm_recv_msg_queue_lock);
/* clear message queue */
while ((item = opal_list_remove_first(&m->cm_recv_msg_queue))) {
while (NULL != (item = opal_fifo_pop_atomic (&m->cm_recv_msg_fifo))) {
OBJ_RELEASE(item);
}
opal_mutex_unlock (&m->cm_recv_msg_queue_lock);
OBJ_DESTRUCT(&m->cm_recv_msg_queue);
OBJ_DESTRUCT(&m->cm_recv_msg_fifo);
opal_mutex_lock (&m->cm_timeout_lock);
while ((item = opal_list_remove_first(&m->flying_messages))) {
@ -890,7 +889,6 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl,
opal_mutex_unlock (&m->cm_lock);
OBJ_DESTRUCT(&m->cm_send_lock);
OBJ_DESTRUCT(&m->cm_lock);
OBJ_DESTRUCT(&m->cm_recv_msg_queue_lock);
OBJ_DESTRUCT(&m->cm_timeout_lock);
return OPAL_SUCCESS;
@ -979,24 +977,7 @@ static void udcm_module_destroy_listen_qp (udcm_module_t *m)
return;
}
if (mca_btl_openib_component.use_async_event_thread &&
-1 != mca_btl_openib_component.async_pipe[1]) {
/* Tell the openib async thread to ignore ERR state on the QP
we are about to manually set the ERR state on */
mca_btl_openib_async_cmd_t async_command;
async_command.a_cmd = OPENIB_ASYNC_IGNORE_QP_ERR;
async_command.qp = m->listen_qp;
if (write(mca_btl_openib_component.async_pipe[1],
&async_command, sizeof(mca_btl_openib_async_cmd_t))<0){
BTL_ERROR(("Failed to write to pipe [%d]",errno));
return;
}
/* wait for ok from thread */
if (OPAL_SUCCESS !=
btl_openib_async_command_done(OPENIB_ASYNC_IGNORE_QP_ERR)) {
BTL_ERROR(("Command to openib async thread to ignore QP ERR state failed"));
}
}
mca_btl_openib_async_add_qp_ignore (m->listen_qp);
do {
/* Move listen QP into the ERR state to cancel all outstanding
@ -2078,9 +2059,7 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
/* Copy just the message header */
memcpy (&item->msg_hdr, &message->hdr, sizeof (message->hdr));
opal_mutex_lock(&m->cm_recv_msg_queue_lock);
opal_list_append (&m->cm_recv_msg_queue, &item->super);
opal_mutex_unlock(&m->cm_recv_msg_queue_lock);
opal_fifo_push_atomic (&m->cm_recv_msg_fifo, &item->super);
udcm_send_ack (lcl_ep, message->hdr.rem_ctx);
@ -2088,13 +2067,9 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
udcm_module_post_one_recv (m, msg_num);
}
opal_mutex_lock (&m->cm_recv_msg_queue_lock);
if (opal_list_get_size (&m->cm_recv_msg_queue) &&
!m->cm_message_event_active) {
m->cm_message_event_active = true;
opal_btl_openib_fd_run_in_main (udcm_message_callback, (void *) m);
if (0 == opal_atomic_swap_32 (&m->cm_message_event_active, 1)) {
opal_event_active (&m->cm_message_event, OPAL_EV_READ, 1);
}
opal_mutex_unlock (&m->cm_recv_msg_queue_lock);
return count;
}
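
The locked list plus boolean is collapsed into a lock-free FIFO guarded by a single int32 gate: opal_atomic_swap_32() returns the previous value, so only the producer that performs the 0 -> 1 transition activates cm_message_event, and duplicate activations are suppressed while a drain is already pending. A producer-side sketch of that gate (the helper name is illustrative):

/* event thread: enqueue one received message and schedule a drain */
static inline void enqueue_message (udcm_module_t *m, opal_list_item_t *item)
{
    opal_fifo_push_atomic (&m->cm_recv_msg_fifo, item);
    /* previous value 0 means no drain is pending: schedule exactly one */
    if (0 == opal_atomic_swap_32 (&m->cm_message_event_active, 1)) {
        opal_event_active (&m->cm_message_event, OPAL_EV_READ, 1);
    }
}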
@ -2142,18 +2117,15 @@ static void *udcm_cq_event_dispatch(int fd, int flags, void *context)
return NULL;
}
static void *udcm_message_callback (void *context)
static void *udcm_message_callback (int fd, int flags, void *context)
{
udcm_module_t *m = (udcm_module_t *) context;
udcm_message_recv_t *item;
BTL_VERBOSE(("running message thread"));
opal_mutex_lock(&m->cm_recv_msg_queue_lock);
while ((item = (udcm_message_recv_t *)
opal_list_remove_first (&m->cm_recv_msg_queue))) {
while ((item = (udcm_message_recv_t *) opal_fifo_pop_atomic (&m->cm_recv_msg_fifo))) {
mca_btl_openib_endpoint_t *lcl_ep = item->msg_hdr.lcl_ep;
opal_mutex_unlock(&m->cm_recv_msg_queue_lock);
OPAL_THREAD_LOCK(&lcl_ep->endpoint_lock);
@ -2189,14 +2161,11 @@ static void *udcm_message_callback (void *context)
}
OBJ_RELEASE (item);
opal_mutex_lock(&m->cm_recv_msg_queue_lock);
}
BTL_VERBOSE(("exiting message thread"));
m->cm_message_event_active = false;
opal_mutex_unlock(&m->cm_recv_msg_queue_lock);
opal_atomic_swap_32 (&m->cm_message_event_active, 0);
return NULL;
}
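
On the consumer side the callback pops until the FIFO is empty and only then drops the gate. One subtlety: a message pushed between the final pop and the flag reset is only picked up on the next activation. A defensive variation (not necessarily what the commit does) re-checks the FIFO after the reset:

static void *drain_messages (int fd, int flags, void *context)
{
    udcm_module_t *m = (udcm_module_t *) context;
    opal_list_item_t *item;

    do {
        while (NULL != (item = opal_fifo_pop_atomic (&m->cm_recv_msg_fifo))) {
            /* process the message, then release it */
            OBJ_RELEASE(item);
        }
        /* drop the gate, then re-check: if a producer slipped in before
           the reset and we can reclaim the gate, drain again */
        opal_atomic_swap_32 (&m->cm_message_event_active, 0);
    } while (!opal_fifo_is_empty (&m->cm_recv_msg_fifo) &&
             0 == opal_atomic_swap_32 (&m->cm_message_event_active, 1));

    return NULL;
}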
@ -2262,9 +2231,9 @@ static void udcm_send_timeout (evutil_socket_t fd, short event, void *arg)
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num);
/* We are running in the timeout thread. Invoke the error in the
main thread */
opal_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
lcl_ep);
* "main thread" because it may call up into the pml or another
* component that may not have threading support enabled. */
mca_btl_openib_run_in_main (mca_btl_openib_endpoint_invoke_error, lcl_ep);
break;
}
@ -2274,8 +2243,7 @@ static void udcm_send_timeout (evutil_socket_t fd, short event, void *arg)
if (0 != udcm_post_send (lcl_ep, msg->data, msg->length, 0)) {
BTL_VERBOSE(("error reposting message"));
opal_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
lcl_ep);
mca_btl_openib_run_in_main (mca_btl_openib_endpoint_invoke_error, lcl_ep);
break;
}
} while (0);