Commit 0dc3b84a authored by Josef Bacik

Btrfs: fix num_workers_starting bug and other bugs in async thread

Al pointed out we have some random problems with the way we account for
num_workers_starting in the async thread stuff.  First of all we need to make
sure to decrement num_workers_starting if we fail to start the worker, so make
__btrfs_start_workers do this.  Also fix __btrfs_start_workers so that it
doesn't call btrfs_stop_workers(), there is no point in stopping everybody if we
failed to create a worker.  Also check_pending_worker_creates needs to have its
work function call __btrfs_start_workers directly, since it has already
incremented num_workers_starting before queueing the work.

People only start one worker at a time, so get rid of the num_workers argument
everywhere, and make btrfs_queue_worker a void since it will always succeed.
Thanks,
Signed-off-by: Josef Bacik <josef@redhat.com>
parent 5dbc8fca
...@@ -64,6 +64,8 @@ struct btrfs_worker_thread { ...@@ -64,6 +64,8 @@ struct btrfs_worker_thread {
int idle; int idle;
}; };
static int __btrfs_start_workers(struct btrfs_workers *workers);
/* /*
* btrfs_start_workers uses kthread_run, which can block waiting for memory * btrfs_start_workers uses kthread_run, which can block waiting for memory
* for a very long time. It will actually throttle on page writeback, * for a very long time. It will actually throttle on page writeback,
...@@ -88,27 +90,10 @@ static void start_new_worker_func(struct btrfs_work *work) ...@@ -88,27 +90,10 @@ static void start_new_worker_func(struct btrfs_work *work)
{ {
struct worker_start *start; struct worker_start *start;
start = container_of(work, struct worker_start, work); start = container_of(work, struct worker_start, work);
btrfs_start_workers(start->queue, 1); __btrfs_start_workers(start->queue);
kfree(start); kfree(start);
} }
static int start_new_worker(struct btrfs_workers *queue)
{
struct worker_start *start;
int ret;
start = kzalloc(sizeof(*start), GFP_NOFS);
if (!start)
return -ENOMEM;
start->work.func = start_new_worker_func;
start->queue = queue;
ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
if (ret)
kfree(start);
return ret;
}
/* /*
* helper function to move a thread onto the idle list after it * helper function to move a thread onto the idle list after it
* has finished some requests. * has finished some requests.
...@@ -153,12 +138,20 @@ static void check_busy_worker(struct btrfs_worker_thread *worker) ...@@ -153,12 +138,20 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
static void check_pending_worker_creates(struct btrfs_worker_thread *worker) static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{ {
struct btrfs_workers *workers = worker->workers; struct btrfs_workers *workers = worker->workers;
struct worker_start *start;
unsigned long flags; unsigned long flags;
rmb(); rmb();
if (!workers->atomic_start_pending) if (!workers->atomic_start_pending)
return; return;
start = kzalloc(sizeof(*start), GFP_NOFS);
if (!start)
return;
start->work.func = start_new_worker_func;
start->queue = workers;
spin_lock_irqsave(&workers->lock, flags); spin_lock_irqsave(&workers->lock, flags);
if (!workers->atomic_start_pending) if (!workers->atomic_start_pending)
goto out; goto out;
...@@ -170,10 +163,11 @@ static void check_pending_worker_creates(struct btrfs_worker_thread *worker) ...@@ -170,10 +163,11 @@ static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
workers->num_workers_starting += 1; workers->num_workers_starting += 1;
spin_unlock_irqrestore(&workers->lock, flags); spin_unlock_irqrestore(&workers->lock, flags);
start_new_worker(workers); btrfs_queue_worker(workers->atomic_worker_start, &start->work);
return; return;
out: out:
kfree(start);
spin_unlock_irqrestore(&workers->lock, flags); spin_unlock_irqrestore(&workers->lock, flags);
} }
...@@ -462,56 +456,55 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max, ...@@ -462,56 +456,55 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
* starts new worker threads. This does not enforce the max worker * starts new worker threads. This does not enforce the max worker
* count in case you need to temporarily go past it. * count in case you need to temporarily go past it.
*/ */
static int __btrfs_start_workers(struct btrfs_workers *workers, static int __btrfs_start_workers(struct btrfs_workers *workers)
int num_workers)
{ {
struct btrfs_worker_thread *worker; struct btrfs_worker_thread *worker;
int ret = 0; int ret = 0;
int i;
for (i = 0; i < num_workers; i++) { worker = kzalloc(sizeof(*worker), GFP_NOFS);
worker = kzalloc(sizeof(*worker), GFP_NOFS); if (!worker) {
if (!worker) { ret = -ENOMEM;
ret = -ENOMEM; goto fail;
goto fail; }
}
INIT_LIST_HEAD(&worker->pending); INIT_LIST_HEAD(&worker->pending);
INIT_LIST_HEAD(&worker->prio_pending); INIT_LIST_HEAD(&worker->prio_pending);
INIT_LIST_HEAD(&worker->worker_list); INIT_LIST_HEAD(&worker->worker_list);
spin_lock_init(&worker->lock); spin_lock_init(&worker->lock);
atomic_set(&worker->num_pending, 0); atomic_set(&worker->num_pending, 0);
atomic_set(&worker->refs, 1); atomic_set(&worker->refs, 1);
worker->workers = workers; worker->workers = workers;
worker->task = kthread_run(worker_loop, worker, worker->task = kthread_run(worker_loop, worker,
"btrfs-%s-%d", workers->name, "btrfs-%s-%d", workers->name,
workers->num_workers + i); workers->num_workers + 1);
if (IS_ERR(worker->task)) { if (IS_ERR(worker->task)) {
ret = PTR_ERR(worker->task); ret = PTR_ERR(worker->task);
kfree(worker); kfree(worker);
goto fail; goto fail;
}
spin_lock_irq(&workers->lock);
list_add_tail(&worker->worker_list, &workers->idle_list);
worker->idle = 1;
workers->num_workers++;
workers->num_workers_starting--;
WARN_ON(workers->num_workers_starting < 0);
spin_unlock_irq(&workers->lock);
} }
spin_lock_irq(&workers->lock);
list_add_tail(&worker->worker_list, &workers->idle_list);
worker->idle = 1;
workers->num_workers++;
workers->num_workers_starting--;
WARN_ON(workers->num_workers_starting < 0);
spin_unlock_irq(&workers->lock);
return 0; return 0;
fail: fail:
btrfs_stop_workers(workers); spin_lock_irq(&workers->lock);
workers->num_workers_starting--;
spin_unlock_irq(&workers->lock);
return ret; return ret;
} }
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers) int btrfs_start_workers(struct btrfs_workers *workers)
{ {
spin_lock_irq(&workers->lock); spin_lock_irq(&workers->lock);
workers->num_workers_starting += num_workers; workers->num_workers_starting++;
spin_unlock_irq(&workers->lock); spin_unlock_irq(&workers->lock);
return __btrfs_start_workers(workers, num_workers); return __btrfs_start_workers(workers);
} }
/* /*
...@@ -568,6 +561,7 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers) ...@@ -568,6 +561,7 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
struct btrfs_worker_thread *worker; struct btrfs_worker_thread *worker;
unsigned long flags; unsigned long flags;
struct list_head *fallback; struct list_head *fallback;
int ret;
again: again:
spin_lock_irqsave(&workers->lock, flags); spin_lock_irqsave(&workers->lock, flags);
...@@ -584,7 +578,9 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers) ...@@ -584,7 +578,9 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
workers->num_workers_starting++; workers->num_workers_starting++;
spin_unlock_irqrestore(&workers->lock, flags); spin_unlock_irqrestore(&workers->lock, flags);
/* we're below the limit, start another worker */ /* we're below the limit, start another worker */
__btrfs_start_workers(workers, 1); ret = __btrfs_start_workers(workers);
if (ret)
goto fallback;
goto again; goto again;
} }
} }
...@@ -665,7 +661,7 @@ void btrfs_set_work_high_prio(struct btrfs_work *work) ...@@ -665,7 +661,7 @@ void btrfs_set_work_high_prio(struct btrfs_work *work)
/* /*
* places a struct btrfs_work into the pending queue of one of the kthreads * places a struct btrfs_work into the pending queue of one of the kthreads
*/ */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work) void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{ {
struct btrfs_worker_thread *worker; struct btrfs_worker_thread *worker;
unsigned long flags; unsigned long flags;
...@@ -673,7 +669,7 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work) ...@@ -673,7 +669,7 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
/* don't requeue something already on a list */ /* don't requeue something already on a list */
if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags)) if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
goto out; return;
worker = find_worker(workers); worker = find_worker(workers);
if (workers->ordered) { if (workers->ordered) {
...@@ -712,7 +708,4 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work) ...@@ -712,7 +708,4 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
if (wake) if (wake)
wake_up_process(worker->task); wake_up_process(worker->task);
spin_unlock_irqrestore(&worker->lock, flags); spin_unlock_irqrestore(&worker->lock, flags);
out:
return 0;
} }
...@@ -109,8 +109,8 @@ struct btrfs_workers { ...@@ -109,8 +109,8 @@ struct btrfs_workers {
char *name; char *name;
}; };
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work); void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work);
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers); int btrfs_start_workers(struct btrfs_workers *workers);
int btrfs_stop_workers(struct btrfs_workers *workers); int btrfs_stop_workers(struct btrfs_workers *workers);
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max, void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
struct btrfs_workers *async_starter); struct btrfs_workers *async_starter);
......
...@@ -2194,19 +2194,27 @@ struct btrfs_root *open_ctree(struct super_block *sb, ...@@ -2194,19 +2194,27 @@ struct btrfs_root *open_ctree(struct super_block *sb,
fs_info->endio_meta_write_workers.idle_thresh = 2; fs_info->endio_meta_write_workers.idle_thresh = 2;
fs_info->readahead_workers.idle_thresh = 2; fs_info->readahead_workers.idle_thresh = 2;
btrfs_start_workers(&fs_info->workers, 1); /*
btrfs_start_workers(&fs_info->generic_worker, 1); * btrfs_start_workers can really only fail because of ENOMEM so just
btrfs_start_workers(&fs_info->submit_workers, 1); * return -ENOMEM if any of these fail.
btrfs_start_workers(&fs_info->delalloc_workers, 1); */
btrfs_start_workers(&fs_info->fixup_workers, 1); ret = btrfs_start_workers(&fs_info->workers);
btrfs_start_workers(&fs_info->endio_workers, 1); ret |= btrfs_start_workers(&fs_info->generic_worker);
btrfs_start_workers(&fs_info->endio_meta_workers, 1); ret |= btrfs_start_workers(&fs_info->submit_workers);
btrfs_start_workers(&fs_info->endio_meta_write_workers, 1); ret |= btrfs_start_workers(&fs_info->delalloc_workers);
btrfs_start_workers(&fs_info->endio_write_workers, 1); ret |= btrfs_start_workers(&fs_info->fixup_workers);
btrfs_start_workers(&fs_info->endio_freespace_worker, 1); ret |= btrfs_start_workers(&fs_info->endio_workers);
btrfs_start_workers(&fs_info->delayed_workers, 1); ret |= btrfs_start_workers(&fs_info->endio_meta_workers);
btrfs_start_workers(&fs_info->caching_workers, 1); ret |= btrfs_start_workers(&fs_info->endio_meta_write_workers);
btrfs_start_workers(&fs_info->readahead_workers, 1); ret |= btrfs_start_workers(&fs_info->endio_write_workers);
ret |= btrfs_start_workers(&fs_info->endio_freespace_worker);
ret |= btrfs_start_workers(&fs_info->delayed_workers);
ret |= btrfs_start_workers(&fs_info->caching_workers);
ret |= btrfs_start_workers(&fs_info->readahead_workers);
if (ret) {
ret = -ENOMEM;
goto fail_sb_buffer;
}
fs_info->bdi.ra_pages *= btrfs_super_num_devices(disk_super); fs_info->bdi.ra_pages *= btrfs_super_num_devices(disk_super);
fs_info->bdi.ra_pages = max(fs_info->bdi.ra_pages, fs_info->bdi.ra_pages = max(fs_info->bdi.ra_pages,
......
...@@ -1535,18 +1535,22 @@ static noinline_for_stack int scrub_supers(struct scrub_dev *sdev) ...@@ -1535,18 +1535,22 @@ static noinline_for_stack int scrub_supers(struct scrub_dev *sdev)
static noinline_for_stack int scrub_workers_get(struct btrfs_root *root) static noinline_for_stack int scrub_workers_get(struct btrfs_root *root)
{ {
struct btrfs_fs_info *fs_info = root->fs_info; struct btrfs_fs_info *fs_info = root->fs_info;
int ret = 0;
mutex_lock(&fs_info->scrub_lock); mutex_lock(&fs_info->scrub_lock);
if (fs_info->scrub_workers_refcnt == 0) { if (fs_info->scrub_workers_refcnt == 0) {
btrfs_init_workers(&fs_info->scrub_workers, "scrub", btrfs_init_workers(&fs_info->scrub_workers, "scrub",
fs_info->thread_pool_size, &fs_info->generic_worker); fs_info->thread_pool_size, &fs_info->generic_worker);
fs_info->scrub_workers.idle_thresh = 4; fs_info->scrub_workers.idle_thresh = 4;
btrfs_start_workers(&fs_info->scrub_workers, 1); ret = btrfs_start_workers(&fs_info->scrub_workers);
if (ret)
goto out;
} }
++fs_info->scrub_workers_refcnt; ++fs_info->scrub_workers_refcnt;
out:
mutex_unlock(&fs_info->scrub_lock); mutex_unlock(&fs_info->scrub_lock);
return 0; return ret;
} }
static noinline_for_stack void scrub_workers_put(struct btrfs_root *root) static noinline_for_stack void scrub_workers_put(struct btrfs_root *root)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment