Commit f1a79365 authored by Kent Overstreet's avatar Kent Overstreet Committed by Kent Overstreet

bcachefs: Don't block on journal reservation with btree locks held

Fixes a deadlock between the allocator thread, when it first starts up,
and journal replay
Signed-off-by: default avatarKent Overstreet <kent.overstreet@linux.dev>
parent 129550c4
...@@ -638,12 +638,12 @@ static void btree_update_wait_on_journal(struct closure *cl) ...@@ -638,12 +638,12 @@ static void btree_update_wait_on_journal(struct closure *cl)
int ret; int ret;
ret = bch2_journal_open_seq_async(&c->journal, as->journal_seq, cl); ret = bch2_journal_open_seq_async(&c->journal, as->journal_seq, cl);
if (ret < 0) if (ret == -EAGAIN) {
goto err;
if (!ret) {
continue_at(cl, btree_update_wait_on_journal, system_wq); continue_at(cl, btree_update_wait_on_journal, system_wq);
return; return;
} }
if (ret < 0)
goto err;
bch2_journal_flush_seq_async(&c->journal, as->journal_seq, cl); bch2_journal_flush_seq_async(&c->journal, as->journal_seq, cl);
err: err:
......
...@@ -344,19 +344,35 @@ static inline int do_btree_insert_at(struct btree_insert *trans, ...@@ -344,19 +344,35 @@ static inline int do_btree_insert_at(struct btree_insert *trans,
trans_for_each_entry(trans, i) trans_for_each_entry(trans, i)
BUG_ON(i->iter->uptodate >= BTREE_ITER_NEED_RELOCK); BUG_ON(i->iter->uptodate >= BTREE_ITER_NEED_RELOCK);
memset(&trans->journal_res, 0, sizeof(trans->journal_res));
if (likely(!(trans->flags & BTREE_INSERT_JOURNAL_REPLAY))) {
u64s = 0; u64s = 0;
trans_for_each_entry(trans, i) trans_for_each_entry(trans, i)
u64s += jset_u64s(i->k->k.u64s); u64s += jset_u64s(i->k->k.u64s);
memset(&trans->journal_res, 0, sizeof(trans->journal_res)); while ((ret = bch2_journal_res_get(&c->journal,
&trans->journal_res, u64s,
JOURNAL_RES_GET_NONBLOCK)) == -EAGAIN) {
struct btree_iter *iter = trans->entries[0].iter;
bch2_btree_iter_unlock(iter);
ret = bch2_journal_res_get(&c->journal,
&trans->journal_res, u64s,
JOURNAL_RES_GET_CHECK);
if (ret)
return ret;
if (!bch2_btree_iter_relock(iter)) {
trans_restart(" (iter relock after journal res get blocked)");
return -EINTR;
}
}
ret = !(trans->flags & BTREE_INSERT_JOURNAL_REPLAY)
? bch2_journal_res_get(&c->journal,
&trans->journal_res,
u64s, u64s)
: 0;
if (ret) if (ret)
return ret; return ret;
}
multi_lock_write(c, trans); multi_lock_write(c, trans);
......
...@@ -335,15 +335,14 @@ u64 bch2_inode_journal_seq(struct journal *j, u64 inode) ...@@ -335,15 +335,14 @@ u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
} }
static int __journal_res_get(struct journal *j, struct journal_res *res, static int __journal_res_get(struct journal *j, struct journal_res *res,
unsigned u64s_min, unsigned u64s_max) unsigned flags)
{ {
struct bch_fs *c = container_of(j, struct bch_fs, journal); struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct journal_buf *buf; struct journal_buf *buf;
int ret; int ret;
retry: retry:
ret = journal_res_get_fast(j, res, u64s_min, u64s_max); if (journal_res_get_fast(j, res, flags))
if (ret) return 0;
return ret;
spin_lock(&j->lock); spin_lock(&j->lock);
/* /*
...@@ -351,10 +350,9 @@ static int __journal_res_get(struct journal *j, struct journal_res *res, ...@@ -351,10 +350,9 @@ static int __journal_res_get(struct journal *j, struct journal_res *res,
* that just did journal_entry_open() and call journal_entry_close() * that just did journal_entry_open() and call journal_entry_close()
* unnecessarily * unnecessarily
*/ */
ret = journal_res_get_fast(j, res, u64s_min, u64s_max); if (journal_res_get_fast(j, res, flags)) {
if (ret) {
spin_unlock(&j->lock); spin_unlock(&j->lock);
return 1; return 0;
} }
/* /*
...@@ -377,7 +375,12 @@ static int __journal_res_get(struct journal *j, struct journal_res *res, ...@@ -377,7 +375,12 @@ static int __journal_res_get(struct journal *j, struct journal_res *res,
spin_unlock(&j->lock); spin_unlock(&j->lock);
return -EROFS; return -EROFS;
case JOURNAL_ENTRY_INUSE: case JOURNAL_ENTRY_INUSE:
/* haven't finished writing out the previous one: */ /*
* The current journal entry is still open, but we failed to get
* a journal reservation because there's not enough space in it,
* and we can't close it and start another because we haven't
* finished writing out the previous entry:
*/
spin_unlock(&j->lock); spin_unlock(&j->lock);
trace_journal_entry_full(c); trace_journal_entry_full(c);
goto blocked; goto blocked;
...@@ -408,7 +411,7 @@ static int __journal_res_get(struct journal *j, struct journal_res *res, ...@@ -408,7 +411,7 @@ static int __journal_res_get(struct journal *j, struct journal_res *res,
blocked: blocked:
if (!j->res_get_blocked_start) if (!j->res_get_blocked_start)
j->res_get_blocked_start = local_clock() ?: 1; j->res_get_blocked_start = local_clock() ?: 1;
return 0; return -EAGAIN;
} }
/* /*
...@@ -422,14 +425,14 @@ static int __journal_res_get(struct journal *j, struct journal_res *res, ...@@ -422,14 +425,14 @@ static int __journal_res_get(struct journal *j, struct journal_res *res,
* btree node write locks. * btree node write locks.
*/ */
int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res, int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
unsigned u64s_min, unsigned u64s_max) unsigned flags)
{ {
int ret; int ret;
wait_event(j->wait, wait_event(j->wait,
(ret = __journal_res_get(j, res, u64s_min, (ret = __journal_res_get(j, res, flags)) != -EAGAIN ||
u64s_max))); (flags & JOURNAL_RES_GET_NONBLOCK));
return ret < 0 ? ret : 0; return ret;
} }
u64 bch2_journal_last_unwritten_seq(struct journal *j) u64 bch2_journal_last_unwritten_seq(struct journal *j)
...@@ -453,28 +456,55 @@ u64 bch2_journal_last_unwritten_seq(struct journal *j) ...@@ -453,28 +456,55 @@ u64 bch2_journal_last_unwritten_seq(struct journal *j)
* btree root - every journal entry contains the roots of all the btrees, so it * btree root - every journal entry contains the roots of all the btrees, so it
* doesn't need to bother with getting a journal reservation * doesn't need to bother with getting a journal reservation
*/ */
int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent) int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *cl)
{ {
int ret; struct bch_fs *c = container_of(j, struct bch_fs, journal);
bool need_reclaim = false;
retry:
spin_lock(&j->lock); spin_lock(&j->lock);
BUG_ON(seq > journal_cur_seq(j));
if (seq < journal_cur_seq(j) || if (seq < journal_cur_seq(j) ||
journal_entry_is_open(j)) { journal_entry_is_open(j)) {
spin_unlock(&j->lock); spin_unlock(&j->lock);
return 1; return 0;
}
if (journal_cur_seq(j) < seq) {
switch (journal_buf_switch(j, false)) {
case JOURNAL_ENTRY_ERROR:
spin_unlock(&j->lock);
return -EROFS;
case JOURNAL_ENTRY_INUSE:
/* haven't finished writing out the previous one: */
trace_journal_entry_full(c);
goto blocked;
case JOURNAL_ENTRY_CLOSED:
break;
case JOURNAL_UNLOCKED:
goto retry;
}
}
BUG_ON(journal_cur_seq(j) < seq);
if (!journal_entry_open(j)) {
need_reclaim = true;
goto blocked;
} }
ret = journal_entry_open(j);
if (!ret)
closure_wait(&j->async_wait, parent);
spin_unlock(&j->lock); spin_unlock(&j->lock);
if (!ret) return 0;
bch2_journal_reclaim_work(&j->reclaim_work.work); blocked:
if (!j->res_get_blocked_start)
j->res_get_blocked_start = local_clock() ?: 1;
return ret; closure_wait(&j->async_wait, cl);
spin_unlock(&j->lock);
if (need_reclaim)
bch2_journal_reclaim_work(&j->reclaim_work.work);
return -EAGAIN;
} }
static int journal_seq_error(struct journal *j, u64 seq) static int journal_seq_error(struct journal *j, u64 seq)
...@@ -594,11 +624,10 @@ int bch2_journal_flush_seq(struct journal *j, u64 seq) ...@@ -594,11 +624,10 @@ int bch2_journal_flush_seq(struct journal *j, u64 seq)
void bch2_journal_meta_async(struct journal *j, struct closure *parent) void bch2_journal_meta_async(struct journal *j, struct closure *parent)
{ {
struct journal_res res; struct journal_res res;
unsigned u64s = jset_u64s(0);
memset(&res, 0, sizeof(res)); memset(&res, 0, sizeof(res));
bch2_journal_res_get(j, &res, u64s, u64s); bch2_journal_res_get(j, &res, jset_u64s(0), 0);
bch2_journal_res_put(j, &res); bch2_journal_res_put(j, &res);
bch2_journal_flush_seq_async(j, res.seq, parent); bch2_journal_flush_seq_async(j, res.seq, parent);
...@@ -607,12 +636,11 @@ void bch2_journal_meta_async(struct journal *j, struct closure *parent) ...@@ -607,12 +636,11 @@ void bch2_journal_meta_async(struct journal *j, struct closure *parent)
int bch2_journal_meta(struct journal *j) int bch2_journal_meta(struct journal *j)
{ {
struct journal_res res; struct journal_res res;
unsigned u64s = jset_u64s(0);
int ret; int ret;
memset(&res, 0, sizeof(res)); memset(&res, 0, sizeof(res));
ret = bch2_journal_res_get(j, &res, u64s, u64s); ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
if (ret) if (ret)
return ret; return ret;
......
...@@ -272,12 +272,14 @@ static inline void bch2_journal_res_put(struct journal *j, ...@@ -272,12 +272,14 @@ static inline void bch2_journal_res_put(struct journal *j,
} }
int bch2_journal_res_get_slowpath(struct journal *, struct journal_res *, int bch2_journal_res_get_slowpath(struct journal *, struct journal_res *,
unsigned, unsigned); unsigned);
#define JOURNAL_RES_GET_NONBLOCK (1 << 0)
#define JOURNAL_RES_GET_CHECK (1 << 1)
static inline int journal_res_get_fast(struct journal *j, static inline int journal_res_get_fast(struct journal *j,
struct journal_res *res, struct journal_res *res,
unsigned u64s_min, unsigned flags)
unsigned u64s_max)
{ {
union journal_res_state old, new; union journal_res_state old, new;
u64 v = atomic64_read(&j->reservations.counter); u64 v = atomic64_read(&j->reservations.counter);
...@@ -289,42 +291,45 @@ static inline int journal_res_get_fast(struct journal *j, ...@@ -289,42 +291,45 @@ static inline int journal_res_get_fast(struct journal *j,
* Check if there is still room in the current journal * Check if there is still room in the current journal
* entry: * entry:
*/ */
if (old.cur_entry_offset + u64s_min > j->cur_entry_u64s) if (new.cur_entry_offset + res->u64s > j->cur_entry_u64s)
return 0; return 0;
res->offset = old.cur_entry_offset; if (flags & JOURNAL_RES_GET_CHECK)
res->u64s = min(u64s_max, j->cur_entry_u64s - return 1;
old.cur_entry_offset);
journal_state_inc(&new);
new.cur_entry_offset += res->u64s; new.cur_entry_offset += res->u64s;
journal_state_inc(&new);
} while ((v = atomic64_cmpxchg(&j->reservations.counter, } while ((v = atomic64_cmpxchg(&j->reservations.counter,
old.v, new.v)) != old.v); old.v, new.v)) != old.v);
res->ref = true; res->ref = true;
res->idx = new.idx; res->idx = old.idx;
res->seq = le64_to_cpu(j->buf[res->idx].data->seq); res->offset = old.cur_entry_offset;
res->seq = le64_to_cpu(j->buf[old.idx].data->seq);
return 1; return 1;
} }
static inline int bch2_journal_res_get(struct journal *j, struct journal_res *res, static inline int bch2_journal_res_get(struct journal *j, struct journal_res *res,
unsigned u64s_min, unsigned u64s_max) unsigned u64s, unsigned flags)
{ {
int ret; int ret;
EBUG_ON(res->ref); EBUG_ON(res->ref);
EBUG_ON(u64s_max < u64s_min);
EBUG_ON(!test_bit(JOURNAL_STARTED, &j->flags)); EBUG_ON(!test_bit(JOURNAL_STARTED, &j->flags));
if (journal_res_get_fast(j, res, u64s_min, u64s_max)) res->u64s = u64s;
if (journal_res_get_fast(j, res, flags))
goto out; goto out;
ret = bch2_journal_res_get_slowpath(j, res, u64s_min, u64s_max); ret = bch2_journal_res_get_slowpath(j, res, flags);
if (ret) if (ret)
return ret; return ret;
out: out:
if (!(flags & JOURNAL_RES_GET_CHECK)) {
lock_acquire_shared(&j->res_map, 0, 0, NULL, _THIS_IP_); lock_acquire_shared(&j->res_map, 0, 0, NULL, _THIS_IP_);
EBUG_ON(!res->ref); EBUG_ON(!res->ref);
}
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment