Commit f8f30863, authored and committed by Kent Overstreet

bcachefs: Avoid atomics in write fast path

This adds some horrible hacks, but the atomic ops for closures were
getting to be a pretty expensive part of the write path. We don't want
to rip out closures entirely from the write path, because they're used
for e.g. waiting on the allocator, or waiting on the journal flush, and
that stuff would get really ugly without closures.
Signed-off-by: Kent Overstreet <kent.overstreet@gmail.com>
Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
parent f7f63211
...@@ -54,7 +54,7 @@ struct bch_writepage_io { ...@@ -54,7 +54,7 @@ struct bch_writepage_io {
}; };
struct dio_write { struct dio_write {
struct closure cl; struct completion done;
struct kiocb *req; struct kiocb *req;
struct mm_struct *mm; struct mm_struct *mm;
unsigned loop:1, unsigned loop:1,
...@@ -1755,8 +1755,6 @@ static noinline int bch2_dio_write_copy_iov(struct dio_write *dio) ...@@ -1755,8 +1755,6 @@ static noinline int bch2_dio_write_copy_iov(struct dio_write *dio)
return 0; return 0;
} }
static void bch2_dio_write_loop_async(struct closure *);
static long bch2_dio_write_loop(struct dio_write *dio) static long bch2_dio_write_loop(struct dio_write *dio)
{ {
bool kthread = (current->flags & PF_KTHREAD) != 0; bool kthread = (current->flags & PF_KTHREAD) != 0;
...@@ -1830,23 +1828,20 @@ static long bch2_dio_write_loop(struct dio_write *dio) ...@@ -1830,23 +1828,20 @@ static long bch2_dio_write_loop(struct dio_write *dio)
task_io_account_write(bio->bi_iter.bi_size); task_io_account_write(bio->bi_iter.bi_size);
closure_call(&dio->op.cl, bch2_write, NULL, &dio->cl);
if (!dio->sync && !dio->loop && dio->iter.count) { if (!dio->sync && !dio->loop && dio->iter.count) {
if (bch2_dio_write_copy_iov(dio)) { if (bch2_dio_write_copy_iov(dio)) {
dio->op.error = -ENOMEM; dio->sync = true;
goto err_wait_io; goto do_io;
} }
} }
err_wait_io: do_io:
dio->loop = true; dio->loop = true;
closure_call(&dio->op.cl, bch2_write, NULL, NULL);
if (!dio->sync) { if (dio->sync)
continue_at(&dio->cl, bch2_dio_write_loop_async, NULL); wait_for_completion(&dio->done);
else
return -EIOCBQUEUED; return -EIOCBQUEUED;
}
closure_sync(&dio->cl);
loop: loop:
i_sectors_acct(c, inode, &dio->quota_res, i_sectors_acct(c, inode, &dio->quota_res,
dio->op.i_sectors_delta); dio->op.i_sectors_delta);
...@@ -1863,7 +1858,9 @@ static long bch2_dio_write_loop(struct dio_write *dio) ...@@ -1863,7 +1858,9 @@ static long bch2_dio_write_loop(struct dio_write *dio)
put_page(bv->bv_page); put_page(bv->bv_page);
if (!dio->iter.count || dio->op.error) if (!dio->iter.count || dio->op.error)
break; break;
bio_reset(bio, NULL, REQ_OP_WRITE); bio_reset(bio, NULL, REQ_OP_WRITE);
reinit_completion(&dio->done);
} }
ret = dio->op.error ?: ((long) dio->op.written << 9); ret = dio->op.error ?: ((long) dio->op.written << 9);
...@@ -1875,8 +1872,6 @@ static long bch2_dio_write_loop(struct dio_write *dio) ...@@ -1875,8 +1872,6 @@ static long bch2_dio_write_loop(struct dio_write *dio)
if (dio->free_iov) if (dio->free_iov)
kfree(dio->iter.__iov); kfree(dio->iter.__iov);
closure_debug_destroy(&dio->cl);
sync = dio->sync; sync = dio->sync;
bio_put(bio); bio_put(bio);
...@@ -1890,10 +1885,13 @@ static long bch2_dio_write_loop(struct dio_write *dio) ...@@ -1890,10 +1885,13 @@ static long bch2_dio_write_loop(struct dio_write *dio)
return ret; return ret;
} }
static void bch2_dio_write_loop_async(struct closure *cl) static void bch2_dio_write_loop_async(struct bch_write_op *op)
{ {
struct dio_write *dio = container_of(cl, struct dio_write, cl); struct dio_write *dio = container_of(op, struct dio_write, op);
if (dio->sync)
complete(&dio->done);
else
bch2_dio_write_loop(dio); bch2_dio_write_loop(dio);
} }
...@@ -1922,7 +1920,7 @@ ssize_t bch2_direct_write(struct kiocb *req, struct iov_iter *iter) ...@@ -1922,7 +1920,7 @@ ssize_t bch2_direct_write(struct kiocb *req, struct iov_iter *iter)
GFP_KERNEL, GFP_KERNEL,
&c->dio_write_bioset); &c->dio_write_bioset);
dio = container_of(bio, struct dio_write, op.wbio.bio); dio = container_of(bio, struct dio_write, op.wbio.bio);
closure_init(&dio->cl, NULL); init_completion(&dio->done);
dio->req = req; dio->req = req;
dio->mm = current->mm; dio->mm = current->mm;
dio->loop = false; dio->loop = false;
...@@ -1933,6 +1931,7 @@ ssize_t bch2_direct_write(struct kiocb *req, struct iov_iter *iter) ...@@ -1933,6 +1931,7 @@ ssize_t bch2_direct_write(struct kiocb *req, struct iov_iter *iter)
dio->iter = *iter; dio->iter = *iter;
bch2_write_op_init(&dio->op, c, opts); bch2_write_op_init(&dio->op, c, opts);
dio->op.end_io = bch2_dio_write_loop_async;
dio->op.target = opts.foreground_target; dio->op.target = opts.foreground_target;
op_journal_seq_set(&dio->op, &inode->ei_journal_seq); op_journal_seq_set(&dio->op, &inode->ei_journal_seq);
dio->op.write_point = writepoint_hashed((unsigned long) current); dio->op.write_point = writepoint_hashed((unsigned long) current);
...@@ -1962,7 +1961,6 @@ ssize_t bch2_direct_write(struct kiocb *req, struct iov_iter *iter) ...@@ -1962,7 +1961,6 @@ ssize_t bch2_direct_write(struct kiocb *req, struct iov_iter *iter)
err: err:
bch2_disk_reservation_put(c, &dio->op.res); bch2_disk_reservation_put(c, &dio->op.res);
bch2_quota_reservation_put(c, inode, &dio->quota_res); bch2_quota_reservation_put(c, inode, &dio->quota_res);
closure_debug_destroy(&dio->cl);
bio_put(bio); bio_put(bio);
return ret; return ret;
} }
......
...@@ -513,7 +513,12 @@ static void bch2_write_done(struct closure *cl) ...@@ -513,7 +513,12 @@ static void bch2_write_done(struct closure *cl)
bch2_time_stats_update(&c->times[BCH_TIME_data_write], op->start_time); bch2_time_stats_update(&c->times[BCH_TIME_data_write], op->start_time);
if (op->end_io)
op->end_io(op);
if (cl->parent)
closure_return(cl); closure_return(cl);
else
closure_debug_destroy(cl);
} }
/** /**
...@@ -622,8 +627,10 @@ static void bch2_write_endio(struct bio *bio) ...@@ -622,8 +627,10 @@ static void bch2_write_endio(struct bio *bio)
if (parent) if (parent)
bio_endio(&parent->bio); bio_endio(&parent->bio);
else else if (!(op->flags & BCH_WRITE_SKIP_CLOSURE_PUT))
closure_put(cl); closure_put(cl);
else
continue_at_nobarrier(cl, bch2_write_index, index_update_wq(op));
} }
static void init_append_extent(struct bch_write_op *op, static void init_append_extent(struct bch_write_op *op,
...@@ -828,15 +835,14 @@ static enum prep_encoded_ret { ...@@ -828,15 +835,14 @@ static enum prep_encoded_ret {
return PREP_ENCODED_OK; return PREP_ENCODED_OK;
} }
static int bch2_write_extent(struct bch_write_op *op, struct write_point *wp) static int bch2_write_extent(struct bch_write_op *op, struct write_point *wp,
struct bio **_dst)
{ {
struct bch_fs *c = op->c; struct bch_fs *c = op->c;
struct bio *src = &op->wbio.bio, *dst = src; struct bio *src = &op->wbio.bio, *dst = src;
struct bvec_iter saved_iter; struct bvec_iter saved_iter;
struct bkey_i *key_to_write;
void *ec_buf; void *ec_buf;
unsigned key_to_write_offset = op->insert_keys.top_p - struct bpos ec_pos = op->pos;
op->insert_keys.keys_p;
unsigned total_output = 0, total_input = 0; unsigned total_output = 0, total_input = 0;
bool bounce = false; bool bounce = false;
bool page_alloc_failed = false; bool page_alloc_failed = false;
...@@ -855,6 +861,7 @@ static int bch2_write_extent(struct bch_write_op *op, struct write_point *wp) ...@@ -855,6 +861,7 @@ static int bch2_write_extent(struct bch_write_op *op, struct write_point *wp)
case PREP_ENCODED_CHECKSUM_ERR: case PREP_ENCODED_CHECKSUM_ERR:
goto csum_err; goto csum_err;
case PREP_ENCODED_DO_WRITE: case PREP_ENCODED_DO_WRITE:
/* XXX look for bug here */
if (ec_buf) { if (ec_buf) {
dst = bch2_write_bio_alloc(c, wp, src, dst = bch2_write_bio_alloc(c, wp, src,
&page_alloc_failed, &page_alloc_failed,
...@@ -1004,31 +1011,15 @@ static int bch2_write_extent(struct bch_write_op *op, struct write_point *wp) ...@@ -1004,31 +1011,15 @@ static int bch2_write_extent(struct bch_write_op *op, struct write_point *wp)
dst->bi_iter.bi_size = total_output; dst->bi_iter.bi_size = total_output;
do_write: do_write:
/* might have done a realloc... */ /* might have done a realloc... */
bch2_ec_add_backpointer(c, wp, ec_pos, total_input >> 9);
key_to_write = (void *) (op->insert_keys.keys_p + key_to_write_offset); *_dst = dst;
bch2_ec_add_backpointer(c, wp,
bkey_start_pos(&key_to_write->k),
total_input >> 9);
bch2_alloc_sectors_done(c, wp);
dst->bi_end_io = bch2_write_endio;
dst->bi_private = &op->cl;
dst->bi_opf = REQ_OP_WRITE;
closure_get(dst->bi_private);
bch2_submit_wbio_replicas(to_wbio(dst), c, BCH_DATA_USER,
key_to_write);
return more; return more;
csum_err: csum_err:
bch_err(c, "error verifying existing checksum while " bch_err(c, "error verifying existing checksum while "
"rewriting existing data (memory corruption?)"); "rewriting existing data (memory corruption?)");
ret = -EIO; ret = -EIO;
err: err:
bch2_alloc_sectors_done(c, wp);
if (to_wbio(dst)->bounce) if (to_wbio(dst)->bounce)
bch2_bio_free_pages_pool(c, dst); bch2_bio_free_pages_pool(c, dst);
if (to_wbio(dst)->put_bio) if (to_wbio(dst)->put_bio)
...@@ -1042,11 +1033,17 @@ static void __bch2_write(struct closure *cl) ...@@ -1042,11 +1033,17 @@ static void __bch2_write(struct closure *cl)
struct bch_write_op *op = container_of(cl, struct bch_write_op, cl); struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
struct bch_fs *c = op->c; struct bch_fs *c = op->c;
struct write_point *wp; struct write_point *wp;
struct bio *bio;
bool skip_put = true;
int ret; int ret;
again: again:
memset(&op->failed, 0, sizeof(op->failed)); memset(&op->failed, 0, sizeof(op->failed));
do { do {
struct bkey_i *key_to_write;
unsigned key_to_write_offset = op->insert_keys.top_p -
op->insert_keys.keys_p;
/* +1 for possible cache device: */ /* +1 for possible cache device: */
if (op->open_buckets.nr + op->nr_replicas + 1 > if (op->open_buckets.nr + op->nr_replicas + 1 >
ARRAY_SIZE(op->open_buckets.v)) ARRAY_SIZE(op->open_buckets.v))
...@@ -1080,21 +1077,38 @@ static void __bch2_write(struct closure *cl) ...@@ -1080,21 +1077,38 @@ static void __bch2_write(struct closure *cl)
} }
bch2_open_bucket_get(c, wp, &op->open_buckets); bch2_open_bucket_get(c, wp, &op->open_buckets);
ret = bch2_write_extent(op, wp, &bio);
ret = bch2_write_extent(op, wp); bch2_alloc_sectors_done(c, wp);
if (ret < 0) if (ret < 0)
goto err; goto err;
if (ret)
skip_put = false;
bio->bi_end_io = bch2_write_endio;
bio->bi_private = &op->cl;
bio->bi_opf = REQ_OP_WRITE;
if (!skip_put)
closure_get(bio->bi_private);
else
op->flags |= BCH_WRITE_SKIP_CLOSURE_PUT;
key_to_write = (void *) (op->insert_keys.keys_p +
key_to_write_offset);
bch2_submit_wbio_replicas(to_wbio(bio), c, BCH_DATA_USER,
key_to_write);
} while (ret); } while (ret);
if (!skip_put)
continue_at(cl, bch2_write_index, index_update_wq(op)); continue_at(cl, bch2_write_index, index_update_wq(op));
return; return;
err: err:
op->error = ret; op->error = ret;
continue_at(cl, !bch2_keylist_empty(&op->insert_keys) continue_at(cl, bch2_write_index, index_update_wq(op));
? bch2_write_index
: bch2_write_done, index_update_wq(op));
return; return;
flush_io: flush_io:
closure_sync(cl); closure_sync(cl);
......
...@@ -37,6 +37,7 @@ enum bch_write_flags { ...@@ -37,6 +37,7 @@ enum bch_write_flags {
/* Internal: */ /* Internal: */
BCH_WRITE_JOURNAL_SEQ_PTR = (1 << 8), BCH_WRITE_JOURNAL_SEQ_PTR = (1 << 8),
BCH_WRITE_SKIP_CLOSURE_PUT = (1 << 9),
}; };
static inline u64 *op_journal_seq(struct bch_write_op *op) static inline u64 *op_journal_seq(struct bch_write_op *op)
...@@ -71,6 +72,7 @@ static inline void bch2_write_op_init(struct bch_write_op *op, struct bch_fs *c, ...@@ -71,6 +72,7 @@ static inline void bch2_write_op_init(struct bch_write_op *op, struct bch_fs *c,
struct bch_io_opts opts) struct bch_io_opts opts)
{ {
op->c = c; op->c = c;
op->end_io = NULL;
op->flags = 0; op->flags = 0;
op->written = 0; op->written = 0;
op->error = 0; op->error = 0;
......
...@@ -95,6 +95,7 @@ struct bch_write_bio { ...@@ -95,6 +95,7 @@ struct bch_write_bio {
struct bch_write_op { struct bch_write_op {
struct closure cl; struct closure cl;
struct bch_fs *c; struct bch_fs *c;
void (*end_io)(struct bch_write_op *);
u64 start_time; u64 start_time;
unsigned written; /* sectors */ unsigned written; /* sectors */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment