Commit 4f8943f8 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Replace direct task wakeups from softirq context

Replace the direct task wakeups from inside a softirq context with
wakeups from a process context.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent 7e0a0e38
...@@ -56,6 +56,7 @@ struct sock_xprt { ...@@ -56,6 +56,7 @@ struct sock_xprt {
*/ */
unsigned long sock_state; unsigned long sock_state;
struct delayed_work connect_worker; struct delayed_work connect_worker;
struct work_struct error_worker;
struct work_struct recv_worker; struct work_struct recv_worker;
struct mutex recv_mutex; struct mutex recv_mutex;
struct sockaddr_storage srcaddr; struct sockaddr_storage srcaddr;
...@@ -84,6 +85,10 @@ struct sock_xprt { ...@@ -84,6 +85,10 @@ struct sock_xprt {
#define XPRT_SOCK_CONNECTING 1U #define XPRT_SOCK_CONNECTING 1U
#define XPRT_SOCK_DATA_READY (2) #define XPRT_SOCK_DATA_READY (2)
#define XPRT_SOCK_UPD_TIMEOUT (3) #define XPRT_SOCK_UPD_TIMEOUT (3)
#define XPRT_SOCK_WAKE_ERROR (4)
#define XPRT_SOCK_WAKE_WRITE (5)
#define XPRT_SOCK_WAKE_PENDING (6)
#define XPRT_SOCK_WAKE_DISCONNECT (7)
#endif /* __KERNEL__ */ #endif /* __KERNEL__ */
......
...@@ -1211,6 +1211,15 @@ static void xs_sock_reset_state_flags(struct rpc_xprt *xprt) ...@@ -1211,6 +1211,15 @@ static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
}
static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
{
set_bit(nr, &transport->sock_state);
queue_work(xprtiod_workqueue, &transport->error_worker);
} }
static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
...@@ -1231,6 +1240,7 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) ...@@ -1231,6 +1240,7 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
*/ */
static void xs_error_report(struct sock *sk) static void xs_error_report(struct sock *sk)
{ {
struct sock_xprt *transport;
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
int err; int err;
...@@ -1238,13 +1248,14 @@ static void xs_error_report(struct sock *sk) ...@@ -1238,13 +1248,14 @@ static void xs_error_report(struct sock *sk)
if (!(xprt = xprt_from_sock(sk))) if (!(xprt = xprt_from_sock(sk)))
goto out; goto out;
transport = container_of(xprt, struct sock_xprt, xprt);
err = -sk->sk_err; err = -sk->sk_err;
if (err == 0) if (err == 0)
goto out; goto out;
dprintk("RPC: xs_error_report client %p, error=%d...\n", dprintk("RPC: xs_error_report client %p, error=%d...\n",
xprt, -err); xprt, -err);
trace_rpc_socket_error(xprt, sk->sk_socket, err); trace_rpc_socket_error(xprt, sk->sk_socket, err);
xprt_wake_pending_tasks(xprt, err); xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
out: out:
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
} }
...@@ -1507,7 +1518,7 @@ static void xs_tcp_state_change(struct sock *sk) ...@@ -1507,7 +1518,7 @@ static void xs_tcp_state_change(struct sock *sk)
xprt->stat.connect_count++; xprt->stat.connect_count++;
xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_time += (long)jiffies -
xprt->stat.connect_start; xprt->stat.connect_start;
xprt_wake_pending_tasks(xprt, -EAGAIN); xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
} }
spin_unlock(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
break; break;
...@@ -1525,7 +1536,7 @@ static void xs_tcp_state_change(struct sock *sk) ...@@ -1525,7 +1536,7 @@ static void xs_tcp_state_change(struct sock *sk)
/* The server initiated a shutdown of the socket */ /* The server initiated a shutdown of the socket */
xprt->connect_cookie++; xprt->connect_cookie++;
clear_bit(XPRT_CONNECTED, &xprt->state); clear_bit(XPRT_CONNECTED, &xprt->state);
xs_tcp_force_close(xprt); xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
/* fall through */ /* fall through */
case TCP_CLOSING: case TCP_CLOSING:
/* /*
...@@ -1547,7 +1558,7 @@ static void xs_tcp_state_change(struct sock *sk) ...@@ -1547,7 +1558,7 @@ static void xs_tcp_state_change(struct sock *sk)
xprt_clear_connecting(xprt); xprt_clear_connecting(xprt);
clear_bit(XPRT_CLOSING, &xprt->state); clear_bit(XPRT_CLOSING, &xprt->state);
/* Trigger the socket release */ /* Trigger the socket release */
xs_tcp_force_close(xprt); xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
} }
out: out:
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
...@@ -1556,6 +1567,7 @@ static void xs_tcp_state_change(struct sock *sk) ...@@ -1556,6 +1567,7 @@ static void xs_tcp_state_change(struct sock *sk)
static void xs_write_space(struct sock *sk) static void xs_write_space(struct sock *sk)
{ {
struct socket_wq *wq; struct socket_wq *wq;
struct sock_xprt *transport;
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
if (!sk->sk_socket) if (!sk->sk_socket)
...@@ -1564,13 +1576,14 @@ static void xs_write_space(struct sock *sk) ...@@ -1564,13 +1576,14 @@ static void xs_write_space(struct sock *sk)
if (unlikely(!(xprt = xprt_from_sock(sk)))) if (unlikely(!(xprt = xprt_from_sock(sk))))
return; return;
transport = container_of(xprt, struct sock_xprt, xprt);
rcu_read_lock(); rcu_read_lock();
wq = rcu_dereference(sk->sk_wq); wq = rcu_dereference(sk->sk_wq);
if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0) if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0)
goto out; goto out;
if (xprt_write_space(xprt)) xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
sk->sk_write_pending--; sk->sk_write_pending--;
out: out:
rcu_read_unlock(); rcu_read_unlock();
} }
...@@ -2461,6 +2474,56 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -2461,6 +2474,56 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
delay); delay);
} }
static void xs_wake_disconnect(struct sock_xprt *transport)
{
if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state))
xs_tcp_force_close(&transport->xprt);
}
static void xs_wake_write(struct sock_xprt *transport)
{
if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state))
xprt_write_space(&transport->xprt);
}
static void xs_wake_error(struct sock_xprt *transport)
{
int sockerr;
int sockerr_len = sizeof(sockerr);
if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
return;
mutex_lock(&transport->recv_mutex);
if (transport->sock == NULL)
goto out;
if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
goto out;
if (kernel_getsockopt(transport->sock, SOL_SOCKET, SO_ERROR,
(char *)&sockerr, &sockerr_len) != 0)
goto out;
if (sockerr < 0)
xprt_wake_pending_tasks(&transport->xprt, sockerr);
out:
mutex_unlock(&transport->recv_mutex);
}
static void xs_wake_pending(struct sock_xprt *transport)
{
if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state))
xprt_wake_pending_tasks(&transport->xprt, -EAGAIN);
}
static void xs_error_handle(struct work_struct *work)
{
struct sock_xprt *transport = container_of(work,
struct sock_xprt, error_worker);
xs_wake_disconnect(transport);
xs_wake_write(transport);
xs_wake_error(transport);
xs_wake_pending(transport);
}
/** /**
* xs_local_print_stats - display AF_LOCAL socket-specifc stats * xs_local_print_stats - display AF_LOCAL socket-specifc stats
* @xprt: rpc_xprt struct containing statistics * @xprt: rpc_xprt struct containing statistics
...@@ -2873,6 +2936,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) ...@@ -2873,6 +2936,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
xprt->timeout = &xs_local_default_timeout; xprt->timeout = &xs_local_default_timeout;
INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
INIT_WORK(&transport->error_worker, xs_error_handle);
INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
switch (sun->sun_family) { switch (sun->sun_family) {
...@@ -2943,6 +3007,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) ...@@ -2943,6 +3007,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
xprt->timeout = &xs_udp_default_timeout; xprt->timeout = &xs_udp_default_timeout;
INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
INIT_WORK(&transport->error_worker, xs_error_handle);
INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
switch (addr->sa_family) { switch (addr->sa_family) {
...@@ -3024,6 +3089,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) ...@@ -3024,6 +3089,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
(xprt->timeout->to_retries + 1); (xprt->timeout->to_retries + 1);
INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
INIT_WORK(&transport->error_worker, xs_error_handle);
INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
switch (addr->sa_family) { switch (addr->sa_family) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment