Commit e4bf7919 authored by Kent Overstreet, committed by Jens Axboe

bcache: Fix, improve efficiency of closure_sync()

Eliminates cases where closure_sync() could race and fail to complete
or get stuck. Removes the CLOSURE_SLEEPING and CLOSURE_STACK status
flags and simplifies how threads enter and exit closure sleeps.

[mlyle: fixed conflicts due to changed return behavior in mainline,
extended the commit comment, and squashed down two commits that were
mostly contradictory to reach this state. Changed __set_current_state
to set_current_state per Jens' review comment]
Signed-off-by: Kent Overstreet <kent.overstreet@gmail.com>
Signed-off-by: Michael Lyle <mlyle@lyle.org>
Reviewed-by: Michael Lyle <mlyle@lyle.org>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent b1092c9a
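For orientation before the diff: the pattern closure_sync() serves looks roughly like the sketch below. This is not part of the patch; struct example_dev and example_submit_io() are hypothetical stand-ins for a driver's asynchronous submission path, while closure_init_stack(), closure_get(), closure_put() and closure_sync() are the real closure API:

	/*
	 * Illustrative sketch: track async work with an on-stack closure,
	 * then block until every outstanding ref has been dropped.
	 */
	static void example_wait_for_io(struct example_dev *d)
	{
		struct closure cl;

		closure_init_stack(&cl);

		closure_get(&cl);		/* ref held by the async work */
		example_submit_io(d, &cl);	/* completion does closure_put(&cl) */

		closure_sync(&cl);		/* sleep until only our ref remains */
	}

Under the old scheme the sleeping thread advertised itself with a CLOSURE_SLEEPING flag and closure_put() had to deliver exactly one wakeup; the patch replaces that handshake with an explicit closure_syncer, as the closure.c hunks below show.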
drivers/md/bcache/closure.c
@@ -18,10 +18,6 @@ static inline void closure_put_after_sub(struct closure *cl, int flags)
 	BUG_ON(flags & CLOSURE_GUARD_MASK);
 	BUG_ON(!r && (flags & ~CLOSURE_DESTRUCTOR));
 
-	/* Must deliver precisely one wakeup */
-	if (r == 1 && (flags & CLOSURE_SLEEPING))
-		wake_up_process(cl->task);
-
 	if (!r) {
 		if (cl->fn && !(flags & CLOSURE_DESTRUCTOR)) {
 			atomic_set(&cl->remaining,
@@ -100,28 +96,34 @@ bool closure_wait(struct closure_waitlist *waitlist, struct closure *cl)
 }
 EXPORT_SYMBOL(closure_wait);
 
-/**
- * closure_sync - sleep until a closure has nothing left to wait on
- *
- * Sleeps until the refcount hits 1 - the thread that's running the closure owns
- * the last refcount.
- */
-void closure_sync(struct closure *cl)
+struct closure_syncer {
+	struct task_struct	*task;
+	int			done;
+};
+
+static void closure_sync_fn(struct closure *cl)
 {
-	while (1) {
-		__closure_start_sleep(cl);
-		closure_set_ret_ip(cl);
+	cl->s->done = 1;
+	wake_up_process(cl->s->task);
+}
 
-		if ((atomic_read(&cl->remaining) &
-		     CLOSURE_REMAINING_MASK) == 1)
-			break;
+void __closure_sync(struct closure *cl)
+{
+	struct closure_syncer s = { .task = current };
+
+	cl->s = &s;
+	continue_at(cl, closure_sync_fn, NULL);
 
+	while (1) {
+		set_current_state(TASK_UNINTERRUPTIBLE);
+		if (s.done)
+			break;
 		schedule();
 	}
 
-	__closure_end_sleep(cl);
+	__set_current_state(TASK_RUNNING);
 }
-EXPORT_SYMBOL(closure_sync);
+EXPORT_SYMBOL(__closure_sync);
 
 #ifdef CONFIG_BCACHE_CLOSURES_DEBUG
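The rewritten wait loop follows the standard lost-wakeup-safe kernel idiom: the syncer marks itself TASK_UNINTERRUPTIBLE before testing s.done, so closure_sync_fn() setting done and calling wake_up_process() at any point either finds the task already marked sleeping and makes it runnable again, or is observed by the test before schedule() is reached. Distilled outside the diff (an illustrative restatement of the loop above, not new code):

	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);	/* publish "sleeping" first */
		if (s.done)					/* written by the waker */
			break;
		schedule();					/* actually sleep */
	}
	__set_current_state(TASK_RUNNING);	/* loop may exit without sleeping */

Because closure_sync_fn() only runs once the closure's refcount drops to zero (the syncer handed over its ref via continue_at()), exactly one wakeup is delivered and it can no longer race with the refcount check, which is the class of race the commit message refers to.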
@@ -168,12 +170,10 @@ static int debug_seq_show(struct seq_file *f, void *data)
 			   cl, (void *) cl->ip, cl->fn, cl->parent,
 			   r & CLOSURE_REMAINING_MASK);
 
-		seq_printf(f, "%s%s%s%s\n",
+		seq_printf(f, "%s%s\n",
 			   test_bit(WORK_STRUCT_PENDING_BIT,
 				    work_data_bits(&cl->work)) ? "Q" : "",
-			   r & CLOSURE_RUNNING	? "R" : "",
-			   r & CLOSURE_STACK	? "S" : "",
-			   r & CLOSURE_SLEEPING	? "Sl" : "");
+			   r & CLOSURE_RUNNING	? "R" : "");
 
 		if (r & CLOSURE_WAITING)
 			seq_printf(f, " W %pF\n",
drivers/md/bcache/closure.h
@@ -103,6 +103,7 @@
  */
 
 struct closure;
+struct closure_syncer;
 typedef void (closure_fn) (struct closure *);
 
 struct closure_waitlist {
@@ -115,10 +116,6 @@ enum closure_state {
  * the thread that owns the closure, and cleared by the thread that's
  * waking up the closure.
  *
- * CLOSURE_SLEEPING: Must be set before a thread uses a closure to sleep
- * - indicates that cl->task is valid and closure_put() may wake it up.
- * Only set or cleared by the thread that owns the closure.
- *
  * The rest are for debugging and don't affect behaviour:
  *
  * CLOSURE_RUNNING: Set when a closure is running (i.e. by
@@ -128,22 +125,16 @@ enum closure_state {
  * continue_at() and closure_return() clear it for you, if you're doing
  * something unusual you can use closure_set_dead() which also helps
  * annotate where references are being transferred.
- *
- * CLOSURE_STACK: Sanity check - remaining should never hit 0 on a
- * closure with this flag set
  */
 
-	CLOSURE_BITS_START	= (1 << 23),
-	CLOSURE_DESTRUCTOR	= (1 << 23),
-	CLOSURE_WAITING		= (1 << 25),
-	CLOSURE_SLEEPING	= (1 << 27),
-	CLOSURE_RUNNING		= (1 << 29),
-	CLOSURE_STACK		= (1 << 31),
+	CLOSURE_BITS_START	= (1U << 27),
+	CLOSURE_DESTRUCTOR	= (1U << 27),
+	CLOSURE_WAITING		= (1U << 29),
+	CLOSURE_RUNNING		= (1U << 31),
 };
 
 #define CLOSURE_GUARD_MASK					\
-	((CLOSURE_DESTRUCTOR|CLOSURE_WAITING|CLOSURE_SLEEPING|	\
-	  CLOSURE_RUNNING|CLOSURE_STACK) << 1)
+	((CLOSURE_DESTRUCTOR|CLOSURE_WAITING|CLOSURE_RUNNING) << 1)
 
 #define CLOSURE_REMAINING_MASK		(CLOSURE_BITS_START - 1)
 #define CLOSURE_REMAINING_INITIALIZER	(1|CLOSURE_RUNNING)
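Two details of this layout are worth spelling out. The low 27 bits of cl->remaining hold the refcount (CLOSURE_REMAINING_MASK) and the flags live above them; the free bit directly above CLOSURE_DESTRUCTOR and CLOSURE_WAITING acts as a guard, so adding a flag twice carries into the guard bit and trips the BUG_ON(flags & CLOSURE_GUARD_MASK) in closure_put_after_sub(). A standalone sketch of the decomposition (userspace, illustrative only):

	#include <stdio.h>

	/* Userspace mirror of the new enum values, for illustration only. */
	#define CLOSURE_BITS_START	(1U << 27)
	#define CLOSURE_DESTRUCTOR	(1U << 27)
	#define CLOSURE_WAITING		(1U << 29)
	#define CLOSURE_RUNNING		(1U << 31)
	#define CLOSURE_REMAINING_MASK	(CLOSURE_BITS_START - 1)

	int main(void)
	{
		/* a closure that is running, on a waitlist, with 2 refs held */
		unsigned int v = CLOSURE_RUNNING | CLOSURE_WAITING | 2;

		printf("refcount: %u\n", v & CLOSURE_REMAINING_MASK);	/* 2 */
		printf("running:  %d\n", !!(v & CLOSURE_RUNNING));	/* 1 */
		printf("waiting:  %d\n", !!(v & CLOSURE_WAITING));	/* 1 */
		return 0;
	}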
@@ -152,7 +143,7 @@ struct closure {
 	union {
 		struct {
 			struct workqueue_struct *wq;
-			struct task_struct	*task;
+			struct closure_syncer	*s;
 			struct llist_node	list;
 			closure_fn		*fn;
 		};
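The union member swap pairs with the new syncer: instead of stashing the sleeping task_struct directly, the closure now points at an on-stack closure_syncer, which bundles the task pointer with the done flag that closure_sync_fn() sets.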
@@ -178,7 +169,19 @@ void closure_sub(struct closure *cl, int v);
 void closure_put(struct closure *cl);
 void __closure_wake_up(struct closure_waitlist *list);
 bool closure_wait(struct closure_waitlist *list, struct closure *cl);
-void closure_sync(struct closure *cl);
+void __closure_sync(struct closure *cl);
+
+/**
+ * closure_sync - sleep until a closure has nothing left to wait on
+ *
+ * Sleeps until the refcount hits 1 - the thread that's running the closure owns
+ * the last refcount.
+ */
+static inline void closure_sync(struct closure *cl)
+{
+	if ((atomic_read(&cl->remaining) & CLOSURE_REMAINING_MASK) != 1)
+		__closure_sync(cl);
+}
 
 #ifdef CONFIG_BCACHE_CLOSURES_DEBUG
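A design note on the split: the inline closure_sync() wrapper keeps the common case cheap. When the caller already holds the only remaining ref, the refcount test short-circuits and the continue_at()/schedule() machinery in __closure_sync() is skipped entirely; only contended syncs pay for the closure_syncer handshake.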
@@ -215,24 +218,6 @@ static inline void closure_set_waiting(struct closure *cl, unsigned long f)
 #endif
 }
 
-static inline void __closure_end_sleep(struct closure *cl)
-{
-	__set_current_state(TASK_RUNNING);
-
-	if (atomic_read(&cl->remaining) & CLOSURE_SLEEPING)
-		atomic_sub(CLOSURE_SLEEPING, &cl->remaining);
-}
-
-static inline void __closure_start_sleep(struct closure *cl)
-{
-	closure_set_ip(cl);
-	cl->task = current;
-	set_current_state(TASK_UNINTERRUPTIBLE);
-
-	if (!(atomic_read(&cl->remaining) & CLOSURE_SLEEPING))
-		atomic_add(CLOSURE_SLEEPING, &cl->remaining);
-}
-
 static inline void closure_set_stopped(struct closure *cl)
 {
 	atomic_sub(CLOSURE_RUNNING, &cl->remaining);
@@ -241,7 +226,6 @@ static inline void closure_set_stopped(struct closure *cl)
 static inline void set_closure_fn(struct closure *cl, closure_fn *fn,
 				  struct workqueue_struct *wq)
 {
-	BUG_ON(object_is_on_stack(cl));
 	closure_set_ip(cl);
 	cl->fn = fn;
 	cl->wq = wq;
@@ -300,7 +284,7 @@ static inline void closure_init(struct closure *cl, struct closure *parent)
 static inline void closure_init_stack(struct closure *cl)
 {
 	memset(cl, 0, sizeof(struct closure));
-	atomic_set(&cl->remaining, CLOSURE_REMAINING_INITIALIZER|CLOSURE_STACK);
+	atomic_set(&cl->remaining, CLOSURE_REMAINING_INITIALIZER);
 }
 
 /**
@@ -322,6 +306,8 @@ static inline void closure_wake_up(struct closure_waitlist *list)
  * This is because after calling continue_at() you no longer have a ref on @cl,
  * and whatever @cl owns may be freed out from under you - a running closure fn
  * has a ref on its own closure which continue_at() drops.
+ *
+ * Note you are expected to immediately return after using this macro.
  */
 #define continue_at(_cl, _fn, _wq)					\
 do {									\
...
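Since the new comment warns that callers must return immediately, a sketch of correct usage may help; example_first_stage(), example_next_stage() and example_wq are hypothetical names, not part of this patch:

	/*
	 * Illustrative only: a closure fn hands control to the next stage
	 * and returns at once, because continue_at() drops its ref on @cl
	 * and the closure may already be running (or freed) elsewhere.
	 */
	static void example_next_stage(struct closure *cl);
	extern struct workqueue_struct *example_wq;	/* hypothetical */

	static void example_first_stage(struct closure *cl)
	{
		/* ... stage-one work ... */

		continue_at(cl, example_next_stage, example_wq);
		/* no code after continue_at(): we no longer own @cl */
	}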