Up to SVN r19488
This commit was SVN r19489. The following SVN revision numbers were found above: r19488 --> open-mpi/ompi@c0b8a4a9b5
Этот коммит содержится в:
родитель
c0b8a4a9b5
Коммит
ea866f9e26
@ -162,12 +162,11 @@ else
|
||||
|
||||
************************************************************************
|
||||
|
||||
Open MPI was unable to find threading support on your system. In the
|
||||
near future, the OMPI development team is considering requiring
|
||||
threading support for proper OMPI execution. This is in part because
|
||||
we are not aware of any users that do not have thread support - so we
|
||||
need you to e-mail us at ompi@ompi-mpi.org and let us know about this
|
||||
problem.
|
||||
Open MPI was unable to find threading support on your system. The
|
||||
OMPI development team is considering requiring threading support for
|
||||
proper OMPI execution. This is in part because we are not aware of
|
||||
any users that do not have thread support - so we need you to e-mail
|
||||
us at ompi@ompi-mpi.org and let us know about this problem.
|
||||
|
||||
************************************************************************
|
||||
|
||||
|
@ -967,15 +967,6 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
|
||||
OBJ_RELEASE(endpoint);
|
||||
}
|
||||
|
||||
/* Finalize the CPC modules on this openib module */
|
||||
for (i = 0; i < openib_btl->num_cpcs; ++i) {
|
||||
if (NULL != openib_btl->cpcs[i]->cbm_finalize) {
|
||||
openib_btl->cpcs[i]->cbm_finalize(openib_btl, openib_btl->cpcs[i]);
|
||||
}
|
||||
free(openib_btl->cpcs[i]);
|
||||
}
|
||||
free(openib_btl->cpcs);
|
||||
|
||||
/* Release SRQ resources */
|
||||
for(qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
|
||||
if(!BTL_OPENIB_QP_TYPE_PP(qp)) {
|
||||
@ -992,6 +983,15 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl)
|
||||
}
|
||||
}
|
||||
|
||||
/* Finalize the CPC modules on this openib module */
|
||||
for (i = 0; i < openib_btl->num_cpcs; ++i) {
|
||||
if (NULL != openib_btl->cpcs[i]->cbm_finalize) {
|
||||
openib_btl->cpcs[i]->cbm_finalize(openib_btl, openib_btl->cpcs[i]);
|
||||
}
|
||||
free(openib_btl->cpcs[i]);
|
||||
}
|
||||
free(openib_btl->cpcs);
|
||||
|
||||
/* Release device if there are no more users */
|
||||
if(!(--openib_btl->device->btls)) {
|
||||
OBJ_RELEASE(openib_btl->device);
|
||||
|
@ -440,6 +440,28 @@ static void btl_openib_control(mca_btl_base_module_t* btl,
|
||||
(((unsigned char*)clsc_hdr) + skip);
|
||||
}
|
||||
break;
|
||||
case MCA_BTL_OPENIB_CONTROL_CTS:
|
||||
OPAL_OUTPUT((0, "received CTS control message: posted recvs %d, sent cts %d",
|
||||
ep->endpoint_posted_recvs, ep->endpoint_cts_sent));
|
||||
ep->endpoint_cts_received = true;
|
||||
|
||||
/* Only send the CTS back and mark connected if:
|
||||
- we have posted our receives (it's possible that we can
|
||||
get this CTS before this side's CPC has called
|
||||
cpc_complete())
|
||||
- we have not yet sent our CTS
|
||||
|
||||
We don't even want to mark the endpoint connected() until
|
||||
we have posted our receives because otherwise we will
|
||||
trigger credit management (because the rd_credits will
|
||||
still be negative), and Bad Things will happen. */
|
||||
if (ep->endpoint_posted_recvs) {
|
||||
if (!ep->endpoint_cts_sent) {
|
||||
mca_btl_openib_endpoint_send_cts(ep);
|
||||
}
|
||||
mca_btl_openib_endpoint_connected(ep);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
BTL_ERROR(("Unknown message type received by BTL"));
|
||||
break;
|
||||
|
@ -390,6 +390,17 @@ void mca_btl_openib_endpoint_init(mca_btl_openib_module_t *btl,
|
||||
for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
|
||||
endpoint_init_qp(ep, qp);
|
||||
}
|
||||
|
||||
/* If the CPC uses the CTS protocol, provide a frag buffer for the
|
||||
CPC to post */
|
||||
if (ep->endpoint_local_cpc->cbm_uses_cts) {
|
||||
int rc;
|
||||
ompi_free_list_item_t* item;
|
||||
OMPI_FREE_LIST_WAIT(&btl->device->qps[0].recv_free, item, rc);
|
||||
to_base_frag(item)->base.order = 0;
|
||||
to_com_frag(item)->endpoint = ep;
|
||||
ep->endpoint_cts_frag = item;
|
||||
}
|
||||
}
|
||||
|
||||
static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
|
||||
@ -417,6 +428,7 @@ static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
|
||||
endpoint->endpoint_proc = 0;
|
||||
endpoint->endpoint_local_cpc = NULL;
|
||||
endpoint->endpoint_remote_cpc_data = NULL;
|
||||
endpoint->endpoint_initiator = false;
|
||||
endpoint->endpoint_tstamp = 0.0;
|
||||
endpoint->endpoint_state = MCA_BTL_IB_CLOSED;
|
||||
endpoint->endpoint_retries = 0;
|
||||
@ -442,6 +454,10 @@ static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
|
||||
endpoint->use_eager_rdma = false;
|
||||
endpoint->eager_rdma_remote.tokens = 0;
|
||||
endpoint->eager_rdma_local.credits = 0;
|
||||
endpoint->endpoint_cts_frag = NULL;
|
||||
endpoint->endpoint_posted_recvs = false;
|
||||
endpoint->endpoint_cts_received = false;
|
||||
endpoint->endpoint_cts_sent = false;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -459,6 +475,12 @@ static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint)
|
||||
endpoint->endpoint_local_cpc->cbm_endpoint_finalize(endpoint);
|
||||
}
|
||||
|
||||
/* Release CTS buffer */
|
||||
if (NULL != endpoint->endpoint_cts_frag) {
|
||||
MCA_BTL_IB_FRAG_RETURN(endpoint->endpoint_cts_frag);
|
||||
endpoint->endpoint_cts_frag = NULL;
|
||||
}
|
||||
|
||||
/* Release memory resources */
|
||||
do {
|
||||
/* Make sure that mca_btl_openib_endpoint_connect_eager_rdma ()
|
||||
@ -531,8 +553,8 @@ static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint)
|
||||
|
||||
|
||||
/*
|
||||
* call when the connect module has created all the qp's on an
|
||||
* endpoint and needs to have some receive buffers posted
|
||||
* Called when the connect module has created all the qp's on an
|
||||
* endpoint and needs to have some receive buffers posted.
|
||||
*/
|
||||
int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t *endpoint)
|
||||
{
|
||||
@ -549,6 +571,120 @@ int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t *endpoint)
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static void cts_sent(mca_btl_base_module_t* btl,
|
||||
struct mca_btl_base_endpoint_t* ep,
|
||||
struct mca_btl_base_descriptor_t* des,
|
||||
int status)
|
||||
{
|
||||
/* Nothing to do/empty function (we can't pass in a NULL pointer
|
||||
for the des_cbfunc) */
|
||||
opal_output(-1, "CTS send completed");
|
||||
}
|
||||
|
||||
/*
|
||||
* Send CTS control fragment
|
||||
*/
|
||||
void mca_btl_openib_endpoint_send_cts(mca_btl_openib_endpoint_t *endpoint)
|
||||
{
|
||||
mca_btl_openib_send_control_frag_t *sc_frag;
|
||||
mca_btl_base_descriptor_t *base_des;
|
||||
mca_btl_openib_frag_t *openib_frag;
|
||||
mca_btl_openib_com_frag_t *com_frag;
|
||||
mca_btl_openib_control_header_t *ctl_hdr;
|
||||
|
||||
OPAL_OUTPUT((0, "SENDING CTS to %s",
|
||||
endpoint->endpoint_proc->proc_ompi->proc_hostname));
|
||||
sc_frag = alloc_control_frag(endpoint->endpoint_btl);
|
||||
if (OPAL_UNLIKELY(NULL == sc_frag)) {
|
||||
BTL_ERROR(("Failed to allocate control buffer"));
|
||||
mca_btl_openib_endpoint_invoke_error(endpoint);
|
||||
return;
|
||||
}
|
||||
|
||||
/* I dislike using the "to_<foo>()" macros; I prefer using the
|
||||
explicit member fields to ensure I get the types right. Since
|
||||
this is not a performance-criticial part of the code, it's
|
||||
ok. */
|
||||
com_frag = &(sc_frag->super.super);
|
||||
openib_frag = &(com_frag->super);
|
||||
base_des = &(openib_frag->base);
|
||||
|
||||
base_des->des_cbfunc = cts_sent;
|
||||
base_des->des_cbdata = NULL;
|
||||
base_des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
|
||||
base_des->order = mca_btl_openib_component.credits_qp;
|
||||
openib_frag->segment.seg_len = sizeof(mca_btl_openib_control_header_t);
|
||||
com_frag->endpoint = endpoint;
|
||||
|
||||
sc_frag->hdr->tag = MCA_BTL_TAG_BTL;
|
||||
sc_frag->hdr->cm_seen = 0;
|
||||
sc_frag->hdr->credits = 0;
|
||||
|
||||
ctl_hdr = (mca_btl_openib_control_header_t*)
|
||||
openib_frag->segment.seg_addr.pval;
|
||||
ctl_hdr->type = MCA_BTL_OPENIB_CONTROL_CTS;
|
||||
|
||||
/* Send the fragment */
|
||||
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
|
||||
if (OMPI_SUCCESS != mca_btl_openib_endpoint_post_send(endpoint, sc_frag)) {
|
||||
BTL_ERROR(("Failed to post CTS send"));
|
||||
mca_btl_openib_endpoint_invoke_error(endpoint);
|
||||
}
|
||||
endpoint->endpoint_cts_sent = true;
|
||||
/* We've sent the frag, so it'll be reclaimed in the normal
|
||||
fashion -- don't let endpoint_destruct() FRAG_RETURN this
|
||||
frag */
|
||||
endpoint->endpoint_cts_frag = NULL;
|
||||
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Called when the CPC has established a connection on an endpoint
|
||||
*/
|
||||
void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t *endpoint)
|
||||
{
|
||||
/* If the CPC uses the CTS protocol, then start it up */
|
||||
if (NULL != endpoint->endpoint_cts_frag) {
|
||||
/* Post our receives, which will make credit management happy
|
||||
(i.e., rd_credits will be 0) */
|
||||
if (OMPI_SUCCESS != mca_btl_openib_endpoint_post_recvs(endpoint)) {
|
||||
BTL_ERROR(("Failed to post receive buffers"));
|
||||
mca_btl_openib_endpoint_invoke_error(endpoint);
|
||||
return;
|
||||
}
|
||||
endpoint->endpoint_posted_recvs = true;
|
||||
|
||||
/* If this is IB, send the CTS immediately. If this is iWARP,
|
||||
then only send the CTS if this endpoint was the initiator
|
||||
of the connection (the receiver will send its CTS when it
|
||||
receives this side's CTS). Also send the CTS if we already
|
||||
received the peer's CTS (e.g., if this process was slow to
|
||||
call cpc_complete() and we had already received the other
|
||||
side's CTS). */
|
||||
OPAL_OUTPUT((0, "cpc_complete: is IB %d, initiatior %d, cts received: %d",
|
||||
(IBV_TRANSPORT_IB ==
|
||||
endpoint->endpoint_btl->device->ib_dev->transport_type),
|
||||
endpoint->endpoint_initiator,
|
||||
endpoint->endpoint_cts_received));
|
||||
if (IBV_TRANSPORT_IB ==
|
||||
endpoint->endpoint_btl->device->ib_dev->transport_type ||
|
||||
endpoint->endpoint_initiator ||
|
||||
endpoint->endpoint_cts_received) {
|
||||
mca_btl_openib_endpoint_send_cts(endpoint);
|
||||
|
||||
/* If we've already got the CTS from the other side, then
|
||||
mark us as connected */
|
||||
if (endpoint->endpoint_cts_received) {
|
||||
mca_btl_openib_endpoint_connected(endpoint);
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/* Otherwise, just set the endpoint to "connected" */
|
||||
mca_btl_openib_endpoint_connected(endpoint);
|
||||
}
|
||||
|
||||
/*
|
||||
* called when the connect module has completed setup of an endpoint
|
||||
@ -560,6 +696,7 @@ void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t *endpoint)
|
||||
mca_btl_openib_endpoint_t *ep;
|
||||
bool master = false;
|
||||
|
||||
opal_output(-1, "Now we are CONNECTED");
|
||||
if (MCA_BTL_XRC_ENABLED) {
|
||||
OPAL_THREAD_LOCK(&endpoint->ib_addr->addr_lock);
|
||||
if (MCA_BTL_IB_ADDR_CONNECTED == endpoint->ib_addr->status) {
|
||||
@ -692,7 +829,7 @@ void mca_btl_openib_endpoint_send_credits(mca_btl_openib_endpoint_t* endpoint,
|
||||
frag = endpoint->qps[qp].credit_frag;
|
||||
|
||||
if(OPAL_UNLIKELY(NULL == frag)) {
|
||||
frag = alloc_credit_frag(openib_btl);
|
||||
frag = alloc_control_frag(openib_btl);
|
||||
frag->qp_idx = qp;
|
||||
endpoint->qps[qp].credit_frag = frag;
|
||||
/* set those once and forever */
|
||||
@ -781,7 +918,7 @@ static int mca_btl_openib_endpoint_send_eager_rdma(
|
||||
mca_btl_openib_send_control_frag_t* frag;
|
||||
int rc;
|
||||
|
||||
frag = alloc_credit_frag(openib_btl);
|
||||
frag = alloc_control_frag(openib_btl);
|
||||
if(NULL == frag) {
|
||||
return -1;
|
||||
}
|
||||
@ -938,7 +1075,8 @@ void *mca_btl_openib_endpoint_invoke_error(void *context)
|
||||
if (NULL == endpoint) {
|
||||
int i;
|
||||
for (i = 0; i < mca_btl_openib_component.ib_num_btls; ++i) {
|
||||
if (NULL != mca_btl_openib_component.openib_btls[i]) {
|
||||
if (NULL != mca_btl_openib_component.openib_btls[i] &&
|
||||
NULL != mca_btl_openib_component.openib_btls[i]->error_cb) {
|
||||
btl = mca_btl_openib_component.openib_btls[i];
|
||||
break;
|
||||
}
|
||||
@ -948,9 +1086,11 @@ void *mca_btl_openib_endpoint_invoke_error(void *context)
|
||||
}
|
||||
|
||||
/* If we didn't find a BTL, then just bail :-( */
|
||||
if (NULL == btl) {
|
||||
if (NULL == btl || NULL == btl->error_cb) {
|
||||
orte_show_help("help-mpi-btl-openib.txt",
|
||||
"cannot raise btl error", orte_process_info.nodename);
|
||||
"cannot raise btl error", true,
|
||||
orte_process_info.nodename,
|
||||
__FILE__, __LINE__);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -167,7 +167,15 @@ struct mca_btl_base_endpoint_t {
|
||||
/** hook for local CPC to hang endpoint-specific data */
|
||||
void *endpoint_local_cpc_data;
|
||||
|
||||
/** pointer to remote CPC's data (essentially its CPC modex message) */
|
||||
/** If endpoint_local_cpc->cbm_uses_cts is true and this endpoint
|
||||
is iWARP, then endpoint_initiator must be true on the side
|
||||
that actually initiates the QP, false on the other side. This
|
||||
bool is used to know which way to send the first CTS
|
||||
message. */
|
||||
bool endpoint_initiator;
|
||||
|
||||
/** pointer to remote proc's CPC data (essentially its CPC modex
|
||||
message) */
|
||||
ompi_btl_openib_connect_base_module_data_t *endpoint_remote_cpc_data;
|
||||
|
||||
/** current state of the connection */
|
||||
@ -220,6 +228,22 @@ struct mca_btl_base_endpoint_t {
|
||||
|
||||
/** information about the remote port */
|
||||
mca_btl_openib_rem_info_t rem_info;
|
||||
|
||||
/** Frag for initial wireup CTS protocol; will be NULL if CPC
|
||||
indicates that it does not want to use CTS */
|
||||
ompi_free_list_item_t *endpoint_cts_frag;
|
||||
|
||||
/** Whether we've posted receives on this EP or not (only used in
|
||||
CTS protocol) */
|
||||
bool endpoint_posted_recvs;
|
||||
|
||||
/** Whether we've received the CTS from the peer or not (only used
|
||||
in CTS protocol) */
|
||||
bool endpoint_cts_received;
|
||||
|
||||
/** Whether we've send out CTS to the peer or not (only used in
|
||||
CTS protocol) */
|
||||
bool endpoint_cts_sent;
|
||||
};
|
||||
|
||||
typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
|
||||
@ -244,6 +268,8 @@ int mca_btl_openib_endpoint_post_send(mca_btl_openib_endpoint_t*,
|
||||
void mca_btl_openib_endpoint_send_credits(mca_btl_base_endpoint_t*, const int);
|
||||
void mca_btl_openib_endpoint_connect_eager_rdma(mca_btl_openib_endpoint_t*);
|
||||
int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t*);
|
||||
void mca_btl_openib_endpoint_send_cts(mca_btl_openib_endpoint_t *endpoint);
|
||||
void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t*);
|
||||
void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t*);
|
||||
void mca_btl_openib_endpoint_init(mca_btl_openib_module_t*,
|
||||
mca_btl_base_endpoint_t*,
|
||||
@ -286,7 +312,7 @@ static inline int post_recvs(mca_btl_base_endpoint_t *ep, const int qp,
|
||||
if (0 == rc)
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
BTL_ERROR(("error %d posting receive on qp %d\n", rc, qp));
|
||||
BTL_ERROR(("error %d posting receive on qp %d", rc, qp));
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,23 @@
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
/**
|
||||
* Note: this file is a little fast-n-loose with OMPI_HAVE_THREADS --
|
||||
* it uses this value in run-time "if" conditionals (vs. compile-time
|
||||
* #if conditionals). We also don't protect including <pthread.h>.
|
||||
* That's because this component currently only compiles on Linux and
|
||||
* Solaris, and both of these OS's have pthreads. Using the run-time
|
||||
* conditionals gives us bettern compile-time checking, even of code
|
||||
* that isn't activated.
|
||||
*
|
||||
* Note, too, that the functionality in this file does *not* require
|
||||
* all the heavyweight OMPI thread infrastructure (e.g., from
|
||||
* --enable-mpi-threads or --enable-progress-threads). All work that
|
||||
* is done in a separate progress thread is very carefully segregated
|
||||
* from that of the main thread, and communication back to the main
|
||||
* thread
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include <pthread.h>
|
||||
@ -24,6 +41,11 @@
|
||||
#include "btl_openib_fd.h"
|
||||
|
||||
|
||||
typedef union {
|
||||
ompi_btl_openib_fd_event_callback_fn_t *event;
|
||||
ompi_btl_openib_fd_main_callback_fn_t *main;
|
||||
} callback_u_t;
|
||||
|
||||
/*
|
||||
* Data for each registered item
|
||||
*/
|
||||
@ -33,10 +55,7 @@ typedef struct {
|
||||
opal_event_t ri_event;
|
||||
int ri_fd;
|
||||
int ri_flags;
|
||||
union {
|
||||
ompi_btl_openib_fd_callback_fn_t *fd;
|
||||
ompi_btl_openib_schedule_callback_fn_t *schedule;
|
||||
} ri_callback;
|
||||
callback_u_t ri_callback;
|
||||
void *ri_context;
|
||||
} registered_item_t;
|
||||
|
||||
@ -46,9 +65,14 @@ static OBJ_CLASS_INSTANCE(registered_item_t, opal_list_item_t, NULL, NULL);
|
||||
* Command types
|
||||
*/
|
||||
typedef enum {
|
||||
/* Read by service thread */
|
||||
CMD_TIME_TO_QUIT,
|
||||
CMD_ADD_FD,
|
||||
CMD_REMOVE_FD,
|
||||
ACK_RAN_FUNCTION,
|
||||
|
||||
/* Read by main thread */
|
||||
CMD_CALL_FUNCTION,
|
||||
CMD_MAX
|
||||
} cmd_type_t;
|
||||
|
||||
@ -56,7 +80,7 @@ typedef enum {
|
||||
* Commands. Fields ordered to avoid memory holes (and valgrind warnings).
|
||||
*/
|
||||
typedef struct {
|
||||
ompi_btl_openib_fd_callback_fn_t *pc_callback;
|
||||
callback_u_t pc_fn;
|
||||
void *pc_context;
|
||||
int pc_fd;
|
||||
int pc_flags;
|
||||
@ -64,6 +88,16 @@ typedef struct {
|
||||
char end;
|
||||
} cmd_t;
|
||||
|
||||
/*
|
||||
* Queued up list of commands to send to the main thread
|
||||
*/
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
cmd_t cli_cmd;
|
||||
} cmd_list_item_t;
|
||||
|
||||
static OBJ_CLASS_INSTANCE(cmd_list_item_t, opal_list_item_t, NULL, NULL);
|
||||
|
||||
static bool initialized = false;
|
||||
static int cmd_size = 0;
|
||||
static fd_set read_fds, write_fds;
|
||||
@ -71,117 +105,17 @@ static int max_fd;
|
||||
static opal_list_t registered_items;
|
||||
|
||||
/* These items are only used in the threaded version */
|
||||
/* Owned by the main thread */
|
||||
static pthread_t thread;
|
||||
static int pipe_fd[2] = { -1, -1 };
|
||||
static opal_event_t main_thread_event;
|
||||
static int pipe_to_service_thread[2] = { -1, -1 };
|
||||
|
||||
/* Owned by the service thread */
|
||||
static int pipe_to_main_thread[2] = { -1, -1 };
|
||||
static const size_t max_outstanding_to_main_thread = 32;
|
||||
static size_t waiting_for_ack_from_main_thread = 0;
|
||||
static opal_list_t pending_to_main_thread;
|
||||
|
||||
static void libevent_fd_callback(int fd, short event, void *context)
|
||||
{
|
||||
registered_item_t *ri = (registered_item_t*) context;
|
||||
ri->ri_callback.fd(fd, event, ri->ri_context);
|
||||
}
|
||||
|
||||
|
||||
static void libevent_event_callback(int fd, short event, void *context)
|
||||
{
|
||||
registered_item_t *ri = (registered_item_t*) context;
|
||||
ri->ri_callback.schedule(ri->ri_context);
|
||||
/* JMS Can I free ri now? It contains the event... */
|
||||
#if 0
|
||||
OBJ_RELEASE(ri);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Add an fd to the listening set
|
||||
*/
|
||||
static int local_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd)
|
||||
{
|
||||
registered_item_t *ri = OBJ_NEW(registered_item_t);
|
||||
if (NULL == ri) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
ri->ri_event_used = false;
|
||||
ri->ri_fd = cmd->pc_fd;
|
||||
ri->ri_flags = cmd->pc_flags;
|
||||
ri->ri_callback.fd = cmd->pc_callback;
|
||||
ri->ri_context = cmd->pc_context;
|
||||
|
||||
if (use_libevent) {
|
||||
/* Make an event for this fd */
|
||||
ri->ri_event_used = true;
|
||||
memset(&ri->ri_event, 0, sizeof(ri->ri_event));
|
||||
opal_event_set(&ri->ri_event, ri->ri_fd,
|
||||
ri->ri_flags | OPAL_EV_PERSIST, libevent_fd_callback,
|
||||
ri);
|
||||
opal_event_add(&ri->ri_event, 0);
|
||||
} else {
|
||||
/* Add the fd to the relevant fd local sets and update max_fd */
|
||||
if (OPAL_EV_READ & ri->ri_flags) {
|
||||
FD_SET(ri->ri_fd, &read_fds);
|
||||
}
|
||||
if (OPAL_EV_WRITE & cmd->pc_flags) {
|
||||
FD_SET(ri->ri_fd, &write_fds);
|
||||
}
|
||||
max_fd = (max_fd > ri->ri_fd) ? max_fd : ri->ri_fd + 1;
|
||||
}
|
||||
|
||||
opal_list_append(®istered_items, &ri->super);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Remove an fd from the listening set
|
||||
*/
|
||||
static int local_pipe_cmd_remove_fd(cmd_t *cmd)
|
||||
{
|
||||
int i;
|
||||
opal_list_item_t *item;
|
||||
registered_item_t *ri;
|
||||
|
||||
/* Go through the list of registered fd's and find the fd to
|
||||
remove */
|
||||
for (item = opal_list_get_first(®istered_items);
|
||||
NULL != opal_list_get_end(®istered_items);
|
||||
item = opal_list_get_next(item)) {
|
||||
ri = (registered_item_t*) item;
|
||||
if (cmd->pc_fd == ri->ri_fd) {
|
||||
/* Found it. The item knows if it was used as a libevent
|
||||
event or an entry in the local fd sets. */
|
||||
if (ri->ri_event_used) {
|
||||
/* Remove this event from libevent */
|
||||
opal_event_del(&ri->ri_event);
|
||||
} else {
|
||||
/* Remove this item from the fd_sets and recalculate
|
||||
max_fd */
|
||||
FD_CLR(cmd->pc_fd, &read_fds);
|
||||
FD_CLR(cmd->pc_fd, &write_fds);
|
||||
for (max_fd = i = pipe_fd[0]; i < FD_SETSIZE; ++i) {
|
||||
if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) {
|
||||
max_fd = i + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Let the caller know that we have stopped monitoring
|
||||
this fd (if they care) */
|
||||
if (NULL != cmd->pc_callback) {
|
||||
cmd->pc_callback(cmd->pc_fd, 0, cmd->pc_context);
|
||||
}
|
||||
|
||||
/* Remove this item from the list of registered items and
|
||||
release it */
|
||||
opal_list_remove_item(®istered_items, item);
|
||||
OBJ_RELEASE(item);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
/* This shouldn't happen */
|
||||
return OMPI_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
/*
|
||||
* Simple loop over reading from a fd
|
||||
@ -225,6 +159,159 @@ static int write_fd(int fd, int len, void *buffer)
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Write a command to the main thread, or queue it up if the pipe is full
|
||||
*/
|
||||
static int write_to_main_thread(cmd_t *cmd)
|
||||
{
|
||||
/* Note that if we write too much to the main thread pipe and the
|
||||
main thread doesn't check it often, we could fill up the pipe
|
||||
and cause this thread to block. Bad! So we do some simple
|
||||
counting here and ensure that we don't fill the pipe. If we
|
||||
are in danger of that, then queue up the commands here in the
|
||||
service thread. The main thread will ACK every CALL_FUNCTION
|
||||
command, so we have a built-in mechanism to wake up the service
|
||||
thread to drain any queued-up commands. */
|
||||
if (opal_list_get_size(&pending_to_main_thread) > 0 ||
|
||||
waiting_for_ack_from_main_thread >= max_outstanding_to_main_thread) {
|
||||
cmd_list_item_t *cli = OBJ_NEW(cmd_list_item_t);
|
||||
if (NULL == cli) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
memcpy(&cli->cli_cmd, cmd, cmd_size);
|
||||
opal_list_append(&pending_to_main_thread, &(cli->super));
|
||||
} else {
|
||||
OPAL_OUTPUT((-1, "fd: writing to main thread"));
|
||||
write_fd(pipe_to_main_thread[1], cmd_size, cmd);
|
||||
++waiting_for_ack_from_main_thread;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static void service_fd_callback(int fd, short event, void *context)
|
||||
{
|
||||
registered_item_t *ri = (registered_item_t*) context;
|
||||
ri->ri_callback.event(fd, event, ri->ri_context);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Add an fd to the listening set
|
||||
*/
|
||||
static int service_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd)
|
||||
{
|
||||
registered_item_t *ri = OBJ_NEW(registered_item_t);
|
||||
if (NULL == ri) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
ri->ri_event_used = false;
|
||||
ri->ri_fd = cmd->pc_fd;
|
||||
ri->ri_flags = cmd->pc_flags;
|
||||
ri->ri_callback.event = cmd->pc_fn.event;
|
||||
ri->ri_context = cmd->pc_context;
|
||||
|
||||
if (use_libevent) {
|
||||
/* Make an event for this fd */
|
||||
ri->ri_event_used = true;
|
||||
memset(&ri->ri_event, 0, sizeof(ri->ri_event));
|
||||
opal_event_set(&ri->ri_event, ri->ri_fd,
|
||||
ri->ri_flags | OPAL_EV_PERSIST, service_fd_callback,
|
||||
ri);
|
||||
opal_event_add(&ri->ri_event, 0);
|
||||
} else {
|
||||
/* Add the fd to the relevant fd local sets and update max_fd */
|
||||
if (OPAL_EV_READ & ri->ri_flags) {
|
||||
FD_SET(ri->ri_fd, &read_fds);
|
||||
}
|
||||
if (OPAL_EV_WRITE & cmd->pc_flags) {
|
||||
FD_SET(ri->ri_fd, &write_fds);
|
||||
}
|
||||
max_fd = (max_fd > ri->ri_fd) ? max_fd : ri->ri_fd + 1;
|
||||
}
|
||||
|
||||
opal_list_append(®istered_items, &ri->super);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Remove an fd from the listening set
|
||||
*/
|
||||
static int service_pipe_cmd_remove_fd(cmd_t *cmd)
|
||||
{
|
||||
int i;
|
||||
opal_list_item_t *item;
|
||||
registered_item_t *ri;
|
||||
|
||||
OPAL_OUTPUT((-1, "service thread got unmonitor fd %d", cmd->pc_fd));
|
||||
/* Go through the list of registered fd's and find the fd to
|
||||
remove */
|
||||
for (item = opal_list_get_first(®istered_items);
|
||||
NULL != opal_list_get_end(®istered_items);
|
||||
item = opal_list_get_next(item)) {
|
||||
ri = (registered_item_t*) item;
|
||||
if (cmd->pc_fd == ri->ri_fd) {
|
||||
/* Found it. The item knows if it was used as a libevent
|
||||
event or an entry in the local fd sets. */
|
||||
if (ri->ri_event_used) {
|
||||
/* Remove this event from libevent */
|
||||
opal_event_del(&ri->ri_event);
|
||||
} else {
|
||||
/* Remove this item from the fd_sets and recalculate
|
||||
MAX_FD */
|
||||
FD_CLR(cmd->pc_fd, &read_fds);
|
||||
FD_CLR(cmd->pc_fd, &write_fds);
|
||||
for (max_fd = i = pipe_to_service_thread[0]; i < FD_SETSIZE; ++i) {
|
||||
if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) {
|
||||
max_fd = i + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Let the caller know that we have stopped monitoring
|
||||
this fd (if they care) */
|
||||
if (NULL != cmd->pc_fn.event) {
|
||||
cmd->pc_fn.event(cmd->pc_fd, 0, cmd->pc_context);
|
||||
}
|
||||
|
||||
/* Remove this item from the list of registered items and
|
||||
release it */
|
||||
opal_list_remove_item(®istered_items, item);
|
||||
OBJ_RELEASE(item);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
/* This shouldn't happen */
|
||||
return OMPI_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Call a function and ACK that we ran it
|
||||
*/
|
||||
static int main_pipe_cmd_call_function(cmd_t *cmd)
|
||||
{
|
||||
cmd_t local_cmd;
|
||||
|
||||
OPAL_OUTPUT((-1, "fd main thread: calling function!"));
|
||||
/* Call the function */
|
||||
if (NULL != cmd->pc_fn.main) {
|
||||
cmd->pc_fn.main(cmd->pc_context);
|
||||
}
|
||||
|
||||
/* Now ACK that we ran the function */
|
||||
memset(&local_cmd, 0, cmd_size);
|
||||
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
|
||||
write_fd(pipe_to_service_thread[1], cmd_size, &local_cmd);
|
||||
|
||||
/* Done */
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -232,32 +319,52 @@ static int write_fd(int fd, int len, void *buffer)
|
||||
/*
|
||||
* Act on pipe commands
|
||||
*/
|
||||
static bool local_pipe_cmd(void)
|
||||
static bool service_pipe_cmd(void)
|
||||
{
|
||||
bool ret = false;
|
||||
cmd_t cmd;
|
||||
cmd_list_item_t *cli;
|
||||
|
||||
read_fd(pipe_fd[0], cmd_size, &cmd);
|
||||
read_fd(pipe_to_service_thread[0], cmd_size, &cmd);
|
||||
switch (cmd.pc_cmd) {
|
||||
case CMD_ADD_FD:
|
||||
if (OMPI_SUCCESS != local_pipe_cmd_add_fd(false, &cmd)) {
|
||||
OPAL_OUTPUT((-1, "fd service thread: CMD_ADD_FD"));
|
||||
if (OMPI_SUCCESS != service_pipe_cmd_add_fd(false, &cmd)) {
|
||||
ret = true;
|
||||
}
|
||||
break;
|
||||
|
||||
case CMD_REMOVE_FD:
|
||||
if (OMPI_SUCCESS != local_pipe_cmd_remove_fd(&cmd)) {
|
||||
OPAL_OUTPUT((-1, "fd service thread: CMD_REMOVE_FD"));
|
||||
if (OMPI_SUCCESS != service_pipe_cmd_remove_fd(&cmd)) {
|
||||
ret = true;
|
||||
}
|
||||
break;
|
||||
|
||||
case CMD_TIME_TO_QUIT:
|
||||
opal_output(-1, "fd listener thread: time to quit");
|
||||
OPAL_OUTPUT((-1, "fd service thread: CMD_TIME_TO_QUIT"));
|
||||
ret = true;
|
||||
break;
|
||||
|
||||
case ACK_RAN_FUNCTION:
|
||||
/* We don't have a guarantee that the main thread will check
|
||||
its pipe frequently, so we do some simple counting to
|
||||
ensure we just don't have too many outstanding commands to
|
||||
the main thread at any given time. The main thread will
|
||||
ACK every CALL_FUNCTION command, so this thread will always
|
||||
wake up and continue to drain any queued up functions. */
|
||||
cli = (cmd_list_item_t*) opal_list_remove_first(&pending_to_main_thread);
|
||||
if (NULL != cli) {
|
||||
OPAL_OUTPUT((-1, "sending queued up cmd function to main thread"));
|
||||
write_fd(pipe_to_main_thread[1], cmd_size, &(cli->cli_cmd));
|
||||
OBJ_RELEASE(cli);
|
||||
} else {
|
||||
--waiting_for_ack_from_main_thread;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
opal_output(-1, "fd listener thread: unknown pipe command!");
|
||||
OPAL_OUTPUT((-1, "fd service thread: unknown pipe command!"));
|
||||
break;
|
||||
}
|
||||
|
||||
@ -268,7 +375,7 @@ static bool local_pipe_cmd(void)
|
||||
/*
|
||||
* Main thread logic
|
||||
*/
|
||||
static void *thread_main(void *context)
|
||||
static void *service_thread_start(void *context)
|
||||
{
|
||||
int rc, flags;
|
||||
fd_set read_fds_copy, write_fds_copy;
|
||||
@ -278,29 +385,29 @@ static void *thread_main(void *context)
|
||||
/* Make an fd set that we can select() on */
|
||||
FD_ZERO(&write_fds);
|
||||
FD_ZERO(&read_fds);
|
||||
FD_SET(pipe_fd[0], &read_fds);
|
||||
max_fd = pipe_fd[0] + 1;
|
||||
FD_SET(pipe_to_service_thread[0], &read_fds);
|
||||
max_fd = pipe_to_service_thread[0] + 1;
|
||||
|
||||
opal_output(-1, "fd listener thread running");
|
||||
OPAL_OUTPUT((-1, "fd service thread running"));
|
||||
|
||||
/* Main loop waiting for commands over the fd's */
|
||||
while (1) {
|
||||
memcpy(&read_fds_copy, &read_fds, sizeof(read_fds));
|
||||
memcpy(&write_fds_copy, &write_fds, sizeof(write_fds));
|
||||
opal_output(-1, "fd listener thread blocking on select...");
|
||||
OPAL_OUTPUT((-1, "fd service thread blocking on select..."));
|
||||
rc = select(max_fd, &read_fds_copy, &write_fds_copy, NULL, NULL);
|
||||
if (0 != rc && EAGAIN == errno) {
|
||||
continue;
|
||||
}
|
||||
|
||||
opal_output(-1, "fd listener thread woke up!");
|
||||
OPAL_OUTPUT((-1, "fd service thread woke up!"));
|
||||
if (rc > 0) {
|
||||
if (FD_ISSET(pipe_fd[0], &read_fds_copy)) {
|
||||
opal_output(-1, "fd listener thread: pipe command");
|
||||
if (local_pipe_cmd()) {
|
||||
opal_output(-1, "fd listener thread: exiting");
|
||||
if (FD_ISSET(pipe_to_service_thread[0], &read_fds_copy)) {
|
||||
OPAL_OUTPUT((-1, "fd service thread: pipe command"));
|
||||
if (service_pipe_cmd()) {
|
||||
break;
|
||||
}
|
||||
OPAL_OUTPUT((-1, "fd service thread: back from pipe command"));
|
||||
}
|
||||
|
||||
/* Go through all the registered events and see who had
|
||||
@ -324,9 +431,10 @@ static void *thread_main(void *context)
|
||||
|
||||
/* If either was ready, invoke the callback */
|
||||
if (0 != flags) {
|
||||
opal_output(-1, "fd listener thread: invoking callback for registered fd %d", ri->ri_fd);
|
||||
ri->ri_callback.fd(ri->ri_fd, flags,
|
||||
ri->ri_context);
|
||||
OPAL_OUTPUT((-1, "fd service thread: invoking callback for registered fd %d", ri->ri_fd));
|
||||
ri->ri_callback.event(ri->ri_fd, flags,
|
||||
ri->ri_context);
|
||||
OPAL_OUTPUT((-1, "fd service thread: back from callback for registered fd %d", ri->ri_fd));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -334,12 +442,38 @@ static void *thread_main(void *context)
|
||||
}
|
||||
|
||||
/* All done */
|
||||
OPAL_OUTPUT((-1, "fd service thread: exiting"));
|
||||
opal_atomic_wmb();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static void main_thread_event_callback(int fd, short event, void *context)
|
||||
{
|
||||
cmd_t cmd;
|
||||
|
||||
OPAL_OUTPUT((-1, "main thread -- reading command"));
|
||||
read_fd(pipe_to_main_thread[0], cmd_size, &cmd);
|
||||
switch (cmd.pc_cmd) {
|
||||
case CMD_CALL_FUNCTION:
|
||||
OPAL_OUTPUT((-1, "fd main thread: calling command"));
|
||||
main_pipe_cmd_call_function(&cmd);
|
||||
break;
|
||||
|
||||
default:
|
||||
OPAL_OUTPUT((-1, "fd main thread: unknown pipe command: %d",
|
||||
cmd.pc_cmd));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/******************************************************************
|
||||
* Main interface calls
|
||||
******************************************************************/
|
||||
|
||||
/*
|
||||
* Initialize
|
||||
* Called by main thread
|
||||
*/
|
||||
int ompi_btl_openib_fd_init(void)
|
||||
{
|
||||
@ -348,19 +482,42 @@ int ompi_btl_openib_fd_init(void)
|
||||
|
||||
OBJ_CONSTRUCT(®istered_items, opal_list_t);
|
||||
|
||||
if (OMPI_HAVE_THREAD_SUPPORT) {
|
||||
/* Create a pipe to communicate with the thread */
|
||||
if (0 != pipe(pipe_fd)) {
|
||||
/* Calculate the real size of the cmd struct */
|
||||
cmd_size = (int) (&(bogus.end) - ((char*) &bogus));
|
||||
|
||||
if (OMPI_HAVE_THREADS) {
|
||||
OBJ_CONSTRUCT(&pending_to_main_thread, opal_list_t);
|
||||
|
||||
/* Create pipes to communicate between the two threads */
|
||||
if (0 != pipe(pipe_to_service_thread)) {
|
||||
return OMPI_ERR_IN_ERRNO;
|
||||
}
|
||||
if (0 != pipe(pipe_to_main_thread)) {
|
||||
return OMPI_ERR_IN_ERRNO;
|
||||
}
|
||||
|
||||
if (0 != pthread_create(&thread, NULL, thread_main, NULL)) {
|
||||
/* Create a libevent event that is used in the main thread
|
||||
to watch its pipe */
|
||||
memset(&main_thread_event, 0, sizeof(main_thread_event));
|
||||
opal_event_set(&main_thread_event, pipe_to_main_thread[0],
|
||||
OPAL_EV_READ | OPAL_EV_PERSIST,
|
||||
main_thread_event_callback, NULL);
|
||||
opal_event_add(&main_thread_event, 0);
|
||||
|
||||
/* Start the service thread */
|
||||
if (0 != pthread_create(&thread, NULL, service_thread_start,
|
||||
NULL)) {
|
||||
int errno_save = errno;
|
||||
opal_event_del(&main_thread_event);
|
||||
close(pipe_to_service_thread[0]);
|
||||
close(pipe_to_service_thread[1]);
|
||||
close(pipe_to_main_thread[0]);
|
||||
close(pipe_to_main_thread[1]);
|
||||
errno = errno_save;
|
||||
return OMPI_ERR_IN_ERRNO;
|
||||
}
|
||||
}
|
||||
|
||||
/* Calculate the real size of the cmd struct */
|
||||
cmd_size = (int) (&(bogus.end) - ((char*) &bogus));
|
||||
initialized = true;
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
@ -369,9 +526,10 @@ int ompi_btl_openib_fd_init(void)
|
||||
|
||||
/*
|
||||
* Start monitoring an fd
|
||||
* Called by main or service thread; callback will be in service thread
|
||||
*/
|
||||
int ompi_btl_openib_fd_monitor(int fd, int flags,
|
||||
ompi_btl_openib_fd_callback_fn_t *callback,
|
||||
ompi_btl_openib_fd_event_callback_fn_t *callback,
|
||||
void *context)
|
||||
{
|
||||
cmd_t cmd;
|
||||
@ -384,14 +542,15 @@ int ompi_btl_openib_fd_monitor(int fd, int flags,
|
||||
cmd.pc_cmd = CMD_ADD_FD;
|
||||
cmd.pc_fd = fd;
|
||||
cmd.pc_flags = flags;
|
||||
cmd.pc_callback = callback;
|
||||
cmd.pc_fn.event = callback;
|
||||
cmd.pc_context = context;
|
||||
if (OMPI_HAVE_THREAD_SUPPORT) {
|
||||
if (OMPI_HAVE_THREADS) {
|
||||
/* For the threaded version, write a command down the pipe */
|
||||
write_fd(pipe_fd[1], cmd_size, &cmd);
|
||||
OPAL_OUTPUT((-1, "main thread sending monitor fd %d", fd));
|
||||
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
|
||||
} else {
|
||||
/* Otherwise, add it directly */
|
||||
local_pipe_cmd_add_fd(true, &cmd);
|
||||
service_pipe_cmd_add_fd(true, &cmd);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
@ -400,9 +559,10 @@ int ompi_btl_openib_fd_monitor(int fd, int flags,
|
||||
|
||||
/*
|
||||
* Stop monitoring an fd
|
||||
* Called by main or service thread; callback will be in service thread
|
||||
*/
|
||||
int ompi_btl_openib_fd_unmonitor(int fd,
|
||||
ompi_btl_openib_fd_callback_fn_t *callback,
|
||||
ompi_btl_openib_fd_event_callback_fn_t *callback,
|
||||
void *context)
|
||||
{
|
||||
cmd_t cmd;
|
||||
@ -415,14 +575,15 @@ int ompi_btl_openib_fd_unmonitor(int fd,
|
||||
cmd.pc_cmd = CMD_REMOVE_FD;
|
||||
cmd.pc_fd = fd;
|
||||
cmd.pc_flags = 0;
|
||||
cmd.pc_callback = callback;
|
||||
cmd.pc_fn.event = callback;
|
||||
cmd.pc_context = context;
|
||||
if (OMPI_HAVE_THREAD_SUPPORT) {
|
||||
if (OMPI_HAVE_THREADS) {
|
||||
/* For the threaded version, write a command down the pipe */
|
||||
write_fd(pipe_fd[1], cmd_size, &cmd);
|
||||
OPAL_OUTPUT((-1, "main thread sending unmonitor fd %d", fd));
|
||||
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
|
||||
} else {
|
||||
/* Otherwise, remove it directly */
|
||||
local_pipe_cmd_remove_fd(&cmd);
|
||||
service_pipe_cmd_remove_fd(&cmd);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
@ -430,31 +591,25 @@ int ompi_btl_openib_fd_unmonitor(int fd,
|
||||
|
||||
/*
|
||||
* Run a function in the main thread
|
||||
* Called by service thread
|
||||
*/
|
||||
int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t *callback,
|
||||
void *context)
|
||||
int ompi_btl_openib_fd_run_in_main(ompi_btl_openib_fd_main_callback_fn_t *callback,
|
||||
void *context)
|
||||
{
|
||||
if (OMPI_HAVE_THREAD_SUPPORT) {
|
||||
/* For the threaded version, schedule an event for "now" */
|
||||
registered_item_t *ri;
|
||||
struct timeval now;
|
||||
if (OMPI_HAVE_THREADS) {
|
||||
cmd_t cmd;
|
||||
|
||||
ri = OBJ_NEW(registered_item_t);
|
||||
if (NULL == ri) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* Create an event that will run in the main thread */
|
||||
ri->ri_fd = ri->ri_flags = -1;
|
||||
ri->ri_callback.schedule = callback;
|
||||
ri->ri_context = context;
|
||||
ri->ri_event_used = true;
|
||||
opal_evtimer_set(&ri->ri_event, libevent_event_callback, ri);
|
||||
now.tv_sec = 0;
|
||||
now.tv_usec = 0;
|
||||
opal_evtimer_add(&ri->ri_event, &now);
|
||||
OPAL_OUTPUT((-1, "run in main -- sending command"));
|
||||
/* For the threaded version, write a command down the pipe */
|
||||
cmd.pc_cmd = CMD_CALL_FUNCTION;
|
||||
cmd.pc_fd = -1;
|
||||
cmd.pc_flags = 0;
|
||||
cmd.pc_fn.main = callback;
|
||||
cmd.pc_context = context;
|
||||
write_to_main_thread(&cmd);
|
||||
} else {
|
||||
/* For the non-threaded version, just call the function */
|
||||
/* Otherwise, call it directly */
|
||||
OPAL_OUTPUT((-1, "run in main -- calling now!"));
|
||||
callback(context);
|
||||
}
|
||||
|
||||
@ -463,21 +618,32 @@ int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t *callback
|
||||
|
||||
/*
|
||||
* Finalize
|
||||
* Called by main thread
|
||||
*/
|
||||
int ompi_btl_openib_fd_finalize(void)
|
||||
{
|
||||
if (initialized) {
|
||||
if (OMPI_HAVE_THREAD_SUPPORT) {
|
||||
if (OMPI_HAVE_THREADS) {
|
||||
/* For the threaded version, send a command down the pipe */
|
||||
cmd_t cmd;
|
||||
OPAL_OUTPUT((-1, "shutting down openib fd"));
|
||||
opal_event_del(&main_thread_event);
|
||||
memset(&cmd, 0, cmd_size);
|
||||
cmd.pc_cmd = CMD_TIME_TO_QUIT;
|
||||
write_fd(pipe_fd[1], cmd_size, &cmd);
|
||||
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
|
||||
|
||||
pthread_join(thread, NULL);
|
||||
close(pipe_fd[0]);
|
||||
close(pipe_fd[1]);
|
||||
opal_atomic_rmb();
|
||||
|
||||
opal_event_del(&main_thread_event);
|
||||
|
||||
close(pipe_to_service_thread[0]);
|
||||
close(pipe_to_service_thread[1]);
|
||||
close(pipe_to_main_thread[0]);
|
||||
close(pipe_to_main_thread[1]);
|
||||
OBJ_DESTRUCT(&pending_to_main_thread);
|
||||
}
|
||||
OBJ_DESTRUCT(®istered_items);
|
||||
}
|
||||
initialized = false;
|
||||
|
||||
|
@ -18,41 +18,46 @@ BEGIN_C_DECLS
|
||||
/**
|
||||
* Typedef for fd callback function
|
||||
*/
|
||||
typedef void *(ompi_btl_openib_fd_callback_fn_t)(int fd, int flags,
|
||||
void *context);
|
||||
typedef void *(ompi_btl_openib_fd_event_callback_fn_t)(int fd, int flags,
|
||||
void *context);
|
||||
|
||||
/**
|
||||
* Typedef for generic callback function
|
||||
*/
|
||||
typedef void *(ompi_btl_openib_schedule_callback_fn_t)(void *context);
|
||||
typedef void *(ompi_btl_openib_fd_main_callback_fn_t)(void *context);
|
||||
|
||||
/**
|
||||
* Initialize fd monitoring
|
||||
* Initialize fd monitoring.
|
||||
* Called by the main thread.
|
||||
*/
|
||||
int ompi_btl_openib_fd_init(void);
|
||||
|
||||
/**
|
||||
* Start monitoring an fd
|
||||
* Start monitoring an fd.
|
||||
* Called by main or service thread; callback will be in service thread.
|
||||
*/
|
||||
int ompi_btl_openib_fd_monitor(int fd, int flags,
|
||||
ompi_btl_openib_fd_callback_fn_t *callback,
|
||||
ompi_btl_openib_fd_event_callback_fn_t *callback,
|
||||
void *context);
|
||||
|
||||
/**
|
||||
* Stop monitoring an fd
|
||||
* Stop monitoring an fd.
|
||||
* Called by main or service thread; callback will be in service thread.
|
||||
*/
|
||||
int ompi_btl_openib_fd_unmonitor(int fd,
|
||||
ompi_btl_openib_fd_callback_fn_t *callback,
|
||||
ompi_btl_openib_fd_event_callback_fn_t *callback,
|
||||
void *context);
|
||||
|
||||
/**
|
||||
* Run a function in the main thread
|
||||
* Run a function in the main thread.
|
||||
* Called by the service thread.
|
||||
*/
|
||||
int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t callback,
|
||||
void *context);
|
||||
int ompi_btl_openib_fd_run_in_main(ompi_btl_openib_fd_main_callback_fn_t callback,
|
||||
void *context);
|
||||
|
||||
/**
|
||||
* Finalize fd monitoring
|
||||
* Finalize fd monitoring.
|
||||
* Called by the main thread.
|
||||
*/
|
||||
int ompi_btl_openib_fd_finalize(void);
|
||||
|
||||
|
@ -116,6 +116,7 @@ typedef struct mca_btl_openib_footer_t mca_btl_openib_footer_t;
|
||||
#define MCA_BTL_OPENIB_CONTROL_CREDITS 0
|
||||
#define MCA_BTL_OPENIB_CONTROL_RDMA 1
|
||||
#define MCA_BTL_OPENIB_CONTROL_COALESCED 2
|
||||
#define MCA_BTL_OPENIB_CONTROL_CTS 3
|
||||
|
||||
struct mca_btl_openib_control_header_t {
|
||||
uint8_t type;
|
||||
@ -267,7 +268,7 @@ OBJ_CLASS_DECLARATION(mca_btl_openib_coalesced_frag_t);
|
||||
*/
|
||||
|
||||
static inline mca_btl_openib_send_control_frag_t *
|
||||
alloc_credit_frag(mca_btl_openib_module_t *btl)
|
||||
alloc_control_frag(mca_btl_openib_module_t *btl)
|
||||
{
|
||||
int rc;
|
||||
ompi_free_list_item_t *item;
|
||||
|
@ -32,7 +32,7 @@ AC_DEFUN([MCA_btl_openib_POST_CONFIG], [
|
||||
# [action-if-cant-compile])
|
||||
# ------------------------------------------------
|
||||
AC_DEFUN([MCA_btl_openib_CONFIG],[
|
||||
OMPI_VAR_SCOPE_PUSH([cpcs])
|
||||
OMPI_VAR_SCOPE_PUSH([cpcs have_threads])
|
||||
cpcs="oob"
|
||||
|
||||
OMPI_CHECK_OPENIB([btl_openib],
|
||||
@ -54,15 +54,23 @@ AC_DEFUN([MCA_btl_openib_CONFIG],[
|
||||
$1],
|
||||
[$2])
|
||||
|
||||
AC_MSG_CHECKING([for thread support (needed for ibcm/rdmacm)])
|
||||
have_threads=`echo $THREAD_TYPE | awk '{ print [$]1 }'`
|
||||
if test "x$have_threads" = "x"; then
|
||||
have_threads=none
|
||||
fi
|
||||
AC_MSG_RESULT([$have_threads])
|
||||
|
||||
AS_IF([test "$btl_openib_happy" = "yes"],
|
||||
[if test "x$btl_openib_have_xrc" = "x1"; then
|
||||
cpcs="$cpcs xoob"
|
||||
fi
|
||||
if test "x$btl_openib_have_rdmacm" = "x1"; then
|
||||
if test "x$btl_openib_have_rdmacm" = "x1" -a \
|
||||
"$have_threads" != "none"; then
|
||||
cpcs="$cpcs rdmacm"
|
||||
fi
|
||||
if test "x$btl_openib_have_ibcm" = "x1"; then
|
||||
if test "x$btl_openib_have_ibcm" = "x1" -a \
|
||||
"$have_threads" != "none"; then
|
||||
cpcs="$cpcs ibcm"
|
||||
fi
|
||||
AC_MSG_CHECKING([which openib btl cpcs will be built])
|
||||
|
@ -18,10 +18,10 @@
|
||||
#if HAVE_XRC
|
||||
#include "connect/btl_openib_connect_xoob.h"
|
||||
#endif
|
||||
#if OMPI_HAVE_RDMACM
|
||||
#if OMPI_HAVE_RDMACM && OMPI_HAVE_THREADS
|
||||
#include "connect/btl_openib_connect_rdmacm.h"
|
||||
#endif
|
||||
#if OMPI_HAVE_IBCM
|
||||
#if OMPI_HAVE_IBCM && OMPI_HAVE_THREADS
|
||||
#include "connect/btl_openib_connect_ibcm.h"
|
||||
#endif
|
||||
|
||||
@ -44,7 +44,7 @@ static ompi_btl_openib_connect_base_component_t *all[] = {
|
||||
|
||||
/* Always have an entry here so that the CP indexes will always be
|
||||
the same: if RDMA CM is not available, use the "empty" CPC */
|
||||
#if OMPI_HAVE_RDMACM
|
||||
#if OMPI_HAVE_RDMACM && OMPI_HAVE_THREADS
|
||||
&ompi_btl_openib_connect_rdmacm,
|
||||
#else
|
||||
&ompi_btl_openib_connect_empty,
|
||||
@ -52,7 +52,7 @@ static ompi_btl_openib_connect_base_component_t *all[] = {
|
||||
|
||||
/* Always have an entry here so that the CP indexes will always be
|
||||
the same: if IB CM is not available, use the "empty" CPC */
|
||||
#if OMPI_HAVE_IBCM
|
||||
#if OMPI_HAVE_IBCM && OMPI_HAVE_THREADS
|
||||
&ompi_btl_openib_connect_ibcm,
|
||||
#else
|
||||
&ompi_btl_openib_connect_empty,
|
||||
@ -273,6 +273,14 @@ int ompi_btl_openib_connect_base_select_for_local_port(mca_btl_openib_module_t *
|
||||
opal_output(-1, "match cpc for local port: %s",
|
||||
available[i]->cbc_name);
|
||||
|
||||
/* If the CPC wants to use the CTS protocol, check to ensure
|
||||
that QP 0 is PP; if it's not, we can't use this CPC (or the
|
||||
CTS protocol) */
|
||||
if (!BTL_OPENIB_QP_TYPE_PP(0)) {
|
||||
BTL_VERBOSE(("this CPConly supports when the first btl_openib_receive_queues QP is a PP QP"));
|
||||
continue;
|
||||
}
|
||||
|
||||
/* This CPC has indicated that it wants to run on this openib
|
||||
BTL module. Woo hoo! */
|
||||
++cpc_index;
|
||||
|
@ -248,7 +248,7 @@
|
||||
* normal RTU processing. If the RTU is received later, the IBCM
|
||||
* system on the passive side will know that it's effectively a
|
||||
* duplicate, and therefore can be ignored.
|
||||
*/
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
@ -649,6 +649,7 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
|
||||
rc = OMPI_ERR_NOT_SUPPORTED;
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* If we do not have struct ibv_device.transport_device, then
|
||||
we're in an old version of OFED that is IB only (i.e., no
|
||||
iWarp), so we can safely assume that we can use this CPC. */
|
||||
@ -699,12 +700,13 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
|
||||
BTL_VERBOSE(("created cpc module %p for btl %p",
|
||||
(void*)m, (void*)btl));
|
||||
|
||||
/* See if we've already for an IB CM listener for this device */
|
||||
/* See if we've already got an IB CM listener for this device */
|
||||
for (item = opal_list_get_first(&ibcm_cm_listeners);
|
||||
item != opal_list_get_end(&ibcm_cm_listeners);
|
||||
item = opal_list_get_next(item)) {
|
||||
cmh = (ibcm_listen_cm_id_t*) item;
|
||||
if (cmh->ib_context == btl->device->ib_dev_context) {
|
||||
OBJ_RETAIN(cmh);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -753,6 +755,9 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
|
||||
rc = OMPI_ERR_NOT_SUPPORTED;
|
||||
goto error;
|
||||
}
|
||||
OPAL_OUTPUT((-1, "opened ibcm device 0x%" PRIx64 " (%s)",
|
||||
(uint64_t) cmh->cm_device,
|
||||
ibv_get_device_name(cmh->ib_context->device)));
|
||||
|
||||
if (0 != (rc = ib_cm_create_id(cmh->cm_device,
|
||||
&cmh->listen_cm_id, NULL))) {
|
||||
@ -779,9 +784,6 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
|
||||
}
|
||||
|
||||
opal_list_append(&ibcm_cm_listeners, &(cmh->super));
|
||||
} else {
|
||||
/* We found an existing IB CM handle -- bump up the refcount */
|
||||
OBJ_RETAIN(cmh);
|
||||
}
|
||||
m->cmh = cmh;
|
||||
imli = OBJ_NEW(ibcm_module_list_item_t);
|
||||
@ -831,6 +833,9 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
|
||||
m->cpc.cbm_start_connect = ibcm_module_start_connect;
|
||||
m->cpc.cbm_endpoint_finalize = ibcm_endpoint_finalize;
|
||||
m->cpc.cbm_finalize = ibcm_module_finalize;
|
||||
/* Setting uses_cts=true also guarantees that we'll only be
|
||||
selected if QP 0 is PP */
|
||||
m->cpc.cbm_uses_cts = true;
|
||||
|
||||
/* Start monitoring the fd associated with the cm_device */
|
||||
ompi_btl_openib_fd_monitor(cmh->cm_device->fd, OPAL_EV_READ,
|
||||
@ -901,7 +906,8 @@ static int qp_create_one(mca_btl_base_endpoint_t* endpoint, int qp,
|
||||
init_attr.cap.max_send_sge = 1;
|
||||
init_attr.cap.max_recv_sge = 1; /* we do not use SG list */
|
||||
if(BTL_OPENIB_QP_TYPE_PP(qp)) {
|
||||
init_attr.cap.max_recv_wr = max_recv_wr;
|
||||
/* Add one for the CTS receive frag that will be posted */
|
||||
init_attr.cap.max_recv_wr = max_recv_wr + 1;
|
||||
} else {
|
||||
init_attr.cap.max_recv_wr = 0;
|
||||
}
|
||||
@ -1265,7 +1271,7 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
|
||||
|
||||
/* Set the endpoint state to "connecting" (this function runs in
|
||||
the main MPI thread; not the service thread, so we can set the
|
||||
endpoint_state here). */
|
||||
endpoint_state here with no memory barriers). */
|
||||
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
|
||||
|
||||
/* Fill in the path record for this peer */
|
||||
@ -1363,9 +1369,6 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
|
||||
}
|
||||
if (0 != (rc = ib_cm_send_req(req->super.cm_id, cm_req))) {
|
||||
BTL_VERBOSE(("Got nonzero return from ib_cm_send_req: %d, errno %d", rc, errno));
|
||||
if (-1 == rc) {
|
||||
perror("Errno is ");
|
||||
}
|
||||
rc = OMPI_ERR_UNREACH;
|
||||
goto err;
|
||||
}
|
||||
@ -1438,11 +1441,9 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
|
||||
*/
|
||||
static void *callback_unlock(int fd, int flags, void *context)
|
||||
{
|
||||
/* We need #if protection in order to prevent unused variable warning */
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
opal_mutex_t *m = (opal_mutex_t*) context;
|
||||
OPAL_THREAD_UNLOCK(m);
|
||||
#endif
|
||||
volatile int *barrier = (volatile int *) context;
|
||||
OPAL_OUTPUT((-1, "ibcm unlocking main thread"));
|
||||
*barrier = 1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -1455,7 +1456,7 @@ static void ibcm_listen_cm_id_constructor(ibcm_listen_cm_id_t *cmh)
|
||||
|
||||
static void ibcm_listen_cm_id_destructor(ibcm_listen_cm_id_t *cmh)
|
||||
{
|
||||
opal_mutex_t mutex;
|
||||
volatile int barrier = 0;
|
||||
opal_list_item_t *item;
|
||||
|
||||
/* Remove all the ibcm module items */
|
||||
@ -1482,19 +1483,41 @@ static void ibcm_listen_cm_id_destructor(ibcm_listen_cm_id_t *cmh)
|
||||
|
||||
/* Stop monitoring the cm_device's fd (wait for it to be
|
||||
released from the monitoring entity) */
|
||||
OPAL_THREAD_LOCK(&mutex);
|
||||
ompi_btl_openib_fd_unmonitor(cmh->cm_device->fd,
|
||||
callback_unlock,
|
||||
&mutex);
|
||||
OPAL_THREAD_LOCK(&mutex);
|
||||
|
||||
(void*) &barrier);
|
||||
/* JMS debug code while figuring out the IBCM problem */
|
||||
#if 0
|
||||
while (0 == barrier) {
|
||||
sched_yield();
|
||||
}
|
||||
#else
|
||||
{
|
||||
time_t t = time(NULL);
|
||||
OPAL_OUTPUT((-1, "main thread waiting for ibcm barrier"));
|
||||
while (0 == barrier) {
|
||||
sched_yield();
|
||||
if (time(NULL) - t > 5) {
|
||||
OPAL_OUTPUT((-1, "main thread been looping for a long time..."));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Destroy the listener */
|
||||
if (NULL != cmh->listen_cm_id) {
|
||||
OPAL_OUTPUT((-1, "destryoing ibcm listener 0x%" PRIx64,
|
||||
(uint64_t) cmh->listen_cm_id));
|
||||
ib_cm_destroy_id(cmh->listen_cm_id);
|
||||
}
|
||||
|
||||
/* Close the CM device */
|
||||
if (NULL != cmh->cm_device) {
|
||||
OPAL_OUTPUT((-1, "closing ibcm device 0x%" PRIx64 " (%s)",
|
||||
(uint64_t) cmh->cm_device,
|
||||
ibv_get_device_name(cmh->ib_context->device)));
|
||||
ib_cm_close_device(cmh->cm_device);
|
||||
}
|
||||
}
|
||||
@ -1564,7 +1587,7 @@ static int ibcm_module_finalize(mca_btl_openib_module_t *btl,
|
||||
{
|
||||
ibcm_module_t *m = (ibcm_module_t *) cpc;
|
||||
|
||||
/* If we previously successfully initialized, then destroy
|
||||
/* If we previously successfully initialized, then release
|
||||
everything */
|
||||
if (NULL != m && NULL != m->cmh) {
|
||||
OBJ_RELEASE(m->cmh);
|
||||
@ -1702,21 +1725,6 @@ static int qp_to_rts(int qp_index, struct ib_cm_id *cm_id,
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Callback (from main thread) when an incoming IBCM request needs to
|
||||
* initiate a new connection in the other direction.
|
||||
*/
|
||||
static void *callback_set_endpoint_connecting(void *context)
|
||||
{
|
||||
mca_btl_openib_endpoint_t *endpoint =
|
||||
(mca_btl_openib_endpoint_t *) context;
|
||||
|
||||
BTL_VERBOSE(("ibcm scheduled callback: setting endpoint to CONNECTING"));
|
||||
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Callback (from main thread) when an incoming IBCM request needs to
|
||||
* initiate a new connection in the other direction.
|
||||
@ -1895,9 +1903,11 @@ static int request_received(ibcm_listen_cm_id_t *cmh,
|
||||
If this is not the first request, then assume that all the QPs
|
||||
have been created already and we can just lookup what we need. */
|
||||
if (!ie->ie_qps_created) {
|
||||
/* Schedule to set the endpoint_state to "CONNECTING" */
|
||||
ompi_btl_openib_fd_schedule(callback_set_endpoint_connecting,
|
||||
endpoint);
|
||||
/* Set the endpoint_state to "CONNECTING". This is running
|
||||
in the service thread, so we need to do a write barrier. */
|
||||
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
|
||||
opal_atomic_wmb();
|
||||
|
||||
if (OMPI_SUCCESS != (rc = qp_create_all(endpoint, imodule))) {
|
||||
rej_reason = REJ_PASSIVE_SIDE_ERROR;
|
||||
BTL_ERROR(("qp_create_all failed -- reject"));
|
||||
@ -1923,16 +1933,19 @@ static int request_received(ibcm_listen_cm_id_t *cmh,
|
||||
goto reject;
|
||||
}
|
||||
|
||||
/* Post receive buffers. Similar to QP creation, above, we post
|
||||
*all* receive buffers at once (for all QPs). So ensure to only
|
||||
do this for the first request received. If this is not the
|
||||
first request on this endpoint, then assume that all the
|
||||
receive buffers have been posted already. */
|
||||
/* Post a single receive buffer on the smallest QP for the CTS
|
||||
protocol */
|
||||
if (!ie->ie_recv_buffers_posted) {
|
||||
if (OMPI_SUCCESS !=
|
||||
(rc = mca_btl_openib_endpoint_post_recvs(endpoint))) {
|
||||
/* JMS */
|
||||
BTL_VERBOSE(("failed to post recv buffers"));
|
||||
struct ibv_recv_wr *bad_wr, *wr;
|
||||
|
||||
assert(NULL != endpoint->endpoint_cts_frag);
|
||||
wr = &to_recv_frag(endpoint->endpoint_cts_frag)->rd_desc;
|
||||
assert(NULL != wr);
|
||||
wr->next = NULL;
|
||||
|
||||
OPAL_OUTPUT((-1, "REQUEST posting CTS recv buffer"));
|
||||
if (0 != ibv_post_recv(endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp, wr, &bad_wr)) {
|
||||
BTL_VERBOSE(("failed to post CTS recv buffer"));
|
||||
rej_reason = REJ_PASSIVE_SIDE_ERROR;
|
||||
goto reject;
|
||||
}
|
||||
@ -2045,32 +2058,31 @@ static int request_received(ibcm_listen_cm_id_t *cmh,
|
||||
(ompi_btl_openib_connect_base_module_t *) imodule;
|
||||
cbdata->cscd_endpoint = endpoint;
|
||||
BTL_VERBOSE(("starting connect in other direction"));
|
||||
ompi_btl_openib_fd_schedule(callback_start_connect, cbdata);
|
||||
ompi_btl_openib_fd_run_in_main(callback_start_connect, cbdata);
|
||||
|
||||
TIMER_STOP(REQUEST_RECEIVED);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
BTL_ERROR(("malloc failed"));
|
||||
}
|
||||
|
||||
/* Communicate to the upper layer that the connection on this
|
||||
endpoint has failed */
|
||||
TIMER_STOP(REQUEST_RECEIVED);
|
||||
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
* Callback (from main thread) when the endpoint has been connected
|
||||
*/
|
||||
static void *callback_set_endpoint_connected(void *context)
|
||||
static void *callback_set_endpoint_cpc_complete(void *context)
|
||||
{
|
||||
mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t*) context;
|
||||
|
||||
BTL_VERBOSE(("calling endpoint_connected"));
|
||||
mca_btl_openib_endpoint_connected(endpoint);
|
||||
BTL_VERBOSE(("*** CONNECTED endpoint_connected done!"));
|
||||
BTL_VERBOSE(("calling endpoint_cpc_complete"));
|
||||
mca_btl_openib_endpoint_cpc_complete(endpoint);
|
||||
BTL_VERBOSE(("*** CONNECTED endpoint_cpc_complete done!"));
|
||||
|
||||
return NULL;
|
||||
}
|
||||
@ -2133,18 +2145,27 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Now that all the qp's are created locally, post some receive
|
||||
buffers, setup credits, etc. The post_recvs() call posts the
|
||||
buffers for all QPs at once, so be sure to only do this for the
|
||||
*first* reply that is received on an endpoint. For all other
|
||||
replies received on an endpoint, we can safely assume that the
|
||||
receive buffers have already been posted. */
|
||||
/* Now that all the qp's are created locally, post a single
|
||||
receive buffer on the smallest QP for the CTS protocol. Be
|
||||
sure to only do this for the *first* reply that is received on
|
||||
an endpoint. For all other replies received on an endpoint, we
|
||||
can safely assume that the CTS receive buffer has already been
|
||||
posted. */
|
||||
if (!ie->ie_recv_buffers_posted) {
|
||||
if (OMPI_SUCCESS !=
|
||||
(rc = mca_btl_openib_endpoint_post_recvs(endpoint))) {
|
||||
BTL_VERBOSE(("failed to post recv buffers"));
|
||||
struct ibv_recv_wr *bad_wr, *wr;
|
||||
|
||||
OPAL_OUTPUT((-1, "REPLY posting CTS recv buffer"));
|
||||
assert(NULL != endpoint->endpoint_cts_frag);
|
||||
wr = &to_recv_frag(endpoint->endpoint_cts_frag)->rd_desc;
|
||||
assert(NULL != wr);
|
||||
wr->next = NULL;
|
||||
|
||||
if (0 != ibv_post_recv(endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp, wr, &bad_wr)) {
|
||||
/* JMS */
|
||||
BTL_VERBOSE(("failed to post CTS recv buffer"));
|
||||
goto error;
|
||||
}
|
||||
|
||||
ie->ie_recv_buffers_posted = true;
|
||||
}
|
||||
|
||||
@ -2168,7 +2189,7 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
|
||||
that we're done. */
|
||||
if (0 == --(ie->ie_qps_to_connect)) {
|
||||
BTL_VERBOSE(("REPLY telling main BTL we're connected"));
|
||||
ompi_btl_openib_fd_schedule(callback_set_endpoint_connected, endpoint);
|
||||
ompi_btl_openib_fd_run_in_main(callback_set_endpoint_cpc_complete, endpoint);
|
||||
}
|
||||
|
||||
TIMER_STOP(REPLY_RECEIVED);
|
||||
@ -2177,8 +2198,8 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
|
||||
error:
|
||||
/* Communicate to the upper layer that the connection on this
|
||||
endpoint has failed */
|
||||
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -2207,7 +2228,7 @@ static int ready_to_use_received(ibcm_listen_cm_id_t *h,
|
||||
that we're done. */
|
||||
if (0 == --(ie->ie_qps_to_connect)) {
|
||||
BTL_VERBOSE(("RTU telling main BTL we're connected"));
|
||||
ompi_btl_openib_fd_schedule(callback_set_endpoint_connected, endpoint);
|
||||
ompi_btl_openib_fd_run_in_main(callback_set_endpoint_cpc_complete, endpoint);
|
||||
}
|
||||
|
||||
BTL_VERBOSE(("all done"));
|
||||
@ -2216,25 +2237,6 @@ static int ready_to_use_received(ibcm_listen_cm_id_t *h,
|
||||
}
|
||||
|
||||
|
||||
static int disconnect_request_received(ibcm_listen_cm_id_t *cmh,
|
||||
struct ib_cm_event *event)
|
||||
{
|
||||
BTL_VERBOSE(("disconnect request received"));
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int disconnect_reply_received(ibcm_listen_cm_id_t *cmd,
|
||||
struct ib_cm_event *event)
|
||||
{
|
||||
BTL_VERBOSE(("disconnect reply received"));
|
||||
#if 0
|
||||
ib_cm_send_drep(event->cm_id, NULL, 0);
|
||||
#endif
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int reject_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
|
||||
{
|
||||
enum ib_cm_rej_reason reason = event->param.rej_rcvd.reason;
|
||||
@ -2298,7 +2300,7 @@ static int reject_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
|
||||
reason));
|
||||
/* Communicate to the upper layer that the connection on this
|
||||
endpoint has failed */
|
||||
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error, NULL);
|
||||
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, NULL);
|
||||
return OMPI_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
@ -2330,8 +2332,8 @@ static int request_error(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
|
||||
|
||||
/* Communicate to the upper layer that the connection on this
|
||||
endpoint has failed */
|
||||
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -2362,28 +2364,12 @@ static int reply_error(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
|
||||
|
||||
/* Communicate to the upper layer that the connection on this
|
||||
endpoint has failed */
|
||||
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int disconnect_request_error(ibcm_listen_cm_id_t *cmh,
|
||||
struct ib_cm_event *e)
|
||||
{
|
||||
BTL_VERBOSE(("disconnect request error!"));
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int unhandled_event(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *e)
|
||||
{
|
||||
BTL_VERBOSE(("unhandled event error (%p, %d)",
|
||||
(void*) e, e->event));
|
||||
return OMPI_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
|
||||
static void *ibcm_event_dispatch(int fd, int flags, void *context)
|
||||
{
|
||||
bool want_ack;
|
||||
@ -2391,38 +2377,46 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
|
||||
ibcm_listen_cm_id_t *cmh = (ibcm_listen_cm_id_t*) context;
|
||||
struct ib_cm_event *e = NULL;
|
||||
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: on device 0x%" PRIx64", fd %d",
|
||||
(uint64_t) cmh->cm_device, fd));
|
||||
TIMER_START(CM_GET_EVENT);
|
||||
/* Blocks until next event, which should be immediately (because
|
||||
we shouldn't call this dispatch function unless there's
|
||||
something ready to read) */
|
||||
rc = ib_cm_get_event(cmh->cm_device, &e);
|
||||
TIMER_STOP(CM_GET_EVENT);
|
||||
if (-ENODATA == rc) {
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: GOT NOT DATA!"));
|
||||
return NULL;
|
||||
}
|
||||
while (-1 == rc && EAGAIN == errno) {
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: GOT EAGAIN!"));
|
||||
/* Try again */
|
||||
rc = ib_cm_get_event(cmh->cm_device, &e);
|
||||
}
|
||||
if (0 == rc && NULL != e) {
|
||||
want_ack = true;
|
||||
switch (e->event) {
|
||||
case IB_CM_REQ_RECEIVED:
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: request received on fd %d", fd));
|
||||
/* Incoming request */
|
||||
rc = request_received(cmh, e);
|
||||
break;
|
||||
|
||||
case IB_CM_REP_RECEIVED:
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: reply received on fd %d", fd));
|
||||
/* Reply received */
|
||||
rc = reply_received(cmh, e);
|
||||
break;
|
||||
|
||||
case IB_CM_RTU_RECEIVED:
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: RTU received on fd %d", fd));
|
||||
/* Ready to use! */
|
||||
rc = ready_to_use_received(cmh, e);
|
||||
break;
|
||||
|
||||
case IB_CM_DREQ_RECEIVED:
|
||||
/* Disconnect request */
|
||||
rc = disconnect_request_received(cmh, e);
|
||||
break;
|
||||
|
||||
case IB_CM_DREP_RECEIVED:
|
||||
/* Disconnect reply */
|
||||
rc = disconnect_reply_received(cmh, e);
|
||||
break;
|
||||
|
||||
|
||||
case IB_CM_REJ_RECEIVED:
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: reject received on fd %d", fd));
|
||||
/* Rejected connection */
|
||||
rc = reject_received(cmh, e);
|
||||
/* reject_received() called ib_cm_ack_event so that the CM
|
||||
@ -2431,22 +2425,32 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
|
||||
break;
|
||||
|
||||
case IB_CM_REQ_ERROR:
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: request error received on fd %d", fd));
|
||||
/* Request error */
|
||||
rc = request_error(cmh, e);
|
||||
break;
|
||||
|
||||
case IB_CM_REP_ERROR:
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: reply error received on fd %d", fd));
|
||||
/* Reply error */
|
||||
rc = reply_error(cmh, e);
|
||||
break;
|
||||
|
||||
case IB_CM_DREQ_RECEIVED:
|
||||
case IB_CM_DREP_RECEIVED:
|
||||
case IB_CM_DREQ_ERROR:
|
||||
/* Disconnect request error */
|
||||
rc = disconnect_request_error(cmh, e);
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: %s received on fd %d",
|
||||
(IB_CM_DREQ_RECEIVED == e->event) ? "disconnect request" :
|
||||
(IB_CM_DREP_RECEIVED == e->event) ? "disconnect reply" :
|
||||
"disconnect request error", fd));
|
||||
/* We don't care */
|
||||
rc = OMPI_SUCCESS;
|
||||
break;
|
||||
|
||||
default:
|
||||
rc = unhandled_event(cmh, e);
|
||||
/* This would be odd */
|
||||
OPAL_OUTPUT((-1, "ibcm dispatch: unhandled event received on fd %d", fd));
|
||||
rc = OMPI_ERR_NOT_FOUND;
|
||||
break;
|
||||
}
|
||||
|
||||
@ -2462,6 +2466,8 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
|
||||
function), the dispatch function would have done that
|
||||
already */
|
||||
}
|
||||
} else {
|
||||
OPAL_OUTPUT((-1, "Got weird value back from ib_cm_get_event: %d", rc));
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -86,7 +86,7 @@ ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_oob = {
|
||||
/* Query */
|
||||
oob_component_query,
|
||||
/* Finalize */
|
||||
oob_component_finalize
|
||||
oob_component_finalize,
|
||||
};
|
||||
|
||||
/* Open - this functions sets up any oob specific commandline params */
|
||||
@ -166,6 +166,7 @@ static int oob_component_query(mca_btl_openib_module_t *btl,
|
||||
(*cpc)->cbm_start_connect = oob_module_start_connect;
|
||||
(*cpc)->cbm_endpoint_finalize = NULL;
|
||||
(*cpc)->cbm_finalize = NULL;
|
||||
(*cpc)->cbm_uses_cts = false;
|
||||
|
||||
opal_output_verbose(5, mca_btl_base_output,
|
||||
"openib BTL: oob CPC available for use on %s",
|
||||
@ -828,19 +829,19 @@ static void rml_recv_cb(int status, orte_process_name_t* process_name,
|
||||
} else {
|
||||
send_connect_data(ib_endpoint, ENDPOINT_CONNECT_ACK);
|
||||
/* Tell main BTL that we're done */
|
||||
mca_btl_openib_endpoint_connected(ib_endpoint);
|
||||
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
|
||||
}
|
||||
break;
|
||||
|
||||
case MCA_BTL_IB_WAITING_ACK:
|
||||
/* Tell main BTL that we're done */
|
||||
mca_btl_openib_endpoint_connected(ib_endpoint);
|
||||
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
|
||||
break;
|
||||
|
||||
case MCA_BTL_IB_CONNECT_ACK:
|
||||
send_connect_data(ib_endpoint, ENDPOINT_CONNECT_ACK);
|
||||
/* Tell main BTL that we're done */
|
||||
mca_btl_openib_endpoint_connected(ib_endpoint);
|
||||
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
|
||||
break;
|
||||
|
||||
case MCA_BTL_IB_CONNECTED:
|
||||
|
@ -43,12 +43,10 @@
|
||||
#undef event
|
||||
|
||||
static void rdmacm_component_register(void);
|
||||
static int rdmacm_component_init(void);
|
||||
static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl,
|
||||
ompi_btl_openib_connect_base_module_t **cpc);
|
||||
static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
|
||||
mca_btl_base_endpoint_t *endpoint);
|
||||
static int rdmacm_component_destroy(void);
|
||||
static int rdmacm_component_init(void);
|
||||
|
||||
ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_rdmacm = {
|
||||
"rdmacm",
|
||||
@ -103,7 +101,7 @@ static int rdmacm_priority = 30;
|
||||
static uint16_t rdmacm_port = 0;
|
||||
static uint32_t rdmacm_addr = 0;
|
||||
|
||||
#define RDMA_RESOLVE_ADDR_TIMEOUT 2000
|
||||
#define RDMACM_RESOLVE_ADDR_TIMEOUT 2000
|
||||
|
||||
/* Open - this functions sets up any rdma_cm specific commandline params */
|
||||
static void rdmacm_component_register(void)
|
||||
@ -187,8 +185,9 @@ static void rdmacm_cleanup(rdmacm_contents_t *local,
|
||||
struct rdma_cm_id *id,
|
||||
uint32_t num)
|
||||
{
|
||||
if (NULL == id)
|
||||
if (NULL == id) {
|
||||
return;
|
||||
}
|
||||
|
||||
free(id->context);
|
||||
id->context = NULL;
|
||||
@ -251,7 +250,8 @@ static int rdmacm_setup_qp(rdmacm_contents_t *local,
|
||||
attr.send_cq = local->openib_btl->device->ib_cq[BTL_OPENIB_LP_CQ];
|
||||
attr.recv_cq = local->openib_btl->device->ib_cq[qp_cq_prio(qpnum)];
|
||||
attr.srq = srq;
|
||||
attr.cap.max_recv_wr = max_recv_wr;
|
||||
/* Add one for the CTS receive frag that will be posted */
|
||||
attr.cap.max_recv_wr = max_recv_wr + 1;
|
||||
attr.cap.max_send_wr = max_send_wr;
|
||||
attr.cap.max_inline_data = req_inline =
|
||||
max_inline_size(qpnum, local->openib_btl->device);
|
||||
@ -282,9 +282,9 @@ out:
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int rdma_client_connect_one(rdmacm_contents_t *local,
|
||||
message_t *message,
|
||||
int num)
|
||||
static int rdmacm_client_connect_one(rdmacm_contents_t *local,
|
||||
message_t *message,
|
||||
int num)
|
||||
{
|
||||
struct sockaddr_in din;
|
||||
id_contexts_t *context;
|
||||
@ -319,10 +319,11 @@ static int rdma_client_connect_one(rdmacm_contents_t *local,
|
||||
* RDMA_CM_EVENT_ADDR_RESOLVED event will occur on the local event
|
||||
* handler.
|
||||
*/
|
||||
OPAL_OUTPUT((0, "Resolving id: 0x%x", local->id[num]));
|
||||
rc = rdma_resolve_addr(local->id[num],
|
||||
NULL,
|
||||
(struct sockaddr *)&din,
|
||||
RDMA_RESOLVE_ADDR_TIMEOUT);
|
||||
RDMACM_RESOLVE_ADDR_TIMEOUT);
|
||||
if (0 != rc) {
|
||||
BTL_ERROR(("Failed to resolve the remote address with %d", rc));
|
||||
goto out1;
|
||||
@ -336,20 +337,78 @@ out:
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
static int rdma_client_connect(rdmacm_contents_t *local, message_t *message)
|
||||
static char *stringify(uint32_t addr)
|
||||
{
|
||||
char *line;
|
||||
asprintf(&line, "%d.%d.%d.%d",
|
||||
addr & 0xff,
|
||||
(addr >> 8) & 0xff,
|
||||
(addr >> 16) & 0xff,
|
||||
(addr >> 24));
|
||||
return line;
|
||||
}
|
||||
|
||||
/* To avoid all kinds of nasty race conditions, we only allow
|
||||
* connections to be made in one direction. So use a simple
|
||||
* (arbitrary) test to decide which direction is allowed to initiate
|
||||
* the connection: the process with the lower IP address wins. If the
|
||||
* IP addresses are the same (i.e., the MPI procs are on the same
|
||||
* node), then the process with the lower TCP port wins.
|
||||
*/
|
||||
static bool i_initiate(uint32_t local_ipaddr, uint16_t local_port,
|
||||
uint32_t remote_ipaddr, uint16_t remote_port)
|
||||
{
|
||||
char *a = stringify(local_ipaddr);
|
||||
char *b = stringify(remote_ipaddr);
|
||||
|
||||
if (local_ipaddr > remote_ipaddr ||
|
||||
(local_ipaddr == remote_ipaddr && local_port < remote_port)) {
|
||||
OPAL_OUTPUT((0, "i_initiate (I WIN): local ipaddr %s, remote ipaddr %s",
|
||||
a, b));
|
||||
free(a);
|
||||
free(b);
|
||||
return true;
|
||||
} else {
|
||||
OPAL_OUTPUT((0, "i_initiate (I lose): local ipaddr %s, remote ipaddr %s",
|
||||
a, b));
|
||||
free(a);
|
||||
free(b);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static int rdmacm_client_connect(rdmacm_contents_t *local, message_t *message)
|
||||
{
|
||||
int rc, qp;
|
||||
|
||||
local->id = malloc(sizeof(struct rdma_cm_id *) * mca_btl_openib_component.num_qps);
|
||||
/* If we're not the initiator, allocate an extra ID for the bogus
|
||||
QP that we expect to be rejected */
|
||||
qp = mca_btl_openib_component.num_qps;
|
||||
if (!local->endpoint->endpoint_initiator) {
|
||||
++qp;
|
||||
}
|
||||
local->id = calloc(qp, sizeof(struct rdma_cm_id *));
|
||||
if (NULL == local->id) {
|
||||
BTL_ERROR(("malloc error"));
|
||||
return OMPI_ERROR;
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
|
||||
rc = rdma_client_connect_one(local, message, qp);
|
||||
/* If we're the initiator, then open all the QPs */
|
||||
if (local->endpoint->endpoint_initiator) {
|
||||
for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) {
|
||||
rc = rdmacm_client_connect_one(local, message, qp);
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
BTL_ERROR(("rdmacm_client_connect_one error (real QP %d)",
|
||||
qp));
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Otherwise, only open 1 QP that we expect to be rejected */
|
||||
else {
|
||||
rc = rdmacm_client_connect_one(local, message, qp - 1);
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
BTL_ERROR(("rdma_client_connect_one error"));
|
||||
BTL_ERROR(("rdmacm_client_connect_one error (bogus QP)"));
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
@ -358,9 +417,12 @@ static int rdma_client_connect(rdmacm_contents_t *local, message_t *message)
|
||||
|
||||
out:
|
||||
for (; qp >= 0; qp--) {
|
||||
rdmacm_cleanup(local, local->id[qp], qp);
|
||||
if (NULL != local->id[qp]) {
|
||||
rdmacm_cleanup(local, local->id[qp], qp);
|
||||
}
|
||||
}
|
||||
return OMPI_ERROR;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Connect method called by the upper layers to connect the local
|
||||
@ -370,11 +432,18 @@ out:
|
||||
static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
|
||||
mca_btl_base_endpoint_t *endpoint)
|
||||
{
|
||||
rdmacm_contents_t *client;
|
||||
message_t *message;
|
||||
rdmacm_contents_t *local;
|
||||
message_t *message, *local_message;
|
||||
int rc;
|
||||
|
||||
message = (message_t *)endpoint->endpoint_remote_cpc_data->cbm_modex_message;
|
||||
/* Don't use the CPC to get the message, because this function is
|
||||
invoked from the event_handler (to intitiate connections in the
|
||||
Right direction), where we don't have the CPC, so it'll be
|
||||
NULL. */
|
||||
local_message =
|
||||
(message_t *) endpoint->endpoint_local_cpc->data.cbm_modex_message;
|
||||
message = (message_t *)
|
||||
endpoint->endpoint_remote_cpc_data->cbm_modex_message;
|
||||
|
||||
BTL_VERBOSE(("Connecting to remote ip addr = %x, port = %d ep state = %d",
|
||||
message->ipaddr, message->tcp_port, endpoint->endpoint_state));
|
||||
@ -385,28 +454,43 @@ static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cp
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
endpoint->endpoint_state = MCA_BTL_IB_CONNECT_ACK;
|
||||
/* Set the endpoint state to "connecting" (this function runs in
|
||||
the main MPI thread; not the service thread, so we can set the
|
||||
endpoint_state here). */
|
||||
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
|
||||
|
||||
client = calloc(1, sizeof(rdmacm_contents_t));
|
||||
if (NULL == client) {
|
||||
BTL_ERROR(("malloc of client failed"));
|
||||
local = calloc(1, sizeof(rdmacm_contents_t));
|
||||
if (NULL == local) {
|
||||
BTL_ERROR(("malloc of local failed"));
|
||||
goto out;
|
||||
}
|
||||
|
||||
client->openib_btl = endpoint->endpoint_btl;
|
||||
client->endpoint = endpoint;
|
||||
client->server = false;
|
||||
local->openib_btl = endpoint->endpoint_btl;
|
||||
local->endpoint = endpoint;
|
||||
local->server = false;
|
||||
/* Populate the port information with the local port the server is
|
||||
* listening on instead of the ephemerial port this client is
|
||||
* connecting with. This port is used to determine which endpoint
|
||||
* is being connected from, in the isntance where there are
|
||||
* multiple listeners on the local system.
|
||||
* is being connected from, in the case where there are multiple
|
||||
* listeners on the local system.
|
||||
*/
|
||||
client->tcp_port = ((message_t *)endpoint->endpoint_local_cpc->data.cbm_modex_message)->tcp_port;
|
||||
local->ipaddr = local_message->ipaddr;
|
||||
local->tcp_port = local_message->tcp_port;
|
||||
|
||||
rc = rdma_client_connect(client, message);
|
||||
/* Are we the initiator? Or do we expect this connect request to
|
||||
be rejected? */
|
||||
endpoint->endpoint_initiator =
|
||||
i_initiate(local->ipaddr, local->tcp_port,
|
||||
message->ipaddr, message->tcp_port);
|
||||
OPAL_OUTPUT((0, "Start connect; ep=0x%x (0x%x), I %s the initiator to %s",
|
||||
endpoint,
|
||||
endpoint->endpoint_local_cpc,
|
||||
endpoint->endpoint_initiator ? "am" : "am NOT",
|
||||
endpoint->endpoint_proc->proc_ompi->proc_hostname));
|
||||
|
||||
rc = rdmacm_client_connect(local, message);
|
||||
if (0 != rc) {
|
||||
BTL_ERROR(("rdma_client_connect error"));
|
||||
BTL_ERROR(("rdmacm_client_connect error"));
|
||||
goto out;
|
||||
}
|
||||
|
||||
@ -447,15 +531,23 @@ static int handle_connect_request(rdmacm_contents_t *local,
|
||||
}
|
||||
|
||||
message = endpoint->endpoint_remote_cpc_data->cbm_modex_message;
|
||||
endpoint->endpoint_initiator =
|
||||
i_initiate(local->ipaddr, local->tcp_port,
|
||||
message->ipaddr, rem_port);
|
||||
|
||||
BTL_VERBOSE(("ep state = %d, local ipaddr = %x, remote ipaddr = %x port %d",
|
||||
endpoint->endpoint_state, local->ipaddr, message->ipaddr, rem_port));
|
||||
BTL_VERBOSE(("ep state = %d, local ipaddr = %x, remote ipaddr = %x, local port = %d, remote port = %d",
|
||||
endpoint->endpoint_state, local->ipaddr, message->ipaddr,
|
||||
local->tcp_port, rem_port));
|
||||
|
||||
if ((local->ipaddr == message->ipaddr && local->tcp_port < rem_port) ||
|
||||
local->ipaddr > message->ipaddr) {
|
||||
OPAL_OUTPUT((0, "in handle_connect_request; ep=0x%x (0x%x), I still %s the initiator to %s",
|
||||
endpoint,
|
||||
endpoint->endpoint_local_cpc,
|
||||
endpoint->endpoint_initiator ? "am" : "am NOT",
|
||||
endpoint->endpoint_proc->proc_ompi->proc_hostname));
|
||||
if (endpoint->endpoint_initiator) {
|
||||
int race = 1;
|
||||
|
||||
BTL_VERBOSE(("Received a connect request from an endpoint in the wrong direction"));
|
||||
OPAL_OUTPUT((0, "Received a connect request from an endpoint in the wrong direction"));
|
||||
|
||||
/* This will cause a event on the remote system. By passing in
|
||||
* a value in the second arg of rdma_reject, the remote side
|
||||
@ -468,19 +560,17 @@ static int handle_connect_request(rdmacm_contents_t *local,
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* If there are multiple QPs attempting to connect from the
|
||||
* wrong direction, only make one call to
|
||||
* rdmacm_module_start_connect to connect in the proper
|
||||
* direction, as it will connect to the remote side with the
|
||||
* correct number of QPs.
|
||||
*/
|
||||
if (0 == qpnum) {
|
||||
rdmacm_module_start_connect(NULL, endpoint);
|
||||
}
|
||||
OPAL_OUTPUT((0, "Starting connection in other direction"));
|
||||
rdmacm_module_start_connect(NULL, endpoint);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Set the endpoint_state to "CONNECTING". This is running
|
||||
in the service thread, so we need to do a write barrier. */
|
||||
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
|
||||
opal_atomic_wmb();
|
||||
|
||||
endpoint->rem_info.rem_index = rem_index;
|
||||
|
||||
/* Setup QP for new connection */
|
||||
@ -493,24 +583,26 @@ static int handle_connect_request(rdmacm_contents_t *local,
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Recvs must be posted prior to accepting the rdma connection.
|
||||
* Otherwise, it is possible to get data before there are recvs to
|
||||
* put it, which for iWARP will result in tearing down of the
|
||||
* connection.
|
||||
*/
|
||||
if (BTL_OPENIB_QP_TYPE_PP(qpnum)) {
|
||||
rc = mca_btl_openib_endpoint_post_rr(endpoint, qpnum);
|
||||
} else {
|
||||
rc = mca_btl_openib_post_srr(endpoint->endpoint_btl, qpnum);
|
||||
}
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
BTL_ERROR(("mca_btl_openib_endpoint_post_rr_nolock error %d", rc));
|
||||
goto out1;
|
||||
/* Post a single receive buffer on the smallest QP for the CTS
|
||||
protocol */
|
||||
if (mca_btl_openib_component.credits_qp == qpnum) {
|
||||
struct ibv_recv_wr *bad_wr, *wr;
|
||||
|
||||
assert(NULL != endpoint->endpoint_cts_frag);
|
||||
wr = &to_recv_frag(endpoint->endpoint_cts_frag)->rd_desc;
|
||||
assert(NULL != wr);
|
||||
wr->next = NULL;
|
||||
|
||||
if (0 != ibv_post_recv(endpoint->qps[qpnum].qp->lcl_qp,
|
||||
wr, &bad_wr)) {
|
||||
BTL_ERROR(("failed to post CTS recv buffer"));
|
||||
goto out1;
|
||||
}
|
||||
}
|
||||
|
||||
/* Since the event id is already created, we cannot add this
|
||||
* information in the normal way. Instead we must reference its
|
||||
* location and put the data there so that it can be access later.
|
||||
* location and put the data there so that it can be accessed later.
|
||||
*/
|
||||
event->id->context = malloc(sizeof(id_contexts_t));
|
||||
if (NULL == event->id->context) {
|
||||
@ -615,7 +707,11 @@ static int rdmacm_connection_shutdown(struct mca_btl_base_endpoint_t *endpoint)
|
||||
if (NULL != cli->item->id[i] &&
|
||||
NULL != cli->item->id[i]->qp &&
|
||||
NULL != cli->item->endpoint->qps) {
|
||||
rdma_disconnect(cli->item->id[i]);
|
||||
opal_output(0, "Freeing rdmacm id %p",
|
||||
cli->item->id[i]);
|
||||
rdma_disconnect(cli->item->id[i]);
|
||||
/* JMS shouldn't be necessary */
|
||||
cli->item->id[i] = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -626,11 +722,13 @@ static int rdmacm_connection_shutdown(struct mca_btl_base_endpoint_t *endpoint)
|
||||
/*
|
||||
* Callback (from main thread) when the endpoint has been connected
|
||||
*/
|
||||
static void *local_endpoint_connected(void *context)
|
||||
static void *local_endpoint_cpc_complete(void *context)
|
||||
{
|
||||
mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t *)context;
|
||||
|
||||
mca_btl_openib_endpoint_connected(endpoint);
|
||||
OPAL_OUTPUT((0, "local_endpoint_cpc_complete to %s",
|
||||
endpoint->endpoint_proc->proc_ompi->proc_hostname));
|
||||
mca_btl_openib_endpoint_cpc_complete(endpoint);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
@ -641,9 +739,11 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even
|
||||
mca_btl_openib_endpoint_t *endpoint;
|
||||
message_t *message;
|
||||
|
||||
if (local->server)
|
||||
if (local->server) {
|
||||
endpoint = ((id_contexts_t *)event->id->context)->endpoint;
|
||||
else {
|
||||
OPAL_OUTPUT((0, "Server CPC complete to %s",
|
||||
endpoint->endpoint_proc->proc_ompi->proc_hostname));
|
||||
} else {
|
||||
list_item_t *li;
|
||||
uint32_t rem_index;
|
||||
|
||||
@ -659,6 +759,8 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even
|
||||
}
|
||||
li->item = local;
|
||||
opal_list_append(&client_list, &(li->super));
|
||||
OPAL_OUTPUT((0, "Client CPC complete to %s",
|
||||
endpoint->endpoint_proc->proc_ompi->proc_hostname));
|
||||
}
|
||||
if (NULL == endpoint) {
|
||||
BTL_ERROR(("Can't find endpoint"));
|
||||
@ -666,12 +768,17 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even
|
||||
}
|
||||
data = (rdmacm_endpoint_local_cpc_data_t *)endpoint->endpoint_local_cpc_data;
|
||||
|
||||
/* Only notify the upper layers after the last QO has been connected */
|
||||
/* Only notify the upper layers after the last QP has been connected */
|
||||
if (++data->rdmacm_counter < mca_btl_openib_component.num_qps) {
|
||||
BTL_VERBOSE(("%s count == %d", local->server?"server":"client", data->rdmacm_counter));
|
||||
return 0;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT((0, "in connect_endpoint; ep=0x%x (0x%x), I still %s the initiator to %s",
|
||||
endpoint,
|
||||
endpoint->endpoint_local_cpc,
|
||||
endpoint->endpoint_initiator ? "am" : "am NOT",
|
||||
endpoint->endpoint_proc->proc_ompi->proc_hostname));
|
||||
message = endpoint->endpoint_remote_cpc_data->cbm_modex_message;
|
||||
BTL_VERBOSE(("%s connected!!! local %x remote %x state = %d",
|
||||
local->server?"server":"client",
|
||||
@ -679,19 +786,19 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even
|
||||
message->ipaddr,
|
||||
endpoint->endpoint_state));
|
||||
|
||||
ompi_btl_openib_fd_schedule(local_endpoint_connected, endpoint);
|
||||
ompi_btl_openib_fd_run_in_main(local_endpoint_cpc_complete, endpoint);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int start_connect(rdmacm_contents_t *local, int num)
|
||||
static int resolve_route(rdmacm_contents_t *local, int num)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* Resolve the route to the remote system. Onced established, the
|
||||
/* Resolve the route to the remote system. Once established, the
|
||||
* local system will get a RDMA_CM_EVENT_ROUTE_RESOLVED event.
|
||||
*/
|
||||
rc = rdma_resolve_route(local->id[num], RDMA_RESOLVE_ADDR_TIMEOUT);
|
||||
rc = rdma_resolve_route(local->id[num], RDMACM_RESOLVE_ADDR_TIMEOUT);
|
||||
if (0 != rc) {
|
||||
BTL_ERROR(("Failed to resolve the route with %d", rc));
|
||||
goto out;
|
||||
@ -722,7 +829,6 @@ static int create_dummy_qp(rdmacm_contents_t *local, struct rdma_cm_id *id, int
|
||||
struct ibv_qp_init_attr attr;
|
||||
struct ibv_qp *qp;
|
||||
|
||||
/* create the qp via rdma_create_qp() */
|
||||
memset(&attr, 0, sizeof(attr));
|
||||
attr.qp_type = IBV_QPT_RC;
|
||||
attr.send_cq = local->dummy_cq;
|
||||
@ -750,34 +856,44 @@ out:
|
||||
static int finish_connect(rdmacm_contents_t *local, int num)
|
||||
{
|
||||
struct rdma_conn_param conn_param;
|
||||
struct sockaddr *peeraddr, *localaddr;
|
||||
uint32_t localipaddr, remoteipaddr;
|
||||
uint16_t remoteport;
|
||||
conn_message_t msg;
|
||||
int rc;
|
||||
|
||||
struct sockaddr *peeraddr, *localaddr;
|
||||
uint32_t localipaddr, remoteipaddr;
|
||||
uint16_t remoteport;
|
||||
|
||||
remoteport = rdma_get_dst_port(local->id[num]);
|
||||
localaddr = rdma_get_local_addr(local->id[num]);
|
||||
peeraddr = rdma_get_peer_addr(local->id[num]);
|
||||
localipaddr = ((struct sockaddr_in *)localaddr)->sin_addr.s_addr;
|
||||
remoteipaddr = ((struct sockaddr_in *)peeraddr)->sin_addr.s_addr;
|
||||
|
||||
if ((localipaddr == remoteipaddr && local->tcp_port <= remoteport) ||
|
||||
localipaddr > remoteipaddr) {
|
||||
localaddr = rdma_get_local_addr(local->id[num]);
|
||||
localipaddr = ((struct sockaddr_in *)localaddr)->sin_addr.s_addr;
|
||||
|
||||
/* If we're the initiator, then setup the QP's and post the CTS
|
||||
message buffer */
|
||||
if (local->endpoint->endpoint_initiator) {
|
||||
rc = rdmacm_setup_qp(local, local->endpoint, local->id[num], num);
|
||||
if (0 != rc) {
|
||||
BTL_ERROR(("rdmacm_setup_qp error %d", rc));
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (BTL_OPENIB_QP_TYPE_PP(num)) {
|
||||
rc = mca_btl_openib_endpoint_post_rr(local->endpoint, num);
|
||||
} else {
|
||||
rc = mca_btl_openib_post_srr(local->endpoint->endpoint_btl, num);
|
||||
}
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
BTL_ERROR(("mca_btl_openib_endpoint_post_rr_nolock error %d", rc));
|
||||
goto out1;
|
||||
if (mca_btl_openib_component.credits_qp == num) {
|
||||
/* Post a single receive buffer on the smallest QP for the CTS
|
||||
protocol */
|
||||
|
||||
struct ibv_recv_wr *bad_wr, *wr;
|
||||
assert(NULL != local->endpoint->endpoint_cts_frag);
|
||||
wr = &to_recv_frag(local->endpoint->endpoint_cts_frag)->rd_desc;
|
||||
assert(NULL != wr);
|
||||
wr->next = NULL;
|
||||
|
||||
if (0 != ibv_post_recv(local->endpoint->qps[num].qp->lcl_qp,
|
||||
wr, &bad_wr)) {
|
||||
BTL_ERROR(("failed to post CTS recv buffer"));
|
||||
goto out1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* If we are establishing a connection in the "wrong" direction,
|
||||
@ -811,12 +927,15 @@ static int finish_connect(rdmacm_contents_t *local, int num)
|
||||
msg.rem_index = local->endpoint->index;
|
||||
msg.rem_port = local->tcp_port;
|
||||
|
||||
BTL_VERBOSE(("Connecting from %x, port %d to %x", localipaddr, msg.rem_port, remoteipaddr));
|
||||
|
||||
/* Now all of the local setup has been done. The remote system
|
||||
* should now get a RDMA_CM_EVENT_CONNECT_REQUEST event to further
|
||||
* the setup of the QP.
|
||||
*/
|
||||
OPAL_OUTPUT((0, "in finish_connect; ep=0x%x (0x%x), I still %s the initiator to %s",
|
||||
local->endpoint,
|
||||
local->endpoint->endpoint_local_cpc,
|
||||
local->endpoint->endpoint_initiator ? "am" : "am NOT",
|
||||
local->endpoint->endpoint_proc->proc_ompi->proc_hostname));
|
||||
rc = rdma_connect(local->id[num], &conn_param);
|
||||
if (0 != rc) {
|
||||
BTL_ERROR(("rdma_connect Failed with %d", rc));
|
||||
@ -833,7 +952,7 @@ out:
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int rdma_event_handler(struct rdma_cm_event *event)
|
||||
static int event_handler(struct rdma_cm_event *event)
|
||||
{
|
||||
rdmacm_contents_t *local;
|
||||
struct sockaddr *peeraddr, *localaddr;
|
||||
@ -850,7 +969,7 @@ static int rdma_event_handler(struct rdma_cm_event *event)
|
||||
localipaddr = ((struct sockaddr_in *)localaddr)->sin_addr.s_addr;
|
||||
peeripaddr = ((struct sockaddr_in *)peeraddr)->sin_addr.s_addr;
|
||||
|
||||
BTL_VERBOSE(("%s rdma_event_handler -- %s, status = %d to %x",
|
||||
BTL_VERBOSE(("%s event_handler -- %s, status = %d to %x",
|
||||
local->server?"server":"client",
|
||||
rdma_event_str(event->event),
|
||||
event->status,
|
||||
@ -858,19 +977,23 @@ static int rdma_event_handler(struct rdma_cm_event *event)
|
||||
|
||||
switch (event->event) {
|
||||
case RDMA_CM_EVENT_ADDR_RESOLVED:
|
||||
rc = start_connect(local, qpnum);
|
||||
OPAL_OUTPUT((0, "Address resolved: ID 0x%x", local->id[qpnum]));
|
||||
rc = resolve_route(local, qpnum);
|
||||
break;
|
||||
|
||||
case RDMA_CM_EVENT_ROUTE_RESOLVED:
|
||||
OPAL_OUTPUT((0, "Route resolved: ID 0x%x", local->id[qpnum]));
|
||||
local->ipaddr = localipaddr;
|
||||
rc = finish_connect(local, qpnum);
|
||||
break;
|
||||
|
||||
case RDMA_CM_EVENT_CONNECT_REQUEST:
|
||||
OPAL_OUTPUT((0, "Incoming connect request: 0x%x", local->id[qpnum]));
|
||||
rc = handle_connect_request(local, event);
|
||||
break;
|
||||
|
||||
case RDMA_CM_EVENT_ESTABLISHED:
|
||||
OPAL_OUTPUT((0, "Connection established: 0x%x", local->id[qpnum]));
|
||||
rc = rdmacm_connect_endpoint(local, event);
|
||||
break;
|
||||
|
||||
@ -880,14 +1003,16 @@ static int rdma_event_handler(struct rdma_cm_event *event)
|
||||
break;
|
||||
|
||||
case RDMA_CM_EVENT_REJECTED:
|
||||
if ((NULL != event->param.conn.private_data) && (1 == *((int *)event->param.conn.private_data))) {
|
||||
if ((NULL != event->param.conn.private_data) &&
|
||||
(1 == *((int *)event->param.conn.private_data))) {
|
||||
BTL_VERBOSE(("A good reject! for qp %d", qpnum));
|
||||
if (NULL != local->id[qpnum]->qp) {
|
||||
ibv_destroy_qp(local->id[qpnum]->qp);
|
||||
local->id[qpnum]->qp = NULL;
|
||||
}
|
||||
if (NULL != local->dummy_cq)
|
||||
if (NULL != local->dummy_cq) {
|
||||
ibv_destroy_cq(local->dummy_cq);
|
||||
}
|
||||
rdmacm_cleanup(local, local->id[qpnum], qpnum);
|
||||
rc = 0;
|
||||
}
|
||||
@ -914,8 +1039,8 @@ static inline void rdmamcm_event_error(struct rdma_cm_event *event)
|
||||
endpoint = ((id_contexts_t *)event->id->context)->local->endpoint;
|
||||
}
|
||||
|
||||
ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
|
||||
endpoint);
|
||||
}
|
||||
|
||||
static void *rdmacm_event_dispatch(int fd, int flags, void *context)
|
||||
@ -952,9 +1077,9 @@ static void *rdmacm_event_dispatch(int fd, int flags, void *context)
|
||||
}
|
||||
rdma_ack_cm_event(event);
|
||||
|
||||
rc = rdma_event_handler(&ecopy);
|
||||
rc = event_handler(&ecopy);
|
||||
if (0 != rc) {
|
||||
BTL_ERROR(("Error rdma_event_handler -- %s, status = %d",
|
||||
BTL_ERROR(("Error event_handler -- %s, status = %d",
|
||||
rdma_event_str(ecopy.event),
|
||||
ecopy.status));
|
||||
|
||||
@ -981,13 +1106,11 @@ static int rdmacm_init(mca_btl_openib_endpoint_t *endpoint)
|
||||
data = calloc(1, sizeof(rdmacm_endpoint_local_cpc_data_t));
|
||||
if (NULL == data) {
|
||||
BTL_ERROR(("malloc failed"));
|
||||
goto out;
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
endpoint->endpoint_local_cpc_data = data;
|
||||
|
||||
return 0;
|
||||
out:
|
||||
return -1;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static int ipaddrcheck(rdmacm_contents_t *server,
|
||||
@ -1068,8 +1191,7 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, ompi_btl_
|
||||
|
||||
/* RDMACM is not supported if we have any XRC QPs */
|
||||
if (mca_btl_openib_component.num_xrc_qps > 0) {
|
||||
opal_output_verbose(5, mca_btl_base_output,
|
||||
"openib BTL: rdmacm CPC not supported with XRC receive queues, please try xoob CPC; skipped");
|
||||
BTL_VERBOSE(("rdmacm CPC not supported with XRC receive queues, please try xoob CPC; skipped"));
|
||||
rc = OMPI_ERR_NOT_SUPPORTED;
|
||||
goto out;
|
||||
}
|
||||
@ -1088,6 +1210,9 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, ompi_btl_
|
||||
(*cpc)->cbm_start_connect = rdmacm_module_start_connect;
|
||||
(*cpc)->cbm_endpoint_finalize = rdmacm_connection_shutdown;
|
||||
(*cpc)->cbm_finalize = NULL;
|
||||
/* Setting uses_cts=true also guarantees that we'll only be
|
||||
selected if QP 0 is PP */
|
||||
(*cpc)->cbm_uses_cts = true;
|
||||
|
||||
server = malloc(sizeof(rdmacm_contents_t));
|
||||
if (NULL == server) {
|
||||
|
@ -48,7 +48,7 @@ ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_xoob = {
|
||||
/* Query */
|
||||
xoob_component_query,
|
||||
/* Finalize */
|
||||
xoob_component_finalize
|
||||
xoob_component_finalize,
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
@ -892,7 +892,7 @@ static void xoob_rml_recv_cb(int status, orte_process_name_t* process_name,
|
||||
mca_btl_openib_endpoint_invoke_error(NULL);
|
||||
return;
|
||||
}
|
||||
mca_btl_openib_endpoint_connected(ib_endpoint);
|
||||
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
|
||||
OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock);
|
||||
break;
|
||||
case ENDPOINT_XOOB_CONNECT_XRC_RESPONSE:
|
||||
@ -911,7 +911,7 @@ static void xoob_rml_recv_cb(int status, orte_process_name_t* process_name,
|
||||
OPAL_THREAD_LOCK(&ib_endpoint->endpoint_lock);
|
||||
/* we got srq numbers on our request */
|
||||
XOOB_SET_REMOTE_INFO(ib_endpoint->rem_info, rem_info);
|
||||
mca_btl_openib_endpoint_connected(ib_endpoint);
|
||||
mca_btl_openib_endpoint_cpc_complete(ib_endpoint);
|
||||
OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock);
|
||||
break;
|
||||
case ENDPOINT_XOOB_CONNECT_XRC_NR_RESPONSE:
|
||||
@ -989,6 +989,7 @@ static int xoob_component_query(mca_btl_openib_module_t *openib_btl,
|
||||
(*cpc)->cbm_start_connect = xoob_module_start_connect;
|
||||
(*cpc)->cbm_endpoint_finalize = NULL;
|
||||
(*cpc)->cbm_finalize = NULL;
|
||||
(*cpc)->cbm_uses_cts = false;
|
||||
|
||||
opal_output_verbose(5, mca_btl_base_output,
|
||||
"openib BTL: xoob CPC available for use on %s",
|
||||
|
@ -128,19 +128,29 @@
|
||||
* used. There is only this function, which tells when a specific
|
||||
* CPC/BTL module is no longer being used.
|
||||
*
|
||||
* There are two functions in the main openib BTL that the CPC will
|
||||
* cbm_uses_cts: a bool that indicates whether the CPC will use the
|
||||
* CTS protocol or not.
|
||||
* - if true: the CPC will post the fragment on
|
||||
* endpoint->endpoint_cts_frag as a receive buffer and will *not*
|
||||
* call ompi_btl_openib_post_recvs().
|
||||
* - if false: the CPC will call ompi_btl_openib_post_recvs() before
|
||||
* calling ompi_btl_openib_cpc_complete().
|
||||
*
|
||||
* There are two functions in the main openib BTL that the CPC may
|
||||
* call:
|
||||
*
|
||||
* - ompi_btl_openib_post_recvs(endpoint): once a QP is locally
|
||||
* connected to the remote side (but we don't know if the remote side
|
||||
* is connected to us yet), this function is invoked to post buffers
|
||||
* on the QP, setup credits for the endpoint, etc.
|
||||
* on the QP, setup credits for the endpoint, etc. This function is
|
||||
* *only* invoked if the CPC's cbm_uses_cts is false.
|
||||
*
|
||||
* - ompi_btl_openib_connected(endpoint): once we know that a QP is
|
||||
* connected on *both* sides, this function is invoked to tell the
|
||||
* main openib BTL "ok, you can use this connection now." (e.g., the
|
||||
* main openib BTL will start sending out fragments that were queued
|
||||
* while the connection was establing, etc.).
|
||||
* - ompi_btl_openib_cpc_complete(endpoint): once that a CPC knows
|
||||
* that a QP is connected on *both* sides, this function is invoked to
|
||||
* tell the main openib BTL "ok, you can use this connection now."
|
||||
* (e.g., the main openib BTL will either invoke the CTS protocol or
|
||||
* start sending out fragments that were queued while the connection
|
||||
* was establishing, etc.).
|
||||
*/
|
||||
#ifndef BTL_OPENIB_CONNECT_H
|
||||
#define BTL_OPENIB_CONNECT_H
|
||||
@ -330,6 +340,14 @@ typedef struct ompi_btl_openib_connect_base_module_t {
|
||||
|
||||
/** Finalize the cpc module */
|
||||
ompi_btl_openib_connect_base_module_finalize_fn_t cbm_finalize;
|
||||
|
||||
/** Whether this module will use the CTS protocol or not. This
|
||||
directly states whether this module will call
|
||||
mca_btl_openib_endpoint_post_recvs() or not: true = this
|
||||
module will *not* call _post_recvs() and instead will post the
|
||||
receive buffer provided at endpoint->endpoint_cts_frag on qp
|
||||
0. */
|
||||
bool cbm_uses_cts;
|
||||
} ompi_btl_openib_connect_base_module_t;
|
||||
|
||||
END_C_DECLS
|
||||
|
@ -543,3 +543,14 @@ support. Short eager RDMA is not yet supported with progress threads;
|
||||
its use has been disabled in this job.
|
||||
|
||||
This is a warning only; you job will attempt to continue.
|
||||
#
|
||||
[cannot raise btl error]
|
||||
The OpenFabrics driver in Open MPI tried to raise a fatal error, but
|
||||
failed. Hopefully there was an error message before this one that
|
||||
gave some more detailed information.
|
||||
|
||||
Local host: %s
|
||||
Source file: %s
|
||||
Source line: %d
|
||||
|
||||
Your job is now going to abort, sorry.
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user