Commit f21e9348 authored by David Howells's avatar David Howells

rxrpc: Simplify ACK handling

Now that general ACK transmission is done from the same thread as incoming
DATA packet wrangling, there's no possibility that the SACK table will be
being updated by the latter whilst the former is trying to copy it to an
ACK.

This means that we can safely rotate the SACK table whilst updating it
without having to take a lock, rather than keeping all the bits inside it
in fixed place and copying and then rotating it in the transmitter.

Therefore, simplify SACK handing by keeping track of starting point in the
ring and rotate slots down as we consume them.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent 5bbf9533
...@@ -422,6 +422,13 @@ ...@@ -422,6 +422,13 @@
EM(RXRPC_ACK_IDLE, "IDL") \ EM(RXRPC_ACK_IDLE, "IDL") \
E_(RXRPC_ACK__INVALID, "-?-") E_(RXRPC_ACK__INVALID, "-?-")
#define rxrpc_sack_traces \
EM(rxrpc_sack_advance, "ADV") \
EM(rxrpc_sack_fill, "FIL") \
EM(rxrpc_sack_nack, "NAK") \
EM(rxrpc_sack_none, "---") \
E_(rxrpc_sack_oos, "OOS")
#define rxrpc_completions \ #define rxrpc_completions \
EM(RXRPC_CALL_SUCCEEDED, "Succeeded") \ EM(RXRPC_CALL_SUCCEEDED, "Succeeded") \
EM(RXRPC_CALL_REMOTELY_ABORTED, "RemoteAbort") \ EM(RXRPC_CALL_REMOTELY_ABORTED, "RemoteAbort") \
...@@ -497,6 +504,7 @@ enum rxrpc_recvmsg_trace { rxrpc_recvmsg_traces } __mode(byte); ...@@ -497,6 +504,7 @@ enum rxrpc_recvmsg_trace { rxrpc_recvmsg_traces } __mode(byte);
enum rxrpc_req_ack_trace { rxrpc_req_ack_traces } __mode(byte); enum rxrpc_req_ack_trace { rxrpc_req_ack_traces } __mode(byte);
enum rxrpc_rtt_rx_trace { rxrpc_rtt_rx_traces } __mode(byte); enum rxrpc_rtt_rx_trace { rxrpc_rtt_rx_traces } __mode(byte);
enum rxrpc_rtt_tx_trace { rxrpc_rtt_tx_traces } __mode(byte); enum rxrpc_rtt_tx_trace { rxrpc_rtt_tx_traces } __mode(byte);
enum rxrpc_sack_trace { rxrpc_sack_traces } __mode(byte);
enum rxrpc_skb_trace { rxrpc_skb_traces } __mode(byte); enum rxrpc_skb_trace { rxrpc_skb_traces } __mode(byte);
enum rxrpc_timer_trace { rxrpc_timer_traces } __mode(byte); enum rxrpc_timer_trace { rxrpc_timer_traces } __mode(byte);
enum rxrpc_tx_point { rxrpc_tx_points } __mode(byte); enum rxrpc_tx_point { rxrpc_tx_points } __mode(byte);
...@@ -531,6 +539,7 @@ rxrpc_recvmsg_traces; ...@@ -531,6 +539,7 @@ rxrpc_recvmsg_traces;
rxrpc_req_ack_traces; rxrpc_req_ack_traces;
rxrpc_rtt_rx_traces; rxrpc_rtt_rx_traces;
rxrpc_rtt_tx_traces; rxrpc_rtt_tx_traces;
rxrpc_sack_traces;
rxrpc_skb_traces; rxrpc_skb_traces;
rxrpc_timer_traces; rxrpc_timer_traces;
rxrpc_tx_points; rxrpc_tx_points;
...@@ -1929,6 +1938,33 @@ TRACE_EVENT(rxrpc_call_poked, ...@@ -1929,6 +1938,33 @@ TRACE_EVENT(rxrpc_call_poked,
__entry->call_debug_id) __entry->call_debug_id)
); );
TRACE_EVENT(rxrpc_sack,
TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq,
unsigned int sack, enum rxrpc_sack_trace what),
TP_ARGS(call, seq, sack, what),
TP_STRUCT__entry(
__field(unsigned int, call_debug_id)
__field(rxrpc_seq_t, seq)
__field(unsigned int, sack)
__field(enum rxrpc_sack_trace, what)
),
TP_fast_assign(
__entry->call_debug_id = call->debug_id;
__entry->seq = seq;
__entry->sack = sack;
__entry->what = what;
),
TP_printk("c=%08x q=%08x %s k=%x",
__entry->call_debug_id,
__entry->seq,
__print_symbolic(__entry->what, rxrpc_sack_traces),
__entry->sack)
);
#undef EM #undef EM
#undef E_ #undef E_
......
...@@ -691,6 +691,7 @@ struct rxrpc_call { ...@@ -691,6 +691,7 @@ struct rxrpc_call {
/* Receive-phase ACK management (ACKs we send). */ /* Receive-phase ACK management (ACKs we send). */
u8 ackr_reason; /* reason to ACK */ u8 ackr_reason; /* reason to ACK */
u16 ackr_sack_base; /* Starting slot in SACK table ring */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */ rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
rxrpc_seq_t ackr_window; /* Base of SACK window */ rxrpc_seq_t ackr_window; /* Base of SACK window */
rxrpc_seq_t ackr_wtop; /* Base of SACK window */ rxrpc_seq_t ackr_wtop; /* Base of SACK window */
......
...@@ -368,6 +368,7 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -368,6 +368,7 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct sk_buff *oos; struct sk_buff *oos;
rxrpc_serial_t serial = sp->hdr.serial; rxrpc_serial_t serial = sp->hdr.serial;
unsigned int sack = call->ackr_sack_base;
rxrpc_seq_t window = call->ackr_window; rxrpc_seq_t window = call->ackr_window;
rxrpc_seq_t wtop = call->ackr_wtop; rxrpc_seq_t wtop = call->ackr_wtop;
rxrpc_seq_t wlimit = window + call->rx_winsize - 1; rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
...@@ -410,9 +411,6 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -410,9 +411,6 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
/* Queue the packet. */ /* Queue the packet. */
if (seq == window) { if (seq == window) {
rxrpc_seq_t reset_from;
bool reset_sack = false;
if (sp->hdr.flags & RXRPC_REQUEST_ACK) if (sp->hdr.flags & RXRPC_REQUEST_ACK)
ack_reason = RXRPC_ACK_REQUESTED; ack_reason = RXRPC_ACK_REQUESTED;
/* Send an immediate ACK if we fill in a hole */ /* Send an immediate ACK if we fill in a hole */
...@@ -422,8 +420,14 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -422,8 +420,14 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
call->ackr_nr_unacked++; call->ackr_nr_unacked++;
window++; window++;
if (after(window, wtop)) if (after(window, wtop)) {
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_none);
wtop = window; wtop = window;
} else {
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_advance);
sack = (sack + 1) % RXRPC_SACK_SIZE;
}
rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg); rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);
...@@ -440,10 +444,9 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -440,10 +444,9 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
__skb_unlink(oos, &call->rx_oos_queue); __skb_unlink(oos, &call->rx_oos_queue);
last = osp->hdr.flags & RXRPC_LAST_PACKET; last = osp->hdr.flags & RXRPC_LAST_PACKET;
seq = osp->hdr.seq; seq = osp->hdr.seq;
if (!reset_sack) { call->ackr_sack_table[sack] = 0;
reset_from = seq; trace_rxrpc_sack(call, seq, sack, rxrpc_sack_fill);
reset_sack = true; sack = (sack + 1) % RXRPC_SACK_SIZE;
}
window++; window++;
rxrpc_input_queue_data(call, oos, window, wtop, rxrpc_input_queue_data(call, oos, window, wtop,
...@@ -452,31 +455,28 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -452,31 +455,28 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
spin_unlock(&call->recvmsg_queue.lock); spin_unlock(&call->recvmsg_queue.lock);
if (reset_sack) { call->ackr_sack_base = sack;
do {
call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0;
} while (reset_from++, before(reset_from, window));
}
} else { } else {
bool keep = false; unsigned int slot;
ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE; ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;
if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) { slot = seq - window;
call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1; sack = (sack + slot) % RXRPC_SACK_SIZE;
keep = 1;
if (call->ackr_sack_table[sack % RXRPC_SACK_SIZE]) {
ack_reason = RXRPC_ACK_DUPLICATE;
goto send_ack;
} }
call->ackr_sack_table[sack % RXRPC_SACK_SIZE] |= 1;
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_oos);
if (after(seq + 1, wtop)) { if (after(seq + 1, wtop)) {
wtop = seq + 1; wtop = seq + 1;
rxrpc_input_update_ack_window(call, window, wtop); rxrpc_input_update_ack_window(call, window, wtop);
} }
if (!keep) {
ack_reason = RXRPC_ACK_DUPLICATE;
goto send_ack;
}
skb_queue_walk(&call->rx_oos_queue, oos) { skb_queue_walk(&call->rx_oos_queue, oos) {
struct rxrpc_skb_priv *osp = rxrpc_skb(oos); struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
......
...@@ -83,56 +83,36 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn, ...@@ -83,56 +83,36 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
struct rxrpc_txbuf *txb) struct rxrpc_txbuf *txb)
{ {
struct rxrpc_ackinfo ackinfo; struct rxrpc_ackinfo ackinfo;
unsigned int qsize; unsigned int qsize, sack, wrap, to;
rxrpc_seq_t window, wtop, wrap_point, ix, first; rxrpc_seq_t window, wtop;
int rsize; int rsize;
u32 mtu, jmax; u32 mtu, jmax;
u8 *ackp = txb->acks; u8 *ackp = txb->acks;
u8 sack_buffer[sizeof(call->ackr_sack_table)] __aligned(8);
call->ackr_nr_unacked = 0; call->ackr_nr_unacked = 0;
atomic_set(&call->ackr_nr_consumed, 0); atomic_set(&call->ackr_nr_consumed, 0);
rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill); rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
/* Barrier against rxrpc_input_data(). */
retry:
window = call->ackr_window; window = call->ackr_window;
wtop = call->ackr_wtop; wtop = call->ackr_wtop;
sack = call->ackr_sack_base % RXRPC_SACK_SIZE;
txb->ack.firstPacket = htonl(window); txb->ack.firstPacket = htonl(window);
txb->ack.nAcks = 0; txb->ack.nAcks = wtop - window;
if (after(wtop, window)) { if (after(wtop, window)) {
/* Try to copy the SACK ring locklessly. We can use the copy, wrap = RXRPC_SACK_SIZE - sack;
* only if the now-current top of the window didn't go past the to = min_t(unsigned int, txb->ack.nAcks, RXRPC_SACK_SIZE);
* previously read base - otherwise we can't know whether we
* have old data or new data.
*/
memcpy(sack_buffer, call->ackr_sack_table, sizeof(sack_buffer));
wrap_point = window + RXRPC_SACK_SIZE - 1;
window = call->ackr_window;
wtop = call->ackr_wtop;
if (after(wtop, wrap_point)) {
cond_resched();
goto retry;
}
/* The buffer is maintained as a ring with an invariant mapping
* between bit position and sequence number, so we'll probably
* need to rotate it.
*/
txb->ack.nAcks = wtop - window;
ix = window % RXRPC_SACK_SIZE;
first = sizeof(sack_buffer) - ix;
if (ix + txb->ack.nAcks <= RXRPC_SACK_SIZE) { if (sack + txb->ack.nAcks <= RXRPC_SACK_SIZE) {
memcpy(txb->acks, sack_buffer + ix, txb->ack.nAcks); memcpy(txb->acks, call->ackr_sack_table + sack, txb->ack.nAcks);
} else { } else {
memcpy(txb->acks, sack_buffer + ix, first); memcpy(txb->acks, call->ackr_sack_table + sack, wrap);
memcpy(txb->acks + first, sack_buffer, memcpy(txb->acks + wrap, call->ackr_sack_table,
txb->ack.nAcks - first); to - wrap);
} }
ackp += txb->ack.nAcks; ackp += to;
} else if (before(wtop, window)) { } else if (before(wtop, window)) {
pr_warn("ack window backward %x %x", window, wtop); pr_warn("ack window backward %x %x", window, wtop);
} else if (txb->ack.reason == RXRPC_ACK_DELAY) { } else if (txb->ack.reason == RXRPC_ACK_DELAY) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment