Commit 9a0e6acc authored by NeilBrown's avatar NeilBrown Committed by Chuck Lever

SUNRPC: use lwq for sp_sockets - renamed to sp_xprts

lwq avoids using back pointers in lists, and uses less locking.
This introduces a new spinlock, but the other one will be removed in a
future patch.

For svc_clean_up_xprts(), we now dequeue the entire queue, walk it to
remove and process the xprts that need cleaning up, then re-enqueue the
remaining queue.
Signed-off-by: default avatarNeilBrown <neilb@suse.de>
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 5b80147e
......@@ -17,6 +17,7 @@
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/auth.h>
#include <linux/sunrpc/svcauth.h>
#include <linux/lwq.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/pagevec.h>
......@@ -34,7 +35,7 @@
struct svc_pool {
unsigned int sp_id; /* pool id; also node id on NUMA */
spinlock_t sp_lock; /* protects all fields */
struct list_head sp_sockets; /* pending sockets */
struct lwq sp_xprts; /* pending transports */
unsigned int sp_nrthreads; /* # of threads in pool */
struct list_head sp_all_threads; /* all server threads */
struct llist_head sp_idle_threads; /* idle server threads */
......
......@@ -54,7 +54,7 @@ struct svc_xprt {
const struct svc_xprt_ops *xpt_ops;
struct kref xpt_ref;
struct list_head xpt_list;
struct list_head xpt_ready;
struct lwq_node xpt_ready;
unsigned long xpt_flags;
struct svc_serv *xpt_server; /* service for transport */
......
......@@ -508,7 +508,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
i, serv->sv_name);
pool->sp_id = i;
INIT_LIST_HEAD(&pool->sp_sockets);
lwq_init(&pool->sp_xprts);
INIT_LIST_HEAD(&pool->sp_all_threads);
init_llist_head(&pool->sp_idle_threads);
spin_lock_init(&pool->sp_lock);
......
......@@ -201,7 +201,6 @@ void svc_xprt_init(struct net *net, struct svc_xprt_class *xcl,
kref_init(&xprt->xpt_ref);
xprt->xpt_server = serv;
INIT_LIST_HEAD(&xprt->xpt_list);
INIT_LIST_HEAD(&xprt->xpt_ready);
INIT_LIST_HEAD(&xprt->xpt_deferred);
INIT_LIST_HEAD(&xprt->xpt_users);
mutex_init(&xprt->xpt_mutex);
......@@ -472,9 +471,7 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
pool = svc_pool_for_cpu(xprt->xpt_server);
percpu_counter_inc(&pool->sp_sockets_queued);
spin_lock_bh(&pool->sp_lock);
list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
spin_unlock_bh(&pool->sp_lock);
lwq_enqueue(&xprt->xpt_ready, &pool->sp_xprts);
svc_pool_wake_idle_thread(pool);
}
......@@ -487,18 +484,9 @@ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
struct svc_xprt *xprt = NULL;
if (list_empty(&pool->sp_sockets))
goto out;
spin_lock_bh(&pool->sp_lock);
if (likely(!list_empty(&pool->sp_sockets))) {
xprt = list_first_entry(&pool->sp_sockets,
struct svc_xprt, xpt_ready);
list_del_init(&xprt->xpt_ready);
xprt = lwq_dequeue(&pool->sp_xprts, struct svc_xprt, xpt_ready);
if (xprt)
svc_xprt_get(xprt);
}
spin_unlock_bh(&pool->sp_lock);
out:
return xprt;
}
......@@ -708,7 +696,7 @@ svc_thread_should_sleep(struct svc_rqst *rqstp)
return false;
/* was a socket queued? */
if (!list_empty(&pool->sp_sockets))
if (!lwq_empty(&pool->sp_xprts))
return false;
/* are we shutting down? */
......@@ -1050,7 +1038,6 @@ static void svc_delete_xprt(struct svc_xprt *xprt)
spin_lock_bh(&serv->sv_lock);
list_del_init(&xprt->xpt_list);
WARN_ON_ONCE(!list_empty(&xprt->xpt_ready));
if (test_bit(XPT_TEMP, &xprt->xpt_flags))
serv->sv_tmpcnt--;
spin_unlock_bh(&serv->sv_lock);
......@@ -1101,36 +1088,26 @@ static int svc_close_list(struct svc_serv *serv, struct list_head *xprt_list, st
return ret;
}
static struct svc_xprt *svc_dequeue_net(struct svc_serv *serv, struct net *net)
static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net)
{
struct svc_pool *pool;
struct svc_xprt *xprt;
struct svc_xprt *tmp;
int i;
for (i = 0; i < serv->sv_nrpools; i++) {
pool = &serv->sv_pools[i];
struct svc_pool *pool = &serv->sv_pools[i];
struct llist_node *q, **t1, *t2;
spin_lock_bh(&pool->sp_lock);
list_for_each_entry_safe(xprt, tmp, &pool->sp_sockets, xpt_ready) {
if (xprt->xpt_net != net)
continue;
list_del_init(&xprt->xpt_ready);
spin_unlock_bh(&pool->sp_lock);
return xprt;
q = lwq_dequeue_all(&pool->sp_xprts);
lwq_for_each_safe(xprt, t1, t2, &q, xpt_ready) {
if (xprt->xpt_net == net) {
set_bit(XPT_CLOSE, &xprt->xpt_flags);
svc_delete_xprt(xprt);
xprt = NULL;
}
spin_unlock_bh(&pool->sp_lock);
}
return NULL;
}
static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net)
{
struct svc_xprt *xprt;
while ((xprt = svc_dequeue_net(serv, net))) {
set_bit(XPT_CLOSE, &xprt->xpt_flags);
svc_delete_xprt(xprt);
if (q)
lwq_enqueue_batch(q, &pool->sp_xprts);
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment