Commit c531adaf authored by David S. Miller's avatar David S. Miller

Merge branch 'ipa-RX-replenish'

Alex Elder says:

====================
net: ipa: improve RX buffer replenishing

This series revises the algorithm used for replenishing receive
buffers on RX endpoints.  Currently there are two atomic variables
that track how many receive buffers can be sent to the hardware.
The new algorithm obviates the need for those, by just assuming we
always want to provide the hardware with buffers until it can hold
no more.

The first patch eliminates an atomic variable that's not required.
The next moves some code into the main replenish function's caller,
making one of the called function's arguments unnecessary.   The
next six refactor things a bit more, adding a new helper function
that allows us to eliminate an additional atomic variable.  And the
final two implement two more minor improvements.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents b93235e6 9654d8c4
......@@ -320,6 +320,17 @@ gsi_trans_tre_release(struct gsi_trans_info *trans_info, u32 tre_count)
atomic_add(tre_count, &trans_info->tre_avail);
}
/* Return true if no transactions are allocated, false otherwise */
bool gsi_channel_trans_idle(struct gsi *gsi, u32 channel_id)
{
u32 tre_max = gsi_channel_tre_max(gsi, channel_id);
struct gsi_trans_info *trans_info;
trans_info = &gsi->channel[channel_id].trans_info;
return atomic_read(&trans_info->tre_avail) == tre_max;
}
/* Allocate a GSI transaction on a channel */
struct gsi_trans *gsi_channel_trans_alloc(struct gsi *gsi, u32 channel_id,
u32 tre_count,
......
......@@ -129,6 +129,16 @@ void *gsi_trans_pool_alloc_dma(struct gsi_trans_pool *pool, dma_addr_t *addr);
*/
void gsi_trans_pool_exit_dma(struct device *dev, struct gsi_trans_pool *pool);
/**
* gsi_channel_trans_idle() - Return whether no transactions are allocated
* @gsi: GSI pointer
* @channel_id: Channel the transaction is associated with
*
* Return: True if no transactions are allocated, false otherwise
*
*/
bool gsi_channel_trans_idle(struct gsi *gsi, u32 channel_id);
/**
* gsi_channel_trans_alloc() - Allocate a GSI transaction on a channel
* @gsi: GSI pointer
......
......@@ -25,7 +25,8 @@
#define atomic_dec_not_zero(v) atomic_add_unless((v), -1, 0)
#define IPA_REPLENISH_BATCH 16
/* Hardware is told about receive buffers once a "batch" has been queued */
#define IPA_REPLENISH_BATCH 16 /* Must be non-zero */
/* The amount of RX buffer space consumed by standard skb overhead */
#define IPA_RX_BUFFER_OVERHEAD (PAGE_SIZE - SKB_MAX_ORDER(NET_SKB_PAD, 0))
......@@ -1036,10 +1037,9 @@ static void ipa_endpoint_status(struct ipa_endpoint *endpoint)
iowrite32(val, ipa->reg_virt + offset);
}
static int ipa_endpoint_replenish_one(struct ipa_endpoint *endpoint)
static int ipa_endpoint_replenish_one(struct ipa_endpoint *endpoint,
struct gsi_trans *trans)
{
struct gsi_trans *trans;
bool doorbell = false;
struct page *page;
u32 buffer_size;
u32 offset;
......@@ -1051,121 +1051,84 @@ static int ipa_endpoint_replenish_one(struct ipa_endpoint *endpoint)
if (!page)
return -ENOMEM;
trans = ipa_endpoint_trans_alloc(endpoint, 1);
if (!trans)
goto err_free_pages;
/* Offset the buffer to make space for skb headroom */
offset = NET_SKB_PAD;
len = buffer_size - offset;
ret = gsi_trans_page_add(trans, page, len, offset);
if (ret)
goto err_trans_free;
trans->data = page; /* transaction owns page now */
if (++endpoint->replenish_ready == IPA_REPLENISH_BATCH) {
doorbell = true;
endpoint->replenish_ready = 0;
}
gsi_trans_commit(trans, doorbell);
return 0;
err_trans_free:
gsi_trans_free(trans);
err_free_pages:
__free_pages(page, get_order(buffer_size));
__free_pages(page, get_order(buffer_size));
else
trans->data = page; /* transaction owns page now */
return -ENOMEM;
return ret;
}
/**
* ipa_endpoint_replenish() - Replenish endpoint receive buffers
* @endpoint: Endpoint to be replenished
* @add_one: Whether this is replacing a just-consumed buffer
*
* The IPA hardware can hold a fixed number of receive buffers for an RX
* endpoint, based on the number of entries in the underlying channel ring
* buffer. If an endpoint's "backlog" is non-zero, it indicates how many
* more receive buffers can be supplied to the hardware. Replenishing for
* an endpoint can be disabled, in which case requests to replenish a
* buffer are "saved", and transferred to the backlog once it is re-enabled
* again.
* an endpoint can be disabled, in which case buffers are not queued to
* the hardware.
*/
static void ipa_endpoint_replenish(struct ipa_endpoint *endpoint, bool add_one)
static void ipa_endpoint_replenish(struct ipa_endpoint *endpoint)
{
struct gsi *gsi;
u32 backlog;
int delta;
struct gsi_trans *trans;
if (!test_bit(IPA_REPLENISH_ENABLED, endpoint->replenish_flags)) {
if (add_one)
atomic_inc(&endpoint->replenish_saved);
if (!test_bit(IPA_REPLENISH_ENABLED, endpoint->replenish_flags))
return;
}
/* If already active, just update the backlog */
if (test_and_set_bit(IPA_REPLENISH_ACTIVE, endpoint->replenish_flags)) {
if (add_one)
atomic_inc(&endpoint->replenish_backlog);
/* Skip it if it's already active */
if (test_and_set_bit(IPA_REPLENISH_ACTIVE, endpoint->replenish_flags))
return;
}
while (atomic_dec_not_zero(&endpoint->replenish_backlog))
if (ipa_endpoint_replenish_one(endpoint))
while ((trans = ipa_endpoint_trans_alloc(endpoint, 1))) {
bool doorbell;
if (ipa_endpoint_replenish_one(endpoint, trans))
goto try_again_later;
clear_bit(IPA_REPLENISH_ACTIVE, endpoint->replenish_flags);
if (add_one)
atomic_inc(&endpoint->replenish_backlog);
/* Ring the doorbell if we've got a full batch */
doorbell = !(++endpoint->replenish_count % IPA_REPLENISH_BATCH);
gsi_trans_commit(trans, doorbell);
}
clear_bit(IPA_REPLENISH_ACTIVE, endpoint->replenish_flags);
return;
try_again_later:
gsi_trans_free(trans);
clear_bit(IPA_REPLENISH_ACTIVE, endpoint->replenish_flags);
/* The last one didn't succeed, so fix the backlog */
delta = add_one ? 2 : 1;
backlog = atomic_add_return(delta, &endpoint->replenish_backlog);
/* Whenever a receive buffer transaction completes we'll try to
* replenish again. It's unlikely, but if we fail to supply even
* one buffer, nothing will trigger another replenish attempt.
* Receive buffer transactions use one TRE, so schedule work to
* try replenishing again if our backlog is *all* available TREs.
* If the hardware has no receive buffers queued, schedule work to
* try replenishing again.
*/
gsi = &endpoint->ipa->gsi;
if (backlog == gsi_channel_tre_max(gsi, endpoint->channel_id))
if (gsi_channel_trans_idle(&endpoint->ipa->gsi, endpoint->channel_id))
schedule_delayed_work(&endpoint->replenish_work,
msecs_to_jiffies(1));
}
static void ipa_endpoint_replenish_enable(struct ipa_endpoint *endpoint)
{
struct gsi *gsi = &endpoint->ipa->gsi;
u32 max_backlog;
u32 saved;
set_bit(IPA_REPLENISH_ENABLED, endpoint->replenish_flags);
while ((saved = atomic_xchg(&endpoint->replenish_saved, 0)))
atomic_add(saved, &endpoint->replenish_backlog);
/* Start replenishing if hardware currently has no buffers */
max_backlog = gsi_channel_tre_max(gsi, endpoint->channel_id);
if (atomic_read(&endpoint->replenish_backlog) == max_backlog)
ipa_endpoint_replenish(endpoint, false);
if (gsi_channel_trans_idle(&endpoint->ipa->gsi, endpoint->channel_id))
ipa_endpoint_replenish(endpoint);
}
static void ipa_endpoint_replenish_disable(struct ipa_endpoint *endpoint)
{
u32 backlog;
clear_bit(IPA_REPLENISH_ENABLED, endpoint->replenish_flags);
while ((backlog = atomic_xchg(&endpoint->replenish_backlog, 0)))
atomic_add(backlog, &endpoint->replenish_saved);
}
static void ipa_endpoint_replenish_work(struct work_struct *work)
......@@ -1175,7 +1138,7 @@ static void ipa_endpoint_replenish_work(struct work_struct *work)
endpoint = container_of(dwork, struct ipa_endpoint, replenish_work);
ipa_endpoint_replenish(endpoint, false);
ipa_endpoint_replenish(endpoint);
}
static void ipa_endpoint_skb_copy(struct ipa_endpoint *endpoint,
......@@ -1380,10 +1343,8 @@ static void ipa_endpoint_rx_complete(struct ipa_endpoint *endpoint,
{
struct page *page;
ipa_endpoint_replenish(endpoint, true);
if (trans->cancelled)
return;
goto done;
/* Parse or build a socket buffer using the actual received length */
page = trans->data;
......@@ -1391,6 +1352,8 @@ static void ipa_endpoint_rx_complete(struct ipa_endpoint *endpoint,
ipa_endpoint_status_parse(endpoint, page, trans->len);
else if (ipa_endpoint_skb_build(endpoint, page, trans->len))
trans->data = NULL; /* Pages have been consumed */
done:
ipa_endpoint_replenish(endpoint);
}
void ipa_endpoint_trans_complete(struct ipa_endpoint *endpoint,
......@@ -1727,9 +1690,6 @@ static void ipa_endpoint_setup_one(struct ipa_endpoint *endpoint)
*/
clear_bit(IPA_REPLENISH_ENABLED, endpoint->replenish_flags);
clear_bit(IPA_REPLENISH_ACTIVE, endpoint->replenish_flags);
atomic_set(&endpoint->replenish_saved,
gsi_channel_tre_max(gsi, endpoint->channel_id));
atomic_set(&endpoint->replenish_backlog, 0);
INIT_DELAYED_WORK(&endpoint->replenish_work,
ipa_endpoint_replenish_work);
}
......@@ -1905,6 +1865,8 @@ u32 ipa_endpoint_init(struct ipa *ipa, u32 count,
enum ipa_endpoint_name name;
u32 filter_map;
BUILD_BUG_ON(!IPA_REPLENISH_BATCH);
if (!ipa_endpoint_data_valid(ipa, count, data))
return 0; /* Error */
......
......@@ -65,9 +65,7 @@ enum ipa_replenish_flag {
* @evt_ring_id: GSI event ring used by the endpoint
* @netdev: Network device pointer, if endpoint uses one
* @replenish_flags: Replenishing state flags
* @replenish_ready: Number of replenish transactions without doorbell
* @replenish_saved: Replenish requests held while disabled
* @replenish_backlog: Number of buffers needed to fill hardware queue
* @replenish_count: Total number of replenish transactions committed
* @replenish_work: Work item used for repeated replenish failures
*/
struct ipa_endpoint {
......@@ -86,9 +84,7 @@ struct ipa_endpoint {
/* Receive buffer replenishing for RX endpoints */
DECLARE_BITMAP(replenish_flags, IPA_REPLENISH_COUNT);
u32 replenish_ready;
atomic_t replenish_saved;
atomic_t replenish_backlog;
u64 replenish_count;
struct delayed_work replenish_work; /* global wq */
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment