Commit 8660b7d8 authored by Paul E. McKenney's avatar Paul E. McKenney

srcu: Use rcu_segcblist to track SRCU callbacks

This commit switches SRCU from custom-built callback queues to the new
rcu_segcblist structure.  This change associates grace-period sequence
numbers with groups of callbacks, which will be needed for efficient
processing of per-CPU callbacks.
Signed-off-by: default avatarPaul E. McKenney <paulmck@linux.vnet.ibm.com>
parent ac367c1c
...@@ -92,7 +92,6 @@ static inline struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp) ...@@ -92,7 +92,6 @@ static inline struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp)
rhp = rclp->head; rhp = rclp->head;
if (!rhp) if (!rhp)
return NULL; return NULL;
prefetch(rhp);
rclp->len--; rclp->len--;
rclp->head = rhp->next; rclp->head = rhp->next;
if (!rclp->head) if (!rclp->head)
...@@ -175,6 +174,15 @@ struct rcu_segcblist { ...@@ -175,6 +174,15 @@ struct rcu_segcblist {
long len_lazy; long len_lazy;
}; };
#define RCU_SEGCBLIST_INITIALIZER(n) \
{ \
.head = NULL, \
.tails[RCU_DONE_TAIL] = &n.head, \
.tails[RCU_WAIT_TAIL] = &n.head, \
.tails[RCU_NEXT_READY_TAIL] = &n.head, \
.tails[RCU_NEXT_TAIL] = &n.head, \
}
/* /*
* Initialize an rcu_segcblist structure. * Initialize an rcu_segcblist structure.
*/ */
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
* Lai Jiangshan <laijs@cn.fujitsu.com> * Lai Jiangshan <laijs@cn.fujitsu.com>
* *
* For detailed explanation of Read-Copy Update mechanism see - * For detailed explanation of Read-Copy Update mechanism see -
* Documentation/RCU/ *.txt * Documentation/RCU/ *.txt
* *
*/ */
...@@ -32,31 +32,20 @@ ...@@ -32,31 +32,20 @@
#include <linux/mutex.h> #include <linux/mutex.h>
#include <linux/rcupdate.h> #include <linux/rcupdate.h>
#include <linux/workqueue.h> #include <linux/workqueue.h>
#include <linux/rcu_segcblist.h>
struct srcu_array { struct srcu_array {
unsigned long lock_count[2]; unsigned long lock_count[2];
unsigned long unlock_count[2]; unsigned long unlock_count[2];
}; };
struct rcu_batch {
struct rcu_head *head, **tail;
};
#define RCU_BATCH_INIT(name) { NULL, &(name.head) }
struct srcu_struct { struct srcu_struct {
unsigned long completed; unsigned long completed;
unsigned long srcu_gp_seq; unsigned long srcu_gp_seq;
struct srcu_array __percpu *per_cpu_ref; struct srcu_array __percpu *per_cpu_ref;
spinlock_t queue_lock; /* protect ->batch_queue, ->running */ spinlock_t queue_lock; /* protect ->srcu_cblist, ->srcu_state */
int srcu_state; int srcu_state;
/* callbacks just queued */ struct rcu_segcblist srcu_cblist;
struct rcu_batch batch_queue;
/* callbacks try to do the first check_zero */
struct rcu_batch batch_check0;
/* callbacks done with the first check_zero and the flip */
struct rcu_batch batch_check1;
struct rcu_batch batch_done;
struct delayed_work work; struct delayed_work work;
#ifdef CONFIG_DEBUG_LOCK_ALLOC #ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map; struct lockdep_map dep_map;
...@@ -97,10 +86,7 @@ void process_srcu(struct work_struct *work); ...@@ -97,10 +86,7 @@ void process_srcu(struct work_struct *work);
.per_cpu_ref = &name##_srcu_array, \ .per_cpu_ref = &name##_srcu_array, \
.queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \ .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \
.srcu_state = SRCU_STATE_IDLE, \ .srcu_state = SRCU_STATE_IDLE, \
.batch_queue = RCU_BATCH_INIT(name.batch_queue), \ .srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\
.batch_check0 = RCU_BATCH_INIT(name.batch_check0), \
.batch_check1 = RCU_BATCH_INIT(name.batch_check1), \
.batch_done = RCU_BATCH_INIT(name.batch_done), \
.work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\ .work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\
__SRCU_DEP_MAP_INIT(name) \ __SRCU_DEP_MAP_INIT(name) \
} }
......
...@@ -87,6 +87,12 @@ static inline unsigned long rcu_seq_snap(unsigned long *sp) ...@@ -87,6 +87,12 @@ static inline unsigned long rcu_seq_snap(unsigned long *sp)
return s; return s;
} }
/* Return the current value the update side's sequence number, no ordering. */
static inline unsigned long rcu_seq_current(unsigned long *sp)
{
return READ_ONCE(*sp);
}
/* /*
* Given a snapshot from rcu_seq_snap(), determine whether or not a * Given a snapshot from rcu_seq_snap(), determine whether or not a
* full update-side operation has occurred. * full update-side operation has occurred.
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
* Lai Jiangshan <laijs@cn.fujitsu.com> * Lai Jiangshan <laijs@cn.fujitsu.com>
* *
* For detailed explanation of Read-Copy Update mechanism see - * For detailed explanation of Read-Copy Update mechanism see -
* Documentation/RCU/ *.txt * Documentation/RCU/ *.txt
* *
*/ */
...@@ -38,85 +38,13 @@ ...@@ -38,85 +38,13 @@
#include "rcu.h" #include "rcu.h"
/*
* Initialize an rcu_batch structure to empty.
*/
static inline void rcu_batch_init(struct rcu_batch *b)
{
b->head = NULL;
b->tail = &b->head;
}
/*
* Enqueue a callback onto the tail of the specified rcu_batch structure.
*/
static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head)
{
*b->tail = head;
b->tail = &head->next;
}
/*
* Is the specified rcu_batch structure empty?
*/
static inline bool rcu_batch_empty(struct rcu_batch *b)
{
return b->tail == &b->head;
}
/*
* Are all batches empty for the specified srcu_struct?
*/
static inline bool rcu_all_batches_empty(struct srcu_struct *sp)
{
return rcu_batch_empty(&sp->batch_done) &&
rcu_batch_empty(&sp->batch_check1) &&
rcu_batch_empty(&sp->batch_check0) &&
rcu_batch_empty(&sp->batch_queue);
}
/*
* Remove the callback at the head of the specified rcu_batch structure
* and return a pointer to it, or return NULL if the structure is empty.
*/
static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b)
{
struct rcu_head *head;
if (rcu_batch_empty(b))
return NULL;
head = b->head;
b->head = head->next;
if (b->tail == &head->next)
rcu_batch_init(b);
return head;
}
/*
* Move all callbacks from the rcu_batch structure specified by "from" to
* the structure specified by "to".
*/
static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
{
if (!rcu_batch_empty(from)) {
*to->tail = from->head;
to->tail = from->tail;
rcu_batch_init(from);
}
}
static int init_srcu_struct_fields(struct srcu_struct *sp) static int init_srcu_struct_fields(struct srcu_struct *sp)
{ {
sp->completed = 0; sp->completed = 0;
sp->srcu_gp_seq = 0; sp->srcu_gp_seq = 0;
spin_lock_init(&sp->queue_lock); spin_lock_init(&sp->queue_lock);
sp->srcu_state = SRCU_STATE_IDLE; sp->srcu_state = SRCU_STATE_IDLE;
rcu_batch_init(&sp->batch_queue); rcu_segcblist_init(&sp->srcu_cblist);
rcu_batch_init(&sp->batch_check0);
rcu_batch_init(&sp->batch_check1);
rcu_batch_init(&sp->batch_done);
INIT_DELAYED_WORK(&sp->work, process_srcu); INIT_DELAYED_WORK(&sp->work, process_srcu);
sp->per_cpu_ref = alloc_percpu(struct srcu_array); sp->per_cpu_ref = alloc_percpu(struct srcu_array);
return sp->per_cpu_ref ? 0 : -ENOMEM; return sp->per_cpu_ref ? 0 : -ENOMEM;
...@@ -268,7 +196,7 @@ void cleanup_srcu_struct(struct srcu_struct *sp) ...@@ -268,7 +196,7 @@ void cleanup_srcu_struct(struct srcu_struct *sp)
{ {
if (WARN_ON(srcu_readers_active(sp))) if (WARN_ON(srcu_readers_active(sp)))
return; /* Leakage unless caller handles error. */ return; /* Leakage unless caller handles error. */
if (WARN_ON(!rcu_all_batches_empty(sp))) if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
return; /* Leakage unless caller handles error. */ return; /* Leakage unless caller handles error. */
flush_delayed_work(&sp->work); flush_delayed_work(&sp->work);
if (WARN_ON(READ_ONCE(sp->srcu_state) != SRCU_STATE_IDLE)) if (WARN_ON(READ_ONCE(sp->srcu_state) != SRCU_STATE_IDLE))
...@@ -324,6 +252,8 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); ...@@ -324,6 +252,8 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
*/ */
static void srcu_gp_start(struct srcu_struct *sp) static void srcu_gp_start(struct srcu_struct *sp)
{ {
rcu_segcblist_accelerate(&sp->srcu_cblist,
rcu_seq_snap(&sp->srcu_gp_seq));
WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1); WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1);
rcu_seq_start(&sp->srcu_gp_seq); rcu_seq_start(&sp->srcu_gp_seq);
} }
...@@ -371,6 +301,11 @@ static void srcu_gp_end(struct srcu_struct *sp) ...@@ -371,6 +301,11 @@ static void srcu_gp_end(struct srcu_struct *sp)
{ {
rcu_seq_end(&sp->srcu_gp_seq); rcu_seq_end(&sp->srcu_gp_seq);
WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE); WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE);
spin_lock_irq(&sp->queue_lock);
rcu_segcblist_advance(&sp->srcu_cblist,
rcu_seq_current(&sp->srcu_gp_seq));
spin_unlock_irq(&sp->queue_lock);
} }
/* /*
...@@ -409,7 +344,7 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head, ...@@ -409,7 +344,7 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
head->func = func; head->func = func;
spin_lock_irqsave(&sp->queue_lock, flags); spin_lock_irqsave(&sp->queue_lock, flags);
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
rcu_batch_queue(&sp->batch_queue, head); rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
srcu_gp_start(sp); srcu_gp_start(sp);
queue_delayed_work(system_power_efficient_wq, &sp->work, 0); queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
...@@ -445,13 +380,13 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) ...@@ -445,13 +380,13 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
/* steal the processing owner */ /* steal the processing owner */
rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
srcu_gp_start(sp); srcu_gp_start(sp);
rcu_batch_queue(&sp->batch_check0, head);
spin_unlock_irq(&sp->queue_lock); spin_unlock_irq(&sp->queue_lock);
/* give the processing owner to work_struct */ /* give the processing owner to work_struct */
srcu_reschedule(sp, 0); srcu_reschedule(sp, 0);
} else { } else {
rcu_batch_queue(&sp->batch_queue, head); rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
spin_unlock_irq(&sp->queue_lock); spin_unlock_irq(&sp->queue_lock);
} }
...@@ -548,19 +483,6 @@ EXPORT_SYMBOL_GPL(srcu_batches_completed); ...@@ -548,19 +483,6 @@ EXPORT_SYMBOL_GPL(srcu_batches_completed);
#define SRCU_CALLBACK_BATCH 10 #define SRCU_CALLBACK_BATCH 10
#define SRCU_INTERVAL 1 #define SRCU_INTERVAL 1
/*
* Move any new SRCU callbacks to the first stage of the SRCU grace
* period pipeline.
*/
static void srcu_collect_new(struct srcu_struct *sp)
{
if (!rcu_batch_empty(&sp->batch_queue)) {
spin_lock_irq(&sp->queue_lock);
rcu_batch_move(&sp->batch_check0, &sp->batch_queue);
spin_unlock_irq(&sp->queue_lock);
}
}
/* /*
* Core SRCU state machine. Advance callbacks from ->batch_check0 to * Core SRCU state machine. Advance callbacks from ->batch_check0 to
* ->batch_check1 and then to ->batch_done as readers drain. * ->batch_check1 and then to ->batch_done as readers drain.
...@@ -586,26 +508,7 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) ...@@ -586,26 +508,7 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
idx = 1 ^ (sp->completed & 1); idx = 1 ^ (sp->completed & 1);
if (!try_check_zero(sp, idx, trycount)) if (!try_check_zero(sp, idx, trycount))
return; /* readers present, retry after SRCU_INTERVAL */ return; /* readers present, retry after SRCU_INTERVAL */
/*
* The callbacks in ->batch_check1 have already done
* with their first zero check and flip back when they were
* enqueued on ->batch_check0 in a previous invocation of
* srcu_advance_batches(). (Presumably try_check_zero()
* returned false during that invocation, leaving the
* callbacks stranded on ->batch_check1.) They are therefore
* ready to invoke, so move them to ->batch_done.
*/
rcu_batch_move(&sp->batch_done, &sp->batch_check1);
srcu_flip(sp); srcu_flip(sp);
/*
* The callbacks in ->batch_check0 just finished their
* first check zero and flip, so move them to ->batch_check1
* for future checking on the other idx.
*/
rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN2); WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN2);
} }
...@@ -619,14 +522,6 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) ...@@ -619,14 +522,6 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
trycount = trycount < 2 ? 2 : trycount; trycount = trycount < 2 ? 2 : trycount;
if (!try_check_zero(sp, idx, trycount)) if (!try_check_zero(sp, idx, trycount))
return; /* readers present, retry after SRCU_INTERVAL */ return; /* readers present, retry after SRCU_INTERVAL */
/*
* The callbacks in ->batch_check1 have now waited for
* all pre-existing readers using both idx values. They are
* therefore ready to invoke, so move them to ->batch_done.
*/
rcu_batch_move(&sp->batch_done, &sp->batch_check1);
srcu_gp_end(sp); srcu_gp_end(sp);
} }
} }
...@@ -639,17 +534,26 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) ...@@ -639,17 +534,26 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
*/ */
static void srcu_invoke_callbacks(struct srcu_struct *sp) static void srcu_invoke_callbacks(struct srcu_struct *sp)
{ {
int i; struct rcu_cblist ready_cbs;
struct rcu_head *head; struct rcu_head *rhp;
for (i = 0; i < SRCU_CALLBACK_BATCH; i++) { spin_lock_irq(&sp->queue_lock);
head = rcu_batch_dequeue(&sp->batch_done); if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) {
if (!head) spin_unlock_irq(&sp->queue_lock);
break; return;
}
rcu_cblist_init(&ready_cbs);
rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs);
spin_unlock_irq(&sp->queue_lock);
rhp = rcu_cblist_dequeue(&ready_cbs);
for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
local_bh_disable(); local_bh_disable();
head->func(head); rhp->func(rhp);
local_bh_enable(); local_bh_enable();
} }
spin_lock_irq(&sp->queue_lock);
rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs);
spin_unlock_irq(&sp->queue_lock);
} }
/* /*
...@@ -660,9 +564,9 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay) ...@@ -660,9 +564,9 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
{ {
bool pending = true; bool pending = true;
if (rcu_all_batches_empty(sp)) { if (rcu_segcblist_empty(&sp->srcu_cblist)) {
spin_lock_irq(&sp->queue_lock); spin_lock_irq(&sp->queue_lock);
if (rcu_all_batches_empty(sp) && if (rcu_segcblist_empty(&sp->srcu_cblist) &&
READ_ONCE(sp->srcu_state) == SRCU_STATE_DONE) { READ_ONCE(sp->srcu_state) == SRCU_STATE_DONE) {
WRITE_ONCE(sp->srcu_state, SRCU_STATE_IDLE); WRITE_ONCE(sp->srcu_state, SRCU_STATE_IDLE);
pending = false; pending = false;
...@@ -683,7 +587,6 @@ void process_srcu(struct work_struct *work) ...@@ -683,7 +587,6 @@ void process_srcu(struct work_struct *work)
sp = container_of(work, struct srcu_struct, work.work); sp = container_of(work, struct srcu_struct, work.work);
srcu_collect_new(sp);
srcu_advance_batches(sp, 1); srcu_advance_batches(sp, 1);
srcu_invoke_callbacks(sp); srcu_invoke_callbacks(sp);
srcu_reschedule(sp, SRCU_INTERVAL); srcu_reschedule(sp, SRCU_INTERVAL);
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
#include <linux/seqlock.h> #include <linux/seqlock.h>
#include <linux/swait.h> #include <linux/swait.h>
#include <linux/stop_machine.h> #include <linux/stop_machine.h>
#include "rcu_segcblist.h" #include <linux/rcu_segcblist.h>
/* /*
* Define shape of hierarchy based on NR_CPUS, CONFIG_RCU_FANOUT, and * Define shape of hierarchy based on NR_CPUS, CONFIG_RCU_FANOUT, and
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment