Commit 9c859dc9 authored by Kent Overstreet's avatar Kent Overstreet Committed by Kent Overstreet

bcachefs: Assorted journal refactoring

Also improve error reporting - only return an error from
bch2_journal_flush_seq() if we had an error writing that entry (i.e. not
if there was an error with a newer entry).
Signed-off-by: default avatarKent Overstreet <kent.overstreet@linux.dev>
parent 60476b14
......@@ -90,7 +90,7 @@ static enum {
} journal_buf_switch(struct journal *j, bool need_write_just_set)
{
struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct journal_buf *buf;
struct journal_buf *buf = journal_cur_buf(j);
union journal_res_state old, new;
u64 v = atomic64_read(&j->reservations.counter);
......@@ -101,8 +101,11 @@ static enum {
if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
return JOURNAL_ENTRY_CLOSED;
if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) {
/* this entry will never be written: */
closure_wake_up(&buf->wait);
return JOURNAL_ENTRY_ERROR;
}
if (new.prev_buf_unwritten)
return JOURNAL_ENTRY_INUSE;
......@@ -123,7 +126,6 @@ static enum {
clear_bit(JOURNAL_NEED_WRITE, &j->flags);
buf = &j->buf[old.idx];
buf->data->u64s = cpu_to_le32(old.cur_entry_offset);
j->prev_buf_sectors =
......@@ -270,34 +272,42 @@ static int journal_entry_open(struct journal *j)
return 1;
}
/*
* returns true if there's nothing to flush and no journal write still in flight
*/
static bool journal_flush_write(struct journal *j)
static bool __journal_entry_close(struct journal *j)
{
bool ret;
spin_lock(&j->lock);
ret = !j->reservations.prev_buf_unwritten;
bool set_need_write;
if (!journal_entry_is_open(j)) {
spin_unlock(&j->lock);
return ret;
return true;
}
set_bit(JOURNAL_NEED_WRITE, &j->flags);
if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
ret = false;
else
set_need_write = !test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags);
if (set_need_write)
j->need_write_time = local_clock();
switch (journal_buf_switch(j, set_need_write)) {
case JOURNAL_ENTRY_INUSE:
spin_unlock(&j->lock);
return ret;
return false;
default:
spin_unlock(&j->lock);
fallthrough;
case JOURNAL_UNLOCKED:
return true;
}
}
static bool journal_entry_close(struct journal *j)
{
spin_lock(&j->lock);
return __journal_entry_close(j);
}
static void journal_write_work(struct work_struct *work)
{
struct journal *j = container_of(work, struct journal, write_work.work);
journal_flush_write(j);
journal_entry_close(j);
}
/*
......@@ -467,6 +477,37 @@ int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *pare
return ret;
}
static int journal_seq_error(struct journal *j, u64 seq)
{
union journal_res_state state = READ_ONCE(j->reservations);
if (seq == journal_cur_seq(j))
return bch2_journal_error(j);
if (seq + 1 == journal_cur_seq(j) &&
!state.prev_buf_unwritten &&
seq > j->seq_ondisk)
return -EIO;
return 0;
}
static inline struct journal_buf *
journal_seq_to_buf(struct journal *j, u64 seq)
{
/* seq should be for a journal entry that has been opened: */
BUG_ON(seq > journal_cur_seq(j));
BUG_ON(seq == journal_cur_seq(j) &&
j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL);
if (seq == journal_cur_seq(j))
return journal_cur_buf(j);
if (seq + 1 == journal_cur_seq(j) &&
j->reservations.prev_buf_unwritten)
return journal_prev_buf(j);
return NULL;
}
/**
* bch2_journal_wait_on_seq - wait for a journal entry to be written
*
......@@ -475,31 +516,22 @@ int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *pare
* can wait for an arbitrary amount of time (up to @j->write_delay_ms, which is
* configurable).
*/
void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
void bch2_journal_wait_on_seq(struct journal *j, u64 seq,
struct closure *parent)
{
spin_lock(&j->lock);
BUG_ON(seq > journal_cur_seq(j));
struct journal_buf *buf;
if (bch2_journal_error(j)) {
spin_unlock(&j->lock);
return;
}
spin_lock(&j->lock);
if (seq == journal_cur_seq(j)) {
if (!closure_wait(&journal_cur_buf(j)->wait, parent))
BUG();
} else if (seq + 1 == journal_cur_seq(j) &&
j->reservations.prev_buf_unwritten) {
if (!closure_wait(&journal_prev_buf(j)->wait, parent))
if ((buf = journal_seq_to_buf(j, seq))) {
if (!closure_wait(&buf->wait, parent))
BUG();
smp_mb();
/* check if raced with write completion (or failure) */
if (!j->reservations.prev_buf_unwritten ||
bch2_journal_error(j))
closure_wake_up(&journal_prev_buf(j)->wait);
if (seq == journal_cur_seq(j)) {
smp_mb();
if (bch2_journal_error(j))
closure_wake_up(&buf->wait);
}
}
spin_unlock(&j->lock);
......@@ -511,108 +543,35 @@ void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent
* like bch2_journal_wait_on_seq, except that it triggers a write immediately if
* necessary
*/
void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
void bch2_journal_flush_seq_async(struct journal *j, u64 seq,
struct closure *parent)
{
struct journal_buf *buf;
spin_lock(&j->lock);
BUG_ON(seq > journal_cur_seq(j));
if (bch2_journal_error(j)) {
spin_unlock(&j->lock);
return;
}
if (seq == journal_cur_seq(j)) {
bool set_need_write = false;
buf = journal_cur_buf(j);
if (parent && !closure_wait(&buf->wait, parent))
BUG();
if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
j->need_write_time = local_clock();
set_need_write = true;
}
switch (journal_buf_switch(j, set_need_write)) {
case JOURNAL_ENTRY_ERROR:
if (parent)
closure_wake_up(&buf->wait);
break;
case JOURNAL_ENTRY_CLOSED:
/*
* Journal entry hasn't been opened yet, but caller
* claims it has something
*/
BUG();
case JOURNAL_ENTRY_INUSE:
break;
case JOURNAL_UNLOCKED:
return;
}
} else if (parent &&
seq + 1 == journal_cur_seq(j) &&
j->reservations.prev_buf_unwritten) {
buf = journal_prev_buf(j);
if (parent &&
(buf = journal_seq_to_buf(j, seq)))
if (!closure_wait(&buf->wait, parent))
BUG();
smp_mb();
/* check if raced with write completion (or failure) */
if (!j->reservations.prev_buf_unwritten ||
bch2_journal_error(j))
closure_wake_up(&buf->wait);
}
spin_unlock(&j->lock);
if (seq == journal_cur_seq(j))
__journal_entry_close(j);
else
spin_unlock(&j->lock);
}
static int journal_seq_flushed(struct journal *j, u64 seq)
{
struct journal_buf *buf;
int ret = 1;
int ret;
spin_lock(&j->lock);
BUG_ON(seq > journal_cur_seq(j));
if (seq == journal_cur_seq(j)) {
bool set_need_write = false;
ret = 0;
buf = journal_cur_buf(j);
ret = seq <= j->seq_ondisk ? 1 : journal_seq_error(j, seq);
if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
j->need_write_time = local_clock();
set_need_write = true;
}
switch (journal_buf_switch(j, set_need_write)) {
case JOURNAL_ENTRY_ERROR:
ret = -EIO;
break;
case JOURNAL_ENTRY_CLOSED:
/*
* Journal entry hasn't been opened yet, but caller
* claims it has something
*/
BUG();
case JOURNAL_ENTRY_INUSE:
break;
case JOURNAL_UNLOCKED:
return 0;
}
} else if (seq + 1 == journal_cur_seq(j) &&
j->reservations.prev_buf_unwritten) {
ret = bch2_journal_error(j);
}
spin_unlock(&j->lock);
if (seq == journal_cur_seq(j))
__journal_entry_close(j);
else
spin_unlock(&j->lock);
return ret;
}
......@@ -914,13 +873,16 @@ void bch2_fs_journal_stop(struct journal *j)
{
struct bch_fs *c = container_of(j, struct bch_fs, journal);
wait_event(j->wait, journal_flush_write(j));
wait_event(j->wait, journal_entry_close(j));
/* do we need to write another journal entry? */
if (test_bit(JOURNAL_NOT_EMPTY, &j->flags) ||
c->btree_roots_dirty)
bch2_journal_meta(j);
BUG_ON(journal_entry_is_open(j) ||
j->reservations.prev_buf_unwritten);
BUG_ON(!bch2_journal_error(j) &&
test_bit(JOURNAL_NOT_EMPTY, &j->flags));
......
......@@ -1209,6 +1209,8 @@ static void journal_write_done(struct closure *cl)
u64 seq = le64_to_cpu(w->data->seq);
u64 last_seq = le64_to_cpu(w->data->last_seq);
bch2_time_stats_update(j->write_time, j->write_start_time);
if (!devs.nr) {
bch_err(c, "unable to write journal to sufficient devices");
goto err;
......@@ -1216,11 +1218,11 @@ static void journal_write_done(struct closure *cl)
if (bch2_mark_replicas(c, BCH_DATA_JOURNAL, devs))
goto err;
out:
bch2_time_stats_update(j->write_time, j->write_start_time);
spin_lock(&j->lock);
j->seq_ondisk = seq;
j->last_seq_ondisk = last_seq;
if (seq >= j->pin.front)
journal_seq_pin(j, seq)->devs = devs;
......@@ -1232,7 +1234,7 @@ static void journal_write_done(struct closure *cl)
* bch2_fs_journal_stop():
*/
mod_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
out:
/* also must come before signalling write completion: */
closure_debug_destroy(cl);
......@@ -1250,6 +1252,7 @@ static void journal_write_done(struct closure *cl)
err:
bch2_fatal_error(c);
bch2_journal_halt(j);
spin_lock(&j->lock);
goto out;
}
......
......@@ -151,7 +151,8 @@ struct journal {
/* Sequence number of most recent journal entry (last entry in @pin) */
atomic64_t seq;
/* last_seq from the most recent journal entry written */
/* seq, last_seq from the most recent journal entry successfully written */
u64 seq_ondisk;
u64 last_seq_ondisk;
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment