Commit 82f0a41e authored by Linus Torvalds

Merge tag 'nfs-for-4.16-2' of git://git.linux-nfs.org/projects/trondmy/linux-nfs

Pull more NFS client updates from Trond Myklebust:
 "A few bugfixes and some small sunrpc latency/performance improvements
  before the merge window closes:

  Stable fixes:

   - fix an incorrect calculation of the RDMA send scatter gather
     element limit

   - fix an Oops when attempting to free resources after RDMA device
     removal

  Bugfixes:

   - SUNRPC: Ensure we always release the TCP socket in a timely fashion
     when the connection is shut down.

   - SUNRPC: Don't call __UDPX_INC_STATS() from a preemptible context

  Latency/Performance:

   - SUNRPC: Queue latency sensitive socket tasks to the less contended
     xprtiod queue

   - SUNRPC: Make the xprtiod workqueue unbounded.

   - SUNRPC: Make the rpciod workqueue unbounded"
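
The two "unbounded" items above boil down to adding WQ_UNBOUND to the alloc_workqueue() calls in the diff below, so work items may run on any CPU instead of waiting behind whatever is running on the submitting CPU. A minimal sketch of the flag combination xprtiod ends up with (demo_wq, demo_wq_init and demo_wq_exit are illustrative names, not part of these patches):

#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *demo_wq;

static int __init demo_wq_init(void)
{
	/*
	 * WQ_UNBOUND: work items are not tied to the submitting CPU,
	 * so one busy CPU no longer serializes them.
	 * WQ_MEM_RECLAIM: a rescuer thread guarantees forward progress
	 * under memory pressure (needed on the NFS writeback path).
	 * WQ_HIGHPRI: workers run at elevated priority, since network
	 * receive is latency sensitive.
	 */
	demo_wq = alloc_workqueue("demo_wq",
				  WQ_UNBOUND | WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
	return demo_wq ? 0 : -ENOMEM;
}

static void __exit demo_wq_exit(void)
{
	destroy_workqueue(demo_wq);
}

module_init(demo_wq_init);
module_exit(demo_wq_exit);
MODULE_LICENSE("GPL");

The trade-off is that unbound workers give up CPU locality; the description above suggests that, for rpciod and xprtiod, queueing latency on a loaded CPU was the bigger cost.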

* tag 'nfs-for-4.16-2' of git://git.linux-nfs.org/projects/trondmy/linux-nfs:
  SUNRPC: Don't call __UDPX_INC_STATS() from a preemptible context
  fix parallelism for rpc tasks
  Make the xprtiod workqueue unbounded.
  SUNRPC: Queue latency-sensitive socket tasks to xprtiod
  SUNRPC: Ensure we always close the socket after a connection shuts down
  xprtrdma: Fix BUG after a device removal
  xprtrdma: Fix calculation of ri_max_send_sges
parents 858f45bf 0afa6b44
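
One of the xprtsock.c hunks below moves both __UDPX_INC_STATS() calls under xprt->recv_lock. The double-underscore SNMP/MIB helpers use non-atomic per-CPU increments and rely on the caller having preemption disabled; holding a spinlock satisfies that implicitly. A standalone sketch of the general pattern (demo_counter and demo_count_event are illustrative names, not code from the patch):

#include <linux/percpu.h>
#include <linux/preempt.h>

static DEFINE_PER_CPU(unsigned long, demo_counter);

static void demo_count_event(void)
{
	/*
	 * __this_cpu_inc() is the cheap, non-atomic form: it is only
	 * safe while the task cannot migrate to another CPU. Calling
	 * it from preemptible context risks a lost update (and trips
	 * a warning under CONFIG_DEBUG_PREEMPT). A held spin_lock()
	 * serves the same purpose as this explicit pair.
	 */
	preempt_disable();
	__this_cpu_inc(demo_counter);
	preempt_enable();
}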
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -229,6 +229,9 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *,
 					struct rpc_task *,
 					rpc_action action,
 					int priority);
+void		rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
+					struct rpc_wait_queue *queue,
+					struct rpc_task *task);
 void		rpc_wake_up_queued_task(struct rpc_wait_queue *,
 					struct rpc_task *);
 void		rpc_wake_up(struct rpc_wait_queue *);
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -458,6 +458,18 @@ static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct r
 	rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
 }
 
+/*
+ * Wake up a task on a specific queue
+ */
+void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue,
+		struct rpc_task *task)
+{
+	spin_lock_bh(&queue->lock);
+	rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
+	spin_unlock_bh(&queue->lock);
+}
+
 /*
  * Wake up a task on a specific queue
  */
@@ -1092,12 +1104,12 @@ static int rpciod_start(void)
 	 * Create the rpciod thread and wait for it to start.
 	 */
 	dprintk("RPC:       creating workqueue rpciod\n");
-	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
+	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
 	if (!wq)
 		goto out_failed;
 	rpciod_workqueue = wq;
 	/* Note: highpri because network receive is latency sensitive */
-	wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+	wq = alloc_workqueue("xprtiod", WQ_UNBOUND|WQ_MEM_RECLAIM|WQ_HIGHPRI, 0);
 	if (!wq)
 		goto free_rpciod;
 	xprtiod_workqueue = wq;
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -517,7 +517,8 @@ void xprt_write_space(struct rpc_xprt *xprt)
 	if (xprt->snd_task) {
 		dprintk("RPC:       write space: waking waiting task on "
				"xprt %p\n", xprt);
-		rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
+		rpc_wake_up_queued_task_on_wq(xprtiod_workqueue,
+				&xprt->pending, xprt->snd_task);
 	}
 	spin_unlock_bh(&xprt->transport_lock);
 }
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -143,7 +143,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
 	if (xdr->page_len) {
 		remaining = xdr->page_len;
 		offset = offset_in_page(xdr->page_base);
-		count = 0;
+		count = RPCRDMA_MIN_SEND_SGES;
 		while (remaining) {
 			remaining -= min_t(unsigned int,
					   PAGE_SIZE - offset, remaining);
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -505,7 +505,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
 		return -ENOMEM;
 	}
-	ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES;
+	ia->ri_max_send_sges = max_sge;
 
 	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
 		dprintk("RPC:       %s: insufficient wqe's available\n",
@@ -1502,6 +1502,9 @@ __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
 static void
 rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
 {
+	if (!rb)
+		return;
+
 	if (!rpcrdma_regbuf_is_mapped(rb))
 		return;
 
@@ -1517,9 +1520,6 @@ rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
 void
 rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
 {
-	if (!rb)
-		return;
-
 	rpcrdma_dma_unmap_regbuf(rb);
 	kfree(rb);
 }
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -807,13 +807,6 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
 	smp_mb__after_atomic();
 }
 
-static void xs_sock_mark_closed(struct rpc_xprt *xprt)
-{
-	xs_sock_reset_connection_flags(xprt);
-	/* Mark transport as closed and wake up all pending tasks */
-	xprt_disconnect_done(xprt);
-}
-
 /**
  * xs_error_report - callback to handle TCP socket state errors
  * @sk: socket
@@ -833,9 +826,6 @@ static void xs_error_report(struct sock *sk)
 	err = -sk->sk_err;
 	if (err == 0)
 		goto out;
-	/* Is this a reset event? */
-	if (sk->sk_state == TCP_CLOSE)
-		xs_sock_mark_closed(xprt);
 	dprintk("RPC:       xs_error_report client %p, error=%d...\n",
 			xprt, -err);
 	trace_rpc_socket_error(xprt, sk->sk_socket, err);
@@ -1078,18 +1068,18 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 
 	/* Suck it into the iovec, verify checksum if not done by hw. */
 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
-		__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
 		spin_lock(&xprt->recv_lock);
+		__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
 		goto out_unpin;
 	}
 
-	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 
 	spin_lock_bh(&xprt->transport_lock);
 	xprt_adjust_cwnd(xprt, task, copied);
 	spin_unlock_bh(&xprt->transport_lock);
 	spin_lock(&xprt->recv_lock);
 	xprt_complete_rqst(task, copied);
+	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 out_unpin:
 	xprt_unpin_rqst(rovr);
 out_unlock:
@@ -1655,9 +1645,11 @@ static void xs_tcp_state_change(struct sock *sk)
 		if (test_and_clear_bit(XPRT_SOCK_CONNECTING,
 					&transport->sock_state))
 			xprt_clear_connecting(xprt);
+		clear_bit(XPRT_CLOSING, &xprt->state);
 		if (sk->sk_err)
 			xprt_wake_pending_tasks(xprt, -sk->sk_err);
-		xs_sock_mark_closed(xprt);
+		/* Trigger the socket release */
+		xs_tcp_force_close(xprt);
 	}
 out:
 	read_unlock_bh(&sk->sk_callback_lock);
@@ -2265,14 +2257,19 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt)
 {
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct socket *sock = transport->sock;
+	int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE;
 
 	if (sock == NULL)
 		return;
-	if (xprt_connected(xprt)) {
+	switch (skst) {
+	default:
 		kernel_sock_shutdown(sock, SHUT_RDWR);
 		trace_rpc_socket_shutdown(xprt, sock);
-	} else
+		break;
+	case TCP_CLOSE:
+	case TCP_TIME_WAIT:
 		xs_reset_transport(transport);
+	}
 }
 
 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,