1
1

remove sync step from udcm as it really isn't necessary

This commit was SVN r26724.
Этот коммит содержится в:
Nathan Hjelm 2012-07-02 22:54:44 +00:00
родитель 1a90471374
Коммит 9f3717959e

Просмотреть файл

@ -31,8 +31,7 @@
* 1. One side starts a connection and the request is received before
* the receiving side starts a connection. (One sided)
* 2. Both sides send a request before either receives a request.
* (Simultaneous). The sync sender/receiver are determined by
* each peer's lid and listen qp.
* (Simultaneous).
*
* The protocol for case 1 looks like:
* peer1 peer2
@ -43,8 +42,8 @@
* |<------| CONNECT
* move QPs to RTS | |
* post rc send | |
* SYNC |<----->| SYNC
* | |
* |<------| COMPLETE
* COMPLETE |------>|
*
* The protocol for case 2 looks like:
* peer1 peer2
@ -52,7 +51,7 @@
* CONNECT |<----->| CONNECT
* move QPs to RTS | | move QPs to RTS
* post rc send | | post rc recv
* SYNC |<----->| SYNC
* COMPLETE |<----->| COMPLETE
*
*/
@ -119,7 +118,7 @@ typedef struct udcm_module {
struct ibv_qp *listen_qp;
/* Work request completion queues */
struct ibv_cq *cm_send_cq, *cm_recv_cq, *cm_sync_cq;
struct ibv_cq *cm_send_cq, *cm_recv_cq;
/* Completion channel for receive completions */
struct ibv_comp_channel *cm_channel;
@ -137,7 +136,6 @@ typedef struct udcm_module {
size_t msg_length;
/* timeout thread */
bool cm_timeout_thread_started;
pthread_t cm_timeout_thread;
pthread_cond_t cm_timeout_cond;
pthread_mutex_t cm_timeout_lock;
@ -172,12 +170,8 @@ typedef struct {
opal_mutex_t udep_lock;
struct ibv_ah *ah;
struct ibv_qp *sync_qp;
uint32_t sync_psn;
uint32_t rem_sync_qpn, rem_sync_psn;
bool sent_req, recv_req;
bool lcl_init, rem_init;
/* Has this endpoint's data been initialized */
bool udep_initialized, udep_created_qps;
@ -190,7 +184,8 @@ typedef struct udcm_qp_t {
typedef enum udcm_message_type {
UDCM_MESSAGE_CONNECT = 100,
UDCM_MESSAGE_REJECT = 101
UDCM_MESSAGE_COMPLETE = 101,
UDCM_MESSAGE_REJECT = 102
} udcm_message_type_t;
typedef enum {
@ -296,11 +291,10 @@ static int udcm_send_request (mca_btl_base_endpoint_t *lcl_ep,
/*--------------------------------------------------------------------*/
#define UDCM_MIN_RECV_COUNT 1024
#define UDCM_MIN_RECV_COUNT 512
#define UDCM_MIN_TIMEOUT 1000000
#define UDCM_SEND_CQ_SIZE 1024
#define UDCM_SYNC_CQ_SIZE 1024
#define UDCM_SEND_CQ_SIZE 512
#define UDCM_WR_RECV_ID 0x20000000ll
#define UDCM_WR_SEND_ID 0x10000000ll
@ -429,6 +423,8 @@ static int udcm_component_query(mca_btl_openib_module_t *btl,
break;
}
signal (SIGSEGV, SIG_DFL);
rc = udcm_module_init (m, btl);
if (OMPI_SUCCESS != rc) {
break;
@ -493,10 +489,6 @@ static int udcm_endpoint_finalize(struct mca_btl_base_endpoint_t *lcl_ep)
ibv_destroy_ah(udep->ah);
}
if (udep->sync_qp) {
ibv_destroy_qp(udep->sync_qp);
}
OBJ_DESTRUCT(&udep->udep_lock);
free(lcl_ep->endpoint_local_cpc_data);
@ -513,18 +505,6 @@ static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl)
BTL_VERBOSE(("created cpc module %p for btl %p",
(void*)m, (void*)btl));
m->cm_exiting = false;
m->cm_timeout_thread_started = false;
OBJ_CONSTRUCT(&m->cm_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->cm_send_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->cm_recv_msg_queue, opal_list_t);
OBJ_CONSTRUCT(&m->cm_recv_msg_queue_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->flying_messages, opal_list_t);
pthread_mutex_init (&m->cm_timeout_lock, NULL);
pthread_cond_init (&m->cm_timeout_cond, NULL);
m->btl = btl;
/* Create completion channel */
@ -551,14 +531,6 @@ static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl)
return OMPI_ERR_NOT_SUPPORTED;
}
m->cm_sync_cq = create_cq_compat (btl->device->ib_dev_context,
UDCM_SYNC_CQ_SIZE, NULL,
m->cm_channel, 0);
if (NULL == m->cm_sync_cq) {
BTL_VERBOSE(("error creating ud sync completion queue"));
return OMPI_ERR_NOT_SUPPORTED;
}
if (0 != (rc = udcm_module_allocate_buffers (m))) {
BTL_VERBOSE(("error allocating cm buffers"));
return rc;
@ -599,11 +571,23 @@ static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl)
m->cpc.cbm_uses_cts = false;
m->cm_exiting = false;
/* Monitor the fd associated with the completion channel */
ompi_btl_openib_fd_monitor(m->cm_channel->fd, OPAL_EV_READ,
udcm_cq_event_dispatch, m);
OBJ_CONSTRUCT(&m->cm_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->cm_send_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->cm_recv_msg_queue, opal_list_t);
OBJ_CONSTRUCT(&m->cm_recv_msg_queue_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->flying_messages, opal_list_t);
pthread_mutex_init (&m->cm_timeout_lock, NULL);
/* start timeout thread */
pthread_cond_init (&m->cm_timeout_cond, NULL);
if (0 != (rc = udcm_module_start_timeout_thread (m))) {
return rc;
}
@ -620,10 +604,6 @@ static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl)
return OMPI_ERROR;
}
if (0 != ibv_req_notify_cq (m->cm_sync_cq, 0)) {
BTL_VERBOSE(("error requesting sync completions"));
return OMPI_ERROR;
}
/* Ready to use */
return OMPI_SUCCESS;
@ -663,8 +643,6 @@ udcm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
break;
}
udep->lcl_init = true;
rc = udcm_send_request (lcl_ep, NULL);
} while (0);
@ -683,41 +661,45 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl,
return OMPI_SUCCESS;
}
opal_mutex_lock (&m->cm_lock);
m->cm_exiting = true;
/* stop timeout thread */
if (m->cm_timeout_thread_started) {
pthread_mutex_lock (&m->cm_timeout_lock);
pthread_cond_signal (&m->cm_timeout_cond);
pthread_mutex_unlock (&m->cm_timeout_lock);
opal_mutex_lock (&m->cm_lock);
pthread_join (m->cm_timeout_thread, NULL);
m->cm_timeout_thread_started = false;
opal_mutex_lock (&m->cm_recv_msg_queue_lock);
/* clear message queue */
while ((item = opal_list_remove_first(&m->cm_recv_msg_queue))) {
OBJ_RELEASE(item);
}
pthread_mutex_destroy (&m->cm_timeout_lock);
pthread_cond_destroy (&m->cm_timeout_cond);
opal_mutex_unlock (&m->cm_recv_msg_queue_lock);
/* stop monitoring the channel's fd */
if (NULL != m->cm_channel) {
ompi_btl_openib_fd_unmonitor(m->cm_channel->fd, NULL, NULL);
OBJ_DESTRUCT(&m->cm_recv_msg_queue);
pthread_mutex_lock (&m->cm_timeout_lock);
while ((item = opal_list_remove_first(&m->flying_messages))) {
OBJ_RELEASE(item);
}
/* wake up timeout thread */
pthread_cond_signal (&m->cm_timeout_cond);
pthread_mutex_unlock (&m->cm_timeout_lock);
pthread_join (m->cm_timeout_thread, NULL);
OBJ_DESTRUCT(&m->flying_messages);
BTL_VERBOSE(("destroying listing thread"));
/* destroy the listen queue pair. this will cause ibv_get_cq_event to
return. */
udcm_module_destroy_listen_qp (m);
udcm_module_destroy_buffers (m);
/* stop monitoring the channel's fd */
ompi_btl_openib_fd_unmonitor(m->cm_channel->fd, NULL, NULL);
/* destroy completion queues */
if (m->cm_sync_cq) {
if (0 != ibv_destroy_cq (m->cm_sync_cq)) {
BTL_VERBOSE(("failed to destroy sync CQ. errno = %d",
errno));
}
}
udcm_module_destroy_buffers (m);
if (m->cm_send_cq) {
if (0 != ibv_destroy_cq (m->cm_send_cq)) {
@ -733,7 +715,6 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl,
}
}
/* destroy completion channel */
if (m->cm_channel) {
if (0 != ibv_destroy_comp_channel (m->cm_channel)) {
BTL_VERBOSE(("failed to completion channel. errno = %d",
@ -743,25 +724,14 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl,
m->cm_channel = NULL;
}
/* clear message queue */
opal_mutex_lock (&m->cm_recv_msg_queue_lock);
while ((item = opal_list_remove_first(&m->cm_recv_msg_queue))) {
OBJ_RELEASE(item);
}
opal_mutex_unlock (&m->cm_recv_msg_queue_lock);
OBJ_DESTRUCT(&m->cm_recv_msg_queue);
while ((item = opal_list_remove_first(&m->flying_messages))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&m->flying_messages);
opal_mutex_unlock (&m->cm_lock);
OBJ_DESTRUCT(&m->cm_send_lock);
OBJ_DESTRUCT(&m->cm_lock);
OBJ_DESTRUCT(&m->cm_recv_msg_queue_lock);
pthread_mutex_destroy (&m->cm_timeout_lock);
pthread_cond_destroy (&m->cm_timeout_cond);
return OMPI_SUCCESS;
}
@ -853,12 +823,10 @@ static void udcm_module_destroy_listen_qp (udcm_module_t *m)
/* Move listen QP into the ERR state to cancel all outstanding
work requests */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.qp_state = IBV_QPS_ERR;
attr.sq_psn = 0;
attr_mask = IBV_QP_STATE | IBV_QP_SQ_PSN;
attr.qp_state = IBV_QPS_ERR;
if (0 != ibv_modify_qp(m->listen_qp, &attr, IBV_QP_STATE)) {
BTL_VERBOSE(("error modifying qp to ERR. errno = %d",
errno));
@ -890,9 +858,8 @@ static int udcm_module_allocate_buffers (udcm_module_t *m)
{
size_t total_size;
/* Send one extra qp for the sync qp */
m->msg_length = sizeof (udcm_msg_hdr_t) +
(mca_btl_openib_component.num_qps + 1) * sizeof (udcm_qp_t);
mca_btl_openib_component.num_qps * sizeof (udcm_qp_t);
total_size = (udcm_recv_count + 1) * (m->msg_length +
UDCM_GRH_SIZE);
@ -1033,50 +1000,11 @@ static inline int udcm_rc_qp_to_init (struct ibv_qp *qp,
return OMPI_SUCCESS;
}
/* Create the per-endpoint RC "sync" queue pair used to confirm that both
 * peers have reached RTS before the connection is declared complete.
 *
 * @param lcl_ep  local endpoint the sync QP is created for
 * @return OMPI_SUCCESS on success, OMPI_ERROR/rc on failure
 *
 * On success udep->sync_qp is a QP in the INIT state and udep->sync_psn
 * holds a freshly generated starting PSN. On failure the QP (if created)
 * is destroyed and udep->sync_qp is reset to NULL. */
static int udcm_create_sync_qp (mca_btl_base_endpoint_t *lcl_ep)
{
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
struct ibv_qp_init_attr init_attr;
int rc;
/* Minimal RC QP: one send and one receive work request, one SGE each.
 * Both CQs point at the module's dedicated sync CQ. */
memset (&init_attr, 0, sizeof (init_attr));
init_attr.qp_type = IBV_QPT_RC;
init_attr.send_cq = m->cm_sync_cq;
init_attr.recv_cq = m->cm_sync_cq;
init_attr.cap.max_send_sge = 1;
init_attr.cap.max_recv_sge = 1;
init_attr.cap.max_recv_wr = 1;
init_attr.cap.max_send_wr = 1;
udep->sync_qp = ibv_create_qp(m->btl->device->ib_pd, &init_attr);
if (NULL == udep->sync_qp) {
/* surface a user-visible help message, not just a verbose log */
orte_show_help("help-mpi-btl-openib-cpc-base.txt",
"ibv_create_qp failed", true, orte_process_info.nodename,
ibv_get_device_name(m->btl->device->ib_dev),
"Reliable connected (RC)");
return OMPI_ERROR;
}
/* Move the QP into the INIT state */
rc = udcm_rc_qp_to_init (udep->sync_qp, m->btl);
if (OMPI_SUCCESS != rc) {
/* clean up so finalize does not try to destroy it again */
ibv_destroy_qp (udep->sync_qp);
udep->sync_qp = NULL;
return rc;
}
/* random 24-bit starting packet sequence number, as verbs requires */
udep->sync_psn = lrand48() & 0xffffff;
return OMPI_SUCCESS;
}
static int udcm_sync_qp_to_rts (mca_btl_base_endpoint_t *lcl_ep)
static inline int udcm_rc_qp_to_rtr (mca_btl_base_endpoint_t *lcl_ep,
int qp_index)
{
struct ibv_qp *qp = lcl_ep->qps[qp_index].qp->lcl_qp;
mca_btl_openib_module_t *btl = lcl_ep->endpoint_btl;
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
struct ibv_qp_attr attr;
enum ibv_mtu mtu;
int rc;
@ -1084,54 +1012,80 @@ static int udcm_sync_qp_to_rts (mca_btl_base_endpoint_t *lcl_ep)
mtu = (btl->device->mtu < lcl_ep->rem_info.rem_mtu) ?
btl->device->mtu : lcl_ep->rem_info.rem_mtu;
BTL_VERBOSE(("transitioning sync QP %p to RTS", (void *) udep->sync_qp));
/* Move the QP into the RTR state */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
/* Setup attributes */
attr.path_mtu = mtu;
attr.dest_qp_num = udep->rem_sync_qpn;
attr.rq_psn = udep->rem_sync_psn;
attr.max_dest_rd_atomic = 0;
attr.min_rnr_timer = 14;
attr.max_dest_rd_atomic = mca_btl_openib_component.ib_max_rdma_dst_ops;
attr.min_rnr_timer = mca_btl_openib_component.ib_min_rnr_timer;
attr.dest_qp_num = lcl_ep->rem_info.rem_qps[qp_index].rem_qp_num;
attr.rq_psn = lcl_ep->rem_info.rem_qps[qp_index].rem_psn;
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = lcl_ep->rem_info.rem_lid;
attr.ah_attr.src_path_bits = btl->src_path_bits;
attr.ah_attr.port_num = btl->port_num;
attr.ah_attr.sl = mca_btl_openib_component.ib_service_level;
attr.ah_attr.static_rate = 0;
rc = ibv_modify_qp(udep->sync_qp, &attr,IBV_QP_STATE | IBV_QP_PATH_MTU |
IBV_QP_RQ_PSN | IBV_QP_AV | IBV_QP_DEST_QPN |
IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER);
#if (ENABLE_DYNAMIC_SL)
/* if user enabled dynamic SL, get it from PathRecord */
if (0 != mca_btl_openib_component.ib_path_record_service_level) {
int rc = btl_openib_connect_get_pathrecord_sl(qp->context,
attr.ah_attr.port_num,
btl->lid,
attr.ah_attr.dlid);
if (OMPI_ERROR == rc) {
return OMPI_ERROR;
}
attr.ah_attr.sl = rc;
}
#endif
rc = ibv_modify_qp(qp, &attr, IBV_QP_STATE | IBV_QP_PATH_MTU |
IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER |
IBV_QP_RQ_PSN | IBV_QP_AV | IBV_QP_DEST_QPN);
if (OPAL_UNLIKELY(0 != rc)) {
BTL_ERROR(("error modifing QP to RTR errno says %s",
strerror(errno)));
BTL_ERROR(("error modifing QP to RTR errno says %s", strerror(errno)));
return OMPI_ERROR;
}
BTL_VERBOSE(("transitioning sync QP %p to RTS", (void *)udep->sync_qp));
return OMPI_SUCCESS;
}
static inline int udcm_rc_qp_to_rts (mca_btl_base_endpoint_t *lcl_ep,
int qp_index)
{
struct ibv_qp *qp = lcl_ep->qps[qp_index].qp->lcl_qp;
struct ibv_qp_attr attr;
int rc;
BTL_VERBOSE(("transitioning QP %p to RTS", (void *)qp));
/* Move the QP into the RTS state */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.sq_psn = udep->sync_psn;
attr.timeout = 14;
attr.retry_cnt = 7;
attr.rnr_retry = 7;
attr.max_rd_atomic = 0;
attr.timeout = mca_btl_openib_component.ib_timeout;
attr.retry_cnt = mca_btl_openib_component.ib_retry_count;
/* On PP QPs we have SW flow control, no need for rnr retries. Setting
* it to zero helps to catch bugs */
attr.rnr_retry = BTL_OPENIB_QP_TYPE_PP(qp_index) ? 0 :
mca_btl_openib_component.ib_rnr_retry;
attr.sq_psn = lcl_ep->qps[qp_index].qp->lcl_psn;
attr.max_rd_atomic = mca_btl_openib_component.ib_max_rdma_dst_ops;
rc = ibv_modify_qp(udep->sync_qp, &attr, IBV_QP_STATE | IBV_QP_SQ_PSN |
IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
IBV_QP_MAX_QP_RD_ATOMIC | IBV_QP_RNR_RETRY);
rc = ibv_modify_qp(qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC);
if (OPAL_UNLIKELY(0 != rc)) {
BTL_ERROR(("error modifing QP %p to RTS errno says %s",
(void *) udep->sync_qp, strerror(errno)));
(void *) qp, strerror(errno)));
return OMPI_ERROR;
}
BTL_VERBOSE(("successfully set RTS"));
return OMPI_SUCCESS;
}
@ -1143,8 +1097,10 @@ static int udcm_qp_create_one(udcm_module_t *m, mca_btl_base_endpoint_t* lcl_ep,
int qp, struct ibv_srq *srq, uint32_t max_recv_wr,
uint32_t max_send_wr)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
struct ibv_qp_init_attr init_attr;
size_t req_inline;
int rc;
memset(&init_attr, 0, sizeof(init_attr));
@ -1189,6 +1145,22 @@ static int udcm_qp_create_one(udcm_module_t *m, mca_btl_base_endpoint_t* lcl_ep,
lcl_ep->qps[qp].qp->lcl_psn = lrand48() & 0xffffff;
lcl_ep->qps[qp].credit_frag = NULL;
rc = udcm_rc_qp_to_init (lcl_ep->qps[qp].qp->lcl_qp, m->btl);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
/* If we have already received a request go ahead and move to
RTS. */
if (udep->recv_req) {
rc = udcm_rc_qp_to_rtr (lcl_ep, qp);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
return udcm_rc_qp_to_rts (lcl_ep, qp);
}
return OMPI_SUCCESS;
}
@ -1399,9 +1371,6 @@ static int udcm_send_request (mca_btl_base_endpoint_t *lcl_ep,
/* NTH: TODO -- add XRC support */
}
qps[mca_btl_openib_component.num_qps].psn = udep->sync_psn;
qps[mca_btl_openib_component.num_qps].qp_num = udep->sync_qp->qp_num;
if (0 != (rc = udcm_post_send (lcl_ep, msg->data, m->msg_length, 0))) {
BTL_VERBOSE(("error posting REQ"));
return rc;
@ -1412,6 +1381,29 @@ static int udcm_send_request (mca_btl_base_endpoint_t *lcl_ep,
return 0;
}
/* Send a COMPLETE control message to the remote peer indicating that this
 * side has finished bringing its QPs up (replaces the old RC sync step).
 *
 * @param lcl_ep  local endpoint the message is sent from
 * @param rem_ep  remote endpoint recorded in the message header
 * @return 0 on success, non-zero error code from message allocation/post
 *
 * The message body is just the header (no QP payload). On a successful
 * post the message is placed on the timeout/retransmit list. */
static int udcm_send_complete (mca_btl_base_endpoint_t *lcl_ep,
mca_btl_base_endpoint_t *rem_ep)
{
udcm_message_sent_t *msg;
int rc;
if (0 != (rc = udcm_new_message (lcl_ep, rem_ep, UDCM_MESSAGE_COMPLETE,
sizeof (udcm_msg_hdr_t), &msg))) {
return rc;
}
rc = udcm_post_send (lcl_ep, msg->data, sizeof (udcm_msg_hdr_t), 0);
if (0 != rc) {
BTL_VERBOSE(("error posting complete"));
return rc;
}
/* arm retransmission in case the UD datagram is dropped */
udcm_set_message_timeout (msg);
return 0;
}
static int udcm_send_reject (mca_btl_base_endpoint_t *lcl_ep,
mca_btl_base_endpoint_t *rem_ep,
int rej_reason)
@ -1548,12 +1540,6 @@ static int udcm_endpoint_init_qps (mca_btl_base_endpoint_t *lcl_ep)
(void *) lcl_ep));
break;
}
if (OMPI_SUCCESS != (rc = udcm_create_sync_qp (lcl_ep))) {
BTL_VERBOSE(("could not create sync qp for endpoint %p",
(void *) lcl_ep));
break;
}
} while (0);
if (OMPI_SUCCESS == rc) {
@ -1570,94 +1556,23 @@ static int udcm_endpoint_init_qps (mca_btl_base_endpoint_t *lcl_ep)
*/
static int udcm_qps_to_rts(mca_btl_openib_endpoint_t *lcl_ep)
{
mca_btl_openib_module_t *btl = lcl_ep->endpoint_btl;
struct ibv_qp_attr attr;
enum ibv_mtu mtu;
int qp_index, rc;
mtu = (btl->device->mtu < lcl_ep->rem_info.rem_mtu) ?
btl->device->mtu : lcl_ep->rem_info.rem_mtu;
BTL_VERBOSE(("Set MTU to IBV value %d (%s bytes)", mtu,
(mtu == IBV_MTU_256) ? "256" :
(mtu == IBV_MTU_512) ? "512" :
(mtu == IBV_MTU_1024) ? "1024" :
(mtu == IBV_MTU_2048) ? "2048" :
(mtu == IBV_MTU_4096) ? "4096" :
"unknown (!)"));
for (qp_index = 0 ; qp_index < mca_btl_openib_component.num_qps ;
++qp_index) {
struct ibv_qp *qp = lcl_ep->qps[qp_index].qp->lcl_qp;
if (lcl_ep->qps[qp_index].qp->lcl_qp->state == IBV_QPS_RTS) {
continue;
}
rc = udcm_rc_qp_to_init (qp, lcl_ep->endpoint_btl);
rc = udcm_rc_qp_to_rtr (lcl_ep, qp_index);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
break;
return rc;
}
/* Move the QP into the RTR state */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
/* Setup attributes */
attr.path_mtu = mtu;
attr.max_dest_rd_atomic = mca_btl_openib_component.ib_max_rdma_dst_ops;
attr.min_rnr_timer = mca_btl_openib_component.ib_min_rnr_timer;
attr.dest_qp_num = lcl_ep->rem_info.rem_qps[qp_index].rem_qp_num;
attr.rq_psn = lcl_ep->rem_info.rem_qps[qp_index].rem_psn;
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = lcl_ep->rem_info.rem_lid;
attr.ah_attr.src_path_bits = btl->src_path_bits;
attr.ah_attr.port_num = btl->port_num;
attr.ah_attr.sl = mca_btl_openib_component.ib_service_level;
attr.ah_attr.static_rate = 0;
#if (ENABLE_DYNAMIC_SL)
/* if user enabled dynamic SL, get it from PathRecord */
if (0 != mca_btl_openib_component.ib_path_record_service_level) {
int rc = btl_openib_connect_get_pathrecord_sl(qp->context,
attr.ah_attr.port_num,
btl->lid,
attr.ah_attr.dlid);
if (OMPI_ERROR == rc) {
return OMPI_ERROR;
}
attr.ah_attr.sl = rc;
}
#endif
rc = ibv_modify_qp(qp, &attr, IBV_QP_STATE | IBV_QP_PATH_MTU |
IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER |
IBV_QP_RQ_PSN | IBV_QP_AV | IBV_QP_DEST_QPN);
if (OPAL_UNLIKELY(0 != rc)) {
BTL_ERROR(("error modifing QP to RTR errno says %s", strerror(errno)));
return OMPI_ERROR;
rc = udcm_rc_qp_to_rts (lcl_ep, qp_index);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
BTL_VERBOSE(("transitioning QP %p to RTS", (void *)qp));
/* Move the QP into the RTS state */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.timeout = mca_btl_openib_component.ib_timeout;
attr.retry_cnt = mca_btl_openib_component.ib_retry_count;
/* On PP QPs we have SW flow control, no need for rnr retries. Setting
* it to zero helps to catch bugs */
attr.rnr_retry = BTL_OPENIB_QP_TYPE_PP(qp_index) ? 0 :
mca_btl_openib_component.ib_rnr_retry;
attr.sq_psn = lcl_ep->qps[qp_index].qp->lcl_psn;
attr.max_rd_atomic = mca_btl_openib_component.ib_max_rdma_dst_ops;
rc = ibv_modify_qp(qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC);
if (OPAL_UNLIKELY(0 != rc)) {
BTL_ERROR(("error modifing QP %p to RTS errno says %s",
(void *) qp, strerror(errno)));
return OMPI_ERROR;
}
BTL_VERBOSE(("successfully set RTS"));
}
/* All done */
@ -1702,12 +1617,6 @@ static int udcm_setup_qps (mca_btl_base_endpoint_t *lcl_ep)
return rc;
}
if (OMPI_SUCCESS != (rc = udcm_sync_qp_to_rts (lcl_ep))) {
BTL_VERBOSE(("failed to move sync QP to RTS"));
return rc;
}
/* Ensure that all the writes back to the endpoint and associated
data structures have completed */
opal_atomic_wmb();
@ -1716,62 +1625,12 @@ static int udcm_setup_qps (mca_btl_base_endpoint_t *lcl_ep)
return OMPI_SUCCESS;
}
/* Post the one-sided sync work request for a freshly wired-up connection.
 *
 * Exactly one peer posts a zero-byte RC send (i_send == true) and the
 * other posts the matching zero-byte receive; completion of either on the
 * sync CQ tells that peer the remote QPs are in RTS.
 *
 * @param lcl_ep  local endpoint whose sync QP is used
 * @param i_send  true when this side is the designated sender
 * @return OMPI_SUCCESS on success (or when already synced), OMPI_ERROR on
 *         a failed ibv_post_send/ibv_post_recv
 *
 * Fix: the original returned from the error paths while still holding
 * m->cm_send_lock, leaking the mutex; the work requests were also only
 * partially initialized. Both are corrected below. */
static int udcm_sync_connection (mca_btl_openib_endpoint_t *lcl_ep, bool i_send)
{
    udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
    udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
    struct ibv_send_wr send_wr, *bad_send_wr;
    struct ibv_recv_wr recv_wr, *bad_recv_wr;
    int rc, ret = OMPI_SUCCESS;

    if (NULL == udep->sync_qp) {
        /* already synced? */
        return OMPI_SUCCESS;
    }

    BTL_VERBOSE(("syncing connection for local endpoint %p", (void *) lcl_ep));

    opal_mutex_lock(&m->cm_send_lock);

    if (i_send) {
        /* zero-byte signaled send; the endpoint pointer rides in wr_id so
           the sync CQ handler can find the connection */
        memset (&send_wr, 0, sizeof (send_wr));
        send_wr.wr_id = (uint64_t)lcl_ep;
        send_wr.next = NULL;
        send_wr.sg_list = NULL;
        send_wr.num_sge = 0;
        send_wr.opcode = IBV_WR_SEND;
        send_wr.send_flags = IBV_SEND_SIGNALED | IBV_SEND_SOLICITED;

        rc = ibv_post_send (udep->sync_qp, &send_wr, &bad_send_wr);
        if (0 != rc) {
            opal_output (0, "error posting sync send wr. errno = %d", errno);
            ret = OMPI_ERROR;   /* fall through so the lock is released */
        }
    } else {
        /* matching zero-byte receive on the passive side */
        memset (&recv_wr, 0, sizeof (recv_wr));
        recv_wr.wr_id = (uint64_t)lcl_ep;
        recv_wr.next = NULL;
        recv_wr.sg_list = NULL;
        recv_wr.num_sge = 0;

        rc = ibv_post_recv (udep->sync_qp, &recv_wr, &bad_recv_wr);
        if (0 != rc) {
            opal_output (0, "error posting sync recv wr. errno = %d", errno);
            ret = OMPI_ERROR;   /* fall through so the lock is released */
        }
    }

    opal_mutex_unlock(&m->cm_send_lock);

    return ret;
}
static int udcm_handle_connect(mca_btl_openib_endpoint_t *lcl_ep,
mca_btl_openib_endpoint_t *rem_ep)
{
udcm_reject_reason_t rej_reason = UDCM_REJ_REMOTE_ERROR;
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
int rc = OMPI_ERROR;
bool i_send;
do {
if (NULL == udep) {
@ -1805,23 +1664,6 @@ static int udcm_handle_connect(mca_btl_openib_endpoint_t *lcl_ep,
break;
}
if (false == udep->lcl_init) {
/* Remote initiated. local should receive */
i_send = false;
} else if (false == udep->rem_init) {
/* Local initiated. remote should receive */
i_send = true;
} else {
/* Simultaneous connection attempt. Break tie. */
i_send = i_initiate (m, lcl_ep);
}
/* post receive (passive) or send (active) */
rc = udcm_sync_connection (lcl_ep, i_send);
if (OMPI_SUCCESS != rc) {
break;
}
if (false == udep->sent_req) {
rc = udcm_send_request (lcl_ep, rem_ep);
@ -1830,6 +1672,11 @@ static int udcm_handle_connect(mca_btl_openib_endpoint_t *lcl_ep,
}
}
rc = udcm_send_complete (lcl_ep, rem_ep);
if (OMPI_SUCCESS != rc) {
break;
}
opal_mutex_unlock (&udep->udep_lock);
return OMPI_SUCCESS;
@ -1875,8 +1722,6 @@ static int udcm_finish_connection (mca_btl_openib_endpoint_t *lcl_ep)
data structures have completed */
opal_atomic_wmb();
/* Need to hold the endpoint lock before calling cpc_complete */
OPAL_THREAD_LOCK(&lcl_ep->endpoint_lock);
mca_btl_openib_endpoint_cpc_complete(lcl_ep);
return OMPI_SUCCESS;
@ -1891,7 +1736,6 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
struct ibv_wc wc[20];
udcm_endpoint_t *udep;
uint64_t dir;
bool rem_init;
count = ibv_poll_cq (event_cq, 20, wc);
if (count < 0)
@ -1931,9 +1775,6 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
wc[i].src_qp,
msg_hdr->data.req.rem_port_num,
wc[i].slid);
rem_init = true;
} else {
rem_init = false;
}
if (NULL == lcl_ep ) {
@ -1972,7 +1813,6 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
/* Save remote queue pair information */
udcm_qp_t *rem_qps = (udcm_qp_t *)(msg_hdr + 1);
udep->rem_init = rem_init;
lcl_ep->rem_info.rem_index = msg_hdr->data.req.rem_ep_index;
for (qp_index = 0 ; qp_index < mca_btl_openib_component.num_qps ;
@ -1983,9 +1823,6 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
lcl_ep->rem_info.rem_qps[qp_index].rem_qp_num =
rem_qps[qp_index].qp_num;
}
udep->rem_sync_qpn = rem_qps[qp_index].qp_num;
udep->rem_sync_psn = rem_qps[qp_index].psn;
}
opal_mutex_unlock (&udep->udep_lock);
@ -2018,42 +1855,6 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
return count;
}
/* Drain completions from the module's sync CQ and finish the associated
 * connections.
 *
 * @param m  udcm module whose cm_sync_cq is polled
 * @return negative on ibv_poll_cq failure, 0 otherwise
 *
 * Each successful completion means the peer has reached RTS, so the
 * connection is finalized and the now-unneeded sync QP is destroyed.
 * Failed completions are logged and skipped (their sync QP is kept). */
static int udcm_process_sync (udcm_module_t *m)
{
struct ibv_cq *event_cq = m->cm_sync_cq;
mca_btl_openib_endpoint_t *lcl_ep;
struct ibv_wc wc[20];
udcm_endpoint_t *udep;
int rc, i, count;
count = ibv_poll_cq (event_cq, 20, wc);
if (count < 0)
return count;
/* stop early if the module is shutting down */
for (i = 0 ; i < count && !m->cm_exiting ; i++) {
/* wr_id carries the endpoint pointer set when the wr was posted */
lcl_ep = (mca_btl_openib_endpoint_t *) wc[i].wr_id;
udep = UDCM_ENDPOINT_DATA(lcl_ep);
BTL_VERBOSE(("sync complete for endpoint %p", (void *) lcl_ep));
if (IBV_WC_SUCCESS != wc[i].status) {
opal_output (0, "sync work completion failed with status %d",
wc[i].status);
continue;
}
udcm_finish_connection (lcl_ep);
/* the sync QP has served its purpose; release it now */
rc = ibv_destroy_qp (udep->sync_qp);
if (0 != rc) {
opal_output (0, "error. could not destroy sync qp");
}
udep->sync_qp = NULL;
}
return 0;
}
static void *udcm_cq_event_dispatch(int fd, int flags, void *context)
{
udcm_module_t *m = (udcm_module_t *) context;
@ -2072,18 +1873,10 @@ static void *udcm_cq_event_dispatch(int fd, int flags, void *context)
/* acknowledge the event */
ibv_ack_cq_events (event_cq, 1);
if (event_cq == m->cm_recv_cq) {
rc = udcm_process_messages (event_cq, m);
if (rc < 0) {
BTL_VERBOSE(("error processing incomming messages"));
return NULL;
}
} else {
rc = udcm_process_sync (m);
if (rc < 0) {
BTL_VERBOSE(("error processing incomming messages"));
return NULL;
}
rc = udcm_process_messages (event_cq, m);
if (rc < 0) {
BTL_VERBOSE(("error processing incomming messages"));
return NULL;
}
if (false == m->cm_exiting) {
@ -2113,16 +1906,19 @@ static void *udcm_message_callback (void *context)
switch (item->msg_hdr.type) {
case UDCM_MESSAGE_CONNECT:
udcm_handle_connect (lcl_ep, item->msg_hdr.rem_ep);
OPAL_THREAD_UNLOCK(&lcl_ep->endpoint_lock);
break;
case UDCM_MESSAGE_REJECT:
udcm_handle_reject (lcl_ep, &item->msg_hdr);
OPAL_THREAD_UNLOCK(&lcl_ep->endpoint_lock);
break;
case UDCM_MESSAGE_COMPLETE:
udcm_finish_connection (lcl_ep);
break;
default:
BTL_VERBOSE(("unknown message type"));
}
OPAL_THREAD_UNLOCK(&lcl_ep->endpoint_lock);
OBJ_RELEASE (item);
opal_mutex_lock(&m->cm_recv_msg_queue_lock);
@ -2273,7 +2069,5 @@ static int udcm_module_start_timeout_thread (udcm_module_t *m)
return OMPI_ERROR;
}
m->cm_timeout_thread_started = true;
return OMPI_SUCCESS;
}