Commit 52edc374 authored by Marko Mäkelä's avatar Marko Mäkelä

Merge 10.4 into 10.5

parents 50a11f39 70d4500c
......@@ -483,6 +483,8 @@ buf_dblwr_init_or_load_pages(
void
buf_dblwr_process()
{
ut_ad(recv_sys.parse_start_lsn);
ulint page_no_dblwr = 0;
byte* read_buf;
recv_dblwr_t& recv_dblwr = recv_sys.dblwr;
......@@ -492,17 +494,43 @@ buf_dblwr_process()
}
read_buf = static_cast<byte*>(
aligned_malloc(2 * srv_page_size, srv_page_size));
aligned_malloc(3 * srv_page_size, srv_page_size));
byte* const buf = read_buf + srv_page_size;
for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
i != recv_dblwr.pages.end();
++i, ++page_no_dblwr) {
byte* page = *i;
ulint space_id = page_get_space_id(page);
fil_space_t* space = fil_space_get(space_id);
const ulint page_no = page_get_page_no(page);
if (!page_no) {
/* page 0 should have been recovered
already via Datafile::restore_from_doublewrite() */
continue;
}
const ulint space_id = page_get_space_id(page);
const lsn_t lsn = mach_read_from_8(page + FIL_PAGE_LSN);
if (recv_sys.parse_start_lsn > lsn) {
/* Pages written before the checkpoint are
not useful for recovery. */
continue;
}
if (space == NULL) {
const page_id_t page_id(space_id, page_no);
if (recv_sys.scanned_lsn < lsn) {
ib::warn() << "Ignoring a doublewrite copy of page "
<< page_id
<< " with future log sequence number "
<< lsn;
continue;
}
fil_space_t* space = fil_space_acquire_for_io(space_id);
if (!space) {
/* Maybe we have dropped the tablespace
and this page once belonged to it: do nothing */
continue;
......@@ -510,9 +538,6 @@ buf_dblwr_process()
fil_space_open_if_needed(space);
const ulint page_no = page_get_page_no(page);
const page_id_t page_id(space_id, page_no);
if (UNIV_UNLIKELY(page_no >= space->size)) {
/* Do not report the warning for undo
......@@ -525,6 +550,8 @@ buf_dblwr_process()
<< space->name
<< " (" << space->size << " pages)";
}
next_page:
space->release_for_io();
continue;
}
......@@ -557,92 +584,32 @@ buf_dblwr_process()
fio.node->space->release_for_io();
}
const bool is_all_zero = buf_is_zeroes(
span<const byte>(read_buf, physical_size));
const bool expect_encrypted = space->crypt_data
&& space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
bool is_corrupted = false;
if (is_all_zero) {
if (buf_is_zeroes(span<const byte>(read_buf, physical_size))) {
/* We will check if the copy in the
doublewrite buffer is valid. If not, we will
ignore this page (there should be redo log
records to initialize it). */
} else if (recv_dblwr.validate_page(
page_id, read_buf, space, buf)) {
goto next_page;
} else {
/* Decompress the page before
validating the checksum. */
ulint decomp = fil_page_decompress(buf, read_buf,
space->flags);
if (!decomp || (zip_size && decomp != srv_page_size)) {
goto bad;
}
if (expect_encrypted
&& buf_page_get_key_version(read_buf, space->flags)) {
is_corrupted = !buf_page_verify_crypt_checksum(
read_buf, space->flags);
} else {
is_corrupted = buf_page_is_corrupted(
true, read_buf, space->flags);
}
if (!is_corrupted) {
/* The page is good; there is no need
to consult the doublewrite buffer. */
continue;
}
bad:
/* We intentionally skip this message for
is_all_zero pages. */
all-zero pages. */
ib::info()
<< "Trying to recover page " << page_id
<< " from the doublewrite buffer.";
}
ulint decomp = fil_page_decompress(buf, page, space->flags);
if (!decomp || (zip_size && decomp != srv_page_size)) {
continue;
}
if (expect_encrypted
&& buf_page_get_key_version(read_buf, space->flags)) {
is_corrupted = !buf_page_verify_crypt_checksum(
page, space->flags);
} else {
is_corrupted = buf_page_is_corrupted(
true, page, space->flags);
}
if (is_corrupted) {
/* Theoretically we could have another good
copy for this page in the doublewrite
buffer. If not, we will report a fatal error
for a corrupted page somewhere else if that
page was truly needed. */
continue;
}
page = recv_dblwr.find_page(page_id, space, buf);
if (page_no == 0) {
/* Check the FSP_SPACE_FLAGS. */
ulint flags = fsp_header_get_flags(page);
if (!fil_space_t::is_valid_flags(flags, space_id)
&& fsp_flags_convert_from_101(flags)
== ULINT_UNDEFINED) {
ib::warn() << "Ignoring a doublewrite copy"
" of page " << page_id
<< " due to invalid flags "
<< ib::hex(flags);
continue;
}
/* The flags on the page should be converted later. */
if (!page) {
goto next_page;
}
/* Write the good page from the doublewrite buffer to
the intended position. */
fio = fil_io(IORequestWrite, true, page_id, zip_size,
0, physical_size, const_cast<byte*>(page),
nullptr);
0, physical_size, page, nullptr);
if (fio.node) {
ut_ad(fio.err == DB_SUCCESS);
......@@ -651,6 +618,8 @@ buf_dblwr_process()
<< "' from the doublewrite buffer.";
fio.node->space->release_for_io();
}
goto next_page;
}
recv_dblwr.pages.clear();
......
......@@ -768,10 +768,10 @@ Datafile::restore_from_doublewrite()
}
/* Find if double write buffer contains page_no of given space id. */
const byte* page = recv_sys.dblwr.find_page(m_space_id, 0);
const page_id_t page_id(m_space_id, 0);
const byte* page = recv_sys.dblwr.find_page(page_id);
if (page == NULL) {
if (!page) {
/* If the first page of the given user tablespace is not there
in the doublewrite buffer, then the recovery is going to fail
now. Hence this is treated as an error. */
......@@ -788,15 +788,10 @@ Datafile::restore_from_doublewrite()
FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + page);
if (!fil_space_t::is_valid_flags(flags, m_space_id)) {
ulint cflags = fsp_flags_convert_from_101(flags);
if (cflags == ULINT_UNDEFINED) {
ib::warn()
<< "Ignoring a doublewrite copy of page "
<< page_id
<< " due to invalid flags " << ib::hex(flags);
return(true);
}
flags = cflags;
flags = fsp_flags_convert_from_101(flags);
/* recv_dblwr_t::validate_page() inside find_page()
checked this already. */
ut_ad(flags != ULINT_UNDEFINED);
/* The flags on the page should be converted later. */
}
......
......@@ -116,18 +116,30 @@ struct log_rec_t
const lsn_t lsn;
};
struct recv_dblwr_t {
struct recv_dblwr_t
{
/** Add a page frame to the doublewrite recovery buffer. */
void add(byte* page) {
pages.push_front(page);
}
void add(byte *page) { pages.push_front(page); }
/** Validate the page.
@param page_id page identifier
@param page page contents
@param space the tablespace of the page (not available for page 0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@return whether the page is valid */
bool validate_page(const page_id_t page_id, const byte *page,
const fil_space_t *space, byte *tmp_buf);
/** Find a doublewrite copy of a page.
@param[in] space_id tablespace identifier
@param[in] page_no page number
@param page_id page identifier
@param space tablespace (not available for page_id.page_no()==0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@return page frame
@retval NULL if no page was found */
const byte* find_page(ulint space_id, ulint page_no);
@retval NULL if no valid page for page_id was found */
byte* find_page(const page_id_t page_id, const fil_space_t *space= NULL,
byte *tmp_buf= NULL);
typedef std::deque<byte*, ut_allocator<byte*> > list;
......
......@@ -54,6 +54,7 @@ Created 9/20/1997 Heikki Tuuri
#include "buf0rea.h"
#include "srv0srv.h"
#include "srv0start.h"
#include "fil0pagecompress.h"
/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA 32U
......@@ -3542,6 +3543,8 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
rescan = true;
}
recv_sys.parse_start_lsn = checkpoint_lsn;
if (srv_operation == SRV_OPERATION_NORMAL) {
buf_dblwr_process();
}
......@@ -3671,25 +3674,92 @@ recv_recovery_from_checkpoint_finish(void)
ut_d(sync_check_enable());
}
/** Find a doublewrite copy of a page.
@param[in] space_id tablespace identifier
@param[in] page_no page number
@return page frame
@retval NULL if no page was found */
const byte*
recv_dblwr_t::find_page(ulint space_id, ulint page_no)
bool recv_dblwr_t::validate_page(const page_id_t page_id,
const byte *page,
const fil_space_t *space,
byte *tmp_buf)
{
const byte *result= NULL;
if (page_id.page_no() == 0)
{
ulint flags= fsp_header_get_flags(page);
if (!fil_space_t::is_valid_flags(flags, page_id.space()))
{
ulint cflags= fsp_flags_convert_from_101(flags);
if (cflags == ULINT_UNDEFINED)
{
ib::warn() << "Ignoring a doublewrite copy of page " << page_id
<< "due to invalid flags " << ib::hex(flags);
return false;
}
flags= cflags;
}
/* Page 0 is never page_compressed or encrypted. */
return !buf_page_is_corrupted(true, page, flags);
}
ut_ad(tmp_buf);
byte *tmp_frame= tmp_buf;
byte *tmp_page= tmp_buf + srv_page_size;
const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE);
const bool expect_encrypted= space->crypt_data &&
space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
if (space->full_crc32())
return !buf_page_is_corrupted(true, page, space->flags);
if (expect_encrypted &&
mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
{
if (!fil_space_verify_crypt_checksum(page, space->zip_size()))
return false;
if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
return true;
if (space->zip_size())
return false;
memcpy(tmp_page, page, space->physical_size());
if (!fil_space_decrypt(space, tmp_frame, tmp_page))
return false;
}
switch (page_type) {
case FIL_PAGE_PAGE_COMPRESSED:
memcpy(tmp_page, page, space->physical_size());
/* fall through */
case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
if (space->zip_size())
return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */
ulint decomp= fil_page_decompress(tmp_frame, tmp_page, space->flags);
if (!decomp)
return false; /* decompression failed */
if (decomp == srv_page_size)
return false; /* the page was not compressed (invalid page type) */
return !buf_page_is_corrupted(true, tmp_page, space->flags);
}
return !buf_page_is_corrupted(true, page, space->flags);
}
byte *recv_dblwr_t::find_page(const page_id_t page_id,
const fil_space_t *space, byte *tmp_buf)
{
byte *result= NULL;
lsn_t max_lsn= 0;
for (const byte *page : pages)
for (byte *page : pages)
{
if (page_get_page_no(page) != page_no ||
page_get_space_id(page) != space_id)
if (page_get_page_no(page) != page_id.page_no() ||
page_get_space_id(page) != page_id.space())
continue;
const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
if (lsn <= max_lsn)
if (lsn <= max_lsn ||
!validate_page(page_id, page, space, tmp_buf))
{
/* Mark processed for subsequent iterations in buf_dblwr_process() */
memset(page + FIL_PAGE_LSN, 0, 8);
continue;
}
max_lsn= lsn;
result= page;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment