Commit 139b5fbd authored by Paolo Abeni's avatar Paolo Abeni

Merge branch 'vsock-updates-for-so_rcvlowat-handling'

Arseniy Krasnov says:

====================
vsock: updates for SO_RCVLOWAT handling

This patchset includes some updates for SO_RCVLOWAT:

1) af_vsock:
   During my experiments with zerocopy receive, i found, that in some
   cases, poll() implementation violates POSIX: when socket has non-
   default SO_RCVLOWAT(e.g. not 1), poll() will always set POLLIN and
   POLLRDNORM bits in 'revents' even number of bytes available to read
   on socket is smaller than SO_RCVLOWAT value. In this case,user sees
   POLLIN flag and then tries to read data(for example using  'read()'
   call), but read call will be blocked, because  SO_RCVLOWAT logic is
   supported in dequeue loop in af_vsock.c. But the same time,  POSIX
   requires that:

   "POLLIN     Data other than high-priority data may be read without
               blocking.
    POLLRDNORM Normal data may be read without blocking."

   See https://www.open-std.org/jtc1/sc22/open/n4217.pdf, page 293.

   So, we have, that poll() syscall returns POLLIN, but read call will
   be blocked.

   Also in man page socket(7) i found that:

   "Since Linux 2.6.28, select(2), poll(2), and epoll(7) indicate a
   socket as readable only if at least SO_RCVLOWAT bytes are available."

   I checked TCP callback for poll()(net/ipv4/tcp.c, tcp_poll()), it
   uses SO_RCVLOWAT value to set POLLIN bit, also i've tested TCP with
   this case for TCP socket, it works as POSIX required.

   I've added some fixes to af_vsock.c and virtio_transport_common.c,
   test is also implemented.

2) virtio/vsock:
   It adds some optimization to wake ups, when new data arrived. Now,
   SO_RCVLOWAT is considered before wake up sleepers who wait new data.
   There is no sense, to kick waiter, when number of available bytes
   in socket's queue < SO_RCVLOWAT, because if we wake up reader in
   this case, it will wait for SO_RCVLOWAT data anyway during dequeue,
   or in poll() case, POLLIN/POLLRDNORM bits won't be set, so such
   exit from poll() will be "spurious". This logic is also used in TCP
   sockets.

3) vmci/vsock:
   Same as 2), but i'm not sure about this changes. Will be very good,
   to get comments from someone who knows this code.

4) Hyper-V:
   As Dexuan Cui mentioned, for Hyper-V transport it is difficult to
   support SO_RCVLOWAT, so he suggested to disable this feature for
   Hyper-V.
====================

Link: https://lore.kernel.org/r/de41de4c-0345-34d7-7c36-4345258b7ba8@sberdevices.ruSigned-off-by: default avatarPaolo Abeni <pabeni@redhat.com>
parents ab485081 b1346338
...@@ -78,6 +78,7 @@ struct vsock_sock { ...@@ -78,6 +78,7 @@ struct vsock_sock {
s64 vsock_stream_has_data(struct vsock_sock *vsk); s64 vsock_stream_has_data(struct vsock_sock *vsk);
s64 vsock_stream_has_space(struct vsock_sock *vsk); s64 vsock_stream_has_space(struct vsock_sock *vsk);
struct sock *vsock_create_connected(struct sock *parent); struct sock *vsock_create_connected(struct sock *parent);
void vsock_data_ready(struct sock *sk);
/**** TRANSPORT ****/ /**** TRANSPORT ****/
...@@ -135,6 +136,7 @@ struct vsock_transport { ...@@ -135,6 +136,7 @@ struct vsock_transport {
u64 (*stream_rcvhiwat)(struct vsock_sock *); u64 (*stream_rcvhiwat)(struct vsock_sock *);
bool (*stream_is_active)(struct vsock_sock *); bool (*stream_is_active)(struct vsock_sock *);
bool (*stream_allow)(u32 cid, u32 port); bool (*stream_allow)(u32 cid, u32 port);
int (*set_rcvlowat)(struct vsock_sock *vsk, int val);
/* SEQ_PACKET. */ /* SEQ_PACKET. */
ssize_t (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg, ssize_t (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
......
...@@ -882,6 +882,16 @@ s64 vsock_stream_has_space(struct vsock_sock *vsk) ...@@ -882,6 +882,16 @@ s64 vsock_stream_has_space(struct vsock_sock *vsk)
} }
EXPORT_SYMBOL_GPL(vsock_stream_has_space); EXPORT_SYMBOL_GPL(vsock_stream_has_space);
void vsock_data_ready(struct sock *sk)
{
struct vsock_sock *vsk = vsock_sk(sk);
if (vsock_stream_has_data(vsk) >= sk->sk_rcvlowat ||
sock_flag(sk, SOCK_DONE))
sk->sk_data_ready(sk);
}
EXPORT_SYMBOL_GPL(vsock_data_ready);
static int vsock_release(struct socket *sock) static int vsock_release(struct socket *sock)
{ {
__vsock_release(sock->sk, 0); __vsock_release(sock->sk, 0);
...@@ -1066,8 +1076,9 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock, ...@@ -1066,8 +1076,9 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock,
if (transport && transport->stream_is_active(vsk) && if (transport && transport->stream_is_active(vsk) &&
!(sk->sk_shutdown & RCV_SHUTDOWN)) { !(sk->sk_shutdown & RCV_SHUTDOWN)) {
bool data_ready_now = false; bool data_ready_now = false;
int target = sock_rcvlowat(sk, 0, INT_MAX);
int ret = transport->notify_poll_in( int ret = transport->notify_poll_in(
vsk, 1, &data_ready_now); vsk, target, &data_ready_now);
if (ret < 0) { if (ret < 0) {
mask |= EPOLLERR; mask |= EPOLLERR;
} else { } else {
...@@ -2137,6 +2148,25 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, ...@@ -2137,6 +2148,25 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
return err; return err;
} }
static int vsock_set_rcvlowat(struct sock *sk, int val)
{
const struct vsock_transport *transport;
struct vsock_sock *vsk;
vsk = vsock_sk(sk);
if (val > vsk->buffer_size)
return -EINVAL;
transport = vsk->transport;
if (transport && transport->set_rcvlowat)
return transport->set_rcvlowat(vsk, val);
WRITE_ONCE(sk->sk_rcvlowat, val ? : 1);
return 0;
}
static const struct proto_ops vsock_stream_ops = { static const struct proto_ops vsock_stream_ops = {
.family = PF_VSOCK, .family = PF_VSOCK,
.owner = THIS_MODULE, .owner = THIS_MODULE,
...@@ -2156,6 +2186,7 @@ static const struct proto_ops vsock_stream_ops = { ...@@ -2156,6 +2186,7 @@ static const struct proto_ops vsock_stream_ops = {
.recvmsg = vsock_connectible_recvmsg, .recvmsg = vsock_connectible_recvmsg,
.mmap = sock_no_mmap, .mmap = sock_no_mmap,
.sendpage = sock_no_sendpage, .sendpage = sock_no_sendpage,
.set_rcvlowat = vsock_set_rcvlowat,
}; };
static const struct proto_ops vsock_seqpacket_ops = { static const struct proto_ops vsock_seqpacket_ops = {
......
...@@ -815,6 +815,12 @@ int hvs_notify_send_post_enqueue(struct vsock_sock *vsk, ssize_t written, ...@@ -815,6 +815,12 @@ int hvs_notify_send_post_enqueue(struct vsock_sock *vsk, ssize_t written,
return 0; return 0;
} }
static
int hvs_set_rcvlowat(struct vsock_sock *vsk, int val)
{
return -EOPNOTSUPP;
}
static struct vsock_transport hvs_transport = { static struct vsock_transport hvs_transport = {
.module = THIS_MODULE, .module = THIS_MODULE,
...@@ -850,6 +856,7 @@ static struct vsock_transport hvs_transport = { ...@@ -850,6 +856,7 @@ static struct vsock_transport hvs_transport = {
.notify_send_pre_enqueue = hvs_notify_send_pre_enqueue, .notify_send_pre_enqueue = hvs_notify_send_pre_enqueue,
.notify_send_post_enqueue = hvs_notify_send_post_enqueue, .notify_send_post_enqueue = hvs_notify_send_post_enqueue,
.set_rcvlowat = hvs_set_rcvlowat
}; };
static bool hvs_check_transport(struct vsock_sock *vsk) static bool hvs_check_transport(struct vsock_sock *vsk)
......
...@@ -634,10 +634,7 @@ virtio_transport_notify_poll_in(struct vsock_sock *vsk, ...@@ -634,10 +634,7 @@ virtio_transport_notify_poll_in(struct vsock_sock *vsk,
size_t target, size_t target,
bool *data_ready_now) bool *data_ready_now)
{ {
if (vsock_stream_has_data(vsk)) *data_ready_now = vsock_stream_has_data(vsk) >= target;
*data_ready_now = true;
else
*data_ready_now = false;
return 0; return 0;
} }
...@@ -1084,7 +1081,7 @@ virtio_transport_recv_connected(struct sock *sk, ...@@ -1084,7 +1081,7 @@ virtio_transport_recv_connected(struct sock *sk,
switch (le16_to_cpu(pkt->hdr.op)) { switch (le16_to_cpu(pkt->hdr.op)) {
case VIRTIO_VSOCK_OP_RW: case VIRTIO_VSOCK_OP_RW:
virtio_transport_recv_enqueue(vsk, pkt); virtio_transport_recv_enqueue(vsk, pkt);
sk->sk_data_ready(sk); vsock_data_ready(sk);
return err; return err;
case VIRTIO_VSOCK_OP_CREDIT_REQUEST: case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
virtio_transport_send_credit_update(vsk); virtio_transport_send_credit_update(vsk);
......
...@@ -307,7 +307,7 @@ vmci_transport_handle_wrote(struct sock *sk, ...@@ -307,7 +307,7 @@ vmci_transport_handle_wrote(struct sock *sk,
struct vsock_sock *vsk = vsock_sk(sk); struct vsock_sock *vsk = vsock_sk(sk);
PKT_FIELD(vsk, sent_waiting_read) = false; PKT_FIELD(vsk, sent_waiting_read) = false;
#endif #endif
sk->sk_data_ready(sk); vsock_data_ready(sk);
} }
static void vmci_transport_notify_pkt_socket_init(struct sock *sk) static void vmci_transport_notify_pkt_socket_init(struct sock *sk)
...@@ -340,12 +340,12 @@ vmci_transport_notify_pkt_poll_in(struct sock *sk, ...@@ -340,12 +340,12 @@ vmci_transport_notify_pkt_poll_in(struct sock *sk,
{ {
struct vsock_sock *vsk = vsock_sk(sk); struct vsock_sock *vsk = vsock_sk(sk);
if (vsock_stream_has_data(vsk)) { if (vsock_stream_has_data(vsk) >= target) {
*data_ready_now = true; *data_ready_now = true;
} else { } else {
/* We can't read right now because there is nothing in the /* We can't read right now because there is not enough data
* queue. Ask for notifications when there is something to * in the queue. Ask for notifications when there is something
* read. * to read.
*/ */
if (sk->sk_state == TCP_ESTABLISHED) { if (sk->sk_state == TCP_ESTABLISHED) {
if (!send_waiting_read(sk, 1)) if (!send_waiting_read(sk, 1))
......
...@@ -84,7 +84,7 @@ vmci_transport_handle_wrote(struct sock *sk, ...@@ -84,7 +84,7 @@ vmci_transport_handle_wrote(struct sock *sk,
bool bottom_half, bool bottom_half,
struct sockaddr_vm *dst, struct sockaddr_vm *src) struct sockaddr_vm *dst, struct sockaddr_vm *src)
{ {
sk->sk_data_ready(sk); vsock_data_ready(sk);
} }
static void vsock_block_update_write_window(struct sock *sk) static void vsock_block_update_write_window(struct sock *sk)
...@@ -161,12 +161,12 @@ vmci_transport_notify_pkt_poll_in(struct sock *sk, ...@@ -161,12 +161,12 @@ vmci_transport_notify_pkt_poll_in(struct sock *sk,
{ {
struct vsock_sock *vsk = vsock_sk(sk); struct vsock_sock *vsk = vsock_sk(sk);
if (vsock_stream_has_data(vsk)) { if (vsock_stream_has_data(vsk) >= target) {
*data_ready_now = true; *data_ready_now = true;
} else { } else {
/* We can't read right now because there is nothing in the /* We can't read right now because there is not enough data
* queue. Ask for notifications when there is something to * in the queue. Ask for notifications when there is something
* read. * to read.
*/ */
if (sk->sk_state == TCP_ESTABLISHED) if (sk->sk_state == TCP_ESTABLISHED)
vsock_block_update_write_window(sk); vsock_block_update_write_window(sk);
...@@ -282,7 +282,7 @@ vmci_transport_notify_pkt_recv_post_dequeue( ...@@ -282,7 +282,7 @@ vmci_transport_notify_pkt_recv_post_dequeue(
/* See the comment in /* See the comment in
* vmci_transport_notify_pkt_send_post_enqueue(). * vmci_transport_notify_pkt_send_post_enqueue().
*/ */
sk->sk_data_ready(sk); vsock_data_ready(sk);
} }
return err; return err;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <time.h> #include <time.h>
#include <sys/mman.h> #include <sys/mman.h>
#include <poll.h>
#include "timeout.h" #include "timeout.h"
#include "control.h" #include "control.h"
...@@ -596,6 +597,108 @@ static void test_seqpacket_invalid_rec_buffer_server(const struct test_opts *opt ...@@ -596,6 +597,108 @@ static void test_seqpacket_invalid_rec_buffer_server(const struct test_opts *opt
close(fd); close(fd);
} }
#define RCVLOWAT_BUF_SIZE 128
static void test_stream_poll_rcvlowat_server(const struct test_opts *opts)
{
int fd;
int i;
fd = vsock_stream_accept(VMADDR_CID_ANY, 1234, NULL);
if (fd < 0) {
perror("accept");
exit(EXIT_FAILURE);
}
/* Send 1 byte. */
send_byte(fd, 1, 0);
control_writeln("SRVSENT");
/* Wait until client is ready to receive rest of data. */
control_expectln("CLNSENT");
for (i = 0; i < RCVLOWAT_BUF_SIZE - 1; i++)
send_byte(fd, 1, 0);
/* Keep socket in active state. */
control_expectln("POLLDONE");
close(fd);
}
static void test_stream_poll_rcvlowat_client(const struct test_opts *opts)
{
unsigned long lowat_val = RCVLOWAT_BUF_SIZE;
char buf[RCVLOWAT_BUF_SIZE];
struct pollfd fds;
ssize_t read_res;
short poll_flags;
int fd;
fd = vsock_stream_connect(opts->peer_cid, 1234);
if (fd < 0) {
perror("connect");
exit(EXIT_FAILURE);
}
if (setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT,
&lowat_val, sizeof(lowat_val))) {
perror("setsockopt");
exit(EXIT_FAILURE);
}
control_expectln("SRVSENT");
/* At this point, server sent 1 byte. */
fds.fd = fd;
poll_flags = POLLIN | POLLRDNORM;
fds.events = poll_flags;
/* Try to wait for 1 sec. */
if (poll(&fds, 1, 1000) < 0) {
perror("poll");
exit(EXIT_FAILURE);
}
/* poll() must return nothing. */
if (fds.revents) {
fprintf(stderr, "Unexpected poll result %hx\n",
fds.revents);
exit(EXIT_FAILURE);
}
/* Tell server to send rest of data. */
control_writeln("CLNSENT");
/* Poll for data. */
if (poll(&fds, 1, 10000) < 0) {
perror("poll");
exit(EXIT_FAILURE);
}
/* Only these two bits are expected. */
if (fds.revents != poll_flags) {
fprintf(stderr, "Unexpected poll result %hx\n",
fds.revents);
exit(EXIT_FAILURE);
}
/* Use MSG_DONTWAIT, if call is going to wait, EAGAIN
* will be returned.
*/
read_res = recv(fd, buf, sizeof(buf), MSG_DONTWAIT);
if (read_res != RCVLOWAT_BUF_SIZE) {
fprintf(stderr, "Unexpected recv result %zi\n",
read_res);
exit(EXIT_FAILURE);
}
control_writeln("POLLDONE");
close(fd);
}
static struct test_case test_cases[] = { static struct test_case test_cases[] = {
{ {
.name = "SOCK_STREAM connection reset", .name = "SOCK_STREAM connection reset",
...@@ -646,6 +749,11 @@ static struct test_case test_cases[] = { ...@@ -646,6 +749,11 @@ static struct test_case test_cases[] = {
.run_client = test_seqpacket_invalid_rec_buffer_client, .run_client = test_seqpacket_invalid_rec_buffer_client,
.run_server = test_seqpacket_invalid_rec_buffer_server, .run_server = test_seqpacket_invalid_rec_buffer_server,
}, },
{
.name = "SOCK_STREAM poll() + SO_RCVLOWAT",
.run_client = test_stream_poll_rcvlowat_client,
.run_server = test_stream_poll_rcvlowat_server,
},
{}, {},
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment