Commit 40a5f1b1 authored by Trond Myklebust

SUNRPC: RPC transport queue must be low latency

rpciod can easily get congested due to the long list of queued rpc_tasks.
Having the receive queue wait in turn for those tasks to complete can
therefore be a bottleneck.

Address the problem by separating the workqueues into:
- rpciod: manages rpc_tasks
- xprtiod: manages transport-related work.
Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
parent 5157b956
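
In code terms, the change described above boils down to allocating a second, high-priority workqueue and routing transport work to it, while rpc_task execution stays on the ordinary rpciod queue. A minimal sketch of that idea follows (an illustration, not the patch itself: the helper name example_start_workqueues is made up, while alloc_workqueue, destroy_workqueue, WQ_MEM_RECLAIM and WQ_HIGHPRI are the standard kernel workqueue API also used in the diff below):

#include <linux/workqueue.h>

/* rpciod runs rpc_tasks; xprtiod runs transport work (receive, connect, autoclose) */
static struct workqueue_struct *rpciod_workqueue;
static struct workqueue_struct *xprtiod_workqueue;

static int example_start_workqueues(void)	/* hypothetical helper, not in the patch */
{
	/* rpc_task execution: reclaim-safe, normal priority */
	rpciod_workqueue = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
	if (!rpciod_workqueue)
		return 0;
	/* transport work: high priority, since network receive is latency sensitive */
	xprtiod_workqueue = alloc_workqueue("xprtiod",
					    WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
	if (!xprtiod_workqueue) {
		destroy_workqueue(rpciod_workqueue);
		rpciod_workqueue = NULL;
		return 0;
	}
	return 1;
}

Callers then pick the queue by the kind of work they hand off, e.g. queue_work(xprtiod_workqueue, &xprt->task_cleanup) for transport cleanup, as in the hunks below.
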
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -247,6 +247,7 @@ void rpc_show_tasks(struct net *);
 int rpc_init_mempool(void);
 void rpc_destroy_mempool(void);
 extern struct workqueue_struct *rpciod_workqueue;
+extern struct workqueue_struct *xprtiod_workqueue;
 void rpc_prepare_task(struct rpc_task *task);
 
 static inline int rpc_wait_for_completion_task(struct rpc_task *task)
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue;
 /*
  * rpciod-related stuff
  */
-struct workqueue_struct *rpciod_workqueue;
+struct workqueue_struct *rpciod_workqueue __read_mostly;
+struct workqueue_struct *xprtiod_workqueue __read_mostly;
 
 /*
  * Disable the timer for a given RPC task. Should be called with
@@ -1071,10 +1072,22 @@ static int rpciod_start(void)
 	 * Create the rpciod thread and wait for it to start.
 	 */
 	dprintk("RPC: creating workqueue rpciod\n");
-	/* Note: highpri because network receive is latency sensitive */
-	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
+	if (!wq)
+		goto out_failed;
 	rpciod_workqueue = wq;
-	return rpciod_workqueue != NULL;
+	/* Note: highpri because network receive is latency sensitive */
+	wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+	if (!wq)
+		goto free_rpciod;
+	xprtiod_workqueue = wq;
+	return 1;
+free_rpciod:
+	wq = rpciod_workqueue;
+	rpciod_workqueue = NULL;
+	destroy_workqueue(wq);
+out_failed:
+	return 0;
 }
 
 static void rpciod_stop(void)
@@ -1088,6 +1101,9 @@ static void rpciod_stop(void)
 	wq = rpciod_workqueue;
 	rpciod_workqueue = NULL;
 	destroy_workqueue(wq);
+	wq = xprtiod_workqueue;
+	xprtiod_workqueue = NULL;
+	destroy_workqueue(wq);
 }
 
 void
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
 		clear_bit(XPRT_LOCKED, &xprt->state);
 		smp_mb__after_atomic();
 	} else
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 }
 
 /*
@@ -645,7 +645,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -672,7 +672,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 out:
 	spin_unlock_bh(&xprt->transport_lock);
@@ -689,7 +689,7 @@ xprt_init_autodisconnect(unsigned long data)
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		goto out_abort;
 	spin_unlock(&xprt->transport_lock);
-	queue_work(rpciod_workqueue, &xprt->task_cleanup);
+	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	return;
 out_abort:
 	spin_unlock(&xprt->transport_lock);
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1095,7 +1095,7 @@ static void xs_data_ready(struct sock *sk)
 		if (xprt->reestablish_timeout)
 			xprt->reestablish_timeout = 0;
 		if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-			queue_work(rpciod_workqueue, &transport->recv_worker);
+			queue_work(xprtiod_workqueue, &transport->recv_worker);
 	}
 	read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -2378,7 +2378,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 		/* Start by resetting any existing state */
 		xs_reset_transport(transport);
 
-		queue_delayed_work(rpciod_workqueue,
+		queue_delayed_work(xprtiod_workqueue,
 				   &transport->connect_worker,
 				   xprt->reestablish_timeout);
 		xprt->reestablish_timeout <<= 1;
@@ -2388,7 +2388,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
 	} else {
 		dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
-		queue_delayed_work(rpciod_workqueue,
+		queue_delayed_work(xprtiod_workqueue,
 				   &transport->connect_worker, 0);
 	}
 }