/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 */

#include "oob_ud.h"

#define min(a,b) ((a) < (b) ? (a) : (b))

static int mca_oob_ud_event_send_ack (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer, mca_oob_ud_msg_hdr_t *msg);
static int mca_oob_ud_event_send_nack (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer, mca_oob_ud_msg_hdr_t *msg);

static int mca_oob_ud_event_handle_ack (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer,
                                        mca_oob_ud_msg_hdr_t *msg_hdr);
static int mca_oob_ud_event_handle_nack (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer,
                                         mca_oob_ud_msg_hdr_t *msg_hdr);
static int mca_oob_ud_event_handle_completion (mca_oob_ud_port_t *port, mca_oob_ud_msg_hdr_t *msg);
static int mca_oob_ud_event_handle_data_ok (mca_oob_ud_port_t *port, mca_oob_ud_msg_hdr_t *msg);
static int mca_oob_ud_event_handle_req (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer,
                                        mca_oob_ud_msg_hdr_t *msg_hdr);
static int mca_oob_ud_event_handle_rep (mca_oob_ud_port_t *port, mca_oob_ud_msg_hdr_t *msg);
static int mca_oob_ud_event_handle_end (mca_oob_ud_peer_t *peer, mca_oob_ud_msg_hdr_t *msg_hdr);

static void *mca_oob_ud_event_dispatch(int fd, int flags, void *context);
static void *mca_oob_ud_complete_dispatch(int fd, int flags, void *context);
static void mca_oob_ud_stop_events(mca_oob_ud_device_t *device);

/**
 * Return the first item of a list, or NULL when the list is empty.
 */
static inline opal_list_item_t *mca_oob_ud_list_get_first (opal_list_t *list)
{
    return (opal_list_get_size (list) == 0) ? NULL : opal_list_get_first (list);
}

/**
 * Return the item following @item in @list, or NULL when @item is the
 * last one (i.e. its successor is the list's end marker).
 */
static inline opal_list_item_t *mca_oob_ud_list_get_next (opal_list_t *list, opal_list_item_t *item)
{
    opal_list_item_t *next = opal_list_get_next (item);
    return (opal_list_get_end(list) == next) ? NULL : next;
}

/* Set once the completion-channel event has been registered.  The flag is
 * file-wide, not per device. */
static bool event_started = false;

/**
 * Start watching the device's completion channel for read events.
 *
 * Registers device->ib_channel->fd with orte_event_base so that
 * mca_oob_ud_event_dispatch is called with @device as its context.  Does
 * nothing if monitoring has already been started.
 *
 * NOTE(review): because event_started is a single file-level flag, a call
 * for a second device is also a no-op -- confirm only one device is used.
 */
void mca_oob_ud_event_start_monitor (mca_oob_ud_device_t *device)
{
    if (!event_started) {
#if !ORTE_ENABLE_PROGRESS_THREADS
        opal_progress_event_users_increment ();
#endif

        opal_event_set (orte_event_base, &device->event, device->ib_channel->fd,
                        OPAL_EV_READ, mca_oob_ud_event_dispatch, (void *) device);
        opal_event_add (&device->event, NULL);
        event_started = true;
    }
}

/**
 * Stop watching the device's completion channel.
 *
 * Removes the event registered by mca_oob_ud_event_start_monitor and resets
 * the listen queue pair of every port on @device.  Does nothing if
 * monitoring was never started.
 */
void mca_oob_ud_event_stop_monitor (mca_oob_ud_device_t *device)
{
    if (event_started) {
#if !ORTE_ENABLE_PROGRESS_THREADS
        opal_progress_event_users_decrement ();
#endif

        opal_event_del (&device->event);
        mca_oob_ud_stop_events (device);
        event_started = false;
    }
}

/**
 * List element describing one received message that is waiting to be
 * ordered and processed by mca_oob_ud_process_messages.
 */
struct mca_oob_ud_msg_item_t {
    opal_list_item_t super;

    mca_oob_ud_msg_hdr_t *hdr;     /* header inside the port's message buffer */
    mca_oob_ud_port_t    *port;    /* port the message arrived on */
    mca_oob_ud_peer_t    *peer;    /* peer resolved from the return address */
    int                   msg_num; /* receive slot index within the port */
};
typedef struct mca_oob_ud_msg_item_t mca_oob_ud_msg_item_t;

OBJ_CLASS_DECLARATION(mca_oob_ud_msg_item_t);

/**
 * Constructor: zero every field that follows the list-item superclass.
 */
static void mca_oob_ud_msg_item_construct (mca_oob_ud_msg_item_t *item)
{
    memset ((char *) item + sizeof (item->super), 0, sizeof (*item) - sizeof (item->super));
}

/**
 * Destructor: if the item was bound to a message, hand its receive slot
 * back to the port.
 */
static void mca_oob_ud_msg_item_destruct (mca_oob_ud_msg_item_t *item)
{
    if (item->hdr) {
        /* repost the receive request */
        mca_oob_ud_port_post_one_recv (item->port, item->msg_num);
    }
}

OBJ_CLASS_INSTANCE(mca_oob_ud_msg_item_t, opal_list_item_t,
                   mca_oob_ud_msg_item_construct,
                   mca_oob_ud_msg_item_destruct);

/**
 * Sort comparator for message items: order by peer (pointer value), then by
 * ascending message id.
 *
 * @return negative, zero or positive as @a sorts before, equal to or after
 *         @b.  Two items from the same peer carrying the same message id
 *         compare equal so that the ordering stays consistent.
 */
static int mca_oob_ud_msg_item_cmp (opal_list_item_t **a, opal_list_item_t **b)
{
    mca_oob_ud_msg_item_t *aitem = *((mca_oob_ud_msg_item_t **) a);
    mca_oob_ud_msg_item_t *bitem = *((mca_oob_ud_msg_item_t **) b);

    if (aitem->peer != bitem->peer) {
        return (aitem->peer > bitem->peer) ? 1 : -1;
    }

    if (aitem->hdr->msg_id == bitem->hdr->msg_id) {
        return 0;
    }

    return (aitem->hdr->msg_id > bitem->hdr->msg_id) ? 1 : -1;
}

/**
 * Poll a completion queue and process the messages it reports.
 *
 * Must be called once per completion event obtained with
 * ibv_get_cq_event(); that event is acknowledged here.
 *
 * Work proceeds in three passes:
 *   1. every polled completion is examined; ACK, NACK and END messages are
 *      handled immediately, all other messages from a known peer are queued
 *      on the component's processing list;
 *   2. the queued messages are sorted by peer and id, an ACK or NACK is sent
 *      per peer, and every message whose id is not the one expected from
 *      its peer is discarded;
 *   3. the remaining messages are dispatched by message type.
 *
 * @param event_cq  completion queue that signalled the event
 * @param port      port owning the receive buffers posted on @event_cq
 *
 * @return the number of completions polled, or the negative value returned
 *         by ibv_poll_cq() on failure.
 */
static int mca_oob_ud_process_messages (struct ibv_cq *event_cq, mca_oob_ud_port_t *port)
{
    mca_oob_ud_msg_item_t *msg_item, *next_item;
    opal_list_t *processing_msgs = &mca_oob_ud_component.ud_event_processing_msgs;
    mca_oob_ud_peer_t *peer;
    mca_oob_ud_msg_hdr_t *msg_hdr;
    int msg_num, i, count;
    struct ibv_wc wc[40];
    bool peer_nacked;

    /* acknowledge the completion event.  ibv_ack_cq_events() counts CQ
     * events (one per successful ibv_get_cq_event()), not work completions,
     * and the event has to be acknowledged even when polling fails. */
    ibv_ack_cq_events (event_cq, 1);

    count = ibv_poll_cq (event_cq, (int) (sizeof (wc) / sizeof (wc[0])), wc);
    if (count < 0)
        return count;

    for (i = 0 ; i < count ; ++i) {
        /* the work request id is the receive slot index plus a flag bit
         * marking it as a receive */
        msg_num = (int)(wc[i].wr_id & (~MCA_OOB_UD_RECV_WR));
        msg_hdr = (mca_oob_ud_msg_hdr_t *) (port->msg_buf.ptr + msg_num * port->mtu);

        VALGRIND_MAKE_MEM_DEFINED(msg_hdr, wc[i].byte_len);

        if (!(wc[i].wr_id & MCA_OOB_UD_RECV_WR) || IBV_WC_SUCCESS != wc[i].status) {
            mca_oob_ud_port_post_one_recv (port, msg_num);
            continue;
        }

        peer = mca_oob_ud_get_peer (port, &msg_hdr->ra.name, wc[i].src_qp, msg_hdr->ra.qkey,
                                    wc[i].slid, msg_hdr->ra.port_num);

        if (peer) {
            if (MCA_OOB_UD_MSG_ACK != msg_hdr->msg_type && MCA_OOB_UD_MSG_NACK != msg_hdr->msg_type &&
                MCA_OOB_UD_MSG_END != msg_hdr->msg_type) {
                mca_oob_ud_msg_item_t *new_item = OBJ_NEW(mca_oob_ud_msg_item_t);

                if (NULL == new_item) {
                    /* no memory to track this message: drop it and hand the
                     * buffer back to the receive queue */
                    mca_oob_ud_port_post_one_recv (port, msg_num);
                    continue;
                }

                new_item->msg_num = msg_num;
                new_item->hdr     = msg_hdr;
                new_item->port    = port;
                new_item->peer    = peer;

                opal_list_append (processing_msgs, (opal_list_item_t *) new_item);
            } else {
                if (MCA_OOB_UD_MSG_ACK == msg_hdr->msg_type) {
                    (void) mca_oob_ud_event_handle_ack (port, peer, msg_hdr);
                } else if (MCA_OOB_UD_MSG_NACK == msg_hdr->msg_type) {
                    (void) mca_oob_ud_event_handle_nack (port, peer, msg_hdr);
                } else {
                    mca_oob_ud_event_handle_end (peer, msg_hdr);
                }

                mca_oob_ud_port_post_one_recv (port, msg_num);
            }
        } else {
            OPAL_OUTPUT_VERBOSE((10, mca_oob_base_output, "%s oob:ud:process_message got a null peer for message id %"
                                 PRIu64, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr->msg_id));
            mca_oob_ud_port_post_one_recv (port, msg_num);
        }
    }

    /* Sort messages by peer then id */
    opal_list_sort (processing_msgs, mca_oob_ud_msg_item_cmp);

    /* Send ACKs/NACKs and throw away out-of-order messages */
    msg_item = (mca_oob_ud_msg_item_t *) mca_oob_ud_list_get_first (processing_msgs);

    for (peer = NULL, peer_nacked = false ; NULL != msg_item ; msg_item = next_item) {
        if (peer != msg_item->peer) {
            /* first message of a new peer */
            peer_nacked = false;
        }

        peer = msg_item->peer;

        /* fetch the successor now: msg_item may be removed below */
        next_item = (mca_oob_ud_msg_item_t *) mca_oob_ud_list_get_next (processing_msgs,
                                                                        (opal_list_item_t *)msg_item);

        if (false == peer_nacked) {
            if (msg_item->hdr->msg_id > peer->peer_expected_id) {
                /* a message is missing: NACK once per peer */
                (void) mca_oob_ud_event_send_nack (msg_item->port, peer, msg_item->hdr);
                peer_nacked = true;
            } else if (NULL == next_item || (next_item->peer != msg_item->peer)) {
                /* last message from this peer: ACK it */
                (void) mca_oob_ud_event_send_ack (msg_item->port, msg_item->peer, msg_item->hdr);
            }
        }

        if (msg_item->hdr->msg_id != peer->peer_expected_id) {
            opal_list_remove_item (processing_msgs, (opal_list_item_t *) msg_item);
            OBJ_RELEASE(msg_item);
        } else {
            peer->peer_expected_id++;
        }
    }

    /* Process remaining messages */
    while (NULL !=
           (msg_item = (mca_oob_ud_msg_item_t *) opal_list_remove_first (processing_msgs))) {
        switch (msg_item->hdr->msg_type) {
        case MCA_OOB_UD_MSG_REQUEST:
            mca_oob_ud_event_handle_req (port, msg_item->peer, msg_item->hdr);
            break;
        case MCA_OOB_UD_MSG_REPLY:
            mca_oob_ud_event_handle_rep (port, msg_item->hdr);
            break;
        case MCA_OOB_UD_MSG_COMPLETE:
            mca_oob_ud_event_handle_completion (port, msg_item->hdr);
            break;
        case MCA_OOB_UD_MSG_DATA_OK:
            mca_oob_ud_event_handle_data_ok (port, msg_item->hdr);
            break;
        case MCA_OOB_UD_MSG_END:
            /* use the peer recorded with this message, not whatever peer
             * the ordering loop above happened to visit last */
            mca_oob_ud_event_handle_end (msg_item->peer, msg_item->hdr);
            break;
        default:
            /* do nothing */
            break;
        }

        OBJ_RELEASE(msg_item);
    }

    return count;
}

/**
 * Handle an ACK from @peer.
 *
 * Every in-flight message whose id is less than or equal to the
 * acknowledged id is removed from the peer's flying list and marked
 * complete.  The peer's timer is stopped while the list is updated and
 * started again afterwards.
 *
 * @return ORTE_SUCCESS always.
 */
static int mca_oob_ud_event_handle_ack (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer,
                                        mca_oob_ud_msg_hdr_t *msg_hdr)
{
    mca_oob_ud_msg_t *msg;

    OPAL_OUTPUT_VERBOSE((10, mca_oob_base_output, "%s oob:ud:event_handle_ack got ack for msg id %"
                         PRIu64 " from peer %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr->msg_id,
                         ORTE_NAME_PRINT(&peer->peer_name)));

    OPAL_THREAD_LOCK(&peer->peer_lock);

    mca_oob_ud_peer_stop_timer (peer);

    while (NULL !=
           (msg = (mca_oob_ud_msg_t *) mca_oob_ud_list_get_first (&peer->peer_flying_messages))) {
        if (msg->hdr->msg_id > msg_hdr->msg_id) {
            break;
        }

        (void) opal_list_remove_first (&peer->peer_flying_messages);
        (void) mca_oob_ud_msg_status_update (msg, MCA_OOB_UD_MSG_STATUS_COMPLETE);
    }

    mca_oob_ud_peer_start_timer (peer);

    OPAL_THREAD_UNLOCK(&peer->peer_lock);

    return ORTE_SUCCESS;
}

/**
 * Handle a NACK from @peer.
 *
 * In-flight messages with an id strictly below the id carried by the NACK
 * are marked complete; everything still on the flying list is then posted
 * again and the peer's timer is reset and restarted.
 *
 * @return ORTE_SUCCESS always.
 */
static int mca_oob_ud_event_handle_nack (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer,
                                         mca_oob_ud_msg_hdr_t *msg_hdr)
{
    mca_oob_ud_msg_t *msg;

    OPAL_OUTPUT_VERBOSE((10, mca_oob_base_output, "%s oob:ud:event_handle_nack got nack for msg id %"
                         PRIu64 " from peer %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr->msg_id,
                         ORTE_NAME_PRINT(&peer->peer_name)));

    OPAL_THREAD_LOCK(&peer->peer_lock);

    mca_oob_ud_peer_stop_timer (peer);

    while (NULL !=
           (msg = (mca_oob_ud_msg_t *) mca_oob_ud_list_get_first (&peer->peer_flying_messages))) {
        if (msg->hdr->msg_id >= msg_hdr->msg_id) {
            break;
        }

        (void) opal_list_remove_first (&peer->peer_flying_messages);
        (void) mca_oob_ud_msg_status_update (msg, MCA_OOB_UD_MSG_STATUS_COMPLETE);
    }

    /* repost remaining messages */
    mca_oob_ud_peer_post_all (peer);

    /* reset and start the timer */
    mca_oob_ud_peer_reset_timer (peer);
    mca_oob_ud_peer_start_timer (peer);

    OPAL_THREAD_UNLOCK(&peer->peer_lock);

    return ORTE_SUCCESS;
}

/**
 * Handle an END message: report @peer as lost.
 *
 * @return ORTE_SUCCESS always.
 */
static int mca_oob_ud_event_handle_end (mca_oob_ud_peer_t *peer, mca_oob_ud_msg_hdr_t *msg_hdr)
{
    OPAL_OUTPUT_VERBOSE((10, mca_oob_base_output, "%s oob:ud:event_handle_end got end message from peer %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer->peer_name)));

    mca_oob_ud_peer_lost (peer);

    return ORTE_SUCCESS;
}

/**
 * Send an ACK for the message in @msg_hdr back to @peer.
 *
 * The received header (which lives in the port's registered buffer) is
 * temporarily rewritten into an ACK and posted on the port's listen queue
 * pair.  The original header contents are put back before returning, on
 * success and on failure, because the caller keeps using the header.
 *
 * @return ORTE_SUCCESS, or the error returned by mca_oob_ud_qp_post_send.
 */
static int mca_oob_ud_event_send_ack (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer, mca_oob_ud_msg_hdr_t *msg_hdr)
{
    mca_oob_ud_msg_hdr_t tmp_hdr;
    int rc = ORTE_SUCCESS;
    struct ibv_send_wr wr;
    struct ibv_sge sge;

    OPAL_OUTPUT_VERBOSE((10, mca_oob_base_output, "%s oob:ud:event_send_ack sending ack for message id %"
                         PRIu64 " peer = %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr->msg_id,
                         ORTE_NAME_PRINT(&peer->peer_name)));

    /* reuse registered buffer to send ack (just need to change the type/return address) */
    memcpy (&tmp_hdr, msg_hdr, sizeof (tmp_hdr));

    msg_hdr->msg_type = MCA_OOB_UD_MSG_ACK;

    /* set return address */
    msg_hdr->ra.qkey     = 0;
    msg_hdr->ra.name     = *ORTE_PROC_MY_NAME;
    msg_hdr->ra.port_num = port->port_num;

    mca_oob_ud_fill_sge (&sge, msg_hdr, sizeof (*msg_hdr), port->msg_buf.mr->lkey);
    mca_oob_ud_fill_send_wr (&wr, &sge, 1, peer);

    rc = mca_oob_ud_qp_post_send (&port->listen_qp, &wr, 1);

    /* restore the received header even when the post failed; otherwise the
     * message would later be processed with type ACK and our own return
     * address */
    memcpy (msg_hdr, &tmp_hdr, sizeof (tmp_hdr));

    if (ORTE_SUCCESS != rc) {
        opal_output (0, "oob:ud:event_send_ack error posting ack!");
    }

    return rc;
}

/**
 * Send a NACK to @peer carrying the message id we expect from it next.
 *
 * Works like mca_oob_ud_event_send_ack: the received header is rewritten in
 * place (type, return address and message id), posted, and then restored
 * on every path so the caller's later comparisons against the original
 * message id remain valid.
 *
 * @return ORTE_SUCCESS, or the error returned by mca_oob_ud_qp_post_send.
 */
static int mca_oob_ud_event_send_nack (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer, mca_oob_ud_msg_hdr_t *msg_hdr)
{
    mca_oob_ud_msg_hdr_t tmp_hdr;
    int rc = ORTE_SUCCESS;
    struct ibv_send_wr wr;
    struct ibv_sge sge;

    OPAL_OUTPUT_VERBOSE((10, mca_oob_base_output, "%s oob:ud:event_send_nack sending nack for message id %"
                         PRIu64 " peer = %s. msg_id = %" PRIu64, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         peer->peer_expected_id, ORTE_NAME_PRINT(&peer->peer_name), msg_hdr->msg_id));

    /* reuse registered buffer to send the nack (just need to change the type/return address) */
    memcpy (&tmp_hdr, msg_hdr, sizeof (tmp_hdr));

    msg_hdr->msg_type = MCA_OOB_UD_MSG_NACK;

    /* set return address */
    msg_hdr->ra.qkey     = 0;
    msg_hdr->ra.name     = *ORTE_PROC_MY_NAME;
    msg_hdr->ra.port_num = port->port_num;
    msg_hdr->msg_id      = peer->peer_expected_id;

    mca_oob_ud_fill_sge (&sge, msg_hdr, sizeof (*msg_hdr), port->msg_buf.mr->lkey);
    mca_oob_ud_fill_send_wr (&wr, &sge, 1, peer);

    rc = mca_oob_ud_qp_post_send (&port->listen_qp, &wr, 1);

    /* restore the received header (including its message id) even when the
     * post failed */
    memcpy (msg_hdr, &tmp_hdr, sizeof (tmp_hdr));

    if (ORTE_SUCCESS != rc) {
        opal_output (0, "oob:ud:event_send_nack error posting nack!");
    }

    return rc;
}

/**
 * Queue a request for deferred processing by mca_oob_ud_complete_dispatch.
 *
 * The request is appended to the component's queued-request list.  If the
 * completion timer event is not already pending it is set up on
 * orte_event_base and added with a zero timeout.
 */
void mca_oob_ud_event_queue_completed (mca_oob_ud_req_t *req)
{
    struct timeval now = {0, 0};

    mca_oob_ud_req_append_to_list (req, &mca_oob_ud_component.ud_event_queued_reqs);

    if (!opal_event_evtimer_pending (&mca_oob_ud_component.ud_complete_event, &now)) {
        opal_event_evtimer_set (orte_event_base, &mca_oob_ud_component.ud_complete_event,
                                mca_oob_ud_complete_dispatch, NULL);
        opal_event_add (&mca_oob_ud_component.ud_complete_event, &now);
    }
}

/**
 * Handle a COMPLETE message: mark the receive request named in the header
 * as complete and queue it.
 *
 * @return ORTE_ERROR if the header carries no request pointer,
 *         ORTE_SUCCESS otherwise (including when the request is not on the
 *         active-receive list, which is treated as a duplicate).
 */
static int mca_oob_ud_event_handle_completion (mca_oob_ud_port_t *port, mca_oob_ud_msg_hdr_t *msg_hdr)
{
    mca_oob_ud_req_t *recv_req = msg_hdr->msg_lcl_ctx;
    bool brc;

    if (NULL == recv_req) {
        return ORTE_ERROR;
    }

    OPAL_OUTPUT_VERBOSE((5, mca_oob_base_output, "%s oob:ud:event_handle_completion got "
                         "completion message for request %p", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         (void *) recv_req));

    brc = mca_oob_ud_req_is_in_list (recv_req, &mca_oob_ud_component.ud_active_recvs);
    if (false == brc) {
        /* duplicate completion message? */
        OPAL_OUTPUT_VERBOSE((0, mca_oob_base_output, "%s oob:ud:event_handle_completion apparent duplicate completion. "
                             "request %p. req list = %p", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (void *) recv_req, (void *) recv_req->req_list));
        return ORTE_SUCCESS;
    }

    recv_req->state = MCA_OOB_UD_REQ_COMPLETE;
    mca_oob_ud_event_queue_completed (recv_req);

    return ORTE_SUCCESS;
}

/**
 * Handle a DATA_OK message: mark the send request named in the header as
 * complete and queue it.
 *
 * @return ORTE_ERROR if the header carries no request pointer,
 *         ORTE_SUCCESS otherwise (including when the request is not on the
 *         active-send list, which is treated as a duplicate).
 */
static int mca_oob_ud_event_handle_data_ok (mca_oob_ud_port_t *port, mca_oob_ud_msg_hdr_t *msg_hdr)
{
    mca_oob_ud_req_t *send_req = msg_hdr->msg_lcl_ctx;
    bool brc;

    if (NULL == send_req) {
        /* ack! */
        return ORTE_ERROR;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_oob_base_output, "%s oob:ud:event_handle_data_ok got data ok message for "
                         "request %p", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (void *) send_req));

    brc = mca_oob_ud_req_is_in_list (send_req, &mca_oob_ud_component.ud_active_sends);
    if (false == brc) {
        OPAL_OUTPUT_VERBOSE((0, mca_oob_base_output, "%s oob:ud:event_handle_data_ok apparent duplicate data ok. "
                             "request %p. req list = %p", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (void *) send_req, (void *) send_req->req_list));
        /* duplicate data ok message? */
        return ORTE_SUCCESS;
    }

    send_req->state = MCA_OOB_UD_REQ_COMPLETE;
    mca_oob_ud_event_queue_completed (send_req);

    return ORTE_SUCCESS;
}

/**
 * Handle a REQUEST message: try to match it to a receive request and, when
 * matching succeeds, queue that request.
 *
 * @return the result of mca_oob_ud_recv_match_send.
 */
static int mca_oob_ud_event_handle_req (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer, mca_oob_ud_msg_hdr_t *msg_hdr)
{
    mca_oob_ud_req_t *recv_req;
    int rc;

    rc = mca_oob_ud_recv_match_send (port, peer, msg_hdr, &recv_req);
    if (ORTE_SUCCESS == rc) {
        mca_oob_ud_event_queue_completed (recv_req);
    }

    return rc;
}

/**
 * Handle a REPLY message: copy the remote side's parameters (mtu, data
 * length, context, queue pair number) into the matching send request and
 * queue it.  The request's mtu becomes the smaller of its current value
 * and the one in the reply.
 *
 * @return ORTE_ERROR if the header carries no request pointer,
 *         ORTE_SUCCESS otherwise (including when the request is not on the
 *         active-send list, which is treated as a duplicate).
 */
static int mca_oob_ud_event_handle_rep (mca_oob_ud_port_t *port, mca_oob_ud_msg_hdr_t *msg_hdr)
{
    mca_oob_ud_req_t *send_req = (mca_oob_ud_req_t *) msg_hdr->msg_lcl_ctx;
    bool brc;

    /* same guard as the completion and data-ok handlers */
    if (NULL == send_req) {
        return ORTE_ERROR;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_oob_base_output, "%s oob:ud:event_handle_rep got reply for request %p",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (void *) send_req));

    brc = mca_oob_ud_req_is_in_list (send_req, &mca_oob_ud_component.ud_active_sends);
    if (false == brc) {
        OPAL_OUTPUT_VERBOSE((0, mca_oob_base_output, "%s oob:ud:event_handle_rep no send matches reply",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
        /* duplicate reply message? */
        return ORTE_SUCCESS;
    }

    send_req->req_mtu          = min(send_req->req_mtu, msg_hdr->msg_data.rep.mtu);
    send_req->req_rem_data_len = msg_hdr->msg_data.rep.data_len;
    send_req->req_rem_ctx      = msg_hdr->msg_rem_ctx;
    send_req->req_rem_qpn      = msg_hdr->msg_data.rep.qpn;

    mca_oob_ud_event_queue_completed (send_req);

    return ORTE_SUCCESS;
}

/**
 * Event callback for the device's completion channel.
 *
 * Fetches the pending completion event (retrying while interrupted by a
 * signal), processes the messages of the completion queue that raised it,
 * requests the next notification on that queue and re-adds the device
 * event.
 *
 * @param fd       unused
 * @param flags    unused
 * @param context  the mca_oob_ud_device_t registered with the event
 *
 * @return NULL always.
 *
 * NOTE(review): when message processing fails the event is not re-added,
 * so no further completion events are delivered -- confirm that is
 * intended.
 */
static void *mca_oob_ud_event_dispatch(int fd, int flags, void *context)
{
    int rc;
    mca_oob_ud_device_t *device = (mca_oob_ud_device_t *) context;
    mca_oob_ud_port_t *port = NULL;
    struct ibv_cq *event_cq = NULL;
    void *event_context = NULL;

    do {
        rc = ibv_get_cq_event (device->ib_channel, &event_cq, &event_context);
    } while (rc && errno == EINTR);

    if (NULL == event_cq) {
        /* re-arm the event.  No port is known on this path, so go through
         * the device we were called with. */
        opal_event_add (&device->event, NULL);

        return NULL;
    }

    /* the CQ context is the port that owns the queue */
    port = (mca_oob_ud_port_t *) event_context;

    rc = mca_oob_ud_process_messages (event_cq, port);
    if (rc < 0) {
        opal_output (0, "%s oob:ud:event_dispatch error processing messages",
                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        return NULL;
    }

    if (ibv_req_notify_cq(event_cq, 0)) {
        opal_output (0, "%s oob:ud:event_dispatch error asking for cq notifications",
                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
    }

    /* re-arm the event */
    opal_event_add (&port->device->event, NULL);

    return NULL;
}

/**
 * Timer callback that drains the component's queued-request list.
 *
 * Completed requests are finished through mca_oob_ud_recv_complete or
 * mca_oob_ud_send_complete; all others are put back on the matching active
 * list and retried.  The match lock is held only while a request is taken
 * off the queue, not while it is processed, and is released before
 * returning.
 *
 * @return NULL always.
 */
static void *mca_oob_ud_complete_dispatch(int fd, int flags, void *context)
{
    mca_oob_ud_req_t *req;

    OPAL_THREAD_LOCK(&mca_oob_ud_component.ud_match_lock);
    while (NULL !=
           (req = (mca_oob_ud_req_t *) opal_list_remove_first (&mca_oob_ud_component.ud_event_queued_reqs))) {
        OPAL_THREAD_UNLOCK(&mca_oob_ud_component.ud_match_lock);

        OPAL_OUTPUT_VERBOSE((10, mca_oob_base_output, "%s oob:ud:event_process processing request %p",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (void *) req));

        /* the request is no longer on any list */
        req->req_list = NULL;

        switch (req->type) {
        case MCA_OOB_UD_REQ_RECV:
        case MCA_OOB_UD_REQ_UNEX:
            if (req->state == MCA_OOB_UD_REQ_COMPLETE) {
                mca_oob_ud_recv_complete (req);
            } else {
                mca_oob_ud_req_append_to_list (req, &mca_oob_ud_component.ud_active_recvs);
                mca_oob_ud_recv_try (req);
            }
            break;
        case MCA_OOB_UD_REQ_SEND:
            if (req->state == MCA_OOB_UD_REQ_COMPLETE) {
                mca_oob_ud_send_complete (req, ORTE_SUCCESS);
            } else {
                mca_oob_ud_req_append_to_list (req, &mca_oob_ud_component.ud_active_sends);
                mca_oob_ud_send_try (req);
            }
            break;
        default:
            break;
        }

        OPAL_THREAD_LOCK(&mca_oob_ud_component.ud_match_lock);
    }
    /* the loop always exits with the lock held; release it */
    OPAL_THREAD_UNLOCK(&mca_oob_ud_component.ud_match_lock);

    return NULL;
}

/**
 * Reset the listen queue pair of every port on @device.
 */
static void mca_oob_ud_stop_events (mca_oob_ud_device_t *device)
{
    opal_list_item_t *item;

    OPAL_OUTPUT_VERBOSE((5, mca_oob_base_output, "%s oob:ud:stop_events stopping event processing",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    for (item = opal_list_get_first (&device->ports) ;
         item != opal_list_get_end (&device->ports) ;
         item = opal_list_get_next (item)) {
        mca_oob_ud_port_t *port = (mca_oob_ud_port_t *) item;

        /* flush all receives */
        mca_oob_ud_qp_to_reset (&port->listen_qp);
    }

    OPAL_OUTPUT_VERBOSE((5, mca_oob_base_output, "%s oob:ud:stop_events events stopped",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}