Commit a8248fc4 authored by Paolo Abeni's avatar Paolo Abeni

Merge tag 'rxrpc-next-20230131' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
Here's the fifth part of patches in the process of moving rxrpc from doing
a lot of its stuff in softirq context to doing it in an I/O thread in
process context and thereby making it easier to support a larger SACK
table.

The full description is in the description for the first part[1] which is
now upstream.  The second and third parts are also upstream[2].  A subset
of the original fourth part[3] got applied as a fix for a race[4].

The fifth part includes some cleanups:

 (1) Miscellaneous trace header cleanups: fix a trace string, display the
     security index in rx_packet rather than displaying the type twice,
     remove some whitespace to make checkpatch happier and remove some
     excess tabulation.

 (2) Convert ->recvmsg_lock to a spinlock as it's only ever locked
     exclusively.

 (3) Make ->ackr_window and ->ackr_nr_unacked non-atomic as they're only
     used in the I/O thread.

 (4) Don't use call->tx_lock to access ->tx_buffer as that is only accessed
     inside the I/O thread.  sendmsg() loads onto ->tx_sendmsg and the I/O
     thread decants from that to the buffer.

 (5) Remove local->defrag_sem as DATA packets are transmitted serially by
     the I/O thread.

 (6) Remove the service connection bundle is it was only used for its
     channel_lock - which has now gone.

And some more significant changes:

 (7) Add a debugging option to allow a delay to be injected into packet
     reception to help investigate the behaviour over longer links than
     just a few cm.

 (8) Generate occasional PING ACKs to probe for RTT information during a
     receive heavy call.

 (9) Simplify the SACK table maintenance and ACK generation.  Now that both
     parts are done in the same thread, there's no possibility of a race
     and no need to try and be cunning to avoid taking a BH spinlock whilst
     copying the SACK table (which in the future will be up to 2K) and no
     need to rotate the copy to fit the ACK packet table.

(10) Use SKB_CONSUMED when freeing received DATA packets (stop dropwatch
     complaining).

* tag 'rxrpc-next-20230131' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs:
  rxrpc: Kill service bundle
  rxrpc: Change rx_packet tracepoint to display securityIndex not type twice
  rxrpc: Show consumed and freed packets as non-dropped in dropwatch
  rxrpc: Remove local->defrag_sem
  rxrpc: Don't lock call->tx_lock to access call->tx_buffer
  rxrpc: Simplify ACK handling
  rxrpc: De-atomic call->ackr_window and call->ackr_nr_unacked
  rxrpc: Generate extra pings for RTT during heavy-receive call
  rxrpc: Allow a delay to be injected into packet reception
  rxrpc: Convert call->recvmsg_lock to a spinlock
  rxrpc: Shrink the tabulation in the rxrpc trace header a bit
  rxrpc: Remove whitespace before ')' in trace header
  rxrpc: Fix trace string
====================

Link: https://lore.kernel.org/all/20230131171227.3912130-1-dhowells@redhat.com/Signed-off-by: default avatarPaolo Abeni <pabeni@redhat.com>
parents 609aa68d 550130a0
This diff is collapsed.
......@@ -36,6 +36,15 @@ config AF_RXRPC_INJECT_LOSS
Say Y here to inject packet loss by discarding some received and some
transmitted packets.
config AF_RXRPC_INJECT_RX_DELAY
bool "Inject delay into packet reception"
depends on SYSCTL
help
Say Y here to inject a delay into packet reception, allowing an
extended RTT time to be modelled. The delay can be configured using
/proc/sys/net/rxrpc/rxrpc_inject_rx_delay, setting a number of
milliseconds up to 0.5s (note that the granularity is actually in
jiffies).
config AF_RXRPC_DEBUG
bool "RxRPC dynamic debugging"
......
......@@ -786,7 +786,7 @@ static int rxrpc_create(struct net *net, struct socket *sock, int protocol,
INIT_LIST_HEAD(&rx->sock_calls);
INIT_LIST_HEAD(&rx->to_be_accepted);
INIT_LIST_HEAD(&rx->recvmsg_q);
rwlock_init(&rx->recvmsg_lock);
spin_lock_init(&rx->recvmsg_lock);
rwlock_init(&rx->call_lock);
memset(&rx->srx, 0, sizeof(rx->srx));
......
......@@ -149,7 +149,7 @@ struct rxrpc_sock {
struct list_head sock_calls; /* List of calls owned by this socket */
struct list_head to_be_accepted; /* calls awaiting acceptance */
struct list_head recvmsg_q; /* Calls awaiting recvmsg's attention */
rwlock_t recvmsg_lock; /* Lock for recvmsg_q */
spinlock_t recvmsg_lock; /* Lock for recvmsg_q */
struct key *key; /* security for this socket */
struct key *securities; /* list of server security descriptors */
struct rb_root calls; /* User ID -> call mapping */
......@@ -284,7 +284,9 @@ struct rxrpc_local {
struct task_struct *io_thread;
struct completion io_thread_ready; /* Indication that the I/O thread started */
struct rxrpc_sock *service; /* Service(s) listening on this endpoint */
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
struct sk_buff_head rx_delay_queue; /* Delay injection queue */
#endif
struct sk_buff_head rx_queue; /* Received packets */
struct list_head conn_attend_q; /* Conns requiring immediate attention */
struct list_head call_attend_q; /* Calls requiring immediate attention */
......@@ -688,9 +690,11 @@ struct rxrpc_call {
/* Receive-phase ACK management (ACKs we send). */
u8 ackr_reason; /* reason to ACK */
u16 ackr_sack_base; /* Starting slot in SACK table ring */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
atomic64_t ackr_window; /* Base (in LSW) and top (in MSW) of SACK window */
atomic_t ackr_nr_unacked; /* Number of unacked packets */
rxrpc_seq_t ackr_window; /* Base of SACK window */
rxrpc_seq_t ackr_wtop; /* Base of SACK window */
unsigned int ackr_nr_unacked; /* Number of unacked packets */
atomic_t ackr_nr_consumed; /* Number of packets needing hard ACK */
struct {
#define RXRPC_SACK_SIZE 256
......@@ -1109,6 +1113,9 @@ extern unsigned long rxrpc_idle_ack_delay;
extern unsigned int rxrpc_rx_window_size;
extern unsigned int rxrpc_rx_mtu;
extern unsigned int rxrpc_rx_jumbo_max;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
extern unsigned long rxrpc_inject_rx_delay;
#endif
/*
* net_ns.c
......
......@@ -195,7 +195,7 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
tail = b->peer_backlog_tail;
while (CIRC_CNT(head, tail, size) > 0) {
struct rxrpc_peer *peer = b->peer_backlog[tail];
rxrpc_put_local(peer->local, rxrpc_local_put_prealloc_conn);
rxrpc_put_local(peer->local, rxrpc_local_put_prealloc_peer);
kfree(peer);
tail = (tail + 1) & (size - 1);
}
......
......@@ -498,9 +498,18 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
rxrpc_propose_ack_rx_idle);
if (atomic_read(&call->ackr_nr_unacked) > 2)
rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
rxrpc_propose_ack_input_data);
if (call->ackr_nr_unacked > 2) {
if (call->peer->rtt_count < 3)
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
rxrpc_propose_ack_ping_for_rtt);
else if (ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
ktime_get_real()))
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
rxrpc_propose_ack_ping_for_old_rtt);
else
rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
rxrpc_propose_ack_input_data);
}
/* Make sure the timer is restarted */
if (!__rxrpc_call_is_complete(call)) {
......
......@@ -167,7 +167,8 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
call->tx_total_len = -1;
call->next_rx_timo = 20 * HZ;
call->next_req_timo = 1 * HZ;
atomic64_set(&call->ackr_window, 0x100000001ULL);
call->ackr_window = 1;
call->ackr_wtop = 1;
memset(&call->sock_node, 0xed, sizeof(call->sock_node));
......@@ -560,7 +561,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
rxrpc_put_call_slot(call);
/* Make sure we don't get any more notifications */
write_lock(&rx->recvmsg_lock);
spin_lock(&rx->recvmsg_lock);
if (!list_empty(&call->recvmsg_link)) {
_debug("unlinking once-pending call %p { e=%lx f=%lx }",
......@@ -573,7 +574,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
call->recvmsg_link.next = NULL;
call->recvmsg_link.prev = NULL;
write_unlock(&rx->recvmsg_lock);
spin_unlock(&rx->recvmsg_lock);
if (put)
rxrpc_put_call(call, rxrpc_call_put_unnotify);
......
......@@ -8,11 +8,6 @@
#include <linux/slab.h>
#include "ar-internal.h"
static struct rxrpc_bundle rxrpc_service_dummy_bundle = {
.ref = REFCOUNT_INIT(1),
.debug_id = UINT_MAX,
};
/*
* Find a service connection under RCU conditions.
*
......@@ -132,8 +127,6 @@ struct rxrpc_connection *rxrpc_prealloc_service_connection(struct rxrpc_net *rxn
*/
conn->state = RXRPC_CONN_SERVICE_PREALLOC;
refcount_set(&conn->ref, 2);
conn->bundle = rxrpc_get_bundle(&rxrpc_service_dummy_bundle,
rxrpc_bundle_get_service_conn);
atomic_inc(&rxnet->nr_conns);
write_lock(&rxnet->conn_lock);
......
......@@ -338,7 +338,8 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
rxrpc_seq_t window, rxrpc_seq_t wtop)
{
atomic64_set_release(&call->ackr_window, ((u64)wtop) << 32 | window);
call->ackr_window = window;
call->ackr_wtop = wtop;
}
/*
......@@ -367,9 +368,9 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct sk_buff *oos;
rxrpc_serial_t serial = sp->hdr.serial;
u64 win = atomic64_read(&call->ackr_window);
rxrpc_seq_t window = lower_32_bits(win);
rxrpc_seq_t wtop = upper_32_bits(win);
unsigned int sack = call->ackr_sack_base;
rxrpc_seq_t window = call->ackr_window;
rxrpc_seq_t wtop = call->ackr_wtop;
rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
rxrpc_seq_t seq = sp->hdr.seq;
bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
......@@ -410,20 +411,23 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
/* Queue the packet. */
if (seq == window) {
rxrpc_seq_t reset_from;
bool reset_sack = false;
if (sp->hdr.flags & RXRPC_REQUEST_ACK)
ack_reason = RXRPC_ACK_REQUESTED;
/* Send an immediate ACK if we fill in a hole */
else if (!skb_queue_empty(&call->rx_oos_queue))
ack_reason = RXRPC_ACK_DELAY;
else
atomic_inc_return(&call->ackr_nr_unacked);
call->ackr_nr_unacked++;
window++;
if (after(window, wtop))
if (after(window, wtop)) {
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_none);
wtop = window;
} else {
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_advance);
sack = (sack + 1) % RXRPC_SACK_SIZE;
}
rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);
......@@ -440,43 +444,39 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
__skb_unlink(oos, &call->rx_oos_queue);
last = osp->hdr.flags & RXRPC_LAST_PACKET;
seq = osp->hdr.seq;
if (!reset_sack) {
reset_from = seq;
reset_sack = true;
}
call->ackr_sack_table[sack] = 0;
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_fill);
sack = (sack + 1) % RXRPC_SACK_SIZE;
window++;
rxrpc_input_queue_data(call, oos, window, wtop,
rxrpc_receive_queue_oos);
rxrpc_receive_queue_oos);
}
spin_unlock(&call->recvmsg_queue.lock);
if (reset_sack) {
do {
call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0;
} while (reset_from++, before(reset_from, window));
}
call->ackr_sack_base = sack;
} else {
bool keep = false;
unsigned int slot;
ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;
if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) {
call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1;
keep = 1;
slot = seq - window;
sack = (sack + slot) % RXRPC_SACK_SIZE;
if (call->ackr_sack_table[sack % RXRPC_SACK_SIZE]) {
ack_reason = RXRPC_ACK_DUPLICATE;
goto send_ack;
}
call->ackr_sack_table[sack % RXRPC_SACK_SIZE] |= 1;
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_oos);
if (after(seq + 1, wtop)) {
wtop = seq + 1;
rxrpc_input_update_ack_window(call, window, wtop);
}
if (!keep) {
ack_reason = RXRPC_ACK_DUPLICATE;
goto send_ack;
}
skb_queue_walk(&call->rx_oos_queue, oos) {
struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
......@@ -567,8 +567,8 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
rxrpc_serial_t serial = sp->hdr.serial;
rxrpc_seq_t seq0 = sp->hdr.seq;
_enter("{%llx,%x},{%u,%x}",
atomic64_read(&call->ackr_window), call->rx_highest_seq,
_enter("{%x,%x,%x},{%u,%x}",
call->ackr_window, call->ackr_wtop, call->rx_highest_seq,
skb->len, seq0);
if (__rxrpc_call_is_complete(call))
......
......@@ -25,6 +25,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
*/
int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
{
struct sk_buff_head *rx_queue;
struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
if (unlikely(!local)) {
......@@ -36,7 +37,16 @@ int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
skb->mark = RXRPC_SKB_MARK_PACKET;
rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
skb_queue_tail(&local->rx_queue, skb);
rx_queue = &local->rx_queue;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
if (rxrpc_inject_rx_delay ||
!skb_queue_empty(&local->rx_delay_queue)) {
skb->tstamp = ktime_add_ms(skb->tstamp, rxrpc_inject_rx_delay);
rx_queue = &local->rx_delay_queue;
}
#endif
skb_queue_tail(rx_queue, skb);
rxrpc_wake_up_io_thread(local);
return 0;
}
......@@ -407,6 +417,9 @@ int rxrpc_io_thread(void *data)
struct rxrpc_local *local = data;
struct rxrpc_call *call;
struct sk_buff *skb;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
ktime_t now;
#endif
bool should_stop;
complete(&local->io_thread_ready);
......@@ -481,6 +494,17 @@ int rxrpc_io_thread(void *data)
continue;
}
/* Inject a delay into packets if requested. */
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
now = ktime_get_real();
while ((skb = skb_peek(&local->rx_delay_queue))) {
if (ktime_before(now, skb->tstamp))
break;
skb = skb_dequeue(&local->rx_delay_queue);
skb_queue_tail(&local->rx_queue, skb);
}
#endif
if (!skb_queue_empty(&local->rx_queue)) {
spin_lock_irq(&local->rx_queue.lock);
skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
......@@ -502,6 +526,28 @@ int rxrpc_io_thread(void *data)
if (should_stop)
break;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
skb = skb_peek(&local->rx_delay_queue);
if (skb) {
unsigned long timeout;
ktime_t tstamp = skb->tstamp;
ktime_t now = ktime_get_real();
s64 delay_ns = ktime_to_ns(ktime_sub(tstamp, now));
if (delay_ns <= 0) {
__set_current_state(TASK_RUNNING);
continue;
}
timeout = nsecs_to_jiffies(delay_ns);
timeout = max(timeout, 1UL);
schedule_timeout(timeout);
__set_current_state(TASK_RUNNING);
continue;
}
#endif
schedule();
}
......
......@@ -108,8 +108,10 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
local->net = net;
local->rxnet = rxrpc_net(net);
INIT_HLIST_NODE(&local->link);
init_rwsem(&local->defrag_sem);
init_completion(&local->io_thread_ready);
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
skb_queue_head_init(&local->rx_delay_queue);
#endif
skb_queue_head_init(&local->rx_queue);
INIT_LIST_HEAD(&local->conn_attend_q);
INIT_LIST_HEAD(&local->call_attend_q);
......@@ -434,6 +436,9 @@ void rxrpc_destroy_local(struct rxrpc_local *local)
/* At this point, there should be no more packets coming in to the
* local endpoint.
*/
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
rxrpc_purge_queue(&local->rx_delay_queue);
#endif
rxrpc_purge_queue(&local->rx_queue);
rxrpc_purge_client_connections(local);
}
......
......@@ -53,3 +53,10 @@ unsigned int rxrpc_rx_mtu = 5692;
* sender that we're willing to handle.
*/
unsigned int rxrpc_rx_jumbo_max = 4;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
/*
* The delay to inject into packet reception.
*/
unsigned long rxrpc_inject_rx_delay;
#endif
......@@ -83,59 +83,36 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
struct rxrpc_txbuf *txb)
{
struct rxrpc_ackinfo ackinfo;
unsigned int qsize;
rxrpc_seq_t window, wtop, wrap_point, ix, first;
unsigned int qsize, sack, wrap, to;
rxrpc_seq_t window, wtop;
int rsize;
u64 wtmp;
u32 mtu, jmax;
u8 *ackp = txb->acks;
u8 sack_buffer[sizeof(call->ackr_sack_table)] __aligned(8);
atomic_set(&call->ackr_nr_unacked, 0);
call->ackr_nr_unacked = 0;
atomic_set(&call->ackr_nr_consumed, 0);
rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
/* Barrier against rxrpc_input_data(). */
retry:
wtmp = atomic64_read_acquire(&call->ackr_window);
window = lower_32_bits(wtmp);
wtop = upper_32_bits(wtmp);
window = call->ackr_window;
wtop = call->ackr_wtop;
sack = call->ackr_sack_base % RXRPC_SACK_SIZE;
txb->ack.firstPacket = htonl(window);
txb->ack.nAcks = 0;
txb->ack.nAcks = wtop - window;
if (after(wtop, window)) {
/* Try to copy the SACK ring locklessly. We can use the copy,
* only if the now-current top of the window didn't go past the
* previously read base - otherwise we can't know whether we
* have old data or new data.
*/
memcpy(sack_buffer, call->ackr_sack_table, sizeof(sack_buffer));
wrap_point = window + RXRPC_SACK_SIZE - 1;
wtmp = atomic64_read_acquire(&call->ackr_window);
window = lower_32_bits(wtmp);
wtop = upper_32_bits(wtmp);
if (after(wtop, wrap_point)) {
cond_resched();
goto retry;
}
/* The buffer is maintained as a ring with an invariant mapping
* between bit position and sequence number, so we'll probably
* need to rotate it.
*/
txb->ack.nAcks = wtop - window;
ix = window % RXRPC_SACK_SIZE;
first = sizeof(sack_buffer) - ix;
wrap = RXRPC_SACK_SIZE - sack;
to = min_t(unsigned int, txb->ack.nAcks, RXRPC_SACK_SIZE);
if (ix + txb->ack.nAcks <= RXRPC_SACK_SIZE) {
memcpy(txb->acks, sack_buffer + ix, txb->ack.nAcks);
if (sack + txb->ack.nAcks <= RXRPC_SACK_SIZE) {
memcpy(txb->acks, call->ackr_sack_table + sack, txb->ack.nAcks);
} else {
memcpy(txb->acks, sack_buffer + ix, first);
memcpy(txb->acks + first, sack_buffer,
txb->ack.nAcks - first);
memcpy(txb->acks, call->ackr_sack_table + sack, wrap);
memcpy(txb->acks + wrap, call->ackr_sack_table,
to - wrap);
}
ackp += txb->ack.nAcks;
ackp += to;
} else if (before(wtop, window)) {
pr_warn("ack window backward %x %x", window, wtop);
} else if (txb->ack.reason == RXRPC_ACK_DELAY) {
......@@ -253,12 +230,15 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
ret = do_udp_sendmsg(conn->local->socket, &msg, len);
call->peer->last_tx_at = ktime_get_seconds();
if (ret < 0)
if (ret < 0) {
trace_rxrpc_tx_fail(call->debug_id, serial, ret,
rxrpc_tx_point_call_ack);
else
} else {
trace_rxrpc_tx_packet(call->debug_id, &txb->wire,
rxrpc_tx_point_call_ack);
if (txb->wire.flags & RXRPC_REQUEST_ACK)
call->peer->rtt_last_req = ktime_get_real();
}
rxrpc_tx_backoff(call, ret);
if (!__rxrpc_call_is_complete(call)) {
......@@ -429,8 +409,6 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
if (txb->len >= call->peer->maxdata)
goto send_fragmentable;
down_read(&conn->local->defrag_sem);
txb->last_sent = ktime_get_real();
if (txb->wire.flags & RXRPC_REQUEST_ACK)
rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_data);
......@@ -445,7 +423,6 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
ret = do_udp_sendmsg(conn->local->socket, &msg, len);
conn->peer->last_tx_at = ktime_get_seconds();
up_read(&conn->local->defrag_sem);
if (ret < 0) {
rxrpc_inc_stat(call->rxnet, stat_tx_data_send_fail);
rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
......@@ -506,8 +483,6 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
/* attempt to send this message with fragmentation enabled */
_debug("send fragment");
down_write(&conn->local->defrag_sem);
txb->last_sent = ktime_get_real();
if (txb->wire.flags & RXRPC_REQUEST_ACK)
rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_data);
......@@ -539,8 +514,6 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
rxrpc_tx_point_call_data_frag);
}
rxrpc_tx_backoff(call, ret);
up_write(&conn->local->defrag_sem);
goto done;
}
......
......@@ -55,7 +55,6 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
unsigned long timeout = 0;
rxrpc_seq_t acks_hard_ack;
char lbuff[50], rbuff[50];
u64 wtmp;
if (v == &rxnet->calls) {
seq_puts(seq,
......@@ -83,7 +82,6 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
}
acks_hard_ack = READ_ONCE(call->acks_hard_ack);
wtmp = atomic64_read_acquire(&call->ackr_window);
seq_printf(seq,
"UDP %-47.47s %-47.47s %4x %08x %08x %s %3u"
" %-8.8s %08x %08x %08x %02x %08x %02x %08x %02x %06lx\n",
......@@ -98,7 +96,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
call->abort_code,
call->debug_id,
acks_hard_ack, READ_ONCE(call->tx_top) - acks_hard_ack,
lower_32_bits(wtmp), upper_32_bits(wtmp) - lower_32_bits(wtmp),
call->ackr_window, call->ackr_wtop - call->ackr_window,
call->rx_serial,
call->cong_cwnd,
timeout);
......
......@@ -40,12 +40,12 @@ void rxrpc_notify_socket(struct rxrpc_call *call)
call->notify_rx(sk, call, call->user_call_ID);
spin_unlock(&call->notify_lock);
} else {
write_lock(&rx->recvmsg_lock);
spin_lock(&rx->recvmsg_lock);
if (list_empty(&call->recvmsg_link)) {
rxrpc_get_call(call, rxrpc_call_get_notify_socket);
list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
}
write_unlock(&rx->recvmsg_lock);
spin_unlock(&rx->recvmsg_lock);
if (!sock_flag(sk, SOCK_DEAD)) {
_debug("call %ps", sk->sk_data_ready);
......@@ -95,7 +95,7 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
}
trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
call->ackr_window - 1,
call->rx_pkt_offset, call->rx_pkt_len, ret);
return ret;
}
......@@ -175,13 +175,13 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
rx_pkt_len = call->rx_pkt_len;
if (rxrpc_call_has_failed(call)) {
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
seq = call->ackr_window - 1;
ret = -EIO;
goto done;
}
if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
seq = call->ackr_window - 1;
ret = 1;
goto done;
}
......@@ -335,14 +335,14 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
/* Find the next call and dequeue it if we're not just peeking. If we
* do dequeue it, that comes with a ref that we will need to release.
*/
write_lock(&rx->recvmsg_lock);
spin_lock(&rx->recvmsg_lock);
l = rx->recvmsg_q.next;
call = list_entry(l, struct rxrpc_call, recvmsg_link);
if (!(flags & MSG_PEEK))
list_del_init(&call->recvmsg_link);
else
rxrpc_get_call(call, rxrpc_call_get_recvmsg);
write_unlock(&rx->recvmsg_lock);
spin_unlock(&rx->recvmsg_lock);
call_debug_id = call->debug_id;
trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
......@@ -431,9 +431,9 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
error_requeue_call:
if (!(flags & MSG_PEEK)) {
write_lock(&rx->recvmsg_lock);
spin_lock(&rx->recvmsg_lock);
list_add(&call->recvmsg_link, &rx->recvmsg_q);
write_unlock(&rx->recvmsg_lock);
spin_unlock(&rx->recvmsg_lock);
trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
} else {
rxrpc_put_call(call, rxrpc_call_put_recvmsg);
......
......@@ -63,7 +63,7 @@ void rxrpc_free_skb(struct sk_buff *skb, enum rxrpc_skb_trace why)
if (skb) {
int n = atomic_dec_return(select_skb_count(skb));
trace_rxrpc_skb(skb, refcount_read(&skb->users), n, why);
kfree_skb(skb);
kfree_skb_reason(skb, SKB_CONSUMED);
}
}
......@@ -78,6 +78,6 @@ void rxrpc_purge_queue(struct sk_buff_head *list)
int n = atomic_dec_return(select_skb_count(skb));
trace_rxrpc_skb(skb, refcount_read(&skb->users), n,
rxrpc_skb_put_purge);
kfree_skb(skb);
kfree_skb_reason(skb, SKB_CONSUMED);
}
}
......@@ -17,6 +17,9 @@ static const unsigned int n_65535 = 65535;
static const unsigned int n_max_acks = 255;
static const unsigned long one_jiffy = 1;
static const unsigned long max_jiffies = MAX_JIFFY_OFFSET;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
static const unsigned long max_500 = 500;
#endif
/*
* RxRPC operating parameters.
......@@ -63,6 +66,19 @@ static struct ctl_table rxrpc_sysctl_table[] = {
.extra2 = (void *)&max_jiffies,
},
/* Values used in milliseconds */
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
{
.procname = "inject_rx_delay",
.data = &rxrpc_inject_rx_delay,
.maxlen = sizeof(unsigned long),
.mode = 0644,
.proc_handler = proc_doulongvec_minmax,
.extra1 = (void *)SYSCTL_LONG_ZERO,
.extra2 = (void *)&max_500,
},
#endif
/* Non-time values */
{
.procname = "reap_client_conns",
......@@ -109,7 +125,6 @@ static struct ctl_table rxrpc_sysctl_table[] = {
.extra1 = (void *)SYSCTL_ONE,
.extra2 = (void *)&four,
},
{ }
};
......
......@@ -110,12 +110,8 @@ void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
for (;;) {
spin_lock(&call->tx_lock);
txb = list_first_entry_or_null(&call->tx_buffer,
struct rxrpc_txbuf, call_link);
if (!txb)
break;
while ((txb = list_first_entry_or_null(&call->tx_buffer,
struct rxrpc_txbuf, call_link))) {
hard_ack = smp_load_acquire(&call->acks_hard_ack);
if (before(hard_ack, txb->seq))
break;
......@@ -128,15 +124,11 @@ void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
spin_unlock(&call->tx_lock);
rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
if (after(call->acks_hard_ack, call->tx_bottom + 128))
wake = true;
}
spin_unlock(&call->tx_lock);
if (wake)
wake_up(&call->waitq);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment