
Back up r19489, which was the result of a "svn ci -m ..." instead of

an "hg ci -m ...".  Oops.

This commit was SVN r19490.

The following SVN revision numbers were found above:
  r19489 --> open-mpi/ompi@ea866f9e26
This commit is contained in:
Jeff Squyres 2008-09-03 08:45:33 +00:00
parent ea866f9e26
commit 7b05a14d9a
16 changed files with 456 additions and 993 deletions

View file

@ -162,11 +162,12 @@ else
************************************************************************
Open MPI was unable to find threading support on your system. The
OMPI development team is considering requiring threading support for
proper OMPI execution. This is in part because we are not aware of
any users that do not have thread support - so we need you to e-mail
us at ompi@ompi-mpi.org and let us know about this problem.
Open MPI was unable to find threading support on your system. In the
near future, the OMPI development team is considering requiring
threading support for proper OMPI execution. This is in part because
we are not aware of any users that do not have thread support - so we
need you to e-mail us at ompi@ompi-mpi.org and let us know about this
problem.
************************************************************************

View file

@ -967,6 +967,15 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
OBJ_RELEASE(endpoint);
}
/* Finalize the CPC modules on this openib module */
for (i = 0; i < openib_btl->num_cpcs; ++i) {
if (NULL != openib_btl->cpcs[i]->cbm_finalize) {
openib_btl->cpcs[i]->cbm_finalize(openib_btl, openib_btl->cpcs[i]);
}
free(openib_btl->cpcs[i]);
}
free(openib_btl->cpcs);
/* Release SRQ resources */
for(qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
if(!BTL_OPENIB_QP_TYPE_PP(qp)) {
@ -983,15 +992,6 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
}
}
/* Finalize the CPC modules on this openib module */
for (i = 0; i < openib_btl->num_cpcs; ++i) {
if (NULL != openib_btl->cpcs[i]->cbm_finalize) {
openib_btl->cpcs[i]->cbm_finalize(openib_btl, openib_btl->cpcs[i]);
}
free(openib_btl->cpcs[i]);
}
free(openib_btl->cpcs);
/* Release device if there are no more users */
if(!(--openib_btl->device->btls)) {
OBJ_RELEASE(openib_btl->device);

View file

@ -440,28 +440,6 @@ static void btl_openib_control(mca_btl_base_module_t* btl,
(((unsigned char*)clsc_hdr) + skip);
}
break;
case MCA_BTL_OPENIB_CONTROL_CTS:
OPAL_OUTPUT((0, "received CTS control message: posted recvs %d, sent cts %d",
ep->endpoint_posted_recvs, ep->endpoint_cts_sent));
ep->endpoint_cts_received = true;
/* Only send the CTS back and mark connected if:
- we have posted our receives (it's possible that we can
get this CTS before this side's CPC has called
cpc_complete())
- we have not yet sent our CTS
We don't even want to mark the endpoint connected() until
we have posted our receives because otherwise we will
trigger credit management (because the rd_credits will
still be negative), and Bad Things will happen. */
if (ep->endpoint_posted_recvs) {
if (!ep->endpoint_cts_sent) {
mca_btl_openib_endpoint_send_cts(ep);
}
mca_btl_openib_endpoint_connected(ep);
}
break;
default:
BTL_ERROR(("Unknown message type received by BTL"));
break;

View file

@ -390,17 +390,6 @@ void mca_btl_openib_endpoint_init(mca_btl_openib_module_t *btl,
for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
endpoint_init_qp(ep, qp);
}
/* If the CPC uses the CTS protocol, provide a frag buffer for the
CPC to post */
if (ep->endpoint_local_cpc->cbm_uses_cts) {
int rc;
ompi_free_list_item_t* item;
OMPI_FREE_LIST_WAIT(&btl->device->qps[0].recv_free, item, rc);
to_base_frag(item)->base.order = 0;
to_com_frag(item)->endpoint = ep;
ep->endpoint_cts_frag = item;
}
}
static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
@ -428,7 +417,6 @@ static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
endpoint->endpoint_proc = 0;
endpoint->endpoint_local_cpc = NULL;
endpoint->endpoint_remote_cpc_data = NULL;
endpoint->endpoint_initiator = false;
endpoint->endpoint_tstamp = 0.0;
endpoint->endpoint_state = MCA_BTL_IB_CLOSED;
endpoint->endpoint_retries = 0;
@ -454,10 +442,6 @@ static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
endpoint->use_eager_rdma = false;
endpoint->eager_rdma_remote.tokens = 0;
endpoint->eager_rdma_local.credits = 0;
endpoint->endpoint_cts_frag = NULL;
endpoint->endpoint_posted_recvs = false;
endpoint->endpoint_cts_received = false;
endpoint->endpoint_cts_sent = false;
}
/*
@ -475,12 +459,6 @@ static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint)
endpoint->endpoint_local_cpc->cbm_endpoint_finalize(endpoint);
}
/* Release CTS buffer */
if (NULL != endpoint->endpoint_cts_frag) {
MCA_BTL_IB_FRAG_RETURN(endpoint->endpoint_cts_frag);
endpoint->endpoint_cts_frag = NULL;
}
/* Release memory resources */
do {
/* Make sure that mca_btl_openib_endpoint_connect_eager_rdma ()
@ -553,8 +531,8 @@ static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint)
/*
* Called when the connect module has created all the qp's on an
* endpoint and needs to have some receive buffers posted.
* call when the connect module has created all the qp's on an
* endpoint and needs to have some receive buffers posted
*/
int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t *endpoint)
{
@ -571,120 +549,6 @@ int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t *endpoint)
return OMPI_SUCCESS;
}
static void cts_sent(mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_descriptor_t* des,
int status)
{
/* Nothing to do/empty function (we can't pass in a NULL pointer
for the des_cbfunc) */
opal_output(-1, "CTS send completed");
}
/*
* Send CTS control fragment
*/
void mca_btl_openib_endpoint_send_cts(mca_btl_openib_endpoint_t *endpoint)
{
mca_btl_openib_send_control_frag_t *sc_frag;
mca_btl_base_descriptor_t *base_des;
mca_btl_openib_frag_t *openib_frag;
mca_btl_openib_com_frag_t *com_frag;
mca_btl_openib_control_header_t *ctl_hdr;
OPAL_OUTPUT((0, "SENDING CTS to %s",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
sc_frag = alloc_control_frag(endpoint->endpoint_btl);
if (OPAL_UNLIKELY(NULL == sc_frag)) {
BTL_ERROR(("Failed to allocate control buffer"));
mca_btl_openib_endpoint_invoke_error(endpoint);
return;
}
/* I dislike using the "to_<foo>()" macros; I prefer using the
explicit member fields to ensure I get the types right. Since
this is not a performance-critical part of the code, it's
ok. */
com_frag = &(sc_frag->super.super);
openib_frag = &(com_frag->super);
base_des = &(openib_frag->base);
base_des->des_cbfunc = cts_sent;
base_des->des_cbdata = NULL;
base_des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
base_des->order = mca_btl_openib_component.credits_qp;
openib_frag->segment.seg_len = sizeof(mca_btl_openib_control_header_t);
com_frag->endpoint = endpoint;
sc_frag->hdr->tag = MCA_BTL_TAG_BTL;
sc_frag->hdr->cm_seen = 0;
sc_frag->hdr->credits = 0;
ctl_hdr = (mca_btl_openib_control_header_t*)
openib_frag->segment.seg_addr.pval;
ctl_hdr->type = MCA_BTL_OPENIB_CONTROL_CTS;
/* Send the fragment */
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
if (OMPI_SUCCESS != mca_btl_openib_endpoint_post_send(endpoint, sc_frag)) {
BTL_ERROR(("Failed to post CTS send"));
mca_btl_openib_endpoint_invoke_error(endpoint);
}
endpoint->endpoint_cts_sent = true;
/* We've sent the frag, so it'll be reclaimed in the normal
fashion -- don't let endpoint_destruct() FRAG_RETURN this
frag */
endpoint->endpoint_cts_frag = NULL;
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
}
/*
* Called when the CPC has established a connection on an endpoint
*/
void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t *endpoint)
{
/* If the CPC uses the CTS protocol, then start it up */
if (NULL != endpoint->endpoint_cts_frag) {
/* Post our receives, which will make credit management happy
(i.e., rd_credits will be 0) */
if (OMPI_SUCCESS != mca_btl_openib_endpoint_post_recvs(endpoint)) {
BTL_ERROR(("Failed to post receive buffers"));
mca_btl_openib_endpoint_invoke_error(endpoint);
return;
}
endpoint->endpoint_posted_recvs = true;
/* If this is IB, send the CTS immediately. If this is iWARP,
then only send the CTS if this endpoint was the initiator
of the connection (the receiver will send its CTS when it
receives this side's CTS). Also send the CTS if we already
received the peer's CTS (e.g., if this process was slow to
call cpc_complete() and we had already received the other
side's CTS). */
OPAL_OUTPUT((0, "cpc_complete: is IB %d, initiatior %d, cts received: %d",
(IBV_TRANSPORT_IB ==
endpoint->endpoint_btl->device->ib_dev->transport_type),
endpoint->endpoint_initiator,
endpoint->endpoint_cts_received));
if (IBV_TRANSPORT_IB ==
endpoint->endpoint_btl->device->ib_dev->transport_type ||
endpoint->endpoint_initiator ||
endpoint->endpoint_cts_received) {
mca_btl_openib_endpoint_send_cts(endpoint);
/* If we've already got the CTS from the other side, then
mark us as connected */
if (endpoint->endpoint_cts_received) {
mca_btl_openib_endpoint_connected(endpoint);
}
}
return;
}
/* Otherwise, just set the endpoint to "connected" */
mca_btl_openib_endpoint_connected(endpoint);
}
/*
* called when the connect module has completed setup of an endpoint
@ -696,7 +560,6 @@ void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t *endpoint)
mca_btl_openib_endpoint_t *ep;
bool master = false;
opal_output(-1, "Now we are CONNECTED");
if (MCA_BTL_XRC_ENABLED) {
OPAL_THREAD_LOCK(&endpoint->ib_addr->addr_lock);
if (MCA_BTL_IB_ADDR_CONNECTED == endpoint->ib_addr->status) {
@ -829,7 +692,7 @@ void mca_btl_openib_endpoint_send_credits(mca_btl_openib_endpoint_t* endpoint,
frag = endpoint->qps[qp].credit_frag;
if(OPAL_UNLIKELY(NULL == frag)) {
frag = alloc_control_frag(openib_btl);
frag = alloc_credit_frag(openib_btl);
frag->qp_idx = qp;
endpoint->qps[qp].credit_frag = frag;
/* set those once and forever */
@ -918,7 +781,7 @@ static int mca_btl_openib_endpoint_send_eager_rdma(
mca_btl_openib_send_control_frag_t* frag;
int rc;
frag = alloc_control_frag(openib_btl);
frag = alloc_credit_frag(openib_btl);
if(NULL == frag) {
return -1;
}
@ -1075,8 +938,7 @@ void *mca_btl_openib_endpoint_invoke_error(void *context)
if (NULL == endpoint) {
int i;
for (i = 0; i < mca_btl_openib_component.ib_num_btls; ++i) {
if (NULL != mca_btl_openib_component.openib_btls[i] &&
NULL != mca_btl_openib_component.openib_btls[i]->error_cb) {
if (NULL != mca_btl_openib_component.openib_btls[i]) {
btl = mca_btl_openib_component.openib_btls[i];
break;
}
@ -1086,11 +948,9 @@ void *mca_btl_openib_endpoint_invoke_error(void *context)
}
/* If we didn't find a BTL, then just bail :-( */
if (NULL == btl || NULL == btl->error_cb) {
if (NULL == btl) {
orte_show_help("help-mpi-btl-openib.txt",
"cannot raise btl error", true,
orte_process_info.nodename,
__FILE__, __LINE__);
"cannot raise btl error", orte_process_info.nodename);
exit(1);
}

View file

@ -167,15 +167,7 @@ struct mca_btl_base_endpoint_t {
/** hook for local CPC to hang endpoint-specific data */
void *endpoint_local_cpc_data;
/** If endpoint_local_cpc->cbm_uses_cts is true and this endpoint
is iWARP, then endpoint_initiator must be true on the side
that actually initiates the QP, false on the other side. This
bool is used to know which way to send the first CTS
message. */
bool endpoint_initiator;
/** pointer to remote proc's CPC data (essentially its CPC modex
message) */
/** pointer to remote CPC's data (essentially its CPC modex message) */
ompi_btl_openib_connect_base_module_data_t *endpoint_remote_cpc_data;
/** current state of the connection */
@ -228,22 +220,6 @@ struct mca_btl_base_endpoint_t {
/** information about the remote port */
mca_btl_openib_rem_info_t rem_info;
/** Frag for initial wireup CTS protocol; will be NULL if CPC
indicates that it does not want to use CTS */
ompi_free_list_item_t *endpoint_cts_frag;
/** Whether we've posted receives on this EP or not (only used in
CTS protocol) */
bool endpoint_posted_recvs;
/** Whether we've received the CTS from the peer or not (only used
in CTS protocol) */
bool endpoint_cts_received;
/** Whether we've send out CTS to the peer or not (only used in
CTS protocol) */
bool endpoint_cts_sent;
};
typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
@ -268,8 +244,6 @@ int mca_btl_openib_endpoint_post_send(mca_btl_openib_endpoint_t*,
void mca_btl_openib_endpoint_send_credits(mca_btl_base_endpoint_t*, const int);
void mca_btl_openib_endpoint_connect_eager_rdma(mca_btl_openib_endpoint_t*);
int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t*);
void mca_btl_openib_endpoint_send_cts(mca_btl_openib_endpoint_t *endpoint);
void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t*);
void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t*);
void mca_btl_openib_endpoint_init(mca_btl_openib_module_t*,
mca_btl_base_endpoint_t*,
@ -312,7 +286,7 @@ static inline int post_recvs(mca_btl_base_endpoint_t *ep, const int qp,
if (0 == rc)
return OMPI_SUCCESS;
BTL_ERROR(("error %d posting receive on qp %d", rc, qp));
BTL_ERROR(("error %d posting receive on qp %d\n", rc, qp));
return OMPI_ERROR;
}

View file

@ -8,23 +8,6 @@
* $HEADER$
*/
/**
* Note: this file is a little fast-n-loose with OMPI_HAVE_THREADS --
* it uses this value in run-time "if" conditionals (vs. compile-time
* #if conditionals). We also don't protect including <pthread.h>.
* That's because this component currently only compiles on Linux and
* Solaris, and both of these OSes have pthreads. Using the run-time
* conditionals gives us better compile-time checking, even of code
* that isn't activated.
*
* Note, too, that the functionality in this file does *not* require
* all the heavyweight OMPI thread infrastructure (e.g., from
* --enable-mpi-threads or --enable-progress-threads). All work that
* is done in a separate progress thread is very carefully segregated
* from that of the main thread, and communication back to the main
* thread is done through a pipe that the main thread watches with
* libevent.
*/
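The distinction this comment draws shows up clearly in a small sketch (hypothetical: start_service_thread() and the extern declaration of thread_main() are invented for illustration; OMPI_HAVE_THREADS is assumed to be a 0/1 macro, as it is used elsewhere in this file). With a run-time conditional the compiler still parses and type-checks the threaded branch even when it is dead; a compile-time #if would hide it entirely:

#include <pthread.h>

extern void *thread_main(void *context);   /* hypothetical extern */
static pthread_t thread;

static int start_service_thread(void)
{
    int rc = 0;
    /* Run-time conditional: this branch is compiled and checked
       even when OMPI_HAVE_THREADS is 0, so a typo in the threaded
       path breaks the build on every platform... */
    if (OMPI_HAVE_THREADS) {
        rc = pthread_create(&thread, NULL, thread_main, NULL);
    }
    /* ...whereas "#if OMPI_HAVE_THREADS ... #endif" would hide the
       branch from the compiler and let it rot silently. */
    return rc;
}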
#include "ompi_config.h"
#include <pthread.h>
@ -41,11 +24,6 @@
#include "btl_openib_fd.h"
typedef union {
ompi_btl_openib_fd_event_callback_fn_t *event;
ompi_btl_openib_fd_main_callback_fn_t *main;
} callback_u_t;
/*
* Data for each registered item
*/
@ -55,7 +33,10 @@ typedef struct {
opal_event_t ri_event;
int ri_fd;
int ri_flags;
callback_u_t ri_callback;
union {
ompi_btl_openib_fd_callback_fn_t *fd;
ompi_btl_openib_schedule_callback_fn_t *schedule;
} ri_callback;
void *ri_context;
} registered_item_t;
@ -65,14 +46,9 @@ static OBJ_CLASS_INSTANCE(registered_item_t, opal_list_item_t, NULL, NULL);
* Command types
*/
typedef enum {
/* Read by service thread */
CMD_TIME_TO_QUIT,
CMD_ADD_FD,
CMD_REMOVE_FD,
ACK_RAN_FUNCTION,
/* Read by main thread */
CMD_CALL_FUNCTION,
CMD_MAX
} cmd_type_t;
@ -80,7 +56,7 @@ typedef enum {
* Commands. Fields ordered to avoid memory holes (and valgrind warnings).
*/
typedef struct {
callback_u_t pc_fn;
ompi_btl_openib_fd_callback_fn_t *pc_callback;
void *pc_context;
int pc_fd;
int pc_flags;
@ -88,16 +64,6 @@ typedef struct {
char end;
} cmd_t;
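The two tricks in this struct deserve a standalone illustration: placing the widest fields first avoids interior padding holes (whose uninitialized bytes valgrind would flag when the struct is written down a pipe), and the char sentinel lets the code measure the payload without trailing padding, which is exactly what the cmd_size computation later in this file does. A hypothetical sketch (ordered_t is invented, not part of the BTL):

#include <stdio.h>

typedef struct {
    void *p;     /* widest member first (8 bytes on LP64) */
    int   a;     /* 4 bytes */
    int   b;     /* 4 bytes; no hole before the sentinel */
    char  end;   /* marker: everything before this is payload */
} ordered_t;

int main(void)
{
    ordered_t bogus;
    /* Same expression the fd code uses to compute cmd_size */
    int payload = (int) (&(bogus.end) - ((char *) &bogus));
    printf("sizeof = %zu, payload = %d\n", sizeof(ordered_t), payload);
    return 0;
}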
/*
* Queued up list of commands to send to the main thread
*/
typedef struct {
opal_list_item_t super;
cmd_t cli_cmd;
} cmd_list_item_t;
static OBJ_CLASS_INSTANCE(cmd_list_item_t, opal_list_item_t, NULL, NULL);
static bool initialized = false;
static int cmd_size = 0;
static fd_set read_fds, write_fds;
@ -105,17 +71,117 @@ static int max_fd;
static opal_list_t registered_items;
/* These items are only used in the threaded version */
/* Owned by the main thread */
static pthread_t thread;
static opal_event_t main_thread_event;
static int pipe_to_service_thread[2] = { -1, -1 };
static int pipe_fd[2] = { -1, -1 };
/* Owned by the service thread */
static int pipe_to_main_thread[2] = { -1, -1 };
static const size_t max_outstanding_to_main_thread = 32;
static size_t waiting_for_ack_from_main_thread = 0;
static opal_list_t pending_to_main_thread;
static void libevent_fd_callback(int fd, short event, void *context)
{
registered_item_t *ri = (registered_item_t*) context;
ri->ri_callback.fd(fd, event, ri->ri_context);
}
static void libevent_event_callback(int fd, short event, void *context)
{
registered_item_t *ri = (registered_item_t*) context;
ri->ri_callback.schedule(ri->ri_context);
/* JMS Can I free ri now? It contains the event... */
#if 0
OBJ_RELEASE(ri);
#endif
}
/*
* Add an fd to the listening set
*/
static int local_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd)
{
registered_item_t *ri = OBJ_NEW(registered_item_t);
if (NULL == ri) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
ri->ri_event_used = false;
ri->ri_fd = cmd->pc_fd;
ri->ri_flags = cmd->pc_flags;
ri->ri_callback.fd = cmd->pc_callback;
ri->ri_context = cmd->pc_context;
if (use_libevent) {
/* Make an event for this fd */
ri->ri_event_used = true;
memset(&ri->ri_event, 0, sizeof(ri->ri_event));
opal_event_set(&ri->ri_event, ri->ri_fd,
ri->ri_flags | OPAL_EV_PERSIST, libevent_fd_callback,
ri);
opal_event_add(&ri->ri_event, 0);
} else {
/* Add the fd to the relevant fd local sets and update max_fd */
if (OPAL_EV_READ & ri->ri_flags) {
FD_SET(ri->ri_fd, &read_fds);
}
if (OPAL_EV_WRITE & cmd->pc_flags) {
FD_SET(ri->ri_fd, &write_fds);
}
max_fd = (max_fd > ri->ri_fd) ? max_fd : ri->ri_fd + 1;
}
opal_list_append(&registered_items, &ri->super);
return OMPI_SUCCESS;
}
/*
* Remove an fd from the listening set
*/
static int local_pipe_cmd_remove_fd(cmd_t *cmd)
{
int i;
opal_list_item_t *item;
registered_item_t *ri;
/* Go through the list of registered fd's and find the fd to
remove */
for (item = opal_list_get_first(&registered_items);
NULL != opal_list_get_end(&registered_items);
item = opal_list_get_next(item)) {
ri = (registered_item_t*) item;
if (cmd->pc_fd == ri->ri_fd) {
/* Found it. The item knows if it was used as a libevent
event or an entry in the local fd sets. */
if (ri->ri_event_used) {
/* Remove this event from libevent */
opal_event_del(&ri->ri_event);
} else {
/* Remove this item from the fd_sets and recalculate
max_fd */
FD_CLR(cmd->pc_fd, &read_fds);
FD_CLR(cmd->pc_fd, &write_fds);
for (max_fd = i = pipe_fd[0]; i < FD_SETSIZE; ++i) {
if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) {
max_fd = i + 1;
}
}
}
/* Let the caller know that we have stopped monitoring
this fd (if they care) */
if (NULL != cmd->pc_callback) {
cmd->pc_callback(cmd->pc_fd, 0, cmd->pc_context);
}
/* Remove this item from the list of registered items and
release it */
opal_list_remove_item(&registered_items, item);
OBJ_RELEASE(item);
return OMPI_SUCCESS;
}
}
/* This shouldn't happen */
return OMPI_ERR_NOT_FOUND;
}
/*
* Simple loop over reading from a fd
@ -159,159 +225,6 @@ static int write_fd(int fd, int len, void *buffer)
return OMPI_ERROR;
}
}
return OMPI_SUCCESS;
}
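read_fd()'s body is elided by this hunk; the following is a minimal sketch of what such a loop conventionally looks like, assuming POSIX read(2) and the OMPI_SUCCESS/OMPI_ERROR codes used above (the retry-on-EINTR detail is an assumption, not taken from this diff):

#include <errno.h>
#include <unistd.h>

/* Sketch of the elided read_fd(): loop until len bytes arrive */
static int read_fd(int fd, int len, void *buffer)
{
    char *b = (char *) buffer;
    while (len > 0) {
        int rc = read(fd, b, len);
        if (rc < 0) {
            if (EINTR == errno) {
                continue;          /* interrupted: just retry */
            }
            return OMPI_ERROR;     /* real error */
        } else if (0 == rc) {
            return OMPI_ERROR;     /* peer closed the pipe */
        }
        len -= rc;
        b += rc;
    }
    return OMPI_SUCCESS;
}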
/*
* Write a command to the main thread, or queue it up if the pipe is full
*/
static int write_to_main_thread(cmd_t *cmd)
{
/* Note that if we write too much to the main thread pipe and the
main thread doesn't check it often, we could fill up the pipe
and cause this thread to block. Bad! So we do some simple
counting here and ensure that we don't fill the pipe. If we
are in danger of that, then queue up the commands here in the
service thread. The main thread will ACK every CALL_FUNCTION
command, so we have a built-in mechanism to wake up the service
thread to drain any queued-up commands. */
if (opal_list_get_size(&pending_to_main_thread) > 0 ||
waiting_for_ack_from_main_thread >= max_outstanding_to_main_thread) {
cmd_list_item_t *cli = OBJ_NEW(cmd_list_item_t);
if (NULL == cli) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(&cli->cli_cmd, cmd, cmd_size);
opal_list_append(&pending_to_main_thread, &(cli->super));
} else {
OPAL_OUTPUT((-1, "fd: writing to main thread"));
write_fd(pipe_to_main_thread[1], cmd_size, cmd);
++waiting_for_ack_from_main_thread;
}
return OMPI_SUCCESS;
}
static void service_fd_callback(int fd, short event, void *context)
{
registered_item_t *ri = (registered_item_t*) context;
ri->ri_callback.event(fd, event, ri->ri_context);
}
/*
* Add an fd to the listening set
*/
static int service_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd)
{
registered_item_t *ri = OBJ_NEW(registered_item_t);
if (NULL == ri) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
ri->ri_event_used = false;
ri->ri_fd = cmd->pc_fd;
ri->ri_flags = cmd->pc_flags;
ri->ri_callback.event = cmd->pc_fn.event;
ri->ri_context = cmd->pc_context;
if (use_libevent) {
/* Make an event for this fd */
ri->ri_event_used = true;
memset(&ri->ri_event, 0, sizeof(ri->ri_event));
opal_event_set(&ri->ri_event, ri->ri_fd,
ri->ri_flags | OPAL_EV_PERSIST, service_fd_callback,
ri);
opal_event_add(&ri->ri_event, 0);
} else {
/* Add the fd to the relevant fd local sets and update max_fd */
if (OPAL_EV_READ & ri->ri_flags) {
FD_SET(ri->ri_fd, &read_fds);
}
if (OPAL_EV_WRITE & cmd->pc_flags) {
FD_SET(ri->ri_fd, &write_fds);
}
max_fd = (max_fd > ri->ri_fd) ? max_fd : ri->ri_fd + 1;
}
opal_list_append(&registered_items, &ri->super);
return OMPI_SUCCESS;
}
/*
* Remove an fd from the listening set
*/
static int service_pipe_cmd_remove_fd(cmd_t *cmd)
{
int i;
opal_list_item_t *item;
registered_item_t *ri;
OPAL_OUTPUT((-1, "service thread got unmonitor fd %d", cmd->pc_fd));
/* Go through the list of registered fd's and find the fd to
remove */
for (item = opal_list_get_first(&registered_items);
NULL != opal_list_get_end(&registered_items);
item = opal_list_get_next(item)) {
ri = (registered_item_t*) item;
if (cmd->pc_fd == ri->ri_fd) {
/* Found it. The item knows if it was used as a libevent
event or an entry in the local fd sets. */
if (ri->ri_event_used) {
/* Remove this event from libevent */
opal_event_del(&ri->ri_event);
} else {
/* Remove this item from the fd_sets and recalculate
MAX_FD */
FD_CLR(cmd->pc_fd, &read_fds);
FD_CLR(cmd->pc_fd, &write_fds);
for (max_fd = i = pipe_to_service_thread[0]; i < FD_SETSIZE; ++i) {
if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) {
max_fd = i + 1;
}
}
}
/* Let the caller know that we have stopped monitoring
this fd (if they care) */
if (NULL != cmd->pc_fn.event) {
cmd->pc_fn.event(cmd->pc_fd, 0, cmd->pc_context);
}
/* Remove this item from the list of registered items and
release it */
opal_list_remove_item(&registered_items, item);
OBJ_RELEASE(item);
return OMPI_SUCCESS;
}
}
/* This shouldn't happen */
return OMPI_ERR_NOT_FOUND;
}
/*
* Call a function and ACK that we ran it
*/
static int main_pipe_cmd_call_function(cmd_t *cmd)
{
cmd_t local_cmd;
OPAL_OUTPUT((-1, "fd main thread: calling function!"));
/* Call the function */
if (NULL != cmd->pc_fn.main) {
cmd->pc_fn.main(cmd->pc_context);
}
/* Now ACK that we ran the function */
memset(&local_cmd, 0, cmd_size);
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
write_fd(pipe_to_service_thread[1], cmd_size, &local_cmd);
/* Done */
return OMPI_SUCCESS;
}
@ -319,52 +232,32 @@ static int main_pipe_cmd_call_function(cmd_t *cmd)
/*
* Act on pipe commands
*/
static bool service_pipe_cmd(void)
static bool local_pipe_cmd(void)
{
bool ret = false;
cmd_t cmd;
cmd_list_item_t *cli;
read_fd(pipe_to_service_thread[0], cmd_size, &cmd);
read_fd(pipe_fd[0], cmd_size, &cmd);
switch (cmd.pc_cmd) {
case CMD_ADD_FD:
OPAL_OUTPUT((-1, "fd service thread: CMD_ADD_FD"));
if (OMPI_SUCCESS != service_pipe_cmd_add_fd(false, &cmd)) {
if (OMPI_SUCCESS != local_pipe_cmd_add_fd(false, &cmd)) {
ret = true;
}
break;
case CMD_REMOVE_FD:
OPAL_OUTPUT((-1, "fd service thread: CMD_REMOVE_FD"));
if (OMPI_SUCCESS != service_pipe_cmd_remove_fd(&cmd)) {
if (OMPI_SUCCESS != local_pipe_cmd_remove_fd(&cmd)) {
ret = true;
}
break;
case CMD_TIME_TO_QUIT:
OPAL_OUTPUT((-1, "fd service thread: CMD_TIME_TO_QUIT"));
opal_output(-1, "fd listener thread: time to quit");
ret = true;
break;
case ACK_RAN_FUNCTION:
/* We don't have a guarantee that the main thread will check
its pipe frequently, so we do some simple counting to
ensure we just don't have too many outstanding commands to
the main thread at any given time. The main thread will
ACK every CALL_FUNCTION command, so this thread will always
wake up and continue to drain any queued up functions. */
cli = (cmd_list_item_t*) opal_list_remove_first(&pending_to_main_thread);
if (NULL != cli) {
OPAL_OUTPUT((-1, "sending queued up cmd function to main thread"));
write_fd(pipe_to_main_thread[1], cmd_size, &(cli->cli_cmd));
OBJ_RELEASE(cli);
} else {
--waiting_for_ack_from_main_thread;
}
break;
default:
OPAL_OUTPUT((-1, "fd service thread: unknown pipe command!"));
opal_output(-1, "fd listener thread: unknown pipe command!");
break;
}
@ -375,7 +268,7 @@ static bool service_pipe_cmd(void)
/*
* Main thread logic
*/
static void *service_thread_start(void *context)
static void *thread_main(void *context)
{
int rc, flags;
fd_set read_fds_copy, write_fds_copy;
@ -385,29 +278,29 @@ static void *service_thread_start(void *context)
/* Make an fd set that we can select() on */
FD_ZERO(&write_fds);
FD_ZERO(&read_fds);
FD_SET(pipe_to_service_thread[0], &read_fds);
max_fd = pipe_to_service_thread[0] + 1;
FD_SET(pipe_fd[0], &read_fds);
max_fd = pipe_fd[0] + 1;
OPAL_OUTPUT((-1, "fd service thread running"));
opal_output(-1, "fd listener thread running");
/* Main loop waiting for commands over the fd's */
while (1) {
memcpy(&read_fds_copy, &read_fds, sizeof(read_fds));
memcpy(&write_fds_copy, &write_fds, sizeof(write_fds));
OPAL_OUTPUT((-1, "fd service thread blocking on select..."));
opal_output(-1, "fd listener thread blocking on select...");
rc = select(max_fd, &read_fds_copy, &write_fds_copy, NULL, NULL);
if (0 != rc && EAGAIN == errno) {
continue;
}
OPAL_OUTPUT((-1, "fd service thread woke up!"));
opal_output(-1, "fd listener thread woke up!");
if (rc > 0) {
if (FD_ISSET(pipe_to_service_thread[0], &read_fds_copy)) {
OPAL_OUTPUT((-1, "fd service thread: pipe command"));
if (service_pipe_cmd()) {
if (FD_ISSET(pipe_fd[0], &read_fds_copy)) {
opal_output(-1, "fd listener thread: pipe command");
if (local_pipe_cmd()) {
opal_output(-1, "fd listener thread: exiting");
break;
}
OPAL_OUTPUT((-1, "fd service thread: back from pipe command"));
}
/* Go through all the registered events and see who had
@ -431,10 +324,9 @@ static void *service_thread_start(void *context)
/* If either was ready, invoke the callback */
if (0 != flags) {
OPAL_OUTPUT((-1, "fd service thread: invoking callback for registered fd %d", ri->ri_fd));
ri->ri_callback.event(ri->ri_fd, flags,
ri->ri_context);
OPAL_OUTPUT((-1, "fd service thread: back from callback for registered fd %d", ri->ri_fd));
opal_output(-1, "fd listener thread: invoking callback for registered fd %d", ri->ri_fd);
ri->ri_callback.fd(ri->ri_fd, flags,
ri->ri_context);
}
}
}
@ -442,38 +334,12 @@ static void *service_thread_start(void *context)
}
/* All done */
OPAL_OUTPUT((-1, "fd service thread: exiting"));
opal_atomic_wmb();
return NULL;
}
static void main_thread_event_callback(int fd, short event, void *context)
{
cmd_t cmd;
OPAL_OUTPUT((-1, "main thread -- reading command"));
read_fd(pipe_to_main_thread[0], cmd_size, &cmd);
switch (cmd.pc_cmd) {
case CMD_CALL_FUNCTION:
OPAL_OUTPUT((-1, "fd main thread: calling command"));
main_pipe_cmd_call_function(&cmd);
break;
default:
OPAL_OUTPUT((-1, "fd main thread: unknown pipe command: %d",
cmd.pc_cmd));
break;
}
}
/******************************************************************
* Main interface calls
******************************************************************/
/*
* Initialize
* Called by main thread
*/
int ompi_btl_openib_fd_init(void)
{
@ -482,42 +348,19 @@ int ompi_btl_openib_fd_init(void)
OBJ_CONSTRUCT(&registered_items, opal_list_t);
/* Calculate the real size of the cmd struct */
cmd_size = (int) (&(bogus.end) - ((char*) &bogus));
if (OMPI_HAVE_THREADS) {
OBJ_CONSTRUCT(&pending_to_main_thread, opal_list_t);
/* Create pipes to communicate between the two threads */
if (0 != pipe(pipe_to_service_thread)) {
return OMPI_ERR_IN_ERRNO;
}
if (0 != pipe(pipe_to_main_thread)) {
if (OMPI_HAVE_THREAD_SUPPORT) {
/* Create a pipe to communicate with the thread */
if (0 != pipe(pipe_fd)) {
return OMPI_ERR_IN_ERRNO;
}
/* Create a libevent event that is used in the main thread
to watch its pipe */
memset(&main_thread_event, 0, sizeof(main_thread_event));
opal_event_set(&main_thread_event, pipe_to_main_thread[0],
OPAL_EV_READ | OPAL_EV_PERSIST,
main_thread_event_callback, NULL);
opal_event_add(&main_thread_event, 0);
/* Start the service thread */
if (0 != pthread_create(&thread, NULL, service_thread_start,
NULL)) {
int errno_save = errno;
opal_event_del(&main_thread_event);
close(pipe_to_service_thread[0]);
close(pipe_to_service_thread[1]);
close(pipe_to_main_thread[0]);
close(pipe_to_main_thread[1]);
errno = errno_save;
if (0 != pthread_create(&thread, NULL, thread_main, NULL)) {
return OMPI_ERR_IN_ERRNO;
}
}
/* Calculate the real size of the cmd struct */
cmd_size = (int) (&(bogus.end) - ((char*) &bogus));
initialized = true;
}
return OMPI_SUCCESS;
@ -526,10 +369,9 @@ int ompi_btl_openib_fd_init(void)
/*
* Start monitoring an fd
* Called by main or service thread; callback will be in service thread
*/
int ompi_btl_openib_fd_monitor(int fd, int flags,
ompi_btl_openib_fd_event_callback_fn_t *callback,
ompi_btl_openib_fd_callback_fn_t *callback,
void *context)
{
cmd_t cmd;
@ -542,15 +384,14 @@ int ompi_btl_openib_fd_monitor(int fd, int flags,
cmd.pc_cmd = CMD_ADD_FD;
cmd.pc_fd = fd;
cmd.pc_flags = flags;
cmd.pc_fn.event = callback;
cmd.pc_callback = callback;
cmd.pc_context = context;
if (OMPI_HAVE_THREADS) {
if (OMPI_HAVE_THREAD_SUPPORT) {
/* For the threaded version, write a command down the pipe */
OPAL_OUTPUT((-1, "main thread sending monitor fd %d", fd));
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
write_fd(pipe_fd[1], cmd_size, &cmd);
} else {
/* Otherwise, add it directly */
service_pipe_cmd_add_fd(true, &cmd);
local_pipe_cmd_add_fd(true, &cmd);
}
return OMPI_SUCCESS;
@ -559,10 +400,9 @@ int ompi_btl_openib_fd_monitor(int fd, int flags,
/*
* Stop monitoring an fd
* Called by main or service thread; callback will be in service thread
*/
int ompi_btl_openib_fd_unmonitor(int fd,
ompi_btl_openib_fd_event_callback_fn_t *callback,
ompi_btl_openib_fd_callback_fn_t *callback,
void *context)
{
cmd_t cmd;
@ -575,15 +415,14 @@ int ompi_btl_openib_fd_unmonitor(int fd,
cmd.pc_cmd = CMD_REMOVE_FD;
cmd.pc_fd = fd;
cmd.pc_flags = 0;
cmd.pc_fn.event = callback;
cmd.pc_callback = callback;
cmd.pc_context = context;
if (OMPI_HAVE_THREADS) {
if (OMPI_HAVE_THREAD_SUPPORT) {
/* For the threaded version, write a command down the pipe */
OPAL_OUTPUT((-1, "main thread sending unmonitor fd %d", fd));
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
write_fd(pipe_fd[1], cmd_size, &cmd);
} else {
/* Otherwise, remove it directly */
service_pipe_cmd_remove_fd(&cmd);
local_pipe_cmd_remove_fd(&cmd);
}
return OMPI_SUCCESS;
@ -591,25 +430,31 @@ int ompi_btl_openib_fd_unmonitor(int fd,
/*
* Run a function in the main thread
* Called by service thread
*/
int ompi_btl_openib_fd_run_in_main(ompi_btl_openib_fd_main_callback_fn_t *callback,
void *context)
int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t *callback,
void *context)
{
if (OMPI_HAVE_THREADS) {
cmd_t cmd;
if (OMPI_HAVE_THREAD_SUPPORT) {
/* For the threaded version, schedule an event for "now" */
registered_item_t *ri;
struct timeval now;
OPAL_OUTPUT((-1, "run in main -- sending command"));
/* For the threaded version, write a command down the pipe */
cmd.pc_cmd = CMD_CALL_FUNCTION;
cmd.pc_fd = -1;
cmd.pc_flags = 0;
cmd.pc_fn.main = callback;
cmd.pc_context = context;
write_to_main_thread(&cmd);
ri = OBJ_NEW(registered_item_t);
if (NULL == ri) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* Create an event that will run in the main thread */
ri->ri_fd = ri->ri_flags = -1;
ri->ri_callback.schedule = callback;
ri->ri_context = context;
ri->ri_event_used = true;
opal_evtimer_set(&ri->ri_event, libevent_event_callback, ri);
now.tv_sec = 0;
now.tv_usec = 0;
opal_evtimer_add(&ri->ri_event, &now);
} else {
/* Otherwise, call it directly */
OPAL_OUTPUT((-1, "run in main -- calling now!"));
/* For the non-threaded version, just call the function */
callback(context);
}
@ -618,32 +463,21 @@ int ompi_btl_openib_fd_run_in_main(ompi_btl_openib_fd_main_callback_fn_t *callba
/*
* Finalize
* Called by main thread
*/
int ompi_btl_openib_fd_finalize(void)
{
if (initialized) {
if (OMPI_HAVE_THREADS) {
if (OMPI_HAVE_THREAD_SUPPORT) {
/* For the threaded version, send a command down the pipe */
cmd_t cmd;
OPAL_OUTPUT((-1, "shutting down openib fd"));
opal_event_del(&main_thread_event);
memset(&cmd, 0, cmd_size);
cmd.pc_cmd = CMD_TIME_TO_QUIT;
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
write_fd(pipe_fd[1], cmd_size, &cmd);
pthread_join(thread, NULL);
opal_atomic_rmb();
opal_event_del(&main_thread_event);
close(pipe_to_service_thread[0]);
close(pipe_to_service_thread[1]);
close(pipe_to_main_thread[0]);
close(pipe_to_main_thread[1]);
OBJ_DESTRUCT(&pending_to_main_thread);
close(pipe_fd[0]);
close(pipe_fd[1]);
}
OBJ_DESTRUCT(&registered_items);
}
initialized = false;

View file

@ -18,46 +18,41 @@ BEGIN_C_DECLS
/**
* Typedef for fd callback function
*/
typedef void *(ompi_btl_openib_fd_event_callback_fn_t)(int fd, int flags,
void *context);
typedef void *(ompi_btl_openib_fd_callback_fn_t)(int fd, int flags,
void *context);
/**
* Typedef for generic callback function
*/
typedef void *(ompi_btl_openib_fd_main_callback_fn_t)(void *context);
typedef void *(ompi_btl_openib_schedule_callback_fn_t)(void *context);
/**
* Initialize fd monitoring.
* Called by the main thread.
* Initialize fd monitoring
*/
int ompi_btl_openib_fd_init(void);
/**
* Start monitoring an fd.
* Called by main or service thread; callback will be in service thread.
* Start monitoring an fd
*/
int ompi_btl_openib_fd_monitor(int fd, int flags,
ompi_btl_openib_fd_event_callback_fn_t *callback,
ompi_btl_openib_fd_callback_fn_t *callback,
void *context);
/**
* Stop monitoring an fd.
* Called by main or service thread; callback will be in service thread.
* Stop monitoring an fd
*/
int ompi_btl_openib_fd_unmonitor(int fd,
ompi_btl_openib_fd_event_callback_fn_t *callback,
ompi_btl_openib_fd_callback_fn_t *callback,
void *context);
/**
* Run a function in the main thread.
* Called by the service thread.
* Run a function in the main thread
*/
int ompi_btl_openib_fd_run_in_main(ompi_btl_openib_fd_main_callback_fn_t callback,
void *context);
int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t callback,
void *context);
/**
* Finalize fd monitoring.
* Called by the main thread.
* Finalize fd monitoring
*/
int ompi_btl_openib_fd_finalize(void);
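Taken together, a typical lifecycle for this API looks like the sketch below (hypothetical: watch_device(), my_dispatch(), and device_fd are invented; the monitor call mirrors how the ibcm CPC watches its cm_device fd elsewhere in this commit):

static void *my_dispatch(int fd, int flags, void *context)
{
    /* Drain whatever made fd readable */
    return NULL;
}

static int watch_device(int device_fd)
{
    int rc = ompi_btl_openib_fd_init();
    if (OMPI_SUCCESS != rc) {
        return rc;
    }
    /* Invoke my_dispatch whenever device_fd becomes readable */
    return ompi_btl_openib_fd_monitor(device_fd, OPAL_EV_READ,
                                      my_dispatch, NULL);
}

/* ...and on teardown:
   ompi_btl_openib_fd_unmonitor(device_fd, NULL, NULL);
   ompi_btl_openib_fd_finalize();                       */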

View file

@ -116,7 +116,6 @@ typedef struct mca_btl_openib_footer_t mca_btl_openib_footer_t;
#define MCA_BTL_OPENIB_CONTROL_CREDITS 0
#define MCA_BTL_OPENIB_CONTROL_RDMA 1
#define MCA_BTL_OPENIB_CONTROL_COALESCED 2
#define MCA_BTL_OPENIB_CONTROL_CTS 3
struct mca_btl_openib_control_header_t {
uint8_t type;
@ -268,7 +267,7 @@ OBJ_CLASS_DECLARATION(mca_btl_openib_coalesced_frag_t);
*/
static inline mca_btl_openib_send_control_frag_t *
alloc_control_frag(mca_btl_openib_module_t *btl)
alloc_credit_frag(mca_btl_openib_module_t *btl)
{
int rc;
ompi_free_list_item_t *item;

View file

@ -32,7 +32,7 @@ AC_DEFUN([MCA_btl_openib_POST_CONFIG], [
# [action-if-cant-compile])
# ------------------------------------------------
AC_DEFUN([MCA_btl_openib_CONFIG],[
OMPI_VAR_SCOPE_PUSH([cpcs have_threads])
OMPI_VAR_SCOPE_PUSH([cpcs])
cpcs="oob"
OMPI_CHECK_OPENIB([btl_openib],
@ -54,23 +54,15 @@ AC_DEFUN([MCA_btl_openib_CONFIG],[
$1],
[$2])
AC_MSG_CHECKING([for thread support (needed for ibcm/rdmacm)])
have_threads=`echo $THREAD_TYPE | awk '{ print [$]1 }'`
if test "x$have_threads" = "x"; then
have_threads=none
fi
AC_MSG_RESULT([$have_threads])
AS_IF([test "$btl_openib_happy" = "yes"],
[if test "x$btl_openib_have_xrc" = "x1"; then
cpcs="$cpcs xoob"
fi
if test "x$btl_openib_have_rdmacm" = "x1" -a \
"$have_threads" != "none"; then
if test "x$btl_openib_have_rdmacm" = "x1"; then
cpcs="$cpcs rdmacm"
fi
if test "x$btl_openib_have_ibcm" = "x1" -a \
"$have_threads" != "none"; then
if test "x$btl_openib_have_ibcm" = "x1"; then
cpcs="$cpcs ibcm"
fi
AC_MSG_CHECKING([which openib btl cpcs will be built])

View file

@ -18,10 +18,10 @@
#if HAVE_XRC
#include "connect/btl_openib_connect_xoob.h"
#endif
#if OMPI_HAVE_RDMACM && OMPI_HAVE_THREADS
#if OMPI_HAVE_RDMACM
#include "connect/btl_openib_connect_rdmacm.h"
#endif
#if OMPI_HAVE_IBCM && OMPI_HAVE_THREADS
#if OMPI_HAVE_IBCM
#include "connect/btl_openib_connect_ibcm.h"
#endif
@ -44,7 +44,7 @@ static ompi_btl_openib_connect_base_component_t *all[] = {
/* Always have an entry here so that the CP indexes will always be
the same: if RDMA CM is not available, use the "empty" CPC */
#if OMPI_HAVE_RDMACM && OMPI_HAVE_THREADS
#if OMPI_HAVE_RDMACM
&ompi_btl_openib_connect_rdmacm,
#else
&ompi_btl_openib_connect_empty,
@ -52,7 +52,7 @@ static ompi_btl_openib_connect_base_component_t *all[] = {
/* Always have an entry here so that the CP indexes will always be
the same: if IB CM is not available, use the "empty" CPC */
#if OMPI_HAVE_IBCM && OMPI_HAVE_THREADS
#if OMPI_HAVE_IBCM
&ompi_btl_openib_connect_ibcm,
#else
&ompi_btl_openib_connect_empty,
@ -273,14 +273,6 @@ int ompi_btl_openib_connect_base_select_for_local_port(mca_btl_openib_module_t *
opal_output(-1, "match cpc for local port: %s",
available[i]->cbc_name);
/* If the CPC wants to use the CTS protocol, check to ensure
that QP 0 is PP; if it's not, we can't use this CPC (or the
CTS protocol) */
if (!BTL_OPENIB_QP_TYPE_PP(0)) {
BTL_VERBOSE(("this CPConly supports when the first btl_openib_receive_queues QP is a PP QP"));
continue;
}
/* This CPC has indicated that it wants to run on this openib
BTL module. Woo hoo! */
++cpc_index;

View file

@ -248,7 +248,7 @@
* normal RTU processing. If the RTU is received later, the IBCM
* system on the passive side will know that it's effectively a
* duplicate, and therefore can be ignored.
*/
*/
#include "ompi_config.h"
@ -649,7 +649,6 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
rc = OMPI_ERR_NOT_SUPPORTED;
goto error;
}
/* If we do not have struct ibv_device.transport_device, then
we're in an old version of OFED that is IB only (i.e., no
iWarp), so we can safely assume that we can use this CPC. */
@ -700,13 +699,12 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
BTL_VERBOSE(("created cpc module %p for btl %p",
(void*)m, (void*)btl));
/* See if we've already got an IB CM listener for this device */
/* See if we've already got an IB CM listener for this device */
for (item = opal_list_get_first(&ibcm_cm_listeners);
item != opal_list_get_end(&ibcm_cm_listeners);
item = opal_list_get_next(item)) {
cmh = (ibcm_listen_cm_id_t*) item;
if (cmh->ib_context == btl->device->ib_dev_context) {
OBJ_RETAIN(cmh);
break;
}
}
@ -755,9 +753,6 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
rc = OMPI_ERR_NOT_SUPPORTED;
goto error;
}
OPAL_OUTPUT((-1, "opened ibcm device 0x%" PRIx64 " (%s)",
(uint64_t) cmh->cm_device,
ibv_get_device_name(cmh->ib_context->device)));
if (0 != (rc = ib_cm_create_id(cmh->cm_device,
&cmh->listen_cm_id, NULL))) {
@ -784,6 +779,9 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
}
opal_list_append(&ibcm_cm_listeners, &(cmh->super));
} else {
/* We found an existing IB CM handle -- bump up the refcount */
OBJ_RETAIN(cmh);
}
m->cmh = cmh;
imli = OBJ_NEW(ibcm_module_list_item_t);
@ -833,9 +831,6 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
m->cpc.cbm_start_connect = ibcm_module_start_connect;
m->cpc.cbm_endpoint_finalize = ibcm_endpoint_finalize;
m->cpc.cbm_finalize = ibcm_module_finalize;
/* Setting uses_cts=true also guarantees that we'll only be
selected if QP 0 is PP */
m->cpc.cbm_uses_cts = true;
/* Start monitoring the fd associated with the cm_device */
ompi_btl_openib_fd_monitor(cmh->cm_device->fd, OPAL_EV_READ,
@ -906,8 +901,7 @@ static int qp_create_one(mca_btl_base_endpoint_t* endpoint, int qp,
init_attr.cap.max_send_sge = 1;
init_attr.cap.max_recv_sge = 1; /* we do not use SG list */
if(BTL_OPENIB_QP_TYPE_PP(qp)) {
/* Add one for the CTS receive frag that will be posted */
init_attr.cap.max_recv_wr = max_recv_wr + 1;
init_attr.cap.max_recv_wr = max_recv_wr;
} else {
init_attr.cap.max_recv_wr = 0;
}
@ -1271,7 +1265,7 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
/* Set the endpoint state to "connecting" (this function runs in
the main MPI thread; not the service thread, so we can set the
endpoint_state here with no memory barriers). */
endpoint_state here). */
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
/* Fill in the path record for this peer */
@ -1369,6 +1363,9 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
}
if (0 != (rc = ib_cm_send_req(req->super.cm_id, cm_req))) {
BTL_VERBOSE(("Got nonzero return from ib_cm_send_req: %d, errno %d", rc, errno));
if (-1 == rc) {
perror("Errno is ");
}
rc = OMPI_ERR_UNREACH;
goto err;
}
@ -1441,9 +1438,11 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
*/
static void *callback_unlock(int fd, int flags, void *context)
{
volatile int *barrier = (volatile int *) context;
OPAL_OUTPUT((-1, "ibcm unlocking main thread"));
*barrier = 1;
/* We need #if protection in order to prevent an unused-variable warning */
#if OMPI_HAVE_THREAD_SUPPORT
opal_mutex_t *m = (opal_mutex_t*) context;
OPAL_THREAD_UNLOCK(m);
#endif
return NULL;
}
@ -1456,7 +1455,7 @@ static void ibcm_listen_cm_id_constructor(ibcm_listen_cm_id_t *cmh)
static void ibcm_listen_cm_id_destructor(ibcm_listen_cm_id_t *cmh)
{
volatile int barrier = 0;
opal_mutex_t mutex;
opal_list_item_t *item;
/* Remove all the ibcm module items */
@ -1483,41 +1482,19 @@ static void ibcm_listen_cm_id_destructor(ibcm_listen_cm_id_t *cmh)
/* Stop monitoring the cm_device's fd (wait for it to be
released from the monitoring entity) */
OPAL_THREAD_LOCK(&mutex);
ompi_btl_openib_fd_unmonitor(cmh->cm_device->fd,
callback_unlock,
(void*) &barrier);
/* JMS debug code while figuring out the IBCM problem */
#if 0
while (0 == barrier) {
sched_yield();
}
#else
{
time_t t = time(NULL);
OPAL_OUTPUT((-1, "main thread waiting for ibcm barrier"));
while (0 == barrier) {
sched_yield();
if (time(NULL) - t > 5) {
OPAL_OUTPUT((-1, "main thread been looping for a long time..."));
break;
}
}
}
#endif
&mutex);
OPAL_THREAD_LOCK(&mutex);
/* Destroy the listener */
if (NULL != cmh->listen_cm_id) {
OPAL_OUTPUT((-1, "destryoing ibcm listener 0x%" PRIx64,
(uint64_t) cmh->listen_cm_id));
ib_cm_destroy_id(cmh->listen_cm_id);
}
/* Close the CM device */
if (NULL != cmh->cm_device) {
OPAL_OUTPUT((-1, "closing ibcm device 0x%" PRIx64 " (%s)",
(uint64_t) cmh->cm_device,
ibv_get_device_name(cmh->ib_context->device)));
ib_cm_close_device(cmh->cm_device);
}
}
@ -1587,7 +1564,7 @@ static int ibcm_module_finalize(mca_btl_openib_module_t *btl,
{
ibcm_module_t *m = (ibcm_module_t *) cpc;
/* If we previously successfully initialized, then release
/* If we previously successfully initialized, then destroy
everything */
if (NULL != m && NULL != m->cmh) {
OBJ_RELEASE(m->cmh);
@ -1725,6 +1702,21 @@ static int qp_to_rts(int qp_index, struct ib_cm_id *cm_id,
return OMPI_SUCCESS;
}
/*
 * Callback (run in the main thread) when an incoming IBCM request
 * needs the endpoint state set to CONNECTING.
 */
static void *callback_set_endpoint_connecting(void *context)
{
mca_btl_openib_endpoint_t *endpoint =
(mca_btl_openib_endpoint_t *) context;
BTL_VERBOSE(("ibcm scheduled callback: setting endpoint to CONNECTING"));
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
return NULL;
}
/*
* Callback (from main thread) when an incoming IBCM request needs to
* initiate a new connection in the other direction.
@ -1903,11 +1895,9 @@ static int request_received(ibcm_listen_cm_id_t *cmh,
If this is not the first request, then assume that all the QPs
have been created already and we can just lookup what we need. */
if (!ie->ie_qps_created) {
/* Set the endpoint_state to "CONNECTING". This is running
in the service thread, so we need to do a write barrier. */
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
opal_atomic_wmb();
/* Schedule to set the endpoint_state to "CONNECTING" */
ompi_btl_openib_fd_schedule(callback_set_endpoint_connecting,
endpoint);
if (OMPI_SUCCESS != (rc = qp_create_all(endpoint, imodule))) {
rej_reason = REJ_PASSIVE_SIDE_ERROR;
BTL_ERROR(("qp_create_all failed -- reject"));
@ -1933,19 +1923,16 @@ static int request_received(ibcm_listen_cm_id_t *cmh,
goto reject;
}
/* Post a single receive buffer on the smallest QP for the CTS
protocol */
/* Post receive buffers. Similar to QP creation, above, we post
*all* receive buffers at once (for all QPs). So ensure to only
do this for the first request received. If this is not the
first request on this endpoint, then assume that all the
receive buffers have been posted already. */
if (!ie->ie_recv_buffers_posted) {
struct ibv_recv_wr *bad_wr, *wr;
assert(NULL != endpoint->endpoint_cts_frag);
wr = &to_recv_frag(endpoint->endpoint_cts_frag)->rd_desc;
assert(NULL != wr);
wr->next = NULL;
OPAL_OUTPUT((-1, "REQUEST posting CTS recv buffer"));
if (0 != ibv_post_recv(endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp, wr, &bad_wr)) {
BTL_VERBOSE(("failed to post CTS recv buffer"));
if (OMPI_SUCCESS !=
(rc = mca_btl_openib_endpoint_post_recvs(endpoint))) {
/* JMS */
BTL_VERBOSE(("failed to post recv buffers"));
rej_reason = REJ_PASSIVE_SIDE_ERROR;
goto reject;
}
@ -2058,31 +2045,32 @@ static int request_received(ibcm_listen_cm_id_t *cmh,
(ompi_btl_openib_connect_base_module_t *) imodule;
cbdata->cscd_endpoint = endpoint;
BTL_VERBOSE(("starting connect in other direction"));
ompi_btl_openib_fd_run_in_main(callback_start_connect, cbdata);
ompi_btl_openib_fd_schedule(callback_start_connect, cbdata);
TIMER_STOP(REQUEST_RECEIVED);
return OMPI_SUCCESS;
}
BTL_ERROR(("malloc failed"));
}
/* Communicate to the upper layer that the connection on this
endpoint has failed */
TIMER_STOP(REQUEST_RECEIVED);
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
endpoint);
return rc;
}
/*
* Callback (from main thread) when the endpoint has been connected
*/
static void *callback_set_endpoint_cpc_complete(void *context)
static void *callback_set_endpoint_connected(void *context)
{
mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t*) context;
BTL_VERBOSE(("calling endpoint_cpc_complete"));
mca_btl_openib_endpoint_cpc_complete(endpoint);
BTL_VERBOSE(("*** CONNECTED endpoint_cpc_complete done!"));
BTL_VERBOSE(("calling endpoint_connected"));
mca_btl_openib_endpoint_connected(endpoint);
BTL_VERBOSE(("*** CONNECTED endpoint_connected done!"));
return NULL;
}
@ -2145,27 +2133,18 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
goto error;
}
/* Now that all the qp's are created locally, post a single
receive buffer on the smallest QP for the CTS protocol. Be
sure to only do this for the *first* reply that is received on
an endpoint. For all other replies received on an endpoint, we
can safely assume that the CTS receive buffer has already been
posted. */
/* Now that all the qp's are created locally, post some receive
buffers, setup credits, etc. The post_recvs() call posts the
buffers for all QPs at once, so be sure to only do this for the
*first* reply that is received on an endpoint. For all other
replies received on an endpoint, we can safely assume that the
receive buffers have already been posted. */
if (!ie->ie_recv_buffers_posted) {
struct ibv_recv_wr *bad_wr, *wr;
OPAL_OUTPUT((-1, "REPLY posting CTS recv buffer"));
assert(NULL != endpoint->endpoint_cts_frag);
wr = &to_recv_frag(endpoint->endpoint_cts_frag)->rd_desc;
assert(NULL != wr);
wr->next = NULL;
if (0 != ibv_post_recv(endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp, wr, &bad_wr)) {
/* JMS */
BTL_VERBOSE(("failed to post CTS recv buffer"));
if (OMPI_SUCCESS !=
(rc = mca_btl_openib_endpoint_post_recvs(endpoint))) {
BTL_VERBOSE(("failed to post recv buffers"));
goto error;
}
ie->ie_recv_buffers_posted = true;
}
@ -2189,7 +2168,7 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
that we're done. */
if (0 == --(ie->ie_qps_to_connect)) {
BTL_VERBOSE(("REPLY telling main BTL we're connected"));
ompi_btl_openib_fd_run_in_main(callback_set_endpoint_cpc_complete, endpoint);
ompi_btl_openib_fd_schedule(callback_set_endpoint_connected, endpoint);
}
TIMER_STOP(REPLY_RECEIVED);
@ -2198,8 +2177,8 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
error:
/* Communicate to the upper layer that the connection on this
endpoint has failed */
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
endpoint);
return rc;
}
@ -2228,7 +2207,7 @@ static int ready_to_use_received(ibcm_listen_cm_id_t *h,
that we're done. */
if (0 == --(ie->ie_qps_to_connect)) {
BTL_VERBOSE(("RTU telling main BTL we're connected"));
ompi_btl_openib_fd_run_in_main(callback_set_endpoint_cpc_complete, endpoint);
ompi_btl_openib_fd_schedule(callback_set_endpoint_connected, endpoint);
}
BTL_VERBOSE(("all done"));
@ -2237,6 +2216,25 @@ static int ready_to_use_received(ibcm_listen_cm_id_t *h,
}
static int disconnect_request_received(ibcm_listen_cm_id_t *cmh,
struct ib_cm_event *event)
{
BTL_VERBOSE(("disconnect request received"));
return OMPI_SUCCESS;
}
static int disconnect_reply_received(ibcm_listen_cm_id_t *cmd,
struct ib_cm_event *event)
{
BTL_VERBOSE(("disconnect reply received"));
#if 0
ib_cm_send_drep(event->cm_id, NULL, 0);
#endif
return OMPI_SUCCESS;
}
static int reject_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
{
enum ib_cm_rej_reason reason = event->param.rej_rcvd.reason;
@ -2300,7 +2298,7 @@ static int reject_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
reason));
/* Communicate to the upper layer that the connection on this
endpoint has failed */
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, NULL);
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error, NULL);
return OMPI_ERR_NOT_FOUND;
}
@ -2332,8 +2330,8 @@ static int request_error(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
/* Communicate to the upper layer that the connection on this
endpoint has failed */
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
endpoint);
return OMPI_SUCCESS;
}
@ -2364,12 +2362,28 @@ static int reply_error(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
/* Communicate to the upper layer that the connection on this
endpoint has failed */
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
endpoint);
return OMPI_SUCCESS;
}
static int disconnect_request_error(ibcm_listen_cm_id_t *cmh,
struct ib_cm_event *e)
{
BTL_VERBOSE(("disconnect request error!"));
return OMPI_SUCCESS;
}
static int unhandled_event(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *e)
{
BTL_VERBOSE(("unhandled event error (%p, %d)",
(void*) e, e->event));
return OMPI_ERR_NOT_FOUND;
}
static void *ibcm_event_dispatch(int fd, int flags, void *context)
{
bool want_ack;
@ -2377,46 +2391,38 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
ibcm_listen_cm_id_t *cmh = (ibcm_listen_cm_id_t*) context;
struct ib_cm_event *e = NULL;
OPAL_OUTPUT((-1, "ibcm dispatch: on device 0x%" PRIx64", fd %d",
(uint64_t) cmh->cm_device, fd));
TIMER_START(CM_GET_EVENT);
/* Blocks until next event, which should be immediately (because
we shouldn't call this dispatch function unless there's
something ready to read) */
rc = ib_cm_get_event(cmh->cm_device, &e);
TIMER_STOP(CM_GET_EVENT);
if (-ENODATA == rc) {
OPAL_OUTPUT((-1, "ibcm dispatch: GOT NOT DATA!"));
return NULL;
}
while (-1 == rc && EAGAIN == errno) {
OPAL_OUTPUT((-1, "ibcm dispatch: GOT EAGAIN!"));
/* Try again */
rc = ib_cm_get_event(cmh->cm_device, &e);
}
if (0 == rc && NULL != e) {
want_ack = true;
switch (e->event) {
case IB_CM_REQ_RECEIVED:
OPAL_OUTPUT((-1, "ibcm dispatch: request received on fd %d", fd));
/* Incoming request */
rc = request_received(cmh, e);
break;
case IB_CM_REP_RECEIVED:
OPAL_OUTPUT((-1, "ibcm dispatch: reply received on fd %d", fd));
/* Reply received */
rc = reply_received(cmh, e);
break;
case IB_CM_RTU_RECEIVED:
OPAL_OUTPUT((-1, "ibcm dispatch: RTU received on fd %d", fd));
/* Ready to use! */
rc = ready_to_use_received(cmh, e);
break;
case IB_CM_DREQ_RECEIVED:
/* Disconnect request */
rc = disconnect_request_received(cmh, e);
break;
case IB_CM_DREP_RECEIVED:
/* Disconnect reply */
rc = disconnect_reply_received(cmh, e);
break;
case IB_CM_REJ_RECEIVED:
OPAL_OUTPUT((-1, "ibcm dispatch: reject received on fd %d", fd));
/* Rejected connection */
rc = reject_received(cmh, e);
/* reject_received() called ib_cm_ack_event so that the CM
@ -2425,32 +2431,22 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
break;
case IB_CM_REQ_ERROR:
OPAL_OUTPUT((-1, "ibcm dispatch: request error received on fd %d", fd));
/* Request error */
rc = request_error(cmh, e);
break;
case IB_CM_REP_ERROR:
OPAL_OUTPUT((-1, "ibcm dispatch: reply error received on fd %d", fd));
/* Reply error */
rc = reply_error(cmh, e);
break;
case IB_CM_DREQ_RECEIVED:
case IB_CM_DREP_RECEIVED:
case IB_CM_DREQ_ERROR:
OPAL_OUTPUT((-1, "ibcm dispatch: %s received on fd %d",
(IB_CM_DREQ_RECEIVED == e->event) ? "disconnect request" :
(IB_CM_DREP_RECEIVED == e->event) ? "disconnect reply" :
"disconnect request error", fd));
/* We don't care */
rc = OMPI_SUCCESS;
/* Disconnect request error */
rc = disconnect_request_error(cmh, e);
break;
default:
/* This would be odd */
OPAL_OUTPUT((-1, "ibcm dispatch: unhandled event received on fd %d", fd));
rc = OMPI_ERR_NOT_FOUND;
rc = unhandled_event(cmh, e);
break;
}
@ -2466,8 +2462,6 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
function), the dispatch function would have done that
already */
}
} else {
OPAL_OUTPUT((-1, "Got weird value back from ib_cm_get_event: %d", rc));
}
return NULL;

View file

@ -86,7 +86,7 @@ ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_oob = {
/* Query */
oob_component_query,
/* Finalize */
oob_component_finalize,
oob_component_finalize
};
/* Open - this functions sets up any oob specific commandline params */
@ -166,7 +166,6 @@ static int oob_component_query(mca_btl_openib_module_t *btl,
(*cpc)->cbm_start_connect = oob_module_start_connect;
(*cpc)->cbm_endpoint_finalize = NULL;
(*cpc)->cbm_finalize = NULL;
(*cpc)->cbm_uses_cts = false;
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: oob CPC available for use on %s",
@ -829,19 +828,19 @@ static void rml_recv_cb(int status, orte_process_name_t* process_name,
} else {
send_connect_data(ib_endpoint, ENDPOINT_CONNECT_ACK);
/* Tell main BTL that we're done */
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
mca_btl_openib_endpoint_connected(ib_endpoint);
}
break;
case MCA_BTL_IB_WAITING_ACK:
/* Tell main BTL that we're done */
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
mca_btl_openib_endpoint_connected(ib_endpoint);
break;
case MCA_BTL_IB_CONNECT_ACK:
send_connect_data(ib_endpoint, ENDPOINT_CONNECT_ACK);
/* Tell main BTL that we're done */
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
mca_btl_openib_endpoint_connected(ib_endpoint);
break;
case MCA_BTL_IB_CONNECTED:

View file

@ -43,10 +43,12 @@
#undef event
static void rdmacm_component_register(void);
static int rdmacm_component_init(void);
static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl,
ompi_btl_openib_connect_base_module_t **cpc);
static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
mca_btl_base_endpoint_t *endpoint);
static int rdmacm_component_destroy(void);
static int rdmacm_component_init(void);
ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_rdmacm = {
"rdmacm",
@ -101,7 +103,7 @@ static int rdmacm_priority = 30;
static uint16_t rdmacm_port = 0;
static uint32_t rdmacm_addr = 0;
#define RDMACM_RESOLVE_ADDR_TIMEOUT 2000
#define RDMA_RESOLVE_ADDR_TIMEOUT 2000
/* Open - this functions sets up any rdma_cm specific commandline params */
static void rdmacm_component_register(void)
@ -185,9 +187,8 @@ static void rdmacm_cleanup(rdmacm_contents_t *local,
struct rdma_cm_id *id,
uint32_t num)
{
if (NULL == id) {
if (NULL == id)
return;
}
free(id->context);
id->context = NULL;
@ -250,8 +251,7 @@ static int rdmacm_setup_qp(rdmacm_contents_t *local,
attr.send_cq = local->openib_btl->device->ib_cq[BTL_OPENIB_LP_CQ];
attr.recv_cq = local->openib_btl->device->ib_cq[qp_cq_prio(qpnum)];
attr.srq = srq;
/* Add one for the CTS receive frag that will be posted */
attr.cap.max_recv_wr = max_recv_wr + 1;
attr.cap.max_recv_wr = max_recv_wr;
attr.cap.max_send_wr = max_send_wr;
attr.cap.max_inline_data = req_inline =
max_inline_size(qpnum, local->openib_btl->device);
@ -282,9 +282,9 @@ out:
return 1;
}
static int rdmacm_client_connect_one(rdmacm_contents_t *local,
message_t *message,
int num)
static int rdma_client_connect_one(rdmacm_contents_t *local,
message_t *message,
int num)
{
struct sockaddr_in din;
id_contexts_t *context;
@ -319,11 +319,10 @@ static int rdmacm_client_connect_one(rdmacm_contents_t *local,
* RDMA_CM_EVENT_ADDR_RESOLVED event will occur on the local event
* handler.
*/
OPAL_OUTPUT((0, "Resolving id: 0x%x", local->id[num]));
rc = rdma_resolve_addr(local->id[num],
NULL,
(struct sockaddr *)&din,
RDMACM_RESOLVE_ADDR_TIMEOUT);
RDMA_RESOLVE_ADDR_TIMEOUT);
if (0 != rc) {
BTL_ERROR(("Failed to resolve the remote address with %d", rc));
goto out1;
@ -337,78 +336,20 @@ out:
return OMPI_ERROR;
}
static char *stringify(uint32_t addr)
{
char *line;
asprintf(&line, "%d.%d.%d.%d",
addr & 0xff,
(addr >> 8) & 0xff,
(addr >> 16) & 0xff,
(addr >> 24));
return line;
}
/* To avoid all kinds of nasty race conditions, we only allow
* connections to be made in one direction. So use a simple
* (arbitrary) test to decide which direction is allowed to initiate
* the connection: the process with the higher IP address wins (this
* matches the test below). If the
* IP addresses are the same (i.e., the MPI procs are on the same
* node), then the process with the lower TCP port wins.
*/
static bool i_initiate(uint32_t local_ipaddr, uint16_t local_port,
uint32_t remote_ipaddr, uint16_t remote_port)
{
char *a = stringify(local_ipaddr);
char *b = stringify(remote_ipaddr);
if (local_ipaddr > remote_ipaddr ||
(local_ipaddr == remote_ipaddr && local_port < remote_port)) {
OPAL_OUTPUT((0, "i_initiate (I WIN): local ipaddr %s, remote ipaddr %s",
a, b));
free(a);
free(b);
return true;
} else {
OPAL_OUTPUT((0, "i_initiate (I lose): local ipaddr %s, remote ipaddr %s",
a, b));
free(a);
free(b);
return false;
}
}
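A quick sanity check (hypothetical, not in the source) of the property this tie-break must provide: for any two distinct (address, port) pairs, exactly one side initiates.

    #include <assert.h>
    #include <stdbool.h>
    #include <stdint.h>

    /* Uses i_initiate() from above: for distinct (ip, port) pairs,
       exactly one of the two processes may be the initiator. */
    static void check_one_initiator(uint32_t ip_a, uint16_t port_a,
                                    uint32_t ip_b, uint16_t port_b)
    {
        bool a = i_initiate(ip_a, port_a, ip_b, port_b);
        bool b = i_initiate(ip_b, port_b, ip_a, port_a);
        assert(a != b);
    }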
static int rdmacm_client_connect(rdmacm_contents_t *local, message_t *message)
static int rdma_client_connect(rdmacm_contents_t *local, message_t *message)
{
int rc, qp;
/* If we're not the initiator, allocate an extra ID for the bogus
QP that we expect to be rejected */
qp = mca_btl_openib_component.num_qps;
if (!local->endpoint->endpoint_initiator) {
++qp;
}
local->id = calloc(qp, sizeof(struct rdma_cm_id *));
local->id = malloc(sizeof(struct rdma_cm_id *) * mca_btl_openib_component.num_qps);
if (NULL == local->id) {
BTL_ERROR(("malloc error"));
return OMPI_ERR_OUT_OF_RESOURCE;
return OMPI_ERROR;
}
/* If we're the initiator, then open all the QPs */
if (local->endpoint->endpoint_initiator) {
for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
rc = rdmacm_client_connect_one(local, message, qp);
if (OMPI_SUCCESS != rc) {
BTL_ERROR(("rdmacm_client_connect_one error (real QP %d)",
qp));
goto out;
}
}
}
/* Otherwise, only open 1 QP that we expect to be rejected */
else {
rc = rdmacm_client_connect_one(local, message, qp - 1);
for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
rc = rdma_client_connect_one(local, message, qp);
if (OMPI_SUCCESS != rc) {
BTL_ERROR(("rdmacm_client_connect_one error (bogus QP)"));
BTL_ERROR(("rdma_client_connect_one error"));
goto out;
}
}
@ -417,12 +358,9 @@ static int rdmacm_client_connect(rdmacm_contents_t *local, message_t *message)
out:
for (; qp >= 0; qp--) {
if (NULL != local->id[qp]) {
rdmacm_cleanup(local, local->id[qp], qp);
}
rdmacm_cleanup(local, local->id[qp], qp);
}
return rc;
return OMPI_ERROR;
}
/* Connect method called by the upper layers to connect the local
@ -432,18 +370,11 @@ out:
static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
mca_btl_base_endpoint_t *endpoint)
{
rdmacm_contents_t *local;
message_t *message, *local_message;
rdmacm_contents_t *client;
message_t *message;
int rc;
/* Don't use the CPC to get the message, because this function is
invoked from the event_handler (to initiate connections in the
Right direction), where we don't have the CPC, so it'll be
NULL. */
local_message =
(message_t *) endpoint->endpoint_local_cpc->data.cbm_modex_message;
message = (message_t *)
endpoint->endpoint_remote_cpc_data->cbm_modex_message;
message = (message_t *)endpoint->endpoint_remote_cpc_data->cbm_modex_message;
BTL_VERBOSE(("Connecting to remote ip addr = %x, port = %d ep state = %d",
message->ipaddr, message->tcp_port, endpoint->endpoint_state));
@ -454,43 +385,28 @@ static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cp
return OMPI_SUCCESS;
}
/* Set the endpoint state to "connecting" (this function runs in
the main MPI thread, not the service thread, so we can set the
endpoint_state here). */
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
endpoint->endpoint_state = MCA_BTL_IB_CONNECT_ACK;
local = calloc(1, sizeof(rdmacm_contents_t));
if (NULL == local) {
BTL_ERROR(("malloc of local failed"));
client = calloc(1, sizeof(rdmacm_contents_t));
if (NULL == client) {
BTL_ERROR(("malloc of client failed"));
goto out;
}
local->openib_btl = endpoint->endpoint_btl;
local->endpoint = endpoint;
local->server = false;
client->openib_btl = endpoint->endpoint_btl;
client->endpoint = endpoint;
client->server = false;
/* Populate the port information with the local port the server is
* listening on instead of the ephemeral port this client is
* connecting with. This port is used to determine which endpoint
* is being connected from, in the case where there are multiple
* listeners on the local system.
* is being connected from, in the instance where there are
* multiple listeners on the local system.
*/
local->ipaddr = local_message->ipaddr;
local->tcp_port = local_message->tcp_port;
client->tcp_port = ((message_t *)endpoint->endpoint_local_cpc->data.cbm_modex_message)->tcp_port;
/* Are we the initiator? Or do we expect this connect request to
be rejected? */
endpoint->endpoint_initiator =
i_initiate(local->ipaddr, local->tcp_port,
message->ipaddr, message->tcp_port);
OPAL_OUTPUT((0, "Start connect; ep=0x%x (0x%x), I %s the initiator to %s",
endpoint,
endpoint->endpoint_local_cpc,
endpoint->endpoint_initiator ? "am" : "am NOT",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
rc = rdmacm_client_connect(local, message);
rc = rdma_client_connect(client, message);
if (0 != rc) {
BTL_ERROR(("rdmacm_client_connect error"));
BTL_ERROR(("rdma_client_connect error"));
goto out;
}
@ -531,23 +447,15 @@ static int handle_connect_request(rdmacm_contents_t *local,
}
message = endpoint->endpoint_remote_cpc_data->cbm_modex_message;
endpoint->endpoint_initiator =
i_initiate(local->ipaddr, local->tcp_port,
message->ipaddr, rem_port);
BTL_VERBOSE(("ep state = %d, local ipaddr = %x, remote ipaddr = %x, local port = %d, remote port = %d",
endpoint->endpoint_state, local->ipaddr, message->ipaddr,
local->tcp_port, rem_port));
BTL_VERBOSE(("ep state = %d, local ipaddr = %x, remote ipaddr = %x port %d",
endpoint->endpoint_state, local->ipaddr, message->ipaddr, rem_port));
OPAL_OUTPUT((0, "in handle_connect_request; ep=0x%x (0x%x), I still %s the initiator to %s",
endpoint,
endpoint->endpoint_local_cpc,
endpoint->endpoint_initiator ? "am" : "am NOT",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
if (endpoint->endpoint_initiator) {
if ((local->ipaddr == message->ipaddr && local->tcp_port < rem_port) ||
local->ipaddr > message->ipaddr) {
int race = 1;
OPAL_OUTPUT((0, "Received a connect request from an endpoint in the wrong direction"));
BTL_VERBOSE(("Received a connect request from an endpoint in the wrong direction"));
/* This will cause an event on the remote system. By passing in
* a value in the second arg of rdma_reject, the remote side
@ -560,17 +468,19 @@ static int handle_connect_request(rdmacm_contents_t *local,
goto out;
}
OPAL_OUTPUT((0, "Starting connection in other direction"));
rdmacm_module_start_connect(NULL, endpoint);
/* If there are multiple QPs attempting to connect from the
* wrong direction, only make one call to
* rdmacm_module_start_connect to connect in the proper
* direction, as it will connect to the remote side with the
* correct number of QPs.
*/
if (0 == qpnum) {
rdmacm_module_start_connect(NULL, endpoint);
}
return 0;
}
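The rdma_reject() call that the truncated comment above refers to is elided by the hunk boundary. A sketch of its shape (librdmacm API; the marker value 1 matches what the REJECTED handler further down checks for in the private data):

    #include <rdma/rdma_cma.h>

    /* Hypothetical helper: reject a wrong-direction connect request
       with a one-int private payload so the peer can tell an
       intentional reject apart from a real failure. */
    static int reject_wrong_direction(struct rdma_cm_id *id)
    {
        int race = 1;
        return rdma_reject(id, &race, sizeof(race));
    }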
/* Set the endpoint_state to "CONNECTING". This is running
in the service thread, so we need to do a write barrier. */
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
opal_atomic_wmb();
endpoint->rem_info.rem_index = rem_index;
/* Setup QP for new connection */
@ -583,26 +493,24 @@ static int handle_connect_request(rdmacm_contents_t *local,
goto out;
}
/* Post a single receive buffer on the smallest QP for the CTS
protocol */
if (mca_btl_openib_component.credits_qp == qpnum) {
struct ibv_recv_wr *bad_wr, *wr;
assert(NULL != endpoint->endpoint_cts_frag);
wr = &to_recv_frag(endpoint->endpoint_cts_frag)->rd_desc;
assert(NULL != wr);
wr->next = NULL;
if (0 != ibv_post_recv(endpoint->qps[qpnum].qp->lcl_qp,
wr, &bad_wr)) {
BTL_ERROR(("failed to post CTS recv buffer"));
goto out1;
}
/* Recvs must be posted prior to accepting the rdma connection.
* Otherwise, it is possible to get data before there are recvs to
* put it in, which for iWARP will result in the connection being
* torn down.
*/
if (BTL_OPENIB_QP_TYPE_PP(qpnum)) {
rc = mca_btl_openib_endpoint_post_rr(endpoint, qpnum);
} else {
rc = mca_btl_openib_post_srr(endpoint->endpoint_btl, qpnum);
}
if (OMPI_SUCCESS != rc) {
BTL_ERROR(("mca_btl_openib_endpoint_post_rr_nolock error %d", rc));
goto out1;
}
/* Since the event id is already created, we cannot add this
* information in the normal way. Instead we must reference its
* location and put the data there so that it can be accessed later.
* location and put the data there so that it can be accessed later.
*/
event->id->context = malloc(sizeof(id_contexts_t));
if (NULL == event->id->context) {
@ -707,11 +615,7 @@ static int rdmacm_connection_shutdown(struct mca_btl_base_endpoint_t *endpoint)
if (NULL != cli->item->id[i] &&
NULL != cli->item->id[i]->qp &&
NULL != cli->item->endpoint->qps) {
opal_output(0, "Freeing rdmacm id %p",
cli->item->id[i]);
rdma_disconnect(cli->item->id[i]);
/* JMS shouldn't be necessary */
cli->item->id[i] = NULL;
rdma_disconnect(cli->item->id[i]);
}
}
}
@ -722,13 +626,11 @@ static int rdmacm_connection_shutdown(struct mca_btl_base_endpoint_t *endpoint)
/*
* Callback (from main thread) when the endpoint has been connected
*/
static void *local_endpoint_cpc_complete(void *context)
static void *local_endpoint_connected(void *context)
{
mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t *)context;
OPAL_OUTPUT((0, "local_endpoint_cpc_complete to %s",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
mca_btl_openib_endpoint_cpc_complete(endpoint);
mca_btl_openib_endpoint_connected(endpoint);
return NULL;
}
@ -739,11 +641,9 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even
mca_btl_openib_endpoint_t *endpoint;
message_t *message;
if (local->server) {
if (local->server)
endpoint = ((id_contexts_t *)event->id->context)->endpoint;
OPAL_OUTPUT((0, "Server CPC complete to %s",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
} else {
else {
list_item_t *li;
uint32_t rem_index;
@ -759,8 +659,6 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even
}
li->item = local;
opal_list_append(&client_list, &(li->super));
OPAL_OUTPUT((0, "Client CPC complete to %s",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
}
if (NULL == endpoint) {
BTL_ERROR(("Can't find endpoint"));
@ -768,17 +666,12 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even
}
data = (rdmacm_endpoint_local_cpc_data_t *)endpoint->endpoint_local_cpc_data;
/* Only notify the upper layers after the last QP has been connected */
/* Only notify the upper layers after the last QP has been connected */
if (++data->rdmacm_counter < mca_btl_openib_component.num_qps) {
BTL_VERBOSE(("%s count == %d", local->server?"server":"client", data->rdmacm_counter));
return 0;
}
OPAL_OUTPUT((0, "in connect_endpoint; ep=0x%x (0x%x), I still %s the initiator to %s",
endpoint,
endpoint->endpoint_local_cpc,
endpoint->endpoint_initiator ? "am" : "am NOT",
endpoint->endpoint_proc->proc_ompi->proc_hostname));
message = endpoint->endpoint_remote_cpc_data->cbm_modex_message;
BTL_VERBOSE(("%s connected!!! local %x remote %x state = %d",
local->server?"server":"client",
@ -786,19 +679,19 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even
message->ipaddr,
endpoint->endpoint_state));
ompi_btl_openib_fd_run_in_main(local_endpoint_cpc_complete, endpoint);
ompi_btl_openib_fd_schedule(local_endpoint_connected, endpoint);
return 0;
}
static int resolve_route(rdmacm_contents_t *local, int num)
static int start_connect(rdmacm_contents_t *local, int num)
{
int rc;
/* Resolve the route to the remote system. Once established, the
/* Resolve the route to the remote system. Once established, the
* local system will get a RDMA_CM_EVENT_ROUTE_RESOLVED event.
*/
rc = rdma_resolve_route(local->id[num], RDMACM_RESOLVE_ADDR_TIMEOUT);
rc = rdma_resolve_route(local->id[num], RDMA_RESOLVE_ADDR_TIMEOUT);
if (0 != rc) {
BTL_ERROR(("Failed to resolve the route with %d", rc));
goto out;
@ -829,6 +722,7 @@ static int create_dummy_qp(rdmacm_contents_t *local, struct rdma_cm_id *id, int
struct ibv_qp_init_attr attr;
struct ibv_qp *qp;
/* create the qp via rdma_create_qp() */
memset(&attr, 0, sizeof(attr));
attr.qp_type = IBV_QPT_RC;
attr.send_cq = local->dummy_cq;
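The rdma_create_qp() call named in the comment falls outside this hunk; a sketch of how the throwaway QP might be completed (hypothetical capacities; the device->ib_pd field name is an assumption):

    /* Minimal capacities, both CQs pointed at the dummy CQ, since
       this QP exists only to be rejected by the peer. */
    attr.recv_cq          = local->dummy_cq;
    attr.cap.max_send_wr  = 1;
    attr.cap.max_recv_wr  = 1;
    attr.cap.max_send_sge = 1;
    attr.cap.max_recv_sge = 1;
    if (0 != rdma_create_qp(id, local->openib_btl->device->ib_pd, &attr)) {
        BTL_ERROR(("rdma_create_qp failed"));
    }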
@ -856,44 +750,34 @@ out:
static int finish_connect(rdmacm_contents_t *local, int num)
{
struct rdma_conn_param conn_param;
conn_message_t msg;
int rc;
struct sockaddr *peeraddr, *localaddr;
uint32_t localipaddr, remoteipaddr;
uint16_t remoteport;
conn_message_t msg;
int rc;
remoteport = rdma_get_dst_port(local->id[num]);
localaddr = rdma_get_local_addr(local->id[num]);
peeraddr = rdma_get_peer_addr(local->id[num]);
localipaddr = ((struct sockaddr_in *)localaddr)->sin_addr.s_addr;
remoteipaddr = ((struct sockaddr_in *)peeraddr)->sin_addr.s_addr;
localaddr = rdma_get_local_addr(local->id[num]);
localipaddr = ((struct sockaddr_in *)localaddr)->sin_addr.s_addr;
/* If we're the initiator, then setup the QP's and post the CTS
message buffer */
if (local->endpoint->endpoint_initiator) {
if ((localipaddr == remoteipaddr && local->tcp_port <= remoteport) ||
localipaddr > remoteipaddr) {
rc = rdmacm_setup_qp(local, local->endpoint, local->id[num], num);
if (0 != rc) {
BTL_ERROR(("rdmacm_setup_qp error %d", rc));
goto out;
}
if (mca_btl_openib_component.credits_qp == num) {
/* Post a single receive buffer on the smallest QP for the CTS
protocol */
struct ibv_recv_wr *bad_wr, *wr;
assert(NULL != local->endpoint->endpoint_cts_frag);
wr = &to_recv_frag(local->endpoint->endpoint_cts_frag)->rd_desc;
assert(NULL != wr);
wr->next = NULL;
if (0 != ibv_post_recv(local->endpoint->qps[num].qp->lcl_qp,
wr, &bad_wr)) {
BTL_ERROR(("failed to post CTS recv buffer"));
goto out1;
}
if (BTL_OPENIB_QP_TYPE_PP(num)) {
rc = mca_btl_openib_endpoint_post_rr(local->endpoint, num);
} else {
rc = mca_btl_openib_post_srr(local->endpoint->endpoint_btl, num);
}
if (OMPI_SUCCESS != rc) {
BTL_ERROR(("mca_btl_openib_endpoint_post_rr_nolock error %d", rc));
goto out1;
}
} else {
/* If we are establishing a connection in the "wrong" direction,
@ -927,15 +811,12 @@ static int finish_connect(rdmacm_contents_t *local, int num)
msg.rem_index = local->endpoint->index;
msg.rem_port = local->tcp_port;
BTL_VERBOSE(("Connecting from %x, port %d to %x", localipaddr, msg.rem_port, remoteipaddr));
/* Now all of the local setup has been done. The remote system
* should now get a RDMA_CM_EVENT_CONNECT_REQUEST event to further
* the setup of the QP.
*/
OPAL_OUTPUT((0, "in finish_connect; ep=0x%x (0x%x), I still %s the initiator to %s",
local->endpoint,
local->endpoint->endpoint_local_cpc,
local->endpoint->endpoint_initiator ? "am" : "am NOT",
local->endpoint->endpoint_proc->proc_ompi->proc_hostname));
rc = rdma_connect(local->id[num], &conn_param);
if (0 != rc) {
BTL_ERROR(("rdma_connect Failed with %d", rc));
@ -952,7 +833,7 @@ out:
return -1;
}
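The hunks above elide how conn_param is populated before the rdma_connect() call. A sketch, under the assumption (supported by the REJECTED handler reading event->param.conn.private_data) that the conn_message_t rides along as CM private data; this would sit inside finish_connect() above:

    /* Hypothetical setup for rdma_connect(): rem_index and rem_port
       travel as private data so the acceptor can identify the
       connecting endpoint. */
    memset(&conn_param, 0, sizeof(conn_param));
    conn_param.private_data     = &msg;
    conn_param.private_data_len = sizeof(msg);
    conn_param.retry_count      = 10;   /* assumed; not shown in this diff */
    rc = rdma_connect(local->id[num], &conn_param);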
static int event_handler(struct rdma_cm_event *event)
static int rdma_event_handler(struct rdma_cm_event *event)
{
rdmacm_contents_t *local;
struct sockaddr *peeraddr, *localaddr;
@ -969,7 +850,7 @@ static int event_handler(struct rdma_cm_event *event)
localipaddr = ((struct sockaddr_in *)localaddr)->sin_addr.s_addr;
peeripaddr = ((struct sockaddr_in *)peeraddr)->sin_addr.s_addr;
BTL_VERBOSE(("%s event_handler -- %s, status = %d to %x",
BTL_VERBOSE(("%s rdma_event_handler -- %s, status = %d to %x",
local->server?"server":"client",
rdma_event_str(event->event),
event->status,
@ -977,23 +858,19 @@ static int event_handler(struct rdma_cm_event *event)
switch (event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
OPAL_OUTPUT((0, "Address resolved: ID 0x%x", local->id[qpnum]));
rc = resolve_route(local, qpnum);
rc = start_connect(local, qpnum);
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
OPAL_OUTPUT((0, "Route resolved: ID 0x%x", local->id[qpnum]));
local->ipaddr = localipaddr;
rc = finish_connect(local, qpnum);
break;
case RDMA_CM_EVENT_CONNECT_REQUEST:
OPAL_OUTPUT((0, "Incoming connect request: 0x%x", local->id[qpnum]));
rc = handle_connect_request(local, event);
break;
case RDMA_CM_EVENT_ESTABLISHED:
OPAL_OUTPUT((0, "Connection established: 0x%x", local->id[qpnum]));
rc = rdmacm_connect_endpoint(local, event);
break;
@ -1003,16 +880,14 @@ static int event_handler(struct rdma_cm_event *event)
break;
case RDMA_CM_EVENT_REJECTED:
if ((NULL != event->param.conn.private_data) &&
(1 == *((int *)event->param.conn.private_data))) {
if ((NULL != event->param.conn.private_data) && (1 == *((int *)event->param.conn.private_data))) {
BTL_VERBOSE(("A good reject! for qp %d", qpnum));
if (NULL != local->id[qpnum]->qp) {
ibv_destroy_qp(local->id[qpnum]->qp);
local->id[qpnum]->qp = NULL;
}
if (NULL != local->dummy_cq) {
if (NULL != local->dummy_cq)
ibv_destroy_cq(local->dummy_cq);
}
rdmacm_cleanup(local, local->id[qpnum], qpnum);
rc = 0;
}
@ -1039,8 +914,8 @@ static inline void rdmamcm_event_error(struct rdma_cm_event *event)
endpoint = ((id_contexts_t *)event->id->context)->local->endpoint;
}
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
endpoint);
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
endpoint);
}
static void *rdmacm_event_dispatch(int fd, int flags, void *context)
@ -1077,9 +952,9 @@ static void *rdmacm_event_dispatch(int fd, int flags, void *context)
}
rdma_ack_cm_event(event);
rc = event_handler(&ecopy);
rc = rdma_event_handler(&ecopy);
if (0 != rc) {
BTL_ERROR(("Error event_handler -- %s, status = %d",
BTL_ERROR(("Error rdma_event_handler -- %s, status = %d",
rdma_event_str(ecopy.event),
ecopy.status));
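The copy-then-ack ordering above matters: rdma_ack_cm_event() releases the event back to librdmacm, so the handler must run on a snapshot. A minimal sketch of the pattern (hypothetical helper; rdma_event_handler() is the function from this diff):

    #include <string.h>
    #include <rdma/rdma_cma.h>

    /* Snapshot the event, ack it (librdmacm frees it), then handle
       the copy. Any pointed-to payload, e.g. param.conn.private_data,
       would need a deep copy as well. */
    static int handle_one_event(struct rdma_cm_event *event)
    {
        struct rdma_cm_event ecopy;
        memcpy(&ecopy, event, sizeof(ecopy));
        rdma_ack_cm_event(event);     /* 'event' is invalid after this */
        return rdma_event_handler(&ecopy);
    }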
@ -1106,11 +981,13 @@ static int rdmacm_init(mca_btl_openib_endpoint_t *endpoint)
data = calloc(1, sizeof(rdmacm_endpoint_local_cpc_data_t));
if (NULL == data) {
BTL_ERROR(("malloc failed"));
return OMPI_ERR_OUT_OF_RESOURCE;
goto out;
}
endpoint->endpoint_local_cpc_data = data;
return OMPI_SUCCESS;
return 0;
out:
return -1;
}
static int ipaddrcheck(rdmacm_contents_t *server,
@ -1191,7 +1068,8 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, ompi_btl_
/* RDMACM is not supported if we have any XRC QPs */
if (mca_btl_openib_component.num_xrc_qps > 0) {
BTL_VERBOSE(("rdmacm CPC not supported with XRC receive queues, please try xoob CPC; skipped"));
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC not supported with XRC receive queues, please try xoob CPC; skipped");
rc = OMPI_ERR_NOT_SUPPORTED;
goto out;
}
@ -1210,9 +1088,6 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, ompi_btl_
(*cpc)->cbm_start_connect = rdmacm_module_start_connect;
(*cpc)->cbm_endpoint_finalize = rdmacm_connection_shutdown;
(*cpc)->cbm_finalize = NULL;
/* Setting uses_cts=true also guarantees that we'll only be
selected if QP 0 is PP */
(*cpc)->cbm_uses_cts = true;
server = malloc(sizeof(rdmacm_contents_t));
if (NULL == server) {

View file

@ -48,7 +48,7 @@ ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_xoob = {
/* Query */
xoob_component_query,
/* Finalize */
xoob_component_finalize,
xoob_component_finalize
};
typedef enum {
@ -892,7 +892,7 @@ static void xoob_rml_recv_cb(int status, orte_process_name_t* process_name,
mca_btl_openib_endpoint_invoke_error(NULL);
return;
}
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
mca_btl_openib_endpoint_connected(ib_endpoint);
OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock);
break;
case ENDPOINT_XOOB_CONNECT_XRC_RESPONSE:
@ -911,7 +911,7 @@ static void xoob_rml_recv_cb(int status, orte_process_name_t* process_name,
OPAL_THREAD_LOCK(&ib_endpoint->endpoint_lock);
/* we got srq numbers on our request */
XOOB_SET_REMOTE_INFO(ib_endpoint->rem_info, rem_info);
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
mca_btl_openib_endpoint_connected(ib_endpoint);
OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock);
break;
case ENDPOINT_XOOB_CONNECT_XRC_NR_RESPONSE:
@ -989,7 +989,6 @@ static int xoob_component_query(mca_btl_openib_module_t *openib_btl,
(*cpc)->cbm_start_connect = xoob_module_start_connect;
(*cpc)->cbm_endpoint_finalize = NULL;
(*cpc)->cbm_finalize = NULL;
(*cpc)->cbm_uses_cts = false;
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: xoob CPC available for use on %s",

View file

@ -128,29 +128,19 @@
* used. There is only this function, which tells when a specific
* CPC/BTL module is no longer being used.
*
* cbm_uses_cts: a bool that indicates whether the CPC will use the
* CTS protocol or not.
* - if true: the CPC will post the fragment on
* endpoint->endpoint_cts_frag as a receive buffer and will *not*
* call ompi_btl_openib_post_recvs().
* - if false: the CPC will call ompi_btl_openib_post_recvs() before
* calling ompi_btl_openib_cpc_complete().
*
* There are two functions in the main openib BTL that the CPC may
* There are two functions in the main openib BTL that the CPC will
* call:
*
* - ompi_btl_openib_post_recvs(endpoint): once a QP is locally
* connected to the remote side (but we don't know if the remote side
* is connected to us yet), this function is invoked to post buffers
* on the QP, setup credits for the endpoint, etc. This function is
* *only* invoked if the CPC's cbm_uses_cts is false.
* on the QP, setup credits for the endpoint, etc.
*
* - ompi_btl_openib_cpc_complete(endpoint): once that a CPC knows
* that a QP is connected on *both* sides, this function is invoked to
* tell the main openib BTL "ok, you can use this connection now."
* (e.g., the main openib BTL will either invoke the CTS protocol or
* start sending out fragments that were queued while the connection
* was establishing, etc.).
* - ompi_btl_openib_connected(endpoint): once we know that a QP is
* connected on *both* sides, this function is invoked to tell the
* main openib BTL "ok, you can use this connection now." (e.g., the
* main openib BTL will start sending out fragments that were queued
* while the connection was establishing, etc.).
*/
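A concrete sketch of that contract (hypothetical CPC code; note the prose above uses ompi_-prefixed names, while the code hunks in this commit call mca_btl_openib_endpoint_connected()):

    /* Post receives once the QP is locally wired up... */
    static void my_cpc_qp_ready(mca_btl_base_endpoint_t *ep)
    {
        mca_btl_openib_endpoint_post_recvs(ep);  /* buffers + credits */
    }

    /* ...and only declare the endpoint usable once both sides are
       known to be connected. */
    static void my_cpc_fully_connected(mca_btl_base_endpoint_t *ep)
    {
        mca_btl_openib_endpoint_connected(ep);   /* BTL may now send */
    }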
#ifndef BTL_OPENIB_CONNECT_H
#define BTL_OPENIB_CONNECT_H
@ -340,14 +330,6 @@ typedef struct ompi_btl_openib_connect_base_module_t {
/** Finalize the cpc module */
ompi_btl_openib_connect_base_module_finalize_fn_t cbm_finalize;
/** Whether this module will use the CTS protocol or not. This
directly states whether this module will call
mca_btl_openib_endpoint_post_recvs() or not: true = this
module will *not* call _post_recvs() and instead will post the
receive buffer provided at endpoint->endpoint_cts_frag on qp
0. */
bool cbm_uses_cts;
} ompi_btl_openib_connect_base_module_t;
END_C_DECLS

View file

@ -543,14 +543,3 @@ support. Short eager RDMA is not yet supported with progress threads;
its use has been disabled in this job.
This is a warning only; your job will attempt to continue.
#
[cannot raise btl error]
The OpenFabrics driver in Open MPI tried to raise a fatal error, but
failed. Hopefully there was an error message before this one that
gave some more detailed information.
Local host: %s
Source file: %s
Source line: %d
Your job is now going to abort, sorry.