diff --git a/config/ompi_config_threads.m4 b/config/ompi_config_threads.m4 index 94a99adc47..ea190f8b50 100644 --- a/config/ompi_config_threads.m4 +++ b/config/ompi_config_threads.m4 @@ -162,12 +162,11 @@ else ************************************************************************ -Open MPI was unable to find threading support on your system. In the -near future, the OMPI development team is considering requiring -threading support for proper OMPI execution. This is in part because -we are not aware of any users that do not have thread support - so we -need you to e-mail us at ompi@ompi-mpi.org and let us know about this -problem. +Open MPI was unable to find threading support on your system. The +OMPI development team is considering requiring threading support for +proper OMPI execution. This is in part because we are not aware of +any users that do not have thread support - so we need you to e-mail +us at ompi@ompi-mpi.org and let us know about this problem. ************************************************************************ diff --git a/ompi/mca/btl/openib/btl_openib.c b/ompi/mca/btl/openib/btl_openib.c index 73cce39e4e..f582342831 100644 --- a/ompi/mca/btl/openib/btl_openib.c +++ b/ompi/mca/btl/openib/btl_openib.c @@ -967,15 +967,6 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl) OBJ_RELEASE(endpoint); } - /* Finalize the CPC modules on this openib module */ - for (i = 0; i < openib_btl->num_cpcs; ++i) { - if (NULL != openib_btl->cpcs[i]->cbm_finalize) { - openib_btl->cpcs[i]->cbm_finalize(openib_btl, openib_btl->cpcs[i]); - } - free(openib_btl->cpcs[i]); - } - free(openib_btl->cpcs); - /* Release SRQ resources */ for(qp = 0; qp < mca_btl_openib_component.num_qps; qp++) { if(!BTL_OPENIB_QP_TYPE_PP(qp)) { @@ -992,6 +983,15 @@ int mca_btl_openib_finalize(struct mca_btl_base_module_t* btl) } } + /* Finalize the CPC modules on this openib module */ + for (i = 0; i < openib_btl->num_cpcs; ++i) { + if (NULL != openib_btl->cpcs[i]->cbm_finalize) { + openib_btl->cpcs[i]->cbm_finalize(openib_btl, openib_btl->cpcs[i]); + } + free(openib_btl->cpcs[i]); + } + free(openib_btl->cpcs); + /* Release device if there are no more users */ if(!(--openib_btl->device->btls)) { OBJ_RELEASE(openib_btl->device); diff --git a/ompi/mca/btl/openib/btl_openib_component.c b/ompi/mca/btl/openib/btl_openib_component.c index 01e6e35b7b..b06509d8c4 100644 --- a/ompi/mca/btl/openib/btl_openib_component.c +++ b/ompi/mca/btl/openib/btl_openib_component.c @@ -440,6 +440,28 @@ static void btl_openib_control(mca_btl_base_module_t* btl, (((unsigned char*)clsc_hdr) + skip); } break; + case MCA_BTL_OPENIB_CONTROL_CTS: + OPAL_OUTPUT((0, "received CTS control message: posted recvs %d, sent cts %d", + ep->endpoint_posted_recvs, ep->endpoint_cts_sent)); + ep->endpoint_cts_received = true; + + /* Only send the CTS back and mark connected if: + - we have posted our receives (it's possible that we can + get this CTS before this side's CPC has called + cpc_complete()) + - we have not yet sent our CTS + + We don't even want to mark the endpoint connected() until + we have posted our receives because otherwise we will + trigger credit management (because the rd_credits will + still be negative), and Bad Things will happen. 
*/ + if (ep->endpoint_posted_recvs) { + if (!ep->endpoint_cts_sent) { + mca_btl_openib_endpoint_send_cts(ep); + } + mca_btl_openib_endpoint_connected(ep); + } + break; default: BTL_ERROR(("Unknown message type received by BTL")); break; diff --git a/ompi/mca/btl/openib/btl_openib_endpoint.c b/ompi/mca/btl/openib/btl_openib_endpoint.c index d1db30dca8..57c558306b 100644 --- a/ompi/mca/btl/openib/btl_openib_endpoint.c +++ b/ompi/mca/btl/openib/btl_openib_endpoint.c @@ -390,6 +390,17 @@ void mca_btl_openib_endpoint_init(mca_btl_openib_module_t *btl, for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) { endpoint_init_qp(ep, qp); } + + /* If the CPC uses the CTS protocol, provide a frag buffer for the + CPC to post */ + if (ep->endpoint_local_cpc->cbm_uses_cts) { + int rc; + ompi_free_list_item_t* item; + OMPI_FREE_LIST_WAIT(&btl->device->qps[0].recv_free, item, rc); + to_base_frag(item)->base.order = 0; + to_com_frag(item)->endpoint = ep; + ep->endpoint_cts_frag = item; + } } static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint) @@ -417,6 +428,7 @@ static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint) endpoint->endpoint_proc = 0; endpoint->endpoint_local_cpc = NULL; endpoint->endpoint_remote_cpc_data = NULL; + endpoint->endpoint_initiator = false; endpoint->endpoint_tstamp = 0.0; endpoint->endpoint_state = MCA_BTL_IB_CLOSED; endpoint->endpoint_retries = 0; @@ -442,6 +454,10 @@ static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint) endpoint->use_eager_rdma = false; endpoint->eager_rdma_remote.tokens = 0; endpoint->eager_rdma_local.credits = 0; + endpoint->endpoint_cts_frag = NULL; + endpoint->endpoint_posted_recvs = false; + endpoint->endpoint_cts_received = false; + endpoint->endpoint_cts_sent = false; } /* @@ -459,6 +475,12 @@ static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint) endpoint->endpoint_local_cpc->cbm_endpoint_finalize(endpoint); } + /* Release CTS buffer */ + if (NULL != endpoint->endpoint_cts_frag) { + MCA_BTL_IB_FRAG_RETURN(endpoint->endpoint_cts_frag); + endpoint->endpoint_cts_frag = NULL; + } + /* Release memory resources */ do { /* Make sure that mca_btl_openib_endpoint_connect_eager_rdma () @@ -531,8 +553,8 @@ static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint) /* - * call when the connect module has created all the qp's on an - * endpoint and needs to have some receive buffers posted + * Called when the connect module has created all the qp's on an + * endpoint and needs to have some receive buffers posted. 
 */
 int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t *endpoint)
 {
@@ -549,6 +571,120 @@ int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t *endpoint)
     return OMPI_SUCCESS;
 }
 
+static void cts_sent(mca_btl_base_module_t* btl,
+                     struct mca_btl_base_endpoint_t* ep,
+                     struct mca_btl_base_descriptor_t* des,
+                     int status)
+{
+    /* Nothing to do/empty function (we can't pass in a NULL pointer
+       for the des_cbfunc) */
+    opal_output(-1, "CTS send completed");
+}
+
+/*
+ * Send CTS control fragment
+ */
+void mca_btl_openib_endpoint_send_cts(mca_btl_openib_endpoint_t *endpoint)
+{
+    mca_btl_openib_send_control_frag_t *sc_frag;
+    mca_btl_base_descriptor_t *base_des;
+    mca_btl_openib_frag_t *openib_frag;
+    mca_btl_openib_com_frag_t *com_frag;
+    mca_btl_openib_control_header_t *ctl_hdr;
+
+    OPAL_OUTPUT((0, "SENDING CTS to %s",
+                 endpoint->endpoint_proc->proc_ompi->proc_hostname));
+    sc_frag = alloc_control_frag(endpoint->endpoint_btl);
+    if (OPAL_UNLIKELY(NULL == sc_frag)) {
+        BTL_ERROR(("Failed to allocate control buffer"));
+        mca_btl_openib_endpoint_invoke_error(endpoint);
+        return;
+    }
+
+    /* I dislike using the "to_<foo>()" macros; I prefer using the
+       explicit member fields to ensure I get the types right.  Since
+       this is not a performance-critical part of the code, it's
+       ok. */
+    com_frag = &(sc_frag->super.super);
+    openib_frag = &(com_frag->super);
+    base_des = &(openib_frag->base);
+
+    base_des->des_cbfunc = cts_sent;
+    base_des->des_cbdata = NULL;
+    base_des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
+    base_des->order = mca_btl_openib_component.credits_qp;
+    openib_frag->segment.seg_len = sizeof(mca_btl_openib_control_header_t);
+    com_frag->endpoint = endpoint;
+
+    sc_frag->hdr->tag = MCA_BTL_TAG_BTL;
+    sc_frag->hdr->cm_seen = 0;
+    sc_frag->hdr->credits = 0;
+
+    ctl_hdr = (mca_btl_openib_control_header_t*)
+        openib_frag->segment.seg_addr.pval;
+    ctl_hdr->type = MCA_BTL_OPENIB_CONTROL_CTS;
+
+    /* Send the fragment */
+    OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
+    if (OMPI_SUCCESS != mca_btl_openib_endpoint_post_send(endpoint, sc_frag)) {
+        BTL_ERROR(("Failed to post CTS send"));
+        mca_btl_openib_endpoint_invoke_error(endpoint);
+    }
+    endpoint->endpoint_cts_sent = true;
+    /* We've sent the frag, so it'll be reclaimed in the normal
+       fashion -- don't let endpoint_destruct() FRAG_RETURN this
+       frag */
+    endpoint->endpoint_cts_frag = NULL;
+    OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
+}
+
+/*
+ * Called when the CPC has established a connection on an endpoint
+ */
+void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t *endpoint)
+{
+    /* If the CPC uses the CTS protocol, then start it up */
+    if (NULL != endpoint->endpoint_cts_frag) {
+        /* Post our receives, which will make credit management happy
+           (i.e., rd_credits will be 0) */
+        if (OMPI_SUCCESS != mca_btl_openib_endpoint_post_recvs(endpoint)) {
+            BTL_ERROR(("Failed to post receive buffers"));
+            mca_btl_openib_endpoint_invoke_error(endpoint);
+            return;
+        }
+        endpoint->endpoint_posted_recvs = true;
+
+        /* If this is IB, send the CTS immediately.  If this is iWARP,
+           then only send the CTS if this endpoint was the initiator
+           of the connection (the receiver will send its CTS when it
+           receives this side's CTS).  Also send the CTS if we already
+           received the peer's CTS (e.g., if this process was slow to
+           call cpc_complete() and we had already received the other
+           side's CTS).
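+
+           Putting the whole handshake together, roughly (sketch only;
+           this is the iWARP case, where the "receiver" is the side
+           with endpoint_initiator == false -- on IB both sides send
+           immediately):
+
+               initiator                      receiver
+               ---------                      --------
+               cpc_complete():                cpc_complete():
+                 post recvs, send CTS ----->    post recvs, wait
+               btl_openib_control():          btl_openib_control():
+                 peer CTS arrives;    <-----    peer CTS arrives;
+                 CTS already sent, so           send CTS back, mark
+                 just mark connected            connected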
        */
+        OPAL_OUTPUT((0, "cpc_complete: is IB %d, initiator %d, cts received: %d",
+                     (IBV_TRANSPORT_IB ==
+                      endpoint->endpoint_btl->device->ib_dev->transport_type),
+                     endpoint->endpoint_initiator,
+                     endpoint->endpoint_cts_received));
+        if (IBV_TRANSPORT_IB ==
+            endpoint->endpoint_btl->device->ib_dev->transport_type ||
+            endpoint->endpoint_initiator ||
+            endpoint->endpoint_cts_received) {
+            mca_btl_openib_endpoint_send_cts(endpoint);
+
+            /* If we've already got the CTS from the other side, then
+               mark us as connected */
+            if (endpoint->endpoint_cts_received) {
+                mca_btl_openib_endpoint_connected(endpoint);
+            }
+        }
+
+        return;
+    }
+
+    /* Otherwise, just set the endpoint to "connected" */
+    mca_btl_openib_endpoint_connected(endpoint);
+}
 
 /*
  * called when the connect module has completed setup of an endpoint
  */
@@ -560,6 +696,7 @@ void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t *endpoint)
     mca_btl_openib_endpoint_t *ep;
     bool master = false;
 
+    opal_output(-1, "Now we are CONNECTED");
     if (MCA_BTL_XRC_ENABLED) {
         OPAL_THREAD_LOCK(&endpoint->ib_addr->addr_lock);
         if (MCA_BTL_IB_ADDR_CONNECTED == endpoint->ib_addr->status) {
@@ -692,7 +829,7 @@ void mca_btl_openib_endpoint_send_credits(mca_btl_openib_endpoint_t* endpoint,
 
     frag = endpoint->qps[qp].credit_frag;
     if(OPAL_UNLIKELY(NULL == frag)) {
-        frag = alloc_credit_frag(openib_btl);
+        frag = alloc_control_frag(openib_btl);
         frag->qp_idx = qp;
         endpoint->qps[qp].credit_frag = frag;
         /* set those once and forever */
@@ -781,7 +918,7 @@ static int mca_btl_openib_endpoint_send_eager_rdma(
     mca_btl_openib_send_control_frag_t* frag;
     int rc;
 
-    frag = alloc_credit_frag(openib_btl);
+    frag = alloc_control_frag(openib_btl);
     if(NULL == frag) {
         return -1;
     }
@@ -938,7 +1075,8 @@ void *mca_btl_openib_endpoint_invoke_error(void *context)
     if (NULL == endpoint) {
         int i;
         for (i = 0; i < mca_btl_openib_component.ib_num_btls; ++i) {
-            if (NULL != mca_btl_openib_component.openib_btls[i]) {
+            if (NULL != mca_btl_openib_component.openib_btls[i] &&
+                NULL != mca_btl_openib_component.openib_btls[i]->error_cb) {
                 btl = mca_btl_openib_component.openib_btls[i];
                 break;
             }
@@ -948,9 +1086,11 @@ void *mca_btl_openib_endpoint_invoke_error(void *context)
     }
 
     /* If we didn't find a BTL, then just bail :-( */
-    if (NULL == btl) {
+    if (NULL == btl || NULL == btl->error_cb) {
         orte_show_help("help-mpi-btl-openib.txt",
-                       "cannot raise btl error", orte_process_info.nodename);
+                       "cannot raise btl error", true,
+                       orte_process_info.nodename,
+                       __FILE__, __LINE__);
         exit(1);
     }
 
diff --git a/ompi/mca/btl/openib/btl_openib_endpoint.h b/ompi/mca/btl/openib/btl_openib_endpoint.h
index 1dd348a02f..af304336ad 100644
--- a/ompi/mca/btl/openib/btl_openib_endpoint.h
+++ b/ompi/mca/btl/openib/btl_openib_endpoint.h
@@ -167,7 +167,15 @@ struct mca_btl_base_endpoint_t {
     /** hook for local CPC to hang endpoint-specific data */
     void *endpoint_local_cpc_data;
 
-    /** pointer to remote CPC's data (essentially its CPC modex message) */
+    /** If endpoint_local_cpc->cbm_uses_cts is true and this endpoint
+        is iWARP, then endpoint_initiator must be true on the side
+        that actually initiates the QP, false on the other side.  This
+        bool is used to know which way to send the first CTS
+        message.
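+
+        (For illustration: a CPC can derive this flag from any test
+        that exactly one of the two peers passes.  The rdmacm CPC
+        later in this patch compares IP addresses and TCP ports --
+        see i_initiate() in btl_openib_connect_rdmacm.c -- roughly:
+
+            endpoint->endpoint_initiator =
+                i_initiate(local_ipaddr, local_port,
+                           remote_ipaddr, remote_port);
+
+        where the variable names above are placeholders, not the
+        actual code.)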
 */
+    bool endpoint_initiator;
+
+    /** pointer to remote proc's CPC data (essentially its CPC modex
+        message) */
     ompi_btl_openib_connect_base_module_data_t *endpoint_remote_cpc_data;
 
     /** current state of the connection */
@@ -220,6 +228,22 @@ struct mca_btl_base_endpoint_t {
 
     /** information about the remote port */
     mca_btl_openib_rem_info_t rem_info;
+
+    /** Frag for initial wireup CTS protocol; will be NULL if CPC
+        indicates that it does not want to use CTS */
+    ompi_free_list_item_t *endpoint_cts_frag;
+
+    /** Whether we've posted receives on this EP or not (only used in
+        CTS protocol) */
+    bool endpoint_posted_recvs;
+
+    /** Whether we've received the CTS from the peer or not (only used
+        in CTS protocol) */
+    bool endpoint_cts_received;
+
+    /** Whether we've sent our CTS to the peer or not (only used in
+        CTS protocol) */
+    bool endpoint_cts_sent;
 };
 
 typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
@@ -244,6 +268,8 @@ int mca_btl_openib_endpoint_post_send(mca_btl_openib_endpoint_t*,
 void mca_btl_openib_endpoint_send_credits(mca_btl_base_endpoint_t*, const int);
 void mca_btl_openib_endpoint_connect_eager_rdma(mca_btl_openib_endpoint_t*);
 int mca_btl_openib_endpoint_post_recvs(mca_btl_openib_endpoint_t*);
+void mca_btl_openib_endpoint_send_cts(mca_btl_openib_endpoint_t *endpoint);
+void mca_btl_openib_endpoint_cpc_complete(mca_btl_openib_endpoint_t*);
 void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t*);
 void mca_btl_openib_endpoint_init(mca_btl_openib_module_t*,
                                   mca_btl_base_endpoint_t*,
@@ -286,7 +312,7 @@ static inline int post_recvs(mca_btl_base_endpoint_t *ep, const int qp,
     if (0 == rc)
         return OMPI_SUCCESS;
 
-    BTL_ERROR(("error %d posting receive on qp %d\n", rc, qp));
+    BTL_ERROR(("error %d posting receive on qp %d", rc, qp));
 
     return OMPI_ERROR;
 }
diff --git a/ompi/mca/btl/openib/btl_openib_fd.c b/ompi/mca/btl/openib/btl_openib_fd.c
index 588ccb60af..7f80129ae4 100644
--- a/ompi/mca/btl/openib/btl_openib_fd.c
+++ b/ompi/mca/btl/openib/btl_openib_fd.c
@@ -8,6 +8,23 @@
  * $HEADER$
  */
 
+/**
+ * Note: this file is a little fast-n-loose with OMPI_HAVE_THREADS --
+ * it uses this value in run-time "if" conditionals (vs. compile-time
+ * #if conditionals).  We also don't protect including <pthread.h>.
+ * That's because this component currently only compiles on Linux and
+ * Solaris, and both of these OS's have pthreads.  Using the run-time
+ * conditionals gives us better compile-time checking, even of code
+ * that isn't activated.
+ *
+ * Note, too, that the functionality in this file does *not* require
+ * all the heavyweight OMPI thread infrastructure (e.g., from
+ * --enable-mpi-threads or --enable-progress-threads).
All work that
+ * is done in a separate progress thread is very carefully segregated
+ * from that of the main thread, and communication back to the main
+ * thread is done through a pipe that the main thread services via a
+ * libevent callback.
+ */
+
 #include "ompi_config.h"
 
 #include 
@@ -24,6 +41,11 @@
 
 #include "btl_openib_fd.h"
 
+typedef union {
+    ompi_btl_openib_fd_event_callback_fn_t *event;
+    ompi_btl_openib_fd_main_callback_fn_t *main;
+} callback_u_t;
+
 /*
  * Data for each registered item
  */
@@ -33,10 +55,7 @@ typedef struct {
     opal_event_t ri_event;
     int ri_fd;
     int ri_flags;
-    union {
-        ompi_btl_openib_fd_callback_fn_t *fd;
-        ompi_btl_openib_schedule_callback_fn_t *schedule;
-    } ri_callback;
+    callback_u_t ri_callback;
     void *ri_context;
 } registered_item_t;
 
@@ -46,9 +65,14 @@ static OBJ_CLASS_INSTANCE(registered_item_t, opal_list_item_t, NULL, NULL);
 /*
  * Command types
  */
 typedef enum {
+    /* Read by service thread */
     CMD_TIME_TO_QUIT,
     CMD_ADD_FD,
     CMD_REMOVE_FD,
+    ACK_RAN_FUNCTION,
+
+    /* Read by main thread */
+    CMD_CALL_FUNCTION,
 
     CMD_MAX
 } cmd_type_t;
 
@@ -56,7 +80,7 @@ typedef enum {
  * Commands. Fields ordered to avoid memory holes (and valgrind warnings).
  */
 typedef struct {
-    ompi_btl_openib_fd_callback_fn_t *pc_callback;
+    callback_u_t pc_fn;
     void *pc_context;
     int pc_fd;
     int pc_flags;
@@ -64,6 +88,16 @@ typedef struct {
     char end;
 } cmd_t;
 
+/*
+ * Queued up list of commands to send to the main thread
+ */
+typedef struct {
+    opal_list_item_t super;
+    cmd_t cli_cmd;
+} cmd_list_item_t;
+
+static OBJ_CLASS_INSTANCE(cmd_list_item_t, opal_list_item_t, NULL, NULL);
+
 static bool initialized = false;
 static int cmd_size = 0;
 static fd_set read_fds, write_fds;
@@ -71,117 +105,17 @@ static int max_fd;
 static opal_list_t registered_items;
 
 /* These items are only used in the threaded version */
+/* Owned by the main thread */
 static pthread_t thread;
-static int pipe_fd[2] = { -1, -1 };
+static opal_event_t main_thread_event;
+static int pipe_to_service_thread[2] = { -1, -1 };
 
+/* Owned by the service thread */
+static int pipe_to_main_thread[2] = { -1, -1 };
+static const size_t max_outstanding_to_main_thread = 32;
+static size_t waiting_for_ack_from_main_thread = 0;
+static opal_list_t pending_to_main_thread;
 
-static void libevent_fd_callback(int fd, short event, void *context)
-{
-    registered_item_t *ri = (registered_item_t*) context;
-    ri->ri_callback.fd(fd, event, ri->ri_context);
-}
-
-
-static void libevent_event_callback(int fd, short event, void *context)
-{
-    registered_item_t *ri = (registered_item_t*) context;
-    ri->ri_callback.schedule(ri->ri_context);
-    /* JMS Can I free ri now?  It contains the event... */
-#if 0
-    OBJ_RELEASE(ri);
-#endif
-}
-
-
-/*
- * Add an fd to the listening set
- */
-static int local_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd)
-{
-    registered_item_t *ri = OBJ_NEW(registered_item_t);
-    if (NULL == ri) {
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-    ri->ri_event_used = false;
-    ri->ri_fd = cmd->pc_fd;
-    ri->ri_flags = cmd->pc_flags;
-    ri->ri_callback.fd = cmd->pc_callback;
-    ri->ri_context = cmd->pc_context;
-
-    if (use_libevent) {
-        /* Make an event for this fd */
-        ri->ri_event_used = true;
-        memset(&ri->ri_event, 0, sizeof(ri->ri_event));
-        opal_event_set(&ri->ri_event, ri->ri_fd,
-                       ri->ri_flags | OPAL_EV_PERSIST, libevent_fd_callback,
-                       ri);
-        opal_event_add(&ri->ri_event, 0);
-    } else {
-        /* Add the fd to the relevant fd local sets and update max_fd */
-        if (OPAL_EV_READ & ri->ri_flags) {
-            FD_SET(ri->ri_fd, &read_fds);
-        }
-        if (OPAL_EV_WRITE & cmd->pc_flags) {
-            FD_SET(ri->ri_fd, &write_fds);
-        }
-        max_fd = (max_fd > ri->ri_fd) ?
max_fd : ri->ri_fd + 1; - } - - opal_list_append(®istered_items, &ri->super); - return OMPI_SUCCESS; -} - - -/* - * Remove an fd from the listening set - */ -static int local_pipe_cmd_remove_fd(cmd_t *cmd) -{ - int i; - opal_list_item_t *item; - registered_item_t *ri; - - /* Go through the list of registered fd's and find the fd to - remove */ - for (item = opal_list_get_first(®istered_items); - NULL != opal_list_get_end(®istered_items); - item = opal_list_get_next(item)) { - ri = (registered_item_t*) item; - if (cmd->pc_fd == ri->ri_fd) { - /* Found it. The item knows if it was used as a libevent - event or an entry in the local fd sets. */ - if (ri->ri_event_used) { - /* Remove this event from libevent */ - opal_event_del(&ri->ri_event); - } else { - /* Remove this item from the fd_sets and recalculate - max_fd */ - FD_CLR(cmd->pc_fd, &read_fds); - FD_CLR(cmd->pc_fd, &write_fds); - for (max_fd = i = pipe_fd[0]; i < FD_SETSIZE; ++i) { - if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) { - max_fd = i + 1; - } - } - } - - /* Let the caller know that we have stopped monitoring - this fd (if they care) */ - if (NULL != cmd->pc_callback) { - cmd->pc_callback(cmd->pc_fd, 0, cmd->pc_context); - } - - /* Remove this item from the list of registered items and - release it */ - opal_list_remove_item(®istered_items, item); - OBJ_RELEASE(item); - return OMPI_SUCCESS; - } - } - - /* This shouldn't happen */ - return OMPI_ERR_NOT_FOUND; -} /* * Simple loop over reading from a fd @@ -225,6 +159,159 @@ static int write_fd(int fd, int len, void *buffer) return OMPI_ERROR; } } + + return OMPI_SUCCESS; +} + + +/* + * Write a command to the main thread, or queue it up if the pipe is full + */ +static int write_to_main_thread(cmd_t *cmd) +{ + /* Note that if we write too much to the main thread pipe and the + main thread doesn't check it often, we could fill up the pipe + and cause this thread to block. Bad! So we do some simple + counting here and ensure that we don't fill the pipe. If we + are in danger of that, then queue up the commands here in the + service thread. The main thread will ACK every CALL_FUNCTION + command, so we have a built-in mechanism to wake up the service + thread to drain any queued-up commands. 
*/ + if (opal_list_get_size(&pending_to_main_thread) > 0 || + waiting_for_ack_from_main_thread >= max_outstanding_to_main_thread) { + cmd_list_item_t *cli = OBJ_NEW(cmd_list_item_t); + if (NULL == cli) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + memcpy(&cli->cli_cmd, cmd, cmd_size); + opal_list_append(&pending_to_main_thread, &(cli->super)); + } else { + OPAL_OUTPUT((-1, "fd: writing to main thread")); + write_fd(pipe_to_main_thread[1], cmd_size, cmd); + ++waiting_for_ack_from_main_thread; + } + + return OMPI_SUCCESS; +} + +static void service_fd_callback(int fd, short event, void *context) +{ + registered_item_t *ri = (registered_item_t*) context; + ri->ri_callback.event(fd, event, ri->ri_context); +} + + +/* + * Add an fd to the listening set + */ +static int service_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd) +{ + registered_item_t *ri = OBJ_NEW(registered_item_t); + if (NULL == ri) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + ri->ri_event_used = false; + ri->ri_fd = cmd->pc_fd; + ri->ri_flags = cmd->pc_flags; + ri->ri_callback.event = cmd->pc_fn.event; + ri->ri_context = cmd->pc_context; + + if (use_libevent) { + /* Make an event for this fd */ + ri->ri_event_used = true; + memset(&ri->ri_event, 0, sizeof(ri->ri_event)); + opal_event_set(&ri->ri_event, ri->ri_fd, + ri->ri_flags | OPAL_EV_PERSIST, service_fd_callback, + ri); + opal_event_add(&ri->ri_event, 0); + } else { + /* Add the fd to the relevant fd local sets and update max_fd */ + if (OPAL_EV_READ & ri->ri_flags) { + FD_SET(ri->ri_fd, &read_fds); + } + if (OPAL_EV_WRITE & cmd->pc_flags) { + FD_SET(ri->ri_fd, &write_fds); + } + max_fd = (max_fd > ri->ri_fd) ? max_fd : ri->ri_fd + 1; + } + + opal_list_append(®istered_items, &ri->super); + return OMPI_SUCCESS; +} + + +/* + * Remove an fd from the listening set + */ +static int service_pipe_cmd_remove_fd(cmd_t *cmd) +{ + int i; + opal_list_item_t *item; + registered_item_t *ri; + + OPAL_OUTPUT((-1, "service thread got unmonitor fd %d", cmd->pc_fd)); + /* Go through the list of registered fd's and find the fd to + remove */ + for (item = opal_list_get_first(®istered_items); + NULL != opal_list_get_end(®istered_items); + item = opal_list_get_next(item)) { + ri = (registered_item_t*) item; + if (cmd->pc_fd == ri->ri_fd) { + /* Found it. The item knows if it was used as a libevent + event or an entry in the local fd sets. 
*/ + if (ri->ri_event_used) { + /* Remove this event from libevent */ + opal_event_del(&ri->ri_event); + } else { + /* Remove this item from the fd_sets and recalculate + MAX_FD */ + FD_CLR(cmd->pc_fd, &read_fds); + FD_CLR(cmd->pc_fd, &write_fds); + for (max_fd = i = pipe_to_service_thread[0]; i < FD_SETSIZE; ++i) { + if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) { + max_fd = i + 1; + } + } + } + + /* Let the caller know that we have stopped monitoring + this fd (if they care) */ + if (NULL != cmd->pc_fn.event) { + cmd->pc_fn.event(cmd->pc_fd, 0, cmd->pc_context); + } + + /* Remove this item from the list of registered items and + release it */ + opal_list_remove_item(®istered_items, item); + OBJ_RELEASE(item); + return OMPI_SUCCESS; + } + } + + /* This shouldn't happen */ + return OMPI_ERR_NOT_FOUND; +} + + +/* + * Call a function and ACK that we ran it + */ +static int main_pipe_cmd_call_function(cmd_t *cmd) +{ + cmd_t local_cmd; + + OPAL_OUTPUT((-1, "fd main thread: calling function!")); + /* Call the function */ + if (NULL != cmd->pc_fn.main) { + cmd->pc_fn.main(cmd->pc_context); + } + + /* Now ACK that we ran the function */ + memset(&local_cmd, 0, cmd_size); + local_cmd.pc_cmd = ACK_RAN_FUNCTION; + write_fd(pipe_to_service_thread[1], cmd_size, &local_cmd); + + /* Done */ return OMPI_SUCCESS; } @@ -232,32 +319,52 @@ static int write_fd(int fd, int len, void *buffer) /* * Act on pipe commands */ -static bool local_pipe_cmd(void) +static bool service_pipe_cmd(void) { bool ret = false; cmd_t cmd; + cmd_list_item_t *cli; - read_fd(pipe_fd[0], cmd_size, &cmd); + read_fd(pipe_to_service_thread[0], cmd_size, &cmd); switch (cmd.pc_cmd) { case CMD_ADD_FD: - if (OMPI_SUCCESS != local_pipe_cmd_add_fd(false, &cmd)) { + OPAL_OUTPUT((-1, "fd service thread: CMD_ADD_FD")); + if (OMPI_SUCCESS != service_pipe_cmd_add_fd(false, &cmd)) { ret = true; } break; case CMD_REMOVE_FD: - if (OMPI_SUCCESS != local_pipe_cmd_remove_fd(&cmd)) { + OPAL_OUTPUT((-1, "fd service thread: CMD_REMOVE_FD")); + if (OMPI_SUCCESS != service_pipe_cmd_remove_fd(&cmd)) { ret = true; } break; case CMD_TIME_TO_QUIT: - opal_output(-1, "fd listener thread: time to quit"); + OPAL_OUTPUT((-1, "fd service thread: CMD_TIME_TO_QUIT")); ret = true; break; + case ACK_RAN_FUNCTION: + /* We don't have a guarantee that the main thread will check + its pipe frequently, so we do some simple counting to + ensure we just don't have too many outstanding commands to + the main thread at any given time. The main thread will + ACK every CALL_FUNCTION command, so this thread will always + wake up and continue to drain any queued up functions. 
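+
+           Roughly, the overall command flow between the two threads
+           (a sketch; the names match the statics at the top of this
+           file):
+
+             main thread                      service thread
+             -----------                      --------------
+             fd_monitor()/fd_unmonitor():     select() loop: reads
+               write CMD_ADD_FD or              pipe_to_service_thread[0]
+               CMD_REMOVE_FD down               and registers/removes
+               pipe_to_service_thread[1]        the fd
+
+             main_thread_event callback:      fd_run_in_main(): write
+               run the function, then  <----    CMD_CALL_FUNCTION to
+               write ACK_RAN_FUNCTION  ---->    pipe_to_main_thread[1],
+               back                             or queue it once too many
+                                                commands are outstanding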
*/ + cli = (cmd_list_item_t*) opal_list_remove_first(&pending_to_main_thread); + if (NULL != cli) { + OPAL_OUTPUT((-1, "sending queued up cmd function to main thread")); + write_fd(pipe_to_main_thread[1], cmd_size, &(cli->cli_cmd)); + OBJ_RELEASE(cli); + } else { + --waiting_for_ack_from_main_thread; + } + break; + default: - opal_output(-1, "fd listener thread: unknown pipe command!"); + OPAL_OUTPUT((-1, "fd service thread: unknown pipe command!")); break; } @@ -268,7 +375,7 @@ static bool local_pipe_cmd(void) /* * Main thread logic */ -static void *thread_main(void *context) +static void *service_thread_start(void *context) { int rc, flags; fd_set read_fds_copy, write_fds_copy; @@ -278,29 +385,29 @@ static void *thread_main(void *context) /* Make an fd set that we can select() on */ FD_ZERO(&write_fds); FD_ZERO(&read_fds); - FD_SET(pipe_fd[0], &read_fds); - max_fd = pipe_fd[0] + 1; + FD_SET(pipe_to_service_thread[0], &read_fds); + max_fd = pipe_to_service_thread[0] + 1; - opal_output(-1, "fd listener thread running"); + OPAL_OUTPUT((-1, "fd service thread running")); /* Main loop waiting for commands over the fd's */ while (1) { memcpy(&read_fds_copy, &read_fds, sizeof(read_fds)); memcpy(&write_fds_copy, &write_fds, sizeof(write_fds)); - opal_output(-1, "fd listener thread blocking on select..."); + OPAL_OUTPUT((-1, "fd service thread blocking on select...")); rc = select(max_fd, &read_fds_copy, &write_fds_copy, NULL, NULL); if (0 != rc && EAGAIN == errno) { continue; } - opal_output(-1, "fd listener thread woke up!"); + OPAL_OUTPUT((-1, "fd service thread woke up!")); if (rc > 0) { - if (FD_ISSET(pipe_fd[0], &read_fds_copy)) { - opal_output(-1, "fd listener thread: pipe command"); - if (local_pipe_cmd()) { - opal_output(-1, "fd listener thread: exiting"); + if (FD_ISSET(pipe_to_service_thread[0], &read_fds_copy)) { + OPAL_OUTPUT((-1, "fd service thread: pipe command")); + if (service_pipe_cmd()) { break; } + OPAL_OUTPUT((-1, "fd service thread: back from pipe command")); } /* Go through all the registered events and see who had @@ -324,9 +431,10 @@ static void *thread_main(void *context) /* If either was ready, invoke the callback */ if (0 != flags) { - opal_output(-1, "fd listener thread: invoking callback for registered fd %d", ri->ri_fd); - ri->ri_callback.fd(ri->ri_fd, flags, - ri->ri_context); + OPAL_OUTPUT((-1, "fd service thread: invoking callback for registered fd %d", ri->ri_fd)); + ri->ri_callback.event(ri->ri_fd, flags, + ri->ri_context); + OPAL_OUTPUT((-1, "fd service thread: back from callback for registered fd %d", ri->ri_fd)); } } } @@ -334,12 +442,38 @@ static void *thread_main(void *context) } /* All done */ + OPAL_OUTPUT((-1, "fd service thread: exiting")); + opal_atomic_wmb(); return NULL; } +static void main_thread_event_callback(int fd, short event, void *context) +{ + cmd_t cmd; + + OPAL_OUTPUT((-1, "main thread -- reading command")); + read_fd(pipe_to_main_thread[0], cmd_size, &cmd); + switch (cmd.pc_cmd) { + case CMD_CALL_FUNCTION: + OPAL_OUTPUT((-1, "fd main thread: calling command")); + main_pipe_cmd_call_function(&cmd); + break; + + default: + OPAL_OUTPUT((-1, "fd main thread: unknown pipe command: %d", + cmd.pc_cmd)); + break; + } +} + +/****************************************************************** + * Main interface calls + ******************************************************************/ + /* * Initialize + * Called by main thread */ int ompi_btl_openib_fd_init(void) { @@ -348,19 +482,42 @@ int ompi_btl_openib_fd_init(void) 
OBJ_CONSTRUCT(®istered_items, opal_list_t); - if (OMPI_HAVE_THREAD_SUPPORT) { - /* Create a pipe to communicate with the thread */ - if (0 != pipe(pipe_fd)) { + /* Calculate the real size of the cmd struct */ + cmd_size = (int) (&(bogus.end) - ((char*) &bogus)); + + if (OMPI_HAVE_THREADS) { + OBJ_CONSTRUCT(&pending_to_main_thread, opal_list_t); + + /* Create pipes to communicate between the two threads */ + if (0 != pipe(pipe_to_service_thread)) { + return OMPI_ERR_IN_ERRNO; + } + if (0 != pipe(pipe_to_main_thread)) { return OMPI_ERR_IN_ERRNO; } - if (0 != pthread_create(&thread, NULL, thread_main, NULL)) { + /* Create a libevent event that is used in the main thread + to watch its pipe */ + memset(&main_thread_event, 0, sizeof(main_thread_event)); + opal_event_set(&main_thread_event, pipe_to_main_thread[0], + OPAL_EV_READ | OPAL_EV_PERSIST, + main_thread_event_callback, NULL); + opal_event_add(&main_thread_event, 0); + + /* Start the service thread */ + if (0 != pthread_create(&thread, NULL, service_thread_start, + NULL)) { + int errno_save = errno; + opal_event_del(&main_thread_event); + close(pipe_to_service_thread[0]); + close(pipe_to_service_thread[1]); + close(pipe_to_main_thread[0]); + close(pipe_to_main_thread[1]); + errno = errno_save; return OMPI_ERR_IN_ERRNO; } } - /* Calculate the real size of the cmd struct */ - cmd_size = (int) (&(bogus.end) - ((char*) &bogus)); initialized = true; } return OMPI_SUCCESS; @@ -369,9 +526,10 @@ int ompi_btl_openib_fd_init(void) /* * Start monitoring an fd + * Called by main or service thread; callback will be in service thread */ int ompi_btl_openib_fd_monitor(int fd, int flags, - ompi_btl_openib_fd_callback_fn_t *callback, + ompi_btl_openib_fd_event_callback_fn_t *callback, void *context) { cmd_t cmd; @@ -384,14 +542,15 @@ int ompi_btl_openib_fd_monitor(int fd, int flags, cmd.pc_cmd = CMD_ADD_FD; cmd.pc_fd = fd; cmd.pc_flags = flags; - cmd.pc_callback = callback; + cmd.pc_fn.event = callback; cmd.pc_context = context; - if (OMPI_HAVE_THREAD_SUPPORT) { + if (OMPI_HAVE_THREADS) { /* For the threaded version, write a command down the pipe */ - write_fd(pipe_fd[1], cmd_size, &cmd); + OPAL_OUTPUT((-1, "main thread sending monitor fd %d", fd)); + write_fd(pipe_to_service_thread[1], cmd_size, &cmd); } else { /* Otherwise, add it directly */ - local_pipe_cmd_add_fd(true, &cmd); + service_pipe_cmd_add_fd(true, &cmd); } return OMPI_SUCCESS; @@ -400,9 +559,10 @@ int ompi_btl_openib_fd_monitor(int fd, int flags, /* * Stop monitoring an fd + * Called by main or service thread; callback will be in service thread */ int ompi_btl_openib_fd_unmonitor(int fd, - ompi_btl_openib_fd_callback_fn_t *callback, + ompi_btl_openib_fd_event_callback_fn_t *callback, void *context) { cmd_t cmd; @@ -415,14 +575,15 @@ int ompi_btl_openib_fd_unmonitor(int fd, cmd.pc_cmd = CMD_REMOVE_FD; cmd.pc_fd = fd; cmd.pc_flags = 0; - cmd.pc_callback = callback; + cmd.pc_fn.event = callback; cmd.pc_context = context; - if (OMPI_HAVE_THREAD_SUPPORT) { + if (OMPI_HAVE_THREADS) { /* For the threaded version, write a command down the pipe */ - write_fd(pipe_fd[1], cmd_size, &cmd); + OPAL_OUTPUT((-1, "main thread sending unmonitor fd %d", fd)); + write_fd(pipe_to_service_thread[1], cmd_size, &cmd); } else { /* Otherwise, remove it directly */ - local_pipe_cmd_remove_fd(&cmd); + service_pipe_cmd_remove_fd(&cmd); } return OMPI_SUCCESS; @@ -430,31 +591,25 @@ int ompi_btl_openib_fd_unmonitor(int fd, /* * Run a function in the main thread + * Called by service thread */ -int 
ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t *callback,
-                            void *context)
+int ompi_btl_openib_fd_run_in_main(ompi_btl_openib_fd_main_callback_fn_t *callback,
+                                   void *context)
 {
-    if (OMPI_HAVE_THREAD_SUPPORT) {
-        /* For the threaded version, schedule an event for "now" */
-        registered_item_t *ri;
-        struct timeval now;
+    if (OMPI_HAVE_THREADS) {
+        cmd_t cmd;
 
-        ri = OBJ_NEW(registered_item_t);
-        if (NULL == ri) {
-            return OMPI_ERR_OUT_OF_RESOURCE;
-        }
-
-        /* Create an event that will run in the main thread */
-        ri->ri_fd = ri->ri_flags = -1;
-        ri->ri_callback.schedule = callback;
-        ri->ri_context = context;
-        ri->ri_event_used = true;
-        opal_evtimer_set(&ri->ri_event, libevent_event_callback, ri);
-        now.tv_sec = 0;
-        now.tv_usec = 0;
-        opal_evtimer_add(&ri->ri_event, &now);
+        OPAL_OUTPUT((-1, "run in main -- sending command"));
+        /* For the threaded version, write a command down the pipe */
+        cmd.pc_cmd = CMD_CALL_FUNCTION;
+        cmd.pc_fd = -1;
+        cmd.pc_flags = 0;
+        cmd.pc_fn.main = callback;
+        cmd.pc_context = context;
+        write_to_main_thread(&cmd);
     } else {
-        /* For the non-threaded version, just call the function */
+        /* Otherwise, call it directly */
+        OPAL_OUTPUT((-1, "run in main -- calling now!"));
        callback(context);
     }
 
@@ -463,21 +618,32 @@ int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t *callback
 
 /*
  * Finalize
+ * Called by main thread
  */
 int ompi_btl_openib_fd_finalize(void)
 {
     if (initialized) {
-        if (OMPI_HAVE_THREAD_SUPPORT) {
+        if (OMPI_HAVE_THREADS) {
             /* For the threaded version, send a command down the pipe */
             cmd_t cmd;
+
+            OPAL_OUTPUT((-1, "shutting down openib fd"));
             memset(&cmd, 0, cmd_size);
             cmd.pc_cmd = CMD_TIME_TO_QUIT;
-            write_fd(pipe_fd[1], cmd_size, &cmd);
+            write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
 
             pthread_join(thread, NULL);
-            close(pipe_fd[0]);
-            close(pipe_fd[1]);
+            opal_atomic_rmb();
+
+            opal_event_del(&main_thread_event);
+
+            close(pipe_to_service_thread[0]);
+            close(pipe_to_service_thread[1]);
+            close(pipe_to_main_thread[0]);
+            close(pipe_to_main_thread[1]);
+            OBJ_DESTRUCT(&pending_to_main_thread);
         }
+        OBJ_DESTRUCT(&registered_items);
     }
     initialized = false;
diff --git a/ompi/mca/btl/openib/btl_openib_fd.h b/ompi/mca/btl/openib/btl_openib_fd.h
index a544acbad4..02d79bdaaf 100644
--- a/ompi/mca/btl/openib/btl_openib_fd.h
+++ b/ompi/mca/btl/openib/btl_openib_fd.h
@@ -18,41 +18,46 @@ BEGIN_C_DECLS
 /**
  * Typedef for fd callback function
  */
-typedef void *(ompi_btl_openib_fd_callback_fn_t)(int fd, int flags,
-                                                 void *context);
+typedef void *(ompi_btl_openib_fd_event_callback_fn_t)(int fd, int flags,
+                                                       void *context);
 
 /**
  * Typedef for generic callback function
  */
-typedef void *(ompi_btl_openib_schedule_callback_fn_t)(void *context);
+typedef void *(ompi_btl_openib_fd_main_callback_fn_t)(void *context);
 
 /**
- * Initialize fd monitoring
+ * Initialize fd monitoring.
+ * Called by the main thread.
 */
 int ompi_btl_openib_fd_init(void);
 
 /**
- * Start monitoring an fd
+ * Start monitoring an fd.
+ * Called by main or service thread; callback will be in service thread.
 */
 int ompi_btl_openib_fd_monitor(int fd, int flags,
-                               ompi_btl_openib_fd_callback_fn_t *callback,
+                               ompi_btl_openib_fd_event_callback_fn_t *callback,
                                void *context);
 
 /**
- * Stop monitoring an fd
+ * Stop monitoring an fd.
+ * Called by main or service thread; callback will be in service thread.
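+ *
+ * An illustrative usage sketch (the callback names here are
+ * hypothetical; this mirrors how the ibcm CPC later in this patch
+ * uses these calls):
+ *
+ *   static void *my_event_cb(int fd, int flags, void *context)
+ *   {
+ *       // Runs in the service thread; bounce any work that touches
+ *       // BTL state over to the main thread.
+ *       ompi_btl_openib_fd_run_in_main(my_main_cb, context);
+ *       return NULL;
+ *   }
+ *
+ *   ompi_btl_openib_fd_monitor(dev_fd, OPAL_EV_READ, my_event_cb, ctx);
+ *   ...
+ *   ompi_btl_openib_fd_unmonitor(dev_fd, NULL, NULL);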
*/ int ompi_btl_openib_fd_unmonitor(int fd, - ompi_btl_openib_fd_callback_fn_t *callback, + ompi_btl_openib_fd_event_callback_fn_t *callback, void *context); /** - * Run a function in the main thread + * Run a function in the main thread. + * Called by the service thread. */ -int ompi_btl_openib_fd_schedule(ompi_btl_openib_schedule_callback_fn_t callback, - void *context); +int ompi_btl_openib_fd_run_in_main(ompi_btl_openib_fd_main_callback_fn_t callback, + void *context); /** - * Finalize fd monitoring + * Finalize fd monitoring. + * Called by the main thread. */ int ompi_btl_openib_fd_finalize(void); diff --git a/ompi/mca/btl/openib/btl_openib_frag.h b/ompi/mca/btl/openib/btl_openib_frag.h index 7a191e403b..64dff84f18 100644 --- a/ompi/mca/btl/openib/btl_openib_frag.h +++ b/ompi/mca/btl/openib/btl_openib_frag.h @@ -116,6 +116,7 @@ typedef struct mca_btl_openib_footer_t mca_btl_openib_footer_t; #define MCA_BTL_OPENIB_CONTROL_CREDITS 0 #define MCA_BTL_OPENIB_CONTROL_RDMA 1 #define MCA_BTL_OPENIB_CONTROL_COALESCED 2 +#define MCA_BTL_OPENIB_CONTROL_CTS 3 struct mca_btl_openib_control_header_t { uint8_t type; @@ -267,7 +268,7 @@ OBJ_CLASS_DECLARATION(mca_btl_openib_coalesced_frag_t); */ static inline mca_btl_openib_send_control_frag_t * -alloc_credit_frag(mca_btl_openib_module_t *btl) +alloc_control_frag(mca_btl_openib_module_t *btl) { int rc; ompi_free_list_item_t *item; diff --git a/ompi/mca/btl/openib/configure.m4 b/ompi/mca/btl/openib/configure.m4 index dd69825c24..c222a4d8d0 100644 --- a/ompi/mca/btl/openib/configure.m4 +++ b/ompi/mca/btl/openib/configure.m4 @@ -32,7 +32,7 @@ AC_DEFUN([MCA_btl_openib_POST_CONFIG], [ # [action-if-cant-compile]) # ------------------------------------------------ AC_DEFUN([MCA_btl_openib_CONFIG],[ - OMPI_VAR_SCOPE_PUSH([cpcs]) + OMPI_VAR_SCOPE_PUSH([cpcs have_threads]) cpcs="oob" OMPI_CHECK_OPENIB([btl_openib], @@ -54,15 +54,23 @@ AC_DEFUN([MCA_btl_openib_CONFIG],[ $1], [$2]) + AC_MSG_CHECKING([for thread support (needed for ibcm/rdmacm)]) + have_threads=`echo $THREAD_TYPE | awk '{ print [$]1 }'` + if test "x$have_threads" = "x"; then + have_threads=none + fi + AC_MSG_RESULT([$have_threads]) AS_IF([test "$btl_openib_happy" = "yes"], [if test "x$btl_openib_have_xrc" = "x1"; then cpcs="$cpcs xoob" fi - if test "x$btl_openib_have_rdmacm" = "x1"; then + if test "x$btl_openib_have_rdmacm" = "x1" -a \ + "$have_threads" != "none"; then cpcs="$cpcs rdmacm" fi - if test "x$btl_openib_have_ibcm" = "x1"; then + if test "x$btl_openib_have_ibcm" = "x1" -a \ + "$have_threads" != "none"; then cpcs="$cpcs ibcm" fi AC_MSG_CHECKING([which openib btl cpcs will be built]) diff --git a/ompi/mca/btl/openib/connect/btl_openib_connect_base.c b/ompi/mca/btl/openib/connect/btl_openib_connect_base.c index f868b41e06..bec4fd80e0 100644 --- a/ompi/mca/btl/openib/connect/btl_openib_connect_base.c +++ b/ompi/mca/btl/openib/connect/btl_openib_connect_base.c @@ -18,10 +18,10 @@ #if HAVE_XRC #include "connect/btl_openib_connect_xoob.h" #endif -#if OMPI_HAVE_RDMACM +#if OMPI_HAVE_RDMACM && OMPI_HAVE_THREADS #include "connect/btl_openib_connect_rdmacm.h" #endif -#if OMPI_HAVE_IBCM +#if OMPI_HAVE_IBCM && OMPI_HAVE_THREADS #include "connect/btl_openib_connect_ibcm.h" #endif @@ -44,7 +44,7 @@ static ompi_btl_openib_connect_base_component_t *all[] = { /* Always have an entry here so that the CP indexes will always be the same: if RDMA CM is not available, use the "empty" CPC */ -#if OMPI_HAVE_RDMACM +#if OMPI_HAVE_RDMACM && OMPI_HAVE_THREADS &ompi_btl_openib_connect_rdmacm, #else 
&ompi_btl_openib_connect_empty,
@@ -52,7 +52,7 @@ static ompi_btl_openib_connect_base_component_t *all[] = {
 
     /* Always have an entry here so that the CP indexes will always be
        the same: if IB CM is not available, use the "empty" CPC */
-#if OMPI_HAVE_IBCM
+#if OMPI_HAVE_IBCM && OMPI_HAVE_THREADS
     &ompi_btl_openib_connect_ibcm,
 #else
     &ompi_btl_openib_connect_empty,
@@ -273,6 +273,14 @@ int ompi_btl_openib_connect_base_select_for_local_port(mca_btl_openib_module_t *
         opal_output(-1, "match cpc for local port: %s",
                     available[i]->cbc_name);
 
+        /* If the CPC wants to use the CTS protocol, check to ensure
+           that QP 0 is PP; if it's not, we can't use this CPC (or the
+           CTS protocol) */
+        if (!BTL_OPENIB_QP_TYPE_PP(0)) {
+            BTL_VERBOSE(("this CPC only supports the case where the first btl_openib_receive_queues QP is a PP QP"));
+            continue;
+        }
+
         /* This CPC has indicated that it wants to run on this openib
           BTL module.  Woo hoo! */
         ++cpc_index;
diff --git a/ompi/mca/btl/openib/connect/btl_openib_connect_ibcm.c b/ompi/mca/btl/openib/connect/btl_openib_connect_ibcm.c
index 6076ea5d57..3ddad2ed00 100644
--- a/ompi/mca/btl/openib/connect/btl_openib_connect_ibcm.c
+++ b/ompi/mca/btl/openib/connect/btl_openib_connect_ibcm.c
@@ -248,7 +248,7 @@
 * normal RTU processing.  If the RTU is received later, the IBCM
 * system on the passive side will know that it's effectively a
 * duplicate, and therefore can be ignored.
- */ 
+ */
 
 #include "ompi_config.h"
 
@@ -649,6 +649,7 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
         rc = OMPI_ERR_NOT_SUPPORTED;
         goto error;
     }
+
     /* If we do not have struct ibv_device.transport_device, then
        we're in an old version of OFED that is IB only (i.e., no
        iWarp), so we can safely assume that we can use this CPC. */
@@ -699,12 +700,13 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
     BTL_VERBOSE(("created cpc module %p for btl %p",
                  (void*)m, (void*)btl));
 
-    /* See if we've already for an IB CM listener for this device */
+    /* See if we've already got an IB CM listener for this device */
     for (item = opal_list_get_first(&ibcm_cm_listeners);
          item != opal_list_get_end(&ibcm_cm_listeners);
        item = opal_list_get_next(item)) {
         cmh = (ibcm_listen_cm_id_t*) item;
         if (cmh->ib_context == btl->device->ib_dev_context) {
+            OBJ_RETAIN(cmh);
             break;
         }
     }
@@ -753,6 +755,9 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
             rc = OMPI_ERR_NOT_SUPPORTED;
             goto error;
         }
+        OPAL_OUTPUT((-1, "opened ibcm device 0x%" PRIx64 " (%s)",
+                     (uint64_t) cmh->cm_device,
+                     ibv_get_device_name(cmh->ib_context->device)));
 
         if (0 != (rc = ib_cm_create_id(cmh->cm_device,
                                        &cmh->listen_cm_id, NULL))) {
@@ -779,9 +784,6 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
         }
 
         opal_list_append(&ibcm_cm_listeners, &(cmh->super));
-    } else {
-        /* We found an existing IB CM handle -- bump up the refcount */
-        OBJ_RETAIN(cmh);
     }
     m->cmh = cmh;
     imli = OBJ_NEW(ibcm_module_list_item_t);
@@ -831,6 +833,9 @@ static int ibcm_component_query(mca_btl_openib_module_t *btl,
     m->cpc.cbm_start_connect = ibcm_module_start_connect;
     m->cpc.cbm_endpoint_finalize = ibcm_endpoint_finalize;
     m->cpc.cbm_finalize = ibcm_module_finalize;
+    /* Setting uses_cts=true also guarantees that we'll only be
+       selected if QP 0 is PP */
+    m->cpc.cbm_uses_cts = true;
 
     /* Start monitoring the fd associated with the cm_device */
     ompi_btl_openib_fd_monitor(cmh->cm_device->fd, OPAL_EV_READ,
@@ -901,7 +906,8 @@ static int qp_create_one(mca_btl_base_endpoint_t* endpoint, int qp,
     init_attr.cap.max_send_sge = 1;
     init_attr.cap.max_recv_sge = 1; /*
we do not use SG list */
     if(BTL_OPENIB_QP_TYPE_PP(qp)) {
-        init_attr.cap.max_recv_wr = max_recv_wr;
+        /* Add one for the CTS receive frag that will be posted */
+        init_attr.cap.max_recv_wr = max_recv_wr + 1;
     } else {
         init_attr.cap.max_recv_wr = 0;
     }
@@ -1265,7 +1271,7 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
 
     /* Set the endpoint state to "connecting" (this function runs in
        the main MPI thread; not the service thread, so we can set the
-       endpoint_state here). */
+       endpoint_state here with no memory barriers). */
     endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
 
     /* Fill in the path record for this peer */
@@ -1363,9 +1369,6 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
     }
     if (0 != (rc = ib_cm_send_req(req->super.cm_id, cm_req))) {
         BTL_VERBOSE(("Got nonzero return from ib_cm_send_req: %d, errno %d", rc, errno));
-        if (-1 == rc) {
-            perror("Errno is ");
-        }
         rc = OMPI_ERR_UNREACH;
         goto err;
     }
@@ -1438,11 +1441,9 @@ static int ibcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
 */
 static void *callback_unlock(int fd, int flags, void *context)
 {
-/* We need #if protection in order to prevent unused variable warning */
-#if OMPI_HAVE_THREAD_SUPPORT
-    opal_mutex_t *m = (opal_mutex_t*) context;
-    OPAL_THREAD_UNLOCK(m);
-#endif
+    volatile int *barrier = (volatile int *) context;
+    OPAL_OUTPUT((-1, "ibcm unlocking main thread"));
+    *barrier = 1;
     return NULL;
 }
 
@@ -1455,7 +1456,7 @@ static void ibcm_listen_cm_id_constructor(ibcm_listen_cm_id_t *cmh)
 
 static void ibcm_listen_cm_id_destructor(ibcm_listen_cm_id_t *cmh)
 {
-    opal_mutex_t mutex;
+    volatile int barrier = 0;
     opal_list_item_t *item;
 
     /* Remove all the ibcm module items */
@@ -1482,19 +1483,41 @@ static void ibcm_listen_cm_id_destructor(ibcm_listen_cm_id_t *cmh)
 
     /* Stop monitoring the cm_device's fd (wait for it to be released
        from the monitoring entity) */
-    OPAL_THREAD_LOCK(&mutex);
     ompi_btl_openib_fd_unmonitor(cmh->cm_device->fd,
                                  callback_unlock,
-                                 &mutex);
-    OPAL_THREAD_LOCK(&mutex);
+
+                                 (void*) &barrier);
+    /* JMS debug code while figuring out the IBCM problem */
+#if 0
+    while (0 == barrier) {
+        sched_yield();
+    }
+#else
+    {
+        time_t t = time(NULL);
+        OPAL_OUTPUT((-1, "main thread waiting for ibcm barrier"));
+        while (0 == barrier) {
+            sched_yield();
+            if (time(NULL) - t > 5) {
+                OPAL_OUTPUT((-1, "main thread been looping for a long time..."));
+                break;
+            }
+        }
+    }
+#endif
 
     /* Destroy the listener */
     if (NULL != cmh->listen_cm_id) {
+        OPAL_OUTPUT((-1, "destroying ibcm listener 0x%" PRIx64,
+                     (uint64_t) cmh->listen_cm_id));
         ib_cm_destroy_id(cmh->listen_cm_id);
     }
 
     /* Close the CM device */
     if (NULL != cmh->cm_device) {
+        OPAL_OUTPUT((-1, "closing ibcm device 0x%" PRIx64 " (%s)",
+                     (uint64_t) cmh->cm_device,
+                     ibv_get_device_name(cmh->ib_context->device)));
         ib_cm_close_device(cmh->cm_device);
     }
 }
@@ -1564,7 +1587,7 @@ static int ibcm_module_finalize(mca_btl_openib_module_t *btl,
 {
     ibcm_module_t *m = (ibcm_module_t *) cpc;
 
-    /* If we previously successfully initialized, then release
+    /* If we previously successfully initialized, then release
       everything */
     if (NULL != m && NULL != m->cmh) {
         OBJ_RELEASE(m->cmh);
@@ -1702,21 +1725,6 @@ static int qp_to_rts(int qp_index, struct ib_cm_id *cm_id,
     return OMPI_SUCCESS;
 }
 
-/*
- * Callback (from main thread) when an incoming IBCM request needs to
- * initiate a new connection in the other direction.
- */ -static void *callback_set_endpoint_connecting(void *context) -{ - mca_btl_openib_endpoint_t *endpoint = - (mca_btl_openib_endpoint_t *) context; - - BTL_VERBOSE(("ibcm scheduled callback: setting endpoint to CONNECTING")); - endpoint->endpoint_state = MCA_BTL_IB_CONNECTING; - - return NULL; -} - /* * Callback (from main thread) when an incoming IBCM request needs to * initiate a new connection in the other direction. @@ -1895,9 +1903,11 @@ static int request_received(ibcm_listen_cm_id_t *cmh, If this is not the first request, then assume that all the QPs have been created already and we can just lookup what we need. */ if (!ie->ie_qps_created) { - /* Schedule to set the endpoint_state to "CONNECTING" */ - ompi_btl_openib_fd_schedule(callback_set_endpoint_connecting, - endpoint); + /* Set the endpoint_state to "CONNECTING". This is running + in the service thread, so we need to do a write barrier. */ + endpoint->endpoint_state = MCA_BTL_IB_CONNECTING; + opal_atomic_wmb(); + if (OMPI_SUCCESS != (rc = qp_create_all(endpoint, imodule))) { rej_reason = REJ_PASSIVE_SIDE_ERROR; BTL_ERROR(("qp_create_all failed -- reject")); @@ -1923,16 +1933,19 @@ static int request_received(ibcm_listen_cm_id_t *cmh, goto reject; } - /* Post receive buffers. Similar to QP creation, above, we post - *all* receive buffers at once (for all QPs). So ensure to only - do this for the first request received. If this is not the - first request on this endpoint, then assume that all the - receive buffers have been posted already. */ + /* Post a single receive buffer on the smallest QP for the CTS + protocol */ if (!ie->ie_recv_buffers_posted) { - if (OMPI_SUCCESS != - (rc = mca_btl_openib_endpoint_post_recvs(endpoint))) { - /* JMS */ - BTL_VERBOSE(("failed to post recv buffers")); + struct ibv_recv_wr *bad_wr, *wr; + + assert(NULL != endpoint->endpoint_cts_frag); + wr = &to_recv_frag(endpoint->endpoint_cts_frag)->rd_desc; + assert(NULL != wr); + wr->next = NULL; + + OPAL_OUTPUT((-1, "REQUEST posting CTS recv buffer")); + if (0 != ibv_post_recv(endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp, wr, &bad_wr)) { + BTL_VERBOSE(("failed to post CTS recv buffer")); rej_reason = REJ_PASSIVE_SIDE_ERROR; goto reject; } @@ -2045,32 +2058,31 @@ static int request_received(ibcm_listen_cm_id_t *cmh, (ompi_btl_openib_connect_base_module_t *) imodule; cbdata->cscd_endpoint = endpoint; BTL_VERBOSE(("starting connect in other direction")); - ompi_btl_openib_fd_schedule(callback_start_connect, cbdata); + ompi_btl_openib_fd_run_in_main(callback_start_connect, cbdata); TIMER_STOP(REQUEST_RECEIVED); return OMPI_SUCCESS; } - BTL_ERROR(("malloc failed")); } /* Communicate to the upper layer that the connection on this endpoint has failed */ TIMER_STOP(REQUEST_RECEIVED); - ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error, - endpoint); + ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, + endpoint); return rc; } /* * Callback (from main thread) when the endpoint has been connected */ -static void *callback_set_endpoint_connected(void *context) +static void *callback_set_endpoint_cpc_complete(void *context) { mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t*) context; - BTL_VERBOSE(("calling endpoint_connected")); - mca_btl_openib_endpoint_connected(endpoint); - BTL_VERBOSE(("*** CONNECTED endpoint_connected done!")); + BTL_VERBOSE(("calling endpoint_cpc_complete")); + mca_btl_openib_endpoint_cpc_complete(endpoint); + BTL_VERBOSE(("*** CONNECTED endpoint_cpc_complete done!")); 
return NULL; } @@ -2133,18 +2145,27 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event) goto error; } - /* Now that all the qp's are created locally, post some receive - buffers, setup credits, etc. The post_recvs() call posts the - buffers for all QPs at once, so be sure to only do this for the - *first* reply that is received on an endpoint. For all other - replies received on an endpoint, we can safely assume that the - receive buffers have already been posted. */ + /* Now that all the qp's are created locally, post a single + receive buffer on the smallest QP for the CTS protocol. Be + sure to only do this for the *first* reply that is received on + an endpoint. For all other replies received on an endpoint, we + can safely assume that the CTS receive buffer has already been + posted. */ if (!ie->ie_recv_buffers_posted) { - if (OMPI_SUCCESS != - (rc = mca_btl_openib_endpoint_post_recvs(endpoint))) { - BTL_VERBOSE(("failed to post recv buffers")); + struct ibv_recv_wr *bad_wr, *wr; + + OPAL_OUTPUT((-1, "REPLY posting CTS recv buffer")); + assert(NULL != endpoint->endpoint_cts_frag); + wr = &to_recv_frag(endpoint->endpoint_cts_frag)->rd_desc; + assert(NULL != wr); + wr->next = NULL; + + if (0 != ibv_post_recv(endpoint->qps[mca_btl_openib_component.credits_qp].qp->lcl_qp, wr, &bad_wr)) { + /* JMS */ + BTL_VERBOSE(("failed to post CTS recv buffer")); goto error; } + ie->ie_recv_buffers_posted = true; } @@ -2168,7 +2189,7 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event) that we're done. */ if (0 == --(ie->ie_qps_to_connect)) { BTL_VERBOSE(("REPLY telling main BTL we're connected")); - ompi_btl_openib_fd_schedule(callback_set_endpoint_connected, endpoint); + ompi_btl_openib_fd_run_in_main(callback_set_endpoint_cpc_complete, endpoint); } TIMER_STOP(REPLY_RECEIVED); @@ -2177,8 +2198,8 @@ static int reply_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event) error: /* Communicate to the upper layer that the connection on this endpoint has failed */ - ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error, - endpoint); + ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, + endpoint); return rc; } @@ -2207,7 +2228,7 @@ static int ready_to_use_received(ibcm_listen_cm_id_t *h, that we're done. 
 */
     if (0 == --(ie->ie_qps_to_connect)) {
         BTL_VERBOSE(("RTU telling main BTL we're connected"));
-        ompi_btl_openib_fd_schedule(callback_set_endpoint_connected, endpoint);
+        ompi_btl_openib_fd_run_in_main(callback_set_endpoint_cpc_complete, endpoint);
     }
 
     BTL_VERBOSE(("all done"));
@@ -2216,25 +2237,6 @@ static int ready_to_use_received(ibcm_listen_cm_id_t *h,
 }
 
 
-static int disconnect_request_received(ibcm_listen_cm_id_t *cmh,
-                                       struct ib_cm_event *event)
-{
-    BTL_VERBOSE(("disconnect request received"));
-    return OMPI_SUCCESS;
-}
-
-
-static int disconnect_reply_received(ibcm_listen_cm_id_t *cmd,
-                                     struct ib_cm_event *event)
-{
-    BTL_VERBOSE(("disconnect reply received"));
-#if 0
-    ib_cm_send_drep(event->cm_id, NULL, 0);
-#endif
-    return OMPI_SUCCESS;
-}
-
-
 static int reject_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
 {
     enum ib_cm_rej_reason reason = event->param.rej_rcvd.reason;
@@ -2298,7 +2300,7 @@ static int reject_received(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
                      reason));
     /* Communicate to the upper layer that the connection on this
        endpoint has failed */
-    ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error, NULL);
+    ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, NULL);
 
     return OMPI_ERR_NOT_FOUND;
 }
@@ -2330,8 +2332,8 @@ static int request_error(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
 
     /* Communicate to the upper layer that the connection on this
        endpoint has failed */
-    ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
-                                endpoint);
+    ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
+                                   endpoint);
 
     return OMPI_SUCCESS;
 }
@@ -2362,28 +2364,12 @@ static int reply_error(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *event)
 
     /* Communicate to the upper layer that the connection on this
        endpoint has failed */
-    ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error,
-                                endpoint);
+    ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error,
+                                   endpoint);
 
     return OMPI_SUCCESS;
 }
 
 
-static int disconnect_request_error(ibcm_listen_cm_id_t *cmh,
-                                    struct ib_cm_event *e)
-{
-    BTL_VERBOSE(("disconnect request error!"));
-    return OMPI_SUCCESS;
-}
-
-
-static int unhandled_event(ibcm_listen_cm_id_t *cmh, struct ib_cm_event *e)
-{
-    BTL_VERBOSE(("unhandled event error (%p, %d)",
-                 (void*) e, e->event));
-    return OMPI_ERR_NOT_FOUND;
-}
-
-
 static void *ibcm_event_dispatch(int fd, int flags, void *context)
 {
     bool want_ack;
@@ -2391,38 +2377,46 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context)
     ibcm_listen_cm_id_t *cmh = (ibcm_listen_cm_id_t*) context;
     struct ib_cm_event *e = NULL;
 
+    OPAL_OUTPUT((-1, "ibcm dispatch: on device 0x%" PRIx64", fd %d",
+                 (uint64_t) cmh->cm_device, fd));
     TIMER_START(CM_GET_EVENT);
+    /* Blocks until next event, which should be immediately (because
+       we shouldn't call this dispatch function unless there's
+       something ready to read) */
     rc = ib_cm_get_event(cmh->cm_device, &e);
     TIMER_STOP(CM_GET_EVENT);
+    if (-ENODATA == rc) {
+        OPAL_OUTPUT((-1, "ibcm dispatch: GOT NO DATA!"));
+        return NULL;
+    }
+    while (-1 == rc && EAGAIN == errno) {
+        OPAL_OUTPUT((-1, "ibcm dispatch: GOT EAGAIN!"));
+        /* Try again */
+        rc = ib_cm_get_event(cmh->cm_device, &e);
+    }
 
     if (0 == rc && NULL != e) {
         want_ack = true;
         switch (e->event) {
         case IB_CM_REQ_RECEIVED:
+            OPAL_OUTPUT((-1, "ibcm dispatch: request received on fd %d", fd));
            /* Incoming request */
            rc = request_received(cmh, e);
            break;
 
         case IB_CM_REP_RECEIVED:
+            OPAL_OUTPUT((-1, "ibcm dispatch: reply received
on fd %d", fd)); /* Reply received */ rc = reply_received(cmh, e); break; case IB_CM_RTU_RECEIVED: + OPAL_OUTPUT((-1, "ibcm dispatch: RTU received on fd %d", fd)); /* Ready to use! */ rc = ready_to_use_received(cmh, e); break; - - case IB_CM_DREQ_RECEIVED: - /* Disconnect request */ - rc = disconnect_request_received(cmh, e); - break; - - case IB_CM_DREP_RECEIVED: - /* Disconnect reply */ - rc = disconnect_reply_received(cmh, e); - break; - + case IB_CM_REJ_RECEIVED: + OPAL_OUTPUT((-1, "ibcm dispatch: reject received on fd %d", fd)); /* Rejected connection */ rc = reject_received(cmh, e); /* reject_received() called ib_cm_ack_event so that the CM @@ -2431,22 +2425,32 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context) break; case IB_CM_REQ_ERROR: + OPAL_OUTPUT((-1, "ibcm dispatch: request error received on fd %d", fd)); /* Request error */ rc = request_error(cmh, e); break; case IB_CM_REP_ERROR: + OPAL_OUTPUT((-1, "ibcm dispatch: reply error received on fd %d", fd)); /* Reply error */ rc = reply_error(cmh, e); break; + case IB_CM_DREQ_RECEIVED: + case IB_CM_DREP_RECEIVED: case IB_CM_DREQ_ERROR: - /* Disconnect request error */ - rc = disconnect_request_error(cmh, e); + OPAL_OUTPUT((-1, "ibcm dispatch: %s received on fd %d", + (IB_CM_DREQ_RECEIVED == e->event) ? "disconnect request" : + (IB_CM_DREP_RECEIVED == e->event) ? "disconnect reply" : + "disconnect request error", fd)); + /* We don't care */ + rc = OMPI_SUCCESS; break; default: - rc = unhandled_event(cmh, e); + /* This would be odd */ + OPAL_OUTPUT((-1, "ibcm dispatch: unhandled event received on fd %d", fd)); + rc = OMPI_ERR_NOT_FOUND; break; } @@ -2462,6 +2466,8 @@ static void *ibcm_event_dispatch(int fd, int flags, void *context) function), the dispatch function would have done that already */ } + } else { + OPAL_OUTPUT((-1, "Got weird value back from ib_cm_get_event: %d", rc)); } return NULL; diff --git a/ompi/mca/btl/openib/connect/btl_openib_connect_oob.c b/ompi/mca/btl/openib/connect/btl_openib_connect_oob.c index 3276be1db5..af31618304 100644 --- a/ompi/mca/btl/openib/connect/btl_openib_connect_oob.c +++ b/ompi/mca/btl/openib/connect/btl_openib_connect_oob.c @@ -86,7 +86,7 @@ ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_oob = { /* Query */ oob_component_query, /* Finalize */ - oob_component_finalize + oob_component_finalize, }; /* Open - this functions sets up any oob specific commandline params */ @@ -166,6 +166,7 @@ static int oob_component_query(mca_btl_openib_module_t *btl, (*cpc)->cbm_start_connect = oob_module_start_connect; (*cpc)->cbm_endpoint_finalize = NULL; (*cpc)->cbm_finalize = NULL; + (*cpc)->cbm_uses_cts = false; opal_output_verbose(5, mca_btl_base_output, "openib BTL: oob CPC available for use on %s", @@ -828,19 +829,19 @@ static void rml_recv_cb(int status, orte_process_name_t* process_name, } else { send_connect_data(ib_endpoint, ENDPOINT_CONNECT_ACK); /* Tell main BTL that we're done */ - mca_btl_openib_endpoint_connected(ib_endpoint); + mca_btl_openib_endpoint_cpc_complete(ib_endpoint); } break; case MCA_BTL_IB_WAITING_ACK: /* Tell main BTL that we're done */ - mca_btl_openib_endpoint_connected(ib_endpoint); + mca_btl_openib_endpoint_cpc_complete(ib_endpoint); break; case MCA_BTL_IB_CONNECT_ACK: send_connect_data(ib_endpoint, ENDPOINT_CONNECT_ACK); /* Tell main BTL that we're done */ - mca_btl_openib_endpoint_connected(ib_endpoint); + mca_btl_openib_endpoint_cpc_complete(ib_endpoint); break; case MCA_BTL_IB_CONNECTED: diff --git 
a/ompi/mca/btl/openib/connect/btl_openib_connect_rdmacm.c b/ompi/mca/btl/openib/connect/btl_openib_connect_rdmacm.c index de1738d074..90241f2706 100644 --- a/ompi/mca/btl/openib/connect/btl_openib_connect_rdmacm.c +++ b/ompi/mca/btl/openib/connect/btl_openib_connect_rdmacm.c @@ -43,12 +43,10 @@ #undef event static void rdmacm_component_register(void); +static int rdmacm_component_init(void); static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, ompi_btl_openib_connect_base_module_t **cpc); -static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc, - mca_btl_base_endpoint_t *endpoint); static int rdmacm_component_destroy(void); -static int rdmacm_component_init(void); ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_rdmacm = { "rdmacm", @@ -103,7 +101,7 @@ static int rdmacm_priority = 30; static uint16_t rdmacm_port = 0; static uint32_t rdmacm_addr = 0; -#define RDMA_RESOLVE_ADDR_TIMEOUT 2000 +#define RDMACM_RESOLVE_ADDR_TIMEOUT 2000 /* Open - this functions sets up any rdma_cm specific commandline params */ static void rdmacm_component_register(void) @@ -187,8 +185,9 @@ static void rdmacm_cleanup(rdmacm_contents_t *local, struct rdma_cm_id *id, uint32_t num) { - if (NULL == id) + if (NULL == id) { return; + } free(id->context); id->context = NULL; @@ -251,7 +250,8 @@ static int rdmacm_setup_qp(rdmacm_contents_t *local, attr.send_cq = local->openib_btl->device->ib_cq[BTL_OPENIB_LP_CQ]; attr.recv_cq = local->openib_btl->device->ib_cq[qp_cq_prio(qpnum)]; attr.srq = srq; - attr.cap.max_recv_wr = max_recv_wr; + /* Add one for the CTS receive frag that will be posted */ + attr.cap.max_recv_wr = max_recv_wr + 1; attr.cap.max_send_wr = max_send_wr; attr.cap.max_inline_data = req_inline = max_inline_size(qpnum, local->openib_btl->device); @@ -282,9 +282,9 @@ out: return 1; } -static int rdma_client_connect_one(rdmacm_contents_t *local, - message_t *message, - int num) +static int rdmacm_client_connect_one(rdmacm_contents_t *local, + message_t *message, + int num) { struct sockaddr_in din; id_contexts_t *context; @@ -319,10 +319,11 @@ static int rdma_client_connect_one(rdmacm_contents_t *local, * RDMA_CM_EVENT_ADDR_RESOLVED event will occur on the local event * handler. */ + OPAL_OUTPUT((0, "Resolving id: 0x%x", local->id[num])); rc = rdma_resolve_addr(local->id[num], NULL, (struct sockaddr *)&din, - RDMA_RESOLVE_ADDR_TIMEOUT); + RDMACM_RESOLVE_ADDR_TIMEOUT); if (0 != rc) { BTL_ERROR(("Failed to resolve the remote address with %d", rc)); goto out1; @@ -336,20 +337,78 @@ out: return OMPI_ERROR; } -static int rdma_client_connect(rdmacm_contents_t *local, message_t *message) +static char *stringify(uint32_t addr) +{ + char *line; + asprintf(&line, "%d.%d.%d.%d", + addr & 0xff, + (addr >> 8) & 0xff, + (addr >> 16) & 0xff, + (addr >> 24)); + return line; +} + +/* To avoid all kinds of nasty race conditions, we only allow + * connections to be made in one direction. So use a simple + * (arbitrary) test to decide which direction is allowed to initiate + * the connection: the process with the lower IP address wins. If the + * IP addresses are the same (i.e., the MPI procs are on the same + * node), then the process with the lower TCP port wins. 
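The i_initiate() predicate that follows implements this rule. As a standalone illustration (not part of the patch), the property that matters is that the decision is total and antisymmetric: for any two distinct (address, port) pairs, exactly one peer elects to initiate, so a connection is never raced from both sides. The sketch below states the rule as the comment gives it; the patch itself compares raw network-byte-order s_addr values, so the numeric direction of the comparison there is an implementation detail.

    #include <assert.h>
    #include <stdbool.h>
    #include <stdint.h>

    static bool initiates(uint32_t ip_a, uint16_t port_a,
                          uint32_t ip_b, uint16_t port_b)
    {
        if (ip_a != ip_b) {
            return ip_a < ip_b;     /* lower IP address wins */
        }
        return port_a < port_b;     /* same host: lower port wins */
    }

    int main(void)
    {
        /* For any distinct pair, exactly one side initiates */
        assert(initiates(1, 5000, 2, 5000) != initiates(2, 5000, 1, 5000));
        assert(initiates(7, 5000, 7, 5001) != initiates(7, 5001, 7, 5000));
        return 0;
    }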
+ */ +static bool i_initiate(uint32_t local_ipaddr, uint16_t local_port, + uint32_t remote_ipaddr, uint16_t remote_port) +{ + char *a = stringify(local_ipaddr); + char *b = stringify(remote_ipaddr); + + if (local_ipaddr > remote_ipaddr || + (local_ipaddr == remote_ipaddr && local_port < remote_port)) { + OPAL_OUTPUT((0, "i_initiate (I WIN): local ipaddr %s, remote ipaddr %s", + a, b)); + free(a); + free(b); + return true; + } else { + OPAL_OUTPUT((0, "i_initiate (I lose): local ipaddr %s, remote ipaddr %s", + a, b)); + free(a); + free(b); + return false; + } +} + +static int rdmacm_client_connect(rdmacm_contents_t *local, message_t *message) { int rc, qp; - local->id = malloc(sizeof(struct rdma_cm_id *) * mca_btl_openib_component.num_qps); + /* If we're not the initiator, allocate an extra ID for the bogus + QP that we expect to be rejected */ + qp = mca_btl_openib_component.num_qps; + if (!local->endpoint->endpoint_initiator) { + ++qp; + } + local->id = calloc(qp, sizeof(struct rdma_cm_id *)); if (NULL == local->id) { BTL_ERROR(("malloc error")); - return OMPI_ERROR; + return OMPI_ERR_OUT_OF_RESOURCE; } - for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) { - rc = rdma_client_connect_one(local, message, qp); + /* If we're the initiator, then open all the QPs */ + if (local->endpoint->endpoint_initiator) { + for (qp = 0; qp < mca_btl_openib_component.num_qps; qp++) { + rc = rdmacm_client_connect_one(local, message, qp); + if (OMPI_SUCCESS != rc) { + BTL_ERROR(("rdmacm_client_connect_one error (real QP %d)", + qp)); + goto out; + } + } + } + /* Otherwise, only open 1 QP that we expect to be rejected */ + else { + rc = rdmacm_client_connect_one(local, message, qp - 1); if (OMPI_SUCCESS != rc) { - BTL_ERROR(("rdma_client_connect_one error")); + BTL_ERROR(("rdmacm_client_connect_one error (bogus QP)")); goto out; } } @@ -358,9 +417,12 @@ static int rdma_client_connect(rdmacm_contents_t *local, message_t *message) out: for (; qp >= 0; qp--) { - rdmacm_cleanup(local, local->id[qp], qp); + if (NULL != local->id[qp]) { + rdmacm_cleanup(local, local->id[qp], qp); + } } - return OMPI_ERROR; + + return rc; } /* Connect method called by the upper layers to connect the local @@ -370,11 +432,18 @@ out: static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc, mca_btl_base_endpoint_t *endpoint) { - rdmacm_contents_t *client; - message_t *message; + rdmacm_contents_t *local; + message_t *message, *local_message; int rc; - message = (message_t *)endpoint->endpoint_remote_cpc_data->cbm_modex_message; + /* Don't use the CPC to get the message, because this function is + invoked from the event_handler (to intitiate connections in the + Right direction), where we don't have the CPC, so it'll be + NULL. */ + local_message = + (message_t *) endpoint->endpoint_local_cpc->data.cbm_modex_message; + message = (message_t *) + endpoint->endpoint_remote_cpc_data->cbm_modex_message; BTL_VERBOSE(("Connecting to remote ip addr = %x, port = %d ep state = %d", message->ipaddr, message->tcp_port, endpoint->endpoint_state)); @@ -385,28 +454,43 @@ static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cp return OMPI_SUCCESS; } - endpoint->endpoint_state = MCA_BTL_IB_CONNECT_ACK; + /* Set the endpoint state to "connecting" (this function runs in + the main MPI thread; not the service thread, so we can set the + endpoint_state here). 
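A distilled sketch of the ID-array sizing in rdmacm_client_connect() above, with stand-in types replacing the real rdma_cm_id machinery: the initiator opens one ID per QP, while the non-initiator allocates one extra slot and connects only the single bogus ID it expects the peer to reject.

    #include <stdlib.h>

    typedef struct cm_id cm_id_t;   /* stands in for struct rdma_cm_id */

    static cm_id_t **alloc_conn_ids(int num_qps, int initiator, int *n_out)
    {
        /* Non-initiator: one extra slot; only the last one (the
           bogus ID) is ever connected, and the peer is expected to
           reject it. */
        int n = initiator ? num_qps : num_qps + 1;
        cm_id_t **ids = calloc(n, sizeof(cm_id_t *));

        *n_out = (NULL == ids) ? 0 : n;
        return ids;
    }

The rejection that bogus ID provokes carries the flag word handled in the RDMA_CM_EVENT_REJECTED arm further down in this patch.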
*/ + endpoint->endpoint_state = MCA_BTL_IB_CONNECTING; - client = calloc(1, sizeof(rdmacm_contents_t)); - if (NULL == client) { - BTL_ERROR(("malloc of client failed")); + local = calloc(1, sizeof(rdmacm_contents_t)); + if (NULL == local) { + BTL_ERROR(("malloc of local failed")); goto out; } - client->openib_btl = endpoint->endpoint_btl; - client->endpoint = endpoint; - client->server = false; + local->openib_btl = endpoint->endpoint_btl; + local->endpoint = endpoint; + local->server = false; /* Populate the port information with the local port the server is * listening on instead of the ephemerial port this client is * connecting with. This port is used to determine which endpoint - * is being connected from, in the isntance where there are - * multiple listeners on the local system. + * is being connected from, in the case where there are multiple + * listeners on the local system. */ - client->tcp_port = ((message_t *)endpoint->endpoint_local_cpc->data.cbm_modex_message)->tcp_port; + local->ipaddr = local_message->ipaddr; + local->tcp_port = local_message->tcp_port; - rc = rdma_client_connect(client, message); + /* Are we the initiator? Or do we expect this connect request to + be rejected? */ + endpoint->endpoint_initiator = + i_initiate(local->ipaddr, local->tcp_port, + message->ipaddr, message->tcp_port); + OPAL_OUTPUT((0, "Start connect; ep=0x%x (0x%x), I %s the initiator to %s", + endpoint, + endpoint->endpoint_local_cpc, + endpoint->endpoint_initiator ? "am" : "am NOT", + endpoint->endpoint_proc->proc_ompi->proc_hostname)); + + rc = rdmacm_client_connect(local, message); if (0 != rc) { - BTL_ERROR(("rdma_client_connect error")); + BTL_ERROR(("rdmacm_client_connect error")); goto out; } @@ -447,15 +531,23 @@ static int handle_connect_request(rdmacm_contents_t *local, } message = endpoint->endpoint_remote_cpc_data->cbm_modex_message; + endpoint->endpoint_initiator = + i_initiate(local->ipaddr, local->tcp_port, + message->ipaddr, rem_port); - BTL_VERBOSE(("ep state = %d, local ipaddr = %x, remote ipaddr = %x port %d", - endpoint->endpoint_state, local->ipaddr, message->ipaddr, rem_port)); + BTL_VERBOSE(("ep state = %d, local ipaddr = %x, remote ipaddr = %x, local port = %d, remote port = %d", + endpoint->endpoint_state, local->ipaddr, message->ipaddr, + local->tcp_port, rem_port)); - if ((local->ipaddr == message->ipaddr && local->tcp_port < rem_port) || - local->ipaddr > message->ipaddr) { + OPAL_OUTPUT((0, "in handle_connect_request; ep=0x%x (0x%x), I still %s the initiator to %s", + endpoint, + endpoint->endpoint_local_cpc, + endpoint->endpoint_initiator ? "am" : "am NOT", + endpoint->endpoint_proc->proc_ompi->proc_hostname)); + if (endpoint->endpoint_initiator) { int race = 1; - BTL_VERBOSE(("Received a connect request from an endpoint in the wrong direction")); + OPAL_OUTPUT((0, "Received a connect request from an endpoint in the wrong direction")); /* This will cause a event on the remote system. By passing in * a value in the second arg of rdma_reject, the remote side @@ -468,19 +560,17 @@ static int handle_connect_request(rdmacm_contents_t *local, goto out; } - /* If there are multiple QPs attempting to connect from the - * wrong direction, only make one call to - * rdmacm_module_start_connect to connect in the proper - * direction, as it will connect to the remote side with the - * correct number of QPs. 
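The "wrong direction" reject above is deliberately distinguishable from a genuine failure. A minimal sketch of both halves of that signal, assuming only librdmacm's public rdma_reject() API:

    #include <rdma/rdma_cma.h>

    /* Server side: reject a wrong-direction request, flagging it as
       intentional via one int of private data. */
    static int reject_wrong_direction(struct rdma_cm_id *id)
    {
        int flag = 1;
        return rdma_reject(id, &flag, sizeof(flag));
    }

    /* Client side: a reject carrying the flag is benign; anything
       else is a real connection failure. */
    static int is_benign_reject(struct rdma_cm_event *event)
    {
        return NULL != event->param.conn.private_data &&
               1 == *((const int *) event->param.conn.private_data);
    }

The client-side check mirrors the RDMA_CM_EVENT_REJECTED arm in the event handler later in this patch.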
- */ - if (0 == qpnum) { - rdmacm_module_start_connect(NULL, endpoint); - } + OPAL_OUTPUT((0, "Starting connection in other direction")); + rdmacm_module_start_connect(NULL, endpoint); return 0; } + /* Set the endpoint_state to "CONNECTING". This is running + in the service thread, so we need to do a write barrier. */ + endpoint->endpoint_state = MCA_BTL_IB_CONNECTING; + opal_atomic_wmb(); + endpoint->rem_info.rem_index = rem_index; /* Setup QP for new connection */ @@ -493,24 +583,26 @@ static int handle_connect_request(rdmacm_contents_t *local, goto out; } - /* Recvs must be posted prior to accepting the rdma connection. - * Otherwise, it is possible to get data before there are recvs to - * put it, which for iWARP will result in tearing down of the - * connection. - */ - if (BTL_OPENIB_QP_TYPE_PP(qpnum)) { - rc = mca_btl_openib_endpoint_post_rr(endpoint, qpnum); - } else { - rc = mca_btl_openib_post_srr(endpoint->endpoint_btl, qpnum); - } - if (OMPI_SUCCESS != rc) { - BTL_ERROR(("mca_btl_openib_endpoint_post_rr_nolock error %d", rc)); - goto out1; + /* Post a single receive buffer on the smallest QP for the CTS + protocol */ + if (mca_btl_openib_component.credits_qp == qpnum) { + struct ibv_recv_wr *bad_wr, *wr; + + assert(NULL != endpoint->endpoint_cts_frag); + wr = &to_recv_frag(endpoint->endpoint_cts_frag)->rd_desc; + assert(NULL != wr); + wr->next = NULL; + + if (0 != ibv_post_recv(endpoint->qps[qpnum].qp->lcl_qp, + wr, &bad_wr)) { + BTL_ERROR(("failed to post CTS recv buffer")); + goto out1; + } } /* Since the event id is already created, we cannot add this * information in the normal way. Instead we must reference its - * location and put the data there so that it can be access later. + * location and put the data there so that it can be accessed later. 
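The single CTS receive above replaces the old bulk post_rr/post_srr calls on the server side. A self-contained sketch of what posting that one buffer amounts to at the verbs level, assuming the buffer is already registered (the real code reuses the pre-built work request hanging off endpoint_cts_frag):

    #include <infiniband/verbs.h>
    #include <stdint.h>
    #include <string.h>

    static int post_cts_recv(struct ibv_qp *qp, struct ibv_mr *mr,
                             void *buf, uint32_t len)
    {
        struct ibv_sge sge;
        struct ibv_recv_wr wr, *bad_wr = NULL;

        memset(&sge, 0, sizeof(sge));
        sge.addr   = (uintptr_t) buf;
        sge.length = len;
        sge.lkey   = mr->lkey;

        memset(&wr, 0, sizeof(wr));
        wr.wr_id   = (uintptr_t) buf;  /* find the frag on completion */
        wr.sg_list = &sge;
        wr.num_sge = 1;
        wr.next    = NULL;             /* a single WR, as in the patch */

        return ibv_post_recv(qp, &wr, &bad_wr);
    }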
*/ event->id->context = malloc(sizeof(id_contexts_t)); if (NULL == event->id->context) { @@ -615,7 +707,11 @@ static int rdmacm_connection_shutdown(struct mca_btl_base_endpoint_t *endpoint) if (NULL != cli->item->id[i] && NULL != cli->item->id[i]->qp && NULL != cli->item->endpoint->qps) { - rdma_disconnect(cli->item->id[i]); + opal_output(0, "Freeing rdmacm id %p", + cli->item->id[i]); + rdma_disconnect(cli->item->id[i]); + /* JMS shouldn't be necessary */ + cli->item->id[i] = NULL; } } } @@ -626,11 +722,13 @@ static int rdmacm_connection_shutdown(struct mca_btl_base_endpoint_t *endpoint) /* * Callback (from main thread) when the endpoint has been connected */ -static void *local_endpoint_connected(void *context) +static void *local_endpoint_cpc_complete(void *context) { mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t *)context; - mca_btl_openib_endpoint_connected(endpoint); + OPAL_OUTPUT((0, "local_endpoint_cpc_complete to %s", + endpoint->endpoint_proc->proc_ompi->proc_hostname)); + mca_btl_openib_endpoint_cpc_complete(endpoint); return NULL; } @@ -641,9 +739,11 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even mca_btl_openib_endpoint_t *endpoint; message_t *message; - if (local->server) + if (local->server) { endpoint = ((id_contexts_t *)event->id->context)->endpoint; - else { + OPAL_OUTPUT((0, "Server CPC complete to %s", + endpoint->endpoint_proc->proc_ompi->proc_hostname)); + } else { list_item_t *li; uint32_t rem_index; @@ -659,6 +759,8 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even } li->item = local; opal_list_append(&client_list, &(li->super)); + OPAL_OUTPUT((0, "Client CPC complete to %s", + endpoint->endpoint_proc->proc_ompi->proc_hostname)); } if (NULL == endpoint) { BTL_ERROR(("Can't find endpoint")); @@ -666,12 +768,17 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even } data = (rdmacm_endpoint_local_cpc_data_t *)endpoint->endpoint_local_cpc_data; - /* Only notify the upper layers after the last QO has been connected */ + /* Only notify the upper layers after the last QP has been connected */ if (++data->rdmacm_counter < mca_btl_openib_component.num_qps) { BTL_VERBOSE(("%s count == %d", local->server?"server":"client", data->rdmacm_counter)); return 0; } + OPAL_OUTPUT((0, "in connect_endpoint; ep=0x%x (0x%x), I still %s the initiator to %s", + endpoint, + endpoint->endpoint_local_cpc, + endpoint->endpoint_initiator ? "am" : "am NOT", + endpoint->endpoint_proc->proc_ompi->proc_hostname)); message = endpoint->endpoint_remote_cpc_data->cbm_modex_message; BTL_VERBOSE(("%s connected!!! local %x remote %x state = %d", local->server?"server":"client", @@ -679,19 +786,19 @@ static int rdmacm_connect_endpoint(rdmacm_contents_t *local, struct rdma_cm_even message->ipaddr, endpoint->endpoint_state)); - ompi_btl_openib_fd_schedule(local_endpoint_connected, endpoint); + ompi_btl_openib_fd_run_in_main(local_endpoint_cpc_complete, endpoint); return 0; } -static int start_connect(rdmacm_contents_t *local, int num) +static int resolve_route(rdmacm_contents_t *local, int num) { int rc; - /* Resolve the route to the remote system. Onced established, the + /* Resolve the route to the remote system. Once established, the * local system will get a RDMA_CM_EVENT_ROUTE_RESOLVED event. 
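For orientation, the client-side sequence that resolve_route() participates in is driven entirely by CM events: resolve the address, resolve the route, then connect, with each step completing asynchronously. A compressed, self-contained sketch of that progression, assuming librdmacm and eliding QP setup and error paths:

    #include <rdma/rdma_cma.h>

    #define TIMEOUT_MS 2000  /* mirrors RDMACM_RESOLVE_ADDR_TIMEOUT */

    static int drive_client_step(struct rdma_cm_id *id,
                                 struct rdma_cm_event *event,
                                 struct rdma_conn_param *param)
    {
        switch (event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
            return rdma_resolve_route(id, TIMEOUT_MS);
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
            return rdma_connect(id, param); /* QP setup elided */
        case RDMA_CM_EVENT_ESTABLISHED:
            return 0;                       /* connection is usable */
        default:
            return -1;                      /* unexpected for a client */
        }
    }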
*/ - rc = rdma_resolve_route(local->id[num], RDMA_RESOLVE_ADDR_TIMEOUT); + rc = rdma_resolve_route(local->id[num], RDMACM_RESOLVE_ADDR_TIMEOUT); if (0 != rc) { BTL_ERROR(("Failed to resolve the route with %d", rc)); goto out; @@ -722,7 +829,6 @@ static int create_dummy_qp(rdmacm_contents_t *local, struct rdma_cm_id *id, int struct ibv_qp_init_attr attr; struct ibv_qp *qp; - /* create the qp via rdma_create_qp() */ memset(&attr, 0, sizeof(attr)); attr.qp_type = IBV_QPT_RC; attr.send_cq = local->dummy_cq; @@ -750,34 +856,44 @@ out: static int finish_connect(rdmacm_contents_t *local, int num) { struct rdma_conn_param conn_param; - struct sockaddr *peeraddr, *localaddr; - uint32_t localipaddr, remoteipaddr; - uint16_t remoteport; conn_message_t msg; int rc; + struct sockaddr *peeraddr, *localaddr; + uint32_t localipaddr, remoteipaddr; + uint16_t remoteport; + remoteport = rdma_get_dst_port(local->id[num]); - localaddr = rdma_get_local_addr(local->id[num]); peeraddr = rdma_get_peer_addr(local->id[num]); - localipaddr = ((struct sockaddr_in *)localaddr)->sin_addr.s_addr; remoteipaddr = ((struct sockaddr_in *)peeraddr)->sin_addr.s_addr; - if ((localipaddr == remoteipaddr && local->tcp_port <= remoteport) || - localipaddr > remoteipaddr) { + localaddr = rdma_get_local_addr(local->id[num]); + localipaddr = ((struct sockaddr_in *)localaddr)->sin_addr.s_addr; + + /* If we're the initiator, then setup the QP's and post the CTS + message buffer */ + if (local->endpoint->endpoint_initiator) { rc = rdmacm_setup_qp(local, local->endpoint, local->id[num], num); if (0 != rc) { BTL_ERROR(("rdmacm_setup_qp error %d", rc)); goto out; } - if (BTL_OPENIB_QP_TYPE_PP(num)) { - rc = mca_btl_openib_endpoint_post_rr(local->endpoint, num); - } else { - rc = mca_btl_openib_post_srr(local->endpoint->endpoint_btl, num); - } - if (OMPI_SUCCESS != rc) { - BTL_ERROR(("mca_btl_openib_endpoint_post_rr_nolock error %d", rc)); - goto out1; + if (mca_btl_openib_component.credits_qp == num) { + /* Post a single receive buffer on the smallest QP for the CTS + protocol */ + + struct ibv_recv_wr *bad_wr, *wr; + assert(NULL != local->endpoint->endpoint_cts_frag); + wr = &to_recv_frag(local->endpoint->endpoint_cts_frag)->rd_desc; + assert(NULL != wr); + wr->next = NULL; + + if (0 != ibv_post_recv(local->endpoint->qps[num].qp->lcl_qp, + wr, &bad_wr)) { + BTL_ERROR(("failed to post CTS recv buffer")); + goto out1; + } } } else { /* If we are establishing a connection in the "wrong" direction, @@ -811,12 +927,15 @@ static int finish_connect(rdmacm_contents_t *local, int num) msg.rem_index = local->endpoint->index; msg.rem_port = local->tcp_port; - BTL_VERBOSE(("Connecting from %x, port %d to %x", localipaddr, msg.rem_port, remoteipaddr)); - /* Now all of the local setup has been done. The remote system * should now get a RDMA_CM_EVENT_CONNECT_REQUEST event to further * the setup of the QP. */ + OPAL_OUTPUT((0, "in finish_connect; ep=0x%x (0x%x), I still %s the initiator to %s", + local->endpoint, + local->endpoint->endpoint_local_cpc, + local->endpoint->endpoint_initiator ? 
"am" : "am NOT", + local->endpoint->endpoint_proc->proc_ompi->proc_hostname)); rc = rdma_connect(local->id[num], &conn_param); if (0 != rc) { BTL_ERROR(("rdma_connect Failed with %d", rc)); @@ -833,7 +952,7 @@ out: return -1; } -static int rdma_event_handler(struct rdma_cm_event *event) +static int event_handler(struct rdma_cm_event *event) { rdmacm_contents_t *local; struct sockaddr *peeraddr, *localaddr; @@ -850,7 +969,7 @@ static int rdma_event_handler(struct rdma_cm_event *event) localipaddr = ((struct sockaddr_in *)localaddr)->sin_addr.s_addr; peeripaddr = ((struct sockaddr_in *)peeraddr)->sin_addr.s_addr; - BTL_VERBOSE(("%s rdma_event_handler -- %s, status = %d to %x", + BTL_VERBOSE(("%s event_handler -- %s, status = %d to %x", local->server?"server":"client", rdma_event_str(event->event), event->status, @@ -858,19 +977,23 @@ static int rdma_event_handler(struct rdma_cm_event *event) switch (event->event) { case RDMA_CM_EVENT_ADDR_RESOLVED: - rc = start_connect(local, qpnum); + OPAL_OUTPUT((0, "Address resolved: ID 0x%x", local->id[qpnum])); + rc = resolve_route(local, qpnum); break; case RDMA_CM_EVENT_ROUTE_RESOLVED: + OPAL_OUTPUT((0, "Route resolved: ID 0x%x", local->id[qpnum])); local->ipaddr = localipaddr; rc = finish_connect(local, qpnum); break; case RDMA_CM_EVENT_CONNECT_REQUEST: + OPAL_OUTPUT((0, "Incoming connect request: 0x%x", local->id[qpnum])); rc = handle_connect_request(local, event); break; case RDMA_CM_EVENT_ESTABLISHED: + OPAL_OUTPUT((0, "Connection established: 0x%x", local->id[qpnum])); rc = rdmacm_connect_endpoint(local, event); break; @@ -880,14 +1003,16 @@ static int rdma_event_handler(struct rdma_cm_event *event) break; case RDMA_CM_EVENT_REJECTED: - if ((NULL != event->param.conn.private_data) && (1 == *((int *)event->param.conn.private_data))) { + if ((NULL != event->param.conn.private_data) && + (1 == *((int *)event->param.conn.private_data))) { BTL_VERBOSE(("A good reject! 
for qp %d", qpnum)); if (NULL != local->id[qpnum]->qp) { ibv_destroy_qp(local->id[qpnum]->qp); local->id[qpnum]->qp = NULL; } - if (NULL != local->dummy_cq) + if (NULL != local->dummy_cq) { ibv_destroy_cq(local->dummy_cq); + } rdmacm_cleanup(local, local->id[qpnum], qpnum); rc = 0; } @@ -914,8 +1039,8 @@ static inline void rdmamcm_event_error(struct rdma_cm_event *event) endpoint = ((id_contexts_t *)event->id->context)->local->endpoint; } - ompi_btl_openib_fd_schedule(mca_btl_openib_endpoint_invoke_error, - endpoint); + ompi_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, + endpoint); } static void *rdmacm_event_dispatch(int fd, int flags, void *context) @@ -952,9 +1077,9 @@ static void *rdmacm_event_dispatch(int fd, int flags, void *context) } rdma_ack_cm_event(event); - rc = rdma_event_handler(&ecopy); + rc = event_handler(&ecopy); if (0 != rc) { - BTL_ERROR(("Error rdma_event_handler -- %s, status = %d", + BTL_ERROR(("Error event_handler -- %s, status = %d", rdma_event_str(ecopy.event), ecopy.status)); @@ -981,13 +1106,11 @@ static int rdmacm_init(mca_btl_openib_endpoint_t *endpoint) data = calloc(1, sizeof(rdmacm_endpoint_local_cpc_data_t)); if (NULL == data) { BTL_ERROR(("malloc failed")); - goto out; + return OMPI_ERR_OUT_OF_RESOURCE; } endpoint->endpoint_local_cpc_data = data; - return 0; -out: - return -1; + return OMPI_SUCCESS; } static int ipaddrcheck(rdmacm_contents_t *server, @@ -1068,8 +1191,7 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, ompi_btl_ /* RDMACM is not supported if we have any XRC QPs */ if (mca_btl_openib_component.num_xrc_qps > 0) { - opal_output_verbose(5, mca_btl_base_output, - "openib BTL: rdmacm CPC not supported with XRC receive queues, please try xoob CPC; skipped"); + BTL_VERBOSE(("rdmacm CPC not supported with XRC receive queues, please try xoob CPC; skipped")); rc = OMPI_ERR_NOT_SUPPORTED; goto out; } @@ -1088,6 +1210,9 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, ompi_btl_ (*cpc)->cbm_start_connect = rdmacm_module_start_connect; (*cpc)->cbm_endpoint_finalize = rdmacm_connection_shutdown; (*cpc)->cbm_finalize = NULL; + /* Setting uses_cts=true also guarantees that we'll only be + selected if QP 0 is PP */ + (*cpc)->cbm_uses_cts = true; server = malloc(sizeof(rdmacm_contents_t)); if (NULL == server) { diff --git a/ompi/mca/btl/openib/connect/btl_openib_connect_xoob.c b/ompi/mca/btl/openib/connect/btl_openib_connect_xoob.c index be0472a129..b0d6405e44 100644 --- a/ompi/mca/btl/openib/connect/btl_openib_connect_xoob.c +++ b/ompi/mca/btl/openib/connect/btl_openib_connect_xoob.c @@ -48,7 +48,7 @@ ompi_btl_openib_connect_base_component_t ompi_btl_openib_connect_xoob = { /* Query */ xoob_component_query, /* Finalize */ - xoob_component_finalize + xoob_component_finalize, }; typedef enum { @@ -892,7 +892,7 @@ static void xoob_rml_recv_cb(int status, orte_process_name_t* process_name, mca_btl_openib_endpoint_invoke_error(NULL); return; } - mca_btl_openib_endpoint_connected(ib_endpoint); + mca_btl_openib_endpoint_cpc_complete(ib_endpoint); OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock); break; case ENDPOINT_XOOB_CONNECT_XRC_RESPONSE: @@ -911,7 +911,7 @@ static void xoob_rml_recv_cb(int status, orte_process_name_t* process_name, OPAL_THREAD_LOCK(&ib_endpoint->endpoint_lock); /* we got srq numbers on our request */ XOOB_SET_REMOTE_INFO(ib_endpoint->rem_info, rem_info); - mca_btl_openib_endpoint_connected(ib_endpoint); + mca_btl_openib_endpoint_cpc_complete(ib_endpoint); 
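rdmacm_event_dispatch() above copies the event into ecopy and acks it before running the handler. A reduced sketch of that pattern, assuming librdmacm; note that any private_data the handler needs must also be copied out before the ack, because the ack releases the event's storage:

    #include <rdma/rdma_cma.h>
    #include <string.h>

    static void *dispatch_one(struct rdma_event_channel *ch)
    {
        struct rdma_cm_event *event, ecopy;

        if (0 != rdma_get_cm_event(ch, &event)) {
            return NULL;
        }
        /* Copy the event out; private_data would need a deep copy
           here for the same lifetime reason. */
        memcpy(&ecopy, event, sizeof(ecopy));
        rdma_ack_cm_event(event);  /* ack promptly: another thread may
                                      be blocked in rdma_destroy_id()
                                      until all events are acked */
        /* ... now hand the private copy to the handler (elided) ... */
        return NULL;
    }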
OPAL_THREAD_UNLOCK(&ib_endpoint->endpoint_lock); break; case ENDPOINT_XOOB_CONNECT_XRC_NR_RESPONSE: @@ -989,6 +989,7 @@ static int xoob_component_query(mca_btl_openib_module_t *openib_btl, (*cpc)->cbm_start_connect = xoob_module_start_connect; (*cpc)->cbm_endpoint_finalize = NULL; (*cpc)->cbm_finalize = NULL; + (*cpc)->cbm_uses_cts = false; opal_output_verbose(5, mca_btl_base_output, "openib BTL: xoob CPC available for use on %s", diff --git a/ompi/mca/btl/openib/connect/connect.h b/ompi/mca/btl/openib/connect/connect.h index 73ad0ddf81..5b50390b9f 100644 --- a/ompi/mca/btl/openib/connect/connect.h +++ b/ompi/mca/btl/openib/connect/connect.h @@ -128,19 +128,29 @@ * used. There is only this function, which tells when a specific * CPC/BTL module is no longer being used. * - * There are two functions in the main openib BTL that the CPC will + * cbm_uses_cts: a bool that indicates whether the CPC will use the + * CTS protocol or not. + * - if true: the CPC will post the fragment on + * endpoint->endpoint_cts_frag as a receive buffer and will *not* + * call ompi_btl_openib_post_recvs(). + * - if false: the CPC will call ompi_btl_openib_post_recvs() before + * calling ompi_btl_openib_cpc_complete(). + * + * There are two functions in the main openib BTL that the CPC may * call: * * - ompi_btl_openib_post_recvs(endpoint): once a QP is locally * connected to the remote side (but we don't know if the remote side * is connected to us yet), this function is invoked to post buffers - * on the QP, setup credits for the endpoint, etc. + * on the QP, setup credits for the endpoint, etc. This function is + * *only* invoked if the CPC's cbm_uses_cts is false. * - * - ompi_btl_openib_connected(endpoint): once we know that a QP is - * connected on *both* sides, this function is invoked to tell the - * main openib BTL "ok, you can use this connection now." (e.g., the - * main openib BTL will start sending out fragments that were queued - * while the connection was establing, etc.). + * - ompi_btl_openib_cpc_complete(endpoint): once that a CPC knows + * that a QP is connected on *both* sides, this function is invoked to + * tell the main openib BTL "ok, you can use this connection now." + * (e.g., the main openib BTL will either invoke the CTS protocol or + * start sending out fragments that were queued while the connection + * was establishing, etc.). */ #ifndef BTL_OPENIB_CONNECT_H #define BTL_OPENIB_CONNECT_H @@ -330,6 +340,14 @@ typedef struct ompi_btl_openib_connect_base_module_t { /** Finalize the cpc module */ ompi_btl_openib_connect_base_module_finalize_fn_t cbm_finalize; + + /** Whether this module will use the CTS protocol or not. This + directly states whether this module will call + mca_btl_openib_endpoint_post_recvs() or not: true = this + module will *not* call _post_recvs() and instead will post the + receive buffer provided at endpoint->endpoint_cts_frag on qp + 0. */ + bool cbm_uses_cts; } ompi_btl_openib_connect_base_module_t; END_C_DECLS diff --git a/ompi/mca/btl/openib/help-mpi-btl-openib.txt b/ompi/mca/btl/openib/help-mpi-btl-openib.txt index 1785b69bf5..1ade3e4522 100644 --- a/ompi/mca/btl/openib/help-mpi-btl-openib.txt +++ b/ompi/mca/btl/openib/help-mpi-btl-openib.txt @@ -543,3 +543,14 @@ support. Short eager RDMA is not yet supported with progress threads; its use has been disabled in this job. This is a warning only; you job will attempt to continue. +# +[cannot raise btl error] +The OpenFabrics driver in Open MPI tried to raise a fatal error, but +failed. 
+Hopefully there was an error message before this one that
+gave some more detailed information.
+
+  Local host:    %s
+  Source file:   %s
+  Source line:   %d
+
+Your job is now going to abort, sorry.
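Tying the pieces together, the connect.h contract above implies the following shape for a CPC's query function. This is a hypothetical sketch against the patched tree; the my_cpc_* names are placeholders, not symbols introduced by this change:

    /* Placeholder prototype; a real CPC supplies its own. */
    static int my_cpc_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
                                    mca_btl_base_endpoint_t *endpoint);

    static int my_cpc_query(mca_btl_openib_module_t *btl,
                            ompi_btl_openib_connect_base_module_t **cpc)
    {
        *cpc = malloc(sizeof(ompi_btl_openib_connect_base_module_t));
        if (NULL == *cpc) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        (*cpc)->cbm_start_connect     = my_cpc_start_connect;
        (*cpc)->cbm_endpoint_finalize = NULL;
        (*cpc)->cbm_finalize          = NULL;
        /* Contract: with cbm_uses_cts set, this CPC must itself post
           endpoint->endpoint_cts_frag and must not call
           ompi_btl_openib_post_recvs(). */
        (*cpc)->cbm_uses_cts          = true;
        return OMPI_SUCCESS;
    }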