Commit 818a2604 authored by Paolo Abeni

Merge branch 'soreuseport-fix-broken-so_incoming_cpu'

Kuniyuki Iwashima says:

====================
soreuseport: Fix broken SO_INCOMING_CPU.

setsockopt(SO_INCOMING_CPU) for UDP/TCP has been broken since 4.5/4.6 due to
these commits:

  * e32ea7e7 ("soreuseport: fast reuseport UDP socket selection")
  * c125e80b ("soreuseport: fast reuseport TCP socket selection")

These commits introduced an O(1) socket selection algorithm and removed
the O(n) iteration over the list, but the new code ignores the score
calculated by compute_score().  As a result, two misbehaviours appeared:

  * Unconnected sockets receive packets sent to connected sockets
  * SO_INCOMING_CPU does not work

The former is fixed by commit acdcecc6 ("udp: correct reuseport
selection with connected sockets").  This series fixes the latter and
adds some tests for SO_INCOMING_CPU.
====================

Link: https://lore.kernel.org/r/20221021204435.4259-1-kuniyu@amazon.com
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
parents 71920a77 6df96146
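The core of the fix, shown in the diff below, is a per-group counter: the new
reuse->incoming_cpu field is not a CPU number but a count of sockets in the
group whose sk_incoming_cpu has been set (>= 0). While the counter is zero,
the hash-based pick stays O(1) exactly as before; once it is non-zero, the
group is walked to prefer a socket pinned to the CPU that is processing the
packet. A minimal user-space model of that selection logic, using hypothetical
toy_* types in place of the kernel's struct sock and struct sock_reuseport
(an illustration of the idea, not the kernel code):

#include <stdio.h>

/* Stand-ins for struct sock / struct sock_reuseport; illustration only. */
struct toy_sock {
        int established;        /* models sk->sk_state == TCP_ESTABLISHED */
        int incoming_cpu;       /* models sk->sk_incoming_cpu, -1 when unset */
};

struct toy_group {
        unsigned int num_socks;
        unsigned int incoming_cpu;      /* count of sockets with incoming_cpu >= 0 */
        struct toy_sock *socks[16];
};

static struct toy_sock *toy_select_by_hash(struct toy_group *g,
                                           unsigned int hash, int this_cpu)
{
        struct toy_sock *first_valid = NULL;
        unsigned int i, j;

        i = j = hash % g->num_socks;
        do {
                struct toy_sock *sk = g->socks[i];

                if (!sk->established) {
                        /* Nobody in the group set SO_INCOMING_CPU: keep the
                         * old O(1) behaviour and take the hashed pick.
                         */
                        if (!g->incoming_cpu)
                                return sk;

                        /* Prefer a socket pinned to the CPU handling this packet. */
                        if (sk->incoming_cpu == this_cpu)
                                return sk;

                        if (!first_valid)
                                first_valid = sk;
                }

                if (++i == g->num_socks)
                        i = 0;
        } while (i != j);

        /* No CPU match: fall back to the first non-established socket, if any. */
        return first_valid;
}

int main(void)
{
        struct toy_sock a = { .established = 0, .incoming_cpu = -1 };
        struct toy_sock b = { .established = 0, .incoming_cpu = 1 };
        struct toy_group g = { .num_socks = 2, .incoming_cpu = 1,
                               .socks = { &a, &b } };

        /* The hash points at slot 0, but CPU 1 is handling the packet,
         * so the pinned socket (b) is chosen.
         */
        printf("picked pinned socket: %s\n",
               toy_select_by_hash(&g, 0, 1) == &b ? "yes" : "no");
        return 0;
}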
@@ -16,6 +16,7 @@ struct sock_reuseport {
u16 max_socks; /* length of socks */
u16 num_socks; /* elements in socks */
u16 num_closed_socks; /* closed elements in socks */
u16 incoming_cpu;
/* The last synq overflow event timestamp of this
* reuse->socks[] group.
*/
@@ -58,5 +59,6 @@ static inline bool reuseport_has_conns(struct sock *sk)
}
void reuseport_has_conns_set(struct sock *sk);
void reuseport_update_incoming_cpu(struct sock *sk, int val);
#endif /* _SOCK_REUSEPORT_H */
@@ -1436,7 +1436,7 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
break;
}
case SO_INCOMING_CPU:
-		WRITE_ONCE(sk->sk_incoming_cpu, val);
+		reuseport_update_incoming_cpu(sk, val);
break;
case SO_CNX_ADVICE:
@@ -37,6 +37,70 @@ void reuseport_has_conns_set(struct sock *sk)
}
EXPORT_SYMBOL(reuseport_has_conns_set);
static void __reuseport_get_incoming_cpu(struct sock_reuseport *reuse)
{
/* Paired with READ_ONCE() in reuseport_select_sock_by_hash(). */
WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu + 1);
}
static void __reuseport_put_incoming_cpu(struct sock_reuseport *reuse)
{
/* Paired with READ_ONCE() in reuseport_select_sock_by_hash(). */
WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu - 1);
}
static void reuseport_get_incoming_cpu(struct sock *sk, struct sock_reuseport *reuse)
{
if (sk->sk_incoming_cpu >= 0)
__reuseport_get_incoming_cpu(reuse);
}
static void reuseport_put_incoming_cpu(struct sock *sk, struct sock_reuseport *reuse)
{
if (sk->sk_incoming_cpu >= 0)
__reuseport_put_incoming_cpu(reuse);
}
void reuseport_update_incoming_cpu(struct sock *sk, int val)
{
struct sock_reuseport *reuse;
int old_sk_incoming_cpu;
if (unlikely(!rcu_access_pointer(sk->sk_reuseport_cb))) {
/* Paired with READ_ONCE() in sk_incoming_cpu_update()
* and compute_score().
*/
WRITE_ONCE(sk->sk_incoming_cpu, val);
return;
}
spin_lock_bh(&reuseport_lock);
/* This must be done under reuseport_lock to avoid a race with
* reuseport_grow(), which accesses sk->sk_incoming_cpu without
* lock_sock() when detaching a shutdown()ed sk.
*
* Paired with READ_ONCE() in reuseport_select_sock_by_hash().
*/
old_sk_incoming_cpu = sk->sk_incoming_cpu;
WRITE_ONCE(sk->sk_incoming_cpu, val);
reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));
/* reuseport_grow() has detached a closed sk. */
if (!reuse)
goto out;
if (old_sk_incoming_cpu < 0 && val >= 0)
__reuseport_get_incoming_cpu(reuse);
else if (old_sk_incoming_cpu >= 0 && val < 0)
__reuseport_put_incoming_cpu(reuse);
out:
spin_unlock_bh(&reuseport_lock);
}
static int reuseport_sock_index(struct sock *sk,
const struct sock_reuseport *reuse,
bool closed)
@@ -64,6 +128,7 @@ static void __reuseport_add_sock(struct sock *sk,
/* paired with smp_rmb() in reuseport_(select|migrate)_sock() */
smp_wmb();
reuse->num_socks++;
reuseport_get_incoming_cpu(sk, reuse);
}
static bool __reuseport_detach_sock(struct sock *sk,
@@ -76,6 +141,7 @@ static bool __reuseport_detach_sock(struct sock *sk,
reuse->socks[i] = reuse->socks[reuse->num_socks - 1];
reuse->num_socks--;
reuseport_put_incoming_cpu(sk, reuse);
return true;
}
@@ -86,6 +152,7 @@ static void __reuseport_add_closed_sock(struct sock *sk,
reuse->socks[reuse->max_socks - reuse->num_closed_socks - 1] = sk;
/* paired with READ_ONCE() in inet_csk_bind_conflict() */
WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks + 1);
reuseport_get_incoming_cpu(sk, reuse);
}
static bool __reuseport_detach_closed_sock(struct sock *sk,
@@ -99,6 +166,7 @@ static bool __reuseport_detach_closed_sock(struct sock *sk,
reuse->socks[i] = reuse->socks[reuse->max_socks - reuse->num_closed_socks];
/* paired with READ_ONCE() in inet_csk_bind_conflict() */
WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks - 1);
reuseport_put_incoming_cpu(sk, reuse);
return true;
}
@@ -166,6 +234,7 @@ int reuseport_alloc(struct sock *sk, bool bind_inany)
reuse->bind_inany = bind_inany;
reuse->socks[0] = sk;
reuse->num_socks = 1;
reuseport_get_incoming_cpu(sk, reuse);
rcu_assign_pointer(sk->sk_reuseport_cb, reuse);
out:
@@ -209,6 +278,7 @@ static struct sock_reuseport *reuseport_grow(struct sock_reuseport *reuse)
more_reuse->reuseport_id = reuse->reuseport_id;
more_reuse->bind_inany = reuse->bind_inany;
more_reuse->has_conns = reuse->has_conns;
more_reuse->incoming_cpu = reuse->incoming_cpu;
memcpy(more_reuse->socks, reuse->socks,
reuse->num_socks * sizeof(struct sock *));
@@ -458,18 +528,32 @@ static struct sock *run_bpf_filter(struct sock_reuseport *reuse, u16 socks,
static struct sock *reuseport_select_sock_by_hash(struct sock_reuseport *reuse,
                                                  u32 hash, u16 num_socks)
{
+       struct sock *first_valid_sk = NULL;
        int i, j;

        i = j = reciprocal_scale(hash, num_socks);
-       while (reuse->socks[i]->sk_state == TCP_ESTABLISHED) {
+       do {
+               struct sock *sk = reuse->socks[i];
+
+               if (sk->sk_state != TCP_ESTABLISHED) {
+                       /* Paired with WRITE_ONCE() in __reuseport_(get|put)_incoming_cpu(). */
+                       if (!READ_ONCE(reuse->incoming_cpu))
+                               return sk;
+
+                       /* Paired with WRITE_ONCE() in reuseport_update_incoming_cpu(). */
+                       if (READ_ONCE(sk->sk_incoming_cpu) == raw_smp_processor_id())
+                               return sk;
+
+                       if (!first_valid_sk)
+                               first_valid_sk = sk;
+               }
+
                i++;
                if (i >= num_socks)
                        i = 0;
-               if (i == j)
-                       return NULL;
-       }
+       } while (i != j);

-       return reuse->socks[i];
+       return first_valid_sk;
}
/**
@@ -25,6 +25,7 @@ rxtimestamp
sk_bind_sendto_listen
sk_connect_zero_addr
socket
so_incoming_cpu
so_netns_cookie
so_txtime
stress_reuseport_listen
@@ -71,6 +71,7 @@ TEST_GEN_FILES += bind_bhash
TEST_GEN_PROGS += sk_bind_sendto_listen
TEST_GEN_PROGS += sk_connect_zero_addr
TEST_PROGS += test_ingress_egress_chaining.sh
TEST_GEN_PROGS += so_incoming_cpu
TEST_FILES := settings
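The new selftest source below exercises SO_INCOMING_CPU together with
SO_REUSEPORT across every online CPU. As a stand-alone sketch of the
userspace side (a hypothetical helper, not part of this series; error
handling trimmed), a reuseport listener can be pinned to one CPU like this:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: create a loopback TCP listener that joins a
 * reuseport group and asks for connections whose SYN was processed
 * on @cpu.  Error handling is trimmed for brevity.
 */
static int make_pinned_listener(int cpu, unsigned short port)
{
        struct sockaddr_in addr = {
                .sin_family = AF_INET,
                .sin_addr.s_addr = htonl(INADDR_LOOPBACK),
                .sin_port = htons(port),
        };
        int one = 1;
        int fd = socket(AF_INET, SOCK_STREAM, 0);

        if (fd < 0)
                return -1;

        setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one));
        /* With this series applied, SYNs handled on @cpu are steered here. */
        setsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, sizeof(cpu));

        if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
            listen(fd, 128) < 0) {
                close(fd);
                return -1;
        }
        return fd;
}

int main(void)
{
        int fd = make_pinned_listener(0, 0);    /* port 0: kernel picks one */

        if (fd < 0)
                return 1;
        /* A real server would create one such listener per CPU and accept()
         * on each; the selftest below does exactly that for all CPUs.
         */
        close(fd);
        return 0;
}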
// SPDX-License-Identifier: GPL-2.0
/* Copyright Amazon.com Inc. or its affiliates. */
#define _GNU_SOURCE
#include <sched.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/sysinfo.h>
#include "../kselftest_harness.h"
#define CLIENT_PER_SERVER 32 /* More sockets, more reliable */
#define NR_SERVER self->nproc
#define NR_CLIENT (CLIENT_PER_SERVER * NR_SERVER)
FIXTURE(so_incoming_cpu)
{
int nproc;
int *servers;
union {
struct sockaddr addr;
struct sockaddr_in in_addr;
};
socklen_t addrlen;
};
enum when_to_set {
BEFORE_REUSEPORT,
BEFORE_LISTEN,
AFTER_LISTEN,
AFTER_ALL_LISTEN,
};
FIXTURE_VARIANT(so_incoming_cpu)
{
int when_to_set;
};
FIXTURE_VARIANT_ADD(so_incoming_cpu, before_reuseport)
{
.when_to_set = BEFORE_REUSEPORT,
};
FIXTURE_VARIANT_ADD(so_incoming_cpu, before_listen)
{
.when_to_set = BEFORE_LISTEN,
};
FIXTURE_VARIANT_ADD(so_incoming_cpu, after_listen)
{
.when_to_set = AFTER_LISTEN,
};
FIXTURE_VARIANT_ADD(so_incoming_cpu, after_all_listen)
{
.when_to_set = AFTER_ALL_LISTEN,
};
FIXTURE_SETUP(so_incoming_cpu)
{
self->nproc = get_nprocs();
ASSERT_LE(2, self->nproc);
self->servers = malloc(sizeof(int) * NR_SERVER);
ASSERT_NE(self->servers, NULL);
self->in_addr.sin_family = AF_INET;
self->in_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
self->in_addr.sin_port = htons(0);
self->addrlen = sizeof(struct sockaddr_in);
}
FIXTURE_TEARDOWN(so_incoming_cpu)
{
int i;
for (i = 0; i < NR_SERVER; i++)
close(self->servers[i]);
free(self->servers);
}
void set_so_incoming_cpu(struct __test_metadata *_metadata, int fd, int cpu)
{
int ret;
ret = setsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, sizeof(int));
ASSERT_EQ(ret, 0);
}
int create_server(struct __test_metadata *_metadata,
FIXTURE_DATA(so_incoming_cpu) *self,
const FIXTURE_VARIANT(so_incoming_cpu) *variant,
int cpu)
{
int fd, ret;
fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
ASSERT_NE(fd, -1);
if (variant->when_to_set == BEFORE_REUSEPORT)
set_so_incoming_cpu(_metadata, fd, cpu);
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &(int){1}, sizeof(int));
ASSERT_EQ(ret, 0);
ret = bind(fd, &self->addr, self->addrlen);
ASSERT_EQ(ret, 0);
if (variant->when_to_set == BEFORE_LISTEN)
set_so_incoming_cpu(_metadata, fd, cpu);
/* Use NR_CLIENT rather than CLIENT_PER_SERVER as the backlog so that
 * this test does not block at connect() if SO_INCOMING_CPU is broken.
 */
ret = listen(fd, NR_CLIENT);
ASSERT_EQ(ret, 0);
if (variant->when_to_set == AFTER_LISTEN)
set_so_incoming_cpu(_metadata, fd, cpu);
return fd;
}
void create_servers(struct __test_metadata *_metadata,
FIXTURE_DATA(so_incoming_cpu) *self,
const FIXTURE_VARIANT(so_incoming_cpu) *variant)
{
int i, ret;
for (i = 0; i < NR_SERVER; i++) {
self->servers[i] = create_server(_metadata, self, variant, i);
if (i == 0) {
ret = getsockname(self->servers[i], &self->addr, &self->addrlen);
ASSERT_EQ(ret, 0);
}
}
if (variant->when_to_set == AFTER_ALL_LISTEN) {
for (i = 0; i < NR_SERVER; i++)
set_so_incoming_cpu(_metadata, self->servers[i], i);
}
}
void create_clients(struct __test_metadata *_metadata,
FIXTURE_DATA(so_incoming_cpu) *self)
{
cpu_set_t cpu_set;
int i, j, fd, ret;
for (i = 0; i < NR_SERVER; i++) {
CPU_ZERO(&cpu_set);
CPU_SET(i, &cpu_set);
ASSERT_EQ(CPU_COUNT(&cpu_set), 1);
ASSERT_NE(CPU_ISSET(i, &cpu_set), 0);
/* Make sure the SYN is processed on the i-th CPU and is
 * therefore distributed to the i-th listener.
 */
ret = sched_setaffinity(0, sizeof(cpu_set), &cpu_set);
ASSERT_EQ(ret, 0);
for (j = 0; j < CLIENT_PER_SERVER; j++) {
fd = socket(AF_INET, SOCK_STREAM, 0);
ASSERT_NE(fd, -1);
ret = connect(fd, &self->addr, self->addrlen);
ASSERT_EQ(ret, 0);
close(fd);
}
}
}
void verify_incoming_cpu(struct __test_metadata *_metadata,
FIXTURE_DATA(so_incoming_cpu) *self)
{
int i, j, fd, cpu, ret, total = 0;
socklen_t len = sizeof(int);
for (i = 0; i < NR_SERVER; i++) {
for (j = 0; j < CLIENT_PER_SERVER; j++) {
/* If we see -EAGAIN here, SO_INCOMING_CPU is broken */
fd = accept(self->servers[i], &self->addr, &self->addrlen);
ASSERT_NE(fd, -1);
ret = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len);
ASSERT_EQ(ret, 0);
ASSERT_EQ(cpu, i);
close(fd);
total++;
}
}
ASSERT_EQ(total, NR_CLIENT);
TH_LOG("SO_INCOMING_CPU is very likely to be "
"working correctly with %d sockets.", total);
}
TEST_F(so_incoming_cpu, test1)
{
create_servers(_metadata, self, variant);
create_clients(_metadata, self);
verify_incoming_cpu(_metadata, self);
}
TEST_F(so_incoming_cpu, test2)
{
int server;
create_servers(_metadata, self, variant);
/* No CPU specified */
server = create_server(_metadata, self, variant, -1);
close(server);
create_clients(_metadata, self);
verify_incoming_cpu(_metadata, self);
}
TEST_F(so_incoming_cpu, test3)
{
int server, client;
create_servers(_metadata, self, variant);
/* No CPU specified */
server = create_server(_metadata, self, variant, -1);
create_clients(_metadata, self);
/* Never receive any requests */
client = accept(server, &self->addr, &self->addrlen);
ASSERT_EQ(client, -1);
verify_incoming_cpu(_metadata, self);
}
TEST_HARNESS_MAIN