btl/openib: fix unconnected datagram connection method (udcm)
The primary issue with udcm was that the immediate data in message acks was often bogus. This caused the sender to keep retrying even though the message had been received and acked. The fix is to use the source LID and QP to determine which message is being acked. In most cases this should work well since only one message will be in flight to any peer. This commit was SVN r28444.
Этот коммит содержится в:
родитель
527ea1d090
Коммит
422331b4da
@ -17,16 +17,6 @@
|
||||
* The UD connection module creates and listens on an unconnected
|
||||
* datagram (UD) queue pair (QP) for connection requests.
|
||||
*
|
||||
* This connection method uses a two-step process:
|
||||
* Step 1 (CONNECT):
|
||||
* A connect request is sent/received using an unconnected
|
||||
* datagram queue pair.
|
||||
* Step 2 (SYNC):
|
||||
* The connection is then synced by sending a 0-byte request
|
||||
* on a per-peer rc queue pair.
|
||||
* * This step is required to avoid a race condition between
|
||||
* the last UD message and the first RC BTL message.
|
||||
*
|
||||
* There are two ways a connection can be established by UD:
|
||||
* 1. One side starts a connection and the request is received before
|
||||
* the receiving side starts a connection. (One sided)
|
||||
@ -154,9 +144,6 @@ typedef struct udcm_module {
|
||||
opal_list_t cm_recv_msg_queue;
|
||||
bool cm_message_event_active;
|
||||
|
||||
/* ID of next outgoing message */
|
||||
uint32_t next_message_id;
|
||||
|
||||
/* The associated BTL */
|
||||
struct mca_btl_openib_module_t *btl;
|
||||
|
||||
@ -197,7 +184,6 @@ typedef enum {
|
||||
|
||||
typedef struct udcm_msg_hdr {
|
||||
udcm_message_type_t type;
|
||||
uint32_t id;
|
||||
|
||||
/* endpoint local to the sender */
|
||||
mca_btl_base_endpoint_t *rem_ep;
|
||||
@ -293,12 +279,13 @@ static int udcm_send_request (mca_btl_base_endpoint_t *lcl_ep,
|
||||
/*--------------------------------------------------------------------*/
|
||||
|
||||
#define UDCM_MIN_RECV_COUNT 512
|
||||
#define UDCM_MIN_TIMEOUT 1000000
|
||||
#define UDCM_MIN_TIMEOUT 500000
|
||||
|
||||
#define UDCM_SEND_CQ_SIZE 512
|
||||
|
||||
#define UDCM_WR_RECV_ID 0x20000000ll
|
||||
#define UDCM_WR_SEND_ID 0x10000000ll
|
||||
#define UDCM_WR_ACK_ID 0x10000000ll
|
||||
#define UDCM_WR_DIR_MASK 0x30000000ll
|
||||
|
||||
/* Useless 40 bytes of data that precedes received scatter gather data.
|
||||
@ -689,6 +676,13 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl,
|
||||
|
||||
m->cm_exiting = true;
|
||||
|
||||
/* stop monitoring the channel's fd before destroying the listen qp */
|
||||
ompi_btl_openib_fd_unmonitor(m->cm_channel->fd, udcm_unmonitor, (void *)&barrier);
|
||||
|
||||
while (0 == barrier) {
|
||||
sched_yield();
|
||||
}
|
||||
|
||||
opal_mutex_lock (&m->cm_lock);
|
||||
|
||||
opal_mutex_lock (&m->cm_recv_msg_queue_lock);
|
||||
@ -718,13 +712,6 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl,
|
||||
|
||||
BTL_VERBOSE(("destroying listing thread"));
|
||||
|
||||
/* stop monitoring the channel's fd before destroying the listen qp */
|
||||
ompi_btl_openib_fd_unmonitor(m->cm_channel->fd, udcm_unmonitor, (void *)&barrier);
|
||||
|
||||
while (0 == barrier) {
|
||||
sched_yield();
|
||||
}
|
||||
|
||||
/* destroy the listen queue pair. this will cause ibv_get_cq_event to
|
||||
return. */
|
||||
udcm_module_destroy_listen_qp (m);
|
||||
@ -1368,13 +1355,10 @@ static int udcm_new_message (mca_btl_base_endpoint_t *lcl_ep,
|
||||
message->endpoint = lcl_ep;
|
||||
|
||||
opal_atomic_wmb ();
|
||||
opal_mutex_lock(&m->cm_lock);
|
||||
message->data->id = m->next_message_id++;
|
||||
opal_mutex_unlock(&m->cm_lock);
|
||||
|
||||
*msgp = message;
|
||||
|
||||
BTL_VERBOSE(("created message %d", message->data->id));
|
||||
BTL_VERBOSE(("created message with type %d", type));
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -1471,7 +1455,7 @@ static int udcm_send_reject (mca_btl_base_endpoint_t *lcl_ep,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int udcm_send_ack (mca_btl_base_endpoint_t *lcl_ep, uint32_t id)
|
||||
static int udcm_send_ack (mca_btl_base_endpoint_t *lcl_ep)
|
||||
{
|
||||
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
|
||||
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
|
||||
@ -1481,14 +1465,17 @@ static int udcm_send_ack (mca_btl_base_endpoint_t *lcl_ep, uint32_t id)
|
||||
/* NTH: need to lock here or we run into problems */
|
||||
opal_mutex_lock(&m->cm_send_lock);
|
||||
|
||||
wr.wr_id = UDCM_WR_SEND_ID;
|
||||
BTL_VERBOSE(("sending ack for message on ep %p", (void *) lcl_ep));
|
||||
|
||||
wr.wr_id = UDCM_WR_ACK_ID;
|
||||
wr.next = NULL;
|
||||
wr.num_sge = 0;
|
||||
/* use imm flag to signal the other side that this is an ack */
|
||||
wr.opcode = IBV_WR_SEND_WITH_IMM;
|
||||
wr.send_flags = IBV_SEND_SOLICITED | IBV_SEND_SIGNALED;
|
||||
wr.wr.ud.ah = udep->ah;
|
||||
|
||||
wr.imm_data = id;
|
||||
wr.imm_data = 0; /* dom't care */
|
||||
|
||||
wr.wr.ud.remote_qpn = UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num;
|
||||
wr.wr.ud.remote_qkey = 0;
|
||||
@ -1505,26 +1492,32 @@ static int udcm_send_ack (mca_btl_base_endpoint_t *lcl_ep, uint32_t id)
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int udcm_handle_ack (udcm_module_t *m, const uint32_t id)
|
||||
static int udcm_handle_ack (udcm_module_t *m, const uint32_t id, const uint16_t slid,
|
||||
const uint32_t rem_qp)
|
||||
{
|
||||
opal_list_item_t *item;
|
||||
udcm_message_sent_t *msg = NULL;
|
||||
udcm_message_sent_t *msg, *next;
|
||||
|
||||
pthread_mutex_lock (&m->cm_timeout_lock);
|
||||
|
||||
BTL_VERBOSE(("got ack for message %d", id));
|
||||
BTL_VERBOSE(("got ack for message 0x%08x from slid 0x%04x qp 0x%08x", id, slid,
|
||||
rem_qp));
|
||||
|
||||
for (item = opal_list_get_first (&m->flying_messages) ;
|
||||
item != opal_list_get_end (&m->flying_messages) ;
|
||||
item = opal_list_get_next (item)) {
|
||||
msg = (udcm_message_sent_t *) item;
|
||||
OPAL_LIST_FOREACH_SAFE(msg, next, &m->flying_messages, udcm_message_sent_t) {
|
||||
const struct mca_btl_base_endpoint_t *lcl_ep = msg->endpoint;
|
||||
|
||||
if (msg->data && id == msg->data->id) {
|
||||
opal_list_remove_item (&m->flying_messages, item);
|
||||
if (NULL == msg->data || NULL == msg->endpoint) {
|
||||
/* shouldn't happen */
|
||||
opal_list_remove_item(&m->flying_messages, &msg->super);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (slid == UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_lid &&
|
||||
rem_qp == UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num) {
|
||||
BTL_VERBOSE(("found matching message"));
|
||||
opal_list_remove_item (&m->flying_messages, &msg->super);
|
||||
|
||||
break;
|
||||
}
|
||||
msg = NULL;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock (&m->cm_timeout_lock);
|
||||
@ -1765,6 +1758,8 @@ static int udcm_finish_connection (mca_btl_openib_endpoint_t *lcl_ep)
|
||||
|
||||
mca_btl_openib_endpoint_cpc_complete(lcl_ep);
|
||||
|
||||
lcl_ep->endpoint_state = MCA_BTL_IB_CONNECTED;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -1778,13 +1773,20 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
|
||||
udcm_endpoint_t *udep;
|
||||
uint64_t dir;
|
||||
|
||||
memset(wc, 0, sizeof(wc));
|
||||
|
||||
count = ibv_poll_cq (event_cq, 20, wc);
|
||||
if (count < 0)
|
||||
return count;
|
||||
|
||||
for (i = 0 ; i < count && !m->cm_exiting ; i++) {
|
||||
for (i = 0 ; i < count ; i++) {
|
||||
dir = wc[i].wr_id & UDCM_WR_DIR_MASK;
|
||||
|
||||
BTL_VERBOSE(("WC: wr_id: 0x%016" PRIu64 ", status: %d, opcode: 0x%x, byte_len: %x, imm_data: 0x%08x, "
|
||||
"qp_num: 0x%08x, src_qp: 0x%08x, wc_flags: 0x%x, slid: 0x%04x\n",
|
||||
wc[i].wr_id, wc[i].status, wc[i].opcode, wc[i].byte_len,
|
||||
wc[i].imm_data, wc[i].qp_num, wc[i].src_qp, wc[i].wc_flags, wc[i].slid));
|
||||
|
||||
if (UDCM_WR_RECV_ID != dir) {
|
||||
opal_output (0, "unknown packet");
|
||||
continue;
|
||||
@ -1803,7 +1805,7 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
|
||||
|
||||
if (wc[i].wc_flags & IBV_WC_WITH_IMM) {
|
||||
/* ack! */
|
||||
udcm_handle_ack (m, wc[i].imm_data);
|
||||
udcm_handle_ack (m, wc[i].imm_data, wc[i].slid, wc[i].src_qp);
|
||||
udcm_module_post_one_recv (m, msg_num);
|
||||
|
||||
continue;
|
||||
@ -1877,21 +1879,19 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
|
||||
opal_list_append (&m->cm_recv_msg_queue, &item->super);
|
||||
opal_mutex_unlock(&m->cm_recv_msg_queue_lock);
|
||||
|
||||
udcm_send_ack (lcl_ep, msg_hdr->id);
|
||||
udcm_send_ack (lcl_ep);
|
||||
|
||||
/* Repost the receive */
|
||||
udcm_module_post_one_recv (m, msg_num);
|
||||
}
|
||||
|
||||
if (!m->cm_exiting) {
|
||||
opal_mutex_lock (&m->cm_recv_msg_queue_lock);
|
||||
if (opal_list_get_size (&m->cm_recv_msg_queue) &&
|
||||
!m->cm_message_event_active) {
|
||||
m->cm_message_event_active = true;
|
||||
ompi_btl_openib_fd_run_in_main (udcm_message_callback, (void *) m);
|
||||
}
|
||||
opal_mutex_unlock (&m->cm_recv_msg_queue_lock);
|
||||
opal_mutex_lock (&m->cm_recv_msg_queue_lock);
|
||||
if (opal_list_get_size (&m->cm_recv_msg_queue) &&
|
||||
!m->cm_message_event_active) {
|
||||
m->cm_message_event_active = true;
|
||||
ompi_btl_openib_fd_run_in_main (udcm_message_callback, (void *) m);
|
||||
}
|
||||
opal_mutex_unlock (&m->cm_recv_msg_queue_lock);
|
||||
|
||||
return count;
|
||||
}
|
||||
@ -1903,30 +1903,38 @@ static void *udcm_cq_event_dispatch(int fd, int flags, void *context)
|
||||
void *event_context;
|
||||
int rc;
|
||||
|
||||
if (OPAL_UNLIKELY(NULL == m || NULL == m->cm_channel)) {
|
||||
return NULL;
|
||||
}
|
||||
opal_mutex_lock (&m->cm_lock);
|
||||
|
||||
rc = ibv_get_cq_event (m->cm_channel, &event_cq, &event_context);
|
||||
do {
|
||||
if (OPAL_UNLIKELY(NULL == m || NULL == m->cm_channel)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (0 != rc || NULL == event_cq) {
|
||||
return NULL;
|
||||
}
|
||||
rc = ibv_get_cq_event (m->cm_channel, &event_cq, &event_context);
|
||||
|
||||
/* acknowlege the event */
|
||||
ibv_ack_cq_events (event_cq, 1);
|
||||
if (0 != rc || NULL == event_cq) {
|
||||
break;
|
||||
}
|
||||
|
||||
rc = udcm_process_messages (event_cq, m);
|
||||
if (rc < 0) {
|
||||
BTL_VERBOSE(("error processing incomming messages"));
|
||||
return NULL;
|
||||
}
|
||||
/* acknowlege the event */
|
||||
ibv_ack_cq_events (event_cq, 1);
|
||||
|
||||
if (m->cm_exiting) {
|
||||
break;
|
||||
}
|
||||
|
||||
rc = udcm_process_messages (event_cq, m);
|
||||
if (rc < 0) {
|
||||
BTL_VERBOSE(("error processing incomming messages"));
|
||||
break;
|
||||
}
|
||||
|
||||
if (false == m->cm_exiting) {
|
||||
if (ibv_req_notify_cq(event_cq, 0)) {
|
||||
BTL_VERBOSE(("error asking for cq notifications"));
|
||||
}
|
||||
}
|
||||
} while (0);
|
||||
|
||||
opal_mutex_unlock (&m->cm_lock);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
@ -1995,19 +2003,24 @@ static void udcm_send_timeout (udcm_module_t *m, udcm_message_sent_t *msg)
|
||||
mca_btl_base_endpoint_t *lcl_ep = msg->endpoint;
|
||||
|
||||
do {
|
||||
BTL_VERBOSE(("send for message %d timed out (msg = %p)", msg->data->id,
|
||||
(void *) msg));
|
||||
BTL_VERBOSE(("send for message to 0x%04x:0x%08x timed out",
|
||||
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_lid,
|
||||
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num));
|
||||
|
||||
/* This happens from time to time at the end of a run (probably die to a
|
||||
lost ack) */
|
||||
if (NULL == lcl_ep->endpoint_local_cpc_data) {
|
||||
/* This happens from time to time at the end of a run (probably due to a
|
||||
lost ack on the completion message). */
|
||||
if (NULL == lcl_ep->endpoint_local_cpc_data ||
|
||||
MCA_BTL_IB_CONNECTED == lcl_ep->endpoint_state) {
|
||||
OBJ_RELEASE (msg);
|
||||
break;
|
||||
}
|
||||
|
||||
if (msg->tries == udcm_max_retry) {
|
||||
opal_output (0, "too many retries sending message to %d, giving up",
|
||||
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num);
|
||||
opal_output (0, "too many retries sending message to 0x%04x:0x%08x, giving up"
|
||||
" exiting: %d",
|
||||
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_lid,
|
||||
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num,
|
||||
m->cm_exiting);
|
||||
|
||||
/* We are running in the timeout thread. Invoke the error in the
|
||||
main thread */
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user