Commit from a long-standing Mercurial tree that ended up incorporating a lot of things:

 * A few fixes for CPC interface changes in all the CPCs
 * Attempts (but not yet finished) to fix shutdown problems in the IB CM CPC
 * #1210: add CTS support (i.e., the initiator guarantees to send the first message; automatically activated for iWARP over the RDMA CM CPC) -- see the sketch after this list
   * Some variable and function renamings to make this generic (e.g., alloc_credit_frag became alloc_control_frag)
   * CPCs no longer post receive buffers; they only post a single receive buffer for the CTS if they use CTS. Instead, the main BTL now posts the main sets of receive buffers. 
   * CPCs allocate a CTS buffer only if they're about to make a connection
 * RDMA CM improvements:
   * Use threaded-mode openib fd monitoring to wait for RDMA CM events
   * Synchronize endpoint finalization and disconnection between main thread and service thread to avoid/fix some race conditions
   * Converted several structs to be OBJs so that we can use reference counting to know when to invoke destructors
   * Make some new OBJ's have opal_list_item_t's as their base, thereby eliminating the need for the local list_item_t type
   * Renamed many variables to be internally consistent
   * Centralize the decision in an inline function as to whether this process or the remote process is supposed to be the initiator
   * Add oodles of OPAL_OUTPUT statements for debugging (hard-wired to output stream -1; to be activated by developers if they want/need them) 
   * Use rdma_create_qp() instead of ibv_create_qp()
   * Ensure to set initiator_depth and responder_resources properly, by putting max_qp_rd_atom and max_qp_init_rd_atom in the modex (see rdma_connect(3))
   * Ensure to set the source IP address in rdma_resolve() so that we select the correct OpenFabrics source port
   * New MCA param: btl_openib_connect_rdmacm_resolve_timeout
 * openib fd monitoring improvements:
   * Renamed a bunch of functions and variables to be a little more obvious as to their true function
   * Use pipes to communicate between the main thread and the service thread
   * Add the ability for the main thread to invoke a function back on the service thread
 * Other improvements:
   * btl_openib_device_type MCA param: can be "iw" or "ib" or "all" (or "infiniband" or "iwarp")
   * Somewhat improved error handling
   * Bunches of spelling fixes in comments, VERBOSE, and OUTPUT statements
   * Oodles of little coding style fixes
   * Changed shutdown ordering of btl; the device is now an OBJ with ref counting for destruction
   * Added some more show_help error messages
   * Change configury to only build IBCM / RDMACM if we have threads (because we need a progress thread) 
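
As a rough illustration of the CTS handshake described above -- a minimal
editorial sketch, not this commit's actual code: the endpoint_t struct and
the stub helpers are invented here, while the three booleans mirror the new
endpoint_posted_recvs / endpoint_cts_sent / endpoint_cts_received flags:

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {
        bool posted_recvs;    /* have we posted our receive buffers yet? */
        bool cts_sent;        /* have we sent our CTS yet? */
        bool cts_received;    /* have we received the peer's CTS yet? */
        bool initiator;       /* did this side initiate the QP? */
        bool transport_is_ib; /* InfiniBand (vs. iWARP) device? */
    } endpoint_t;

    /* Stubs standing in for the real BTL operations */
    static void post_receive_buffers(endpoint_t *ep) { (void) ep; }
    static void send_cts(endpoint_t *ep) { ep->cts_sent = true; }
    static void mark_connected(endpoint_t *ep) { (void) ep; puts("connected"); }

    /* A CTS control message arrived from the peer.  Only reply and mark
       the endpoint connected once our own receives are posted; doing so
       earlier would trigger credit management while rd_credits are
       still negative. */
    static void handle_incoming_cts(endpoint_t *ep)
    {
        ep->cts_received = true;
        if (ep->posted_recvs) {
            if (!ep->cts_sent) {
                send_cts(ep);
            }
            mark_connected(ep);
        }
    }

    /* The CPC finished creating the QPs.  IB endpoints send their CTS
       immediately; iWARP endpoints send first only if they initiated
       the connection (or if the peer's CTS already arrived). */
    static void cpc_complete(endpoint_t *ep)
    {
        post_receive_buffers(ep);
        ep->posted_recvs = true;
        if (ep->transport_is_ib || ep->initiator || ep->cts_received) {
            send_cts(ep);
            if (ep->cts_received) {
                mark_connected(ep);
            }
        }
    }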

This commit was SVN r19686.

The following Trac tickets were found above:
  Ticket 1210 --> https://svn.open-mpi.org/trac/ompi/ticket/1210
This commit is contained in:
Jeff Squyres 2008-10-06 00:46:02 +00:00
parent 4602b0e94a
commit c42ab8ea37
22 changed files with 2391 additions and 967 deletions


@ -162,12 +162,12 @@ else
************************************************************************
Open MPI was unable to find threading support on your system. In the
near future, the OMPI development team is considering requiring
threading support for proper OMPI execution. This is in part because
we are not aware of any users that do not have thread support - so we
need you to e-mail us at ompi@ompi-mpi.org and let us know about this
problem.
Open MPI was unable to find threading support on your system. The
OMPI development team is considering requiring threading support for
proper OMPI execution. This is in part because we are not aware of
any OpenFabrics users that do not have thread support -- so we need
you to e-mail the Open MPI Users mailing list to tell us if this is a
problem for you.
************************************************************************


@ -933,31 +933,19 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
openib_btl = (mca_btl_openib_module_t*) btl;
/* Remove the btl from component list */
if ( mca_btl_openib_component.ib_num_btls > 1 ) {
for(i = 0; i < mca_btl_openib_component.ib_num_btls; i++){
if (mca_btl_openib_component.openib_btls[i] == openib_btl){
mca_btl_openib_component.openib_btls[i] =
mca_btl_openib_component.openib_btls[mca_btl_openib_component.ib_num_btls-1];
break;
}
}
}
mca_btl_openib_component.ib_num_btls--;
/* Release all QPs */
for(ep_index=0;
ep_index < opal_pointer_array_get_size(openib_btl->device->endpoints);
ep_index++) {
for (ep_index=0;
ep_index < opal_pointer_array_get_size(openib_btl->device->endpoints);
ep_index++) {
endpoint=opal_pointer_array_get_item(openib_btl->device->endpoints,
ep_index);
ep_index);
if(!endpoint) {
BTL_VERBOSE(("In finalize, got another null endpoint"));
continue;
}
if(endpoint->endpoint_btl != openib_btl)
if(endpoint->endpoint_btl != openib_btl) {
continue;
}
for(i = 0; i < openib_btl->device->eager_rdma_buffers_count; i++) {
if(openib_btl->device->eager_rdma_buffers[i] == endpoint) {
openib_btl->device->eager_rdma_buffers[i] = NULL;
@ -967,15 +955,6 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
OBJ_RELEASE(endpoint);
}
/* Finalize the CPC modules on this openib module */
for (i = 0; i < openib_btl->num_cpcs; ++i) {
if (NULL != openib_btl->cpcs[i]->cbm_finalize) {
openib_btl->cpcs[i]->cbm_finalize(openib_btl, openib_btl->cpcs[i]);
}
free(openib_btl->cpcs[i]);
}
free(openib_btl->cpcs);
/* Release SRQ resources */
for(qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
if(!BTL_OPENIB_QP_TYPE_PP(qp)) {
@ -992,11 +971,33 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
}
}
/* Finalize the CPC modules on this openib module */
for (i = 0; i < openib_btl->num_cpcs; ++i) {
if (NULL != openib_btl->cpcs[i]->cbm_finalize) {
openib_btl->cpcs[i]->cbm_finalize(openib_btl, openib_btl->cpcs[i]);
}
free(openib_btl->cpcs[i]);
}
free(openib_btl->cpcs);
/* Release device if there are no more users */
if(!(--openib_btl->device->btls)) {
if (!(--openib_btl->device->btls)) {
OBJ_RELEASE(openib_btl->device);
}
/* Remove the btl from component list */
if ( mca_btl_openib_component.ib_num_btls > 1 ) {
for(i = 0; i < mca_btl_openib_component.ib_num_btls; i++){
if (mca_btl_openib_component.openib_btls[i] == openib_btl){
mca_btl_openib_component.openib_btls[i] =
mca_btl_openib_component.openib_btls[mca_btl_openib_component.ib_num_btls-1];
break;
}
}
}
mca_btl_openib_component.ib_num_btls--;
OBJ_DESTRUCT(&openib_btl->ib_lock);
free(openib_btl);
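
(For reference: a minimal sketch of the OPAL OBJ pattern that the new
shutdown ordering leans on.  my_device_t and its destructor are invented
names; the commit itself pairs an explicit device->btls use count with a
final OBJ_RELEASE, as shown above, but the destructor-on-last-release
mechanics are the same.)

    #include "opal/class/opal_object.h"

    typedef struct {
        opal_object_t super;   /* OBJ base: holds the reference count */
        /* ... device resources: context, PD, CQs, ... */
    } my_device_t;

    /* Runs exactly once, when the last OBJ_RELEASE drops the count to 0 */
    static void my_device_destructor(my_device_t *device)
    {
        (void) device;   /* tear down CQs, PD, device context, ... */
    }
    static OBJ_CLASS_INSTANCE(my_device_t, opal_object_t,
                              NULL, my_device_destructor);

    static void example(void)
    {
        my_device_t *device = OBJ_NEW(my_device_t);  /* refcount = 1 */
        OBJ_RETAIN(device);                          /* refcount = 2 */
        OBJ_RELEASE(device);                         /* refcount = 1 */
        OBJ_RELEASE(device);    /* refcount = 0: destructor runs now */
    }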


@ -117,6 +117,12 @@ typedef enum {
BTL_OPENIB_RQ_SOURCE_MAX
} btl_openib_receive_queues_source_t;
typedef enum {
BTL_OPENIB_DT_IB,
BTL_OPENIB_DT_IWARP,
BTL_OPENIB_DT_ALL
} btl_openib_device_type_t;
struct mca_btl_openib_component_t {
mca_btl_base_component_2_0_0_t super; /**< base BTL component */
@ -200,6 +206,7 @@ struct mca_btl_openib_component_t {
pthread_t async_thread; /**< Async thread that will handle fatal errors */
uint32_t use_async_event_thread; /**< Use the async event handler */
#endif
btl_openib_device_type_t device_type;
char *if_include;
char **if_include_list;
char *if_exclude;


@ -173,8 +173,8 @@ static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *devices_p
devices_poll->poll_size+=devices_poll->poll_size;
async_pollfd_tmp = malloc(sizeof(struct pollfd) * devices_poll->poll_size);
if (NULL == async_pollfd_tmp) {
BTL_ERROR(("Failed malloc: %s:%d"
"Fatal error, stoping asyn event thread"
BTL_ERROR(("Failed malloc: %s:%d. "
"Fatal error, stoping asynch event thread"
, __FILE__, __LINE__));
return OMPI_ERROR;
}
@ -316,10 +316,10 @@ static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *devices_po
}
ibv_ack_async_event(&event);
} else {
/* the device == NULL , we failed to locate the devide
* this failure should not never happed */
BTL_ERROR(("Failed to find device with FD %d."
"Fatal error, stoping asyn event thread",
/* if (device == NULL), then failed to locate the device!
This should never happen... */
BTL_ERROR(("Failed to find device with FD %d. "
"Fatal error, stoping asynch event thread",
devices_poll->async_pollfd[index].fd));
return OMPI_ERROR;
}
@ -336,7 +336,7 @@ void* btl_openib_async_thread(void * async)
struct mca_btl_openib_async_poll devices_poll;
if (OMPI_SUCCESS != btl_openib_async_poll_init(&devices_poll)) {
BTL_ERROR(("Fatal error, stoping asyn event thread"));
BTL_ERROR(("Fatal error, stoping asynch event thread"));
pthread_exit(&return_status);
}
@ -344,7 +344,7 @@ void* btl_openib_async_thread(void * async)
rc = poll(devices_poll.async_pollfd, devices_poll.active_poll_size, -1);
if (rc < 0) {
if (errno != EINTR) {
BTL_ERROR(("Poll failed.Fatal error, stoping asyn event thread"));
BTL_ERROR(("Poll failed. Fatal error, stoping asynch event thread"));
pthread_exit(&return_status);
} else {
/* EINTR - we got an interrupt */
@ -362,16 +362,16 @@ void* btl_openib_async_thread(void * async)
/* poll entry 0 is used for communication with the main thread */
if (OMPI_SUCCESS != btl_openib_async_commandh(&devices_poll)) {
free(devices_poll.async_pollfd);
BTL_ERROR(("Failed to process async thread process."
"Fatal error, stoping asyn event thread"));
BTL_ERROR(("Failed to process async thread process. "
"Fatal error, stoping asynch event thread"));
pthread_exit(&return_status);
}
} else {
/* We get device event */
if (btl_openib_async_deviceh(&devices_poll, i)) {
free(devices_poll.async_pollfd);
BTL_ERROR(("Failed to process async thread process."
"Fatal error, stoping asyn event thread"));
BTL_ERROR(("Failed to process async thread process. "
"Fatal error, stoping asynch event thread"));
pthread_exit(&return_status);
}
}
@ -379,9 +379,9 @@ void* btl_openib_async_thread(void * async)
default:
/* Get event other than POLLIN
* this case should never happen */
BTL_ERROR(("Got unexpected event %d."
"Fatal error, stoping asyn event thread"
,devices_poll.async_pollfd[i].revents));
BTL_ERROR(("Got unexpected event %d. "
"Fatal error, stoping asynch event thread",
devices_poll.async_pollfd[i].revents));
free(devices_poll.async_pollfd);
pthread_exit(&return_status);
}


@ -440,6 +440,30 @@ static void btl_openib_control(mca_btl_base_module_t* btl,
(((unsigned char*)clsc_hdr) + skip);
}
break;
case MCA_BTL_OPENIB_CONTROL_CTS:
OPAL_OUTPUT((-1, "received CTS from %s (buffer %p): posted recvs %d, sent cts %d",
ep->endpoint_proc->proc_ompi->proc_hostname,
(void*) ctl_hdr,
ep->endpoint_posted_recvs, ep->endpoint_cts_sent));
ep->endpoint_cts_received = true;
/* Only send the CTS back and mark connected if:
- we have posted our receives (it's possible that we can
get this CTS before this side's CPC has called
cpc_complete())
- we have not yet sent our CTS
We don't even want to mark the endpoint connected() until
we have posted our receives because otherwise we will
trigger credit management (because the rd_credits will
still be negative), and Bad Things will happen. */
if (ep->endpoint_posted_recvs) {
if (!ep->endpoint_cts_sent) {
mca_btl_openib_endpoint_send_cts(ep);
}
mca_btl_openib_endpoint_connected(ep);
}
break;
default:
BTL_ERROR(("Unknown message type received by BTL"));
break;
@ -1158,6 +1182,14 @@ static bool inline is_credit_message(const mca_btl_openib_recv_frag_t *frag)
(MCA_BTL_OPENIB_CONTROL_CREDITS == chdr->type);
}
static bool inline is_cts_message(const mca_btl_openib_recv_frag_t *frag)
{
mca_btl_openib_control_header_t* chdr =
to_base_frag(frag)->segment.seg_addr.pval;
return (MCA_BTL_TAG_BTL == frag->hdr->tag) &&
(MCA_BTL_OPENIB_CONTROL_CTS == chdr->type);
}
static int32_t atoi_param(char *param, int32_t dflt)
{
if (NULL == param || '\0' == param[0]) {
@ -1746,6 +1778,14 @@ error:
if (device->ib_pd) {
ibv_dealloc_pd(device->ib_pd);
}
if (OMPI_SUCCESS != ret) {
orte_show_help("help-mpi-btl-openib.txt",
"error in device init", true,
orte_process_info.nodename,
ibv_get_device_name(device->ib_dev));
}
if (device->ib_dev_context) {
ibv_close_device(device->ib_dev_context);
}
@ -1952,6 +1992,7 @@ btl_openib_component_init(int *num_btl_modules,
struct dev_distance *dev_sorted;
int distance;
int index, value;
bool found;
mca_base_param_source_t source;
/* initialization */
@ -2147,27 +2188,61 @@ btl_openib_component_init(int *num_btl_modules,
mca_btl_openib_component.async_thread = 0;
#endif
distance = dev_sorted[0].distance;
for (i = 0; i < num_devs && (-1 == mca_btl_openib_component.ib_max_btls ||
for (found = false, i = 0;
i < num_devs && (-1 == mca_btl_openib_component.ib_max_btls ||
mca_btl_openib_component.ib_num_btls <
mca_btl_openib_component.ib_max_btls); i++) {
if (distance != dev_sorted[i].distance) {
break;
}
if (OMPI_SUCCESS !=
(ret = init_one_device(&btl_list, dev_sorted[i].ib_dev)))
/* Only take devices that match the type specified by
btl_openib_device_type */
switch (mca_btl_openib_component.device_type) {
case BTL_OPENIB_DT_IB:
#if defined(HAVE_STRUCT_IBV_DEVICE_TRANSPORT_TYPE)
if (IBV_TRANSPORT_IWARP == dev_sorted[i].ib_dev->transport_type) {
BTL_VERBOSE(("openib: only taking infiniband devices -- skipping %s",
ibv_get_device_name(dev_sorted[i].ib_dev)));
continue;
}
#endif
break;
}
if (OMPI_SUCCESS != ret) {
orte_show_help("help-mpi-btl-openib.txt",
"error in device init", true,
orte_process_info.nodename,
ibv_get_device_name(dev_sorted[i].ib_dev));
return NULL;
}
case BTL_OPENIB_DT_IWARP:
#if defined(HAVE_STRUCT_IBV_DEVICE_TRANSPORT_TYPE)
if (IBV_TRANSPORT_IB == dev_sorted[i].ib_dev->transport_type) {
BTL_VERBOSE(("openib: only taking iwarp devices -- skipping %s",
ibv_get_device_name(dev_sorted[i].ib_dev)));
continue;
}
#else
orte_show_help("help-mpi-btl-openib.txt", "no iwarp support",
true);
#endif
break;
case BTL_OPENIB_DT_ALL:
break;
}
found = true;
if (OMPI_SUCCESS !=
(ret = init_one_device(&btl_list, dev_sorted[i].ib_dev))) {
free(dev_sorted);
goto no_btls;
}
}
free(dev_sorted);
if (!found) {
orte_show_help("help-mpi-btl-openib.txt", "no devices right type",
true, orte_process_info.nodename,
((BTL_OPENIB_DT_IB == mca_btl_openib_component.device_type) ?
"InfiniBand" :
(BTL_OPENIB_DT_IWARP == mca_btl_openib_component.device_type) ?
"iWARP" : "<any>"));
goto no_btls;
}
/* If we got back from checking all the devices and find that
there are still items in the component.if_list, that means that
@ -2187,13 +2262,13 @@ btl_openib_component_init(int *num_btl_modules,
if(0 == mca_btl_openib_component.ib_num_btls) {
orte_show_help("help-mpi-btl-openib.txt",
"no active ports found", true, orte_process_info.nodename);
return NULL;
goto no_btls;
}
/* Setup the BSRQ QP's based on the final value of
mca_btl_openib_component.receive_queues. */
if (OMPI_SUCCESS != setup_qps()) {
return NULL;
goto no_btls;
}
/* For XRC:
@ -2235,7 +2310,7 @@ btl_openib_component_init(int *num_btl_modules,
"error in device init", true,
orte_process_info.nodename,
ibv_get_device_name(device->ib_dev));
return NULL;
goto no_btls;
}
}
}
@ -2246,14 +2321,14 @@ btl_openib_component_init(int *num_btl_modules,
mca_btl_openib_component.ib_num_btls);
if(NULL == mca_btl_openib_component.openib_btls) {
BTL_ERROR(("Failed malloc: %s:%d", __FILE__, __LINE__));
return NULL;
goto no_btls;
}
btls = (struct mca_btl_base_module_t **)
malloc(mca_btl_openib_component.ib_num_btls *
sizeof(struct mca_btl_base_module_t*));
if(NULL == btls) {
BTL_ERROR(("Failed malloc: %s:%d", __FILE__, __LINE__));
return NULL;
goto no_btls;
}
/* Copy the btl module structs into a contiguous array and fully
@ -2268,18 +2343,16 @@ btl_openib_component_init(int *num_btl_modules,
ret =
ompi_btl_openib_connect_base_select_for_local_port(openib_btl);
if (OMPI_SUCCESS != ret) {
orte_show_help("help-mpi-btl-openib.txt",
"failed load cpc", true,
orte_process_info.nodename,
ibv_get_device_name(openib_btl->device->ib_dev));
return NULL;
/* We already did a show_help in the lower layer */
goto no_btls;
}
mca_btl_openib_component.openib_btls[i] = openib_btl;
OBJ_RELEASE(ib_selected);
btls[i] = &openib_btl->super;
if(finish_btl_init(openib_btl) != OMPI_SUCCESS)
return NULL;
if (finish_btl_init(openib_btl) != OMPI_SUCCESS) {
goto no_btls;
}
}
btl_openib_modex_send();
@ -2454,17 +2527,29 @@ static int btl_openib_handle_incoming(mca_btl_openib_module_t *openib_btl,
}
OPAL_THREAD_UNLOCK(&erl->lock);
} else {
MCA_BTL_IB_FRAG_RETURN(frag);
if(BTL_OPENIB_QP_TYPE_PP(rqp)) {
if(OPAL_UNLIKELY(is_credit_msg))
OPAL_THREAD_ADD32(&ep->qps[cqp].u.pp_qp.cm_received, 1);
else
OPAL_THREAD_ADD32(&ep->qps[rqp].u.pp_qp.rd_posted, -1);
mca_btl_openib_endpoint_post_rr(ep, cqp);
if (is_cts_message(frag)) {
/* If this was a CTS, free it here (it was
malloc'ed+ibv_reg_mr'ed -- so it should *not* be
FRAG_RETURN'ed). */
int rc = ompi_btl_openib_connect_base_free_cts(ep);
if (OMPI_SUCCESS != rc) {
return rc;
}
} else {
mca_btl_openib_module_t *btl = ep->endpoint_btl;
OPAL_THREAD_ADD32(&btl->qps[rqp].u.srq_qp.rd_posted, -1);
mca_btl_openib_post_srr(btl, rqp);
/* Otherwise, FRAG_RETURN it and repost if necessary */
MCA_BTL_IB_FRAG_RETURN(frag);
if (BTL_OPENIB_QP_TYPE_PP(rqp)) {
if (OPAL_UNLIKELY(is_credit_msg)) {
OPAL_THREAD_ADD32(&ep->qps[cqp].u.pp_qp.cm_received, 1);
} else {
OPAL_THREAD_ADD32(&ep->qps[rqp].u.pp_qp.rd_posted, -1);
}
mca_btl_openib_endpoint_post_rr(ep, cqp);
} else {
mca_btl_openib_module_t *btl = ep->endpoint_btl;
OPAL_THREAD_ADD32(&btl->qps[rqp].u.srq_qp.rd_posted, -1);
mca_btl_openib_post_srr(btl, rqp);
}
}
}
@ -2631,17 +2716,21 @@ static void handle_wc(mca_btl_openib_device_t* device, const uint32_t cq,
if(endpoint)
openib_btl = endpoint->endpoint_btl;
if(wc->status != IBV_WC_SUCCESS)
if(wc->status != IBV_WC_SUCCESS) {
OPAL_OUTPUT((-1, "Got WC: ERROR"));
goto error;
}
/* Handle work completions */
switch(wc->opcode) {
case IBV_WC_RDMA_READ:
OPAL_OUTPUT((-1, "Got WC: RDMA_READ"));
OPAL_THREAD_ADD32(&endpoint->get_tokens, 1);
/* fall through */
case IBV_WC_RDMA_WRITE:
case IBV_WC_SEND:
OPAL_OUTPUT((-1, "Got WC: RDMA_WRITE or SEND"));
if(openib_frag_type(des) == MCA_BTL_OPENIB_FRAG_SEND) {
opal_list_item_t *i;
while((i = opal_list_remove_first(&to_send_frag(des)->coalesced_frags))) {
@ -2674,6 +2763,8 @@ static void handle_wc(mca_btl_openib_device_t* device, const uint32_t cq,
mca_btl_openib_frag_progress_pending_put_get(endpoint, qp);
break;
case IBV_WC_RECV:
OPAL_OUTPUT((-1, "Got WC: RDMA_RECV, qp %d, src qp %d, WR ID %p",
wc->qp_num, wc->src_qp, (void*) wc->wr_id));
if(wc->wc_flags & IBV_WC_WITH_IMM) {
endpoint = (mca_btl_openib_endpoint_t*)
opal_pointer_array_get_item(device->endpoints, wc->imm_data);


@ -417,6 +417,7 @@ static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
endpoint->endpoint_proc = 0;
endpoint->endpoint_local_cpc = NULL;
endpoint->endpoint_remote_cpc_data = NULL;
endpoint->endpoint_initiator = false;
endpoint->endpoint_tstamp = 0.0;
endpoint->endpoint_state = MCA_BTL_IB_CLOSED;
endpoint->endpoint_retries = 0;
@ -442,6 +443,12 @@ static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
endpoint->use_eager_rdma = false;
endpoint->eager_rdma_remote.tokens = 0;
endpoint->eager_rdma_local.credits = 0;
endpoint->endpoint_cts_mr = NULL;
endpoint->endpoint_cts_frag.super.super.base.super.registration = NULL;
endpoint->endpoint_cts_frag.super.super.base.super.ptr = NULL;
endpoint->endpoint_posted_recvs = false;
endpoint->endpoint_cts_received = false;
endpoint->endpoint_cts_sent = false;
}
/*
@ -459,6 +466,9 @@ static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint)
endpoint->endpoint_local_cpc->cbm_endpoint_finalize(endpoint);
}
/* Release CTS buffer */
ompi_btl_openib_connect_base_free_cts(endpoint);
/* Release memory resources */
do {
/* Make sure that mca_btl_openib_endpoint_connect_eager_rdma ()
@ -531,8 +541,8 @@ static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint)
/*
* call when the connect module has created all the qp's on an
* endpoint and needs to have some receive buffers posted
* Called when the connect module has created all the qp's on an
* endpoint and needs to have some receive buffers posted.
*/
int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t *endpoint)
{
@ -549,6 +559,123 @@ int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t *endpoint)
return OMPI_SUCCESS;
}
static void cts_sent(mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_descriptor_t* des,
int status)
{
/* Nothing to do/empty function (we can't pass in a NULL pointer
for the des_cbfunc) */
OPAL_OUTPUT((-1, "CTS send to %s completed",
ep->endpoint_proc->proc_ompi->proc_hostname));
}
/*
* Send CTS control fragment
*/
void mca_btl_openib_endpoint_send_cts(mca_btl_openib_endpoint_t *endpoint)
{
mca_btl_openib_send_control_frag_t *sc_frag;
mca_btl_base_descriptor_t *base_des;
mca_btl_openib_frag_t *openib_frag;
mca_btl_openib_com_frag_t *com_frag;
mca_btl_openib_control_header_t *ctl_hdr;
OPAL_OUTPUT((-1, "SENDING CTS to %s on qp index %d (QP num %d)",
endpoint->endpoint_proc->proc_ompi->proc_hostname,
mca_btl_openib_component.credits_qp,
endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp->qp_num));
sc_frag = alloc_control_frag(endpoint->endpoint_btl);
if (OPAL_UNLIKELY(NULL == sc_frag)) {
BTL_ERROR(("Failed to allocate control buffer"));
mca_btl_openib_endpoint_invoke_error(endpoint);
return;
}
/* I dislike using the "to_<foo>()" macros; I prefer using the
explicit member fields to ensure I get the types right. Since
this is not a performance-critical part of the code, it's
ok. */
com_frag = &(sc_frag->super.super);
openib_frag = &(com_frag->super);
base_des = &(openib_frag->base);
base_des->des_cbfunc = cts_sent;
base_des->des_cbdata = NULL;
base_des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
base_des->order = mca_btl_openib_component.credits_qp;
openib_frag->segment.seg_len = sizeof(mca_btl_openib_control_header_t);
com_frag->endpoint = endpoint;
sc_frag->hdr->tag = MCA_BTL_TAG_BTL;
sc_frag->hdr->cm_seen = 0;
sc_frag->hdr->credits = 0;
ctl_hdr = (mca_btl_openib_control_header_t*)
openib_frag->segment.seg_addr.pval;
ctl_hdr->type = MCA_BTL_OPENIB_CONTROL_CTS;
/* Send the fragment */
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
if (OMPI_SUCCESS != mca_btl_openib_endpoint_post_send(endpoint, sc_frag)) {
BTL_ERROR(("Failed to post CTS send"));
mca_btl_openib_endpoint_invoke_error(endpoint);
}
endpoint->endpoint_cts_sent = true;
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
}
/*
* Called when the CPC has established a connection on an endpoint
*/
void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t *endpoint)
{
/* If the CPC uses the CTS protocol, then start it up */
if (endpoint->endpoint_local_cpc->cbm_uses_cts) {
/* Post our receives, which will make credit management happy
(i.e., rd_credits will be 0) */
if (OMPI_SUCCESS != mca_btl_openib_endpoint_post_recvs(endpoint)) {
BTL_ERROR(("Failed to post receive buffers"));
mca_btl_openib_endpoint_invoke_error(endpoint);
return;
}
endpoint->endpoint_posted_recvs = true;
/* If this is IB, send the CTS immediately. If this is iWARP,
then only send the CTS if this endpoint was the initiator
of the connection (the receiver will send its CTS when it
receives this side's CTS). Also send the CTS if we already
received the peer's CTS (e.g., if this process was slow to
call cpc_complete()). */
OPAL_OUTPUT((-1, "cpc_complete to peer %s: is IB %d, initiatior %d, cts received: %d",
endpoint->endpoint_proc->proc_ompi->proc_hostname,
(IBV_TRANSPORT_IB ==
endpoint->endpoint_btl->device->ib_dev->transport_type),
endpoint->endpoint_initiator,
endpoint->endpoint_cts_received));
if (IBV_TRANSPORT_IB ==
endpoint->endpoint_btl->device->ib_dev->transport_type ||
endpoint->endpoint_initiator ||
endpoint->endpoint_cts_received) {
mca_btl_openib_endpoint_send_cts(endpoint);
/* If we've already got the CTS from the other side, then
mark us as connected */
if (endpoint->endpoint_cts_received) {
OPAL_OUTPUT((-1, "cpc_complete to %s -- already got CTS, so marking endpoint as complete",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
mca_btl_openib_endpoint_connected(endpoint);
}
}
OPAL_OUTPUT((-1, "cpc_complete to %s -- done",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
return;
}
/* Otherwise, just set the endpoint to "connected" */
mca_btl_openib_endpoint_connected(endpoint);
}
/*
* called when the connect module has completed setup of an endpoint
@ -560,6 +687,7 @@ void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t *endpoint)
mca_btl_openib_endpoint_t *ep;
bool master = false;
opal_output(-1, "Now we are CONNECTED");
if (MCA_BTL_XRC_ENABLED) {
OPAL_THREAD_LOCK(&endpoint->ib_addr->addr_lock);
if (MCA_BTL_IB_ADDR_CONNECTED == endpoint->ib_addr->status) {
@ -616,8 +744,8 @@ void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t *endpoint)
ep_item = opal_list_remove_first(&endpoint->ib_addr->pending_ep);
ep = (mca_btl_openib_endpoint_t *)ep_item;
if (OMPI_SUCCESS !=
endpoint->endpoint_local_cpc->cbm_start_connect(endpoint->endpoint_local_cpc,
ep)) {
ompi_btl_openib_connect_base_start(endpoint->endpoint_local_cpc,
ep)) {
BTL_ERROR(("Failed to connect pending endpoint\n"));
}
}
@ -696,7 +824,7 @@ void mca_btl_openib_endpoint_send_credits(mca_btl_openib_endpoint_t* endpoint,
frag = endpoint->qps[qp].credit_frag;
if(OPAL_UNLIKELY(NULL == frag)) {
frag = alloc_credit_frag(openib_btl);
frag = alloc_control_frag(openib_btl);
frag->qp_idx = qp;
endpoint->qps[qp].credit_frag = frag;
/* set those once and forever */
@ -785,7 +913,7 @@ static int mca_btl_openib_endpoint_send_eager_rdma(
mca_btl_openib_send_control_frag_t* frag;
int rc;
frag = alloc_credit_frag(openib_btl);
frag = alloc_control_frag(openib_btl);
if(NULL == frag) {
return -1;
}
@ -942,7 +1070,8 @@ void *mca_btl_openib_endpoint_invoke_error(void *context)
if (NULL == endpoint) {
int i;
for (i = 0; i < mca_btl_openib_component.ib_num_btls; ++i) {
if (NULL != mca_btl_openib_component.openib_btls[i]) {
if (NULL != mca_btl_openib_component.openib_btls[i] &&
NULL != mca_btl_openib_component.openib_btls[i]->error_cb) {
btl = mca_btl_openib_component.openib_btls[i];
break;
}
@ -952,9 +1081,11 @@ void *mca_btl_openib_endpoint_invoke_error(void *context)
}
/* If we didn't find a BTL, then just bail :-( */
if (NULL == btl) {
if (NULL == btl || NULL == btl->error_cb) {
orte_show_help("help-mpi-btl-openib.txt",
"cannot raise btl error", orte_process_info.nodename);
"cannot raise btl error", true,
orte_process_info.nodename,
__FILE__, __LINE__);
exit(1);
}


@ -167,7 +167,15 @@ struct mca_btl_base_endpoint_t {
/** hook for local CPC to hang endpoint-specific data */
void *endpoint_local_cpc_data;
/** pointer to remote CPC's data (essentially its CPC modex message) */
/** If endpoint_local_cpc->cbm_uses_cts is true and this endpoint
is iWARP, then endpoint_initiator must be true on the side
that actually initiates the QP, false on the other side. This
bool is used to know which way to send the first CTS
message. */
bool endpoint_initiator;
/** pointer to remote proc's CPC data (essentially its CPC modex
message) */
ompi_btl_openib_connect_base_module_data_t *endpoint_remote_cpc_data;
/** current state of the connection */
@ -220,6 +228,24 @@ struct mca_btl_base_endpoint_t {
/** information about the remote port */
mca_btl_openib_rem_info_t rem_info;
/** Frag for initial wireup CTS protocol; will be NULL if CPC
indicates that it does not want to use CTS */
mca_btl_openib_recv_frag_t endpoint_cts_frag;
/** Memory registration info for the CTS frag */
struct ibv_mr *endpoint_cts_mr;
/** Whether we've posted receives on this EP or not (only used in
CTS protocol) */
bool endpoint_posted_recvs;
/** Whether we've received the CTS from the peer or not (only used
in CTS protocol) */
bool endpoint_cts_received;
/** Whether we've sent our CTS to the peer or not (only used in
CTS protocol) */
bool endpoint_cts_sent;
};
typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
@ -244,6 +270,8 @@ int mca_btl_openib_endpoint_post_send(mca_btl_openib_endpoint_t*,
void mca_btl_openib_endpoint_send_credits(mca_btl_base_endpoint_t*, const int);
void mca_btl_openib_endpoint_connect_eager_rdma(mca_btl_openib_endpoint_t*);
int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t*);
void mca_btl_openib_endpoint_send_cts(mca_btl_openib_endpoint_t *endpoint);
void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t*);
void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t*);
void mca_btl_openib_endpoint_init(mca_btl_openib_module_t*,
mca_btl_base_endpoint_t*,
@ -278,6 +306,12 @@ static inline int post_recvs(mca_btl_base_endpoint_t *ep, const int qp,
wr = wr_list = &to_recv_frag(item)->rd_desc;
else
wr = wr->next = &to_recv_frag(item)->rd_desc;
OPAL_OUTPUT((-1, "Posting recv (QP num %d): WR ID %p, SG addr %p, len %d, lkey %d",
ep->qps[qp].qp->lcl_qp->qp_num,
(void*) wr->wr_id,
(void*) wr->sg_list[0].addr,
wr->sg_list[0].length,
wr->sg_list[0].lkey));
}
wr->next = NULL;
@ -286,7 +320,7 @@ static inline int post_recvs(mca_btl_base_endpoint_t *ep, const int qp,
if (0 == rc)
return OMPI_SUCCESS;
BTL_ERROR(("error %d posting receive on qp %d\n", rc, qp));
BTL_ERROR(("error %d posting receive on qp %d", rc, qp));
return OMPI_ERROR;
}


@ -8,6 +8,23 @@
* $HEADER$
*/
/**
* Note: this file is a little fast-n-loose with OMPI_HAVE_THREADS --
* it uses this value in run-time "if" conditionals (vs. compile-time
* #if conditionals). We also don't protect including <pthread.h>.
* That's because this component currently only compiles on Linux and
* Solaris, and both of these OS's have pthreads. Using the run-time
* conditionals gives us better compile-time checking, even of code
* that isn't activated.
*
* Note, too, that the functionality in this file does *not* require
* all the heavyweight OMPI thread infrastructure (e.g., from
* --enable-mpi-threads or --enable-progress-threads). All work that
* is done in a separate progress thread is very carefully segregated
* from that of the main thread, and communication back to the main
* thread is done via a pipe.
*/
#include "ompi_config.h"
#include <pthread.h>
@ -24,6 +41,11 @@
#include "btl_openib_fd.h"
typedef union {
ompi_btl_openib_fd_event_callback_fn_t *event;
ompi_btl_openib_fd_main_callback_fn_t *main;
} callback_u_t;
/*
* Data for each registered item
*/
@ -33,10 +55,7 @@ typedef struct {
opal_event_t ri_event;
int ri_fd;
int ri_flags;
union {
ompi_btl_openib_fd_callback_fn_t *fd;
ompi_btl_openib_schedule_callback_fn_t *schedule;
} ri_callback;
callback_u_t ri_callback;
void *ri_context;
} registered_item_t;
@ -46,9 +65,14 @@ static OBJ_CLASS_INSTANCE(registered_item_t, opal_list_item_t, NULL, NULL);
* Command types
*/
typedef enum {
/* Read by service thread */
CMD_TIME_TO_QUIT,
CMD_ADD_FD,
CMD_REMOVE_FD,
ACK_RAN_FUNCTION,
/* Read by service and main threads */
CMD_CALL_FUNCTION,
CMD_MAX
} cmd_type_t;
@ -56,7 +80,7 @@ typedef enum {
* Commands. Fields ordered to avoid memory holes (and valgrind warnings).
*/
typedef struct {
ompi_btl_openib_fd_callback_fn_t *pc_callback;
callback_u_t pc_fn;
void *pc_context;
int pc_fd;
int pc_flags;
@ -64,6 +88,16 @@ typedef struct {
char end;
} cmd_t;
/*
* Queued up list of commands to send to the main thread
*/
typedef struct {
opal_list_item_t super;
cmd_t cli_cmd;
} cmd_list_item_t;
static OBJ_CLASS_INSTANCE(cmd_list_item_t, opal_list_item_t, NULL, NULL);
static bool initialized = false;
static int cmd_size = 0;
static fd_set read_fds, write_fds;
@ -71,117 +105,17 @@ static int max_fd;
static opal_list_t registered_items;
/* These items are only used in the threaded version */
/* Owned by the main thread */
static pthread_t thread;
static int pipe_fd[2] = { -1, -1 };
static opal_event_t main_thread_event;
static int pipe_to_service_thread[2] = { -1, -1 };
/* Owned by the service thread */
static int pipe_to_main_thread[2] = { -1, -1 };
static const size_t max_outstanding_to_main_thread = 32;
static size_t waiting_for_ack_from_main_thread = 0;
static opal_list_t pending_to_main_thread;
static void libevent_fd_callback(int fd, short event, void *context)
{
registered_item_t *ri = (registered_item_t*) context;
ri->ri_callback.fd(fd, event, ri->ri_context);
}
static void libevent_event_callback(int fd, short event, void *context)
{
registered_item_t *ri = (registered_item_t*) context;
ri->ri_callback.schedule(ri->ri_context);
/* JMS Can I free ri now? It contains the event... */
#if 0
OBJ_RELEASE(ri);
#endif
}
/*
* Add an fd to the listening set
*/
static int local_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd)
{
registered_item_t *ri = OBJ_NEW(registered_item_t);
if (NULL == ri) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
ri->ri_event_used = false;
ri->ri_fd = cmd->pc_fd;
ri->ri_flags = cmd->pc_flags;
ri->ri_callback.fd = cmd->pc_callback;
ri->ri_context = cmd->pc_context;
if (use_libevent) {
/* Make an event for this fd */
ri->ri_event_used = true;
memset(&ri->ri_event, 0, sizeof(ri->ri_event));
opal_event_set(&ri->ri_event, ri->ri_fd,
ri->ri_flags | OPAL_EV_PERSIST, libevent_fd_callback,
ri);
opal_event_add(&ri->ri_event, 0);
} else {
/* Add the fd to the relevant fd local sets and update max_fd */
if (OPAL_EV_READ & ri->ri_flags) {
FD_SET(ri->ri_fd, &read_fds);
}
if (OPAL_EV_WRITE & cmd->pc_flags) {
FD_SET(ri->ri_fd, &write_fds);
}
max_fd = (max_fd > ri->ri_fd) ? max_fd : ri->ri_fd + 1;
}
opal_list_append(&registered_items, &ri->super);
return OMPI_SUCCESS;
}
/*
* Remove an fd from the listening set
*/
static int local_pipe_cmd_remove_fd(cmd_t *cmd)
{
int i;
opal_list_item_t *item;
registered_item_t *ri;
/* Go through the list of registered fd's and find the fd to
remove */
for (item = opal_list_get_first(&registered_items);
NULL != opal_list_get_end(&registered_items);
item = opal_list_get_next(item)) {
ri = (registered_item_t*) item;
if (cmd->pc_fd == ri->ri_fd) {
/* Found it. The item knows if it was used as a libevent
event or an entry in the local fd sets. */
if (ri->ri_event_used) {
/* Remove this event from libevent */
opal_event_del(&ri->ri_event);
} else {
/* Remove this item from the fd_sets and recalculate
max_fd */
FD_CLR(cmd->pc_fd, &read_fds);
FD_CLR(cmd->pc_fd, &write_fds);
for (max_fd = i = pipe_fd[0]; i < FD_SETSIZE; ++i) {
if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) {
max_fd = i + 1;
}
}
}
/* Let the caller know that we have stopped monitoring
this fd (if they care) */
if (NULL != cmd->pc_callback) {
cmd->pc_callback(cmd->pc_fd, 0, cmd->pc_context);
}
/* Remove this item from the list of registered items and
release it */
opal_list_remove_item(&registered_items, item);
OBJ_RELEASE(item);
return OMPI_SUCCESS;
}
}
/* This shouldn't happen */
return OMPI_ERR_NOT_FOUND;
}
/*
* Simple loop over reading from a fd
@ -225,6 +159,180 @@ static int write_fd(int fd, int len, void *buffer)
return OMPI_ERROR;
}
}
return OMPI_SUCCESS;
}
/*
* Write a command to the main thread, or queue it up if the pipe is full
*/
static int write_to_main_thread(cmd_t *cmd)
{
/* Note that if we write too much to the main thread pipe and the
main thread doesn't check it often, we could fill up the pipe
and cause this thread to block. Bad! So we do some simple
counting here and ensure that we don't fill the pipe. If we
are in danger of that, then queue up the commands here in the
service thread. The main thread will ACK every CALL_FUNCTION
command, so we have a built-in mechanism to wake up the service
thread to drain any queued-up commands. */
if (opal_list_get_size(&pending_to_main_thread) > 0 ||
waiting_for_ack_from_main_thread >= max_outstanding_to_main_thread) {
cmd_list_item_t *cli = OBJ_NEW(cmd_list_item_t);
if (NULL == cli) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(&cli->cli_cmd, cmd, cmd_size);
opal_list_append(&pending_to_main_thread, &(cli->super));
} else {
OPAL_OUTPUT((-1, "fd: writing to main thread"));
write_fd(pipe_to_main_thread[1], cmd_size, cmd);
++waiting_for_ack_from_main_thread;
}
return OMPI_SUCCESS;
}
static void service_fd_callback(int fd, short event, void *context)
{
registered_item_t *ri = (registered_item_t*) context;
ri->ri_callback.event(fd, event, ri->ri_context);
}
/*
* Add an fd to the listening set
*/
static int service_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd)
{
registered_item_t *ri = OBJ_NEW(registered_item_t);
if (NULL == ri) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
ri->ri_event_used = false;
ri->ri_fd = cmd->pc_fd;
ri->ri_flags = cmd->pc_flags;
ri->ri_callback.event = cmd->pc_fn.event;
ri->ri_context = cmd->pc_context;
if (use_libevent) {
/* Make an event for this fd */
ri->ri_event_used = true;
memset(&ri->ri_event, 0, sizeof(ri->ri_event));
opal_event_set(&ri->ri_event, ri->ri_fd,
ri->ri_flags | OPAL_EV_PERSIST, service_fd_callback,
ri);
opal_event_add(&ri->ri_event, 0);
} else {
/* Add the fd to the relevant fd local sets and update max_fd */
if (OPAL_EV_READ & ri->ri_flags) {
FD_SET(ri->ri_fd, &read_fds);
}
if (OPAL_EV_WRITE & cmd->pc_flags) {
FD_SET(ri->ri_fd, &write_fds);
}
max_fd = (max_fd > ri->ri_fd) ? max_fd : ri->ri_fd + 1;
}
opal_list_append(&registered_items, &ri->super);
return OMPI_SUCCESS;
}
/*
* Run a function
*/
static int service_pipe_cmd_call_function(cmd_t *cmd)
{
cmd_t local_cmd;
OPAL_OUTPUT((-1, "fd service thread: calling function!"));
/* Call the function */
if (NULL != cmd->pc_fn.main) {
cmd->pc_fn.main(cmd->pc_context);
}
/* Now ACK that we ran the function */
memset(&local_cmd, 0, cmd_size);
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
write_fd(pipe_to_main_thread[1], cmd_size, &local_cmd);
/* Done */
return OMPI_SUCCESS;
}
/*
* Remove an fd from the listening set
*/
static int service_pipe_cmd_remove_fd(cmd_t *cmd)
{
int i;
opal_list_item_t *item;
registered_item_t *ri;
OPAL_OUTPUT((-1, "service thread got unmonitor fd %d", cmd->pc_fd));
/* Go through the list of registered fd's and find the fd to
remove */
for (item = opal_list_get_first(&registered_items);
NULL != opal_list_get_end(&registered_items);
item = opal_list_get_next(item)) {
ri = (registered_item_t*) item;
if (cmd->pc_fd == ri->ri_fd) {
/* Found it. The item knows if it was used as a libevent
event or an entry in the local fd sets. */
if (ri->ri_event_used) {
/* Remove this event from libevent */
opal_event_del(&ri->ri_event);
} else {
/* Remove this item from the fd_sets and recalculate
MAX_FD */
FD_CLR(cmd->pc_fd, &read_fds);
FD_CLR(cmd->pc_fd, &write_fds);
for (max_fd = i = pipe_to_service_thread[0]; i < FD_SETSIZE; ++i) {
if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) {
max_fd = i + 1;
}
}
}
/* Let the caller know that we have stopped monitoring
this fd (if they care) */
if (NULL != cmd->pc_fn.event) {
cmd->pc_fn.event(cmd->pc_fd, 0, cmd->pc_context);
}
/* Remove this item from the list of registered items and
release it */
opal_list_remove_item(&registered_items, item);
OBJ_RELEASE(item);
return OMPI_SUCCESS;
}
}
/* This shouldn't happen */
return OMPI_ERR_NOT_FOUND;
}
/*
* Call a function and ACK that we ran it
*/
static int main_pipe_cmd_call_function(cmd_t *cmd)
{
cmd_t local_cmd;
OPAL_OUTPUT((-1, "fd main thread: calling function!"));
/* Call the function */
if (NULL != cmd->pc_fn.main) {
cmd->pc_fn.main(cmd->pc_context);
}
/* Now ACK that we ran the function */
memset(&local_cmd, 0, cmd_size);
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
write_fd(pipe_to_service_thread[1], cmd_size, &local_cmd);
/* Done */
return OMPI_SUCCESS;
}
@ -232,32 +340,59 @@ static int write_fd(int fd, int len, void *buffer)
/*
* Act on pipe commands
*/
static bool local_pipe_cmd(void)
static bool service_pipe_cmd(void)
{
bool ret = false;
cmd_t cmd;
cmd_list_item_t *cli;
read_fd(pipe_fd[0], cmd_size, &cmd);
read_fd(pipe_to_service_thread[0], cmd_size, &cmd);
switch (cmd.pc_cmd) {
case CMD_ADD_FD:
if (OMPI_SUCCESS != local_pipe_cmd_add_fd(false, &cmd)) {
OPAL_OUTPUT((-1, "fd service thread: CMD_ADD_FD"));
if (OMPI_SUCCESS != service_pipe_cmd_add_fd(false, &cmd)) {
ret = true;
}
break;
case CMD_REMOVE_FD:
if (OMPI_SUCCESS != local_pipe_cmd_remove_fd(&cmd)) {
OPAL_OUTPUT((-1, "fd service thread: CMD_REMOVE_FD"));
if (OMPI_SUCCESS != service_pipe_cmd_remove_fd(&cmd)) {
ret = true;
}
break;
case CMD_CALL_FUNCTION:
OPAL_OUTPUT((-1, "fd service thread: CMD_RUN_FUNCTION"));
if (OMPI_SUCCESS != service_pipe_cmd_call_function(&cmd)) {
ret = true;
}
break;
case CMD_TIME_TO_QUIT:
opal_output(-1, "fd listener thread: time to quit");
OPAL_OUTPUT((-1, "fd service thread: CMD_TIME_TO_QUIT"));
ret = true;
break;
case ACK_RAN_FUNCTION:
/* We don't have a guarantee that the main thread will check
its pipe frequently, so we do some simple counting to
ensure we just don't have too many outstanding commands to
the main thread at any given time. The main thread will
ACK every CALL_FUNCTION command, so this thread will always
wake up and continue to drain any queued up functions. */
cli = (cmd_list_item_t*) opal_list_remove_first(&pending_to_main_thread);
if (NULL != cli) {
OPAL_OUTPUT((-1, "sending queued up cmd function to main thread"));
write_fd(pipe_to_main_thread[1], cmd_size, &(cli->cli_cmd));
OBJ_RELEASE(cli);
} else {
--waiting_for_ack_from_main_thread;
}
break;
default:
opal_output(-1, "fd listener thread: unknown pipe command!");
OPAL_OUTPUT((-1, "fd service thread: unknown pipe command!"));
break;
}
@ -268,7 +403,7 @@ static bool local_pipe_cmd(void)
/*
* Service thread logic
*/
static void *thread_main(void *context)
static void *service_thread_start(void *context)
{
int rc, flags;
fd_set read_fds_copy, write_fds_copy;
@ -278,29 +413,29 @@ static void *thread_main(void *context)
/* Make an fd set that we can select() on */
FD_ZERO(&write_fds);
FD_ZERO(&read_fds);
FD_SET(pipe_fd[0], &read_fds);
max_fd = pipe_fd[0] + 1;
FD_SET(pipe_to_service_thread[0], &read_fds);
max_fd = pipe_to_service_thread[0] + 1;
opal_output(-1, "fd listener thread running");
OPAL_OUTPUT((-1, "fd service thread running"));
/* Main loop waiting for commands over the fd's */
while (1) {
memcpy(&read_fds_copy, &read_fds, sizeof(read_fds));
memcpy(&write_fds_copy, &write_fds, sizeof(write_fds));
opal_output(-1, "fd listener thread blocking on select...");
OPAL_OUTPUT((-1, "fd service thread blocking on select..."));
rc = select(max_fd, &read_fds_copy, &write_fds_copy, NULL, NULL);
if (0 != rc && EAGAIN == errno) {
continue;
}
opal_output(-1, "fd listener thread woke up!");
OPAL_OUTPUT((-1, "fd service thread woke up!"));
if (rc > 0) {
if (FD_ISSET(pipe_fd[0], &read_fds_copy)) {
opal_output(-1, "fd listener thread: pipe command");
if (local_pipe_cmd()) {
opal_output(-1, "fd listener thread: exiting");
if (FD_ISSET(pipe_to_service_thread[0], &read_fds_copy)) {
OPAL_OUTPUT((-1, "fd service thread: pipe command"));
if (service_pipe_cmd()) {
break;
}
OPAL_OUTPUT((-1, "fd service thread: back from pipe command"));
}
/* Go through all the registered events and see who had
@ -324,9 +459,10 @@ static void *thread_main(void *context)
/* If either was ready, invoke the callback */
if (0 != flags) {
opal_output(-1, "fd listener thread: invoking callback for registered fd %d", ri->ri_fd);
ri->ri_callback.fd(ri->ri_fd, flags,
ri->ri_context);
OPAL_OUTPUT((-1, "fd service thread: invoking callback for registered fd %d", ri->ri_fd));
ri->ri_callback.event(ri->ri_fd, flags,
ri->ri_context);
OPAL_OUTPUT((-1, "fd service thread: back from callback for registered fd %d", ri->ri_fd));
}
}
}
@ -334,12 +470,38 @@ static void *thread_main(void *context)
}
/* All done */
OPAL_OUTPUT((-1, "fd service thread: exiting"));
opal_atomic_wmb();
return NULL;
}
static void main_thread_event_callback(int fd, short event, void *context)
{
cmd_t cmd;
OPAL_OUTPUT((-1, "main thread -- reading command"));
read_fd(pipe_to_main_thread[0], cmd_size, &cmd);
switch (cmd.pc_cmd) {
case CMD_CALL_FUNCTION:
OPAL_OUTPUT((-1, "fd main thread: calling command"));
main_pipe_cmd_call_function(&cmd);
break;
default:
OPAL_OUTPUT((-1, "fd main thread: unknown pipe command: %d",
cmd.pc_cmd));
break;
}
}
/******************************************************************
* Main interface calls
******************************************************************/
/*
* Initialize
* Called by main thread
*/
int ompi_btl_openib_fd_init(void)
{
@ -348,19 +510,42 @@ int ompi_btl_openib_fd_init(void)
OBJ_CONSTRUCT(&registered_items, opal_list_t);
if (OMPI_HAVE_THREAD_SUPPORT) {
/* Create a pipe to communicate with the thread */
if (0 != pipe(pipe_fd)) {
/* Calculate the real size of the cmd struct */
cmd_size = (int) (&(bogus.end) - ((char*) &bogus));
if (OMPI_HAVE_THREADS) {
OBJ_CONSTRUCT(&pending_to_main_thread, opal_list_t);
/* Create pipes to communicate between the two threads */
if (0 != pipe(pipe_to_service_thread)) {
return OMPI_ERR_IN_ERRNO;
}
if (0 != pipe(pipe_to_main_thread)) {
return OMPI_ERR_IN_ERRNO;
}
if (0 != pthread_create(&thread, NULL, thread_main, NULL)) {
/* Create a libevent event that is used in the main thread
to watch its pipe */
memset(&main_thread_event, 0, sizeof(main_thread_event));
opal_event_set(&main_thread_event, pipe_to_main_thread[0],
OPAL_EV_READ | OPAL_EV_PERSIST,
main_thread_event_callback, NULL);
opal_event_add(&main_thread_event, 0);
/* Start the service thread */
if (0 != pthread_create(&thread, NULL, service_thread_start,
NULL)) {
int errno_save = errno;
opal_event_del(&main_thread_event);
close(pipe_to_service_thread[0]);
close(pipe_to_service_thread[1]);
close(pipe_to_main_thread[0]);
close(pipe_to_main_thread[1]);
errno = errno_save;
return OMPI_ERR_IN_ERRNO;
}
}
/* Calculate the real size of the cmd struct */
cmd_size = (int) (&(bogus.end) - ((char*) &bogus));
initialized = true;
}
return OMPI_SUCCESS;
@ -369,9 +554,10 @@ int ompi_btl_openib_fd_init(void)
/*
* Start monitoring an fd
* Called by main or service thread; callback will be in service thread
*/
int ompi_btl_openib_fd_monitor(int fd, int flags,
ompi_btl_openib_fd_callback_fn_t *callback,
ompi_btl_openib_fd_event_callback_fn_t *callback,
void *context)
{
cmd_t cmd;
@ -384,14 +570,15 @@ int ompi_btl_openib_fd_monitor(int fd, int flags,
cmd.pc_cmd = CMD_ADD_FD;
cmd.pc_fd = fd;
cmd.pc_flags = flags;
cmd.pc_callback = callback;
cmd.pc_fn.event = callback;
cmd.pc_context = context;
if (OMPI_HAVE_THREAD_SUPPORT) {
if (OMPI_HAVE_THREADS) {
/* For the threaded version, write a command down the pipe */
write_fd(pipe_fd[1], cmd_size, &cmd);
OPAL_OUTPUT((-1, "main thread sending monitor fd %d", fd));
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
} else {
/* Otherwise, add it directly */
local_pipe_cmd_add_fd(true, &cmd);
service_pipe_cmd_add_fd(true, &cmd);
}
return OMPI_SUCCESS;
@ -400,9 +587,10 @@ int ompi_btl_openib_fd_monitor(int fd, int flags,
/*
* Stop monitoring an fd
* Called by main or service thread; callback will be in service thread
*/
int ompi_btl_openib_fd_unmonitor(int fd,
ompi_btl_openib_fd_callback_fn_t *callback,
ompi_btl_openib_fd_event_callback_fn_t *callback,
void *context)
{
cmd_t cmd;
@ -415,14 +603,41 @@ int ompi_btl_openib_fd_unmonitor(int fd,
cmd.pc_cmd = CMD_REMOVE_FD;
cmd.pc_fd = fd;
cmd.pc_flags = 0;
cmd.pc_callback = callback;
cmd.pc_fn.event = callback;
cmd.pc_context = context;
if (OMPI_HAVE_THREAD_SUPPORT) {
if (OMPI_HAVE_THREADS) {
/* For the threaded version, write a command down the pipe */
write_fd(pipe_fd[1], cmd_size, &cmd);
OPAL_OUTPUT((-1, "main thread sending unmonitor fd %d", fd));
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
} else {
/* Otherwise, remove it directly */
local_pipe_cmd_remove_fd(&cmd);
service_pipe_cmd_remove_fd(&cmd);
}
return OMPI_SUCCESS;
}
/*
* Run in the service thread
* Called by main thread; callback will be in service thread
*/
int ompi_btl_openib_fd_run_in_service(ompi_btl_openib_fd_main_callback_fn_t *callback,
void *context)
{
cmd_t cmd;
cmd.pc_cmd = CMD_CALL_FUNCTION;
cmd.pc_fd = -1;
cmd.pc_flags = 0;
cmd.pc_fn.main = callback;
cmd.pc_context = context;
if (OMPI_HAVE_THREADS) {
/* For the threaded version, write a command down the pipe */
OPAL_OUTPUT((-1, "main thread sending 'run in service'"));
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
} else {
/* Otherwise, run it directly */
callback(context);
}
return OMPI_SUCCESS;
@ -430,31 +645,25 @@ int ompi_btl_openib_fd_unmonitor(int fd,
/*
* Run a function in the main thread
* Called by service thread
*/
int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t *callback,
void *context)
int ompi_btl_openib_fd_run_in_main(ompi_btl_openib_fd_main_callback_fn_t *callback,
void *context)
{
if (OMPI_HAVE_THREAD_SUPPORT) {
/* For the threaded version, schedule an event for "now" */
registered_item_t *ri;
struct timeval now;
if (OMPI_HAVE_THREADS) {
cmd_t cmd;
ri = OBJ_NEW(registered_item_t);
if (NULL == ri) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* Create an event that will run in the main thread */
ri->ri_fd = ri->ri_flags = -1;
ri->ri_callback.schedule = callback;
ri->ri_context = context;
ri->ri_event_used = true;
opal_evtimer_set(&ri->ri_event, libevent_event_callback, ri);
now.tv_sec = 0;
now.tv_usec = 0;
opal_evtimer_add(&ri->ri_event, &now);
OPAL_OUTPUT((-1, "run in main -- sending command"));
/* For the threaded version, write a command down the pipe */
cmd.pc_cmd = CMD_CALL_FUNCTION;
cmd.pc_fd = -1;
cmd.pc_flags = 0;
cmd.pc_fn.main = callback;
cmd.pc_context = context;
write_to_main_thread(&cmd);
} else {
/* For the non-threaded version, just call the function */
/* Otherwise, call it directly */
OPAL_OUTPUT((-1, "run in main -- calling now!"));
callback(context);
}
@ -463,21 +672,32 @@ int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t *callback
/*
* Finalize
* Called by main thread
*/
int ompi_btl_openib_fd_finalize(void)
{
if (initialized) {
if (OMPI_HAVE_THREAD_SUPPORT) {
if (OMPI_HAVE_THREADS) {
/* For the threaded version, send a command down the pipe */
cmd_t cmd;
OPAL_OUTPUT((-1, "shutting down openib fd"));
opal_event_del(&main_thread_event);
memset(&cmd, 0, cmd_size);
cmd.pc_cmd = CMD_TIME_TO_QUIT;
write_fd(pipe_fd[1], cmd_size, &cmd);
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
pthread_join(thread, NULL);
close(pipe_fd[0]);
close(pipe_fd[1]);
opal_atomic_rmb();
opal_event_del(&main_thread_event);
close(pipe_to_service_thread[0]);
close(pipe_to_service_thread[1]);
close(pipe_to_main_thread[0]);
close(pipe_to_main_thread[1]);
OBJ_DESTRUCT(&pending_to_main_thread);
}
OBJ_DESTRUCT(&registered_items);
}
initialized = false;
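
(Editorial aside: a self-contained sketch of the flow-control idea in
write_to_main_thread() above.  The service thread keeps at most a fixed
number of un-ACKed commands in the pipe to the main thread and queues the
rest locally, draining one queued command per ACK, so it can never block
on a full pipe.  The fixed-size ring and all names below are invented for
illustration; the real code queues on an opal_list_t and writes cmd_size
bytes.)

    #include <stddef.h>
    #include <unistd.h>

    #define MAX_OUTSTANDING 32
    #define QCAP 128

    typedef struct { int pc_cmd; void *pc_context; } cmd_t;

    static int pipe_to_main[2];   /* [1] is written by the service thread */
    static size_t waiting_for_ack = 0;
    static cmd_t queue[QCAP];     /* FIFO ring; service thread only */
    static size_t q_head = 0, q_len = 0;

    /* Service thread: send a command to the main thread, or queue it
       locally if too many un-ACKed commands are already in flight. */
    static void write_to_main(const cmd_t *cmd)
    {
        if (q_len > 0 || waiting_for_ack >= MAX_OUTSTANDING) {
            queue[(q_head + q_len++) % QCAP] = *cmd; /* assumes q_len < QCAP */
        } else {
            (void) write(pipe_to_main[1], cmd, sizeof(*cmd));
            ++waiting_for_ack;
        }
    }

    /* Service thread: the main thread ACKed one CALL_FUNCTION command;
       either drain one queued command (the in-flight count stays the
       same) or decrement the outstanding count. */
    static void on_ack_from_main(void)
    {
        if (q_len > 0) {
            (void) write(pipe_to_main[1], &queue[q_head], sizeof(cmd_t));
            q_head = (q_head + 1) % QCAP;
            --q_len;
        } else {
            --waiting_for_ack;
        }
    }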


@ -18,41 +18,53 @@ BEGIN_C_DECLS
/**
* Typedef for fd callback function
*/
typedef void *(ompi_btl_openib_fd_callback_fn_t)(int fd, int flags,
void *context);
typedef void *(ompi_btl_openib_fd_event_callback_fn_t)(int fd, int flags,
void *context);
/**
* Typedef for generic callback function
*/
typedef void *(ompi_btl_openib_schedule_callback_fn_t)(void *context);
typedef void *(ompi_btl_openib_fd_main_callback_fn_t)(void *context);
/**
* Initialize fd monitoring
* Initialize fd monitoring.
* Called by the main thread.
*/
int ompi_btl_openib_fd_init(void);
/**
* Start monitoring an fd
* Start monitoring an fd.
* Called by main or service thread; callback will be in service thread.
*/
int ompi_btl_openib_fd_monitor(int fd, int flags,
ompi_btl_openib_fd_callback_fn_t *callback,
ompi_btl_openib_fd_event_callback_fn_t *callback,
void *context);
/**
* Stop monitoring an fd
* Stop monitoring an fd.
* Called by main or service thread; callback will be in service thread.
*/
int ompi_btl_openib_fd_unmonitor(int fd,
ompi_btl_openib_fd_callback_fn_t *callback,
ompi_btl_openib_fd_event_callback_fn_t *callback,
void *context);
/**
* Run a function in the main thread
* Run a function in the service thread.
* Called by the main thread.
*/
int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t callback,
void *context);
int ompi_btl_openib_fd_run_in_service(ompi_btl_openib_fd_main_callback_fn_t callback,
void *context);
/**
* Finalize fd monitoring
* Run a function in the main thread.
* Called by the service thread.
*/
int ompi_btl_openib_fd_run_in_main(ompi_btl_openib_fd_main_callback_fn_t callback,
void *context);
/**
* Finalize fd monitoring.
* Called by the main thread.
*/
int ompi_btl_openib_fd_finalize(void);
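
(A hedged usage sketch of the renamed API above.  The callback signatures
follow the typedefs in this header; the fd, the function names, and the
call sites are invented.  OPAL_EV_READ comes from opal's event header.)

    #include "btl_openib_fd.h"

    /* Runs in the service thread whenever the fd becomes readable */
    static void *cm_event_ready(int fd, int flags, void *context)
    {
        (void) fd; (void) flags; (void) context;
        return NULL;
    }

    /* Runs back in the main thread, delivered via the pipe */
    static void *cleanup_in_main(void *context)
    {
        (void) context;
        return NULL;
    }

    static void example(int event_channel_fd)
    {
        ompi_btl_openib_fd_init();
        ompi_btl_openib_fd_monitor(event_channel_fd, OPAL_EV_READ,
                                   cm_event_ready, NULL);
        /* ... later, from the service thread: */
        ompi_btl_openib_fd_run_in_main(cleanup_in_main, NULL);
        /* ... at shutdown, from the main thread: */
        ompi_btl_openib_fd_unmonitor(event_channel_fd, NULL, NULL);
        ompi_btl_openib_fd_finalize();
    }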


@ -116,6 +116,7 @@ typedef struct mca_btl_openib_footer_t mca_btl_openib_footer_t;
#define MCA_BTL_OPENIB_CONTROL_CREDITS 0
#define MCA_BTL_OPENIB_CONTROL_RDMA 1
#define MCA_BTL_OPENIB_CONTROL_COALESCED 2
#define MCA_BTL_OPENIB_CONTROL_CTS 3
struct mca_btl_openib_control_header_t {
uint8_t type;
@ -267,7 +268,7 @@ OBJ_CLASS_DECLARATION(mca_btl_openib_coalesced_frag_t);
*/
static inline mca_btl_openib_send_control_frag_t *
alloc_credit_frag(mca_btl_openib_module_t *btl)
alloc_control_frag(mca_btl_openib_module_t *btl)
{
int rc;
ompi_free_list_item_t *item;


@ -16,6 +16,7 @@
#if OMPI_HAVE_RDMACM
#include <rdma/rdma_cma.h>
#include <malloc.h>
#include <stdio.h>
#include "opal/util/argv.h"
#include "opal/util/if.h"
@ -45,13 +46,36 @@ static OBJ_CLASS_INSTANCE(rdma_addr_list_t, opal_list_item_t,
NULL, NULL);
static opal_list_t *myaddrs = NULL;
#if OMPI_ENABLE_DEBUG
static char *stringify(uint32_t addr)
{
static char line[64];
memset(line, 0, sizeof(line));
snprintf(line, sizeof(line) - 1, "%d.%d.%d.%d (0x%x)",
#if defined(WORDS_BIGENDIAN)
(addr >> 24),
(addr >> 16) & 0xff,
(addr >> 8) & 0xff,
addr & 0xff,
#else
addr & 0xff,
(addr >> 8) & 0xff,
(addr >> 16) & 0xff,
(addr >> 24),
#endif
addr);
return line;
}
#endif
uint64_t mca_btl_openib_get_iwarp_subnet_id(struct ibv_device *ib_dev)
{
opal_list_item_t *item;
/* In the off chance that the user forces non-rdmacm cpc and iwarp, the list
* will be uninitialized. Return 0 to prevent crashes, and the lack of it
* actually working will be caught at a later stage.
/* In the off chance that the user forces non-rdmacm cpc and
* iwarp, the list will be uninitialized. Return 0 to prevent
* crashes, and the lack of it actually working will be caught at
* a later stage.
*/
if (NULL == myaddrs) {
return 0;
@ -69,16 +93,27 @@ uint64_t mca_btl_openib_get_iwarp_subnet_id(struct ibv_device *ib_dev)
return 0;
}
uint32_t mca_btl_openib_rdma_get_ipv4addr(struct ibv_context *verbs, uint8_t port)
uint32_t mca_btl_openib_rdma_get_ipv4addr(struct ibv_context *verbs,
uint8_t port)
{
opal_list_item_t *item;
/* Sanity check */
if (NULL == myaddrs) {
return 0;
}
BTL_VERBOSE(("Looking for %s:%d in IP address list",
ibv_get_device_name(verbs->device), port));
for (item = opal_list_get_first(myaddrs);
item != opal_list_get_end(myaddrs);
item = opal_list_get_next(item)) {
struct rdma_addr_list *addr = (struct rdma_addr_list *)item;
if (!strcmp(addr->dev_name, verbs->device->name) &&
port == addr->dev_port) {
BTL_VERBOSE(("FOUND: %s:%d is %s",
ibv_get_device_name(verbs->device), port,
stringify(addr->addr)));
return addr->addr;
}
}
@ -130,14 +165,14 @@ static int add_rdma_addr(struct sockaddr *ipaddr, uint32_t netmask)
ch = rdma_create_event_channel();
if (NULL == ch) {
BTL_ERROR(("failed creating event channel"));
BTL_VERBOSE(("failed creating RDMA CM event channel"));
rc = OMPI_ERROR;
goto out1;
}
rc = rdma_create_id(ch, &cm_id, NULL, RDMA_PS_TCP);
if (rc) {
BTL_ERROR(("rdma_create_id returned %d", rc));
BTL_VERBOSE(("rdma_create_id returned %d", rc));
rc = OMPI_ERROR;
goto out2;
}
@ -165,11 +200,12 @@ static int add_rdma_addr(struct sockaddr *ipaddr, uint32_t netmask)
myaddr->addr = sinp->sin_addr.s_addr;
myaddr->subnet = myaddr->addr & netmask;
inet_ntop(sinp->sin_family, &sinp->sin_addr,
myaddr->addr_str, sizeof myaddr->addr_str);
myaddr->addr_str, sizeof(myaddr->addr_str));
memcpy(myaddr->dev_name, cm_id->verbs->device->name, IBV_SYSFS_NAME_MAX);
myaddr->dev_port = cm_id->port_num;
BTL_VERBOSE(("adding addr %s dev %s port %d to rdma_addr_list",
myaddr->addr_str, myaddr->dev_name, myaddr->dev_port));
BTL_VERBOSE(("Adding addr %s (0x%x) as %s:%d",
myaddr->addr_str, myaddr->addr,
myaddr->dev_name, myaddr->dev_port));
opal_list_append(myaddrs, &(myaddr->super));
@ -225,7 +261,10 @@ void mca_btl_openib_free_rdma_addr_list(void)
OBJ_RELEASE(myaddrs);
}
}
#else
#else
/* !OMPI_HAVE_RDMACM case */
uint64_t mca_btl_openib_get_iwarp_subnet_id(struct ibv_device *ib_dev)
{
return 0;
@ -239,9 +278,10 @@ uint32_t mca_btl_openib_rdma_get_ipv4addr(struct ibv_context *verbs,
int mca_btl_openib_build_rdma_addr_list(void)
{
return 0;
return OMPI_SUCCESS;
}
void mca_btl_openib_free_rdma_addr_list(void) {
void mca_btl_openib_free_rdma_addr_list(void)
{
}
#endif
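
(Worked example for the stringify() helper above, assuming a little-endian
host: for addr = 0x0100007f, the low byte is 0x7f = 127 and the high byte
is 0x01, so the function returns "127.0.0.1 (0x100007f)".)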


@ -33,6 +33,12 @@
#include "btl_openib_mca.h"
#include "connect/base.h"
#ifdef HAVE_IBV_FORK_INIT
#define OMPI_HAVE_IBV_FORK_INIT 1
#else
#define OMPI_HAVE_IBV_FORK_INIT 0
#endif
/*
* Local flags
*/
@ -155,50 +161,50 @@ int btl_openib_register_mca_params(void)
1, &ival, 0));
mca_btl_openib_component.warn_nonexistent_if = (0 != ival);
#ifdef HAVE_IBV_FORK_INIT
ival2 = -1;
#else
ival2 = 0;
#endif
if (OMPI_HAVE_IBV_FORK_INIT) {
ival2 = -1;
} else {
ival2 = 0;
}
CHECK(reg_int("want_fork_support", NULL,
"Whether fork support is desired or not "
"(negative = try to enable fork support, but continue even if it is not available, 0 = do not enable fork support, positive = try to enable fork support and fail if it is not available)",
ival2, &ival, 0));
#ifdef HAVE_IBV_FORK_INIT
mca_btl_openib_component.want_fork_support = ival;
#else
if (0 != ival) {
orte_show_help("help-mpi-btl-openib.txt",
"ibv_fork requested but not supported", true,
orte_process_info.nodename);
return OMPI_ERROR;
if (OMPI_HAVE_IBV_FORK_INIT) {
mca_btl_openib_component.want_fork_support = ival;
} else {
if (0 != ival) {
orte_show_help("help-mpi-btl-openib.txt",
"ibv_fork requested but not supported", true,
orte_process_info.nodename);
return OMPI_ERROR;
}
}
#endif
asprintf(&str, "%s/mca-btl-openib-device-params.ini",
opal_install_dirs.pkgdatadir);
if (NULL == str) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
#ifdef HAVE_IBV_FORK_INIT
ival2 = -1;
#else
ival2 = 0;
#endif
if (OMPI_HAVE_IBV_FORK_INIT) {
ival2 = -1;
} else {
ival2 = 0;
}
CHECK(reg_int("want_fork_support", NULL,
"Whether fork support is desired or not "
"(negative = try to enable fork support, but continue even if it is not available, 0 = do not enable fork support, positive = try to enable fork support and fail if it is not available)",
ival2, &ival, 0));
#ifdef HAVE_IBV_FORK_INIT
mca_btl_openib_component.want_fork_support = ival;
#else
if (0 != ival) {
orte_show_help("help-mpi-btl-openib.txt",
"ibv_fork requested but not supported", true,
orte_process_info.nodename);
return OMPI_ERROR;
if (OMPI_HAVE_IBV_FORK_INIT) {
mca_btl_openib_component.want_fork_support = ival;
} else {
if (0 != ival) {
orte_show_help("help-mpi-btl-openib.txt",
"ibv_fork requested but not supported", true,
orte_process_info.nodename);
return OMPI_ERROR;
}
}
#endif
CHECK(reg_string("device_param_files", "hca_param_files",
"Colon-delimited list of INI-style files that contain device vendor/part-specific parameters",
@@ -206,6 +212,25 @@ int btl_openib_register_mca_params(void)
0));
free(str);
CHECK(reg_string("device_type", NULL,
"Specify to only use IB or iWARP network adapters (infiniband = only use InfiniBand HCAs; iwarp = only use iWARP NICs; all = use any available adapters)",
"all", &str, 0));
if (0 == strcasecmp(str, "ib") ||
0 == strcasecmp(str, "infiniband")) {
mca_btl_openib_component.device_type = BTL_OPENIB_DT_IB;
} else if (0 == strcasecmp(str, "iw") ||
0 == strcasecmp(str, "iwarp")) {
mca_btl_openib_component.device_type = BTL_OPENIB_DT_IWARP;
} else if (0 == strcasecmp(str, "all")) {
mca_btl_openib_component.device_type = BTL_OPENIB_DT_ALL;
} else {
orte_show_help("help-mpi-btl-openib.txt",
"ibv_fork requested but not supported", true,
orte_process_info.nodename);
return OMPI_ERROR;
}
free(str);
CHECK(reg_int("max_btls", NULL,
"Maximum number of device ports to use "
"(-1 = use all available, otherwise must be >= 1)",
@@ -487,11 +512,7 @@ int btl_openib_register_mca_params(void)
"have_fork_support",
"Whether the OpenFabrics stack supports applications that invoke the \"fork()\" system call or not (0 = no, 1 = yes). Note that this value does NOT indicate whether the system being run on supports \"fork()\" with OpenFabrics applications or not.",
false, true,
#ifdef HAVE_IBV_FORK_INIT
1,
#else
0,
#endif
OMPI_HAVE_IBV_FORK_INIT ? 1 : 0,
NULL);
mca_btl_openib_module.super.btl_exclusivity = MCA_BTL_EXCLUSIVITY_DEFAULT;
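
The OMPI_HAVE_IBV_FORK_INIT changes above replace preprocessor conditionals with a 0/1 feature macro and plain if statements, so both branches are always compiled and syntax-checked while the optimizer discards the dead one. A standalone sketch of the pattern (HAVE_SOME_FEATURE and MY_HAVE_SOME_FEATURE are stand-ins for HAVE_IBV_FORK_INIT and OMPI_HAVE_IBV_FORK_INIT):

#include <stdio.h>

/* Define the feature macro to 0 or 1; the runtime "if" below then
   replaces an #ifdef/#else block, and the compiler still removes the
   dead branch. */
#ifdef HAVE_SOME_FEATURE
#define MY_HAVE_SOME_FEATURE 1
#else
#define MY_HAVE_SOME_FEATURE 0
#endif

int main(void)
{
    int default_val;

    if (MY_HAVE_SOME_FEATURE) {
        default_val = -1;   /* try to enable; tolerate failure */
    } else {
        default_val = 0;    /* feature compiled out: disabled */
    }
    printf("default: %d\n", default_val);
    return 0;
}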

View file

@@ -32,7 +32,7 @@ AC_DEFUN([MCA_btl_openib_POST_CONFIG], [
# [action-if-cant-compile])
# ------------------------------------------------
AC_DEFUN([MCA_btl_openib_CONFIG],[
OMPI_VAR_SCOPE_PUSH([cpcs])
OMPI_VAR_SCOPE_PUSH([cpcs have_threads])
cpcs="oob"
OMPI_CHECK_OPENIB([btl_openib],
@@ -54,15 +54,23 @@ AC_DEFUN([MCA_btl_openib_CONFIG],[
$1],
[$2])
AC_MSG_CHECKING([for thread support (needed for ibcm/rdmacm)])
have_threads=`echo $THREAD_TYPE | awk '{ print [$]1 }'`
if test "x$have_threads" = "x"; then
have_threads=none
fi
AC_MSG_RESULT([$have_threads])
AS_IF([test "$btl_openib_happy" = "yes"],
[if test "x$btl_openib_have_xrc" = "x1"; then
cpcs="$cpcs xoob"
fi
if test "x$btl_openib_have_rdmacm" = "x1"; then
if test "x$btl_openib_have_rdmacm" = "x1" -a \
"$have_threads" != "none"; then
cpcs="$cpcs rdmacm"
fi
if test "x$btl_openib_have_ibcm" = "x1"; then
if test "x$btl_openib_have_ibcm" = "x1" -a \
"$have_threads" != "none"; then
cpcs="$cpcs ibcm"
fi
AC_MSG_CHECKING([which openib btl cpcs will be built])

View file

@@ -15,6 +15,11 @@
BEGIN_C_DECLS
/*
* Forward declaration to resolve circular dependency
*/
struct mca_btl_base_endpoint_t;
/*
* Open function
*/
@@ -57,6 +62,26 @@ int ompi_btl_openib_connect_base_get_cpc_index
ompi_btl_openib_connect_base_component_t *
ompi_btl_openib_connect_base_get_cpc_byindex(uint8_t index);
/*
* Allocate a CTS frag
*/
int ompi_btl_openib_connect_base_alloc_cts(
struct mca_btl_base_endpoint_t *endpoint);
/*
* Free a CTS frag
*/
int ompi_btl_openib_connect_base_free_cts(
struct mca_btl_base_endpoint_t *endpoint);
/*
* Start a new connection to an endpoint
*/
int ompi_btl_openib_connect_base_start(
ompi_btl_openib_connect_base_module_t *cpc,
struct mca_btl_base_endpoint_t *endpoint);
/*
* Component-wide CPC finalize
*/

View file

@@ -18,10 +18,10 @@
#if HAVE_XRC
#include "connect/btl_openib_connect_xoob.h"
#endif
#if OMPI_HAVE_RDMACM
#if OMPI_HAVE_RDMACM && OMPI_HAVE_THREADS
#include "connect/btl_openib_connect_rdmacm.h"
#endif
#if OMPI_HAVE_IBCM
#if OMPI_HAVE_IBCM && OMPI_HAVE_THREADS
#include "connect/btl_openib_connect_ibcm.h"
#endif
@@ -44,7 +44,7 @@ static ompi_btl_openib_connect_base_component_t *all[] = {
/* Always have an entry here so that the CP indexes will always be
the same: if RDMA CM is not available, use the "empty" CPC */
#if OMPI_HAVE_RDMACM
#if OMPI_HAVE_RDMACM && OMPI_HAVE_THREADS
&ompi_btl_openib_connect_rdmacm,
#else
&ompi_btl_openib_connect_empty,
@@ -52,7 +52,7 @@ static ompi_btl_openib_connect_base_component_t *all[] = {
/* Always have an entry here so that the CP indexes will always be
the same: if IB CM is not available, use the "empty" CPC */
#if OMPI_HAVE_IBCM
#if OMPI_HAVE_IBCM && OMPI_HAVE_THREADS
&ompi_btl_openib_connect_ibcm,
#else
&ompi_btl_openib_connect_empty,
@@ -273,6 +273,14 @@ int ompi_btl_openib_connect_base_select_for_local_port(mca_btl_openib_module_t *
opal_output(-1, "match cpc for local port: %s",
available[i]->cbc_name);
/* If the CPC wants to use the CTS protocol, check to ensure
that QP 0 is PP; if it's not, we can't use this CPC (or the
CTS protocol) */
if (!BTL_OPENIB_QP_TYPE_PP(0)) {
BTL_VERBOSE(("this CPConly supports when the first btl_openib_receive_queues QP is a PP QP"));
continue;
}
/* This CPC has indicated that it wants to run on this openib
BTL module. Woo hoo! */
++cpc_index;
@@ -391,6 +399,92 @@ ompi_btl_openib_connect_base_get_cpc_byindex(uint8_t index)
NULL : all[index];
}
int ompi_btl_openib_connect_base_alloc_cts(mca_btl_base_endpoint_t *endpoint)
{
ompi_free_list_item_t *fli;
int length = sizeof(mca_btl_openib_header_t) +
sizeof(mca_btl_openib_header_coalesced_t) +
sizeof(mca_btl_openib_control_header_t) +
sizeof(mca_btl_openib_footer_t) +
mca_btl_openib_component.qp_infos[mca_btl_openib_component.credits_qp].size;
/* Explicitly don't use the mpool registration */
fli = &(endpoint->endpoint_cts_frag.super.super.base.super);
fli->registration = NULL;
fli->ptr = malloc(length);
if (NULL == fli->ptr) {
BTL_ERROR(("malloc failed"));
return OMPI_ERR_OUT_OF_RESOURCE;
}
endpoint->endpoint_cts_mr =
ibv_reg_mr(endpoint->endpoint_btl->device->ib_pd,
fli->ptr, length,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
IBV_ACCESS_REMOTE_READ);
OPAL_OUTPUT((-1, "registered memory %p, length %d", fli->ptr, length));
if (NULL == endpoint->endpoint_cts_mr) {
free(fli->ptr);
BTL_ERROR(("Failed to reg mr!"));
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* Copy the lkey where it needs to go */
endpoint->endpoint_cts_frag.super.sg_entry.lkey =
endpoint->endpoint_cts_frag.super.super.segment.seg_key.key32[0] =
endpoint->endpoint_cts_mr->lkey;
endpoint->endpoint_cts_frag.super.sg_entry.length = length;
/* Construct the rest of the recv_frag_t */
OBJ_CONSTRUCT(&(endpoint->endpoint_cts_frag), mca_btl_openib_recv_frag_t);
endpoint->endpoint_cts_frag.super.super.base.order =
mca_btl_openib_component.credits_qp;
endpoint->endpoint_cts_frag.super.endpoint = endpoint;
OPAL_OUTPUT((-1, "Got a CTS frag for peer %s, addr %p, length %d, lkey %d",
endpoint->endpoint_proc->proc_ompi->proc_hostname,
(void*) endpoint->endpoint_cts_frag.super.sg_entry.addr,
endpoint->endpoint_cts_frag.super.sg_entry.length,
endpoint->endpoint_cts_frag.super.sg_entry.lkey));
return OMPI_SUCCESS;
}
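
The registration step in ompi_btl_openib_connect_base_alloc_cts() above follows the standard libibverbs recipe: malloc a buffer, ibv_reg_mr() it against the device's protection domain, and feed the buffer address and mr->lkey into the fragment's scatter/gather entry. A trimmed sketch of just that step (hypothetical helper; error reporting omitted):

#include <infiniband/verbs.h>
#include <stdlib.h>

/* Allocate and register a buffer the way alloc_cts() does, returning
   the MR whose lkey feeds the fragment's sg_entry. */
static struct ibv_mr *register_buffer(struct ibv_pd *pd, size_t length,
                                      void **buf_out)
{
    void *buf = malloc(length);
    struct ibv_mr *mr;

    if (NULL == buf) {
        return NULL;
    }
    mr = ibv_reg_mr(pd, buf, length,
                    IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
                    IBV_ACCESS_REMOTE_READ);
    if (NULL == mr) {
        free(buf);
        return NULL;
    }
    *buf_out = buf;   /* becomes sg_entry.addr; mr->lkey becomes sg_entry.lkey */
    return mr;
}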
int ompi_btl_openib_connect_base_free_cts(mca_btl_base_endpoint_t *endpoint)
{
if (NULL != endpoint->endpoint_cts_mr) {
ibv_dereg_mr(endpoint->endpoint_cts_mr);
endpoint->endpoint_cts_mr = NULL;
}
if (NULL != endpoint->endpoint_cts_frag.super.super.base.super.ptr) {
free(endpoint->endpoint_cts_frag.super.super.base.super.ptr);
endpoint->endpoint_cts_frag.super.super.base.super.ptr = NULL;
OPAL_OUTPUT((-1, "Freeing CTS frag"));
}
return OMPI_SUCCESS;
}
/*
* Called to start a connection
*/
int ompi_btl_openib_connect_base_start(
ompi_btl_openib_connect_base_module_t *cpc,
mca_btl_base_endpoint_t *endpoint)
{
/* If the CPC uses the CTS protocol, provide a frag buffer for the
CPC to post. Must allocate these frags up here in the main
thread because the FREE_LIST_WAIT is not thread safe. */
if (cpc->cbm_uses_cts) {
int rc;
rc = ompi_btl_openib_connect_base_alloc_cts(endpoint);
if (OMPI_SUCCESS != rc) {
return rc;
}
}
return cpc->cbm_start_connect(cpc, endpoint);
}
/*
* Called during openib btl component close
*/

View file

@@ -248,7 +248,7 @@
* normal RTU processing. If the RTU is received later, the IBCM
* system on the passive side will know that it's effectively a
* duplicate, and therefore can be ignored.
*/
*/
#include "ompi_config.h"
@@ -649,6 +649,7 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
rc = OMPI_ERR_NOT_SUPPORTED;
goto error;
}
/* If we do not have struct ibv_device.transport_device, then
we're in an old version of OFED that is IB only (i.e., no
iWarp), so we can safely assume that we can use this CPC. */
@@ -699,12 +700,13 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
BTL_VERBOSE(("created cpc module %p for btl %p",
(void*)m, (void*)btl));
/* See if we've already for an IB CM listener for this device */
/* See if we've already got an IB CM listener for this device */
for (item = opal_list_get_first(&ibcm_cm_listeners);
item != opal_list_get_end(&ibcm_cm_listeners);
item = opal_list_get_next(item)) {
cmh = (ibcm_listen_cm_id_t*) item;
if (cmh->ib_context == btl->device->ib_dev_context) {
OBJ_RETAIN(cmh);
break;
}
}
@@ -753,6 +755,9 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
rc = OMPI_ERR_NOT_SUPPORTED;
goto error;
}
OPAL_OUTPUT((-1, "opened ibcm device 0x%" PRIx64 " (%s)",
(uint64_t) cmh->cm_device,
ibv_get_device_name(cmh->ib_context->device)));
if (0 != (rc = ib_cm_create_id(cmh->cm_device,
&cmh->listen_cm_id, NULL))) {
@@ -779,9 +784,6 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
}
opal_list_append(&ibcm_cm_listeners, &(cmh->super));
} else {
/* We found an existing IB CM handle -- bump up the refcount */
OBJ_RETAIN(cmh);
}
m->cmh = cmh;
imli = OBJ_NEW(ibcm_module_list_item_t);
@@ -831,6 +833,9 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
m->cpc.cbm_start_connect = ibcm_module_start_connect;
m->cpc.cbm_endpoint_finalize = ibcm_endpoint_finalize;
m->cpc.cbm_finalize = ibcm_module_finalize;
/* Setting uses_cts=true also guarantees that we'll only be
selected if QP 0 is PP */
m->cpc.cbm_uses_cts = true;
/* Start monitoring the fd associated with the cm_device */
ompi_btl_openib_fd_monitor(cmh->cm_device->fd, OPAL_EV_READ,
@@ -901,7 +906,8 @@ static int qp_create_one(mca_btl_base_endpoint_t* endpoint, int qp,
init_attr.cap.max_send_sge = 1;
init_attr.cap.max_recv_sge = 1; /* we do not use SG list */
if(BTL_OPENIB_QP_TYPE_PP(qp)) {
init_attr.cap.max_recv_wr = max_recv_wr;
/* Add one for the CTS receive frag that will be posted */
init_attr.cap.max_recv_wr = max_recv_wr + 1;
} else {
init_attr.cap.max_recv_wr = 0;
}
@@ -1265,7 +1271,7 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
/* Set the endpoint state to "connecting" (this function runs in
the main MPI thread; not the service thread, so we can set the
endpoint_state here). */
endpoint_state here with no memory barriers). */
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
/* Fill in the path record for this peer */
@@ -1363,9 +1369,6 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
}
if (0 != (rc = ib_cm_send_req(req->super.cm_id, cm_req))) {
BTL_VERBOSE(("Got nonzero return from ib_cm_send_req: %d, errno %d", rc, errno));
if (-1 == rc) {
perror("Errno is ");
}
rc = OMPI_ERR_UNREACH;
goto err;
}
@@ -1438,11 +1441,9 @@
*/
static void *callback_unlock(int fd, int flags, void *context)
{
/* We need #if protection in order to prevent unused variable warning */
#if OMPI_HAVE_THREAD_SUPPORT
opal_mutex_t *m = (opal_mutex_t*) context;
OPAL_THREAD_UNLOCK(m);
#endif
volatile int *barrier = (volatile int *) context;
OPAL_OUTPUT((-1, "ibcm unlocking main thread"));
*barrier = 1;
return NULL;
}
@@ -1455,7 +1456,7 @@ static void ibcm_listen_cm_id_constructor(ibcm_listen_cm_id_t *cmh)
static void ibcm_listen_cm_id_destructor(ibcm_listen_cm_id_t *cmh)
{
opal_mutex_t mutex;
volatile int barrier = 0;
opal_list_item_t *item;
/* Remove all the ibcm module items */
@@ -1482,19 +1483,41 @@ static void ibcm_listen_cm_id_destructor(ibcm_listen_cm_id_t *cmh)
/* Stop monitoring the cm_device's fd (wait for it to be
released from the monitoring entity) */
OPAL_THREAD_LOCK(&mutex);
ompi_btl_openib_fd_unmonitor(cmh->cm_device->fd,
callback_unlock,
&mutex);
OPAL_THREAD_LOCK(&mutex);
(void*) &barrier);
/* JMS debug code while figuring out the IBCM problem */
#if 0
while (0 == barrier) {
sched_yield();
}
#else
{
time_t t = time(NULL);
OPAL_OUTPUT((-1, "main thread waiting for ibcm barrier"));
while (0 == barrier) {
sched_yield();
if (time(NULL) - t > 5) {
OPAL_OUTPUT((-1, "main thread been looping for a long time..."));
break;
}
}
}
#endif
/* Destroy the listener */
if (NULL != cmh->listen_cm_id) {
OPAL_OUTPUT((-1, "destryoing ibcm listener 0x%" PRIx64,
(uint64_t) cmh->listen_cm_id));
ib_cm_destroy_id(cmh->listen_cm_id);
}
/* Close the CM device */
if (NULL != cmh->cm_device) {
OPAL_OUTPUT((-1, "closing ibcm device 0x%" PRIx64 " (%s)",
(uint64_t) cmh->cm_device,
ibv_get_device_name(cmh->ib_context->device)));
ib_cm_close_device(cmh->cm_device);
}
}
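
The destructor above replaces the old double-lock trick with a volatile flag: the service thread flips the flag from the unmonitor callback, and the main thread spins on it with sched_yield() plus a coarse five-second escape hatch. A generic sketch of the same handshake, using a plain pthread in place of the openib service thread:

#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <time.h>

static volatile int barrier_flag = 0;

/* Runs on the stand-in "service thread": finish the cleanup work,
   then flip the flag so the waiter can proceed. */
static void *service_thread(void *arg)
{
    (void) arg;
    /* ... release the monitored fd / other resources here ... */
    barrier_flag = 1;
    return NULL;
}

int main(void)
{
    pthread_t tid;
    time_t start = time(NULL);

    pthread_create(&tid, NULL, service_thread, NULL);

    /* Main thread: spin politely until the flag flips, with the same
       coarse 5-second escape hatch the destructor uses. */
    while (0 == barrier_flag) {
        sched_yield();
        if (time(NULL) - start > 5) {
            fprintf(stderr, "still waiting; bailing out\n");
            break;
        }
    }
    pthread_join(&tid, NULL);
    return 0;
}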
@@ -1564,7 +1587,7 @@ static int ibcm_module_finalize(mca_btl_openib_module_t *btl,
{
ibcm_module_t *m = (ibcm_module_t *) cpc;
/* If we previously successfully initialized, then destroy
/* If we previously successfully initialized, then release
everything */
if (NULL != m && NULL != m->cmh) {
OBJ_RELEASE(m->cmh);
@@ -1702,21 +1725,6 @@ static int qp_to_rts(int qp_index, struct ib_cm_id *cm_id,
return OMPI_SUCCESS;
}
/*
* Callback (from main thread) when an incoming IBCM request needs to
* initiate a new connection in the other direction.
*/
static void *callback_set_endpoint_connecting(void *context)
{
mca_btl_openib_endpoint_t *endpoint =
(mca_btl_openib_endpoint_t *) context;
BTL_VERBOSE(("ibcm scheduled callback: setting endpoint to CONNECTING"));
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
return NULL;
}
/*
* Callback (from main thread) when an incoming IBCM request needs to
* initiate a new connection in the other direction.
@@ -1895,9 +1903,11 @@ static int request_received(ibcm_listen_cm_id_t *cmh,
If this is not the first request, then assume that all the QPs
have been created already and we can just lookup what we need. */
if (!ie->ie_qps_created) {
/* Schedule to set the endpoint_state to "CONNECTING" */
ompi_btl_openib_fd_schedule(callback_set_endpoint_connecting,
endpoint);
/* Set the endpoint_state to "CONNECTING". This is running
in the service thread, so we need to do a write barrier. */
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
opal_atomic_wmb();
if (OMPI_SUCCESS != (rc = qp_create_all(endpoint, imodule))) {
rej_reason = REJ_PASSIVE_SIDE_ERROR;
BTL_ERROR(("qp_create_all failed -- reject"));
@@ -1923,16 +1933,19 @@ static int request_received(ibcm_listen_cm_id_t *cmh,
goto reject;
}
/* Post receive buffers. Similar to QP creation, above, we post
*all* receive buffers at once (for all QPs). So ensure to only
do this for the first request received. If this is not the
first request on this endpoint, then assume that all the
receive buffers have been posted already. */
/* Post a single receive buffer on the smallest QP for the CTS
protocol */
if (!ie->ie_recv_buffers_posted) {
if (OMPI_SUCCESS !=
(rc = mca_btl_openib_endpoint_post_recvs(endpoint))) {
/* JMS */
BTL_VERBOSE(("failed to post recv buffers"));
struct ibv_recv_wr *bad_wr, *wr;
assert(NULL != endpoint->endpoint_cts_frag.super.super.base.super.ptr);
wr = &(endpoint->endpoint_cts_frag.rd_desc);
assert(NULL != wr);
wr->next = NULL;
OPAL_OUTPUT((-1, "REQUEST posting CTS recv buffer"));
if (0 != ibv_post_recv(endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp, wr, &bad_wr)) {
BTL_VERBOSE(("failed to post CTS recv buffer"));
rej_reason = REJ_PASSIVE_SIDE_ERROR;
goto reject;
}
@@ -2045,32 +2058,31 @@ static int request_received(ibcm_listen_cm_id_t *cmh,
(ompi_btl_openib_connect_base_module_t *) imodule;
cbdata->cscd_endpoint = endpoint;
BTL_VERBOSE(("starting connect in other direction"));
ompi_btl_openib_fd_schedule(callback_start_connect, cbdata);
ompi_btl_openib_fd_run_in_main(callback_start_connect, cbdata);
TIMER_STOP(REQUEST_RECEIVED);
return OMPI_SUCCESS;
}
BTL_ERROR(("malloc failed"));
}
/* Communicate to the upper layer that the connection on this
endpoint has failed */
TIMER_STOP(REQUEST_RECEIVED);
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
endpoint);
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
return rc;
}
/*
* Callback (from main thread) when the endpoint has been connected
*/
static void *callback_set_endpoint_connected(void *context)
static void *callback_set_endpoint_cpc_complete(void *context)
{
mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t*) context;
BTL_VERBOSE(("calling endpoint_connected"));
mca_btl_openib_endpoint_connected(endpoint);
BTL_VERBOSE(("*** CONNECTED endpoint_connected done!"));
BTL_VERBOSE(("calling endpoint_cpc_complete"));
mca_btl_openib_endpoint_cpc_complete(endpoint);
BTL_VERBOSE(("*** CONNECTED endpoint_cpc_complete done!"));
return NULL;
}
@@ -2133,18 +2145,27 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
goto error;
}
/* Now that all the qp's are created locally, post some receive
buffers, setup credits, etc. The post_recvs() call posts the
buffers for all QPs at once, so be sure to only do this for the
*first* reply that is received on an endpoint. For all other
replies received on an endpoint, we can safely assume that the
receive buffers have already been posted. */
/* Now that all the qp's are created locally, post a single
receive buffer on the smallest QP for the CTS protocol. Be
sure to only do this for the *first* reply that is received on
an endpoint. For all other replies received on an endpoint, we
can safely assume that the CTS receive buffer has already been
posted. */
if (!ie->ie_recv_buffers_posted) {
if (OMPI_SUCCESS !=
(rc = mca_btl_openib_endpoint_post_recvs(endpoint))) {
BTL_VERBOSE(("failed to post recv buffers"));
struct ibv_recv_wr *bad_wr, *wr;
OPAL_OUTPUT((-1, "REPLY posting CTS recv buffer"));
assert(NULL != endpoint->endpoint_cts_frag.super.super.base.super.ptr);
wr = &(endpoint->endpoint_cts_frag.rd_desc);
assert(NULL != wr);
wr->next = NULL;
if (0 != ibv_post_recv(endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp, wr, &bad_wr)) {
/* JMS */
BTL_VERBOSE(("failed to post CTS recv buffer"));
goto error;
}
ie->ie_recv_buffers_posted = true;
}
@@ -2168,7 +2189,7 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
that we're done. */
if (0 == --(ie->ie_qps_to_connect)) {
BTL_VERBOSE(("REPLY telling main BTL we're connected"));
ompi_btl_openib_fd_schedule(callback_set_endpoint_connected, endpoint);
ompi_btl_openib_fd_run_in_main(callback_set_endpoint_cpc_complete, endpoint);
}
TIMER_STOP(REPLY_RECEIVED);
@@ -2177,8 +2198,8 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
error:
/* Communicate to the upper layer that the connection on this
endpoint has failed */
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
endpoint);
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
return rc;
}
@@ -2207,7 +2228,7 @@ static int ready_to_use_received(ibcm_listen_cm_id_t *h,
that we're done. */
if (0 == --(ie->ie_qps_to_connect)) {
BTL_VERBOSE(("RTU telling main BTL we're connected"));
ompi_btl_openib_fd_schedule(callback_set_endpoint_connected, endpoint);
ompi_btl_openib_fd_run_in_main(callback_set_endpoint_cpc_complete, endpoint);
}
BTL_VERBOSE(("all done"));
@@ -2216,25 +2237,6 @@
}
static int disconnect_request_received(ibcm_listen_cm_id_t *cmh,
struct ib_cm_event *event)
{
BTL_VERBOSE(("disconnect request received"));
return OMPI_SUCCESS;
}
static int disconnect_reply_received(ibcm_listen_cm_id_t *cmd,
struct ib_cm_event *event)
{
BTL_VERBOSE(("disconnect reply received"));
#if 0
ib_cm_send_drep(event->cm_id, NULL, 0);
#endif
return OMPI_SUCCESS;
}
static int reject_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
{
enum ib_cm_rej_reason reason = event->param.rej_rcvd.reason;
@@ -2298,7 +2300,7 @@ static int reject_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
reason));
/* Communicate to the upper layer that the connection on this
endpoint has failed */
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error, NULL);
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, NULL);
return OMPI_ERR_NOT_FOUND;
}
@@ -2330,8 +2332,8 @@ static int request_error(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
/* Communicate to the upper layer that the connection on this
endpoint has failed */
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
endpoint);
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
return OMPI_SUCCESS;
}
@@ -2362,28 +2364,12 @@ static int reply_error(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
/* Communicate to the upper layer that the connection on this
endpoint has failed */
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
endpoint);
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
return OMPI_SUCCESS;
}
static int disconnect_request_error(ibcm_listen_cm_id_t *cmh,
struct ib_cm_event *e)
{
BTL_VERBOSE(("disconnect request error!"));
return OMPI_SUCCESS;
}
static int unhandled_event(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *e)
{
BTL_VERBOSE(("unhandled event error (%p, %d)",
(void*) e, e->event));
return OMPI_ERR_NOT_FOUND;
}
static void *ibcm_event_dispatch(int fd, int flags, void *context)
{
bool want_ack;
@@ -2391,38 +2377,46 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
ibcm_listen_cm_id_t *cmh = (ibcm_listen_cm_id_t*) context;
struct ib_cm_event *e = NULL;
OPAL_OUTPUT((-1, "ibcm dispatch: on device 0x%" PRIx64", fd %d",
(uint64_t) cmh->cm_device, fd));
TIMER_START(CM_GET_EVENT);
/* Blocks until next event, which should be immediately (because
we shouldn't call this dispatch function unless there's
something ready to read) */
rc = ib_cm_get_event(cmh->cm_device, &e);
TIMER_STOP(CM_GET_EVENT);
if (-ENODATA == rc) {
OPAL_OUTPUT((-1, "ibcm dispatch: GOT NOT DATA!"));
return NULL;
}
while (-1 == rc && EAGAIN == errno) {
OPAL_OUTPUT((-1, "ibcm dispatch: GOT EAGAIN!"));
/* Try again */
rc = ib_cm_get_event(cmh->cm_device, &e);
}
if (0 == rc && NULL != e) {
want_ack = true;
switch (e->event) {
case IB_CM_REQ_RECEIVED:
OPAL_OUTPUT((-1, "ibcm dispatch: request received on fd %d", fd));
/* Incoming request */
rc = request_received(cmh, e);
break;
case IB_CM_REP_RECEIVED:
OPAL_OUTPUT((-1, "ibcm dispatch: reply received on fd %d", fd));
/* Reply received */
rc = reply_received(cmh, e);
break;
case IB_CM_RTU_RECEIVED:
OPAL_OUTPUT((-1, "ibcm dispatch: RTU received on fd %d", fd));
/* Ready to use! */
rc = ready_to_use_received(cmh, e);
break;
case IB_CM_DREQ_RECEIVED:
/* Disconnect request */
rc = disconnect_request_received(cmh, e);
break;
case IB_CM_DREP_RECEIVED:
/* Disconnect reply */
rc = disconnect_reply_received(cmh, e);
break;
case IB_CM_REJ_RECEIVED:
OPAL_OUTPUT((-1, "ibcm dispatch: reject received on fd %d", fd));
/* Rejected connection */
rc = reject_received(cmh, e);
/* reject_received() called ib_cm_ack_event so that the CM
@@ -2431,22 +2425,32 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
break;
case IB_CM_REQ_ERROR:
OPAL_OUTPUT((-1, "ibcm dispatch: request error received on fd %d", fd));
/* Request error */
rc = request_error(cmh, e);
break;
case IB_CM_REP_ERROR:
OPAL_OUTPUT((-1, "ibcm dispatch: reply error received on fd %d", fd));
/* Reply error */
rc = reply_error(cmh, e);
break;
case IB_CM_DREQ_RECEIVED:
case IB_CM_DREP_RECEIVED:
case IB_CM_DREQ_ERROR:
/* Disconnect request error */
rc = disconnect_request_error(cmh, e);
OPAL_OUTPUT((-1, "ibcm dispatch: %s received on fd %d",
(IB_CM_DREQ_RECEIVED == e->event) ? "disconnect request" :
(IB_CM_DREP_RECEIVED == e->event) ? "disconnect reply" :
"disconnect request error", fd));
/* We don't care */
rc = OMPI_SUCCESS;
break;
default:
rc = unhandled_event(cmh, e);
/* This would be odd */
OPAL_OUTPUT((-1, "ibcm dispatch: unhandled event received on fd %d", fd));
rc = OMPI_ERR_NOT_FOUND;
break;
}
@@ -2462,6 +2466,8 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
function), the dispatch function would have done that
already */
}
} else {
OPAL_OUTPUT((-1, "Got weird value back from ib_cm_get_event: %d", rc));
}
return NULL;
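
The reworked dispatch function above treats -ENODATA as "nothing queued" and retries ib_cm_get_event() while it fails with EAGAIN. A self-contained sketch of that error-handling shape (get_event() is a stub standing in for ib_cm_get_event(), so the example runs on its own):

#include <errno.h>
#include <stddef.h>
#include <stdio.h>

/* Stub for ib_cm_get_event(); here it always reports "no data". */
static int get_event(void *device, void **event)
{
    (void) device;
    *event = NULL;
    return -ENODATA;
}

static void dispatch_once(void *device)
{
    void *e = NULL;
    int rc = get_event(device, &e);

    if (-ENODATA == rc) {
        printf("nothing queued (spurious wakeup)\n");
        return;
    }
    while (-1 == rc && EAGAIN == errno) {
        rc = get_event(device, &e);   /* transient failure: retry */
    }
    if (0 == rc && NULL != e) {
        /* ... switch on the event type, dispatch, then ack ... */
    } else {
        printf("unexpected return %d\n", rc);
    }
}

int main(void)
{
    dispatch_once(NULL);
    return 0;
}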

View file

@@ -86,7 +86,7 @@ ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_oob = {
/* Query */
oob_component_query,
/* Finalize */
oob_component_finalize
oob_component_finalize,
};
/* Open - this functions sets up any oob specific commandline params */
@@ -166,6 +166,7 @@ static int oob_component_query(mca_btl_openib_module_t *btl,
(*cpc)->cbm_start_connect = oob_module_start_connect;
(*cpc)->cbm_endpoint_finalize = NULL;
(*cpc)->cbm_finalize = NULL;
(*cpc)->cbm_uses_cts = false;
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: oob CPC available for use on %s",
@@ -828,19 +829,19 @@ static void rml_recv_cb(int status, orte_process_name_t* process_name,
} else {
send_connect_data(ib_endpoint, ENDPOINT_CONNECT_ACK);
/* Tell main BTL that we're done */
mca_btl_openib_endpoint_connected(ib_endpoint);
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
}
break;
case MCA_BTL_IB_WAITING_ACK:
/* Tell main BTL that we're done */
mca_btl_openib_endpoint_connected(ib_endpoint);
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
break;
case MCA_BTL_IB_CONNECT_ACK:
send_connect_data(ib_endpoint, ENDPOINT_CONNECT_ACK);
/* Tell main BTL that we're done */
mca_btl_openib_endpoint_connected(ib_endpoint);
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
break;
case MCA_BTL_IB_CONNECTED:

The diff for this file is not shown because of its large size.

View file

@@ -48,7 +48,7 @@ ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_xoob = {
/* Query */
xoob_component_query,
/* Finalize */
xoob_component_finalize
xoob_component_finalize,
};
typedef enum {
@@ -892,7 +892,7 @@ static void xoob_rml_recv_cb(int status, orte_process_name_t* process_name,
mca_btl_openib_endpoint_invoke_error(NULL);
return;
}
mca_btl_openib_endpoint_connected(ib_endpoint);
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock);
break;
case ENDPOINT_XOOB_CONNECT_XRC_RESPONSE:
@@ -911,7 +911,7 @@ static void xoob_rml_recv_cb(int status, orte_process_name_t* process_name,
OPAL_THREAD_LOCK(&ib_endpoint->endpoint_lock);
/* we got srq numbers on our request */
XOOB_SET_REMOTE_INFO(ib_endpoint->rem_info, rem_info);
mca_btl_openib_endpoint_connected(ib_endpoint);
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock);
break;
case ENDPOINT_XOOB_CONNECT_XRC_NR_RESPONSE:
@@ -989,6 +989,7 @@ static int xoob_component_query(mca_btl_openib_module_t *openib_btl,
(*cpc)->cbm_start_connect = xoob_module_start_connect;
(*cpc)->cbm_endpoint_finalize = NULL;
(*cpc)->cbm_finalize = NULL;
(*cpc)->cbm_uses_cts = false;
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: xoob CPC available for use on %s",

View file

@@ -128,19 +128,29 @@
* used. There is only this function, which tells when a specific
* CPC/BTL module is no longer being used.
*
* There are two functions in the main openib BTL that the CPC will
* cbm_uses_cts: a bool that indicates whether the CPC will use the
* CTS protocol or not.
* - if true: the CPC will post the fragment on
* endpoint->endpoint_cts_frag as a receive buffer and will *not*
* call ompi_btl_openib_post_recvs().
* - if false: the CPC will call ompi_btl_openib_post_recvs() before
* calling ompi_btl_openib_cpc_complete().
*
* There are two functions in the main openib BTL that the CPC may
* call:
*
* - ompi_btl_openib_post_recvs(endpoint): once a QP is locally
* connected to the remote side (but we don't know if the remote side
* is connected to us yet), this function is invoked to post buffers
* on the QP, setup credits for the endpoint, etc.
* on the QP, setup credits for the endpoint, etc. This function is
* *only* invoked if the CPC's cbm_uses_cts is false.
*
* - ompi_btl_openib_connected(endpoint): once we know that a QP is
* connected on *both* sides, this function is invoked to tell the
* main openib BTL "ok, you can use this connection now." (e.g., the
* main openib BTL will start sending out fragments that were queued
* while the connection was establing, etc.).
* - ompi_btl_openib_cpc_complete(endpoint): once that a CPC knows
* that a QP is connected on *both* sides, this function is invoked to
* tell the main openib BTL "ok, you can use this connection now."
* (e.g., the main openib BTL will either invoke the CTS protocol or
* start sending out fragments that were queued while the connection
* was establishing, etc.).
*/
#ifndef BTL_OPENIB_CONNECT_H
#define BTL_OPENIB_CONNECT_H
@@ -330,6 +340,14 @@ typedef struct ompi_btl_openib_connect_base_module_t {
/** Finalize the cpc module */
ompi_btl_openib_connect_base_module_finalize_fn_t cbm_finalize;
/** Whether this module will use the CTS protocol or not. This
directly states whether this module will call
mca_btl_openib_endpoint_post_recvs() or not: true = this
module will *not* call _post_recvs() and instead will post the
receive buffer provided at endpoint->endpoint_cts_frag on qp
0. */
bool cbm_uses_cts;
} ompi_btl_openib_connect_base_module_t;
END_C_DECLS
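
To make the contract concrete, here is a hedged sketch of how a CPC that opts into CTS might fill in its module during query. The field names come from the struct above; the CPC functions themselves are hypothetical (compare the real cbm_* assignments in the ibcm and oob/xoob hunks earlier in this commit):

/* Hypothetical CPC, for illustration only; assumes this header is
   included. */
static int my_cpc_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
                                struct mca_btl_base_endpoint_t *endpoint);

static void my_cpc_fill_module(ompi_btl_openib_connect_base_module_t *m)
{
    m->cbm_start_connect = my_cpc_start_connect;
    m->cbm_endpoint_finalize = NULL;   /* no per-endpoint cleanup */
    m->cbm_finalize = NULL;            /* no module-wide cleanup */

    /* true: this CPC posts endpoint->endpoint_cts_frag itself and must
       not call mca_btl_openib_endpoint_post_recvs(); it will also only
       be selected when QP 0 is a per-peer (PP) QP. */
    m->cbm_uses_cts = true;
}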

View file

@@ -17,3 +17,52 @@ associated with it. The OpenFabrics RDMA CM connection scheme
Local host: %s
Local device: %s
#
[could not find matching endpoint]
The OpenFabrics device in an MPI process received an RDMA CM connect
request for a peer that it could not identify as part of this MPI job.
This should not happen. Your process is likely to abort; sorry.
Local host: %s
Local device: %s
Remote address: %s
Remote TCP port: %d
#
[illegal tcp port]
The btl_openib_connect_rdmacm_port MCA parameter was used to specify
an illegal TCP port value. TCP ports must be between 0 and 65535
(ports below 1024 can only be used by root).
TCP port: %d
This value was ignored.
#
[illegal timeout]
The btl_openib_connect_rdmacm_resolve_timeout parameter was used to
specify an illegal timeout value. Timeout values are specified in
milliseconds and must be greater than 0.
Timeout value: %d
This value was ignored.
#
[rdma cm device removal]
The RDMA CM reported that the device Open MPI was trying to use has
been removed.
Local host: %s
Local device: %s
Your MPI job will now abort, sorry.
#
[rdma cm event error]
The RDMA CM returned an event error while attempting to make a
connection. This type of error usually indicates a network
configuration error.
Local host: %s
Local device: %s
Error name: %s
Peer: %s
Your MPI job will now abort, sorry.

View file

@@ -234,6 +234,17 @@ WARNING: There was an error initializing an OpenFabrics device.
Local host: %s
Local device: %s
#
[no devices right type]
WARNING: No OpenFabrics devices of the right type were found within
the requested bus distance. The OpenFabrics BTL will be ignored for
this run.
Local host: %s
Requested type: %s
If the "requested type" is "<any>", this usually means that *no*
OpenFabrics devices were found within the requested bus distance.
#
[default subnet prefix]
WARNING: There is more than one active port on host '%s', but the
default subnet GID prefix was detected on more than one of these
@@ -528,14 +539,6 @@ adapter default settings file:
%s/mca-btl-openib-device-params.ini
#
[failed load cpc]
No OpenFabrics connection schemes reported that they were able to be
used on a specific device. As such, the openib BTL (OpenFabrics
support) will be disabled.
Local host: %s
Local device: %s
#
[eager RDMA and progress threads]
WARNING: The openib BTL was directed to use "eager RDMA" for short
messages, but the openib BTL was compiled with progress threads
@@ -552,3 +555,22 @@ therefore disabling the use of the openib BTL in this process for this
run.
Local host: %s
#
[cannot raise btl error]
The OpenFabrics driver in Open MPI tried to raise a fatal error, but
failed. Hopefully there was an error message before this one that
gave some more detailed information.
Local host: %s
Source file: %s
Source line: %d
Your job is now going to abort, sorry.
#
[no iwarp support]
Open MPI does not support iWARP devices with this version of OFED.
You need to upgrade to a later version of OFED (1.3 or later) for Open
MPI to support iWARP devices.
(This message is being displayed because you told Open MPI to use
iWARP devices via the btl_openib_device_type MCA parameter.)