Commit 60cd6e63 authored by David S. Miller's avatar David S. Miller

Merge tag 'rxrpc-rewrite-20160922-v2' of...

Merge tag 'rxrpc-rewrite-20160922-v2' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Preparation for slow-start algorithm [ver #2]

Here are some patches that prepare for improvements in ACK generation and
for the implementation of the slow-start part of the protocol:

 (1) Stop storing the protocol header in the Tx socket buffers, but rather
     generate it on the fly.  This potentially saves a little space and
     makes it easier to alter the header just before transmission (the
     flags may get altered and the serial number has to be changed).

 (2) Mask off the Tx buffer annotations and add a flag to record which ones
     have already been resent.

 (3) Track RTT on a per-peer basis for use in future changes.  Tracepoints
     are added to log this.

 (4) Send PING ACKs in response to incoming calls to elicit a PING-RESPONSE
     ACK from which RTT data can be calculated.  The response also carries
     other useful information.

 (5) Expedite PING-RESPONSE ACK generation from sendmsg.  If we're actively
     using sendmsg, this allows us, under some circumstances, to avoid
     having to rely on the background work item to run to generate this
     ACK.

     This requires ktime_sub_ms() to be added.

 (6) Set the REQUEST-ACK flag on some DATA packets to elicit ACK-REQUESTED
     ACKs from which RTT data can be calculated.

 (7) Limit the use of pings and ACK requests for RTT determination.

Changes:

 (V2) Don't use the C division operator for 64-bit division.  One instance
      should use do_div() and the other should be using nsecs_to_jiffies().

      The last two patches got transposed, leading to an undefined symbol
      in one of them.
Reported-by: default avatarkbuild test robot <lkp@intel.com>
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents cdd0766d fc943f67
......@@ -231,6 +231,11 @@ static inline ktime_t ktime_sub_us(const ktime_t kt, const u64 usec)
return ktime_sub_ns(kt, usec * NSEC_PER_USEC);
}
static inline ktime_t ktime_sub_ms(const ktime_t kt, const u64 msec)
{
return ktime_sub_ns(kt, msec * NSEC_PER_MSEC);
}
extern ktime_t ktime_add_safe(const ktime_t lhs, const ktime_t rhs);
/**
......
......@@ -353,6 +353,67 @@ TRACE_EVENT(rxrpc_recvmsg,
__entry->ret)
);
TRACE_EVENT(rxrpc_rtt_tx,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_rtt_tx_trace why,
rxrpc_serial_t send_serial),
TP_ARGS(call, why, send_serial),
TP_STRUCT__entry(
__field(struct rxrpc_call *, call )
__field(enum rxrpc_rtt_tx_trace, why )
__field(rxrpc_serial_t, send_serial )
),
TP_fast_assign(
__entry->call = call;
__entry->why = why;
__entry->send_serial = send_serial;
),
TP_printk("c=%p %s sr=%08x",
__entry->call,
rxrpc_rtt_tx_traces[__entry->why],
__entry->send_serial)
);
TRACE_EVENT(rxrpc_rtt_rx,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
rxrpc_serial_t send_serial, rxrpc_serial_t resp_serial,
s64 rtt, u8 nr, s64 avg),
TP_ARGS(call, why, send_serial, resp_serial, rtt, nr, avg),
TP_STRUCT__entry(
__field(struct rxrpc_call *, call )
__field(enum rxrpc_rtt_rx_trace, why )
__field(u8, nr )
__field(rxrpc_serial_t, send_serial )
__field(rxrpc_serial_t, resp_serial )
__field(s64, rtt )
__field(u64, avg )
),
TP_fast_assign(
__entry->call = call;
__entry->why = why;
__entry->send_serial = send_serial;
__entry->resp_serial = resp_serial;
__entry->rtt = rtt;
__entry->nr = nr;
__entry->avg = avg;
),
TP_printk("c=%p %s sr=%08x rr=%08x rtt=%lld nr=%u avg=%lld",
__entry->call,
rxrpc_rtt_rx_traces[__entry->why],
__entry->send_serial,
__entry->resp_serial,
__entry->rtt,
__entry->nr,
__entry->avg)
);
#endif /* _TRACE_RXRPC_H */
/* This part must be outside protection */
......
......@@ -142,10 +142,7 @@ struct rxrpc_host_header {
*/
struct rxrpc_skb_priv {
union {
unsigned long resend_at; /* time in jiffies at which to resend */
struct {
u8 nr_jumbo; /* Number of jumbo subpackets */
};
u8 nr_jumbo; /* Number of jumbo subpackets */
};
union {
unsigned int offset; /* offset into buffer of next read */
......@@ -258,10 +255,12 @@ struct rxrpc_peer {
/* calculated RTT cache */
#define RXRPC_RTT_CACHE_SIZE 32
suseconds_t rtt; /* current RTT estimate (in uS) */
unsigned int rtt_point; /* next entry at which to insert */
unsigned int rtt_usage; /* amount of cache actually used */
suseconds_t rtt_cache[RXRPC_RTT_CACHE_SIZE]; /* calculated RTT cache */
ktime_t rtt_last_req; /* Time of last RTT request */
u64 rtt; /* Current RTT estimate (in nS) */
u64 rtt_sum; /* Sum of cache contents */
u64 rtt_cache[RXRPC_RTT_CACHE_SIZE]; /* Determined RTT cache */
u8 rtt_cursor; /* next entry at which to insert */
u8 rtt_usage; /* amount of cache actually used */
};
/*
......@@ -385,10 +384,9 @@ struct rxrpc_connection {
int debug_id; /* debug ID for printks */
atomic_t serial; /* packet serial number counter */
unsigned int hi_serial; /* highest serial number received */
u32 security_nonce; /* response re-use preventer */
u8 size_align; /* data size alignment (for security) */
u8 header_size; /* rxrpc + security header size */
u8 security_size; /* security header size */
u32 security_nonce; /* response re-use preventer */
u8 security_ix; /* security type */
u8 out_clientflag; /* RXRPC_CLIENT_INITIATED if we are client */
};
......@@ -403,6 +401,7 @@ enum rxrpc_call_flag {
RXRPC_CALL_EXPOSED, /* The call was exposed to the world */
RXRPC_CALL_RX_LAST, /* Received the last packet (at rxtx_top) */
RXRPC_CALL_TX_LAST, /* Last packet in Tx buffer (at rxtx_top) */
RXRPC_CALL_PINGING, /* Ping in process */
};
/*
......@@ -487,6 +486,8 @@ struct rxrpc_call {
u32 call_id; /* call ID on connection */
u32 cid; /* connection ID plus channel index */
int debug_id; /* debug ID for printks */
unsigned short rx_pkt_offset; /* Current recvmsg packet offset */
unsigned short rx_pkt_len; /* Current recvmsg packet len */
/* Rx/Tx circular buffer, depending on phase.
*
......@@ -506,6 +507,8 @@ struct rxrpc_call {
#define RXRPC_TX_ANNO_UNACK 1
#define RXRPC_TX_ANNO_NAK 2
#define RXRPC_TX_ANNO_RETRANS 3
#define RXRPC_TX_ANNO_MASK 0x03
#define RXRPC_TX_ANNO_RESENT 0x04
#define RXRPC_RX_ANNO_JUMBO 0x3f /* Jumbo subpacket number + 1 if not zero */
#define RXRPC_RX_ANNO_JLAST 0x40 /* Set if last element of a jumbo packet */
#define RXRPC_RX_ANNO_VERIFIED 0x80 /* Set if verified and decrypted */
......@@ -528,8 +531,8 @@ struct rxrpc_call {
u16 ackr_skew; /* skew on packet being ACK'd */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
rxrpc_seq_t ackr_prev_seq; /* previous sequence number received */
unsigned short rx_pkt_offset; /* Current recvmsg packet offset */
unsigned short rx_pkt_len; /* Current recvmsg packet len */
rxrpc_serial_t ackr_ping; /* Last ping sent */
ktime_t ackr_ping_time; /* Time last ping sent */
/* transmission-phase ACK management */
rxrpc_serial_t acks_latest; /* serial number of latest ACK received */
......@@ -656,6 +659,22 @@ enum rxrpc_recvmsg_trace {
extern const char rxrpc_recvmsg_traces[rxrpc_recvmsg__nr_trace][5];
enum rxrpc_rtt_tx_trace {
rxrpc_rtt_tx_ping,
rxrpc_rtt_tx_data,
rxrpc_rtt_tx__nr_trace
};
extern const char rxrpc_rtt_tx_traces[rxrpc_rtt_tx__nr_trace][5];
enum rxrpc_rtt_rx_trace {
rxrpc_rtt_rx_ping_response,
rxrpc_rtt_rx_requested_ack,
rxrpc_rtt_rx__nr_trace
};
extern const char rxrpc_rtt_rx_traces[rxrpc_rtt_rx__nr_trace][5];
extern const char *const rxrpc_pkts[];
extern const char *rxrpc_acks(u8 reason);
......@@ -946,7 +965,7 @@ extern const s8 rxrpc_ack_priority[];
* output.c
*/
int rxrpc_send_call_packet(struct rxrpc_call *, u8);
int rxrpc_send_data_packet(struct rxrpc_connection *, struct sk_buff *);
int rxrpc_send_data_packet(struct rxrpc_call *, struct sk_buff *);
void rxrpc_reject_packets(struct rxrpc_local *);
/*
......@@ -954,6 +973,8 @@ void rxrpc_reject_packets(struct rxrpc_local *);
*/
void rxrpc_error_report(struct sock *);
void rxrpc_peer_error_distributor(struct work_struct *);
void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace,
rxrpc_serial_t, rxrpc_serial_t, ktime_t, ktime_t);
/*
* peer_object.c
......
......@@ -139,16 +139,17 @@ void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
*/
static void rxrpc_resend(struct rxrpc_call *call)
{
struct rxrpc_wire_header *whdr;
struct rxrpc_skb_priv *sp;
struct sk_buff *skb;
rxrpc_seq_t cursor, seq, top;
unsigned long resend_at, now;
ktime_t now = ktime_get_real(), max_age, oldest, resend_at;
int ix;
u8 annotation;
u8 annotation, anno_type;
_enter("{%d,%d}", call->tx_hard_ack, call->tx_top);
max_age = ktime_sub_ms(now, rxrpc_resend_timeout);
spin_lock_bh(&call->lock);
cursor = call->tx_hard_ack;
......@@ -161,31 +162,33 @@ static void rxrpc_resend(struct rxrpc_call *call)
* the packets in the Tx buffer we're going to resend and what the new
* resend timeout will be.
*/
now = jiffies;
resend_at = now + rxrpc_resend_timeout;
oldest = now;
for (seq = cursor + 1; before_eq(seq, top); seq++) {
ix = seq & RXRPC_RXTX_BUFF_MASK;
annotation = call->rxtx_annotations[ix];
if (annotation == RXRPC_TX_ANNO_ACK)
anno_type = annotation & RXRPC_TX_ANNO_MASK;
annotation &= ~RXRPC_TX_ANNO_MASK;
if (anno_type == RXRPC_TX_ANNO_ACK)
continue;
skb = call->rxtx_buffer[ix];
rxrpc_see_skb(skb, rxrpc_skb_tx_seen);
sp = rxrpc_skb(skb);
if (annotation == RXRPC_TX_ANNO_UNACK) {
if (time_after(sp->resend_at, now)) {
if (time_before(sp->resend_at, resend_at))
resend_at = sp->resend_at;
if (anno_type == RXRPC_TX_ANNO_UNACK) {
if (ktime_after(skb->tstamp, max_age)) {
if (ktime_before(skb->tstamp, oldest))
oldest = skb->tstamp;
continue;
}
}
/* Okay, we need to retransmit a packet. */
call->rxtx_annotations[ix] = RXRPC_TX_ANNO_RETRANS;
call->rxtx_annotations[ix] = RXRPC_TX_ANNO_RETRANS | annotation;
}
call->resend_at = resend_at;
resend_at = ktime_sub(ktime_add_ns(oldest, rxrpc_resend_timeout), now);
call->resend_at = jiffies + nsecs_to_jiffies(ktime_to_ns(resend_at));
/* Now go through the Tx window and perform the retransmissions. We
* have to drop the lock for each send. If an ACK comes in whilst the
......@@ -195,29 +198,21 @@ static void rxrpc_resend(struct rxrpc_call *call)
for (seq = cursor + 1; before_eq(seq, top); seq++) {
ix = seq & RXRPC_RXTX_BUFF_MASK;
annotation = call->rxtx_annotations[ix];
if (annotation != RXRPC_TX_ANNO_RETRANS)
anno_type = annotation & RXRPC_TX_ANNO_MASK;
if (anno_type != RXRPC_TX_ANNO_RETRANS)
continue;
skb = call->rxtx_buffer[ix];
rxrpc_get_skb(skb, rxrpc_skb_tx_got);
spin_unlock_bh(&call->lock);
sp = rxrpc_skb(skb);
/* Each Tx packet needs a new serial number */
sp->hdr.serial = atomic_inc_return(&call->conn->serial);
whdr = (struct rxrpc_wire_header *)skb->head;
whdr->serial = htonl(sp->hdr.serial);
if (rxrpc_send_data_packet(call->conn, skb) < 0) {
call->resend_at = now + 2;
if (rxrpc_send_data_packet(call, skb) < 0) {
rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
return;
}
if (rxrpc_is_client_call(call))
rxrpc_expose_client_call(call);
sp->resend_at = now + rxrpc_resend_timeout;
rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
spin_lock_bh(&call->lock);
......@@ -227,10 +222,17 @@ static void rxrpc_resend(struct rxrpc_call *call)
* received and the packet might have been hard-ACK'd (in which
* case it will no longer be in the buffer).
*/
if (after(seq, call->tx_hard_ack) &&
(call->rxtx_annotations[ix] == RXRPC_TX_ANNO_RETRANS ||
call->rxtx_annotations[ix] == RXRPC_TX_ANNO_NAK))
call->rxtx_annotations[ix] = RXRPC_TX_ANNO_UNACK;
if (after(seq, call->tx_hard_ack)) {
annotation = call->rxtx_annotations[ix];
anno_type = annotation & RXRPC_TX_ANNO_MASK;
if (anno_type == RXRPC_TX_ANNO_RETRANS ||
anno_type == RXRPC_TX_ANNO_NAK) {
annotation &= ~RXRPC_TX_ANNO_MASK;
annotation |= RXRPC_TX_ANNO_UNACK;
}
annotation |= RXRPC_TX_ANNO_RESENT;
call->rxtx_annotations[ix] = annotation;
}
if (after(call->tx_hard_ack, seq))
seq = call->tx_hard_ack;
......
......@@ -53,7 +53,6 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
spin_lock_init(&conn->state_lock);
conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
conn->size_align = 4;
conn->header_size = sizeof(struct rxrpc_wire_header);
conn->idle_timestamp = jiffies;
}
......
......@@ -36,6 +36,22 @@ static void rxrpc_proto_abort(const char *why,
}
}
/*
* Ping the other end to fill our RTT cache and to retrieve the rwind
* and MTU parameters.
*/
static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb,
int skew)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
ktime_t now = skb->tstamp;
if (call->peer->rtt_usage < 3 ||
ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now))
rxrpc_propose_ACK(call, RXRPC_ACK_PING, skew, sp->hdr.serial,
true, true);
}
/*
* Apply a hard ACK by advancing the Tx window.
*/
......@@ -342,6 +358,64 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb,
_leave(" [queued]");
}
/*
* Process a requested ACK.
*/
static void rxrpc_input_requested_ack(struct rxrpc_call *call,
ktime_t resp_time,
rxrpc_serial_t orig_serial,
rxrpc_serial_t ack_serial)
{
struct rxrpc_skb_priv *sp;
struct sk_buff *skb;
ktime_t sent_at;
int ix;
for (ix = 0; ix < RXRPC_RXTX_BUFF_SIZE; ix++) {
skb = call->rxtx_buffer[ix];
if (!skb)
continue;
sp = rxrpc_skb(skb);
if (sp->hdr.serial != orig_serial)
continue;
smp_rmb();
sent_at = skb->tstamp;
goto found;
}
return;
found:
rxrpc_peer_add_rtt(call, rxrpc_rtt_rx_requested_ack,
orig_serial, ack_serial, sent_at, resp_time);
}
/*
* Process a ping response.
*/
static void rxrpc_input_ping_response(struct rxrpc_call *call,
ktime_t resp_time,
rxrpc_serial_t orig_serial,
rxrpc_serial_t ack_serial)
{
rxrpc_serial_t ping_serial;
ktime_t ping_time;
ping_time = call->ackr_ping_time;
smp_rmb();
ping_serial = call->ackr_ping;
if (!test_bit(RXRPC_CALL_PINGING, &call->flags) ||
before(orig_serial, ping_serial))
return;
clear_bit(RXRPC_CALL_PINGING, &call->flags);
if (after(orig_serial, ping_serial))
return;
rxrpc_peer_add_rtt(call, rxrpc_rtt_rx_ping_response,
orig_serial, ack_serial, ping_time, resp_time);
}
/*
* Process the extra information that may be appended to an ACK packet
*/
......@@ -388,17 +462,25 @@ static void rxrpc_input_soft_acks(struct rxrpc_call *call, u8 *acks,
{
bool resend = false;
int ix;
u8 annotation, anno_type;
for (; nr_acks > 0; nr_acks--, seq++) {
ix = seq & RXRPC_RXTX_BUFF_MASK;
annotation = call->rxtx_annotations[ix];
anno_type = annotation & RXRPC_TX_ANNO_MASK;
annotation &= ~RXRPC_TX_ANNO_MASK;
switch (*acks++) {
case RXRPC_ACK_TYPE_ACK:
call->rxtx_annotations[ix] = RXRPC_TX_ANNO_ACK;
if (anno_type == RXRPC_TX_ANNO_ACK)
continue;
call->rxtx_annotations[ix] =
RXRPC_TX_ANNO_ACK | annotation;
break;
case RXRPC_ACK_TYPE_NACK:
if (call->rxtx_annotations[ix] == RXRPC_TX_ANNO_NAK)
if (anno_type == RXRPC_TX_ANNO_NAK)
continue;
call->rxtx_annotations[ix] = RXRPC_TX_ANNO_NAK;
call->rxtx_annotations[ix] =
RXRPC_TX_ANNO_NAK | annotation;
resend = true;
break;
default:
......@@ -430,6 +512,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
struct rxrpc_ackinfo info;
u8 acks[RXRPC_MAXACKS];
} buf;
rxrpc_serial_t acked_serial;
rxrpc_seq_t first_soft_ack, hard_ack;
int nr_acks, offset;
......@@ -441,6 +524,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
}
sp->offset += sizeof(buf.ack);
acked_serial = ntohl(buf.ack.serial);
first_soft_ack = ntohl(buf.ack.firstPacket);
hard_ack = first_soft_ack - 1;
nr_acks = buf.ack.nAcks;
......@@ -452,10 +536,17 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
ntohs(buf.ack.maxSkew),
first_soft_ack,
ntohl(buf.ack.previousPacket),
ntohl(buf.ack.serial),
acked_serial,
rxrpc_acks(buf.ack.reason),
buf.ack.nAcks);
if (buf.ack.reason == RXRPC_ACK_PING_RESPONSE)
rxrpc_input_ping_response(call, skb->tstamp, acked_serial,
sp->hdr.serial);
if (buf.ack.reason == RXRPC_ACK_REQUESTED)
rxrpc_input_requested_ack(call, skb->tstamp, acked_serial,
sp->hdr.serial);
if (buf.ack.reason == RXRPC_ACK_PING) {
_proto("Rx ACK %%%u PING Request", sp->hdr.serial);
rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
......@@ -822,6 +913,7 @@ void rxrpc_data_ready(struct sock *udp_sk)
rcu_read_unlock();
goto reject_packet;
}
rxrpc_send_ping(call, skb, skew);
}
rxrpc_input_call_packet(call, skb, skew);
......
......@@ -68,9 +68,9 @@ unsigned int rxrpc_rx_mtu = 5692;
unsigned int rxrpc_rx_jumbo_max = 4;
/*
* Time till packet resend (in jiffies).
* Time till packet resend (in milliseconds).
*/
unsigned int rxrpc_resend_timeout = 4 * HZ;
unsigned int rxrpc_resend_timeout = 4 * 1000;
const char *const rxrpc_pkts[] = {
"?00",
......@@ -83,11 +83,12 @@ const s8 rxrpc_ack_priority[] = {
[RXRPC_ACK_DELAY] = 1,
[RXRPC_ACK_REQUESTED] = 2,
[RXRPC_ACK_IDLE] = 3,
[RXRPC_ACK_PING_RESPONSE] = 4,
[RXRPC_ACK_DUPLICATE] = 5,
[RXRPC_ACK_OUT_OF_SEQUENCE] = 6,
[RXRPC_ACK_EXCEEDS_WINDOW] = 7,
[RXRPC_ACK_NOSPACE] = 8,
[RXRPC_ACK_DUPLICATE] = 4,
[RXRPC_ACK_OUT_OF_SEQUENCE] = 5,
[RXRPC_ACK_EXCEEDS_WINDOW] = 6,
[RXRPC_ACK_NOSPACE] = 7,
[RXRPC_ACK_PING_RESPONSE] = 8,
[RXRPC_ACK_PING] = 9,
};
const char *rxrpc_acks(u8 reason)
......@@ -182,3 +183,13 @@ const char rxrpc_recvmsg_traces[rxrpc_recvmsg__nr_trace][5] = {
[rxrpc_recvmsg_to_be_accepted] = "TBAC",
[rxrpc_recvmsg_return] = "RETN",
};
const char rxrpc_rtt_tx_traces[rxrpc_rtt_tx__nr_trace][5] = {
[rxrpc_rtt_tx_ping] = "PING",
[rxrpc_rtt_tx_data] = "DATA",
};
const char rxrpc_rtt_rx_traces[rxrpc_rtt_rx__nr_trace][5] = {
[rxrpc_rtt_rx_ping_response] = "PONG",
[rxrpc_rtt_rx_requested_ack] = "RACK",
};
......@@ -57,6 +57,9 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_call *call,
pkt->ack.reason = call->ackr_reason;
pkt->ack.nAcks = top - hard_ack;
if (pkt->ack.reason == RXRPC_ACK_PING)
pkt->whdr.flags |= RXRPC_REQUEST_ACK;
if (after(top, hard_ack)) {
seq = hard_ack + 1;
do {
......@@ -97,6 +100,7 @@ int rxrpc_send_call_packet(struct rxrpc_call *call, u8 type)
struct kvec iov[2];
rxrpc_serial_t serial;
size_t len, n;
bool ping = false;
int ioc, ret;
u32 abort_code;
......@@ -147,6 +151,7 @@ int rxrpc_send_call_packet(struct rxrpc_call *call, u8 type)
ret = 0;
goto out;
}
ping = (call->ackr_reason == RXRPC_ACK_PING);
n = rxrpc_fill_out_ack(call, pkt);
call->ackr_reason = 0;
......@@ -183,12 +188,29 @@ int rxrpc_send_call_packet(struct rxrpc_call *call, u8 type)
goto out;
}
if (ping) {
call->ackr_ping = serial;
smp_wmb();
/* We need to stick a time in before we send the packet in case
* the reply gets back before kernel_sendmsg() completes - but
* asking UDP to send the packet can take a relatively long
* time, so we update the time after, on the assumption that
* the packet transmission is more likely to happen towards the
* end of the kernel_sendmsg() call.
*/
call->ackr_ping_time = ktime_get_real();
set_bit(RXRPC_CALL_PINGING, &call->flags);
trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_ping, serial);
}
ret = kernel_sendmsg(conn->params.local->socket,
&msg, iov, ioc, len);
if (ping)
call->ackr_ping_time = ktime_get_real();
if (ret < 0 && call->state < RXRPC_CALL_COMPLETE) {
switch (type) {
case RXRPC_PACKET_TYPE_ACK:
clear_bit(RXRPC_CALL_PINGING, &call->flags);
rxrpc_propose_ACK(call, pkt->ack.reason,
ntohs(pkt->ack.maxSkew),
ntohl(pkt->ack.serial),
......@@ -208,23 +230,52 @@ int rxrpc_send_call_packet(struct rxrpc_call *call, u8 type)
/*
* send a packet through the transport endpoint
*/
int rxrpc_send_data_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb)
{
struct kvec iov[1];
struct rxrpc_connection *conn = call->conn;
struct rxrpc_wire_header whdr;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct msghdr msg;
struct kvec iov[2];
rxrpc_serial_t serial;
size_t len;
int ret, opt;
_enter(",{%d}", skb->len);
iov[0].iov_base = skb->head;
iov[0].iov_len = skb->len;
/* Each transmission of a Tx packet needs a new serial number */
serial = atomic_inc_return(&conn->serial);
whdr.epoch = htonl(conn->proto.epoch);
whdr.cid = htonl(call->cid);
whdr.callNumber = htonl(call->call_id);
whdr.seq = htonl(sp->hdr.seq);
whdr.serial = htonl(serial);
whdr.type = RXRPC_PACKET_TYPE_DATA;
whdr.flags = sp->hdr.flags;
whdr.userStatus = 0;
whdr.securityIndex = call->security_ix;
whdr._rsvd = htons(sp->hdr._rsvd);
whdr.serviceId = htons(call->service_id);
iov[0].iov_base = &whdr;
iov[0].iov_len = sizeof(whdr);
iov[1].iov_base = skb->head;
iov[1].iov_len = skb->len;
len = iov[0].iov_len + iov[1].iov_len;
msg.msg_name = &conn->params.peer->srx.transport;
msg.msg_namelen = conn->params.peer->srx.transport_len;
msg.msg_name = &call->peer->srx.transport;
msg.msg_namelen = call->peer->srx.transport_len;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
/* If our RTT cache needs working on, request an ACK. */
if ((call->peer->rtt_usage < 3 && sp->hdr.seq & 1) ||
ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
ktime_get_real()))
whdr.flags |= RXRPC_REQUEST_ACK;
if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
static int lose;
if ((lose++ & 7) == 7) {
......@@ -234,26 +285,39 @@ int rxrpc_send_data_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
}
}
_proto("Tx DATA %%%u { #%u }", serial, sp->hdr.seq);
/* send the packet with the don't fragment bit set if we currently
* think it's small enough */
if (skb->len - sizeof(struct rxrpc_wire_header) < conn->params.peer->maxdata) {
down_read(&conn->params.local->defrag_sem);
/* send the packet by UDP
* - returns -EMSGSIZE if UDP would have to fragment the packet
* to go out of the interface
* - in which case, we'll have processed the ICMP error
* message and update the peer record
*/
ret = kernel_sendmsg(conn->params.local->socket, &msg, iov, 1,
iov[0].iov_len);
up_read(&conn->params.local->defrag_sem);
if (ret == -EMSGSIZE)
goto send_fragmentable;
_leave(" = %d [%u]", ret, conn->params.peer->maxdata);
return ret;
if (iov[1].iov_len >= call->peer->maxdata)
goto send_fragmentable;
down_read(&conn->params.local->defrag_sem);
/* send the packet by UDP
* - returns -EMSGSIZE if UDP would have to fragment the packet
* to go out of the interface
* - in which case, we'll have processed the ICMP error
* message and update the peer record
*/
ret = kernel_sendmsg(conn->params.local->socket, &msg, iov, 2, len);
up_read(&conn->params.local->defrag_sem);
if (ret == -EMSGSIZE)
goto send_fragmentable;
done:
if (ret >= 0) {
ktime_t now = ktime_get_real();
skb->tstamp = now;
smp_wmb();
sp->hdr.serial = serial;
if (whdr.flags & RXRPC_REQUEST_ACK) {
call->peer->rtt_last_req = now;
trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_data, serial);
}
}
_leave(" = %d [%u]", ret, call->peer->maxdata);
return ret;
send_fragmentable:
/* attempt to send this message with fragmentation enabled */
......@@ -268,8 +332,8 @@ int rxrpc_send_data_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
SOL_IP, IP_MTU_DISCOVER,
(char *)&opt, sizeof(opt));
if (ret == 0) {
ret = kernel_sendmsg(conn->params.local->socket, &msg, iov, 1,
iov[0].iov_len);
ret = kernel_sendmsg(conn->params.local->socket, &msg,
iov, 2, len);
opt = IP_PMTUDISC_DO;
kernel_setsockopt(conn->params.local->socket, SOL_IP,
......@@ -298,8 +362,7 @@ int rxrpc_send_data_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
}
up_write(&conn->params.local->defrag_sem);
_leave(" = %d [frag %u]", ret, conn->params.peer->maxdata);
return ret;
goto done;
}
/*
......
......@@ -305,3 +305,44 @@ void rxrpc_peer_error_distributor(struct work_struct *work)
rxrpc_put_peer(peer);
_leave("");
}
/*
* Add RTT information to cache. This is called in softirq mode and has
* exclusive access to the peer RTT data.
*/
void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
rxrpc_serial_t send_serial, rxrpc_serial_t resp_serial,
ktime_t send_time, ktime_t resp_time)
{
struct rxrpc_peer *peer = call->peer;
s64 rtt;
u64 sum = peer->rtt_sum, avg;
u8 cursor = peer->rtt_cursor, usage = peer->rtt_usage;
rtt = ktime_to_ns(ktime_sub(resp_time, send_time));
if (rtt < 0)
return;
/* Replace the oldest datum in the RTT buffer */
sum -= peer->rtt_cache[cursor];
sum += rtt;
peer->rtt_cache[cursor] = rtt;
peer->rtt_cursor = (cursor + 1) & (RXRPC_RTT_CACHE_SIZE - 1);
peer->rtt_sum = sum;
if (usage < RXRPC_RTT_CACHE_SIZE) {
usage++;
peer->rtt_usage = usage;
}
/* Now recalculate the average */
if (usage == RXRPC_RTT_CACHE_SIZE) {
avg = sum / RXRPC_RTT_CACHE_SIZE;
} else {
avg = sum;
do_div(avg, usage);
}
peer->rtt = avg;
trace_rxrpc_rtt_rx(call, why, send_serial, resp_serial, rtt,
usage, avg);
}
......@@ -244,6 +244,7 @@ static void rxrpc_init_peer(struct rxrpc_peer *peer, unsigned long hash_key)
peer->hash_key = hash_key;
rxrpc_assess_MTU_size(peer);
peer->mtu = peer->if_mtu;
peer->rtt_last_req = ktime_get_real();
switch (peer->srx.transport.family) {
case AF_INET:
......
......@@ -80,12 +80,10 @@ static int rxkad_init_connection_security(struct rxrpc_connection *conn)
case RXRPC_SECURITY_AUTH:
conn->size_align = 8;
conn->security_size = sizeof(struct rxkad_level1_hdr);
conn->header_size += sizeof(struct rxkad_level1_hdr);
break;
case RXRPC_SECURITY_ENCRYPT:
conn->size_align = 8;
conn->security_size = sizeof(struct rxkad_level2_hdr);
conn->header_size += sizeof(struct rxkad_level2_hdr);
break;
default:
ret = -EKEYREJECTED;
......@@ -161,7 +159,7 @@ static int rxkad_secure_packet_auth(const struct rxrpc_call *call,
_enter("");
check = sp->hdr.seq ^ sp->hdr.callNumber;
check = sp->hdr.seq ^ call->call_id;
data_size |= (u32)check << 16;
hdr.data_size = htonl(data_size);
......@@ -205,7 +203,7 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
_enter("");
check = sp->hdr.seq ^ sp->hdr.callNumber;
check = sp->hdr.seq ^ call->call_id;
rxkhdr.data_size = htonl(data_size | (u32)check << 16);
rxkhdr.checksum = 0;
......@@ -277,7 +275,7 @@ static int rxkad_secure_packet(struct rxrpc_call *call,
/* calculate the security checksum */
x = (call->cid & RXRPC_CHANNELMASK) << (32 - RXRPC_CIDSHIFT);
x |= sp->hdr.seq & 0x3fffffff;
call->crypto_buf[0] = htonl(sp->hdr.callNumber);
call->crypto_buf[0] = htonl(call->call_id);
call->crypto_buf[1] = htonl(x);
sg_init_one(&sg, call->crypto_buf, 8);
......
......@@ -134,13 +134,10 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
write_unlock_bh(&call->state_lock);
}
_proto("Tx DATA %%%u { #%u }", sp->hdr.serial, sp->hdr.seq);
if (seq == 1 && rxrpc_is_client_call(call))
rxrpc_expose_client_call(call);
sp->resend_at = jiffies + rxrpc_resend_timeout;
ret = rxrpc_send_data_packet(call->conn, skb);
ret = rxrpc_send_data_packet(call, skb);
if (ret < 0) {
_debug("need instant resend %d", ret);
rxrpc_instant_resend(call, ix);
......@@ -150,29 +147,6 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
_leave("");
}
/*
* Convert a host-endian header into a network-endian header.
*/
static void rxrpc_insert_header(struct sk_buff *skb)
{
struct rxrpc_wire_header whdr;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
whdr.epoch = htonl(sp->hdr.epoch);
whdr.cid = htonl(sp->hdr.cid);
whdr.callNumber = htonl(sp->hdr.callNumber);
whdr.seq = htonl(sp->hdr.seq);
whdr.serial = htonl(sp->hdr.serial);
whdr.type = sp->hdr.type;
whdr.flags = sp->hdr.flags;
whdr.userStatus = sp->hdr.userStatus;
whdr.securityIndex = sp->hdr.securityIndex;
whdr._rsvd = htons(sp->hdr._rsvd);
whdr.serviceId = htons(sp->hdr.serviceId);
memcpy(skb->head, &whdr, sizeof(whdr));
}
/*
* send data through a socket
* - must be called in process context
......@@ -205,6 +179,10 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
copied = 0;
do {
/* Check to see if there's a ping ACK to reply to. */
if (call->ackr_reason == RXRPC_ACK_PING_RESPONSE)
rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK);
if (!skb) {
size_t size, chunk, max, space;
......@@ -232,7 +210,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
space = chunk + call->conn->size_align;
space &= ~(call->conn->size_align - 1UL);
size = space + call->conn->header_size;
size = space + call->conn->security_size;
_debug("SIZE: %zu/%zu/%zu", chunk, space, size);
......@@ -248,9 +226,9 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
ASSERTCMP(skb->mark, ==, 0);
_debug("HS: %u", call->conn->header_size);
skb_reserve(skb, call->conn->header_size);
skb->len += call->conn->header_size;
_debug("HS: %u", call->conn->security_size);
skb_reserve(skb, call->conn->security_size);
skb->len += call->conn->security_size;
sp = rxrpc_skb(skb);
sp->remain = chunk;
......@@ -312,33 +290,21 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
seq = call->tx_top + 1;
sp->hdr.epoch = conn->proto.epoch;
sp->hdr.cid = call->cid;
sp->hdr.callNumber = call->call_id;
sp->hdr.seq = seq;
sp->hdr.serial = atomic_inc_return(&conn->serial);
sp->hdr.type = RXRPC_PACKET_TYPE_DATA;
sp->hdr.userStatus = 0;
sp->hdr.securityIndex = call->security_ix;
sp->hdr._rsvd = 0;
sp->hdr.serviceId = call->service_id;
sp->hdr.flags = conn->out_clientflag;
sp->hdr.flags = conn->out_clientflag;
if (msg_data_left(msg) == 0 && !more)
sp->hdr.flags |= RXRPC_LAST_PACKET;
else if (call->tx_top - call->tx_hard_ack <
call->tx_winsize)
sp->hdr.flags |= RXRPC_MORE_PACKETS;
if (more && seq & 1)
sp->hdr.flags |= RXRPC_REQUEST_ACK;
ret = conn->security->secure_packet(
call, skb, skb->mark,
skb->head + sizeof(struct rxrpc_wire_header));
call, skb, skb->mark, skb->head);
if (ret < 0)
goto out;
rxrpc_insert_header(skb);
rxrpc_queue_packet(call, skb, !msg_data_left(msg) && !more);
skb = NULL;
}
......
......@@ -59,7 +59,7 @@ static struct ctl_table rxrpc_sysctl_table[] = {
.data = &rxrpc_resend_timeout,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec_ms_jiffies,
.proc_handler = proc_dointvec,
.extra1 = (void *)&one,
},
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment