Commit 70d4500c authored by Marko Mäkelä's avatar Marko Mäkelä

Merge 10.3 into 10.4

parents 9216114c 8bb2170d
...@@ -530,30 +530,54 @@ buf_dblwr_init_or_load_pages( ...@@ -530,30 +530,54 @@ buf_dblwr_init_or_load_pages(
void void
buf_dblwr_process() buf_dblwr_process()
{ {
ut_ad(recv_sys.parse_start_lsn);
ulint page_no_dblwr = 0; ulint page_no_dblwr = 0;
byte* read_buf; byte* read_buf;
byte* unaligned_read_buf;
recv_dblwr_t& recv_dblwr = recv_sys.dblwr; recv_dblwr_t& recv_dblwr = recv_sys.dblwr;
if (!buf_dblwr) { if (!buf_dblwr) {
return; return;
} }
unaligned_read_buf = static_cast<byte*>(
ut_malloc_nokey(3U << srv_page_size_shift));
read_buf = static_cast<byte*>( read_buf = static_cast<byte*>(
ut_align(unaligned_read_buf, srv_page_size)); aligned_malloc(3 * srv_page_size, srv_page_size));
byte* const buf = read_buf + srv_page_size; byte* const buf = read_buf + srv_page_size;
for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin(); for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
i != recv_dblwr.pages.end(); i != recv_dblwr.pages.end();
++i, ++page_no_dblwr) { ++i, ++page_no_dblwr) {
byte* page = *i; byte* page = *i;
ulint space_id = page_get_space_id(page); const ulint page_no = page_get_page_no(page);
fil_space_t* space = fil_space_get(space_id);
if (!page_no) {
/* page 0 should have been recovered
already via Datafile::restore_from_doublewrite() */
continue;
}
const ulint space_id = page_get_space_id(page);
const lsn_t lsn = mach_read_from_8(page + FIL_PAGE_LSN);
if (space == NULL) { if (recv_sys.parse_start_lsn > lsn) {
/* Pages written before the checkpoint are
not useful for recovery. */
continue;
}
const page_id_t page_id(space_id, page_no);
if (recv_sys.scanned_lsn < lsn) {
ib::warn() << "Ignoring a doublewrite copy of page "
<< page_id
<< " with future log sequence number "
<< lsn;
continue;
}
fil_space_t* space = fil_space_acquire_for_io(space_id);
if (!space) {
/* Maybe we have dropped the tablespace /* Maybe we have dropped the tablespace
and this page once belonged to it: do nothing */ and this page once belonged to it: do nothing */
continue; continue;
...@@ -561,9 +585,6 @@ buf_dblwr_process() ...@@ -561,9 +585,6 @@ buf_dblwr_process()
fil_space_open_if_needed(space); fil_space_open_if_needed(space);
const ulint page_no = page_get_page_no(page);
const page_id_t page_id(space_id, page_no);
if (UNIV_UNLIKELY(page_no >= space->size)) { if (UNIV_UNLIKELY(page_no >= space->size)) {
/* Do not report the warning for undo /* Do not report the warning for undo
...@@ -576,6 +597,8 @@ buf_dblwr_process() ...@@ -576,6 +597,8 @@ buf_dblwr_process()
<< space->name << space->name
<< " (" << space->size << " pages)"; << " (" << space->size << " pages)";
} }
next_page:
space->release_for_io();
continue; continue;
} }
...@@ -604,85 +627,26 @@ buf_dblwr_process() ...@@ -604,85 +627,26 @@ buf_dblwr_process()
<< "error: " << err; << "error: " << err;
} }
const bool is_all_zero = buf_is_zeroes( if (buf_is_zeroes(span<const byte>(read_buf, physical_size))) {
span<const byte>(read_buf, physical_size));
const bool expect_encrypted = space->crypt_data
&& space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
bool is_corrupted = false;
if (is_all_zero) {
/* We will check if the copy in the /* We will check if the copy in the
doublewrite buffer is valid. If not, we will doublewrite buffer is valid. If not, we will
ignore this page (there should be redo log ignore this page (there should be redo log
records to initialize it). */ records to initialize it). */
} else if (recv_dblwr.validate_page(
page_id, read_buf, space, buf)) {
goto next_page;
} else { } else {
/* Decompress the page before
validating the checksum. */
ulint decomp = fil_page_decompress(buf, read_buf,
space->flags);
if (!decomp || (zip_size && decomp != srv_page_size)) {
goto bad;
}
if (expect_encrypted
&& buf_page_get_key_version(read_buf, space->flags)) {
is_corrupted = !buf_page_verify_crypt_checksum(
read_buf, space->flags);
} else {
is_corrupted = buf_page_is_corrupted(
true, read_buf, space->flags);
}
if (!is_corrupted) {
/* The page is good; there is no need
to consult the doublewrite buffer. */
continue;
}
bad:
/* We intentionally skip this message for /* We intentionally skip this message for
is_all_zero pages. */ all-zero pages. */
ib::info() ib::info()
<< "Trying to recover page " << page_id << "Trying to recover page " << page_id
<< " from the doublewrite buffer."; << " from the doublewrite buffer.";
} }
ulint decomp = fil_page_decompress(buf, page, space->flags); page = recv_dblwr.find_page(page_id, space, buf);
if (!decomp || (zip_size && decomp != srv_page_size)) {
continue;
}
if (expect_encrypted
&& buf_page_get_key_version(read_buf, space->flags)) {
is_corrupted = !buf_page_verify_crypt_checksum(
page, space->flags);
} else {
is_corrupted = buf_page_is_corrupted(
true, page, space->flags);
}
if (is_corrupted) { if (!page) {
/* Theoretically we could have another good goto next_page;
copy for this page in the doublewrite
buffer. If not, we will report a fatal error
for a corrupted page somewhere else if that
page was truly needed. */
continue;
}
if (page_no == 0) {
/* Check the FSP_SPACE_FLAGS. */
ulint flags = fsp_header_get_flags(page);
if (!fil_space_t::is_valid_flags(flags, space_id)
&& fsp_flags_convert_from_101(flags)
== ULINT_UNDEFINED) {
ib::warn() << "Ignoring a doublewrite copy"
" of page " << page_id
<< " due to invalid flags "
<< ib::hex(flags);
continue;
}
/* The flags on the page should be converted later. */
} }
/* Write the good page from the doublewrite buffer to /* Write the good page from the doublewrite buffer to
...@@ -691,17 +655,18 @@ buf_dblwr_process() ...@@ -691,17 +655,18 @@ buf_dblwr_process()
IORequest write_request(IORequest::WRITE); IORequest write_request(IORequest::WRITE);
fil_io(write_request, true, page_id, zip_size, fil_io(write_request, true, page_id, zip_size,
0, physical_size, 0, physical_size, page, nullptr);
const_cast<byte*>(page), NULL);
ib::info() << "Recovered page " << page_id ib::info() << "Recovered page " << page_id
<< " from the doublewrite buffer."; << " from the doublewrite buffer.";
goto next_page;
} }
recv_dblwr.pages.clear(); recv_dblwr.pages.clear();
fil_flush_file_spaces(FIL_TYPE_TABLESPACE); fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
ut_free(unaligned_read_buf); aligned_free(read_buf);
} }
/****************************************************************//** /****************************************************************//**
......
...@@ -778,10 +778,10 @@ Datafile::restore_from_doublewrite() ...@@ -778,10 +778,10 @@ Datafile::restore_from_doublewrite()
} }
/* Find if double write buffer contains page_no of given space id. */ /* Find if double write buffer contains page_no of given space id. */
const byte* page = recv_sys.dblwr.find_page(m_space_id, 0);
const page_id_t page_id(m_space_id, 0); const page_id_t page_id(m_space_id, 0);
const byte* page = recv_sys.dblwr.find_page(page_id);
if (page == NULL) { if (!page) {
/* If the first page of the given user tablespace is not there /* If the first page of the given user tablespace is not there
in the doublewrite buffer, then the recovery is going to fail in the doublewrite buffer, then the recovery is going to fail
now. Hence this is treated as an error. */ now. Hence this is treated as an error. */
...@@ -798,15 +798,10 @@ Datafile::restore_from_doublewrite() ...@@ -798,15 +798,10 @@ Datafile::restore_from_doublewrite()
FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + page); FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + page);
if (!fil_space_t::is_valid_flags(flags, m_space_id)) { if (!fil_space_t::is_valid_flags(flags, m_space_id)) {
ulint cflags = fsp_flags_convert_from_101(flags); flags = fsp_flags_convert_from_101(flags);
if (cflags == ULINT_UNDEFINED) { /* recv_dblwr_t::validate_page() inside find_page()
ib::warn() checked this already. */
<< "Ignoring a doublewrite copy of page " ut_ad(flags != ULINT_UNDEFINED);
<< page_id
<< " due to invalid flags " << ib::hex(flags);
return(true);
}
flags = cflags;
/* The flags on the page should be converted later. */ /* The flags on the page should be converted later. */
} }
......
...@@ -165,18 +165,30 @@ struct recv_t{ ...@@ -165,18 +165,30 @@ struct recv_t{
rec_list;/*!< list of log records for this page */ rec_list;/*!< list of log records for this page */
}; };
struct recv_dblwr_t { struct recv_dblwr_t
{
/** Add a page frame to the doublewrite recovery buffer. */ /** Add a page frame to the doublewrite recovery buffer. */
void add(byte* page) { void add(byte *page) { pages.push_front(page); }
pages.push_front(page);
} /** Validate the page.
@param page_id page identifier
@param page page contents
@param space the tablespace of the page (not available for page 0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@return whether the page is valid */
bool validate_page(const page_id_t page_id, const byte *page,
const fil_space_t *space, byte *tmp_buf);
/** Find a doublewrite copy of a page. /** Find a doublewrite copy of a page.
@param[in] space_id tablespace identifier @param page_id page identifier
@param[in] page_no page number @param space tablespace (not available for page_id.page_no()==0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@return page frame @return page frame
@retval NULL if no page was found */ @retval NULL if no valid page for page_id was found */
const byte* find_page(ulint space_id, ulint page_no); byte* find_page(const page_id_t page_id, const fil_space_t *space= NULL,
byte *tmp_buf= NULL);
typedef std::deque<byte*, ut_allocator<byte*> > list; typedef std::deque<byte*, ut_allocator<byte*> > list;
......
...@@ -56,6 +56,7 @@ Created 9/20/1997 Heikki Tuuri ...@@ -56,6 +56,7 @@ Created 9/20/1997 Heikki Tuuri
#include "srv0start.h" #include "srv0start.h"
#include "trx0roll.h" #include "trx0roll.h"
#include "row0merge.h" #include "row0merge.h"
#include "fil0pagecompress.h"
/** Log records are stored in the hash table in chunks at most of this size; /** Log records are stored in the hash table in chunks at most of this size;
this must be less than srv_page_size as it is stored in the buffer pool */ this must be less than srv_page_size as it is stored in the buffer pool */
...@@ -3757,6 +3758,8 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn) ...@@ -3757,6 +3758,8 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
rescan = true; rescan = true;
} }
recv_sys.parse_start_lsn = checkpoint_lsn;
if (srv_operation == SRV_OPERATION_NORMAL) { if (srv_operation == SRV_OPERATION_NORMAL) {
buf_dblwr_process(); buf_dblwr_process();
} }
...@@ -3926,25 +3929,92 @@ recv_recovery_rollback_active(void) ...@@ -3926,25 +3929,92 @@ recv_recovery_rollback_active(void)
} }
} }
/** Find a doublewrite copy of a page. bool recv_dblwr_t::validate_page(const page_id_t page_id,
@param[in] space_id tablespace identifier const byte *page,
@param[in] page_no page number const fil_space_t *space,
@return page frame byte *tmp_buf)
@retval NULL if no page was found */ {
const byte* if (page_id.page_no() == 0)
recv_dblwr_t::find_page(ulint space_id, ulint page_no) {
ulint flags= fsp_header_get_flags(page);
if (!fil_space_t::is_valid_flags(flags, page_id.space()))
{
ulint cflags= fsp_flags_convert_from_101(flags);
if (cflags == ULINT_UNDEFINED)
{
ib::warn() << "Ignoring a doublewrite copy of page " << page_id
<< "due to invalid flags " << ib::hex(flags);
return false;
}
flags= cflags;
}
/* Page 0 is never page_compressed or encrypted. */
return !buf_page_is_corrupted(true, page, flags);
}
ut_ad(tmp_buf);
byte *tmp_frame= tmp_buf;
byte *tmp_page= tmp_buf + srv_page_size;
const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE);
const bool expect_encrypted= space->crypt_data &&
space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
if (space->full_crc32())
return !buf_page_is_corrupted(true, page, space->flags);
if (expect_encrypted &&
mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
{
if (!fil_space_verify_crypt_checksum(page, space->zip_size()))
return false;
if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
return true;
if (space->zip_size())
return false;
memcpy(tmp_page, page, space->physical_size());
if (!fil_space_decrypt(space, tmp_frame, tmp_page))
return false;
}
switch (page_type) {
case FIL_PAGE_PAGE_COMPRESSED:
memcpy(tmp_page, page, space->physical_size());
/* fall through */
case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
if (space->zip_size())
return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */
ulint decomp= fil_page_decompress(tmp_frame, tmp_page, space->flags);
if (!decomp)
return false; /* decompression failed */
if (decomp == srv_page_size)
return false; /* the page was not compressed (invalid page type) */
return !buf_page_is_corrupted(true, tmp_page, space->flags);
}
return !buf_page_is_corrupted(true, page, space->flags);
}
byte *recv_dblwr_t::find_page(const page_id_t page_id,
const fil_space_t *space, byte *tmp_buf)
{ {
const byte *result= NULL; byte *result= NULL;
lsn_t max_lsn= 0; lsn_t max_lsn= 0;
for (const byte *page : pages) for (byte *page : pages)
{ {
if (page_get_page_no(page) != page_no || if (page_get_page_no(page) != page_id.page_no() ||
page_get_space_id(page) != space_id) page_get_space_id(page) != page_id.space())
continue; continue;
const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN); const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
if (lsn <= max_lsn) if (lsn <= max_lsn ||
!validate_page(page_id, page, space, tmp_buf))
{
/* Mark processed for subsequent iterations in buf_dblwr_process() */
memset(page + FIL_PAGE_LSN, 0, 8);
continue; continue;
}
max_lsn= lsn; max_lsn= lsn;
result= page; result= page;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment