Commit 1331e7a1 authored by Paul E. McKenney, committed by Paul E. McKenney

rcu: Remove _rcu_barrier() dependency on __stop_machine()

Currently, _rcu_barrier() relies on preempt_disable() to prevent
any CPU from going offline, which in turn depends on CPU hotplug's
use of __stop_machine().

This patch therefore makes _rcu_barrier() use get_online_cpus() to
block CPU-hotplug operations.  This has the added benefit of removing
the need for _rcu_barrier() to adopt callbacks:  Because CPU-hotplug
operations are excluded, there can be no callbacks to adopt.  This
commit simplifies the code accordingly.
Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Reviewed-by: Josh Triplett <josh@joshtriplett.org>
parent a10d206e
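
The exclusion pattern the patch moves to is easy to misread in the diff below, so here is a minimal, hedged sketch of it in isolation: hold the hotplug read-side lock around a walk of the online CPUs so that no CPU can go offline (and orphan its callbacks) in the middle of the walk. This is not the patch's code; the names my_count, my_ipi_func, and my_walk_online_cpus are made up for illustration.

/*
 * Sketch only: block CPU hotplug with get_online_cpus()/put_online_cpus()
 * while sending an IPI to every currently online CPU, so the set of
 * online CPUs cannot change under us.
 */
#include <linux/atomic.h>
#include <linux/cpu.h>
#include <linux/cpumask.h>
#include <linux/smp.h>

static atomic_t my_count;

/* Runs on each target CPU; stands in for posting a barrier callback. */
static void my_ipi_func(void *unused)
{
        atomic_inc(&my_count);
}

static void my_walk_online_cpus(void)
{
        int cpu;

        get_online_cpus();              /* Block CPU-hotplug operations. */
        for_each_online_cpu(cpu)
                smp_call_function_single(cpu, my_ipi_func, NULL, 1);
        put_online_cpus();              /* Allow CPU hotplug again. */
}

Because the walk holds off hotplug, there is no window in which a CPU can go offline between being observed online and receiving the IPI, which is exactly the property _rcu_barrier() needs and why the offline-CPU and orphaned-callback handling below can be deleted.
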
@@ -1392,17 +1392,6 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
         int i;
         struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
 
-        /*
-         * If there is an rcu_barrier() operation in progress, then
-         * only the task doing that operation is permitted to adopt
-         * callbacks.  To do otherwise breaks rcu_barrier() and friends
-         * by causing them to fail to wait for the callbacks in the
-         * orphanage.
-         */
-        if (rsp->rcu_barrier_in_progress &&
-            rsp->rcu_barrier_in_progress != current)
-                return;
-
         /* Do the accounting first. */
         rdp->qlen_lazy += rsp->qlen_lazy;
         rdp->qlen += rsp->qlen;
@@ -1457,9 +1446,8 @@ static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
  * The CPU has been completely removed, and some other CPU is reporting
  * this fact from process context.  Do the remainder of the cleanup,
  * including orphaning the outgoing CPU's RCU callbacks, and also
- * adopting them, if there is no _rcu_barrier() instance running.
- * There can only be one CPU hotplug operation at a time, so no other
- * CPU can be attempting to update rcu_cpu_kthread_task.
+ * adopting them.  There can only be one CPU hotplug operation at a time,
+ * so no other CPU can be attempting to update rcu_cpu_kthread_task.
  */
 static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 {
@@ -1521,10 +1509,6 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 #else /* #ifdef CONFIG_HOTPLUG_CPU */
 
-static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
-{
-}
-
 static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
 {
 }
@@ -2328,13 +2312,10 @@ static void rcu_barrier_func(void *type)
 static void _rcu_barrier(struct rcu_state *rsp)
 {
         int cpu;
-        unsigned long flags;
         struct rcu_data *rdp;
-        struct rcu_data rd;
         unsigned long snap = ACCESS_ONCE(rsp->n_barrier_done);
         unsigned long snap_done;
 
-        init_rcu_head_on_stack(&rd.barrier_head);
         _rcu_barrier_trace(rsp, "Begin", -1, snap);
 
         /* Take mutex to serialize concurrent rcu_barrier() requests. */
@@ -2374,70 +2355,30 @@ static void _rcu_barrier(struct rcu_state *rsp)
         /*
          * Initialize the count to one rather than to zero in order to
          * avoid a too-soon return to zero in case of a short grace period
-         * (or preemption of this task).  Also flag this task as doing
-         * an rcu_barrier().  This will prevent anyone else from adopting
-         * orphaned callbacks, which could cause otherwise failure if a
-         * CPU went offline and quickly came back online.  To see this,
-         * consider the following sequence of events:
-         *
-         * 1.   We cause CPU 0 to post an rcu_barrier_callback() callback.
-         * 2.   CPU 1 goes offline, orphaning its callbacks.
-         * 3.   CPU 0 adopts CPU 1's orphaned callbacks.
-         * 4.   CPU 1 comes back online.
-         * 5.   We cause CPU 1 to post an rcu_barrier_callback() callback.
-         * 6.   Both rcu_barrier_callback() callbacks are invoked, awakening
-         *      us -- but before CPU 1's orphaned callbacks are invoked!!!
+         * (or preemption of this task).  Exclude CPU-hotplug operations
+         * to ensure that no offline CPU has callbacks queued.
          */
         init_completion(&rsp->barrier_completion);
         atomic_set(&rsp->barrier_cpu_count, 1);
-        raw_spin_lock_irqsave(&rsp->onofflock, flags);
-        rsp->rcu_barrier_in_progress = current;
-        raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+        get_online_cpus();
 
         /*
-         * Force every CPU with callbacks to register a new callback
-         * that will tell us when all the preceding callbacks have
-         * been invoked.  If an offline CPU has callbacks, wait for
-         * it to either come back online or to finish orphaning those
-         * callbacks.
+         * Force each CPU with callbacks to register a new callback.
+         * When that callback is invoked, we will know that all of the
+         * corresponding CPU's preceding callbacks have been invoked.
          */
-        for_each_possible_cpu(cpu) {
-                preempt_disable();
+        for_each_online_cpu(cpu) {
                 rdp = per_cpu_ptr(rsp->rda, cpu);
-                if (cpu_is_offline(cpu)) {
-                        _rcu_barrier_trace(rsp, "Offline", cpu,
-                                           rsp->n_barrier_done);
-                        preempt_enable();
-                        while (cpu_is_offline(cpu) && ACCESS_ONCE(rdp->qlen))
-                                schedule_timeout_interruptible(1);
-                } else if (ACCESS_ONCE(rdp->qlen)) {
+                if (ACCESS_ONCE(rdp->qlen)) {
                         _rcu_barrier_trace(rsp, "OnlineQ", cpu,
                                            rsp->n_barrier_done);
                         smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
-                        preempt_enable();
                 } else {
                         _rcu_barrier_trace(rsp, "OnlineNQ", cpu,
                                            rsp->n_barrier_done);
-                        preempt_enable();
                 }
         }
+        put_online_cpus();
 
-        /*
-         * Now that all online CPUs have rcu_barrier_callback() callbacks
-         * posted, we can adopt all of the orphaned callbacks and place
-         * an rcu_barrier_callback() callback after them.  When that is done,
-         * we are guaranteed to have an rcu_barrier_callback() callback
-         * following every callback that could possibly have been
-         * registered before _rcu_barrier() was called.
-         */
-        raw_spin_lock_irqsave(&rsp->onofflock, flags);
-        rcu_adopt_orphan_cbs(rsp);
-        rsp->rcu_barrier_in_progress = NULL;
-        raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
-        atomic_inc(&rsp->barrier_cpu_count);
-        smp_mb__after_atomic_inc(); /* Ensure atomic_inc() before callback. */
-        rd.rsp = rsp;
-        rsp->call(&rd.barrier_head, rcu_barrier_callback);
-
         /*
          * Now that we have an rcu_barrier_callback() callback on each
@@ -2458,8 +2399,6 @@ static void _rcu_barrier(struct rcu_state *rsp)
 
         /* Other rcu_barrier() invocations can now safely proceed. */
         mutex_unlock(&rsp->barrier_mutex);
-
-        destroy_rcu_head_on_stack(&rd.barrier_head);
 }
 
 /**
...
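
One subtlety the hunks above retain is initializing barrier_cpu_count to one rather than zero. The hedged sketch below shows that idiom in isolation; it is not the patch's code, and the names my_count, my_done, my_cb, and my_wait_for_all are illustrative. The initiator holds a reference of its own so the count cannot reach zero while callbacks are still being posted, and drops that reference only after everything has been queued.

/*
 * Sketch only: the "count starts at one" idiom.  Each posted callback
 * eventually calls my_cb(); the initiator's own reference prevents the
 * count from hitting zero prematurely.
 */
#include <linux/atomic.h>
#include <linux/completion.h>

static atomic_t my_count;
static struct completion my_done;

static void my_cb(void)
{
        if (atomic_dec_and_test(&my_count))
                complete(&my_done);
}

static void my_wait_for_all(int ncallbacks)
{
        int i;

        init_completion(&my_done);
        atomic_set(&my_count, 1);               /* Initiator's reference. */
        for (i = 0; i < ncallbacks; i++) {
                atomic_inc(&my_count);
                /* ...post a callback that will later invoke my_cb()... */
        }
        if (atomic_dec_and_test(&my_count))     /* Drop initiator's reference. */
                complete(&my_done);
        wait_for_completion(&my_done);          /* All callbacks have run. */
}
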
@@ -398,9 +398,6 @@ struct rcu_state {
         struct rcu_head **orphan_donetail;      /* Tail of above. */
         long qlen_lazy;                         /* Number of lazy callbacks. */
         long qlen;                              /* Total number of callbacks. */
-        struct task_struct *rcu_barrier_in_progress;
-                                                /* Task doing rcu_barrier(), */
-                                                /*  or NULL if no barrier. */
         struct mutex barrier_mutex;             /* Guards barrier fields. */
         atomic_t barrier_cpu_count;             /* # CPUs waiting on. */
         struct completion barrier_completion;   /* Wake at barrier end. */
...
@@ -51,8 +51,8 @@ static int show_rcubarrier(struct seq_file *m, void *unused)
         struct rcu_state *rsp;
 
         for_each_rcu_flavor(rsp)
-                seq_printf(m, "%s: %c bcc: %d nbd: %lu\n",
-                           rsp->name, rsp->rcu_barrier_in_progress ? 'B' : '.',
+                seq_printf(m, "%s: bcc: %d nbd: %lu\n",
+                           rsp->name,
                            atomic_read(&rsp->barrier_cpu_count),
                            rsp->n_barrier_done);
         return 0;
...