Commit 8d73a73a authored by David S. Miller's avatar David S. Miller

Merge tag 'rxrpc-fixes-20200820' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc, afs: Fix probing issues

Here are some fixes for rxrpc and afs to fix issues in the RTT measuring in
rxrpc and thence the Volume Location server probing in afs:

 (1) Move the serial number of a received ACK into a local variable to
     simplify the next patch.

 (2) Fix the loss of RTT samples due to extra interposed ACKs causing
     baseline information to be discarded too early.  This is a particular
     problem for afs when it sends a single very short call to probe a
     server it hasn't talked to recently.

 (3) Fix rxrpc_kernel_get_srtt() to indicate whether it actually has seen
     any valid samples or not.

 (4) Remove a field that's set/woken, but never read/waited on.

 (5) Expose the RTT and other probe information through procfs to make
     debugging of this stuff easier.

 (6) Fix VL rotation in afs to only use summary information from VL probing
     and not the probe running state (which gets clobbered when next a
     probe is issued).

 (7) Fix VL rotation to actually return the error aggregated from the probe
     errors.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents af8ea111 e4686c79
......@@ -161,8 +161,8 @@ void afs_fileserver_probe_result(struct afs_call *call)
}
}
rtt_us = rxrpc_kernel_get_srtt(call->net->socket, call->rxcall);
if (rtt_us < server->probe.rtt) {
if (rxrpc_kernel_get_srtt(call->net->socket, call->rxcall, &rtt_us) &&
rtt_us < server->probe.rtt) {
server->probe.rtt = rtt_us;
server->rtt = rtt_us;
alist->preferred = index;
......
......@@ -401,22 +401,24 @@ struct afs_vlserver {
#define AFS_VLSERVER_FL_PROBED 0 /* The VL server has been probed */
#define AFS_VLSERVER_FL_PROBING 1 /* VL server is being probed */
#define AFS_VLSERVER_FL_IS_YFS 2 /* Server is YFS not AFS */
#define AFS_VLSERVER_FL_RESPONDING 3 /* VL server is responding */
rwlock_t lock; /* Lock on addresses */
atomic_t usage;
unsigned int rtt; /* Server's current RTT in uS */
/* Probe state */
wait_queue_head_t probe_wq;
atomic_t probe_outstanding;
spinlock_t probe_lock;
struct {
unsigned int rtt; /* RTT as ktime/64 */
unsigned int rtt; /* RTT in uS */
u32 abort_code;
short error;
bool have_result;
bool responded:1;
bool is_yfs:1;
bool not_yfs:1;
bool local_failure:1;
unsigned short flags;
#define AFS_VLSERVER_PROBE_RESPONDED 0x01 /* At least once response (may be abort) */
#define AFS_VLSERVER_PROBE_IS_YFS 0x02 /* The peer appears to be YFS */
#define AFS_VLSERVER_PROBE_NOT_YFS 0x04 /* The peer appears not to be YFS */
#define AFS_VLSERVER_PROBE_LOCAL_FAILURE 0x08 /* A local failure prevented a probe */
} probe;
u16 port;
......
......@@ -310,6 +310,11 @@ static int afs_proc_cell_vlservers_show(struct seq_file *m, void *v)
alist->preferred == i ? '>' : '-',
&alist->addrs[i].transport);
}
seq_printf(m, " info: fl=%lx rtt=%d\n", vlserver->flags, vlserver->rtt);
seq_printf(m, " probe: fl=%x e=%d ac=%d out=%d\n",
vlserver->probe.flags, vlserver->probe.error,
vlserver->probe.abort_code,
atomic_read(&vlserver->probe_outstanding));
return 0;
}
......
......@@ -21,6 +21,7 @@ struct afs_vlserver *afs_alloc_vlserver(const char *name, size_t name_len,
rwlock_init(&vlserver->lock);
init_waitqueue_head(&vlserver->probe_wq);
spin_lock_init(&vlserver->probe_lock);
vlserver->rtt = UINT_MAX;
vlserver->name_len = name_len;
vlserver->port = port;
memcpy(vlserver->name, name, name_len);
......
......@@ -11,15 +11,33 @@
#include "internal.h"
#include "protocol_yfs.h"
static bool afs_vl_probe_done(struct afs_vlserver *server)
/*
* Handle the completion of a set of probes.
*/
static void afs_finished_vl_probe(struct afs_vlserver *server)
{
if (!atomic_dec_and_test(&server->probe_outstanding))
return false;
if (!(server->probe.flags & AFS_VLSERVER_PROBE_RESPONDED)) {
server->rtt = UINT_MAX;
clear_bit(AFS_VLSERVER_FL_RESPONDING, &server->flags);
}
wake_up_var(&server->probe_outstanding);
clear_bit_unlock(AFS_VLSERVER_FL_PROBING, &server->flags);
wake_up_bit(&server->flags, AFS_VLSERVER_FL_PROBING);
return true;
}
/*
* Handle the completion of a probe RPC call.
*/
static void afs_done_one_vl_probe(struct afs_vlserver *server, bool wake_up)
{
if (atomic_dec_and_test(&server->probe_outstanding)) {
afs_finished_vl_probe(server);
wake_up = true;
}
if (wake_up)
wake_up_all(&server->probe_wq);
}
/*
......@@ -45,15 +63,20 @@ void afs_vlserver_probe_result(struct afs_call *call)
server->probe.error = 0;
goto responded;
case -ECONNABORTED:
if (!server->probe.responded) {
if (!(server->probe.flags & AFS_VLSERVER_PROBE_RESPONDED)) {
server->probe.abort_code = call->abort_code;
server->probe.error = ret;
}
goto responded;
case -ENOMEM:
case -ENONET:
server->probe.local_failure = true;
afs_io_error(call, afs_io_error_vl_probe_fail);
case -EKEYEXPIRED:
case -EKEYREVOKED:
case -EKEYREJECTED:
server->probe.flags |= AFS_VLSERVER_PROBE_LOCAL_FAILURE;
if (server->probe.error == 0)
server->probe.error = ret;
trace_afs_io_error(call->debug_id, ret, afs_io_error_vl_probe_fail);
goto out;
case -ECONNRESET: /* Responded, but call expired. */
case -ERFKILL:
......@@ -67,12 +90,12 @@ void afs_vlserver_probe_result(struct afs_call *call)
default:
clear_bit(index, &alist->responded);
set_bit(index, &alist->failed);
if (!server->probe.responded &&
if (!(server->probe.flags & AFS_VLSERVER_PROBE_RESPONDED) &&
(server->probe.error == 0 ||
server->probe.error == -ETIMEDOUT ||
server->probe.error == -ETIME))
server->probe.error = ret;
afs_io_error(call, afs_io_error_vl_probe_fail);
trace_afs_io_error(call->debug_id, ret, afs_io_error_vl_probe_fail);
goto out;
}
......@@ -81,39 +104,36 @@ void afs_vlserver_probe_result(struct afs_call *call)
clear_bit(index, &alist->failed);
if (call->service_id == YFS_VL_SERVICE) {
server->probe.is_yfs = true;
server->probe.flags |= AFS_VLSERVER_PROBE_IS_YFS;
set_bit(AFS_VLSERVER_FL_IS_YFS, &server->flags);
alist->addrs[index].srx_service = call->service_id;
} else {
server->probe.not_yfs = true;
if (!server->probe.is_yfs) {
server->probe.flags |= AFS_VLSERVER_PROBE_NOT_YFS;
if (!(server->probe.flags & AFS_VLSERVER_PROBE_IS_YFS)) {
clear_bit(AFS_VLSERVER_FL_IS_YFS, &server->flags);
alist->addrs[index].srx_service = call->service_id;
}
}
rtt_us = rxrpc_kernel_get_srtt(call->net->socket, call->rxcall);
if (rtt_us < server->probe.rtt) {
if (rxrpc_kernel_get_srtt(call->net->socket, call->rxcall, &rtt_us) &&
rtt_us < server->probe.rtt) {
server->probe.rtt = rtt_us;
server->rtt = rtt_us;
alist->preferred = index;
have_result = true;
}
smp_wmb(); /* Set rtt before responded. */
server->probe.responded = true;
server->probe.flags |= AFS_VLSERVER_PROBE_RESPONDED;
set_bit(AFS_VLSERVER_FL_PROBED, &server->flags);
set_bit(AFS_VLSERVER_FL_RESPONDING, &server->flags);
have_result = true;
out:
spin_unlock(&server->probe_lock);
_debug("probe [%u][%u] %pISpc rtt=%u ret=%d",
server_index, index, &alist->addrs[index].transport, rtt_us, ret);
have_result |= afs_vl_probe_done(server);
if (have_result) {
server->probe.have_result = true;
wake_up_var(&server->probe.have_result);
wake_up_all(&server->probe_wq);
}
afs_done_one_vl_probe(server, have_result);
}
/*
......@@ -151,11 +171,10 @@ static bool afs_do_probe_vlserver(struct afs_net *net,
in_progress = true;
} else {
afs_prioritise_error(_e, PTR_ERR(call), ac.abort_code);
afs_done_one_vl_probe(server, false);
}
}
if (!in_progress)
afs_vl_probe_done(server);
return in_progress;
}
......@@ -193,7 +212,7 @@ int afs_wait_for_vl_probes(struct afs_vlserver_list *vllist,
{
struct wait_queue_entry *waits;
struct afs_vlserver *server;
unsigned int rtt = UINT_MAX;
unsigned int rtt = UINT_MAX, rtt_s;
bool have_responders = false;
int pref = -1, i;
......@@ -205,7 +224,7 @@ int afs_wait_for_vl_probes(struct afs_vlserver_list *vllist,
server = vllist->servers[i].server;
if (!test_bit(AFS_VLSERVER_FL_PROBING, &server->flags))
__clear_bit(i, &untried);
if (server->probe.responded)
if (server->probe.flags & AFS_VLSERVER_PROBE_RESPONDED)
have_responders = true;
}
}
......@@ -231,7 +250,7 @@ int afs_wait_for_vl_probes(struct afs_vlserver_list *vllist,
for (i = 0; i < vllist->nr_servers; i++) {
if (test_bit(i, &untried)) {
server = vllist->servers[i].server;
if (server->probe.responded)
if (server->probe.flags & AFS_VLSERVER_PROBE_RESPONDED)
goto stop;
if (test_bit(AFS_VLSERVER_FL_PROBING, &server->flags))
still_probing = true;
......@@ -249,10 +268,11 @@ int afs_wait_for_vl_probes(struct afs_vlserver_list *vllist,
for (i = 0; i < vllist->nr_servers; i++) {
if (test_bit(i, &untried)) {
server = vllist->servers[i].server;
if (server->probe.responded &&
server->probe.rtt < rtt) {
rtt_s = READ_ONCE(server->rtt);
if (test_bit(AFS_VLSERVER_FL_RESPONDING, &server->flags) &&
rtt_s < rtt) {
pref = i;
rtt = server->probe.rtt;
rtt = rtt_s;
}
remove_wait_queue(&server->probe_wq, &waits[i]);
......
......@@ -192,7 +192,8 @@ bool afs_select_vlserver(struct afs_vl_cursor *vc)
for (i = 0; i < vc->server_list->nr_servers; i++) {
struct afs_vlserver *s = vc->server_list->servers[i].server;
if (!test_bit(i, &vc->untried) || !s->probe.responded)
if (!test_bit(i, &vc->untried) ||
!test_bit(AFS_VLSERVER_FL_RESPONDING, &s->flags))
continue;
if (s->probe.rtt < rtt) {
vc->index = i;
......@@ -262,10 +263,14 @@ bool afs_select_vlserver(struct afs_vl_cursor *vc)
for (i = 0; i < vc->server_list->nr_servers; i++) {
struct afs_vlserver *s = vc->server_list->servers[i].server;
if (test_bit(AFS_VLSERVER_FL_RESPONDING, &s->flags))
e.responded = true;
afs_prioritise_error(&e, READ_ONCE(s->probe.error),
s->probe.abort_code);
}
error = e.error;
failed_set_error:
vc->error = error;
failed:
......
......@@ -59,7 +59,7 @@ bool rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *,
void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
struct sockaddr_rxrpc *);
u32 rxrpc_kernel_get_srtt(struct socket *, struct rxrpc_call *);
bool rxrpc_kernel_get_srtt(struct socket *, struct rxrpc_call *, u32 *);
int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
rxrpc_user_attach_call_t, unsigned long, gfp_t,
unsigned int);
......
......@@ -138,11 +138,16 @@ enum rxrpc_recvmsg_trace {
};
enum rxrpc_rtt_tx_trace {
rxrpc_rtt_tx_cancel,
rxrpc_rtt_tx_data,
rxrpc_rtt_tx_no_slot,
rxrpc_rtt_tx_ping,
};
enum rxrpc_rtt_rx_trace {
rxrpc_rtt_rx_cancel,
rxrpc_rtt_rx_lost,
rxrpc_rtt_rx_obsolete,
rxrpc_rtt_rx_ping_response,
rxrpc_rtt_rx_requested_ack,
};
......@@ -339,10 +344,15 @@ enum rxrpc_tx_point {
E_(rxrpc_recvmsg_wait, "WAIT")
#define rxrpc_rtt_tx_traces \
EM(rxrpc_rtt_tx_cancel, "CNCE") \
EM(rxrpc_rtt_tx_data, "DATA") \
EM(rxrpc_rtt_tx_no_slot, "FULL") \
E_(rxrpc_rtt_tx_ping, "PING")
#define rxrpc_rtt_rx_traces \
EM(rxrpc_rtt_rx_cancel, "CNCL") \
EM(rxrpc_rtt_rx_obsolete, "OBSL") \
EM(rxrpc_rtt_rx_lost, "LOST") \
EM(rxrpc_rtt_rx_ping_response, "PONG") \
E_(rxrpc_rtt_rx_requested_ack, "RACK")
......@@ -1087,38 +1097,43 @@ TRACE_EVENT(rxrpc_recvmsg,
TRACE_EVENT(rxrpc_rtt_tx,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_rtt_tx_trace why,
rxrpc_serial_t send_serial),
int slot, rxrpc_serial_t send_serial),
TP_ARGS(call, why, send_serial),
TP_ARGS(call, why, slot, send_serial),
TP_STRUCT__entry(
__field(unsigned int, call )
__field(enum rxrpc_rtt_tx_trace, why )
__field(int, slot )
__field(rxrpc_serial_t, send_serial )
),
TP_fast_assign(
__entry->call = call->debug_id;
__entry->why = why;
__entry->slot = slot;
__entry->send_serial = send_serial;
),
TP_printk("c=%08x %s sr=%08x",
TP_printk("c=%08x [%d] %s sr=%08x",
__entry->call,
__entry->slot,
__print_symbolic(__entry->why, rxrpc_rtt_tx_traces),
__entry->send_serial)
);
TRACE_EVENT(rxrpc_rtt_rx,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
int slot,
rxrpc_serial_t send_serial, rxrpc_serial_t resp_serial,
u32 rtt, u32 rto),
TP_ARGS(call, why, send_serial, resp_serial, rtt, rto),
TP_ARGS(call, why, slot, send_serial, resp_serial, rtt, rto),
TP_STRUCT__entry(
__field(unsigned int, call )
__field(enum rxrpc_rtt_rx_trace, why )
__field(int, slot )
__field(rxrpc_serial_t, send_serial )
__field(rxrpc_serial_t, resp_serial )
__field(u32, rtt )
......@@ -1128,14 +1143,16 @@ TRACE_EVENT(rxrpc_rtt_rx,
TP_fast_assign(
__entry->call = call->debug_id;
__entry->why = why;
__entry->slot = slot;
__entry->send_serial = send_serial;
__entry->resp_serial = resp_serial;
__entry->rtt = rtt;
__entry->rto = rto;
),
TP_printk("c=%08x %s sr=%08x rr=%08x rtt=%u rto=%u",
TP_printk("c=%08x [%d] %s sr=%08x rr=%08x rtt=%u rto=%u",
__entry->call,
__entry->slot,
__print_symbolic(__entry->why, rxrpc_rtt_rx_traces),
__entry->send_serial,
__entry->resp_serial,
......
......@@ -488,7 +488,6 @@ enum rxrpc_call_flag {
RXRPC_CALL_RX_LAST, /* Received the last packet (at rxtx_top) */
RXRPC_CALL_TX_LAST, /* Last packet in Tx buffer (at rxtx_top) */
RXRPC_CALL_SEND_PING, /* A ping will need to be sent */
RXRPC_CALL_PINGING, /* Ping in process */
RXRPC_CALL_RETRANS_TIMEOUT, /* Retransmission due to timeout occurred */
RXRPC_CALL_BEGAN_RX_TIMER, /* We began the expect_rx_by timer */
RXRPC_CALL_RX_HEARD, /* The peer responded at least once to this call */
......@@ -673,9 +672,13 @@ struct rxrpc_call {
rxrpc_seq_t ackr_consumed; /* Highest packet shown consumed */
rxrpc_seq_t ackr_seen; /* Highest packet shown seen */
/* ping management */
rxrpc_serial_t ping_serial; /* Last ping sent */
ktime_t ping_time; /* Time last ping sent */
/* RTT management */
rxrpc_serial_t rtt_serial[4]; /* Serial number of DATA or PING sent */
ktime_t rtt_sent_at[4]; /* Time packet sent */
unsigned long rtt_avail; /* Mask of available slots in bits 0-3,
* Mask of pending samples in 8-11 */
#define RXRPC_CALL_RTT_AVAIL_MASK 0xf
#define RXRPC_CALL_RTT_PEND_SHIFT 8
/* transmission-phase ACK management */
ktime_t acks_latest_ts; /* Timestamp of latest ACK received */
......@@ -1037,7 +1040,7 @@ static inline bool __rxrpc_abort_eproto(struct rxrpc_call *call,
/*
* rtt.c
*/
void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace,
void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace, int,
rxrpc_serial_t, rxrpc_serial_t, ktime_t, ktime_t);
unsigned long rxrpc_get_rto_backoff(struct rxrpc_peer *, bool);
void rxrpc_peer_init_rtt(struct rxrpc_peer *);
......
......@@ -153,6 +153,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;
call->rxnet = rxnet;
call->rtt_avail = RXRPC_CALL_RTT_AVAIL_MASK;
atomic_inc(&rxnet->nr_calls);
return call;
......
......@@ -608,36 +608,57 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
}
/*
* Process a requested ACK.
* See if there's a cached RTT probe to complete.
*/
static void rxrpc_input_requested_ack(struct rxrpc_call *call,
ktime_t resp_time,
rxrpc_serial_t orig_serial,
rxrpc_serial_t ack_serial)
static void rxrpc_complete_rtt_probe(struct rxrpc_call *call,
ktime_t resp_time,
rxrpc_serial_t acked_serial,
rxrpc_serial_t ack_serial,
enum rxrpc_rtt_rx_trace type)
{
struct rxrpc_skb_priv *sp;
struct sk_buff *skb;
rxrpc_serial_t orig_serial;
unsigned long avail;
ktime_t sent_at;
int ix;
bool matched = false;
int i;
for (ix = 0; ix < RXRPC_RXTX_BUFF_SIZE; ix++) {
skb = call->rxtx_buffer[ix];
if (!skb)
continue;
avail = READ_ONCE(call->rtt_avail);
smp_rmb(); /* Read avail bits before accessing data. */
sent_at = skb->tstamp;
smp_rmb(); /* Read timestamp before serial. */
sp = rxrpc_skb(skb);
if (sp->hdr.serial != orig_serial)
for (i = 0; i < ARRAY_SIZE(call->rtt_serial); i++) {
if (!test_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &avail))
continue;
goto found;
}
return;
sent_at = call->rtt_sent_at[i];
orig_serial = call->rtt_serial[i];
if (orig_serial == acked_serial) {
clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
smp_mb(); /* Read data before setting avail bit */
set_bit(i, &call->rtt_avail);
if (type != rxrpc_rtt_rx_cancel)
rxrpc_peer_add_rtt(call, type, i, acked_serial, ack_serial,
sent_at, resp_time);
else
trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_cancel, i,
orig_serial, acked_serial, 0, 0);
matched = true;
}
/* If a later serial is being acked, then mark this slot as
* being available.
*/
if (after(acked_serial, orig_serial)) {
trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_obsolete, i,
orig_serial, acked_serial, 0, 0);
clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
smp_wmb();
set_bit(i, &call->rtt_avail);
}
}
found:
rxrpc_peer_add_rtt(call, rxrpc_rtt_rx_requested_ack,
orig_serial, ack_serial, sent_at, resp_time);
if (!matched)
trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_lost, 9, 0, acked_serial, 0, 0);
}
/*
......@@ -682,27 +703,11 @@ static void rxrpc_input_check_for_lost_ack(struct rxrpc_call *call)
*/
static void rxrpc_input_ping_response(struct rxrpc_call *call,
ktime_t resp_time,
rxrpc_serial_t orig_serial,
rxrpc_serial_t acked_serial,
rxrpc_serial_t ack_serial)
{
rxrpc_serial_t ping_serial;
ktime_t ping_time;
ping_time = call->ping_time;
smp_rmb();
ping_serial = READ_ONCE(call->ping_serial);
if (orig_serial == call->acks_lost_ping)
if (acked_serial == call->acks_lost_ping)
rxrpc_input_check_for_lost_ack(call);
if (before(orig_serial, ping_serial) ||
!test_and_clear_bit(RXRPC_CALL_PINGING, &call->flags))
return;
if (after(orig_serial, ping_serial))
return;
rxrpc_peer_add_rtt(call, rxrpc_rtt_rx_ping_response,
orig_serial, ack_serial, ping_time, resp_time);
}
/*
......@@ -843,7 +848,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
struct rxrpc_ackinfo info;
u8 acks[RXRPC_MAXACKS];
} buf;
rxrpc_serial_t acked_serial;
rxrpc_serial_t ack_serial, acked_serial;
rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt;
int nr_acks, offset, ioffset;
......@@ -856,6 +861,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
}
offset += sizeof(buf.ack);
ack_serial = sp->hdr.serial;
acked_serial = ntohl(buf.ack.serial);
first_soft_ack = ntohl(buf.ack.firstPacket);
prev_pkt = ntohl(buf.ack.previousPacket);
......@@ -864,31 +870,42 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
summary.ack_reason = (buf.ack.reason < RXRPC_ACK__INVALID ?
buf.ack.reason : RXRPC_ACK__INVALID);
trace_rxrpc_rx_ack(call, sp->hdr.serial, acked_serial,
trace_rxrpc_rx_ack(call, ack_serial, acked_serial,
first_soft_ack, prev_pkt,
summary.ack_reason, nr_acks);
if (buf.ack.reason == RXRPC_ACK_PING_RESPONSE)
switch (buf.ack.reason) {
case RXRPC_ACK_PING_RESPONSE:
rxrpc_input_ping_response(call, skb->tstamp, acked_serial,
sp->hdr.serial);
if (buf.ack.reason == RXRPC_ACK_REQUESTED)
rxrpc_input_requested_ack(call, skb->tstamp, acked_serial,
sp->hdr.serial);
ack_serial);
rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
rxrpc_rtt_rx_ping_response);
break;
case RXRPC_ACK_REQUESTED:
rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
rxrpc_rtt_rx_requested_ack);
break;
default:
if (acked_serial != 0)
rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
rxrpc_rtt_rx_cancel);
break;
}
if (buf.ack.reason == RXRPC_ACK_PING) {
_proto("Rx ACK %%%u PING Request", sp->hdr.serial);
_proto("Rx ACK %%%u PING Request", ack_serial);
rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
sp->hdr.serial, true, true,
ack_serial, true, true,
rxrpc_propose_ack_respond_to_ping);
} else if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED,
sp->hdr.serial, true, true,
ack_serial, true, true,
rxrpc_propose_ack_respond_to_ack);
}
/* Discard any out-of-order or duplicate ACKs (outside lock). */
if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) {
trace_rxrpc_rx_discard_ack(call->debug_id, sp->hdr.serial,
trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial,
first_soft_ack, call->ackr_first_seq,
prev_pkt, call->ackr_prev_seq);
return;
......@@ -904,7 +921,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
/* Discard any out-of-order or duplicate ACKs (inside lock). */
if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) {
trace_rxrpc_rx_discard_ack(call->debug_id, sp->hdr.serial,
trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial,
first_soft_ack, call->ackr_first_seq,
prev_pkt, call->ackr_prev_seq);
goto out;
......@@ -964,7 +981,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
RXRPC_TX_ANNO_LAST &&
summary.nr_acks == call->tx_top - hard_ack &&
rxrpc_is_client_call(call))
rxrpc_propose_ACK(call, RXRPC_ACK_PING, sp->hdr.serial,
rxrpc_propose_ACK(call, RXRPC_ACK_PING, ack_serial,
false, true,
rxrpc_propose_ack_ping_for_lost_reply);
......
......@@ -123,6 +123,49 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
return top - hard_ack + 3;
}
/*
* Record the beginning of an RTT probe.
*/
static int rxrpc_begin_rtt_probe(struct rxrpc_call *call, rxrpc_serial_t serial,
enum rxrpc_rtt_tx_trace why)
{
unsigned long avail = call->rtt_avail;
int rtt_slot = 9;
if (!(avail & RXRPC_CALL_RTT_AVAIL_MASK))
goto no_slot;
rtt_slot = __ffs(avail & RXRPC_CALL_RTT_AVAIL_MASK);
if (!test_and_clear_bit(rtt_slot, &call->rtt_avail))
goto no_slot;
call->rtt_serial[rtt_slot] = serial;
call->rtt_sent_at[rtt_slot] = ktime_get_real();
smp_wmb(); /* Write data before avail bit */
set_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
trace_rxrpc_rtt_tx(call, why, rtt_slot, serial);
return rtt_slot;
no_slot:
trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_no_slot, rtt_slot, serial);
return -1;
}
/*
* Cancel an RTT probe.
*/
static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call,
rxrpc_serial_t serial, int rtt_slot)
{
if (rtt_slot != -1) {
clear_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
smp_wmb(); /* Clear pending bit before setting slot */
set_bit(rtt_slot, &call->rtt_avail);
trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_cancel, rtt_slot, serial);
}
}
/*
* Send an ACK call packet.
*/
......@@ -136,7 +179,7 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
rxrpc_serial_t serial;
rxrpc_seq_t hard_ack, top;
size_t len, n;
int ret;
int ret, rtt_slot = -1;
u8 reason;
if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
......@@ -196,18 +239,8 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
if (_serial)
*_serial = serial;
if (ping) {
call->ping_serial = serial;
smp_wmb();
/* We need to stick a time in before we send the packet in case
* the reply gets back before kernel_sendmsg() completes - but
* asking UDP to send the packet can take a relatively long
* time.
*/
call->ping_time = ktime_get_real();
set_bit(RXRPC_CALL_PINGING, &call->flags);
trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_ping, serial);
}
if (ping)
rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_ping);
ret = kernel_sendmsg(conn->params.local->socket, &msg, iov, 2, len);
conn->params.peer->last_tx_at = ktime_get_seconds();
......@@ -221,8 +254,7 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
if (call->state < RXRPC_CALL_COMPLETE) {
if (ret < 0) {
if (ping)
clear_bit(RXRPC_CALL_PINGING, &call->flags);
rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
rxrpc_propose_ACK(call, pkt->ack.reason,
ntohl(pkt->ack.serial),
false, true,
......@@ -321,7 +353,7 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
struct kvec iov[2];
rxrpc_serial_t serial;
size_t len;
int ret;
int ret, rtt_slot = -1;
_enter(",{%d}", skb->len);
......@@ -397,6 +429,8 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
sp->hdr.serial = serial;
smp_wmb(); /* Set serial before timestamp */
skb->tstamp = ktime_get_real();
if (whdr.flags & RXRPC_REQUEST_ACK)
rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_data);
/* send the packet by UDP
* - returns -EMSGSIZE if UDP would have to fragment the packet
......@@ -408,12 +442,15 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
conn->params.peer->last_tx_at = ktime_get_seconds();
up_read(&conn->params.local->defrag_sem);
if (ret < 0)
if (ret < 0) {
rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
trace_rxrpc_tx_fail(call->debug_id, serial, ret,
rxrpc_tx_point_call_data_nofrag);
else
} else {
trace_rxrpc_tx_packet(call->debug_id, &whdr,
rxrpc_tx_point_call_data_nofrag);
}
rxrpc_tx_backoff(call, ret);
if (ret == -EMSGSIZE)
goto send_fragmentable;
......@@ -422,7 +459,6 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
if (ret >= 0) {
if (whdr.flags & RXRPC_REQUEST_ACK) {
call->peer->rtt_last_req = skb->tstamp;
trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_data, serial);
if (call->peer->rtt_count > 1) {
unsigned long nowj = jiffies, ack_lost_at;
......@@ -469,6 +505,8 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
sp->hdr.serial = serial;
smp_wmb(); /* Set serial before timestamp */
skb->tstamp = ktime_get_real();
if (whdr.flags & RXRPC_REQUEST_ACK)
rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_data);
switch (conn->params.local->srx.transport.family) {
case AF_INET6:
......@@ -487,12 +525,14 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
BUG();
}
if (ret < 0)
if (ret < 0) {
rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
trace_rxrpc_tx_fail(call->debug_id, serial, ret,
rxrpc_tx_point_call_data_frag);
else
} else {
trace_rxrpc_tx_packet(call->debug_id, &whdr,
rxrpc_tx_point_call_data_frag);
}
rxrpc_tx_backoff(call, ret);
up_write(&conn->params.local->defrag_sem);
......
......@@ -502,11 +502,21 @@ EXPORT_SYMBOL(rxrpc_kernel_get_peer);
* rxrpc_kernel_get_srtt - Get a call's peer smoothed RTT
* @sock: The socket on which the call is in progress.
* @call: The call to query
* @_srtt: Where to store the SRTT value.
*
* Get the call's peer smoothed RTT.
* Get the call's peer smoothed RTT in uS.
*/
u32 rxrpc_kernel_get_srtt(struct socket *sock, struct rxrpc_call *call)
bool rxrpc_kernel_get_srtt(struct socket *sock, struct rxrpc_call *call,
u32 *_srtt)
{
return call->peer->srtt_us >> 3;
struct rxrpc_peer *peer = call->peer;
if (peer->rtt_count == 0) {
*_srtt = 1000000; /* 1S */
return false;
}
*_srtt = call->peer->srtt_us >> 3;
return true;
}
EXPORT_SYMBOL(rxrpc_kernel_get_srtt);
......@@ -146,6 +146,7 @@ static void rxrpc_ack_update_rtt(struct rxrpc_peer *peer, long rtt_us)
* exclusive access to the peer RTT data.
*/
void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
int rtt_slot,
rxrpc_serial_t send_serial, rxrpc_serial_t resp_serial,
ktime_t send_time, ktime_t resp_time)
{
......@@ -162,7 +163,7 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
peer->rtt_count++;
spin_unlock(&peer->rtt_input_lock);
trace_rxrpc_rtt_rx(call, why, send_serial, resp_serial,
trace_rxrpc_rtt_rx(call, why, rtt_slot, send_serial, resp_serial,
peer->srtt_us >> 3, peer->rto_j);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment