Commit 8be6e1b1 authored by Paul E. McKenney's avatar Paul E. McKenney

rcu: Use timer as backstop for NOCB deferred wakeups

The handling of RCU's no-CBs CPUs has a maintenance headache, namely
that if call_rcu() is invoked with interrupts disabled, the rcuo kthread
wakeup must be defered to a point where we can be sure that scheduler
locks are not held.  Of course, there are a lot of code paths leading
from an interrupts-disabled invocation of call_rcu(), and missing any
one of these can result in excessive callback-invocation latency, and
potentially even system hangs.

This commit therefore uses a timer to guarantee that the wakeup will
eventually occur.  If one of the deferred-wakeup points kicks in, then
the timer is simply cancelled.

This commit also fixes up an incomplete removal of commits that were
intended to plug remaining exit paths, which should have the added
benefit of reducing the overhead of RCU's context-switch hooks.  In
addition, it simplifies leader-to-follower callback-list handoff by
introducing locking.  The call_rcu()-to-leader handoff continues to
use atomic operations in order to maintain good real-time latency for
common-case use of call_rcu().
Signed-off-by: default avatarPaul E. McKenney <paulmck@linux.vnet.ibm.com>
[ paulmck: Dan Carpenter fix for mod_timer() usage bug found by smatch. ]
parent 520eccdf
......@@ -268,7 +268,9 @@ struct rcu_data {
struct rcu_head **nocb_follower_tail;
struct swait_queue_head nocb_wq; /* For nocb kthreads to sleep on. */
struct task_struct *nocb_kthread;
raw_spinlock_t nocb_lock; /* Guard following pair of fields. */
int nocb_defer_wakeup; /* Defer wakeup of nocb_kthread. */
struct timer_list nocb_timer; /* Enforce finite deferral. */
/* The following fields are used by the leader, hence own cacheline. */
struct rcu_head *nocb_gp_head ____cacheline_internodealigned_in_smp;
......
......@@ -1788,22 +1788,61 @@ bool rcu_is_nocb_cpu(int cpu)
}
/*
* Kick the leader kthread for this NOCB group.
* Kick the leader kthread for this NOCB group. Caller holds ->nocb_lock
* and this function releases it.
*/
static void wake_nocb_leader(struct rcu_data *rdp, bool force)
static void __wake_nocb_leader(struct rcu_data *rdp, bool force,
unsigned long flags)
__releases(rdp->nocb_lock)
{
struct rcu_data *rdp_leader = rdp->nocb_leader;
if (!READ_ONCE(rdp_leader->nocb_kthread))
lockdep_assert_held(&rdp->nocb_lock);
if (!READ_ONCE(rdp_leader->nocb_kthread)) {
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
return;
if (READ_ONCE(rdp_leader->nocb_leader_sleep) || force) {
}
if (rdp_leader->nocb_leader_sleep || force) {
/* Prior smp_mb__after_atomic() orders against prior enqueue. */
WRITE_ONCE(rdp_leader->nocb_leader_sleep, false);
del_timer(&rdp->nocb_timer);
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
smp_mb(); /* ->nocb_leader_sleep before swake_up(). */
swake_up(&rdp_leader->nocb_wq);
} else {
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
}
}
/*
* Kick the leader kthread for this NOCB group, but caller has not
* acquired locks.
*/
static void wake_nocb_leader(struct rcu_data *rdp, bool force)
{
unsigned long flags;
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
__wake_nocb_leader(rdp, force, flags);
}
/*
* Arrange to wake the leader kthread for this NOCB group at some
* future time when it is safe to do so.
*/
static void wake_nocb_leader_defer(struct rcu_data *rdp, int waketype,
const char *reason)
{
unsigned long flags;
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
mod_timer(&rdp->nocb_timer, jiffies + 1);
WRITE_ONCE(rdp->nocb_defer_wakeup, waketype);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, reason);
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
}
/*
* Does the specified CPU need an RCU callback for the specified flavor
* of rcu_barrier()?
......@@ -1891,10 +1930,7 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
TPS("WakeEmpty"));
} else {
WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE);
/* Store ->nocb_defer_wakeup before ->rcu_urgent_qs. */
smp_store_release(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs), true);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE,
TPS("WakeEmptyIsDeferred"));
}
rdp->qlen_last_fqs_check = 0;
......@@ -1905,10 +1941,7 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
TPS("WakeOvf"));
} else {
WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_FORCE);
/* Store ->nocb_defer_wakeup before ->rcu_urgent_qs. */
smp_store_release(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs), true);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE,
TPS("WakeOvfIsDeferred"));
}
rdp->qlen_last_fqs_check = LONG_MAX / 2;
......@@ -2031,6 +2064,7 @@ static void rcu_nocb_wait_gp(struct rcu_data *rdp)
static void nocb_leader_wait(struct rcu_data *my_rdp)
{
bool firsttime = true;
unsigned long flags;
bool gotcbs;
struct rcu_data *rdp;
struct rcu_head **tail;
......@@ -2042,7 +2076,11 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Sleep");
swait_event_interruptible(my_rdp->nocb_wq,
!READ_ONCE(my_rdp->nocb_leader_sleep));
/* Memory barrier handled by smp_mb() calls below and repoll. */
raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
my_rdp->nocb_leader_sleep = true;
WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
del_timer(&my_rdp->nocb_timer);
raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
} else if (firsttime) {
firsttime = false; /* Don't drown trace log with "Poll"! */
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Poll");
......@@ -2054,7 +2092,7 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
* nocb_gp_head, where they await a grace period.
*/
gotcbs = false;
smp_mb(); /* wakeup before ->nocb_head reads. */
smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head);
if (!rdp->nocb_gp_head)
......@@ -2066,25 +2104,14 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
gotcbs = true;
}
/*
* If there were no callbacks, sleep a bit, rescan after a
* memory barrier, and go retry.
*/
/* No callbacks? Sleep a bit if polling, and go retry. */
if (unlikely(!gotcbs)) {
if (!rcu_nocb_poll)
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
"WokeEmpty");
WARN_ON(signal_pending(current));
if (rcu_nocb_poll) {
schedule_timeout_interruptible(1);
/* Rescan in case we were a victim of memory ordering. */
my_rdp->nocb_leader_sleep = true;
smp_mb(); /* Ensure _sleep true before scan. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower)
if (READ_ONCE(rdp->nocb_head)) {
/* Found CB, so short-circuit next wait. */
my_rdp->nocb_leader_sleep = false;
break;
} else {
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
"WokeEmpty");
}
goto wait_again;
}
......@@ -2092,30 +2119,26 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
/* Wait for one grace period. */
rcu_nocb_wait_gp(my_rdp);
/*
* We left ->nocb_leader_sleep unset to reduce cache thrashing.
* We set it now, but recheck for new callbacks while
* traversing our follower list.
*/
my_rdp->nocb_leader_sleep = true;
smp_mb(); /* Ensure _sleep true before scan of ->nocb_head. */
/* Each pass through the following loop wakes a follower, if needed. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
if (READ_ONCE(rdp->nocb_head))
if (!rcu_nocb_poll &&
READ_ONCE(rdp->nocb_head) &&
READ_ONCE(my_rdp->nocb_leader_sleep)) {
raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/
raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
}
if (!rdp->nocb_gp_head)
continue; /* No CBs, so no need to wake follower. */
/* Append callbacks to follower's "done" list. */
tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail);
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
tail = rdp->nocb_follower_tail;
rdp->nocb_follower_tail = rdp->nocb_gp_tail;
*tail = rdp->nocb_gp_head;
smp_mb__after_atomic(); /* Store *tail before wakeup. */
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
/*
* List was empty, wake up the follower.
* Memory barriers supplied by atomic_long_add().
*/
/* List was empty, so wake up the follower. */
swake_up(&rdp->nocb_wq);
}
}
......@@ -2131,28 +2154,16 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
*/
static void nocb_follower_wait(struct rcu_data *rdp)
{
bool firsttime = true;
for (;;) {
if (!rcu_nocb_poll) {
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
"FollowerSleep");
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "FollowerSleep");
swait_event_interruptible(rdp->nocb_wq,
READ_ONCE(rdp->nocb_follower_head));
} else if (firsttime) {
/* Don't drown trace log with "Poll"! */
firsttime = false;
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "Poll");
}
if (smp_load_acquire(&rdp->nocb_follower_head)) {
/* ^^^ Ensure CB invocation follows _head test. */
return;
}
if (!rcu_nocb_poll)
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
"WokeEmpty");
WARN_ON(signal_pending(current));
schedule_timeout_interruptible(1);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeEmpty");
}
}
......@@ -2165,6 +2176,7 @@ static void nocb_follower_wait(struct rcu_data *rdp)
static int rcu_nocb_kthread(void *arg)
{
int c, cl;
unsigned long flags;
struct rcu_head *list;
struct rcu_head *next;
struct rcu_head **tail;
......@@ -2179,11 +2191,14 @@ static int rcu_nocb_kthread(void *arg)
nocb_follower_wait(rdp);
/* Pull the ready-to-invoke callbacks onto local list. */
list = READ_ONCE(rdp->nocb_follower_head);
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
list = rdp->nocb_follower_head;
rdp->nocb_follower_head = NULL;
tail = rdp->nocb_follower_tail;
rdp->nocb_follower_tail = &rdp->nocb_follower_head;
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
BUG_ON(!list);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty");
WRITE_ONCE(rdp->nocb_follower_head, NULL);
tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);
/* Each pass through the following loop invokes a callback. */
trace_rcu_batch_start(rdp->rsp->name,
......@@ -2226,18 +2241,39 @@ static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp)
}
/* Do a deferred wakeup of rcu_nocb_kthread(). */
static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp)
{
unsigned long flags;
int ndw;
if (!rcu_nocb_need_deferred_wakeup(rdp))
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
if (!rcu_nocb_need_deferred_wakeup(rdp)) {
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
return;
}
ndw = READ_ONCE(rdp->nocb_defer_wakeup);
WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE);
__wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("DeferredWake"));
}
/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
static void do_nocb_deferred_wakeup_timer(unsigned long x)
{
do_nocb_deferred_wakeup_common((struct rcu_data *)x);
}
/*
* Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
* This means we do an inexact common-case check. Note that if
* we miss, ->nocb_timer will eventually clean things up.
*/
static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
{
if (rcu_nocb_need_deferred_wakeup(rdp))
do_nocb_deferred_wakeup_common(rdp);
}
void __init rcu_init_nohz(void)
{
int cpu;
......@@ -2287,6 +2323,9 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
rdp->nocb_tail = &rdp->nocb_head;
init_swait_queue_head(&rdp->nocb_wq);
rdp->nocb_follower_tail = &rdp->nocb_follower_head;
raw_spin_lock_init(&rdp->nocb_lock);
setup_timer(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer,
(unsigned long)rdp);
}
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment