Commit d16b4a77 authored by Kent Overstreet, committed by Kent Overstreet

bcachefs: Assorted journal refactoring

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
parent ecf37a4a
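Nearly every change in the diff below leans on one idiom: journal state transitions (open, close, halt) are lock-free read-modify-write updates of the packed union journal_res_state, retried with atomic64_cmpxchg() until the update applies against an unchanged snapshot. The following is a simplified, standalone sketch of that idiom, not code from the patch; the field widths, the sketch_* names and the CLOSED value are assumed stand-ins for the real definitions in journal_types.h.

#include <linux/atomic.h>

/* Assumed stand-in for the real packed state in journal_types.h: */
union journal_res_state_sketch {
	u64 v;
	struct {
		u64	cur_entry_offset:20;	/* u64s used, or CLOSED/ERROR */
		u64	idx:1;			/* which of the two bufs is current */
		u64	prev_buf_unwritten:1;	/* previous buf still in flight */
	};
};

#define SKETCH_ENTRY_CLOSED_VAL	((1U << 20) - 1)	/* assumed value */

/* The cmpxchg-loop idiom used by bch2_journal_halt(),
 * __journal_entry_close() and journal_entry_open() below: */
static void sketch_close_entry(atomic64_t *counter)
{
	union journal_res_state_sketch old, new;
	u64 v = atomic64_read(counter);

	do {
		old.v = new.v = v;
		if (old.cur_entry_offset == SKETCH_ENTRY_CLOSED_VAL)
			return;		/* raced: someone else closed it */

		new.cur_entry_offset = SKETCH_ENTRY_CLOSED_VAL;
		new.idx++;		/* 1-bit field: flips to the other buf */
	} while ((v = atomic64_cmpxchg(counter, old.v, new.v)) != old.v);
}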
--- a/fs/bcachefs/journal.c
+++ b/fs/bcachefs/journal.c
@@ -17,23 +17,14 @@
 #include "super-io.h"
 #include "trace.h"
 
-static bool journal_entry_is_open(struct journal *j)
+static bool __journal_entry_is_open(union journal_res_state state)
 {
-	return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
+	return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
 }
 
-void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
+static bool journal_entry_is_open(struct journal *j)
 {
-	struct journal_buf *w = journal_prev_buf(j);
-
-	atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);
-
-	if (!need_write_just_set &&
-	    test_bit(JOURNAL_NEED_WRITE, &j->flags))
-		bch2_time_stats_update(j->delay_time,
-				       j->need_write_time);
-
-	closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL);
+	return __journal_entry_is_open(j->reservations);
 }
 
 static void journal_pin_new_entry(struct journal *j, int count)
@@ -77,39 +68,76 @@ static inline bool journal_entry_empty(struct jset *j)
 	return true;
 }
 
-static enum {
-	JOURNAL_ENTRY_ERROR,
-	JOURNAL_ENTRY_INUSE,
-	JOURNAL_ENTRY_CLOSED,
-	JOURNAL_UNLOCKED,
-} journal_buf_switch(struct journal *j, bool need_write_just_set)
+void bch2_journal_halt(struct journal *j)
+{
+	union journal_res_state old, new;
+	u64 v = atomic64_read(&j->reservations.counter);
+
+	do {
+		old.v = new.v = v;
+		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
+			return;
+
+		new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
+	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
+				       old.v, new.v)) != old.v);
+
+	journal_wake(j);
+	closure_wake_up(&journal_cur_buf(j)->wait);
+	closure_wake_up(&journal_prev_buf(j)->wait);
+}
+
+/* journal entry close/open: */
+
+void __bch2_journal_buf_put(struct journal *j, bool need_write_just_set)
+{
+	struct journal_buf *w = journal_prev_buf(j);
+
+	atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);
+
+	if (!need_write_just_set &&
+	    test_bit(JOURNAL_NEED_WRITE, &j->flags))
+		bch2_time_stats_update(j->delay_time,
+				       j->need_write_time);
+
+	clear_bit(JOURNAL_NEED_WRITE, &j->flags);
+
+	closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL);
+}
+
+/*
+ * Returns true if journal entry is now closed:
+ */
+static bool __journal_entry_close(struct journal *j)
 {
 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
 	struct journal_buf *buf = journal_cur_buf(j);
 	union journal_res_state old, new;
 	u64 v = atomic64_read(&j->reservations.counter);
+	bool set_need_write = false;
+	unsigned sectors;
 
 	lockdep_assert_held(&j->lock);
 
 	do {
 		old.v = new.v = v;
 		if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
-			return JOURNAL_ENTRY_CLOSED;
+			return true;
 
 		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) {
 			/* this entry will never be written: */
 			closure_wake_up(&buf->wait);
-			return JOURNAL_ENTRY_ERROR;
+			return true;
 		}
 
-		if (new.prev_buf_unwritten)
-			return JOURNAL_ENTRY_INUSE;
+		if (!test_bit(JOURNAL_NEED_WRITE, &j->flags)) {
+			set_bit(JOURNAL_NEED_WRITE, &j->flags);
+			j->need_write_time = local_clock();
+			set_need_write = true;
+		}
+
+		if (new.prev_buf_unwritten)
+			return false;
 
-		/*
-		 * avoid race between setting buf->data->u64s and
-		 * journal_res_put starting write:
-		 */
-		journal_state_inc(&new);
 		new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
 		new.idx++;
@@ -119,15 +147,12 @@ static enum {
 	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
 				       old.v, new.v)) != old.v);
 
-	clear_bit(JOURNAL_NEED_WRITE, &j->flags);
-
 	buf->data->u64s	= cpu_to_le32(old.cur_entry_offset);
 
-	j->prev_buf_sectors =
-		vstruct_blocks_plus(buf->data, c->block_bits,
-				    buf->u64s_reserved) *
-		c->opts.block_size;
-	BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);
+	sectors = vstruct_blocks_plus(buf->data, c->block_bits,
+				      buf->u64s_reserved) << c->block_bits;
+	BUG_ON(sectors > buf->sectors);
+	buf->sectors = sectors;
 
 	bkey_extent_init(&buf->key);
@@ -163,32 +188,22 @@ static enum {
 	bch2_journal_buf_init(j);
 
 	cancel_delayed_work(&j->write_work);
+	spin_unlock(&j->lock);
 
 	/* ugh - might be called from __journal_res_get() under wait_event() */
 	__set_current_state(TASK_RUNNING);
-	bch2_journal_buf_put(j, old.idx, need_write_just_set);
-
-	return JOURNAL_UNLOCKED;
+	bch2_journal_buf_put(j, old.idx, set_need_write);
+
+	return true;
 }
 
-void bch2_journal_halt(struct journal *j)
+static bool journal_entry_close(struct journal *j)
 {
-	union journal_res_state old, new;
-	u64 v = atomic64_read(&j->reservations.counter);
-
-	do {
-		old.v = new.v = v;
-		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
-			return;
-
-		new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
-	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
-				       old.v, new.v)) != old.v);
-
-	journal_wake(j);
-	closure_wake_up(&journal_cur_buf(j)->wait);
-	closure_wake_up(&journal_prev_buf(j)->wait);
+	bool ret;
+
+	spin_lock(&j->lock);
+	ret = __journal_entry_close(j);
+	spin_unlock(&j->lock);
+
+	return ret;
 }
 
 /*
@@ -196,17 +211,16 @@ void bch2_journal_halt(struct journal *j)
  * journal reservation - journal entry is open means journal is dirty:
  *
  * returns:
- * 1:		success
- * 0:		journal currently full (must wait)
- * -EROFS:	insufficient rw devices
- * -EIO:	journal error
+ * 0:		success
+ * -ENOSPC:	journal currently full, must invoke reclaim
+ * -EAGAIN:	journal blocked, must wait
+ * -EROFS:	insufficient rw devices or journal error
  */
 static int journal_entry_open(struct journal *j)
 {
 	struct journal_buf *buf = journal_cur_buf(j);
 	union journal_res_state old, new;
-	ssize_t u64s;
-	int sectors;
+	int u64s, ret;
 	u64 v;
 
 	lockdep_assert_held(&j->lock);
@@ -216,29 +230,22 @@ static int journal_entry_open(struct journal *j)
 		return -EAGAIN;
 
 	if (!fifo_free(&j->pin))
-		return 0;
+		return -ENOSPC;
 
-	sectors = bch2_journal_entry_sectors(j);
-	if (sectors <= 0)
-		return sectors;
+	ret = bch2_journal_space_available(j);
+	if (ret)
+		return ret;
 
-	buf->disk_sectors	= sectors;
 	buf->u64s_reserved	= j->entry_u64s_reserved;
+	buf->disk_sectors	= j->cur_entry_sectors;
+	buf->sectors		= min(buf->disk_sectors, buf->buf_size >> 9);
 
-	sectors = min_t(unsigned, sectors, buf->size >> 9);
-	j->cur_buf_sectors = sectors;
-
-	u64s = (sectors << 9) / sizeof(u64);
-
-	/* Subtract the journal header */
-	u64s -= sizeof(struct jset) / sizeof(u64);
-	u64s -= buf->u64s_reserved;
-
-	u64s = max_t(ssize_t, 0L, u64s);
-	BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);
+	u64s = (int) (buf->sectors << 9) / sizeof(u64) -
+		journal_entry_overhead(j);
+	u64s = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1);
 
 	if (u64s <= le32_to_cpu(buf->data->u64s))
-		return 0;
+		return -ENOSPC;
 
 	/*
 	 * Must be set before marking the journal entry as open:
@@ -250,10 +257,11 @@ static int journal_entry_open(struct journal *j)
 		old.v = new.v = v;
 
 		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
-			return -EIO;
+			return -EROFS;
 
 		/* Handle any already added entries */
 		new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
+
+		journal_state_inc(&new);
 	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
 				       old.v, new.v)) != old.v);
@@ -266,48 +274,16 @@ static int journal_entry_open(struct journal *j)
 			   &j->write_work,
 			   msecs_to_jiffies(j->write_delay_ms));
 	journal_wake(j);
-	return 1;
+	return 0;
 }
 
-static bool __journal_entry_close(struct journal *j)
-{
-	bool set_need_write;
-
-	if (!journal_entry_is_open(j)) {
-		spin_unlock(&j->lock);
-		return true;
-	}
-
-	set_need_write = !test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags);
-	if (set_need_write)
-		j->need_write_time = local_clock();
-
-	switch (journal_buf_switch(j, set_need_write)) {
-	case JOURNAL_ENTRY_INUSE:
-		spin_unlock(&j->lock);
-		return false;
-	default:
-		spin_unlock(&j->lock);
-		fallthrough;
-	case JOURNAL_UNLOCKED:
-		return false;
-	}
-}
-
-static bool journal_entry_close(struct journal *j)
-{
-	spin_lock(&j->lock);
-	return __journal_entry_close(j);
-}
-
 static bool journal_quiesced(struct journal *j)
 {
-	bool ret;
+	union journal_res_state state = READ_ONCE(j->reservations);
+	bool ret = !state.prev_buf_unwritten && !__journal_entry_is_open(state);
 
-	spin_lock(&j->lock);
-	ret = !j->reservations.prev_buf_unwritten &&
-	      !journal_entry_is_open(j);
-	__journal_entry_close(j);
+	if (!ret)
+		journal_entry_close(j);
 
 	return ret;
 }
@@ -357,7 +333,11 @@ static int __journal_res_get(struct journal *j, struct journal_res *res,
 	if (journal_res_get_fast(j, res, flags))
 		return 0;
 
+	if (bch2_journal_error(j))
+		return -EROFS;
+
 	spin_lock(&j->lock);
+
 	/*
 	 * Recheck after taking the lock, so we don't race with another thread
 	 * that just did journal_entry_open() and call journal_entry_close()
@@ -375,56 +355,42 @@ static int __journal_res_get(struct journal *j, struct journal_res *res,
 	 */
 	buf = journal_cur_buf(j);
 	if (journal_entry_is_open(j) &&
-	    buf->size >> 9 < buf->disk_sectors &&
-	    buf->size < JOURNAL_ENTRY_SIZE_MAX)
-		j->buf_size_want = max(j->buf_size_want, buf->size << 1);
+	    buf->buf_size >> 9 < buf->disk_sectors &&
+	    buf->buf_size < JOURNAL_ENTRY_SIZE_MAX)
+		j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1);
 
-	/*
-	 * Close the current journal entry if necessary, then try to start a new
-	 * one:
-	 */
-	switch (journal_buf_switch(j, false)) {
-	case JOURNAL_ENTRY_ERROR:
-		spin_unlock(&j->lock);
-		return -EROFS;
-	case JOURNAL_ENTRY_INUSE:
+	if (journal_entry_is_open(j) &&
+	    !__journal_entry_close(j)) {
 		/*
-		 * The current journal entry is still open, but we failed to get
-		 * a journal reservation because there's not enough space in it,
-		 * and we can't close it and start another because we haven't
-		 * finished writing out the previous entry:
+		 * We failed to get a reservation on the current open journal
+		 * entry because it's full, and we can't close it because
+		 * there's still a previous one in flight:
 		 */
-		spin_unlock(&j->lock);
 		trace_journal_entry_full(c);
-		goto blocked;
-	case JOURNAL_ENTRY_CLOSED:
-		break;
-	case JOURNAL_UNLOCKED:
-		goto retry;
+		ret = -EAGAIN;
+	} else {
+		ret = journal_entry_open(j);
 	}
 
-	/* We now have a new, closed journal buf - see if we can open it: */
-	ret = journal_entry_open(j);
-	spin_unlock(&j->lock);
+	if ((ret == -EAGAIN || ret == -ENOSPC) &&
+	    !j->res_get_blocked_start)
+		j->res_get_blocked_start = local_clock() ?: 1;
 
-	if (ret < 0)
-		return ret;
-	if (ret)
+	spin_unlock(&j->lock);
+
+	if (!ret)
 		goto retry;
 
-	/* Journal's full, we have to wait */
-
-	/*
-	 * Direct reclaim - can't rely on reclaim from work item
-	 * due to freezing..
-	 */
-	bch2_journal_reclaim_work(&j->reclaim_work.work);
-	trace_journal_full(c);
-blocked:
-	if (!j->res_get_blocked_start)
-		j->res_get_blocked_start = local_clock() ?: 1;
-	return -EAGAIN;
+	if (ret == -ENOSPC) {
+		/*
+		 * Journal is full - can't rely on reclaim from work item due to
+		 * freezing:
+		 */
+		trace_journal_full(c);
+		bch2_journal_reclaim_work(&j->reclaim_work.work);
+		ret = -EAGAIN;
+	}
+
+	return ret;
 }
 
 /*
@@ -461,7 +427,7 @@ void bch2_journal_entry_res_resize(struct journal *j,
 	j->entry_u64s_reserved += d;
 
 	if (d <= 0)
-		goto out_unlock;
+		goto out;
 
 	j->cur_entry_u64s -= d;
 	smp_mb();
@@ -474,15 +440,12 @@ void bch2_journal_entry_res_resize(struct journal *j,
 		 * Not enough room in current journal entry, have to flush it:
 		 */
 		__journal_entry_close(j);
-		goto out;
+	} else {
+		journal_cur_buf(j)->u64s_reserved += d;
 	}
-
-	journal_cur_buf(j)->u64s_reserved += d;
-out_unlock:
-	spin_unlock(&j->lock);
 out:
+	spin_unlock(&j->lock);
 	res->u64s += d;
-	return;
 }
 
 /* journal flushing: */
@@ -512,47 +475,47 @@ int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *cl)
 {
 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
 	int ret;
-retry:
+
 	spin_lock(&j->lock);
 
-	if (seq < journal_cur_seq(j) ||
+	/*
+	 * Can't try to open more than one sequence number ahead:
+	 */
+	BUG_ON(journal_cur_seq(j) < seq && !journal_entry_is_open(j));
+
+	if (journal_cur_seq(j) > seq ||
 	    journal_entry_is_open(j)) {
 		spin_unlock(&j->lock);
 		return 0;
 	}
 
-	if (journal_cur_seq(j) < seq) {
-		switch (journal_buf_switch(j, false)) {
-		case JOURNAL_ENTRY_ERROR:
-			spin_unlock(&j->lock);
-			return -EROFS;
-		case JOURNAL_ENTRY_INUSE:
-			/* haven't finished writing out the previous one: */
-			trace_journal_entry_full(c);
-			goto blocked;
-		case JOURNAL_ENTRY_CLOSED:
-			break;
-		case JOURNAL_UNLOCKED:
-			goto retry;
-		}
-	}
-
-	BUG_ON(journal_cur_seq(j) < seq);
-
-	ret = journal_entry_open(j);
-	if (ret) {
-		spin_unlock(&j->lock);
-		return ret < 0 ? ret : 0;
+	if (journal_cur_seq(j) < seq &&
+	    !__journal_entry_close(j)) {
+		/* haven't finished writing out the previous one: */
+		trace_journal_entry_full(c);
+		ret = -EAGAIN;
+	} else {
+		BUG_ON(journal_cur_seq(j) != seq);
+
+		ret = journal_entry_open(j);
 	}
 
-blocked:
-	if (!j->res_get_blocked_start)
+	if ((ret == -EAGAIN || ret == -ENOSPC) &&
+	    !j->res_get_blocked_start)
 		j->res_get_blocked_start = local_clock() ?: 1;
-	closure_wait(&j->async_wait, cl);
+
+	if (ret == -EAGAIN || ret == -ENOSPC)
+		closure_wait(&j->async_wait, cl);
+
 	spin_unlock(&j->lock);
-	bch2_journal_reclaim_work(&j->reclaim_work.work);
-	return -EAGAIN;
+
+	if (ret == -ENOSPC) {
+		trace_journal_full(c);
+		bch2_journal_reclaim_work(&j->reclaim_work.work);
+		ret = -EAGAIN;
+	}
+
+	return ret;
 }
 
 static int journal_seq_error(struct journal *j, u64 seq)
@@ -635,8 +598,7 @@ void bch2_journal_flush_seq_async(struct journal *j, u64 seq,
 
 	if (seq == journal_cur_seq(j))
 		__journal_entry_close(j);
-	else
-		spin_unlock(&j->lock);
+	spin_unlock(&j->lock);
 }
 
 static int journal_seq_flushed(struct journal *j, u64 seq)
@@ -648,8 +610,7 @@ static int journal_seq_flushed(struct journal *j, u64 seq)
 
 	if (seq == journal_cur_seq(j))
 		__journal_entry_close(j);
-	else
-		spin_unlock(&j->lock);
+	spin_unlock(&j->lock);
 
 	return ret;
 }
@@ -783,7 +744,7 @@ static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
 		goto err;
 
 	journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
-					nr + sizeof(*journal_buckets) / sizeof(u64));
+				nr + sizeof(*journal_buckets) / sizeof(u64));
 	if (!journal_buckets)
 		goto err;
@@ -846,9 +807,9 @@ static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
 		ja->nr++;
 
 		bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
-					  ca->mi.bucket_size,
-					  gc_phase(GC_PHASE_SB),
-					  0);
+				ca->mi.bucket_size,
+				gc_phase(GC_PHASE_SB),
+				0);
 
 		if (c) {
 			spin_unlock(&c->journal.lock);
@@ -899,7 +860,7 @@ int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
 	 */
 	if (bch2_disk_reservation_get(c, &disk_res,
-				      bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
+			bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
 		mutex_unlock(&c->sb_lock);
 		return -ENOSPC;
 	}
@@ -996,7 +957,7 @@ void bch2_fs_journal_start(struct journal *j)
 	journal_pin_new_entry(j, 0);
 
 	/*
-	 * journal_buf_switch() only inits the next journal entry when it
+	 * __journal_entry_close() only inits the next journal entry when it
 	 * closes an open journal entry - the very first journal entry gets
 	 * initialized here:
 	 */
@@ -1063,8 +1024,8 @@ int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
 
 void bch2_fs_journal_exit(struct journal *j)
 {
-	kvpfree(j->buf[1].data, j->buf[1].size);
-	kvpfree(j->buf[0].data, j->buf[0].size);
+	kvpfree(j->buf[1].data, j->buf[1].buf_size);
+	kvpfree(j->buf[0].data, j->buf[0].buf_size);
 	free_fifo(&j->pin);
 }
@@ -1088,8 +1049,8 @@ int bch2_fs_journal_init(struct journal *j)
 
 	lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
 
-	j->buf[0].size		= JOURNAL_ENTRY_SIZE_MIN;
-	j->buf[1].size		= JOURNAL_ENTRY_SIZE_MIN;
+	j->buf[0].buf_size	= JOURNAL_ENTRY_SIZE_MIN;
+	j->buf[1].buf_size	= JOURNAL_ENTRY_SIZE_MIN;
 	j->write_delay_ms	= 1000;
 	j->reclaim_delay_ms	= 100;
@@ -1102,8 +1063,8 @@ int bch2_fs_journal_init(struct journal *j)
 		 { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
 
 	if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
-	    !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
-	    !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) {
+	    !(j->buf[0].data = kvpmalloc(j->buf[0].buf_size, GFP_KERNEL)) ||
+	    !(j->buf[1].data = kvpmalloc(j->buf[1].buf_size, GFP_KERNEL))) {
 		ret = -ENOMEM;
 		goto out;
 	}
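The journal.c changes end here. One consequence worth illustrating: with journal_buf_switch() and its four-value enum gone, journal_entry_open() reports failure through plain error codes (0, -ENOSPC, -EAGAIN, -EROFS, per the updated comment above it) and callers choose the recovery action. The sketch below condenses the caller-side dispatch from the new __journal_res_get(); the sketch_open_entry() name is hypothetical and this is an illustration, not code from the patch.

/* Condensed from the new __journal_res_get() error handling: */
static int sketch_open_entry(struct journal *j)
{
	int ret;

	spin_lock(&j->lock);
	ret = journal_entry_open(j);
	spin_unlock(&j->lock);

	switch (ret) {
	case 0:
		/* entry is open; caller retries the fast path */
		break;
	case -ENOSPC:
		/*
		 * Journal full: run reclaim directly (can't rely on the
		 * work item because of freezing), then make the caller
		 * wait and retry:
		 */
		bch2_journal_reclaim_work(&j->reclaim_work.work);
		ret = -EAGAIN;
		break;
	case -EAGAIN:
		/* previous entry still being written: just wait */
	case -EROFS:
		/* journal error or too few rw devices: give up */
		break;
	}

	return ret;
}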
--- a/fs/bcachefs/journal.h
+++ b/fs/bcachefs/journal.h
@@ -179,6 +179,11 @@ static inline unsigned jset_u64s(unsigned u64s)
 	return u64s + sizeof(struct jset_entry) / sizeof(u64);
 }
 
+static inline int journal_entry_overhead(struct journal *j)
+{
+	return sizeof(struct jset) / sizeof(u64) + j->entry_u64s_reserved;
+}
+
 static inline struct jset_entry *
 bch2_journal_add_entry_noreservation(struct journal_buf *buf, size_t u64s)
 {
@@ -225,7 +230,7 @@ static inline void bch2_journal_add_keys(struct journal *j, struct journal_res *
 			      id, 0, k, k->k.u64s);
 }
 
-void bch2_journal_buf_put_slowpath(struct journal *, bool);
+void __bch2_journal_buf_put(struct journal *, bool);
 
 static inline void bch2_journal_buf_put(struct journal *j, unsigned idx,
 					bool need_write_just_set)
@@ -236,17 +241,10 @@ static inline void bch2_journal_buf_put(struct journal *j, unsigned idx,
 				    .buf0_count = idx == 0,
 				    .buf1_count = idx == 1,
 				    }).v, &j->reservations.counter);
-
-	EBUG_ON(s.idx != idx && !s.prev_buf_unwritten);
-
-	/*
-	 * Do not initiate a journal write if the journal is in an error state
-	 * (previous journal entry write may have failed)
-	 */
-	if (s.idx != idx &&
-	    !journal_state_count(s, idx) &&
-	    s.cur_entry_offset != JOURNAL_ENTRY_ERROR_VAL)
-		bch2_journal_buf_put_slowpath(j, need_write_just_set);
+	if (!journal_state_count(s, idx)) {
+		EBUG_ON(s.idx == idx || !s.prev_buf_unwritten);
+		__bch2_journal_buf_put(j, need_write_just_set);
+	}
 }
 
 /*
@@ -333,6 +331,8 @@ static inline int bch2_journal_res_get(struct journal *j, struct journal_res *re
 	return 0;
 }
 
+/* journal_entry_res: */
+
 void bch2_journal_entry_res_resize(struct journal *,
 				   struct journal_entry_res *,
 				   unsigned);
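The new journal_entry_overhead() helper above folds the jset header and the pre-write reserved u64s into one number, which lets journal_entry_open() size an entry in a single expression. A small sketch of that sizing math; the sketch_entry_u64s() name and the example numbers are assumed, not taken from the patch:

/* Sketch of the sizing computation in the new journal_entry_open(): */
static int sketch_entry_u64s(unsigned sectors, int overhead)
{
	int u64s = (int) (sectors << 9) / sizeof(u64) - overhead;

	/*
	 * e.g. sectors = 128 -> 65536 bytes -> 8192 u64s; with an
	 * assumed overhead of 36 u64s that leaves 8156 u64s for
	 * reservations, clamped so it can never collide with the
	 * CLOSED/ERROR sentinel offsets.
	 */
	return clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1);
}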
--- a/fs/bcachefs/journal_io.c
+++ b/fs/bcachefs/journal_io.c
@@ -902,13 +902,16 @@ static unsigned journal_dev_buckets_available(struct journal *j,
 	return available;
 }
 
-/* returns number of sectors available for next journal entry: */
-int bch2_journal_entry_sectors(struct journal *j)
+int bch2_journal_space_available(struct journal *j)
 {
 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
 	struct bch_dev *ca;
-	unsigned sectors_available = UINT_MAX;
+	unsigned sectors_next_entry	= UINT_MAX;
 	unsigned i, nr_online = 0, nr_devs = 0;
+	unsigned unwritten_sectors = j->reservations.prev_buf_unwritten
+		? journal_prev_buf(j)->sectors
+		: 0;
+	int ret = 0;
 
 	lockdep_assert_held(&j->lock);
@@ -921,16 +924,16 @@ int bch2_journal_entry_sectors(struct journal *j)
 		if (!ja->nr)
 			continue;
 
+		nr_online++;
+
 		buckets_this_device = journal_dev_buckets_available(j, ja);
 		sectors_this_device = ja->sectors_free;
 
-		nr_online++;
-
 		/*
 		 * We that we don't allocate the space for a journal entry
 		 * until we write it out - thus, account for it here:
 		 */
-		if (j->prev_buf_sectors >= sectors_this_device) {
+		if (unwritten_sectors >= sectors_this_device) {
 			if (!buckets_this_device)
 				continue;
@@ -938,7 +941,7 @@ int bch2_journal_entry_sectors(struct journal *j)
 			sectors_this_device = ca->mi.bucket_size;
 		}
 
-		sectors_this_device -= j->prev_buf_sectors;
+		sectors_this_device -= unwritten_sectors;
 
 		if (buckets_this_device)
 			sectors_this_device = ca->mi.bucket_size;
@@ -946,19 +949,26 @@ int bch2_journal_entry_sectors(struct journal *j)
 		if (!sectors_this_device)
 			continue;
 
-		sectors_available = min(sectors_available,
-					sectors_this_device);
+		sectors_next_entry = min(sectors_next_entry,
+					 sectors_this_device);
 		nr_devs++;
 	}
 	rcu_read_unlock();
 
-	if (nr_online < c->opts.metadata_replicas_required)
-		return -EROFS;
+	if (nr_online < c->opts.metadata_replicas_required) {
+		ret = -EROFS;
+		sectors_next_entry = 0;
+	} else if (!sectors_next_entry ||
+		   nr_devs < min_t(unsigned, nr_online,
+				   c->opts.metadata_replicas)) {
+		ret = -ENOSPC;
+		sectors_next_entry = 0;
+	}
 
-	if (nr_devs < min_t(unsigned, nr_online, c->opts.metadata_replicas))
-		return 0;
+	WRITE_ONCE(j->cur_entry_sectors, sectors_next_entry);
 
-	return sectors_available;
+	return ret;
 }
 
 static void __journal_write_alloc(struct journal *j,
@@ -1059,9 +1069,6 @@ static int journal_write_alloc(struct journal *j, struct journal_buf *w,
 	__journal_write_alloc(j, w, &devs_sorted,
 			      sectors, &replicas, replicas_want);
 done:
-	if (replicas >= replicas_want)
-		j->prev_buf_sectors = 0;
-
 	spin_unlock(&j->lock);
 	rcu_read_unlock();
@@ -1117,17 +1124,17 @@ static void journal_buf_realloc(struct journal *j, struct journal_buf *buf)
 	unsigned new_size = READ_ONCE(j->buf_size_want);
 	void *new_buf;
 
-	if (buf->size >= new_size)
+	if (buf->buf_size >= new_size)
 		return;
 
 	new_buf = kvpmalloc(new_size, GFP_NOIO|__GFP_NOWARN);
 	if (!new_buf)
 		return;
 
-	memcpy(new_buf, buf->data, buf->size);
-	kvpfree(buf->data, buf->size);
+	memcpy(new_buf, buf->data, buf->buf_size);
+	kvpfree(buf->data, buf->buf_size);
 	buf->data = new_buf;
-	buf->size = new_size;
+	buf->buf_size = new_size;
 }
 
 static void journal_write_done(struct closure *cl)
@@ -1227,15 +1234,14 @@ void bch2_journal_write(struct closure *cl)
 	j->write_start_time = local_clock();
 
-	start	= vstruct_last(w->data);
+	start	= vstruct_last(jset);
 	end	= bch2_journal_super_entries_add_common(c, start,
 						le64_to_cpu(jset->seq));
 	u64s	= (u64 *) end - (u64 *) start;
 	BUG_ON(u64s > j->entry_u64s_reserved);
 
-	le32_add_cpu(&w->data->u64s, u64s);
-	BUG_ON(vstruct_sectors(jset, c->block_bits) >
-	       w->disk_sectors);
+	le32_add_cpu(&jset->u64s, u64s);
+	BUG_ON(vstruct_sectors(jset, c->block_bits) > w->sectors);
 
 	journal_write_compact(jset);
@@ -1273,10 +1279,10 @@
 		goto err;
 
 	sectors = vstruct_sectors(jset, c->block_bits);
-	BUG_ON(sectors > j->prev_buf_sectors);
+	BUG_ON(sectors > w->sectors);
 
-	bytes = vstruct_bytes(w->data);
-	memset((void *) w->data + bytes, 0, (sectors << 9) - bytes);
+	bytes = vstruct_bytes(jset);
+	memset((void *) jset + bytes, 0, (sectors << 9) - bytes);
 
 	if (journal_write_alloc(j, w, sectors)) {
 		bch2_journal_halt(j);
@@ -1286,6 +1292,12 @@
 		return;
 	}
 
+	/*
+	 * write is allocated, no longer need to account for it in
+	 * bch2_journal_entry_sectors:
+	 */
+	w->sectors = 0;
+
 	/*
 	 * XXX: we really should just disable the entire journal in nochanges
 	 * mode
@@ -1316,7 +1328,7 @@
 		trace_journal_write(bio);
 		closure_bio_submit(bio, cl);
 
-		ca->journal.bucket_seq[ca->journal.cur_idx] = le64_to_cpu(w->data->seq);
+		ca->journal.bucket_seq[ca->journal.cur_idx] = le64_to_cpu(jset->seq);
 	}
 
 	for_each_rw_member(ca, c, i)
--- a/fs/bcachefs/journal_io.h
+++ b/fs/bcachefs/journal_io.h
@@ -40,7 +40,7 @@ int bch2_journal_read(struct bch_fs *, struct list_head *);
 void bch2_journal_entries_free(struct list_head *);
 int bch2_journal_replay(struct bch_fs *, struct list_head *);
 
-int bch2_journal_entry_sectors(struct journal *);
+int bch2_journal_space_available(struct journal *);
 void bch2_journal_write(struct closure *);
 
 #endif /* _BCACHEFS_JOURNAL_IO_H */
--- a/fs/bcachefs/journal_types.h
+++ b/fs/bcachefs/journal_types.h
@@ -22,8 +22,10 @@ struct journal_buf {
 	struct closure_waitlist	wait;
 
-	unsigned		size;
-	unsigned		disk_sectors;
+	unsigned		buf_size;	/* size in bytes of @data */
+	unsigned		sectors;	/* maximum size for current entry */
+	unsigned		disk_sectors;	/* maximum size entry could have been, if
+						   buf_size was bigger */
 	unsigned		u64s_reserved;
 
 	/* bloom filter: */
 	unsigned long		has_inode[1024 / sizeof(unsigned long)];
@@ -129,9 +131,14 @@ struct journal {
 	unsigned long		flags;
 
 	union journal_res_state reservations;
+
+	/* Max size of current journal entry */
 	unsigned		cur_entry_u64s;
-	unsigned		prev_buf_sectors;
-	unsigned		cur_buf_sectors;
+	unsigned		cur_entry_sectors;
+
+	/* Reserved space in journal entry to be used just prior to write */
+	unsigned		entry_u64s_reserved;
+
 	unsigned		buf_size_want;
 
 	/*
@@ -159,9 +166,6 @@
 	u64			seq_ondisk;
 	u64			last_seq_ondisk;
 
-	/* Reserved space in journal entry to be used just prior to write */
-	unsigned		entry_u64s_reserved;
-
 	/*
 	 * FIFO of journal entries whose btree updates have not yet been
 	 * written out.
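Taken together, the renamed journal_buf fields follow a simple invariant, visible in the new journal_entry_open(): disk_sectors is what the devices could accept (copied from j->cur_entry_sectors), buf_size is how large the in-memory buffer currently is, and sectors is the usable bound, the minimum of the two. A sketch of that invariant; the sketch_usable_sectors() name and the numbers are assumed, for illustration only:

/* Sketch of the journal_buf sizing invariant: */
static unsigned sketch_usable_sectors(unsigned disk_sectors,
				      unsigned buf_size)
{
	/* an entry can't outgrow the in-memory buffer: */
	return min(disk_sectors, buf_size >> 9);
}

/*
 * e.g. the devices would accept 4096 sectors but the buffer is still
 * 64KiB: usable = min(4096, 65536 >> 9) = 128 sectors. Seeing
 * buf_size >> 9 < disk_sectors, __journal_res_get() then bumps
 * j->buf_size_want so journal_buf_realloc() can grow the buffer.
 */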