
usnic: Basket of performance changes including:

- round segment buffer allocation up to cache-line size (see the sketch
  after this list)
- split some routines into an inline fast section and a called
  slower section
- introduce a receive fastpath in component_progress that:
    o returns immediately if there is a packet available on the priority
      queue and the fastpath is enabled
    o disables the fastpath for one pass after use to provide fairness to
      other processing
    o defers receive buffer posting
    o defers receive bookkeeping until the next call
      to usnic_component_progress
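
The cache-line rounding in the first bullet is the standard align-up idiom;
the same expression appears later in this commit in
ompi_btl_usnic_channel_init() (sketch only, assuming opal_cache_line_size is
a power of two):

    /* round the segment size up to the next multiple of the cache line size */
    segsize = (mtu + opal_cache_line_size - 1) & ~(opal_cache_line_size - 1);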

Authored-by: Reese Faucette <rfaucett@cisco.com>

Should be included in usnic v1.7.3 roll-up CMR (refs trac:3760)

This commit was SVN r29133.

The following Trac tickets were found above:
  Ticket 3760 --> https://svn.open-mpi.org/trac/ompi/ticket/3760
This commit is contained in:
Dave Goodell 2013-09-06 03:18:57 +00:00
parent e44337742f
commit 6dc54d372d
6 changed files with 582 additions and 330 deletions


@@ -179,6 +179,29 @@ is copied either into this reassembly buffer or directly into user memory.
When the last chunk of a fragment arrives, a PML callback is made for non-PUTs,
then the fragment info descriptor is released.
======================================
fast receive optimization
In order to optimize the latency of small packets, the component progress
routine implements a fast path for receives. If the first completion is a
receive on the priority queue, it is handled by a routine called
ompi_btl_usnic_recv_fast(), which does nothing but validate that the packet
is OK to receive (sequence number within the window and not a duplicate) and
then deliver it to the PML. The packet is recorded in the channel structure,
and all bookkeeping for it is deferred until the next call to
component_progress.
This fast path cannot be taken on every pass through component_progress,
because other completions will need processing, and the receive bookkeeping
for one fast receive must be completed before another fast receive can occur,
since only one receive segment can be saved for deferred processing at a
time. This is handled with a static variable, fastpath_ok, in
usnic_component_progress(), which is set to false every time the fast path is
taken; a call into the regular progress routine sets the flag back to true.
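A condensed sketch of this control flow (not standalone code; it omits the
per-module loop and some error handling that the full
usnic_component_progress() implementation, shown later in this commit,
contains):

    static bool fastpath_ok = true;        /* persists across calls */
    int count = 0;

    if (fastpath_ok) {
        /* poll the priority channel's CQ for a single completion */
        if (ibv_poll_cq(channel->cq, 1, &wc) == 1) {
            if (wc.opcode == IBV_WC_RECV && wc.status == IBV_WC_SUCCESS) {
                /* validate and deliver to the PML; bookkeeping is deferred */
                ompi_btl_usnic_recv_fast(module,
                    (ompi_btl_usnic_recv_segment_t *)wc.wr_id, channel);
                fastpath_ok = false;       /* force the regular path next time */
                return 1;
            }
            /* anything else goes through the normal completion handler */
            count += usnic_handle_completion(module, channel, &wc);
        }
    }
    fastpath_ok = true;                    /* re-arm the fastpath */
    return count + usnic_component_progress_2();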
======================================
reliability:


@@ -834,18 +834,147 @@ static mca_btl_base_module_t** usnic_component_init(int* num_btl_modules,
/*
* Component progress
* The fast-path of an incoming packet available on the priority
* receive queue is handled directly in this routine; everything else
* is deferred to an external call, usnic_component_progress_2().
* This helps keep usnic_component_progress() very small and very responsive
* to a single incoming packet. We make sure not to always return
* immediately after one packet to avoid starvation; "fastpath_ok" is
* used for this.
*/
static int usnic_handle_completion(ompi_btl_usnic_module_t* module,
ompi_btl_usnic_channel_t *channel, struct ibv_wc *cwc);
static int usnic_component_progress_2(void);
static int usnic_component_progress(void)
{
uint32_t i;
int j, count = 0, num_events;
int count;
ompi_btl_usnic_recv_segment_t* rseg;
ompi_btl_usnic_module_t* module;
struct ibv_wc wc;
ompi_btl_usnic_channel_t *channel;
static bool fastpath_ok=true;
/* update our simulated clock */
usnic_ticks += 5000;
count = 0;
if (fastpath_ok) {
for (i = 0; i < mca_btl_usnic_component.num_modules; i++) {
module = mca_btl_usnic_component.usnic_active_modules[i];
channel = &module->mod_channels[USNIC_PRIORITY_CHANNEL];
assert(channel->chan_deferred_recv == NULL);
if (ibv_poll_cq(channel->cq, 1, &wc) == 1) {
if (OPAL_LIKELY(wc.opcode == IBV_WC_RECV &&
wc.status == IBV_WC_SUCCESS)) {
rseg = (ompi_btl_usnic_recv_segment_t*)wc.wr_id;
ompi_btl_usnic_recv_fast(module, rseg, channel);
fastpath_ok = false; /* prevent starvation */
return 1;
} else {
count += usnic_handle_completion(module, channel, &wc);
}
}
}
}
fastpath_ok = true;
return count + usnic_component_progress_2();
}
static int usnic_handle_completion(
ompi_btl_usnic_module_t* module,
ompi_btl_usnic_channel_t *channel,
struct ibv_wc *cwc)
{
ompi_btl_usnic_segment_t* seg;
ompi_btl_usnic_recv_segment_t* rseg;
struct ibv_recv_wr *bad_wr, *repost_recv_head;
seg = (ompi_btl_usnic_segment_t*)(unsigned long)cwc->wr_id;
rseg = (ompi_btl_usnic_recv_segment_t*)seg;
if (OPAL_UNLIKELY(cwc->status != IBV_WC_SUCCESS)) {
/* If it was a receive error, just drop it and keep
going. The sender will eventually re-send it. */
if (IBV_WC_RECV == cwc->opcode) {
if (cwc->byte_len <
(sizeof(ompi_btl_usnic_protocol_header_t)+
sizeof(ompi_btl_usnic_btl_header_t))) {
BTL_ERROR(("%s: RX error polling CQ[%d] with status %d for wr_id %" PRIx64 " vend_err %d, byte_len %d",
ibv_get_device_name(module->device),
channel->chan_index, cwc->status, cwc->wr_id,
cwc->vendor_err, cwc->byte_len));
} else {
/* silently count CRC errors */
++module->num_crc_errors;
}
rseg->rs_recv_desc.next = channel->repost_recv_head;
channel->repost_recv_head = &rseg->rs_recv_desc;
return 0;
} else {
BTL_ERROR(("%s: error polling CQ[%d] with status %d for wr_id %" PRIx64 " opcode %d, vend_err %d",
ibv_get_device_name(module->device), channel->chan_index,
cwc->status, cwc->wr_id, cwc->opcode,
cwc->vendor_err));
/* mark error on this channel */
channel->chan_error = true;
return 0;
}
}
/* Handle work completions */
switch(seg->us_type) {
/**** Send ACK completions ****/
case OMPI_BTL_USNIC_SEG_ACK:
assert(IBV_WC_SEND == cwc->opcode);
ompi_btl_usnic_ack_complete(module,
(ompi_btl_usnic_ack_segment_t *)seg);
break;
/**** Send of frag segment completion ****/
case OMPI_BTL_USNIC_SEG_FRAG:
assert(IBV_WC_SEND == cwc->opcode);
ompi_btl_usnic_frag_send_complete(module,
(ompi_btl_usnic_frag_segment_t*)seg);
break;
/**** Send of chunk segment completion ****/
case OMPI_BTL_USNIC_SEG_CHUNK:
assert(IBV_WC_SEND == cwc->opcode);
ompi_btl_usnic_chunk_send_complete(module,
(ompi_btl_usnic_chunk_segment_t*)seg);
break;
/**** Receive completions ****/
case OMPI_BTL_USNIC_SEG_RECV:
assert(IBV_WC_RECV == cwc->opcode);
ompi_btl_usnic_recv(module, rseg, channel);
break;
default:
BTL_ERROR(("Unhandled completion opcode %d segment type %d",
cwc->opcode, seg->us_type));
break;
}
return 1;
}
static int usnic_component_progress_2(void)
{
uint32_t i;
int j, count = 0, num_events;
struct ibv_recv_wr *bad_wr;
struct ibv_wc* cwc;
ompi_btl_usnic_module_t* module;
static struct ibv_wc wc[OMPI_BTL_USNIC_NUM_WC];
ompi_btl_usnic_channel_t *channel;
int rc;
int c;
/* update our simulated clock */
@@ -859,6 +988,12 @@ static int usnic_component_progress(void)
for (c=0; c<USNIC_NUM_CHANNELS; ++c) {
channel = &module->mod_channels[c];
if (channel->chan_deferred_recv != NULL) {
ompi_btl_usnic_recv_frag_bookeeping(module,
channel->chan_deferred_recv, channel);
channel->chan_deferred_recv = NULL;
}
num_events = ibv_poll_cq(channel->cq, OMPI_BTL_USNIC_NUM_WC, wc);
opal_memchecker_base_mem_defined(&num_events, sizeof(num_events));
opal_memchecker_base_mem_defined(wc, sizeof(wc[0]) * num_events);
@@ -869,106 +1004,31 @@ static int usnic_component_progress(void)
return OMPI_ERROR;
}
repost_recv_head = NULL;
/* Handle each event */
for (j = 0; j < num_events; j++) {
cwc = &wc[j];
seg = (ompi_btl_usnic_segment_t*)(unsigned long)cwc->wr_id;
rseg = (ompi_btl_usnic_recv_segment_t*)seg;
if (OPAL_UNLIKELY(cwc->status != IBV_WC_SUCCESS)) {
/* If it was a receive error, just drop it and keep
going. The sender will eventually re-send it. */
if (IBV_WC_RECV == cwc->opcode) {
if (cwc->byte_len <
(sizeof(ompi_btl_usnic_protocol_header_t)+
sizeof(ompi_btl_usnic_btl_header_t))) {
BTL_ERROR(("%s: RX error polling CQ[%d] with status %d for wr_id %" PRIx64 " vend_err %d, byte_len %d (%d of %d)",
ibv_get_device_name(module->device),
c, cwc->status, cwc->wr_id,
cwc->vendor_err, cwc->byte_len,
j, num_events));
} else {
/* silently count CRC errors */
++module->num_crc_errors;
}
rseg->rs_recv_desc.next = repost_recv_head;
repost_recv_head = &rseg->rs_recv_desc;
continue;
} else {
BTL_ERROR(("%s: error polling CQ[%d] with status %d for wr_id %" PRIx64 " opcode %d, vend_err %d (%d of %d)",
ibv_get_device_name(module->device), c,
cwc->status, cwc->wr_id, cwc->opcode,
cwc->vendor_err,
j, num_events));
}
return OMPI_ERROR;
}
/* Handle work completions */
switch(seg->us_type) {
/**** Send ACK completions ****/
case OMPI_BTL_USNIC_SEG_ACK:
assert(IBV_WC_SEND == cwc->opcode);
ompi_btl_usnic_ack_complete(module,
(ompi_btl_usnic_ack_segment_t *)seg);
break;
/**** Send of frag segment completion ****/
case OMPI_BTL_USNIC_SEG_FRAG:
assert(IBV_WC_SEND == cwc->opcode);
ompi_btl_usnic_frag_send_complete(module,
(ompi_btl_usnic_frag_segment_t*)seg);
break;
/**** Send of chunk segment completion ****/
case OMPI_BTL_USNIC_SEG_CHUNK:
assert(IBV_WC_SEND == cwc->opcode);
ompi_btl_usnic_chunk_send_complete(module,
(ompi_btl_usnic_chunk_segment_t*)seg);
break;
/**** Receive completions ****/
case OMPI_BTL_USNIC_SEG_RECV:
assert(IBV_WC_RECV == cwc->opcode);
ompi_btl_usnic_recv(module, rseg, &repost_recv_head);
break;
default:
BTL_ERROR(("Unhandled completion opcode %d segment type %d",
cwc->opcode, seg->us_type));
break;
}
count += usnic_handle_completion(module, channel, &wc[j]);
}
/* return error if detected - this may be slightly deferred
* since the fastpath avoids the "if" that checks it.
*/
if (channel->chan_error) {
channel->chan_error = false;
return OMPI_ERROR;
}
count += num_events;
/* progress sends */
ompi_btl_usnic_module_progress_sends(module);
/* Re-post all the remaining receive buffers */
if (OPAL_LIKELY(repost_recv_head)) {
#if MSGDEBUG
/* For the debugging case, check the state of each
segment */
{
struct ibv_recv_wr *wr = repost_recv_head;
while (wr) {
seg = (ompi_btl_usnic_recv_segment_t*)(unsigned long)
wr->wr_id;
assert(OMPI_BTL_USNIC_SEG_RECV == seg->us_type);
FRAG_HISTORY(frag, "Re-post: ibv_post_recv");
wr = wr->next;
}
}
#endif
if (OPAL_UNLIKELY(ibv_post_recv(channel->qp,
repost_recv_head, &bad_wr) != 0)) {
if (OPAL_LIKELY(channel->repost_recv_head)) {
rc = ibv_post_recv(channel->qp,
channel->repost_recv_head, &bad_wr);
channel->repost_recv_head = NULL;
if (OPAL_UNLIKELY(rc != 0)) {
BTL_ERROR(("error posting recv: %s\n", strerror(errno)));
return OMPI_ERROR;
}
repost_recv_head = NULL;
}
}
}


@@ -1516,11 +1516,13 @@ static int
ompi_btl_usnic_channel_init(
ompi_btl_usnic_module_t *module,
struct ompi_btl_usnic_channel_t *channel,
int index,
int mtu,
int rd_num,
int sd_num)
{
struct ibv_context *ctx;
uint32_t segsize;
ompi_btl_usnic_recv_segment_t *rseg;
ompi_free_list_item_t* item;
struct ibv_recv_wr* bad_wr;
@@ -1528,10 +1530,12 @@ ompi_btl_usnic_channel_init(
int rc;
ctx = module->device_context;
channel->chan_mtu = mtu;
channel->chan_rd_num = rd_num;
channel->chan_sd_num = sd_num;
channel->chan_index = index;
channel->chan_deferred_recv = NULL;
channel->chan_error = false;
channel->sd_wqe = sd_num;
channel->fastsend_wqe_thresh = sd_num - 10;
@@ -1557,14 +1561,18 @@ ompi_btl_usnic_channel_init(
goto error;
}
/* Initialize pool of receive segments */
/*
* Initialize pool of receive segments. round MTU up to cache line size
* so that each segment is guaranteed to start on a cache line boundary
*/
segsize = (mtu + opal_cache_line_size - 1) & ~(opal_cache_line_size - 1);
OBJ_CONSTRUCT(&channel->recv_segs, ompi_free_list_t);
channel->recv_segs.ctx = module;
rc = ompi_free_list_init_new(&channel->recv_segs,
sizeof(ompi_btl_usnic_recv_segment_t),
opal_cache_line_size,
OBJ_CLASS(ompi_btl_usnic_recv_segment_t),
mtu,
segsize,
opal_cache_line_size,
rd_num,
rd_num,
@@ -1649,6 +1657,7 @@ int ompi_btl_usnic_module_init(ompi_btl_usnic_module_t *module)
struct mca_mpool_base_resources_t mpool_resources;
struct ibv_context *ctx = module->device_context;
uint32_t ack_segment_len;
uint32_t segsize;
#if OPAL_HAVE_HWLOC
/* If this process is bound to a single NUMA locality, calculate
@@ -1707,12 +1716,14 @@ int ompi_btl_usnic_module_init(ompi_btl_usnic_module_t *module)
/* initialize data and priority channels */
rc = ompi_btl_usnic_channel_init(module,
&module->mod_channels[USNIC_PRIORITY_CHANNEL],
USNIC_PRIORITY_CHANNEL,
module->tiny_mtu, module->prio_rd_num, module->prio_sd_num);
if (rc != OMPI_SUCCESS) {
goto chan_destroy;
}
rc = ompi_btl_usnic_channel_init(module,
&module->mod_channels[USNIC_DATA_CHANNEL],
USNIC_DATA_CHANNEL,
module->if_mtu, module->rd_num, module->sd_num);
if (rc != OMPI_SUCCESS) {
goto chan_destroy;
@@ -1745,6 +1756,9 @@ int ompi_btl_usnic_module_init(ompi_btl_usnic_module_t *module)
/* list of endpoints that are ready to send */
OBJ_CONSTRUCT(&module->endpoints_with_sends, opal_list_t);
segsize = (module->if_mtu + opal_cache_line_size - 1) &
~(opal_cache_line_size - 1);
/* Send frags freelists */
module->small_send_frags.ctx = module;
@@ -1753,7 +1767,7 @@ int ompi_btl_usnic_module_init(ompi_btl_usnic_module_t *module)
sizeof(ompi_btl_usnic_small_send_frag_t),
opal_cache_line_size,
OBJ_CLASS(ompi_btl_usnic_small_send_frag_t),
module->if_mtu,
segsize,
opal_cache_line_size,
module->sd_num * 4,
-1,
@@ -1796,7 +1810,7 @@ int ompi_btl_usnic_module_init(ompi_btl_usnic_module_t *module)
sizeof(ompi_btl_usnic_chunk_segment_t),
opal_cache_line_size,
OBJ_CLASS(ompi_btl_usnic_chunk_segment_t),
module->if_mtu,
segsize,
opal_cache_line_size,
module->sd_num * 4,
-1,
@@ -1806,7 +1820,8 @@ int ompi_btl_usnic_module_init(ompi_btl_usnic_module_t *module)
/* ACK segments freelist */
module->ack_segs.ctx = module;
ack_segment_len = sizeof(ompi_btl_usnic_btl_header_t);
ack_segment_len = (sizeof(ompi_btl_usnic_btl_header_t) +
opal_cache_line_size - 1) & ~(opal_cache_line_size - 1);
OBJ_CONSTRUCT(&module->ack_segs, ompi_free_list_t);
rc = ompi_free_list_init_new(&module->ack_segs,
sizeof(ompi_btl_usnic_ack_segment_t),


@@ -46,11 +46,14 @@ BEGIN_C_DECLS
* Forward declarations to avoid include loops
*/
struct ompi_btl_usnic_send_segment_t;
struct ompi_btl_usnic_recv_segment_t;
/*
* Abstraction of a set of IB queues
*/
typedef struct ompi_btl_usnic_channel_t {
int chan_index;
struct ibv_cq *cq;
int chan_mtu;
@@ -63,12 +66,19 @@ typedef struct ompi_btl_usnic_channel_t {
/* fastsend enabled if sd_wqe >= fastsend_wqe_thresh */
int fastsend_wqe_thresh;
/* pointer to receive segment whose bookkeeping has been deferred */
struct ompi_btl_usnic_recv_segment_t *chan_deferred_recv;
/** queue pair */
struct ibv_qp* qp;
struct ibv_recv_wr *repost_recv_head;
/** receive segments & buffers */
ompi_free_list_t recv_segs;
bool chan_error; /* set when error detected on channel */
/* statistics */
uint32_t num_channel_sends;
} ompi_btl_usnic_channel_t;


@@ -44,223 +44,13 @@
#include "btl_usnic_util.h"
/*
* Given an incoming segment, lookup the endpoint that sent it
*/
static inline ompi_btl_usnic_endpoint_t *
lookup_sender(ompi_btl_usnic_module_t *module, ompi_btl_usnic_segment_t *seg)
{
int ret;
ompi_btl_usnic_endpoint_t *sender;
/* Use the hashed RTE process name in the BTL header to uniquely
identify the sending process (using the MAC/hardware address
only identifies the sending server -- not the sending RTE
process). */
/* JMS We've experimented with using a handshake before sending
any data so that instead of looking up a hash on the
btl_header->sender, echo back the ptr to the sender's
ompi_proc. There was limited speedup with this scheme; more
investigation is required. */
ret = opal_hash_table_get_value_uint64(&module->senders,
seg->us_btl_header->sender,
(void**) &sender);
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
return sender;
}
/* The sender wasn't in the hash table, so do a slow lookup and
put the result in the hash table */
sender = ompi_btl_usnic_proc_lookup_endpoint(module,
seg->us_btl_header->sender);
if (NULL != sender) {
opal_hash_table_set_value_uint64(&module->senders,
seg->us_btl_header->sender, sender);
return sender;
}
/* Whoa -- not found at all! */
return NULL;
}
static inline int
ompi_btl_usnic_check_rx_seq(
ompi_btl_usnic_endpoint_t *endpoint,
ompi_btl_usnic_recv_segment_t *seg,
uint32_t *window_index)
{
uint32_t i;
ompi_btl_usnic_seq_t seq;
/*
* Handle piggy-backed ACK if present
*/
if (seg->rs_base.us_btl_header->ack_seq != 0) {
#if MSGDEBUG1
opal_output(0, "Handle piggy-packed ACK seq %"UDSEQ"\n", seg->rs_base.us_btl_header->ack_seq);
#endif
ompi_btl_usnic_handle_ack(endpoint,
seg->rs_base.us_btl_header->ack_seq);
}
/* Do we have room in the endpoint's receiver window?
Receiver window:
|-------- WINDOW_SIZE ----------|
+---------------------------------+
| highest_seq_rcvd |
| somewhere in this range |
+^--------------------------------+
|
+-- next_contig_seq_to_recv: the window left edge;
will always be less than highest_seq_rcvd
The good condition is
next_contig_seq_to_recv <= seq < next_contig_seq_to_recv + WINDOW_SIZE
And the bad condition is
seq < next_contig_seq_to_recv
or
seq >= next_contig_seg_to_recv + WINDOW_SIZE
*/
seq = seg->rs_base.us_btl_header->seq;
if (seq < endpoint->endpoint_next_contig_seq_to_recv ||
seq >= endpoint->endpoint_next_contig_seq_to_recv + WINDOW_SIZE) {
#if MSGDEBUG
opal_output(0, "<-- Received FRAG/CHUNK ep %p, seq %" UDSEQ " outside of window (%" UDSEQ " - %" UDSEQ "), %p, module %p -- DROPPED\n",
(void*)endpoint, seg->rs_base.us_btl_header->seq,
endpoint->endpoint_next_contig_seq_to_recv,
(endpoint->endpoint_next_contig_seq_to_recv +
WINDOW_SIZE - 1),
(void*) seg,
(void*) endpoint->endpoint_module);
#endif
/* Stats */
if (seq < endpoint->endpoint_next_contig_seq_to_recv) {
++endpoint->endpoint_module->num_oow_low_recvs;
} else {
++endpoint->endpoint_module->num_oow_high_recvs;
}
goto dup_needs_ack;
}
/* Ok, this segment is within the receiver window. Have we
already received it? It's possible that the sender has
re-sent a segment that we've already received (but not yet
ACKed).
We have saved all un-ACKed segments in an array on the
endpoint that is the same length as the receiver's window
(i.e., WINDOW_SIZE). We can use the incoming segment sequence
number to find its position in the array. It's a little
tricky because the left edge of the receiver window keeps
moving, so we use a starting reference point in the array
that is updated when we send ACKs (and therefore move the
left edge of the receiver's window).
So this segment's index into the endpoint array is:
rel_posn_in_recv_win = seq - next_contig_seq_to_recv
array_posn = (rel_posn_in_recv_win + rfstart) % WINDOW_SIZE
rfstart is then updated when we send ACKs:
rfstart = (rfstart + num_acks_sent) % WINDOW_SIZE
*/
i = seq - endpoint->endpoint_next_contig_seq_to_recv;
i = WINDOW_SIZE_MOD(i + endpoint->endpoint_rfstart);
if (endpoint->endpoint_rcvd_segs[i]) {
#if MSGDEBUG
opal_output(0, "<-- Received FRAG/CHUNK ep %p, seq %" UDSEQ " from %s to %s, seg %p: duplicate -- DROPPED\n",
(void*) endpoint, bseg->us_btl_header->seq, src_mac, dest_mac,
(void*) seg);
#endif
/* highest_seq_rcvd is for debug stats only; it's not used
in any window calculations */
assert(seq <= endpoint->endpoint_highest_seq_rcvd);
/* next_contig_seq_to_recv-1 is the ack number we'll
send */
assert (seq > endpoint->endpoint_next_contig_seq_to_recv - 1);
/* Stats */
++endpoint->endpoint_module->num_dup_recvs;
goto dup_needs_ack;
}
/* Stats: is this the highest sequence number we've received? */
if (seq > endpoint->endpoint_highest_seq_rcvd) {
endpoint->endpoint_highest_seq_rcvd = seq;
}
*window_index = i;
return true;
dup_needs_ack:
if (!endpoint->endpoint_ack_needed) {
ompi_btl_usnic_add_to_endpoints_needing_ack(endpoint);
}
return false;
}
/*
* Packet has been fully processed; update the receive window
* to indicate that it, and possibly following contiguous sequence
* numbers, have been received.
*/
static inline void
ompi_btl_usnic_update_window(
ompi_btl_usnic_endpoint_t *endpoint,
uint32_t window_index)
{
uint32_t i;
/* Enable ACK reply if not enabled */
#if MSGDEBUG1
opal_output(0, "ep: %p, ack_needed = %s\n", (void*)endpoint, endpoint->endpoint_ack_needed?"true":"false");
#endif
if (!endpoint->endpoint_ack_needed) {
ompi_btl_usnic_add_to_endpoints_needing_ack(endpoint);
}
/* give this process a chance to send something before ACKing */
if (0 == endpoint->endpoint_acktime) {
endpoint->endpoint_acktime = get_nsec() + 50000; /* 50 usec */
}
/* Save this incoming segment in the received segments array on the
endpoint. */
/* JMS Another optimization: make rcvd_segs be a bitmask (i.e.,
more cache friendly) */
endpoint->endpoint_rcvd_segs[window_index] = true;
/* See if the leftmost segment in the receiver window is
occupied. If so, advance the window. Repeat until we hit
an unoccupied position in the window. */
i = endpoint->endpoint_rfstart;
while (endpoint->endpoint_rcvd_segs[i]) {
endpoint->endpoint_rcvd_segs[i] = false;
endpoint->endpoint_next_contig_seq_to_recv++;
i = WINDOW_SIZE_MOD(i + 1);
#if MSGDEBUG
opal_output(0, "Advance window to %d; next seq to send %" UDSEQ, i,
endpoint->endpoint_next_contig_seq_to_recv);
#endif
}
endpoint->endpoint_rfstart = i;
}
/*
* We have received a segment, take action based on the
* packet type in the BTL header
*/
void ompi_btl_usnic_recv(ompi_btl_usnic_module_t *module,
void ompi_btl_usnic_recv_call(ompi_btl_usnic_module_t *module,
ompi_btl_usnic_recv_segment_t *seg,
struct ibv_recv_wr **repost_recv_head)
ompi_btl_usnic_channel_t *channel)
{
ompi_btl_usnic_segment_t *bseg;
mca_btl_active_message_callback_t* reg;
@@ -296,8 +86,7 @@ void ompi_btl_usnic_recv(ompi_btl_usnic_module_t *module,
#endif
/* Find out who sent this segment */
endpoint = lookup_sender(module, bseg);
seg->rs_endpoint = endpoint;
endpoint = seg->rs_endpoint;
if (FAKE_RECV_FRAG_DROP || OPAL_UNLIKELY(NULL == endpoint)) {
/* No idea who this was from, so drop it */
#if MSGDEBUG1
@@ -314,10 +103,8 @@ void ompi_btl_usnic_recv(ompi_btl_usnic_module_t *module,
/* Segment is an incoming frag */
if (OMPI_BTL_USNIC_PAYLOAD_TYPE_FRAG == bseg->us_btl_header->payload_type) {
/* Is incoming sequence # ok? */
if (!ompi_btl_usnic_check_rx_seq(endpoint, seg, &window_index)) {
goto repost;
}
/* do the receive bookkeeping */
ompi_btl_usnic_recv_frag_bookeeping(module, seg, channel);
#if MSGDEBUG1
opal_output(0, "<-- Received FRAG ep %p, seq %" UDSEQ ", len=%d\n",
@@ -342,15 +129,6 @@ void ompi_btl_usnic_recv(ompi_btl_usnic_module_t *module,
#endif
#endif
/*
* update window before callback because callback might
* generate a send, and we'd like to piggy-back ACK if possible
*/
ompi_btl_usnic_update_window(endpoint, window_index);
/* Stats */
++module->num_frag_recvs;
/* If this is not a PUT, pass this segment up to the PML.
* Be sure to get the payload length from the BTL header because
* the L2 layer may artificially inflate (or otherwise change)
@@ -378,7 +156,8 @@ void ompi_btl_usnic_recv(ompi_btl_usnic_module_t *module,
seg->rs_base.us_btl_header->payload_len);
}
goto repost;
/* do not jump to repost; already done by the bookkeeping routine */
return;
}
/***********************************************************************/
@@ -577,6 +356,6 @@ opal_output(0, "Start PUT to %p\n", chunk_hdr->ch_hdr.put_addr);
++module->num_recv_reposts;
/* Add recv to linked list for reposting */
seg->rs_recv_desc.next = *repost_recv_head;
*repost_recv_head = &seg->rs_recv_desc;
seg->rs_recv_desc.next = channel->repost_recv_head;
channel->repost_recv_head = &seg->rs_recv_desc;
}


@@ -14,10 +14,375 @@
#include "btl_usnic.h"
#include "btl_usnic_frag.h"
#include "btl_usnic_proc.h"
void ompi_btl_usnic_recv(ompi_btl_usnic_module_t *module,
void ompi_btl_usnic_recv_call(ompi_btl_usnic_module_t *module,
ompi_btl_usnic_recv_segment_t *rseg,
struct ibv_recv_wr **repost_recv_head);
ompi_btl_usnic_channel_t *channel);
/*
* Given an incoming segment, lookup the endpoint that sent it
*/
static inline ompi_btl_usnic_endpoint_t *
lookup_sender(ompi_btl_usnic_module_t *module, ompi_btl_usnic_segment_t *seg)
{
int ret;
ompi_btl_usnic_endpoint_t *sender;
/* Use the hashed ORTE process name in the BTL header to uniquely
identify the sending process (using the MAC/hardware address
only identifies the sending server -- not the sending ORTE
process). */
/* JMS We've experimented with using a handshake before sending
any data so that instead of looking up a hash on the
btl_header->sender, echo back the ptr to the sender's
ompi_proc. There was limited speedup with this scheme; more
investigation is required. */
ret = opal_hash_table_get_value_uint64(&module->senders,
seg->us_btl_header->sender,
(void**) &sender);
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
return sender;
}
/* The sender wasn't in the hash table, so do a slow lookup and
put the result in the hash table */
sender = ompi_btl_usnic_proc_lookup_endpoint(module,
seg->us_btl_header->sender);
if (NULL != sender) {
opal_hash_table_set_value_uint64(&module->senders,
seg->us_btl_header->sender, sender);
return sender;
}
/* Whoa -- not found at all! */
return NULL;
}
/*
* Packet has been fully processed; update the receive window
* to indicate that it, and possibly following contiguous sequence
* numbers, have been received.
*/
static inline void
ompi_btl_usnic_update_window(
ompi_btl_usnic_endpoint_t *endpoint,
uint32_t window_index)
{
uint32_t i;
/* Enable ACK reply if not enabled */
#if MSGDEBUG1
opal_output(0, "ep: %p, ack_needed = %s\n", (void*)endpoint, endpoint->endpoint_ack_needed?"true":"false");
#endif
if (!endpoint->endpoint_ack_needed) {
ompi_btl_usnic_add_to_endpoints_needing_ack(endpoint);
}
/* give this process a chance to send something before ACKing */
if (0 == endpoint->endpoint_acktime) {
endpoint->endpoint_acktime = get_nsec() + 50000; /* 50 usec */
}
/* Save this incoming segment in the received segments array on the
endpoint. */
/* JMS Another optimization: make rcvd_segs be a bitmask (i.e.,
more cache friendly) */
endpoint->endpoint_rcvd_segs[window_index] = true;
/* See if the leftmost segment in the receiver window is
occupied. If so, advance the window. Repeat until we hit
an unoccupied position in the window. */
i = endpoint->endpoint_rfstart;
while (endpoint->endpoint_rcvd_segs[i]) {
endpoint->endpoint_rcvd_segs[i] = false;
endpoint->endpoint_next_contig_seq_to_recv++;
i = WINDOW_SIZE_MOD(i + 1);
#if MSGDEBUG
opal_output(0, "Advance window to %d; next seq to send %" UDSEQ, i,
endpoint->endpoint_next_contig_seq_to_recv);
#endif
}
endpoint->endpoint_rfstart = i;
}
static inline int
ompi_btl_usnic_check_rx_seq(
ompi_btl_usnic_endpoint_t *endpoint,
ompi_btl_usnic_recv_segment_t *seg,
uint32_t *window_index)
{
uint32_t i;
ompi_btl_usnic_seq_t seq;
/*
* Handle piggy-backed ACK if present
*/
if (seg->rs_base.us_btl_header->ack_seq != 0) {
#if MSGDEBUG1
opal_output(0, "Handle piggy-packed ACK seq %"UDSEQ"\n", seg->rs_base.us_btl_header->ack_seq);
#endif
ompi_btl_usnic_handle_ack(endpoint,
seg->rs_base.us_btl_header->ack_seq);
}
/* Do we have room in the endpoint's receiver window?
Receiver window:
|-------- WINDOW_SIZE ----------|
+---------------------------------+
| highest_seq_rcvd |
| somewhere in this range |
+^--------------------------------+
|
+-- next_contig_seq_to_recv: the window left edge;
will always be less than highest_seq_rcvd
The good condition is
next_contig_seq_to_recv <= seq < next_contig_seq_to_recv + WINDOW_SIZE
And the bad condition is
seq < next_contig_seq_to_recv
or
seq >= next_contig_seg_to_recv + WINDOW_SIZE
*/
seq = seg->rs_base.us_btl_header->seq;
if (seq < endpoint->endpoint_next_contig_seq_to_recv ||
seq >= endpoint->endpoint_next_contig_seq_to_recv + WINDOW_SIZE) {
#if MSGDEBUG
opal_output(0, "<-- Received FRAG/CHUNK ep %p, seq %" UDSEQ " outside of window (%" UDSEQ " - %" UDSEQ "), %p, module %p -- DROPPED\n",
(void*)endpoint, seg->rs_base.us_btl_header->seq,
endpoint->endpoint_next_contig_seq_to_recv,
(endpoint->endpoint_next_contig_seq_to_recv +
WINDOW_SIZE - 1),
(void*) seg,
(void*) endpoint->endpoint_module);
#endif
/* Stats */
if (seq < endpoint->endpoint_next_contig_seq_to_recv) {
++endpoint->endpoint_module->num_oow_low_recvs;
} else {
++endpoint->endpoint_module->num_oow_high_recvs;
}
goto dup_needs_ack;
}
/* Ok, this segment is within the receiver window. Have we
already received it? It's possible that the sender has
re-sent a segment that we've already received (but not yet
ACKed).
We have saved all un-ACKed segments in an array on the
endpoint that is the same length as the receiver's window
(i.e., WINDOW_SIZE). We can use the incoming segment sequence
number to find its position in the array. It's a little
tricky because the left edge of the receiver window keeps
moving, so we use a starting reference point in the array
that is updated when we send ACKs (and therefore move the
left edge of the receiver's window).
So this segment's index into the endpoint array is:
rel_posn_in_recv_win = seq - next_contig_seq_to_recv
array_posn = (rel_posn_in_recv_win + rfstart) % WINDOW_SIZE
rfstart is then updated when we send ACKs:
rfstart = (rfstart + num_acks_sent) % WINDOW_SIZE
*/
i = seq - endpoint->endpoint_next_contig_seq_to_recv;
i = WINDOW_SIZE_MOD(i + endpoint->endpoint_rfstart);
if (endpoint->endpoint_rcvd_segs[i]) {
#if MSGDEBUG
opal_output(0, "<-- Received FRAG/CHUNK ep %p, seq %" UDSEQ " from %s to %s, seg %p: duplicate -- DROPPED\n",
(void*) endpoint, bseg->us_btl_header->seq, src_mac, dest_mac,
(void*) seg);
#endif
/* highest_seq_rcvd is for debug stats only; it's not used
in any window calculations */
assert(seq <= endpoint->endpoint_highest_seq_rcvd);
/* next_contig_seq_to_recv-1 is the ack number we'll
send */
assert (seq > endpoint->endpoint_next_contig_seq_to_recv - 1);
/* Stats */
++endpoint->endpoint_module->num_dup_recvs;
goto dup_needs_ack;
}
/* Stats: is this the highest sequence number we've received? */
if (seq > endpoint->endpoint_highest_seq_rcvd) {
endpoint->endpoint_highest_seq_rcvd = seq;
}
*window_index = i;
return true;
dup_needs_ack:
if (!endpoint->endpoint_ack_needed) {
ompi_btl_usnic_add_to_endpoints_needing_ack(endpoint);
}
return false;
}
/*
* We have received a segment, take action based on the
* packet type in the BTL header.
* Try to be fast here - defer as much bookkeeping until later as
* possible.
* See README.txt for a discussion of the receive fastpath.
*/
static inline void
ompi_btl_usnic_recv_fast(ompi_btl_usnic_module_t *module,
ompi_btl_usnic_recv_segment_t *seg,
ompi_btl_usnic_channel_t *channel)
{
ompi_btl_usnic_segment_t *bseg;
mca_btl_active_message_callback_t* reg;
ompi_btl_usnic_seq_t seq;
ompi_btl_usnic_endpoint_t *endpoint;
uint32_t window_index;
int i;
bseg = &seg->rs_base;
/* Find out who sent this segment */
endpoint = lookup_sender(module, bseg);
seg->rs_endpoint = endpoint;
if (endpoint != NULL && !endpoint->endpoint_exiting &&
(OMPI_BTL_USNIC_PAYLOAD_TYPE_FRAG ==
bseg->us_btl_header->payload_type) &&
seg->rs_base.us_btl_header->put_addr == NULL) {
/* Valgrind help */
opal_memchecker_base_mem_defined(
(void*)(seg->rs_recv_desc.sg_list[0].addr),
seg->rs_recv_desc.sg_list[0].length);
seq = seg->rs_base.us_btl_header->seq;
if (seq < endpoint->endpoint_next_contig_seq_to_recv ||
seq >= endpoint->endpoint_next_contig_seq_to_recv + WINDOW_SIZE) {
goto drop;
}
i = seq - endpoint->endpoint_next_contig_seq_to_recv;
i = WINDOW_SIZE_MOD(i + endpoint->endpoint_rfstart);
if (endpoint->endpoint_rcvd_segs[i]) {
goto drop;
}
/* Pass this segment up to the PML.
* Be sure to get the payload length from the BTL header because
* the L2 layer may artificially inflate (or otherwise change)
* the frame length to meet minimum sizes, add protocol information,
* etc.
*/
reg = mca_btl_base_active_message_trigger +
bseg->us_payload.pml_header->tag;
seg->rs_segment.seg_len = bseg->us_btl_header->payload_len;
reg->cbfunc(&module->super, bseg->us_payload.pml_header->tag,
&seg->rs_desc, reg->cbdata);
drop:
channel->chan_deferred_recv = seg;
} else {
ompi_btl_usnic_recv_call(module, seg, channel);
}
}
/*
* Deferred per-packet receive bookkeeping: process the incoming sequence
* number, update the receive window, and queue the segment for reposting.
*/
static inline int
ompi_btl_usnic_recv_frag_bookeeping(
ompi_btl_usnic_module_t* module,
ompi_btl_usnic_recv_segment_t *seg,
ompi_btl_usnic_channel_t *channel)
{
ompi_btl_usnic_endpoint_t* endpoint;
uint32_t window_index;
endpoint = seg->rs_endpoint;
/* Valgrind help */
opal_memchecker_base_mem_defined(
(void*)(seg->rs_recv_desc.sg_list[0].addr),
seg->rs_recv_desc.sg_list[0].length);
++module->num_total_recvs;
/* Do late processing of incoming sequence # */
if (!ompi_btl_usnic_check_rx_seq(endpoint, seg, &window_index)) {
goto repost;
}
++module->num_frag_recvs;
ompi_btl_usnic_update_window(endpoint, window_index);
repost:
++module->num_recv_reposts;
/* Add recv to linked list for reposting */
seg->rs_recv_desc.next = channel->repost_recv_head;
channel->repost_recv_head = &seg->rs_recv_desc;
}
/*
* We have received a segment, take action based on the
* packet type in the BTL header
*/
static inline void
ompi_btl_usnic_recv(ompi_btl_usnic_module_t *module,
ompi_btl_usnic_recv_segment_t *seg,
ompi_btl_usnic_channel_t *channel)
{
ompi_btl_usnic_segment_t *bseg;
mca_btl_active_message_callback_t* reg;
ompi_btl_usnic_endpoint_t *endpoint;
ompi_btl_usnic_btl_chunk_header_t *chunk_hdr;
uint32_t window_index;
#if MSGDEBUG1
char src_mac[32];
char dest_mac[32];
#endif
bseg = &seg->rs_base;
/* Find out who sent this segment */
endpoint = lookup_sender(module, bseg);
seg->rs_endpoint = endpoint;
if (endpoint != NULL && !endpoint->endpoint_exiting &&
(OMPI_BTL_USNIC_PAYLOAD_TYPE_FRAG ==
bseg->us_btl_header->payload_type) &&
seg->rs_base.us_btl_header->put_addr == NULL) {
/* do the receive bookkeeping */
ompi_btl_usnic_recv_frag_bookeeping(module, seg, channel);
/* Pass this segment up to the PML.
* Be sure to get the payload length from the BTL header because
* the L2 layer may artificially inflate (or otherwise change)
* the frame length to meet minimum sizes, add protocol information,
* etc.
*/
reg = mca_btl_base_active_message_trigger +
bseg->us_payload.pml_header->tag;
seg->rs_segment.seg_len = bseg->us_btl_header->payload_len;
reg->cbfunc(&module->super, bseg->us_payload.pml_header->tag,
&seg->rs_desc, reg->cbdata);
} else {
ompi_btl_usnic_recv_call(module, seg, channel);
}
}
#endif /* BTL_USNIC_RECV_H */