Commit 879ba197 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-11799 Doublewrite recovery can corrupt data pages

The purpose of the InnoDB doublewrite buffer is to make InnoDB
tolerant against cases where the server was killed in the middle
of a page write. (In Linux, killing a process may interrupt a
write system call, typically on a 4096-byte boundary.)

There may exist multiple copies of a page number in the doublewrite
buffer. Recovery should choose the latest valid copy of the page.
By design, the FIL_PAGE_LSN must not precede the latest checkpoint LSN
nor be later than the end of the recovered log.

For page_compressed and encrypted pages, we were missing proper
consistency checks. In the 10.4 data set generated for in MDEV-23231,
the data file contained a valid page_compressed page, and an
identical copy of that page was also present in the doublewrite
buffer. But, recovery would incorrectly consider the page invalid
and restore an uncompressed copy of the same page that had been
written before the log checkpoint. (In fact, no redo log was to
be applied to that page.)

buf_dblwr_process(): Validate the FIL_PAGE_LSN in the doublewrite
buffer pages, and always skip page 0, because those pages should
have been recovered by Datafile::restore_from_doublewrite() if
necessary.

Datafile::restore_from_doublewrite(): Choose the latest applicable
page from the doublewrite buffer.

recv_dblwr_t::find_page(): Also validate encrypted or
page_compressed pages.

recv_dblwr_t::validate_page(): New function to validate a page,
either a copy in a data file or in the doublewrite buffer.
Also validate encrypted or page_compressed pages.

This is joint work with Thirunarayanan Balathandayuthapani.
parent f35d1721
......@@ -532,30 +532,54 @@ buf_dblwr_init_or_load_pages(
void
buf_dblwr_process()
{
ut_ad(recv_sys->parse_start_lsn);
ulint page_no_dblwr = 0;
byte* read_buf;
byte* unaligned_read_buf;
recv_dblwr_t& recv_dblwr = recv_sys->dblwr;
if (!buf_dblwr) {
return;
}
unaligned_read_buf = static_cast<byte*>(
ut_malloc_nokey(3 * UNIV_PAGE_SIZE));
read_buf = static_cast<byte*>(
ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));
byte* const buf = read_buf + UNIV_PAGE_SIZE;
aligned_malloc(3 * srv_page_size, srv_page_size));
byte* const buf = read_buf + srv_page_size;
for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
i != recv_dblwr.pages.end();
++i, ++page_no_dblwr) {
byte* page = *i;
ulint space_id = page_get_space_id(page);
fil_space_t* space = fil_space_get(space_id);
const ulint page_no = page_get_page_no(page);
if (!page_no) {
/* page 0 should have been recovered
already via Datafile::restore_from_doublewrite() */
continue;
}
const ulint space_id = page_get_space_id(page);
const lsn_t lsn = mach_read_from_8(page + FIL_PAGE_LSN);
if (recv_sys->parse_start_lsn > lsn) {
/* Pages written before the checkpoint are
not useful for recovery. */
continue;
}
const page_id_t page_id(space_id, page_no);
if (recv_sys->scanned_lsn < lsn) {
ib::warn() << "Ignoring a doublewrite copy of page "
<< page_id
<< " with future log sequence number "
<< lsn;
continue;
}
if (space == NULL) {
fil_space_t* space = fil_space_acquire_for_io(space_id);
if (!space) {
/* Maybe we have dropped the tablespace
and this page once belonged to it: do nothing */
continue;
......@@ -563,9 +587,6 @@ buf_dblwr_process()
fil_space_open_if_needed(space);
const ulint page_no = page_get_page_no(page);
const page_id_t page_id(space_id, page_no);
if (UNIV_UNLIKELY(page_no >= space->size)) {
/* Do not report the warning if the tablespace
......@@ -581,6 +602,8 @@ buf_dblwr_process()
<< space->name
<< " (" << space->size << " pages)";
}
next_page:
fil_space_release_for_io(space);
continue;
}
......@@ -609,76 +632,27 @@ buf_dblwr_process()
<< "error: " << err;
}
const bool is_all_zero = buf_is_zeroes(
span<const byte>(read_buf, page_size.physical()));
const bool expect_encrypted = space->crypt_data
&& space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
if (is_all_zero) {
if (buf_is_zeroes(span<const byte>(read_buf,
page_size.physical()))) {
/* We will check if the copy in the
doublewrite buffer is valid. If not, we will
ignore this page (there should be redo log
records to initialize it). */
} else if (recv_dblwr.validate_page(
page_id, read_buf, space, buf)) {
goto next_page;
} else {
/* Decompress the page before
validating the checksum. */
ulint decomp = fil_page_decompress(buf, read_buf);
if (!decomp || (decomp != srv_page_size
&& page_size.is_compressed())) {
goto bad;
}
if (expect_encrypted && mach_read_from_4(
read_buf
+ FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
? fil_space_verify_crypt_checksum(read_buf,
page_size)
: !buf_page_is_corrupted(true, read_buf,
page_size, space)) {
/* The page is good; there is no need
to consult the doublewrite buffer. */
continue;
}
bad:
/* We intentionally skip this message for
is_all_zero pages. */
all-zero pages. */
ib::info()
<< "Trying to recover page " << page_id
<< " from the doublewrite buffer.";
}
ulint decomp = fil_page_decompress(buf, page);
if (!decomp || (decomp != srv_page_size
&& page_size.is_compressed())) {
continue;
}
page = recv_dblwr.find_page(page_id, space, buf);
if (expect_encrypted && mach_read_from_4(
page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
? !fil_space_verify_crypt_checksum(page, page_size)
: buf_page_is_corrupted(true, page, page_size, space)) {
/* Theoretically we could have another good
copy for this page in the doublewrite
buffer. If not, we will report a fatal error
for a corrupted page somewhere else if that
page was truly needed. */
continue;
}
if (page_no == 0) {
/* Check the FSP_SPACE_FLAGS. */
ulint flags = fsp_header_get_flags(page);
if (!fsp_flags_is_valid(flags, space_id)
&& fsp_flags_convert_from_101(flags)
== ULINT_UNDEFINED) {
ib::warn() << "Ignoring a doublewrite copy"
" of page " << page_id
<< " due to invalid flags "
<< ib::hex(flags);
continue;
}
/* The flags on the page should be converted later. */
if (!page) {
goto next_page;
}
/* Write the good page from the doublewrite buffer to
......@@ -687,17 +661,18 @@ buf_dblwr_process()
IORequest write_request(IORequest::WRITE);
fil_io(write_request, true, page_id, page_size,
0, page_size.physical(),
const_cast<byte*>(page), NULL);
0, page_size.physical(), page, NULL);
ib::info() << "Recovered page " << page_id
<< " from the doublewrite buffer.";
goto next_page;
}
recv_dblwr.pages.clear();
fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
ut_free(unaligned_read_buf);
aligned_free(read_buf);
}
/****************************************************************//**
......
......@@ -768,10 +768,10 @@ Datafile::restore_from_doublewrite()
}
/* Find if double write buffer contains page_no of given space id. */
const byte* page = recv_sys->dblwr.find_page(m_space_id, 0);
const page_id_t page_id(m_space_id, 0);
const byte* page = recv_sys->dblwr.find_page(page_id);
if (page == NULL) {
if (!page) {
/* If the first page of the given user tablespace is not there
in the doublewrite buffer, then the recovery is going to fail
now. Hence this is treated as an error. */
......@@ -788,15 +788,10 @@ Datafile::restore_from_doublewrite()
FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + page);
if (!fsp_flags_is_valid(flags, m_space_id)) {
ulint cflags = fsp_flags_convert_from_101(flags);
if (cflags == ULINT_UNDEFINED) {
ib::warn()
<< "Ignoring a doublewrite copy of page "
<< page_id
<< " due to invalid flags " << ib::hex(flags);
return(true);
}
flags = cflags;
flags = fsp_flags_convert_from_101(flags);
/* recv_dblwr_t::validate_page() inside find_page()
checked this already. */
ut_ad(flags != ULINT_UNDEFINED);
/* The flags on the page should be converted later. */
}
......
......@@ -195,18 +195,30 @@ struct recv_t{
rec_list;/*!< list of log records for this page */
};
struct recv_dblwr_t {
struct recv_dblwr_t
{
/** Add a page frame to the doublewrite recovery buffer. */
void add(byte* page) {
pages.push_back(page);
}
void add(byte *page) { pages.push_back(page); }
/** Validate the page.
@param page_id page identifier
@param page page contents
@param space the tablespace of the page (not available for page 0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@return whether the page is valid */
bool validate_page(const page_id_t page_id, const byte *page,
const fil_space_t *space, byte *tmp_buf);
/** Find a doublewrite copy of a page.
@param[in] space_id tablespace identifier
@param[in] page_no page number
@param page_id page identifier
@param space tablespace (not available for page_id.page_no()==0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@return page frame
@retval NULL if no page was found */
const byte* find_page(ulint space_id, ulint page_no);
@retval NULL if no valid page for page_id was found */
byte* find_page(const page_id_t page_id, const fil_space_t *space= NULL,
byte *tmp_buf= NULL);
typedef std::list<byte*, ut_allocator<byte*> > list;
......
......@@ -57,6 +57,7 @@ Created 9/20/1997 Heikki Tuuri
#include "srv0start.h"
#include "trx0roll.h"
#include "row0merge.h"
#include "fil0pagecompress.h"
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than UNIV_PAGE_SIZE as it is stored in the buffer pool */
......@@ -3910,6 +3911,8 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
rescan = true;
}
recv_sys->parse_start_lsn = checkpoint_lsn;
if (srv_operation == SRV_OPERATION_NORMAL) {
buf_dblwr_process();
}
......@@ -4084,26 +4087,91 @@ recv_recovery_rollback_active(void)
}
}
/** Find a doublewrite copy of a page.
@param[in] space_id tablespace identifier
@param[in] page_no page number
@return page frame
@retval NULL if no page was found */
const byte*
recv_dblwr_t::find_page(ulint space_id, ulint page_no)
bool recv_dblwr_t::validate_page(const page_id_t page_id,
const byte *page,
const fil_space_t *space,
byte *tmp_buf)
{
const byte *result= NULL;
if (page_id.page_no() == 0)
{
ulint flags= fsp_header_get_flags(page);
if (!fsp_flags_is_valid(flags, page_id.space()))
{
ulint cflags= fsp_flags_convert_from_101(flags);
if (cflags == ULINT_UNDEFINED)
{
ib::warn() << "Ignoring a doublewrite copy of page " << page_id
<< "due to invalid flags " << ib::hex(flags);
return false;
}
flags= cflags;
}
/* Page 0 is never page_compressed or encrypted. */
return !buf_page_is_corrupted(true, page, page_size_t(flags));
}
ut_ad(tmp_buf);
byte *tmp_frame= tmp_buf;
byte *tmp_page= tmp_buf + srv_page_size;
const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE);
const page_size_t page_size(space->flags);
const bool expect_encrypted= space->crypt_data &&
space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
if (expect_encrypted &&
mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
{
if (!fil_space_verify_crypt_checksum(page, page_size))
return false;
if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
return true;
if (page_size.is_compressed())
return false;
memcpy(tmp_page, page, page_size.physical());
if (!fil_space_decrypt(space, tmp_frame, tmp_page))
return false;
}
switch (page_type) {
case FIL_PAGE_PAGE_COMPRESSED:
memcpy(tmp_page, page, page_size.physical());
/* fall through */
case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
if (page_size.is_compressed())
return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */
ulint decomp= fil_page_decompress(tmp_frame, tmp_page);
if (!decomp)
return false; /* decompression failed */
if (decomp == srv_page_size)
return false; /* the page was not compressed (invalid page type) */
return !buf_page_is_corrupted(true, tmp_page, page_size, space);
}
return !buf_page_is_corrupted(true, page, page_size, space);
}
byte *recv_dblwr_t::find_page(const page_id_t page_id,
const fil_space_t *space, byte *tmp_buf)
{
byte *result= NULL;
lsn_t max_lsn= 0;
for (list::const_iterator i = pages.begin(); i != pages.end(); ++i)
{
const byte *page= *i;
if (page_get_page_no(page) != page_no ||
page_get_space_id(page) != space_id)
byte *page= *i;
if (page_get_page_no(page) != page_id.page_no() ||
page_get_space_id(page) != page_id.space())
continue;
const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
if (lsn <= max_lsn)
if (lsn <= max_lsn ||
!validate_page(page_id, page, space, tmp_buf))
{
/* Mark processed for subsequent iterations in buf_dblwr_process() */
memset(page + FIL_PAGE_LSN, 0, 8);
continue;
}
max_lsn= lsn;
result= page;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment