Commit 5d6742b3 authored by Paul E. McKenney

rcu/nocb: Use rcu_segcblist for no-CBs CPUs

Currently the RCU callbacks for no-CBs CPUs are queued on a series of
ad-hoc linked lists, which means that these callbacks cannot benefit
from "drive-by" grace periods, thus suffering needless delays prior
to invocation.  In addition, the no-CBs grace-period kthreads first
wait for callbacks to appear and later wait for a new grace period,
which means that callbacks appearing during a grace-period wait can
be delayed.  These delays increase memory footprint, and could even
result in an out-of-memory condition.

This commit therefore enqueues RCU callbacks from no-CBs CPUs on the
rcu_segcblist structure that is already used by non-no-CBs CPUs.  It also
restructures the no-CBs grace-period kthread to check for incoming
callbacks while waiting for grace periods.  Also, instead of waiting
for a new grace period, it waits for the closest grace period that will
cause some of the callbacks to be safe to invoke.  All of these changes
reduce callback latency and thus the number of outstanding callbacks,
in turn reducing the probability of an out-of-memory condition.
Signed-off-by: Paul E. McKenney <paulmck@linux.ibm.com>
parent e83e73f5
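
The core of the change is visible in the new nocb_gp_wait() further down: instead of parking no-CBs callbacks on ad-hoc lists and then waiting out a full new grace period, the GP kthread scans its group's per-CPU rcu_segcblist structures, advances their callbacks, and sleeps only until the nearest grace period any of them is waiting on (as reported by the new rcu_segcblist_nextgp()). The stand-alone sketch below is a toy user-space model of that scan, not kernel code; the toy_segcblist structure, the sample gp_seq values, and the plain "<" comparison (the kernel uses ULONG_CMP_LT to tolerate sequence-counter wrap) are illustrative assumptions only.

/* Toy user-space model of the nocb_gp_wait() scan: each "CPU" has a
 * segmented callback list, and the group's GP kthread waits for the
 * nearest grace period that will make some callbacks invokable.
 * Names mirror the kernel API, but everything here is simplified. */
#include <stdbool.h>
#include <stdio.h>

struct toy_segcblist {
	unsigned long n_pend_cbs;   /* callbacks still waiting for a GP */
	unsigned long wait_gp_seq;  /* GP needed by the RCU_WAIT_TAIL segment */
};

/* Analog of rcu_segcblist_nextgp(): false if nothing is pending,
 * otherwise report the grace period the oldest pending CBs need. */
static bool toy_segcblist_nextgp(struct toy_segcblist *rsclp, unsigned long *lp)
{
	if (!rsclp->n_pend_cbs)
		return false;
	*lp = rsclp->wait_gp_seq;
	return true;
}

int main(void)
{
	/* Three "no-CBs CPUs" served by a single GP kthread. */
	struct toy_segcblist cpus[3] = {
		{ .n_pend_cbs = 4, .wait_gp_seq = 112 },
		{ .n_pend_cbs = 0, .wait_gp_seq = 0 },   /* nothing queued */
		{ .n_pend_cbs = 1, .wait_gp_seq = 108 },
	};
	unsigned long cur_gp_seq, wait_gp_seq = 0;
	bool needwait_gp = false;

	/* The scan at the heart of nocb_gp_wait(): remember only the
	 * *closest* grace period any CPU in the group is waiting on. */
	for (int i = 0; i < 3; i++) {
		if (!toy_segcblist_nextgp(&cpus[i], &cur_gp_seq))
			continue;
		if (!needwait_gp || cur_gp_seq < wait_gp_seq) {
			wait_gp_seq = cur_gp_seq;
			needwait_gp = true;
		}
	}
	if (needwait_gp)
		printf("GP kthread sleeps until gp_seq %lu completes\n",
		       wait_gp_seq);
	else
		printf("No pending callbacks; wait for more to be queued\n");
	return 0;
}
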
...@@ -100,7 +100,6 @@ TRACE_EVENT_RCU(rcu_grace_period, ...@@ -100,7 +100,6 @@ TRACE_EVENT_RCU(rcu_grace_period,
* "Startedroot": Requested a nocb grace period based on root-node data. * "Startedroot": Requested a nocb grace period based on root-node data.
* "NoGPkthread": The RCU grace-period kthread has not yet started. * "NoGPkthread": The RCU grace-period kthread has not yet started.
* "StartWait": Start waiting for the requested grace period. * "StartWait": Start waiting for the requested grace period.
* "ResumeWait": Resume waiting after signal.
* "EndWait": Complete wait. * "EndWait": Complete wait.
* "Cleanup": Clean up rcu_node structure after previous GP. * "Cleanup": Clean up rcu_node structure after previous GP.
* "CleanupMore": Clean up, and another GP is needed. * "CleanupMore": Clean up, and another GP is needed.
......
...@@ -127,6 +127,18 @@ struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp) ...@@ -127,6 +127,18 @@ struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp)
return NULL; return NULL;
} }
/*
* Return false if there are no CBs awaiting grace periods, otherwise,
* return true and store the nearest waited-upon grace period into *lp.
*/
bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp)
{
if (!rcu_segcblist_pend_cbs(rsclp))
return false;
*lp = rsclp->gp_seq[RCU_WAIT_TAIL];
return true;
}
/* /*
* Enqueue the specified callback onto the specified rcu_segcblist * Enqueue the specified callback onto the specified rcu_segcblist
* structure, updating accounting as needed. Note that the ->len * structure, updating accounting as needed. Note that the ->len
......
...@@ -89,6 +89,7 @@ bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp); ...@@ -89,6 +89,7 @@ bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp);
bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp); bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp);
struct rcu_head *rcu_segcblist_first_cb(struct rcu_segcblist *rsclp); struct rcu_head *rcu_segcblist_first_cb(struct rcu_segcblist *rsclp);
struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp); struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp);
bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp);
void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp, void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
struct rcu_head *rhp, bool lazy); struct rcu_head *rhp, bool lazy);
bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp, bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
......
...@@ -1343,8 +1343,10 @@ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp) ...@@ -1343,8 +1343,10 @@ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
*/ */
static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp) static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
{ {
bool ret; bool ret = false;
bool need_gp; bool need_gp;
const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
rcu_segcblist_is_offloaded(&rdp->cblist);
raw_lockdep_assert_held_rcu_node(rnp); raw_lockdep_assert_held_rcu_node(rnp);
...@@ -1354,10 +1356,12 @@ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp) ...@@ -1354,10 +1356,12 @@ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
/* Handle the ends of any preceding grace periods first. */ /* Handle the ends of any preceding grace periods first. */
if (rcu_seq_completed_gp(rdp->gp_seq, rnp->gp_seq) || if (rcu_seq_completed_gp(rdp->gp_seq, rnp->gp_seq) ||
unlikely(READ_ONCE(rdp->gpwrap))) { unlikely(READ_ONCE(rdp->gpwrap))) {
ret = rcu_advance_cbs(rnp, rdp); /* Advance callbacks. */ if (!offloaded)
ret = rcu_advance_cbs(rnp, rdp); /* Advance CBs. */
trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuend")); trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuend"));
} else { } else {
ret = rcu_accelerate_cbs(rnp, rdp); /* Recent callbacks. */ if (!offloaded)
ret = rcu_accelerate_cbs(rnp, rdp); /* Recent CBs. */
} }
/* Now handle the beginnings of any new-to-this-CPU grace periods. */ /* Now handle the beginnings of any new-to-this-CPU grace periods. */
...@@ -1658,6 +1662,7 @@ static void rcu_gp_cleanup(void) ...@@ -1658,6 +1662,7 @@ static void rcu_gp_cleanup(void)
unsigned long gp_duration; unsigned long gp_duration;
bool needgp = false; bool needgp = false;
unsigned long new_gp_seq; unsigned long new_gp_seq;
bool offloaded;
struct rcu_data *rdp; struct rcu_data *rdp;
struct rcu_node *rnp = rcu_get_root(); struct rcu_node *rnp = rcu_get_root();
struct swait_queue_head *sq; struct swait_queue_head *sq;
...@@ -1723,7 +1728,9 @@ static void rcu_gp_cleanup(void) ...@@ -1723,7 +1728,9 @@ static void rcu_gp_cleanup(void)
needgp = true; needgp = true;
} }
/* Advance CBs to reduce false positives below. */ /* Advance CBs to reduce false positives below. */
if (!rcu_accelerate_cbs(rnp, rdp) && needgp) { offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
rcu_segcblist_is_offloaded(&rdp->cblist);
if ((offloaded || !rcu_accelerate_cbs(rnp, rdp)) && needgp) {
WRITE_ONCE(rcu_state.gp_flags, RCU_GP_FLAG_INIT); WRITE_ONCE(rcu_state.gp_flags, RCU_GP_FLAG_INIT);
rcu_state.gp_req_activity = jiffies; rcu_state.gp_req_activity = jiffies;
trace_rcu_grace_period(rcu_state.name, trace_rcu_grace_period(rcu_state.name,
...@@ -1917,7 +1924,9 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp) ...@@ -1917,7 +1924,9 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp)
{ {
unsigned long flags; unsigned long flags;
unsigned long mask; unsigned long mask;
bool needwake; bool needwake = false;
const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
rcu_segcblist_is_offloaded(&rdp->cblist);
struct rcu_node *rnp; struct rcu_node *rnp;
rnp = rdp->mynode; rnp = rdp->mynode;
...@@ -1944,7 +1953,8 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp) ...@@ -1944,7 +1953,8 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp)
* This GP can't end until cpu checks in, so all of our * This GP can't end until cpu checks in, so all of our
* callbacks can be processed during the next GP. * callbacks can be processed during the next GP.
*/ */
needwake = rcu_accelerate_cbs(rnp, rdp); if (!offloaded)
needwake = rcu_accelerate_cbs(rnp, rdp);
rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags); rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
/* ^^^ Released rnp->lock */ /* ^^^ Released rnp->lock */
...@@ -2082,7 +2092,6 @@ static void rcu_do_batch(struct rcu_data *rdp) ...@@ -2082,7 +2092,6 @@ static void rcu_do_batch(struct rcu_data *rdp)
struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl); struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
long bl, count; long bl, count;
WARN_ON_ONCE(rdp->cblist.offloaded);
/* If no callbacks are ready, just return. */ /* If no callbacks are ready, just return. */
if (!rcu_segcblist_ready_cbs(&rdp->cblist)) { if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
trace_rcu_batch_start(rcu_state.name, trace_rcu_batch_start(rcu_state.name,
...@@ -2101,13 +2110,14 @@ static void rcu_do_batch(struct rcu_data *rdp) ...@@ -2101,13 +2110,14 @@ static void rcu_do_batch(struct rcu_data *rdp)
* callback counts, as rcu_barrier() needs to be conservative. * callback counts, as rcu_barrier() needs to be conservative.
*/ */
local_irq_save(flags); local_irq_save(flags);
rcu_nocb_lock(rdp);
WARN_ON_ONCE(cpu_is_offline(smp_processor_id())); WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
bl = rdp->blimit; bl = rdp->blimit;
trace_rcu_batch_start(rcu_state.name, trace_rcu_batch_start(rcu_state.name,
rcu_segcblist_n_lazy_cbs(&rdp->cblist), rcu_segcblist_n_lazy_cbs(&rdp->cblist),
rcu_segcblist_n_cbs(&rdp->cblist), bl); rcu_segcblist_n_cbs(&rdp->cblist), bl);
rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl); rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl);
local_irq_restore(flags); rcu_nocb_unlock_irqrestore(rdp, flags);
/* Invoke callbacks. */ /* Invoke callbacks. */
rhp = rcu_cblist_dequeue(&rcl); rhp = rcu_cblist_dequeue(&rcl);
...@@ -2120,12 +2130,22 @@ static void rcu_do_batch(struct rcu_data *rdp) ...@@ -2120,12 +2130,22 @@ static void rcu_do_batch(struct rcu_data *rdp)
* Note: The rcl structure counts down from zero. * Note: The rcl structure counts down from zero.
*/ */
if (-rcl.len >= bl && if (-rcl.len >= bl &&
!rcu_segcblist_is_offloaded(&rdp->cblist) &&
(need_resched() || (need_resched() ||
(!is_idle_task(current) && !rcu_is_callbacks_kthread()))) (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
break; break;
if (rcu_segcblist_is_offloaded(&rdp->cblist)) {
WARN_ON_ONCE(in_serving_softirq());
local_bh_enable();
lockdep_assert_irqs_enabled();
cond_resched_tasks_rcu_qs();
lockdep_assert_irqs_enabled();
local_bh_disable();
}
} }
local_irq_save(flags); local_irq_save(flags);
rcu_nocb_lock(rdp);
count = -rcl.len; count = -rcl.len;
trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(), trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(),
is_idle_task(current), rcu_is_callbacks_kthread()); is_idle_task(current), rcu_is_callbacks_kthread());
...@@ -2153,10 +2173,11 @@ static void rcu_do_batch(struct rcu_data *rdp) ...@@ -2153,10 +2173,11 @@ static void rcu_do_batch(struct rcu_data *rdp)
*/ */
WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0)); WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0));
local_irq_restore(flags); rcu_nocb_unlock_irqrestore(rdp, flags);
/* Re-invoke RCU core processing if there are callbacks remaining. */ /* Re-invoke RCU core processing if there are callbacks remaining. */
if (rcu_segcblist_ready_cbs(&rdp->cblist)) if (!rcu_segcblist_is_offloaded(&rdp->cblist) &&
rcu_segcblist_ready_cbs(&rdp->cblist))
invoke_rcu_core(); invoke_rcu_core();
} }
...@@ -2312,7 +2333,8 @@ static __latent_entropy void rcu_core(void) ...@@ -2312,7 +2333,8 @@ static __latent_entropy void rcu_core(void)
rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check()); rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check());
/* If there are callbacks ready, invoke them. */ /* If there are callbacks ready, invoke them. */
if (rcu_segcblist_ready_cbs(&rdp->cblist) && if (!rcu_segcblist_is_offloaded(&rdp->cblist) &&
rcu_segcblist_ready_cbs(&rdp->cblist) &&
likely(READ_ONCE(rcu_scheduler_fully_active))) likely(READ_ONCE(rcu_scheduler_fully_active)))
rcu_do_batch(rdp); rcu_do_batch(rdp);
...@@ -2492,10 +2514,11 @@ static void rcu_leak_callback(struct rcu_head *rhp) ...@@ -2492,10 +2514,11 @@ static void rcu_leak_callback(struct rcu_head *rhp)
* is expected to specify a CPU. * is expected to specify a CPU.
*/ */
static void static void
__call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) __call_rcu(struct rcu_head *head, rcu_callback_t func, bool lazy)
{ {
unsigned long flags; unsigned long flags;
struct rcu_data *rdp; struct rcu_data *rdp;
bool was_alldone;
/* Misaligned rcu_head! */ /* Misaligned rcu_head! */
WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1)); WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
...@@ -2517,29 +2540,17 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) ...@@ -2517,29 +2540,17 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
rdp = this_cpu_ptr(&rcu_data); rdp = this_cpu_ptr(&rcu_data);
/* Add the callback to our list. */ /* Add the callback to our list. */
if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist)) || if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) {
rcu_segcblist_is_offloaded(&rdp->cblist) || cpu != -1) { // This can trigger due to call_rcu() from offline CPU:
int offline; WARN_ON_ONCE(rcu_scheduler_active != RCU_SCHEDULER_INACTIVE);
if (cpu != -1)
rdp = per_cpu_ptr(&rcu_data, cpu);
if (likely(rdp->mynode)) {
/* Post-boot, so this should be for a no-CBs CPU. */
offline = !__call_rcu_nocb(rdp, head, lazy, flags);
WARN_ON_ONCE(offline);
/* Offline CPU, _call_rcu() illegal, leak callback. */
local_irq_restore(flags);
return;
}
/*
* Very early boot, before rcu_init(). Initialize if needed
* and then drop through to queue the callback.
*/
WARN_ON_ONCE(cpu != -1);
WARN_ON_ONCE(!rcu_is_watching()); WARN_ON_ONCE(!rcu_is_watching());
// Very early boot, before rcu_init(). Initialize if needed
// and then drop through to queue the callback.
if (rcu_segcblist_empty(&rdp->cblist)) if (rcu_segcblist_empty(&rdp->cblist))
rcu_segcblist_init(&rdp->cblist); rcu_segcblist_init(&rdp->cblist);
} }
rcu_nocb_lock(rdp);
was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
rcu_segcblist_enqueue(&rdp->cblist, head, lazy); rcu_segcblist_enqueue(&rdp->cblist, head, lazy);
if (__is_kfree_rcu_offset((unsigned long)func)) if (__is_kfree_rcu_offset((unsigned long)func))
trace_rcu_kfree_callback(rcu_state.name, head, trace_rcu_kfree_callback(rcu_state.name, head,
...@@ -2552,8 +2563,13 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) ...@@ -2552,8 +2563,13 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
rcu_segcblist_n_cbs(&rdp->cblist)); rcu_segcblist_n_cbs(&rdp->cblist));
/* Go handle any RCU core processing required. */ /* Go handle any RCU core processing required. */
__call_rcu_core(rdp, head, flags); if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
local_irq_restore(flags); unlikely(rcu_segcblist_is_offloaded(&rdp->cblist))) {
__call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
} else {
__call_rcu_core(rdp, head, flags);
local_irq_restore(flags);
}
} }
/** /**
...@@ -2593,7 +2609,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) ...@@ -2593,7 +2609,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
*/ */
void call_rcu(struct rcu_head *head, rcu_callback_t func) void call_rcu(struct rcu_head *head, rcu_callback_t func)
{ {
__call_rcu(head, func, -1, 0); __call_rcu(head, func, 0);
} }
EXPORT_SYMBOL_GPL(call_rcu); EXPORT_SYMBOL_GPL(call_rcu);
...@@ -2606,7 +2622,7 @@ EXPORT_SYMBOL_GPL(call_rcu); ...@@ -2606,7 +2622,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
*/ */
void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func) void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
{ {
__call_rcu(head, func, -1, 1); __call_rcu(head, func, 1);
} }
EXPORT_SYMBOL_GPL(kfree_call_rcu); EXPORT_SYMBOL_GPL(kfree_call_rcu);
...@@ -2806,6 +2822,7 @@ static void rcu_barrier_func(void *unused) ...@@ -2806,6 +2822,7 @@ static void rcu_barrier_func(void *unused)
rcu_barrier_trace(TPS("IRQ"), -1, rcu_state.barrier_sequence); rcu_barrier_trace(TPS("IRQ"), -1, rcu_state.barrier_sequence);
rdp->barrier_head.func = rcu_barrier_callback; rdp->barrier_head.func = rcu_barrier_callback;
debug_rcu_head_queue(&rdp->barrier_head); debug_rcu_head_queue(&rdp->barrier_head);
rcu_nocb_lock(rdp);
if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head, 0)) { if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head, 0)) {
atomic_inc(&rcu_state.barrier_cpu_count); atomic_inc(&rcu_state.barrier_cpu_count);
} else { } else {
...@@ -2813,6 +2830,7 @@ static void rcu_barrier_func(void *unused) ...@@ -2813,6 +2830,7 @@ static void rcu_barrier_func(void *unused)
rcu_barrier_trace(TPS("IRQNQ"), -1, rcu_barrier_trace(TPS("IRQNQ"), -1,
rcu_state.barrier_sequence); rcu_state.barrier_sequence);
} }
rcu_nocb_unlock(rdp);
} }
/** /**
...@@ -2867,19 +2885,7 @@ void rcu_barrier(void) ...@@ -2867,19 +2885,7 @@ void rcu_barrier(void)
if (!cpu_online(cpu) && if (!cpu_online(cpu) &&
!rcu_segcblist_is_offloaded(&rdp->cblist)) !rcu_segcblist_is_offloaded(&rdp->cblist))
continue; continue;
if (rcu_segcblist_is_offloaded(&rdp->cblist)) { if (rcu_segcblist_n_cbs(&rdp->cblist)) {
if (!rcu_nocb_cpu_needs_barrier(cpu)) {
rcu_barrier_trace(TPS("OfflineNoCB"), cpu,
rcu_state.barrier_sequence);
} else {
rcu_barrier_trace(TPS("OnlineNoCB"), cpu,
rcu_state.barrier_sequence);
smp_mb__before_atomic();
atomic_inc(&rcu_state.barrier_cpu_count);
__call_rcu(&rdp->barrier_head,
rcu_barrier_callback, cpu, 0);
}
} else if (rcu_segcblist_n_cbs(&rdp->cblist)) {
rcu_barrier_trace(TPS("OnlineQ"), cpu, rcu_barrier_trace(TPS("OnlineQ"), cpu,
rcu_state.barrier_sequence); rcu_state.barrier_sequence);
smp_call_function_single(cpu, rcu_barrier_func, NULL, 1); smp_call_function_single(cpu, rcu_barrier_func, NULL, 1);
...@@ -3169,10 +3175,7 @@ void rcutree_migrate_callbacks(int cpu) ...@@ -3169,10 +3175,7 @@ void rcutree_migrate_callbacks(int cpu)
local_irq_save(flags); local_irq_save(flags);
my_rdp = this_cpu_ptr(&rcu_data); my_rdp = this_cpu_ptr(&rcu_data);
my_rnp = my_rdp->mynode; my_rnp = my_rdp->mynode;
if (rcu_nocb_adopt_orphan_cbs(my_rdp, rdp, flags)) { rcu_nocb_lock(my_rdp); /* irqs already disabled. */
local_irq_restore(flags);
return;
}
raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */ raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
/* Leverage recent GPs and set GP for new callbacks. */ /* Leverage recent GPs and set GP for new callbacks. */
needwake = rcu_advance_cbs(my_rnp, rdp) || needwake = rcu_advance_cbs(my_rnp, rdp) ||
...@@ -3180,9 +3183,16 @@ void rcutree_migrate_callbacks(int cpu) ...@@ -3180,9 +3183,16 @@ void rcutree_migrate_callbacks(int cpu)
rcu_segcblist_merge(&my_rdp->cblist, &rdp->cblist); rcu_segcblist_merge(&my_rdp->cblist, &rdp->cblist);
WARN_ON_ONCE(rcu_segcblist_empty(&my_rdp->cblist) != WARN_ON_ONCE(rcu_segcblist_empty(&my_rdp->cblist) !=
!rcu_segcblist_n_cbs(&my_rdp->cblist)); !rcu_segcblist_n_cbs(&my_rdp->cblist));
raw_spin_unlock_irqrestore_rcu_node(my_rnp, flags); if (rcu_segcblist_is_offloaded(&my_rdp->cblist)) {
raw_spin_unlock_rcu_node(my_rnp); /* irqs remain disabled. */
__call_rcu_nocb_wake(my_rdp, true, flags);
} else {
rcu_nocb_unlock(my_rdp); /* irqs remain disabled. */
raw_spin_unlock_irqrestore_rcu_node(my_rnp, flags);
}
if (needwake) if (needwake)
rcu_gp_kthread_wake(); rcu_gp_kthread_wake();
lockdep_assert_irqs_enabled();
WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 || WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 ||
!rcu_segcblist_empty(&rdp->cblist), !rcu_segcblist_empty(&rdp->cblist),
"rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n", "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n",
......
...@@ -211,7 +211,9 @@ struct rcu_data { ...@@ -211,7 +211,9 @@ struct rcu_data {
/* CBs waiting for GP. */ /* CBs waiting for GP. */
struct rcu_head **nocb_gp_tail; struct rcu_head **nocb_gp_tail;
bool nocb_gp_sleep; /* Is the nocb GP thread asleep? */ bool nocb_gp_sleep; /* Is the nocb GP thread asleep? */
bool nocb_gp_forced; /* Forced nocb GP thread wakeup? */
struct swait_queue_head nocb_gp_wq; /* For nocb kthreads to sleep on. */ struct swait_queue_head nocb_gp_wq; /* For nocb kthreads to sleep on. */
bool nocb_cb_sleep; /* Is the nocb CB thread asleep? */
struct task_struct *nocb_cb_kthread; struct task_struct *nocb_cb_kthread;
struct rcu_data *nocb_next_cb_rdp; struct rcu_data *nocb_next_cb_rdp;
/* Next rcu_data in wakeup chain. */ /* Next rcu_data in wakeup chain. */
...@@ -421,20 +423,20 @@ static bool rcu_preempt_has_tasks(struct rcu_node *rnp); ...@@ -421,20 +423,20 @@ static bool rcu_preempt_has_tasks(struct rcu_node *rnp);
static bool rcu_preempt_need_deferred_qs(struct task_struct *t); static bool rcu_preempt_need_deferred_qs(struct task_struct *t);
static void rcu_preempt_deferred_qs(struct task_struct *t); static void rcu_preempt_deferred_qs(struct task_struct *t);
static void zero_cpu_stall_ticks(struct rcu_data *rdp); static void zero_cpu_stall_ticks(struct rcu_data *rdp);
static bool rcu_nocb_cpu_needs_barrier(int cpu);
static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp); static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq); static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
static void rcu_init_one_nocb(struct rcu_node *rnp); static void rcu_init_one_nocb(struct rcu_node *rnp);
static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
bool lazy, unsigned long flags); unsigned long flags);
static bool rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp,
struct rcu_data *rdp,
unsigned long flags);
static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp); static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp);
static void do_nocb_deferred_wakeup(struct rcu_data *rdp); static void do_nocb_deferred_wakeup(struct rcu_data *rdp);
static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp); static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp);
static void rcu_spawn_cpu_nocb_kthread(int cpu); static void rcu_spawn_cpu_nocb_kthread(int cpu);
static void __init rcu_spawn_nocb_kthreads(void); static void __init rcu_spawn_nocb_kthreads(void);
static void rcu_nocb_lock(struct rcu_data *rdp);
static void rcu_nocb_unlock(struct rcu_data *rdp);
static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
unsigned long flags);
#ifdef CONFIG_RCU_NOCB_CPU #ifdef CONFIG_RCU_NOCB_CPU
static void __init rcu_organize_nocb_kthreads(void); static void __init rcu_organize_nocb_kthreads(void);
#endif /* #ifdef CONFIG_RCU_NOCB_CPU */ #endif /* #ifdef CONFIG_RCU_NOCB_CPU */
......
...@@ -1494,6 +1494,45 @@ static int __init parse_rcu_nocb_poll(char *arg) ...@@ -1494,6 +1494,45 @@ static int __init parse_rcu_nocb_poll(char *arg)
} }
early_param("rcu_nocb_poll", parse_rcu_nocb_poll); early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
/*
* Acquire the specified rcu_data structure's ->nocb_lock, but only
* if it corresponds to a no-CBs CPU.
*/
static void rcu_nocb_lock(struct rcu_data *rdp)
{
if (rcu_segcblist_is_offloaded(&rdp->cblist)) {
lockdep_assert_irqs_disabled();
raw_spin_lock(&rdp->nocb_lock);
}
}
/*
* Release the specified rcu_data structure's ->nocb_lock, but only
* if it corresponds to a no-CBs CPU.
*/
static void rcu_nocb_unlock(struct rcu_data *rdp)
{
if (rcu_segcblist_is_offloaded(&rdp->cblist)) {
lockdep_assert_irqs_disabled();
raw_spin_unlock(&rdp->nocb_lock);
}
}
/*
* Release the specified rcu_data structure's ->nocb_lock and restore
* interrupts, but only if it corresponds to a no-CBs CPU.
*/
static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
unsigned long flags)
{
if (rcu_segcblist_is_offloaded(&rdp->cblist)) {
lockdep_assert_irqs_disabled();
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
} else {
local_irq_restore(flags);
}
}
/* /*
* Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
* grace period. * grace period.
...@@ -1526,7 +1565,7 @@ bool rcu_is_nocb_cpu(int cpu) ...@@ -1526,7 +1565,7 @@ bool rcu_is_nocb_cpu(int cpu)
* Kick the GP kthread for this NOCB group. Caller holds ->nocb_lock * Kick the GP kthread for this NOCB group. Caller holds ->nocb_lock
* and this function releases it. * and this function releases it.
*/ */
static void __wake_nocb_gp(struct rcu_data *rdp, bool force, static void wake_nocb_gp(struct rcu_data *rdp, bool force,
unsigned long flags) unsigned long flags)
__releases(rdp->nocb_lock) __releases(rdp->nocb_lock)
{ {
...@@ -1537,30 +1576,19 @@ static void __wake_nocb_gp(struct rcu_data *rdp, bool force, ...@@ -1537,30 +1576,19 @@ static void __wake_nocb_gp(struct rcu_data *rdp, bool force,
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
return; return;
} }
if (rdp_gp->nocb_gp_sleep || force) { if (READ_ONCE(rdp_gp->nocb_gp_sleep) || force) {
/* Prior smp_mb__after_atomic() orders against prior enqueue. */
WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
del_timer(&rdp->nocb_timer); del_timer(&rdp->nocb_timer);
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
smp_mb(); /* ->nocb_gp_sleep before swake_up_one(). */ smp_mb(); /* enqueue before ->nocb_gp_sleep. */
swake_up_one(&rdp_gp->nocb_gp_wq); raw_spin_lock_irqsave(&rdp_gp->nocb_lock, flags);
WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
raw_spin_unlock_irqrestore(&rdp_gp->nocb_lock, flags);
wake_up_process(rdp_gp->nocb_gp_kthread);
} else { } else {
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
} }
} }
/*
* Kick the GP kthread for this NOCB group, but caller has not
* acquired locks.
*/
static void wake_nocb_gp(struct rcu_data *rdp, bool force)
{
unsigned long flags;
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
__wake_nocb_gp(rdp, force, flags);
}
/* /*
* Arrange to wake the GP kthread for this NOCB group at some future * Arrange to wake the GP kthread for this NOCB group at some future
* time when it is safe to do so. * time when it is safe to do so.
...@@ -1568,295 +1596,148 @@ static void wake_nocb_gp(struct rcu_data *rdp, bool force) ...@@ -1568,295 +1596,148 @@ static void wake_nocb_gp(struct rcu_data *rdp, bool force)
static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
const char *reason) const char *reason)
{ {
unsigned long flags;
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
mod_timer(&rdp->nocb_timer, jiffies + 1); mod_timer(&rdp->nocb_timer, jiffies + 1);
WRITE_ONCE(rdp->nocb_defer_wakeup, waketype); WRITE_ONCE(rdp->nocb_defer_wakeup, waketype);
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
}
/* Does rcu_barrier need to queue an RCU callback on the specified CPU? */
static bool rcu_nocb_cpu_needs_barrier(int cpu)
{
struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
unsigned long ret;
#ifdef CONFIG_PROVE_RCU
struct rcu_head *rhp;
#endif /* #ifdef CONFIG_PROVE_RCU */
/*
* Check count of all no-CBs callbacks awaiting invocation.
* There needs to be a barrier before this function is called,
* but associated with a prior determination that no more
* callbacks would be posted. In the worst case, the first
* barrier in rcu_barrier() suffices (but the caller cannot
* necessarily rely on this, not a substitute for the caller
* getting the concurrency design right!). There must also be a
* barrier between the following load and posting of a callback
* (if a callback is in fact needed). This is associated with an
* atomic_inc() in the caller.
*/
ret = rcu_get_n_cbs_nocb_cpu(rdp);
#ifdef CONFIG_PROVE_RCU
rhp = READ_ONCE(rdp->nocb_head);
if (!rhp)
rhp = READ_ONCE(rdp->nocb_gp_head);
if (!rhp)
rhp = READ_ONCE(rdp->nocb_cb_head);
/* Having no rcuo kthread but CBs after scheduler starts is bad! */
if (!READ_ONCE(rdp->nocb_cb_kthread) && rhp &&
rcu_scheduler_fully_active) {
/* RCU callback enqueued before CPU first came online??? */
pr_err("RCU: Never-onlined no-CBs CPU %d has CB %p\n",
cpu, rhp->func);
WARN_ON_ONCE(1);
}
#endif /* #ifdef CONFIG_PROVE_RCU */
return !!ret;
} }
/* /*
* Enqueue the specified string of rcu_head structures onto the specified * Awaken the no-CBs grace-period kthead if needed, either due to it
* CPU's no-CBs lists. The CPU is specified by rdp, the head of the * legitimately being asleep or due to overload conditions.
* string by rhp, and the tail of the string by rhtp. The non-lazy/lazy
* counts are supplied by rhcount and rhcount_lazy.
* *
* If warranted, also wake up the kthread servicing this CPUs queues. * If warranted, also wake up the kthread servicing this CPUs queues.
*/ */
static void __call_rcu_nocb_enqueue(struct rcu_data *rdp, static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
struct rcu_head *rhp, unsigned long flags)
struct rcu_head **rhtp, __releases(rdp->nocb_lock)
int rhcount, int rhcount_lazy,
unsigned long flags)
{ {
int len; int len;
struct rcu_head **old_rhpp;
struct task_struct *t; struct task_struct *t;
/* Enqueue the callback on the nocb list and update counts. */ // If we are being polled or there is no kthread, just leave.
atomic_long_add(rhcount, &rdp->nocb_q_count);
/* rcu_barrier() relies on ->nocb_q_count add before xchg. */
old_rhpp = xchg(&rdp->nocb_tail, rhtp);
WRITE_ONCE(*old_rhpp, rhp);
atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
smp_mb__after_atomic(); /* Store *old_rhpp before _wake test. */
/* If we are not being polled and there is a kthread, awaken it ... */
t = READ_ONCE(rdp->nocb_gp_kthread); t = READ_ONCE(rdp->nocb_gp_kthread);
if (rcu_nocb_poll || !t) { if (rcu_nocb_poll || !t) {
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("WakeNotPoll")); TPS("WakeNotPoll"));
rcu_nocb_unlock_irqrestore(rdp, flags);
return; return;
} }
len = rcu_get_n_cbs_nocb_cpu(rdp); // Need to actually to a wakeup.
if (old_rhpp == &rdp->nocb_head) { len = rcu_segcblist_n_cbs(&rdp->cblist);
if (was_alldone) {
if (!irqs_disabled_flags(flags)) { if (!irqs_disabled_flags(flags)) {
/* ... if queue was empty ... */ /* ... if queue was empty ... */
wake_nocb_gp(rdp, false); wake_nocb_gp(rdp, false, flags);
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("WakeEmpty")); TPS("WakeEmpty"));
} else { } else {
wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE, wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
TPS("WakeEmptyIsDeferred")); TPS("WakeEmptyIsDeferred"));
rcu_nocb_unlock_irqrestore(rdp, flags);
} }
rdp->qlen_last_fqs_check = 0; rdp->qlen_last_fqs_check = 0;
} else if (len > rdp->qlen_last_fqs_check + qhimark) { } else if (len > rdp->qlen_last_fqs_check + qhimark) {
/* ... or if many callbacks queued. */ /* ... or if many callbacks queued. */
if (!irqs_disabled_flags(flags)) { if (!irqs_disabled_flags(flags)) {
wake_nocb_gp(rdp, true); wake_nocb_gp(rdp, true, flags);
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("WakeOvf")); TPS("WakeOvf"));
} else { } else {
wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE, wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
TPS("WakeOvfIsDeferred")); TPS("WakeOvfIsDeferred"));
rcu_nocb_unlock_irqrestore(rdp, flags);
} }
rdp->qlen_last_fqs_check = LONG_MAX / 2; rdp->qlen_last_fqs_check = LONG_MAX / 2;
} else { } else {
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot")); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
rcu_nocb_unlock_irqrestore(rdp, flags);
} }
if (!irqs_disabled_flags(flags))
lockdep_assert_irqs_enabled();
return; return;
} }
/* /*
* This is a helper for __call_rcu(), which invokes this when the normal * No-CBs GP kthreads come here to wait for additional callbacks to show up
* callback queue is inoperable. If this is not a no-CBs CPU, this * or for grace periods to end.
* function returns failure back to __call_rcu(), which can complain
* appropriately.
*
* Otherwise, this function queues the callback where the corresponding
* "rcuo" kthread can find it.
*/
static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
bool lazy, unsigned long flags)
{
if (!rcu_segcblist_is_offloaded(&rdp->cblist))
return false;
__call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags);
if (__is_kfree_rcu_offset((unsigned long)rhp->func))
trace_rcu_kfree_callback(rcu_state.name, rhp,
(unsigned long)rhp->func,
-atomic_long_read(&rdp->nocb_q_count_lazy),
-rcu_get_n_cbs_nocb_cpu(rdp));
else
trace_rcu_callback(rcu_state.name, rhp,
-atomic_long_read(&rdp->nocb_q_count_lazy),
-rcu_get_n_cbs_nocb_cpu(rdp));
return true;
}
/*
* Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is
* not a no-CBs CPU.
*/
static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp,
struct rcu_data *rdp,
unsigned long flags)
{
lockdep_assert_irqs_disabled();
if (!rcu_segcblist_is_offloaded(&my_rdp->cblist))
return false; /* Not NOCBs CPU, caller must migrate CBs. */
__call_rcu_nocb_enqueue(my_rdp, rcu_segcblist_head(&rdp->cblist),
rcu_segcblist_tail(&rdp->cblist),
rcu_segcblist_n_cbs(&rdp->cblist),
rcu_segcblist_n_lazy_cbs(&rdp->cblist), flags);
rcu_segcblist_init(&rdp->cblist);
rcu_segcblist_disable(&rdp->cblist);
return true;
}
/*
* If necessary, kick off a new grace period, and either way wait
* for a subsequent grace period to complete.
*/
static void rcu_nocb_wait_gp(struct rcu_data *rdp)
{
unsigned long c;
bool d;
unsigned long flags;
bool needwake;
struct rcu_node *rnp = rdp->mynode;
local_irq_save(flags);
c = rcu_seq_snap(&rcu_state.gp_seq);
if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) {
local_irq_restore(flags);
} else {
raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
needwake = rcu_start_this_gp(rnp, rdp, c);
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
if (needwake)
rcu_gp_kthread_wake();
}
/*
* Wait for the grace period. Do so interruptibly to avoid messing
* up the load average.
*/
trace_rcu_this_gp(rnp, rdp, c, TPS("StartWait"));
for (;;) {
swait_event_interruptible_exclusive(
rnp->nocb_gp_wq[rcu_seq_ctr(c) & 0x1],
(d = rcu_seq_done(&rnp->gp_seq, c)));
if (likely(d))
break;
WARN_ON(signal_pending(current));
trace_rcu_this_gp(rnp, rdp, c, TPS("ResumeWait"));
}
trace_rcu_this_gp(rnp, rdp, c, TPS("EndWait"));
smp_mb(); /* Ensure that CB invocation happens after GP end. */
}
/*
* No-CBs GP kthreads come here to wait for additional callbacks to show up.
* This function does not return until callbacks appear.
*/ */
static void nocb_gp_wait(struct rcu_data *my_rdp) static void nocb_gp_wait(struct rcu_data *my_rdp)
{ {
bool firsttime = true; int __maybe_unused cpu = my_rdp->cpu;
unsigned long cur_gp_seq;
unsigned long flags; unsigned long flags;
bool gotcbs; bool gotcbs;
bool needwait_gp = false;
bool needwake;
bool needwake_gp;
struct rcu_data *rdp; struct rcu_data *rdp;
struct rcu_head **tail; struct rcu_node *rnp;
unsigned long wait_gp_seq;
/* Wait for callbacks to appear. */
if (!rcu_nocb_poll) {
trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Sleep"));
swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
!READ_ONCE(my_rdp->nocb_gp_sleep));
raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
my_rdp->nocb_gp_sleep = true;
WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
del_timer(&my_rdp->nocb_timer);
raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
} else if (firsttime) {
firsttime = false; /* Don't drown trace log with "Poll"! */
trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Poll"));
}
/* /*
* Each pass through the following loop checks for CBs. * Each pass through the following loop checks for CBs and for the
* We are our own first CB kthread. Any CBs found are moved to * nearest grace period (if any) to wait for next. The CB kthreads
* nocb_gp_head, where they await a grace period. * and the global grace-period kthread are awakened if needed.
*/ */
gotcbs = false;
smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) { for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head); if (rcu_segcblist_empty(&rdp->cblist))
if (!rdp->nocb_gp_head) continue; /* No callbacks here, try next. */
continue; /* No CBs here, try next. */ rnp = rdp->mynode;
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
/* Move callbacks to wait-for-GP list, which is empty. */ WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
WRITE_ONCE(rdp->nocb_head, NULL); del_timer(&my_rdp->nocb_timer);
rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head); raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
gotcbs = true; needwake_gp = rcu_advance_cbs(rnp, rdp);
} raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
// Need to wait on some grace period?
/* No callbacks? Sleep a bit if polling, and go retry. */ if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
if (unlikely(!gotcbs)) { if (!needwait_gp ||
WARN_ON(signal_pending(current)); ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
if (rcu_nocb_poll) { wait_gp_seq = cur_gp_seq;
schedule_timeout_interruptible(1); needwait_gp = true;
} else {
trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu,
TPS("WokeEmpty"));
} }
return; if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
} needwake = rdp->nocb_cb_sleep;
WRITE_ONCE(rdp->nocb_cb_sleep, false);
/* Wait for one grace period. */ smp_mb(); /* CB invocation -after- GP end. */
rcu_nocb_wait_gp(my_rdp); } else {
needwake = false;
/* Each pass through this loop wakes a CB kthread, if needed. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
if (!rcu_nocb_poll &&
READ_ONCE(rdp->nocb_head) &&
READ_ONCE(my_rdp->nocb_gp_sleep)) {
raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
my_rdp->nocb_gp_sleep = false;/* No need to sleep.*/
raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
} }
if (!rdp->nocb_gp_head)
continue; /* No CBs, so no need to wake kthread. */
/* Append callbacks to CB kthread's "done" list. */
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
tail = rdp->nocb_cb_tail;
rdp->nocb_cb_tail = rdp->nocb_gp_tail;
*tail = rdp->nocb_gp_head;
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
if (tail == &rdp->nocb_cb_head) { if (needwake) {
/* List was empty, so wake up the kthread. */
swake_up_one(&rdp->nocb_cb_wq); swake_up_one(&rdp->nocb_cb_wq);
gotcbs = true;
} }
if (needwake_gp)
rcu_gp_kthread_wake();
}
if (rcu_nocb_poll) {
/* Polling, so trace if first poll in the series. */
if (gotcbs)
trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
schedule_timeout_interruptible(1);
} else if (!needwait_gp) {
/* Wait for callbacks to appear. */
trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
!READ_ONCE(my_rdp->nocb_gp_sleep));
} else {
rnp = my_rdp->mynode;
trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
swait_event_interruptible_exclusive(
rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
!READ_ONCE(my_rdp->nocb_gp_sleep));
trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
}
if (!rcu_nocb_poll) {
raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
} }
WARN_ON(signal_pending(current));
} }
/* /*
...@@ -1871,92 +1752,69 @@ static int rcu_nocb_gp_kthread(void *arg) ...@@ -1871,92 +1752,69 @@ static int rcu_nocb_gp_kthread(void *arg)
{ {
struct rcu_data *rdp = arg; struct rcu_data *rdp = arg;
for (;;) for (;;) {
nocb_gp_wait(rdp); nocb_gp_wait(rdp);
cond_resched_tasks_rcu_qs();
}
return 0; return 0;
} }
/* /*
* No-CBs CB kthreads come here to wait for additional callbacks to show up. * Invoke any ready callbacks from the corresponding no-CBs CPU,
* This function returns true ("keep waiting") until callbacks appear and * then, if there are no more, wait for more to appear.
* then false ("stop waiting") when callbacks finally do appear.
*/ */
static bool nocb_cb_wait(struct rcu_data *rdp) static void nocb_cb_wait(struct rcu_data *rdp)
{ {
unsigned long flags;
bool needwake_gp = false;
struct rcu_node *rnp = rdp->mynode;
local_irq_save(flags);
rcu_momentary_dyntick_idle();
local_irq_restore(flags);
local_bh_disable();
rcu_do_batch(rdp);
local_bh_enable();
lockdep_assert_irqs_enabled();
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
if (needwake_gp)
rcu_gp_kthread_wake();
return;
}
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep")); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
WRITE_ONCE(rdp->nocb_cb_sleep, true);
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
if (needwake_gp)
rcu_gp_kthread_wake();
swait_event_interruptible_exclusive(rdp->nocb_cb_wq, swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
READ_ONCE(rdp->nocb_cb_head)); !READ_ONCE(rdp->nocb_cb_sleep));
if (smp_load_acquire(&rdp->nocb_cb_head)) { /* VVV */ if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */
/* ^^^ Ensure CB invocation follows _head test. */ /* ^^^ Ensure CB invocation follows _sleep test. */
return false; return;
} }
WARN_ON(signal_pending(current)); WARN_ON(signal_pending(current));
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty")); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
return true;
} }
/* /*
* Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes * Per-rcu_data kthread, but only for no-CBs CPUs. Repeatedly invoke
* callbacks queued by the corresponding no-CBs CPU, however, there is an * nocb_cb_wait() to do the dirty work.
* optional GP-CB relationship so that the grace-period kthreads don't
* have to do quite so many wakeups (as in they only need to wake the
* no-CBs GP kthreads, not the CB kthreads).
*/ */
static int rcu_nocb_cb_kthread(void *arg) static int rcu_nocb_cb_kthread(void *arg)
{ {
int c, cl;
unsigned long flags;
struct rcu_head *list;
struct rcu_head *next;
struct rcu_head **tail;
struct rcu_data *rdp = arg; struct rcu_data *rdp = arg;
/* Each pass through this loop invokes one batch of callbacks */ // Each pass through this loop does one callback batch, and,
// if there are no more ready callbacks, waits for them.
for (;;) { for (;;) {
/* Wait for callbacks. */ nocb_cb_wait(rdp);
while (nocb_cb_wait(rdp)) cond_resched_tasks_rcu_qs();
continue;
/* Pull the ready-to-invoke callbacks onto local list. */
raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
list = rdp->nocb_cb_head;
rdp->nocb_cb_head = NULL;
tail = rdp->nocb_cb_tail;
rdp->nocb_cb_tail = &rdp->nocb_cb_head;
raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
if (WARN_ON_ONCE(!list))
continue;
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeNonEmpty"));
/* Each pass through the following loop invokes a callback. */
trace_rcu_batch_start(rcu_state.name,
atomic_long_read(&rdp->nocb_q_count_lazy),
rcu_get_n_cbs_nocb_cpu(rdp), -1);
c = cl = 0;
while (list) {
next = list->next;
/* Wait for enqueuing to complete, if needed. */
while (next == NULL && &list->next != tail) {
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("WaitQueue"));
schedule_timeout_interruptible(1);
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("WokeQueue"));
next = list->next;
}
debug_rcu_head_unqueue(list);
local_bh_disable();
if (__rcu_reclaim(rcu_state.name, list))
cl++;
c++;
local_bh_enable();
cond_resched_tasks_rcu_qs();
list = next;
}
trace_rcu_batch_end(rcu_state.name, c, !!list, 0, 0, 1);
smp_mb__before_atomic(); /* _add after CB invocation. */
atomic_long_add(-c, &rdp->nocb_q_count);
atomic_long_add(-cl, &rdp->nocb_q_count_lazy);
} }
return 0; return 0;
} }
...@@ -1980,7 +1838,7 @@ static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp) ...@@ -1980,7 +1838,7 @@ static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp)
} }
ndw = READ_ONCE(rdp->nocb_defer_wakeup); ndw = READ_ONCE(rdp->nocb_defer_wakeup);
WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
__wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags); wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake")); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
} }
...@@ -2194,10 +2052,21 @@ static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp) ...@@ -2194,10 +2052,21 @@ static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp)
#else /* #ifdef CONFIG_RCU_NOCB_CPU */ #else /* #ifdef CONFIG_RCU_NOCB_CPU */
static bool rcu_nocb_cpu_needs_barrier(int cpu) /* No ->nocb_lock to acquire. */
static void rcu_nocb_lock(struct rcu_data *rdp)
{ {
WARN_ON_ONCE(1); /* Should be dead code. */ }
return false;
/* No ->nocb_lock to release. */
static void rcu_nocb_unlock(struct rcu_data *rdp)
{
}
/* No ->nocb_lock to release. */
static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
unsigned long flags)
{
local_irq_restore(flags);
} }
static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq) static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
...@@ -2213,17 +2082,10 @@ static void rcu_init_one_nocb(struct rcu_node *rnp) ...@@ -2213,17 +2082,10 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
{ {
} }
static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
bool lazy, unsigned long flags) unsigned long flags)
{ {
return false; WARN_ON_ONCE(1); /* Should be dead code! */
}
static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp,
struct rcu_data *rdp,
unsigned long flags)
{
return false;
} }
static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
......
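
The diff above also threads a conditional locking pattern through the callback paths: rcu_nocb_lock(), rcu_nocb_unlock(), and rcu_nocb_unlock_irqrestore() acquire the per-CPU ->nocb_lock only when that CPU's callback list is offloaded, so non-offloaded CPUs continue to rely on interrupt disabling alone. The short user-space sketch below models that pattern with a pthread mutex; it is an illustration under that reading of the patch, not kernel code, and the toy_* names are invented for the example.

/* Sketch of the conditional ->nocb_lock pattern: take the lock only
 * when callbacks are offloaded to a no-CBs kthread; otherwise the
 * caller's interrupt disabling (not modeled here) already suffices. */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct toy_rcu_data {
	bool offloaded;             /* rcu_segcblist_is_offloaded() analog */
	pthread_mutex_t nocb_lock;  /* ->nocb_lock analog */
	unsigned long n_cbs;
};

static void toy_nocb_lock(struct toy_rcu_data *rdp)
{
	if (rdp->offloaded)         /* lock only for no-CBs CPUs */
		pthread_mutex_lock(&rdp->nocb_lock);
}

static void toy_nocb_unlock(struct toy_rcu_data *rdp)
{
	if (rdp->offloaded)
		pthread_mutex_unlock(&rdp->nocb_lock);
}

int main(void)
{
	struct toy_rcu_data rdp = { .offloaded = true, .n_cbs = 0 };

	pthread_mutex_init(&rdp.nocb_lock, NULL);

	/* The enqueue path in __call_rcu() now looks roughly like this:
	 * lock (if offloaded), enqueue, then either wake the nocb GP
	 * kthread or fall back to the usual RCU-core handling. */
	toy_nocb_lock(&rdp);
	rdp.n_cbs++;                /* rcu_segcblist_enqueue() stand-in */
	toy_nocb_unlock(&rdp);

	printf("queued %lu callback(s), offloaded=%d\n", rdp.n_cbs,
	       rdp.offloaded);
	pthread_mutex_destroy(&rdp.nocb_lock);
	return 0;
}
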