Commit ce3cec27 authored by Alexei Starovoitov's avatar Alexei Starovoitov

Merge branch 'xsk-cleanup'

Magnus Karlsson says:

====================
This patch set cleans up the ring access functions of AF_XDP in hope
that it will now be easier to understand and maintain. I used to get a
headache every time I looked at this code in order to really understand it,
but now I do think it is a lot less painful.

The code has been simplified a lot and as a bonus we get better
performance in nearly all cases. On my new 2.1 GHz Cascade Lake
machine with a standard default config plus AF_XDP support and
CONFIG_PREEMPT on I get the following results in percent performance
increases with this patch set compared to without it:

Zero-copy (-N):
          rxdrop        txpush        l2fwd
1 core:    -2%            0%            3%
2 cores:    4%            0%            3%

Zero-copy with poll() (-N -p):
          rxdrop        txpush        l2fwd
1 core:     3%            0%            1%
2 cores:   21%            0%            9%

Skb mode (-S):
Shows a 0% to 5% performance improvement over the same benchmarks as
above.

Here 1 core means that we are running the driver processing and the
application on the same core, while 2 cores means that they execute on
separate cores. The applications are from the xdpsock sample app.

On my older 2.0 Ghz Broadwell machine that I used for the v1, I get
the following results:

Zero-copy (-N):
          rxdrop        txpush        l2fwd
1 core:     4%            5%            4%
2 cores:    1%            0%            2%

Zero-copy with poll() (-N -p):
          rxdrop        txpush        l2fwd
1 core:     1%            3%            3%
2 cores:   22%            0%            5%

Skb mode (-S):
Shows a 0% to 1% performance improvement over the same benchmarks as
above.

When a results says 21 or 22% better, as in the case of poll mode with
2 cores and rxdrop, my first reaction is that it must be a
bug. Everything else shows between 0% and 5% performance
improvement. What is giving rise to 22%? A quick bisect indicates that
it is patches 2, 3, 4, 5, and 6 that are giving rise to most of this
improvement. So not one patch in particular, but something around 4%
improvement from each one of them. Note that exactly this benchmark
has previously had an extraordinary slow down compared to when running
without poll syscalls. For all the other poll tests above, the
slowdown has always been around 4% for using poll syscalls. But with
the bad performing test in question, it was above 25%. Interestingly,
after this clean up, the slow down is 4%, just like all the other poll
tests. Please take an extra peek at this so I have not messed up
something.

The 0% for several txpush results are due to the test bottlenecking on
a non-CPU HW resource. If I eliminated that bottleneck on my system, I
would expect to see an increase there too.

Changes v1 -> v2:
* Corrected textual errors in the commit logs (Sergei and Martin)
* Fixed the functions that detect empty and full rings so that they
  now operate on the global ring state (Maxim)

This patch has been applied against commit a352a824 ("Merge branch 'libbpf-extern-followups'")

Structure of the patch set:

Patch 1: Eliminate the lazy update threshold used when preallocating
         entries in the completion ring
Patch 2: Simplify the detection of empty and full rings
Patch 3: Consolidate the two local producer pointers into one
Patch 4: Standardize the naming of the producer ring access functions
Patch 5: Eliminate the Rx batch size used for the fill ring
Patch 6: Simplify the functions xskq_nb_avail and xskq_nb_free
Patch 7: Simplify and standardize the naming of the consumer ring
         access functions
Patch 8: Change the names of the validation functions to improve
         readability and also the return value of these functions
Patch 9: Change the name of xsk_umem_discard_addr() to
         xsk_umem_release_addr() to better reflect the new
         names. Requires a name change in the drivers that support AF_XDP
         zero-copy.
Patch 10: Remove unnecessary READ_ONCE of data in the ring
Patch 11: Add overall function naming comment and reorder the functions
          for easier reference
Patch 12: Use the struct_size helper function when allocating rings
====================
Reviewed-by: default avatarBjörn Töpel <bjorn.topel@intel.com>
Tested-by: default avatarBjörn Töpel <bjorn.topel@intel.com>
Acked-by: default avatarBjörn Töpel <bjorn.topel@intel.com>
Signed-off-by: default avatarAlexei Starovoitov <ast@kernel.org>
parents 99cacdc6 1d9cb1f3
......@@ -269,7 +269,7 @@ static bool i40e_alloc_buffer_zc(struct i40e_ring *rx_ring,
bi->handle = xsk_umem_adjust_offset(umem, handle, umem->headroom);
xsk_umem_discard_addr(umem);
xsk_umem_release_addr(umem);
return true;
}
......@@ -306,7 +306,7 @@ static bool i40e_alloc_buffer_slow_zc(struct i40e_ring *rx_ring,
bi->handle = xsk_umem_adjust_offset(umem, handle, umem->headroom);
xsk_umem_discard_addr_rq(umem);
xsk_umem_release_addr_rq(umem);
return true;
}
......
......@@ -555,7 +555,7 @@ ice_alloc_buf_fast_zc(struct ice_ring *rx_ring, struct ice_rx_buf *rx_buf)
rx_buf->handle = handle + umem->headroom;
xsk_umem_discard_addr(umem);
xsk_umem_release_addr(umem);
return true;
}
......@@ -591,7 +591,7 @@ ice_alloc_buf_slow_zc(struct ice_ring *rx_ring, struct ice_rx_buf *rx_buf)
rx_buf->handle = handle + umem->headroom;
xsk_umem_discard_addr_rq(umem);
xsk_umem_release_addr_rq(umem);
return true;
}
......
......@@ -277,7 +277,7 @@ static bool ixgbe_alloc_buffer_zc(struct ixgbe_ring *rx_ring,
bi->handle = xsk_umem_adjust_offset(umem, handle, umem->headroom);
xsk_umem_discard_addr(umem);
xsk_umem_release_addr(umem);
return true;
}
......@@ -304,7 +304,7 @@ static bool ixgbe_alloc_buffer_slow_zc(struct ixgbe_ring *rx_ring,
bi->handle = xsk_umem_adjust_offset(umem, handle, umem->headroom);
xsk_umem_discard_addr_rq(umem);
xsk_umem_release_addr_rq(umem);
return true;
}
......
......@@ -35,7 +35,7 @@ int mlx5e_xsk_page_alloc_umem(struct mlx5e_rq *rq,
*/
dma_info->addr = xdp_umem_get_dma(umem, handle);
xsk_umem_discard_addr_rq(umem);
xsk_umem_release_addr_rq(umem);
dma_sync_single_for_device(rq->pdev, dma_info->addr, PAGE_SIZE,
DMA_BIDIRECTIONAL);
......
......@@ -118,8 +118,8 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp);
bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs);
/* Used from netdev driver */
bool xsk_umem_has_addrs(struct xdp_umem *umem, u32 cnt);
u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr);
void xsk_umem_discard_addr(struct xdp_umem *umem);
bool xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr);
void xsk_umem_release_addr(struct xdp_umem *umem);
void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries);
bool xsk_umem_consume_tx(struct xdp_umem *umem, struct xdp_desc *desc);
void xsk_umem_consume_tx_done(struct xdp_umem *umem);
......@@ -197,7 +197,7 @@ static inline bool xsk_umem_has_addrs_rq(struct xdp_umem *umem, u32 cnt)
return xsk_umem_has_addrs(umem, cnt - rq->length);
}
static inline u64 *xsk_umem_peek_addr_rq(struct xdp_umem *umem, u64 *addr)
static inline bool xsk_umem_peek_addr_rq(struct xdp_umem *umem, u64 *addr)
{
struct xdp_umem_fq_reuse *rq = umem->fq_reuse;
......@@ -208,12 +208,12 @@ static inline u64 *xsk_umem_peek_addr_rq(struct xdp_umem *umem, u64 *addr)
return addr;
}
static inline void xsk_umem_discard_addr_rq(struct xdp_umem *umem)
static inline void xsk_umem_release_addr_rq(struct xdp_umem *umem)
{
struct xdp_umem_fq_reuse *rq = umem->fq_reuse;
if (!rq->length)
xsk_umem_discard_addr(umem);
xsk_umem_release_addr(umem);
else
rq->length--;
}
......@@ -258,7 +258,7 @@ static inline u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
return NULL;
}
static inline void xsk_umem_discard_addr(struct xdp_umem *umem)
static inline void xsk_umem_release_addr(struct xdp_umem *umem)
{
}
......@@ -332,7 +332,7 @@ static inline u64 *xsk_umem_peek_addr_rq(struct xdp_umem *umem, u64 *addr)
return NULL;
}
static inline void xsk_umem_discard_addr_rq(struct xdp_umem *umem)
static inline void xsk_umem_release_addr_rq(struct xdp_umem *umem)
{
}
......
......@@ -41,21 +41,21 @@ bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs)
bool xsk_umem_has_addrs(struct xdp_umem *umem, u32 cnt)
{
return xskq_has_addrs(umem->fq, cnt);
return xskq_cons_has_entries(umem->fq, cnt);
}
EXPORT_SYMBOL(xsk_umem_has_addrs);
u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
bool xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
{
return xskq_peek_addr(umem->fq, addr, umem);
return xskq_cons_peek_addr(umem->fq, addr, umem);
}
EXPORT_SYMBOL(xsk_umem_peek_addr);
void xsk_umem_discard_addr(struct xdp_umem *umem)
void xsk_umem_release_addr(struct xdp_umem *umem)
{
xskq_discard_addr(umem->fq);
xskq_cons_release(umem->fq);
}
EXPORT_SYMBOL(xsk_umem_discard_addr);
EXPORT_SYMBOL(xsk_umem_release_addr);
void xsk_set_rx_need_wakeup(struct xdp_umem *umem)
{
......@@ -126,7 +126,7 @@ static void __xsk_rcv_memcpy(struct xdp_umem *umem, u64 addr, void *from_buf,
void *to_buf = xdp_umem_get_data(umem, addr);
addr = xsk_umem_add_offset_to_addr(addr);
if (xskq_crosses_non_contig_pg(umem, addr, len + metalen)) {
if (xskq_cons_crosses_non_contig_pg(umem, addr, len + metalen)) {
void *next_pg_addr = umem->pages[(addr >> PAGE_SHIFT) + 1].addr;
u64 page_start = addr & ~(PAGE_SIZE - 1);
u64 first_len = PAGE_SIZE - (addr - page_start);
......@@ -148,7 +148,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
u32 metalen;
int err;
if (!xskq_peek_addr(xs->umem->fq, &addr, xs->umem) ||
if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
xs->rx_dropped++;
return -ENOSPC;
......@@ -167,9 +167,9 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
offset += metalen;
addr = xsk_umem_adjust_offset(xs->umem, addr, offset);
err = xskq_produce_batch_desc(xs->rx, addr, len);
err = xskq_prod_reserve_desc(xs->rx, addr, len);
if (!err) {
xskq_discard_addr(xs->umem->fq);
xskq_cons_release(xs->umem->fq);
xdp_return_buff(xdp);
return 0;
}
......@@ -180,7 +180,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
{
int err = xskq_produce_batch_desc(xs->rx, (u64)xdp->handle, len);
int err = xskq_prod_reserve_desc(xs->rx, xdp->handle, len);
if (err)
xs->rx_dropped++;
......@@ -216,7 +216,7 @@ static int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
static void xsk_flush(struct xdp_sock *xs)
{
xskq_produce_flush_desc(xs->rx);
xskq_prod_submit(xs->rx);
xs->sk.sk_data_ready(&xs->sk);
}
......@@ -236,7 +236,7 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
goto out_unlock;
}
if (!xskq_peek_addr(xs->umem->fq, &addr, xs->umem) ||
if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
err = -ENOSPC;
goto out_drop;
......@@ -247,12 +247,12 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
memcpy(buffer, xdp->data_meta, len + metalen);
addr = xsk_umem_adjust_offset(xs->umem, addr, metalen);
err = xskq_produce_batch_desc(xs->rx, addr, len);
err = xskq_prod_reserve_desc(xs->rx, addr, len);
if (err)
goto out_drop;
xskq_discard_addr(xs->umem->fq);
xskq_produce_flush_desc(xs->rx);
xskq_cons_release(xs->umem->fq);
xskq_prod_submit(xs->rx);
spin_unlock_bh(&xs->rx_lock);
......@@ -294,7 +294,7 @@ void __xsk_map_flush(void)
void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries)
{
xskq_produce_flush_addr_n(umem->cq, nb_entries);
xskq_prod_submit_n(umem->cq, nb_entries);
}
EXPORT_SYMBOL(xsk_umem_complete_tx);
......@@ -316,13 +316,18 @@ bool xsk_umem_consume_tx(struct xdp_umem *umem, struct xdp_desc *desc)
rcu_read_lock();
list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
if (!xskq_peek_desc(xs->tx, desc, umem))
if (!xskq_cons_peek_desc(xs->tx, desc, umem))
continue;
if (xskq_produce_addr_lazy(umem->cq, desc->addr))
/* This is the backpreassure mechanism for the Tx path.
* Reserve space in the completion queue and only proceed
* if there is space in it. This avoids having to implement
* any buffering in the Tx path.
*/
if (xskq_prod_reserve_addr(umem->cq, desc->addr))
goto out;
xskq_discard_desc(xs->tx);
xskq_cons_release(xs->tx);
rcu_read_unlock();
return true;
}
......@@ -348,7 +353,7 @@ static void xsk_destruct_skb(struct sk_buff *skb)
unsigned long flags;
spin_lock_irqsave(&xs->tx_completion_lock, flags);
WARN_ON_ONCE(xskq_produce_addr(xs->umem->cq, addr));
xskq_prod_submit_addr(xs->umem->cq, addr);
spin_unlock_irqrestore(&xs->tx_completion_lock, flags);
sock_wfree(skb);
......@@ -368,7 +373,7 @@ static int xsk_generic_xmit(struct sock *sk)
if (xs->queue_id >= xs->dev->real_num_tx_queues)
goto out;
while (xskq_peek_desc(xs->tx, &desc, xs->umem)) {
while (xskq_cons_peek_desc(xs->tx, &desc, xs->umem)) {
char *buffer;
u64 addr;
u32 len;
......@@ -389,7 +394,12 @@ static int xsk_generic_xmit(struct sock *sk)
addr = desc.addr;
buffer = xdp_umem_get_data(xs->umem, addr);
err = skb_store_bits(skb, 0, buffer, len);
if (unlikely(err) || xskq_reserve_addr(xs->umem->cq)) {
/* This is the backpreassure mechanism for the Tx path.
* Reserve space in the completion queue and only proceed
* if there is space in it. This avoids having to implement
* any buffering in the Tx path.
*/
if (unlikely(err) || xskq_prod_reserve(xs->umem->cq)) {
kfree_skb(skb);
goto out;
}
......@@ -401,7 +411,7 @@ static int xsk_generic_xmit(struct sock *sk)
skb->destructor = xsk_destruct_skb;
err = dev_direct_xmit(skb, xs->queue_id);
xskq_discard_desc(xs->tx);
xskq_cons_release(xs->tx);
/* Ignore NET_XMIT_CN as packet might have been sent */
if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) {
/* SKB completed but not sent */
......@@ -470,9 +480,9 @@ static __poll_t xsk_poll(struct file *file, struct socket *sock,
__xsk_sendmsg(sk);
}
if (xs->rx && !xskq_empty_desc(xs->rx))
if (xs->rx && !xskq_prod_is_empty(xs->rx))
mask |= EPOLLIN | EPOLLRDNORM;
if (xs->tx && !xskq_full_desc(xs->tx))
if (xs->tx && !xskq_cons_is_full(xs->tx))
mask |= EPOLLOUT | EPOLLWRNORM;
return mask;
......
......@@ -18,14 +18,14 @@ void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask)
q->chunk_mask = chunk_mask;
}
static u32 xskq_umem_get_ring_size(struct xsk_queue *q)
static size_t xskq_get_ring_size(struct xsk_queue *q, bool umem_queue)
{
return sizeof(struct xdp_umem_ring) + q->nentries * sizeof(u64);
}
struct xdp_umem_ring *umem_ring;
struct xdp_rxtx_ring *rxtx_ring;
static u32 xskq_rxtx_get_ring_size(struct xsk_queue *q)
{
return sizeof(struct xdp_ring) + q->nentries * sizeof(struct xdp_desc);
if (umem_queue)
return struct_size(umem_ring, desc, q->nentries);
return struct_size(rxtx_ring, desc, q->nentries);
}
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue)
......@@ -43,8 +43,7 @@ struct xsk_queue *xskq_create(u32 nentries, bool umem_queue)
gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN |
__GFP_COMP | __GFP_NORETRY;
size = umem_queue ? xskq_umem_get_ring_size(q) :
xskq_rxtx_get_ring_size(q);
size = xskq_get_ring_size(q, umem_queue);
q->ring = (struct xdp_ring *)__get_free_pages(gfp_flags,
get_order(size));
......
......@@ -10,9 +10,6 @@
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>
#define RX_BATCH_SIZE 16
#define LAZY_UPDATE_THRESHOLD 128
struct xdp_ring {
u32 producer ____cacheline_aligned_in_smp;
u32 consumer ____cacheline_aligned_in_smp;
......@@ -36,10 +33,8 @@ struct xsk_queue {
u64 size;
u32 ring_mask;
u32 nentries;
u32 prod_head;
u32 prod_tail;
u32 cons_head;
u32 cons_tail;
u32 cached_prod;
u32 cached_cons;
struct xdp_ring *ring;
u64 invalid_descs;
};
......@@ -86,56 +81,31 @@ struct xsk_queue {
* now and again after circling through the ring.
*/
/* Common functions operating for both RXTX and umem queues */
static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
return q ? q->invalid_descs : 0;
}
static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
u32 entries = q->prod_tail - q->cons_tail;
if (entries == 0) {
/* Refresh the local pointer */
q->prod_tail = READ_ONCE(q->ring->producer);
entries = q->prod_tail - q->cons_tail;
}
return (entries > dcnt) ? dcnt : entries;
}
static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
{
u32 free_entries = q->nentries - (producer - q->cons_tail);
if (free_entries >= dcnt)
return free_entries;
/* Refresh the local tail pointer */
q->cons_tail = READ_ONCE(q->ring->consumer);
return q->nentries - (producer - q->cons_tail);
}
static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
{
u32 entries = q->prod_tail - q->cons_tail;
if (entries >= cnt)
return true;
/* Refresh the local pointer. */
q->prod_tail = READ_ONCE(q->ring->producer);
entries = q->prod_tail - q->cons_tail;
return entries >= cnt;
}
/* The operations on the rings are the following:
*
* producer consumer
*
* RESERVE entries PEEK in the ring for entries
* WRITE data into the ring READ data from the ring
* SUBMIT entries RELEASE entries
*
* The producer reserves one or more entries in the ring. It can then
* fill in these entries and finally submit them so that they can be
* seen and read by the consumer.
*
* The consumer peeks into the ring to see if the producer has written
* any new entries. If so, the producer can then read these entries
* and when it is done reading them release them back to the producer
* so that the producer can use these slots to fill in new entries.
*
* The function names below reflect these operations.
*/
/* UMEM queue */
/* Functions that read and validate content from consumer rings. */
static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr,
u64 length)
static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem,
u64 addr,
u64 length)
{
bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE;
bool next_pg_contig =
......@@ -145,9 +115,16 @@ static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr,
return cross_pg && !next_pg_contig;
}
static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q,
u64 addr,
u64 length,
struct xdp_umem *umem)
{
if (addr >= q->size) {
u64 base_addr = xsk_umem_extract_addr(addr);
addr = xsk_umem_add_offset_to_addr(addr);
if (base_addr >= q->size || addr >= q->size ||
xskq_cons_crosses_non_contig_pg(umem, addr, length)) {
q->invalid_descs++;
return false;
}
......@@ -155,15 +132,9 @@ static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
return true;
}
static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr,
u64 length,
struct xdp_umem *umem)
static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
{
u64 base_addr = xsk_umem_extract_addr(addr);
addr = xsk_umem_add_offset_to_addr(addr);
if (base_addr >= q->size || addr >= q->size ||
xskq_crosses_non_contig_pg(umem, addr, length)) {
if (addr >= q->size) {
q->invalid_descs++;
return false;
}
......@@ -171,204 +142,240 @@ static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr,
return true;
}
static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr,
struct xdp_umem *umem)
static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr,
struct xdp_umem *umem)
{
while (q->cons_tail != q->cons_head) {
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
unsigned int idx = q->cons_tail & q->ring_mask;
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
while (q->cached_cons != q->cached_prod) {
u32 idx = q->cached_cons & q->ring_mask;
*addr = ring->desc[idx] & q->chunk_mask;
if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
if (xskq_is_valid_addr_unaligned(q, *addr,
if (xskq_cons_is_valid_unaligned(q, *addr,
umem->chunk_size_nohr,
umem))
return addr;
return true;
goto out;
}
if (xskq_is_valid_addr(q, *addr))
return addr;
if (xskq_cons_is_valid_addr(q, *addr))
return true;
out:
q->cons_tail++;
q->cached_cons++;
}
return NULL;
return false;
}
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr,
struct xdp_umem *umem)
static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
struct xdp_desc *d,
struct xdp_umem *umem)
{
if (q->cons_tail == q->cons_head) {
smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
if (!xskq_cons_is_valid_unaligned(q, d->addr, d->len, umem))
return false;
if (d->len > umem->chunk_size_nohr || d->options) {
q->invalid_descs++;
return false;
}
/* Order consumer and data */
smp_rmb();
return true;
}
return xskq_validate_addr(q, addr, umem);
}
if (!xskq_cons_is_valid_addr(q, d->addr))
return false;
static inline void xskq_discard_addr(struct xsk_queue *q)
{
q->cons_tail++;
if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
d->options) {
q->invalid_descs++;
return false;
}
return true;
}
static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
static inline bool xskq_cons_read_desc(struct xsk_queue *q,
struct xdp_desc *desc,
struct xdp_umem *umem)
{
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
if (xskq_nb_free(q, q->prod_tail, 1) == 0)
return -ENOSPC;
while (q->cached_cons != q->cached_prod) {
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
u32 idx = q->cached_cons & q->ring_mask;
/* A, matches D */
ring->desc[q->prod_tail++ & q->ring_mask] = addr;
*desc = ring->desc[idx];
if (xskq_cons_is_valid_desc(q, desc, umem))
return true;
/* Order producer and data */
smp_wmb(); /* B, matches C */
q->cached_cons++;
}
WRITE_ONCE(q->ring->producer, q->prod_tail);
return 0;
return false;
}
static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
/* Functions for consumers */
static inline void __xskq_cons_release(struct xsk_queue *q)
{
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cached_cons);
}
if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
return -ENOSPC;
static inline void __xskq_cons_peek(struct xsk_queue *q)
{
/* Refresh the local pointer */
q->cached_prod = READ_ONCE(q->ring->producer);
smp_rmb(); /* C, matches B */
}
/* A, matches D */
ring->desc[q->prod_head++ & q->ring_mask] = addr;
return 0;
static inline void xskq_cons_get_entries(struct xsk_queue *q)
{
__xskq_cons_release(q);
__xskq_cons_peek(q);
}
static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
u32 nb_entries)
static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
{
/* Order producer and data */
smp_wmb(); /* B, matches C */
u32 entries = q->cached_prod - q->cached_cons;
q->prod_tail += nb_entries;
WRITE_ONCE(q->ring->producer, q->prod_tail);
if (entries >= cnt)
return true;
__xskq_cons_peek(q);
entries = q->cached_prod - q->cached_cons;
return entries >= cnt;
}
static inline int xskq_reserve_addr(struct xsk_queue *q)
static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
struct xdp_umem *umem)
{
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;
if (q->cached_prod == q->cached_cons)
xskq_cons_get_entries(q);
return xskq_cons_read_addr(q, addr, umem);
}
/* A, matches D */
q->prod_head++;
return 0;
static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
struct xdp_desc *desc,
struct xdp_umem *umem)
{
if (q->cached_prod == q->cached_cons)
xskq_cons_get_entries(q);
return xskq_cons_read_desc(q, desc, umem);
}
/* Rx/Tx queue */
static inline void xskq_cons_release(struct xsk_queue *q)
{
/* To improve performance, only update local state here.
* Reflect this to global state when we get new entries
* from the ring in xskq_cons_get_entries().
*/
q->cached_cons++;
}
static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d,
struct xdp_umem *umem)
static inline bool xskq_cons_is_full(struct xsk_queue *q)
{
if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
if (!xskq_is_valid_addr_unaligned(q, d->addr, d->len, umem))
return false;
/* No barriers needed since data is not accessed */
return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
q->nentries;
}
if (d->len > umem->chunk_size_nohr || d->options) {
q->invalid_descs++;
return false;
}
/* Functions for producers */
return true;
}
static inline bool xskq_prod_is_full(struct xsk_queue *q)
{
u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
if (!xskq_is_valid_addr(q, d->addr))
if (free_entries)
return false;
if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
d->options) {
q->invalid_descs++;
return false;
}
/* Refresh the local tail pointer */
q->cached_cons = READ_ONCE(q->ring->consumer);
free_entries = q->nentries - (q->cached_prod - q->cached_cons);
return true;
return !free_entries;
}
static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
struct xdp_desc *desc,
struct xdp_umem *umem)
static inline int xskq_prod_reserve(struct xsk_queue *q)
{
while (q->cons_tail != q->cons_head) {
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
unsigned int idx = q->cons_tail & q->ring_mask;
*desc = READ_ONCE(ring->desc[idx]);
if (xskq_is_valid_desc(q, desc, umem))
return desc;
q->cons_tail++;
}
if (xskq_prod_is_full(q))
return -ENOSPC;
return NULL;
/* A, matches D */
q->cached_prod++;
return 0;
}
static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
struct xdp_desc *desc,
struct xdp_umem *umem)
static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
{
if (q->cons_tail == q->cons_head) {
smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
/* Order consumer and data */
smp_rmb(); /* C, matches B */
}
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
return xskq_validate_desc(q, desc, umem);
}
if (xskq_prod_is_full(q))
return -ENOSPC;
static inline void xskq_discard_desc(struct xsk_queue *q)
{
q->cons_tail++;
/* A, matches D */
ring->desc[q->cached_prod++ & q->ring_mask] = addr;
return 0;
}
static inline int xskq_produce_batch_desc(struct xsk_queue *q,
u64 addr, u32 len)
static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
u64 addr, u32 len)
{
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
unsigned int idx;
u32 idx;
if (xskq_nb_free(q, q->prod_head, 1) == 0)
if (xskq_prod_is_full(q))
return -ENOSPC;
/* A, matches D */
idx = (q->prod_head++) & q->ring_mask;
idx = q->cached_prod++ & q->ring_mask;
ring->desc[idx].addr = addr;
ring->desc[idx].len = len;
return 0;
}
static inline void xskq_produce_flush_desc(struct xsk_queue *q)
static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
/* Order producer and data */
smp_wmb(); /* B, matches C */
q->prod_tail = q->prod_head;
WRITE_ONCE(q->ring->producer, q->prod_tail);
WRITE_ONCE(q->ring->producer, idx);
}
static inline void xskq_prod_submit(struct xsk_queue *q)
{
__xskq_prod_submit(q, q->cached_prod);
}
static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
{
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
u32 idx = q->ring->producer;
ring->desc[idx++ & q->ring_mask] = addr;
__xskq_prod_submit(q, idx);
}
static inline bool xskq_full_desc(struct xsk_queue *q)
static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
return xskq_nb_avail(q, q->nentries) == q->nentries;
__xskq_prod_submit(q, q->ring->producer + nb_entries);
}
static inline bool xskq_empty_desc(struct xsk_queue *q)
static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
/* No barriers needed since data is not accessed */
return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}
/* For both producers and consumers */
static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
return q ? q->invalid_descs : 0;
}
void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment