Commit 12818d24 authored by Brian Foster's avatar Brian Foster Committed by Dave Chinner

xfs: rework log recovery to submit buffers on LSN boundaries

The fix to log recovery to update the metadata LSN in recovered buffers
introduces the requirement that a buffer is submitted only once per
current LSN. Log recovery currently submits buffers on transaction
boundaries. This is not sufficient as the abstraction between log
records and transactions allows for various scenarios where multiple
transactions can share the same current LSN. If independent transactions
share an LSN and both modify the same buffer, log recovery can
incorrectly skip updates and leave the filesystem in an inconsisent
state.

In preparation for proper metadata LSN updates during log recovery,
update log recovery to submit buffers for write on LSN change boundaries
rather than transaction boundaries. Explicitly track the current LSN in
a new struct xlog field to handle the various corner cases of when the
current LSN may or may not change.
Signed-off-by: default avatarBrian Foster <bfoster@redhat.com>
Reviewed-by: default avatarDave Chinner <dchinner@redhat.com>
Signed-off-by: default avatarDave Chinner <david@fromorbit.com>
parent ddeb14f4
...@@ -413,7 +413,8 @@ struct xlog { ...@@ -413,7 +413,8 @@ struct xlog {
/* log record crc error injection factor */ /* log record crc error injection factor */
uint32_t l_badcrc_factor; uint32_t l_badcrc_factor;
#endif #endif
/* log recovery lsn tracking (for buffer submission */
xfs_lsn_t l_recovery_lsn;
}; };
#define XLOG_BUF_CANCEL_BUCKET(log, blkno) \ #define XLOG_BUF_CANCEL_BUCKET(log, blkno) \
......
...@@ -3846,14 +3846,13 @@ STATIC int ...@@ -3846,14 +3846,13 @@ STATIC int
xlog_recover_commit_trans( xlog_recover_commit_trans(
struct xlog *log, struct xlog *log,
struct xlog_recover *trans, struct xlog_recover *trans,
int pass) int pass,
struct list_head *buffer_list)
{ {
int error = 0; int error = 0;
int error2;
int items_queued = 0; int items_queued = 0;
struct xlog_recover_item *item; struct xlog_recover_item *item;
struct xlog_recover_item *next; struct xlog_recover_item *next;
LIST_HEAD (buffer_list);
LIST_HEAD (ra_list); LIST_HEAD (ra_list);
LIST_HEAD (done_list); LIST_HEAD (done_list);
...@@ -3876,7 +3875,7 @@ xlog_recover_commit_trans( ...@@ -3876,7 +3875,7 @@ xlog_recover_commit_trans(
items_queued++; items_queued++;
if (items_queued >= XLOG_RECOVER_COMMIT_QUEUE_MAX) { if (items_queued >= XLOG_RECOVER_COMMIT_QUEUE_MAX) {
error = xlog_recover_items_pass2(log, trans, error = xlog_recover_items_pass2(log, trans,
&buffer_list, &ra_list); buffer_list, &ra_list);
list_splice_tail_init(&ra_list, &done_list); list_splice_tail_init(&ra_list, &done_list);
items_queued = 0; items_queued = 0;
} }
...@@ -3894,15 +3893,14 @@ xlog_recover_commit_trans( ...@@ -3894,15 +3893,14 @@ xlog_recover_commit_trans(
if (!list_empty(&ra_list)) { if (!list_empty(&ra_list)) {
if (!error) if (!error)
error = xlog_recover_items_pass2(log, trans, error = xlog_recover_items_pass2(log, trans,
&buffer_list, &ra_list); buffer_list, &ra_list);
list_splice_tail_init(&ra_list, &done_list); list_splice_tail_init(&ra_list, &done_list);
} }
if (!list_empty(&done_list)) if (!list_empty(&done_list))
list_splice_init(&done_list, &trans->r_itemq); list_splice_init(&done_list, &trans->r_itemq);
error2 = xfs_buf_delwri_submit(&buffer_list); return error;
return error ? error : error2;
} }
STATIC void STATIC void
...@@ -4085,7 +4083,8 @@ xlog_recovery_process_trans( ...@@ -4085,7 +4083,8 @@ xlog_recovery_process_trans(
char *dp, char *dp,
unsigned int len, unsigned int len,
unsigned int flags, unsigned int flags,
int pass) int pass,
struct list_head *buffer_list)
{ {
int error = 0; int error = 0;
bool freeit = false; bool freeit = false;
...@@ -4109,7 +4108,8 @@ xlog_recovery_process_trans( ...@@ -4109,7 +4108,8 @@ xlog_recovery_process_trans(
error = xlog_recover_add_to_cont_trans(log, trans, dp, len); error = xlog_recover_add_to_cont_trans(log, trans, dp, len);
break; break;
case XLOG_COMMIT_TRANS: case XLOG_COMMIT_TRANS:
error = xlog_recover_commit_trans(log, trans, pass); error = xlog_recover_commit_trans(log, trans, pass,
buffer_list);
/* success or fail, we are now done with this transaction. */ /* success or fail, we are now done with this transaction. */
freeit = true; freeit = true;
break; break;
...@@ -4191,10 +4191,12 @@ xlog_recover_process_ophdr( ...@@ -4191,10 +4191,12 @@ xlog_recover_process_ophdr(
struct xlog_op_header *ohead, struct xlog_op_header *ohead,
char *dp, char *dp,
char *end, char *end,
int pass) int pass,
struct list_head *buffer_list)
{ {
struct xlog_recover *trans; struct xlog_recover *trans;
unsigned int len; unsigned int len;
int error;
/* Do we understand who wrote this op? */ /* Do we understand who wrote this op? */
if (ohead->oh_clientid != XFS_TRANSACTION && if (ohead->oh_clientid != XFS_TRANSACTION &&
...@@ -4221,8 +4223,39 @@ xlog_recover_process_ophdr( ...@@ -4221,8 +4223,39 @@ xlog_recover_process_ophdr(
return 0; return 0;
} }
/*
* The recovered buffer queue is drained only once we know that all
* recovery items for the current LSN have been processed. This is
* required because:
*
* - Buffer write submission updates the metadata LSN of the buffer.
* - Log recovery skips items with a metadata LSN >= the current LSN of
* the recovery item.
* - Separate recovery items against the same metadata buffer can share
* a current LSN. I.e., consider that the LSN of a recovery item is
* defined as the starting LSN of the first record in which its
* transaction appears, that a record can hold multiple transactions,
* and/or that a transaction can span multiple records.
*
* In other words, we are allowed to submit a buffer from log recovery
* once per current LSN. Otherwise, we may incorrectly skip recovery
* items and cause corruption.
*
* We don't know up front whether buffers are updated multiple times per
* LSN. Therefore, track the current LSN of each commit log record as it
* is processed and drain the queue when it changes. Use commit records
* because they are ordered correctly by the logging code.
*/
if (log->l_recovery_lsn != trans->r_lsn &&
ohead->oh_flags & XLOG_COMMIT_TRANS) {
error = xfs_buf_delwri_submit(buffer_list);
if (error)
return error;
log->l_recovery_lsn = trans->r_lsn;
}
return xlog_recovery_process_trans(log, trans, dp, len, return xlog_recovery_process_trans(log, trans, dp, len,
ohead->oh_flags, pass); ohead->oh_flags, pass, buffer_list);
} }
/* /*
...@@ -4240,7 +4273,8 @@ xlog_recover_process_data( ...@@ -4240,7 +4273,8 @@ xlog_recover_process_data(
struct hlist_head rhash[], struct hlist_head rhash[],
struct xlog_rec_header *rhead, struct xlog_rec_header *rhead,
char *dp, char *dp,
int pass) int pass,
struct list_head *buffer_list)
{ {
struct xlog_op_header *ohead; struct xlog_op_header *ohead;
char *end; char *end;
...@@ -4262,7 +4296,7 @@ xlog_recover_process_data( ...@@ -4262,7 +4296,7 @@ xlog_recover_process_data(
/* errors will abort recovery */ /* errors will abort recovery */
error = xlog_recover_process_ophdr(log, rhash, rhead, ohead, error = xlog_recover_process_ophdr(log, rhash, rhead, ohead,
dp, end, pass); dp, end, pass, buffer_list);
if (error) if (error)
return error; return error;
...@@ -4685,7 +4719,8 @@ xlog_recover_process( ...@@ -4685,7 +4719,8 @@ xlog_recover_process(
struct hlist_head rhash[], struct hlist_head rhash[],
struct xlog_rec_header *rhead, struct xlog_rec_header *rhead,
char *dp, char *dp,
int pass) int pass,
struct list_head *buffer_list)
{ {
int error; int error;
__le32 crc; __le32 crc;
...@@ -4732,7 +4767,8 @@ xlog_recover_process( ...@@ -4732,7 +4767,8 @@ xlog_recover_process(
if (error) if (error)
return error; return error;
return xlog_recover_process_data(log, rhash, rhead, dp, pass); return xlog_recover_process_data(log, rhash, rhead, dp, pass,
buffer_list);
} }
STATIC int STATIC int
...@@ -4793,9 +4829,11 @@ xlog_do_recovery_pass( ...@@ -4793,9 +4829,11 @@ xlog_do_recovery_pass(
char *offset; char *offset;
xfs_buf_t *hbp, *dbp; xfs_buf_t *hbp, *dbp;
int error = 0, h_size, h_len; int error = 0, h_size, h_len;
int error2 = 0;
int bblks, split_bblks; int bblks, split_bblks;
int hblks, split_hblks, wrapped_hblks; int hblks, split_hblks, wrapped_hblks;
struct hlist_head rhash[XLOG_RHASH_SIZE]; struct hlist_head rhash[XLOG_RHASH_SIZE];
LIST_HEAD (buffer_list);
ASSERT(head_blk != tail_blk); ASSERT(head_blk != tail_blk);
rhead_blk = 0; rhead_blk = 0;
...@@ -4981,7 +5019,7 @@ xlog_do_recovery_pass( ...@@ -4981,7 +5019,7 @@ xlog_do_recovery_pass(
} }
error = xlog_recover_process(log, rhash, rhead, offset, error = xlog_recover_process(log, rhash, rhead, offset,
pass); pass, &buffer_list);
if (error) if (error)
goto bread_err2; goto bread_err2;
...@@ -5012,7 +5050,8 @@ xlog_do_recovery_pass( ...@@ -5012,7 +5050,8 @@ xlog_do_recovery_pass(
if (error) if (error)
goto bread_err2; goto bread_err2;
error = xlog_recover_process(log, rhash, rhead, offset, pass); error = xlog_recover_process(log, rhash, rhead, offset, pass,
&buffer_list);
if (error) if (error)
goto bread_err2; goto bread_err2;
...@@ -5025,10 +5064,17 @@ xlog_do_recovery_pass( ...@@ -5025,10 +5064,17 @@ xlog_do_recovery_pass(
bread_err1: bread_err1:
xlog_put_bp(hbp); xlog_put_bp(hbp);
/*
* Submit buffers that have been added from the last record processed,
* regardless of error status.
*/
if (!list_empty(&buffer_list))
error2 = xfs_buf_delwri_submit(&buffer_list);
if (error && first_bad) if (error && first_bad)
*first_bad = rhead_blk; *first_bad = rhead_blk;
return error; return error ? error : error2;
} }
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment