Commit 8c43f963 authored by Marko Mäkelä's avatar Marko Mäkelä

Follow-up to MDEV-12112: corruption in encrypted table may be overlooked

The initial fix only covered a part of Mariabackup.
This fix hardens InnoDB and XtraDB in a similar way, in order
to reduce the probability of mistaking a corrupted encrypted page
for a valid unencrypted one.

This is based on work by Thirunarayanan Balathandayuthapani.

fil_space_verify_crypt_checksum(): Assert that key_version!=0.
Let the callers guarantee that. Now that we have this assertion,
we also know that buf_page_is_zeroes() cannot hold.
Also, remove all diagnostic output and related parameters,
and let the relevant callers emit such messages.
Last but not least, validate the post-encryption checksum
according to the innodb_checksum_algorithm (only accepting
one checksum for the strict variants), and no longer
try to validate the page as if it was unencrypted.

buf_page_is_zeroes(): Move to the compilation unit of the only callers,
and declare static.

xb_fil_cur_read(), buf_page_check_corrupt(): Add a condition before
calling fil_space_verify_crypt_checksum(). This is a non-functional
change.

buf_dblwr_process(): Validate the page only as encrypted or unencrypted,
but not both.
parent 517c59c5
...@@ -522,7 +522,16 @@ is_page_corrupted( ...@@ -522,7 +522,16 @@ is_page_corrupted(
normal method. */ normal method. */
if (is_encrypted && key_version != 0) { if (is_encrypted && key_version != 0) {
is_corrupted = !fil_space_verify_crypt_checksum(buf, is_corrupted = !fil_space_verify_crypt_checksum(buf,
page_size.is_compressed() ? page_size.physical() : 0, NULL, cur_page_num); page_size.is_compressed() ? page_size.physical() : 0);
if (is_corrupted && log_file) {
fprintf(log_file,
"Page " ULINTPF ":%llu may be corrupted;"
" key_version=" ULINTPF "\n",
space_id, cur_page_num,
mach_read_from_4(
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ buf));
}
} else { } else {
is_corrupted = true; is_corrupted = true;
} }
......
...@@ -351,9 +351,14 @@ xb_fil_cur_read( ...@@ -351,9 +351,14 @@ xb_fil_cur_read(
&& page_no >= FSP_EXTENT_SIZE && page_no >= FSP_EXTENT_SIZE
&& page_no < FSP_EXTENT_SIZE * 3) { && page_no < FSP_EXTENT_SIZE * 3) {
/* We ignore the doublewrite buffer pages */ /* We ignore the doublewrite buffer pages */
} else if (fil_space_verify_crypt_checksum( } else if (mach_read_from_4(
page, cursor->zip_size, page
space, page_no)) { + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
&& space->crypt_data
&& space->crypt_data->type
!= CRYPT_SCHEME_UNENCRYPTED
&& fil_space_verify_crypt_checksum(
page, cursor->zip_size)) {
ut_ad(mach_read_from_4(page + FIL_PAGE_SPACE_ID) ut_ad(mach_read_from_4(page + FIL_PAGE_SPACE_ID)
== space->id); == space->id);
......
call mtr.add_suppression("InnoDB: The page \\[page id: space=[1-9][0-9]*, page number=[1-9][0-9]*\\] in file '.*test.t[123]\\.ibd' cannot be decrypted\\."); call mtr.add_suppression("InnoDB: Encrypted page \\d+:[36] in file .*test.t[123]\\.ibd looks corrupted; key_version=3221342974");
call mtr.add_suppression("InnoDB: Database page corruption on disk or a failed file read of tablespace test/t[0-9]+ page \[page id: space=[0-9]+, page number=[0-9]+\]. You may have to recover from a backup.");
SET GLOBAL innodb_file_format = `Barracuda`; SET GLOBAL innodb_file_format = `Barracuda`;
SET GLOBAL innodb_file_per_table = ON; SET GLOBAL innodb_file_per_table = ON;
set global innodb_compression_algorithm = 1; set global innodb_compression_algorithm = 1;
......
...@@ -7,8 +7,7 @@ ...@@ -7,8 +7,7 @@
# Don't test under embedded # Don't test under embedded
-- source include/not_embedded.inc -- source include/not_embedded.inc
call mtr.add_suppression("InnoDB: The page \\[page id: space=[1-9][0-9]*, page number=[1-9][0-9]*\\] in file '.*test.t[123]\\.ibd' cannot be decrypted\\."); call mtr.add_suppression("InnoDB: Encrypted page \\d+:[36] in file .*test.t[123]\\.ibd looks corrupted; key_version=3221342974");
call mtr.add_suppression("InnoDB: Database page corruption on disk or a failed file read of tablespace test/t[0-9]+ page \[page id: space=[0-9]+, page number=[0-9]+\]. You may have to recover from a backup.");
--disable_warnings --disable_warnings
SET GLOBAL innodb_file_format = `Barracuda`; SET GLOBAL innodb_file_format = `Barracuda`;
...@@ -53,17 +52,17 @@ perl; ...@@ -53,17 +52,17 @@ perl;
open(FILE, "+<", "$ENV{MYSQLD_DATADIR}/test/t1.ibd") or die "open"; open(FILE, "+<", "$ENV{MYSQLD_DATADIR}/test/t1.ibd") or die "open";
binmode FILE; binmode FILE;
seek(FILE, $ENV{'INNODB_PAGE_SIZE'} * 3 + 26, SEEK_SET) or die "seek"; seek(FILE, $ENV{'INNODB_PAGE_SIZE'} * 3 + 26, SEEK_SET) or die "seek";
print FILE pack("H*", "c00lcafedeadb017"); print FILE pack("H*", "c001cafedeadb017");
close FILE or die "close"; close FILE or die "close";
open(FILE, "+<", "$ENV{MYSQLD_DATADIR}/test/t2.ibd") or die "open"; open(FILE, "+<", "$ENV{MYSQLD_DATADIR}/test/t2.ibd") or die "open";
binmode FILE; binmode FILE;
seek(FILE, $ENV{'INNODB_PAGE_SIZE'} * 3 + 26, SEEK_SET) or die "seek"; seek(FILE, $ENV{'INNODB_PAGE_SIZE'} * 3 + 26, SEEK_SET) or die "seek";
print FILE pack("H*", "c00lcafedeadb017"); print FILE pack("H*", "c001cafedeadb017");
close FILE or die "close"; close FILE or die "close";
open(FILE, "+<", "$ENV{MYSQLD_DATADIR}/test/t3.ibd") or die "open"; open(FILE, "+<", "$ENV{MYSQLD_DATADIR}/test/t3.ibd") or die "open";
binmode FILE; binmode FILE;
seek(FILE, $ENV{'INNODB_PAGE_SIZE'} * 3 + 26, SEEK_SET) or die "seek"; seek(FILE, $ENV{'INNODB_PAGE_SIZE'} * 3 + 26, SEEK_SET) or die "seek";
print FILE pack("H*", "c00lcafedeadb017"); print FILE pack("H*", "c001cafedeadb017");
close FILE or die "close"; close FILE or die "close";
EOF EOF
......
...@@ -451,8 +451,15 @@ static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space) ...@@ -451,8 +451,15 @@ static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
/* Verify encryption checksum before we even try to /* Verify encryption checksum before we even try to
decrypt. */ decrypt. */
if (!fil_space_verify_crypt_checksum( if (!fil_space_verify_crypt_checksum(
dst_frame, buf_page_get_zip_size(bpage), NULL, dst_frame, buf_page_get_zip_size(bpage))) {
bpage->offset)) { ib_logf(IB_LOG_LEVEL_ERROR,
"Encrypted page %u:%u in file %s"
" looks corrupted; key_version=" ULINTPF,
bpage->space, bpage->offset,
space->chain.start->name,
mach_read_from_4(
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ dst_frame));
decrypt_failed: decrypt_failed:
/* Mark page encrypted in case it should be. */ /* Mark page encrypted in case it should be. */
if (space->crypt_data->type if (space->crypt_data->type
...@@ -667,24 +674,6 @@ buf_block_alloc( ...@@ -667,24 +674,6 @@ buf_block_alloc(
#endif /* !UNIV_HOTBACKUP */ #endif /* !UNIV_HOTBACKUP */
#endif /* !UNIV_INNOCHECKSUM */ #endif /* !UNIV_INNOCHECKSUM */
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
UNIV_INTERN
bool
buf_page_is_zeroes(const byte* read_buf, ulint zip_size)
{
const ulint page_size = zip_size ? zip_size : UNIV_PAGE_SIZE;
for (ulint i = 0; i < page_size; i++) {
if (read_buf[i] != 0) {
return(false);
}
}
return(true);
}
/** Checks if the page is in crc32 checksum format. /** Checks if the page is in crc32 checksum format.
@param[in] read_buf database page @param[in] read_buf database page
@param[in] checksum_field1 new checksum field @param[in] checksum_field1 new checksum field
...@@ -4752,19 +4741,15 @@ or decrypt/decompress just failed. ...@@ -4752,19 +4741,15 @@ or decrypt/decompress just failed.
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but @retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match. after decryption normal page checksum does not match.
@retval DB_TABLESPACE_DELETED if accessed tablespace is not found */ @retval DB_TABLESPACE_DELETED if accessed tablespace is not found */
static static dberr_t buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space)
dberr_t
buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space)
{ {
ut_ad(space->n_pending_ios > 0); ut_ad(space->n_pending_ios > 0);
ulint zip_size = buf_page_get_zip_size(bpage); ulint zip_size = buf_page_get_zip_size(bpage);
byte* dst_frame = (zip_size) ? bpage->zip.data : byte* dst_frame = (zip_size) ? bpage->zip.data :
((buf_block_t*) bpage)->frame; ((buf_block_t*) bpage)->frame;
bool still_encrypted = false;
dberr_t err = DB_SUCCESS; dberr_t err = DB_SUCCESS;
bool corrupted = false; bool corrupted = false;
fil_space_crypt_t* crypt_data = space->crypt_data;
/* In buf_decrypt_after_read we have either decrypted the page if /* In buf_decrypt_after_read we have either decrypted the page if
page post encryption checksum matches and used key_id is found page post encryption checksum matches and used key_id is found
...@@ -4772,11 +4757,12 @@ buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space) ...@@ -4772,11 +4757,12 @@ buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space)
not decrypted and it could be either encrypted and corrupted not decrypted and it could be either encrypted and corrupted
or corrupted or good page. If we decrypted, there page could or corrupted or good page. If we decrypted, there page could
still be corrupted if used key does not match. */ still be corrupted if used key does not match. */
still_encrypted = (crypt_data && const bool still_encrypted = mach_read_from_4(
crypt_data->type != CRYPT_SCHEME_UNENCRYPTED && dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
!bpage->encrypted && && space->crypt_data
fil_space_verify_crypt_checksum(dst_frame, zip_size, && space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED
space, bpage->offset)); && !bpage->encrypted
&& fil_space_verify_crypt_checksum(dst_frame, zip_size);
if (!still_encrypted) { if (!still_encrypted) {
/* If traditional checksums match, we assume that page is /* If traditional checksums match, we assume that page is
......
/***************************************************************************** /*****************************************************************************
Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation. Copyright (c) 2013, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -362,6 +362,22 @@ buf_dblwr_create() ...@@ -362,6 +362,22 @@ buf_dblwr_create()
goto start_again; goto start_again;
} }
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
static bool buf_page_is_zeroes(const byte* read_buf, ulint zip_size)
{
const ulint page_size = zip_size ? zip_size : UNIV_PAGE_SIZE;
for (ulint i = 0; i < page_size; i++) {
if (read_buf[i] != 0) {
return false;
}
}
return true;
}
/****************************************************************//** /****************************************************************//**
At a database startup initializes the doublewrite buffer memory structure if At a database startup initializes the doublewrite buffer memory structure if
we already have a doublewrite buffer created in the data files. If we are we already have a doublewrite buffer created in the data files. If we are
...@@ -556,6 +572,9 @@ buf_dblwr_process() ...@@ -556,6 +572,9 @@ buf_dblwr_process()
const bool is_all_zero = buf_page_is_zeroes( const bool is_all_zero = buf_page_is_zeroes(
read_buf, zip_size); read_buf, zip_size);
const bool expect_encrypted = space()->crypt_data
&& space()->crypt_data->type
!= CRYPT_SCHEME_UNENCRYPTED;
if (is_all_zero) { if (is_all_zero) {
/* We will check if the copy in the /* We will check if the copy in the
...@@ -570,10 +589,13 @@ buf_dblwr_process() ...@@ -570,10 +589,13 @@ buf_dblwr_process()
goto bad; goto bad;
} }
if (fil_space_verify_crypt_checksum( if (expect_encrypted && mach_read_from_4(
read_buf, zip_size, NULL, page_no) read_buf
|| !buf_page_is_corrupted( + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
true, read_buf, zip_size, space())) { ? fil_space_verify_crypt_checksum(read_buf,
zip_size)
: !buf_page_is_corrupted(true, read_buf,
zip_size, space())) {
/* The page is good; there is no need /* The page is good; there is no need
to consult the doublewrite buffer. */ to consult the doublewrite buffer. */
continue; continue;
...@@ -592,9 +614,11 @@ buf_dblwr_process() ...@@ -592,9 +614,11 @@ buf_dblwr_process()
if (!decomp || (decomp != srv_page_size && zip_size)) { if (!decomp || (decomp != srv_page_size && zip_size)) {
goto bad_doublewrite; goto bad_doublewrite;
} }
if (!fil_space_verify_crypt_checksum(page, zip_size, NULL,
page_no) if (expect_encrypted && mach_read_from_4(
&& buf_page_is_corrupted(true, page, zip_size, space)) { page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
? !fil_space_verify_crypt_checksum(page, zip_size)
: buf_page_is_corrupted(true, page, zip_size, space())) {
if (!is_all_zero) { if (!is_all_zero) {
bad_doublewrite: bad_doublewrite:
ib_logf(IB_LOG_LEVEL_WARN, ib_logf(IB_LOG_LEVEL_WARN,
......
...@@ -662,7 +662,7 @@ fil_encrypt_buf( ...@@ -662,7 +662,7 @@ fil_encrypt_buf(
// store the post-encryption checksum after the key-version // store the post-encryption checksum after the key-version
mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4, checksum); mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4, checksum);
ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size, NULL, offset)); ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size));
srv_stats.pages_encrypted.inc(); srv_stats.pages_encrypted.inc();
...@@ -2568,167 +2568,66 @@ encrypted, or corrupted. ...@@ -2568,167 +2568,66 @@ encrypted, or corrupted.
@param[in] page Page to verify @param[in] page Page to verify
@param[in] zip_size zip size @param[in] zip_size zip size
@param[in] space Tablespace @return whether the encrypted page is OK */
@param[in] pageno Page no
@return true if page is encrypted AND OK, false otherwise */
UNIV_INTERN UNIV_INTERN
bool bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size)
fil_space_verify_crypt_checksum(
byte* page,
ulint zip_size,
#ifndef UNIV_INNOCHECKSUM
const fil_space_t* space,
#else
const void* space,
#endif
ulint pageno)
{ {
uint key_version = mach_read_from_4(page+ FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION); ut_ad(mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION));
/* If page is not encrypted, return false */
if (key_version == 0) {
return(false);
}
srv_checksum_algorithm_t algorithm =
static_cast<srv_checksum_algorithm_t>(srv_checksum_algorithm);
/* If no checksum is used, can't continue checking. */
if (algorithm == SRV_CHECKSUM_ALGORITHM_NONE) {
return(true);
}
/* Read stored post encryption checksum. */
ib_uint32_t checksum = mach_read_from_4(
page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
/* Declare empty pages non-corrupted */
if (checksum == 0
&& *reinterpret_cast<const ib_uint64_t*>(page + FIL_PAGE_LSN) == 0
&& buf_page_is_zeroes(page, zip_size)) {
return(true);
}
/* Compressed and encrypted pages do not have checksum. Assume not /* Compressed and encrypted pages do not have checksum. Assume not
corrupted. Page verification happens after decompression in corrupted. Page verification happens after decompression in
buf_page_io_complete() using buf_page_is_corrupted(). */ buf_page_io_complete() using buf_page_is_corrupted(). */
if (mach_read_from_2(page+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) { if (mach_read_from_2(page + FIL_PAGE_TYPE)
return (true); == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
return true;
} }
ib_uint32_t cchecksum1 = 0; /* Read stored post encryption checksum. */
ib_uint32_t cchecksum2 = 0; const ib_uint32_t checksum = mach_read_from_4(
page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
/* Calculate checksums */
if (zip_size) {
cchecksum1 = page_zip_calc_checksum(
page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32);
cchecksum2 = (cchecksum1 == checksum)
? 0
: page_zip_calc_checksum(
page, zip_size,
SRV_CHECKSUM_ALGORITHM_INNODB);
} else {
cchecksum1 = buf_calc_page_crc32(page);
cchecksum2 = (cchecksum1 == checksum)
? 0
: buf_calc_page_new_checksum(page);
}
/* If stored checksum matches one of the calculated checksums /* If stored checksum matches one of the calculated checksums
page is not corrupted. */ page is not corrupted. */
srv_checksum_algorithm_t algorithm = srv_checksum_algorithm_t(
srv_checksum_algorithm);
switch (algorithm) {
case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
if (zip_size) {
return checksum == page_zip_calc_checksum(
page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32);
}
bool encrypted = (checksum == cchecksum1 || checksum == cchecksum2 return checksum == buf_calc_page_crc32(page);
|| checksum == BUF_NO_CHECKSUM_MAGIC); case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
if (zip_size) {
/* MySQL 5.6 and MariaDB 10.0 and 10.1 will write an LSN to the return checksum == page_zip_calc_checksum(
first page of each system tablespace file at page, zip_size, SRV_CHECKSUM_ALGORITHM_INNODB);
FIL_PAGE_FILE_FLUSH_LSN offset. On other pages and in other files,
the field might have been uninitialized until MySQL 5.5. In MySQL 5.7
(and MariaDB Server 10.2.2) WL#7990 stopped writing the field for other
than page 0 of the system tablespace.
Starting from MariaDB 10.1 the field has been repurposed for
encryption key_version.
Starting with MySQL 5.7 (and MariaDB Server 10.2), the
field has been repurposed for SPATIAL INDEX pages for
FIL_RTREE_SPLIT_SEQ_NUM.
Note that FIL_PAGE_FILE_FLUSH_LSN is not included in the InnoDB page
checksum.
Thus, FIL_PAGE_FILE_FLUSH_LSN could contain any value. While the
field would usually be 0 for pages that are not encrypted, we cannot
assume that a nonzero value means that the page is encrypted.
Therefore we must validate the page both as encrypted and unencrypted
when FIL_PAGE_FILE_FLUSH_LSN does not contain 0.
*/
uint32_t checksum1 = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
uint32_t checksum2;
bool valid = false;
if (zip_size) {
valid = (checksum1 == cchecksum1);
checksum2 = checksum1;
} else {
checksum2 = mach_read_from_4(
page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM);
switch (algorithm) {
case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
valid = buf_page_is_checksum_valid_crc32(page, checksum1,
checksum2);
break;
case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
valid = buf_page_is_checksum_valid_innodb(page, checksum1,
checksum2);
break;
case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
valid = buf_page_is_checksum_valid_none(page, checksum1,
checksum2);
break;
case SRV_CHECKSUM_ALGORITHM_CRC32:
case SRV_CHECKSUM_ALGORITHM_INNODB:
valid = buf_page_is_checksum_valid_crc32(
page, checksum1, checksum2)
|| buf_page_is_checksum_valid_innodb(
page, checksum1, checksum2);
break;
case SRV_CHECKSUM_ALGORITHM_NONE:
ut_error;
} }
} return checksum == buf_calc_page_new_checksum(page);
case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
return checksum == BUF_NO_CHECKSUM_MAGIC;
case SRV_CHECKSUM_ALGORITHM_NONE:
return true;
case SRV_CHECKSUM_ALGORITHM_INNODB:
case SRV_CHECKSUM_ALGORITHM_CRC32:
if (checksum == BUF_NO_CHECKSUM_MAGIC) {
return true;
}
if (zip_size) {
if (checksum == page_zip_calc_checksum(
page, zip_size, algorithm)) {
return true;
}
if (encrypted && valid) { algorithm = algorithm == SRV_CHECKSUM_ALGORITHM_INNODB
/* If page is encrypted and traditional checksums match, ? SRV_CHECKSUM_ALGORITHM_CRC32
page could be still encrypted, or not encrypted and valid or : SRV_CHECKSUM_ALGORITHM_INNODB;
corrupted. */ return checksum == page_zip_calc_checksum(
#ifndef UNIV_INNOCHECKSUM page, zip_size, algorithm);
ib_logf(IB_LOG_LEVEL_ERROR,
" Page " ULINTPF " in space %s (" ULINTPF ") maybe corrupted."
" Post encryption checksum %u stored [%u:%u] key_version %u",
pageno,
space ? space->name : "N/A",
mach_read_from_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID),
checksum, checksum1, checksum2, key_version);
#else
if (log_file) {
fprintf(log_file,
"Page " ULINTPF ":" ULINTPF " may be corrupted."
" Post encryption checksum %u"
" stored [%u:%u] key_version %u\n",
pageno,
mach_read_from_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID),
checksum, checksum1, checksum2,
key_version);
} }
#endif /* UNIV_INNOCHECKSUM */
encrypted = false; return checksum == buf_calc_page_crc32(page)
|| checksum == buf_calc_page_new_checksum(page);
} }
return(encrypted);
} }
...@@ -703,14 +703,6 @@ buf_page_is_corrupted( ...@@ -703,14 +703,6 @@ buf_page_is_corrupted(
#endif #endif
MY_ATTRIBUTE((warn_unused_result)); MY_ATTRIBUTE((warn_unused_result));
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
UNIV_INTERN
bool
buf_page_is_zeroes(const byte* read_buf, ulint zip_size);
#ifndef UNIV_INNOCHECKSUM #ifndef UNIV_INNOCHECKSUM
#ifndef UNIV_HOTBACKUP #ifndef UNIV_HOTBACKUP
......
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2015, Google Inc. All Rights Reserved. Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
Copyright (c) 2015, 2017, MariaDB Corporation. Copyright (c) 2015, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -415,20 +415,9 @@ encrypted, or corrupted. ...@@ -415,20 +415,9 @@ encrypted, or corrupted.
@param[in] page Page to verify @param[in] page Page to verify
@param[in] zip_size zip size @param[in] zip_size zip size
@param[in] space Tablespace @return whether the encrypted page is OK */
@param[in] pageno Page no
@return true if page is encrypted AND OK, false otherwise */
UNIV_INTERN UNIV_INTERN
bool bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size)
fil_space_verify_crypt_checksum(
byte* page,
ulint zip_size,
#ifndef UNIV_INNOCHECKSUM
const fil_space_t* space,
#else
const void* space,
#endif
ulint pageno)
MY_ATTRIBUTE((warn_unused_result)); MY_ATTRIBUTE((warn_unused_result));
#ifndef UNIV_INNOCHECKSUM #ifndef UNIV_INNOCHECKSUM
......
...@@ -3537,8 +3537,7 @@ fil_iterate( ...@@ -3537,8 +3537,7 @@ fil_iterate(
} }
} else { } else {
if (!fil_space_verify_crypt_checksum( if (!fil_space_verify_crypt_checksum(
src, callback.get_zip_size(), src, callback.get_zip_size())) {
NULL, block->page.offset)) {
goto page_corrupted; goto page_corrupted;
} }
......
...@@ -481,8 +481,15 @@ static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space) ...@@ -481,8 +481,15 @@ static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
/* Verify encryption checksum before we even try to /* Verify encryption checksum before we even try to
decrypt. */ decrypt. */
if (!fil_space_verify_crypt_checksum( if (!fil_space_verify_crypt_checksum(
dst_frame, buf_page_get_zip_size(bpage), NULL, dst_frame, buf_page_get_zip_size(bpage))) {
bpage->offset)) { ib_logf(IB_LOG_LEVEL_ERROR,
"Encrypted page %u:%u in file %s"
" looks corrupted; key_version=" ULINTPF,
bpage->space, bpage->offset,
space->chain.start->name,
mach_read_from_4(
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ dst_frame));
decrypt_failed: decrypt_failed:
/* Mark page encrypted in case it should be. */ /* Mark page encrypted in case it should be. */
if (space->crypt_data->type if (space->crypt_data->type
...@@ -728,24 +735,6 @@ buf_block_alloc( ...@@ -728,24 +735,6 @@ buf_block_alloc(
} }
#endif /* !UNIV_HOTBACKUP */ #endif /* !UNIV_HOTBACKUP */
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
UNIV_INTERN
bool
buf_page_is_zeroes(const byte* read_buf, ulint zip_size)
{
const ulint page_size = zip_size ? zip_size : UNIV_PAGE_SIZE;
for (ulint i = 0; i < page_size; i++) {
if (read_buf[i] != 0) {
return(false);
}
}
return(true);
}
/** Checks if the page is in crc32 checksum format. /** Checks if the page is in crc32 checksum format.
@param[in] read_buf database page @param[in] read_buf database page
@param[in] checksum_field1 new checksum field @param[in] checksum_field1 new checksum field
...@@ -4777,19 +4766,15 @@ or decrypt/decompress just failed. ...@@ -4777,19 +4766,15 @@ or decrypt/decompress just failed.
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but @retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match. after decryption normal page checksum does not match.
@retval DB_TABLESPACE_DELETED if accessed tablespace is not found */ @retval DB_TABLESPACE_DELETED if accessed tablespace is not found */
static static dberr_t buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space)
dberr_t
buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space)
{ {
ut_ad(space->n_pending_ios > 0); ut_ad(space->n_pending_ios > 0);
ulint zip_size = buf_page_get_zip_size(bpage); ulint zip_size = buf_page_get_zip_size(bpage);
byte* dst_frame = (zip_size) ? bpage->zip.data : byte* dst_frame = (zip_size) ? bpage->zip.data :
((buf_block_t*) bpage)->frame; ((buf_block_t*) bpage)->frame;
bool still_encrypted = false;
dberr_t err = DB_SUCCESS; dberr_t err = DB_SUCCESS;
bool corrupted = false; bool corrupted = false;
fil_space_crypt_t* crypt_data = space->crypt_data;
/* In buf_decrypt_after_read we have either decrypted the page if /* In buf_decrypt_after_read we have either decrypted the page if
page post encryption checksum matches and used key_id is found page post encryption checksum matches and used key_id is found
...@@ -4797,11 +4782,12 @@ buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space) ...@@ -4797,11 +4782,12 @@ buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space)
not decrypted and it could be either encrypted and corrupted not decrypted and it could be either encrypted and corrupted
or corrupted or good page. If we decrypted, there page could or corrupted or good page. If we decrypted, there page could
still be corrupted if used key does not match. */ still be corrupted if used key does not match. */
still_encrypted = (crypt_data && const bool still_encrypted = mach_read_from_4(
crypt_data->type != CRYPT_SCHEME_UNENCRYPTED && dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
!bpage->encrypted && && space->crypt_data
fil_space_verify_crypt_checksum(dst_frame, zip_size, && space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED
space, bpage->offset)); && !bpage->encrypted
&& fil_space_verify_crypt_checksum(dst_frame, zip_size);
if (!still_encrypted) { if (!still_encrypted) {
/* If traditional checksums match, we assume that page is /* If traditional checksums match, we assume that page is
......
/***************************************************************************** /*****************************************************************************
Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation. Copyright (c) 2013, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -362,6 +362,22 @@ buf_dblwr_create() ...@@ -362,6 +362,22 @@ buf_dblwr_create()
goto start_again; goto start_again;
} }
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
static bool buf_page_is_zeroes(const byte* read_buf, ulint zip_size)
{
const ulint page_size = zip_size ? zip_size : UNIV_PAGE_SIZE;
for (ulint i = 0; i < page_size; i++) {
if (read_buf[i] != 0) {
return false;
}
}
return true;
}
/****************************************************************//** /****************************************************************//**
At a database startup initializes the doublewrite buffer memory structure if At a database startup initializes the doublewrite buffer memory structure if
we already have a doublewrite buffer created in the data files. If we are we already have a doublewrite buffer created in the data files. If we are
...@@ -556,6 +572,9 @@ buf_dblwr_process() ...@@ -556,6 +572,9 @@ buf_dblwr_process()
const bool is_all_zero = buf_page_is_zeroes( const bool is_all_zero = buf_page_is_zeroes(
read_buf, zip_size); read_buf, zip_size);
const bool expect_encrypted = space()->crypt_data
&& space()->crypt_data->type
!= CRYPT_SCHEME_UNENCRYPTED;
if (is_all_zero) { if (is_all_zero) {
/* We will check if the copy in the /* We will check if the copy in the
...@@ -566,17 +585,17 @@ buf_dblwr_process() ...@@ -566,17 +585,17 @@ buf_dblwr_process()
/* Decompress the page before /* Decompress the page before
validating the checksum. */ validating the checksum. */
ulint decomp = fil_page_decompress(buf, read_buf); ulint decomp = fil_page_decompress(buf, read_buf);
if (!decomp) {
goto bad;
}
if (!decomp || (decomp != srv_page_size && zip_size)) { if (!decomp || (decomp != srv_page_size && zip_size)) {
goto bad; goto bad;
} }
if (fil_space_verify_crypt_checksum( if (expect_encrypted && mach_read_from_4(
read_buf, zip_size, NULL, page_no) read_buf
|| !buf_page_is_corrupted( + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
true, read_buf, zip_size, space())) { ? fil_space_verify_crypt_checksum(read_buf,
zip_size)
: !buf_page_is_corrupted(true, read_buf,
zip_size, space())) {
/* The page is good; there is no need /* The page is good; there is no need
to consult the doublewrite buffer. */ to consult the doublewrite buffer. */
continue; continue;
...@@ -595,9 +614,11 @@ buf_dblwr_process() ...@@ -595,9 +614,11 @@ buf_dblwr_process()
if (!decomp || (decomp != srv_page_size && zip_size)) { if (!decomp || (decomp != srv_page_size && zip_size)) {
goto bad_doublewrite; goto bad_doublewrite;
} }
if (!fil_space_verify_crypt_checksum(page, zip_size, NULL,
page_no) if (expect_encrypted && mach_read_from_4(
&& buf_page_is_corrupted(true, page, zip_size, space)) { page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
? !fil_space_verify_crypt_checksum(page, zip_size)
: buf_page_is_corrupted(true, page, zip_size, space())) {
if (!is_all_zero) { if (!is_all_zero) {
bad_doublewrite: bad_doublewrite:
ib_logf(IB_LOG_LEVEL_WARN, ib_logf(IB_LOG_LEVEL_WARN,
......
...@@ -662,7 +662,7 @@ fil_encrypt_buf( ...@@ -662,7 +662,7 @@ fil_encrypt_buf(
// store the post-encryption checksum after the key-version // store the post-encryption checksum after the key-version
mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4, checksum); mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4, checksum);
ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size, NULL, offset)); ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size));
srv_stats.pages_encrypted.inc(); srv_stats.pages_encrypted.inc();
...@@ -2568,167 +2568,66 @@ encrypted, or corrupted. ...@@ -2568,167 +2568,66 @@ encrypted, or corrupted.
@param[in] page Page to verify @param[in] page Page to verify
@param[in] zip_size zip size @param[in] zip_size zip size
@param[in] space Tablespace @return whether the encrypted page is OK */
@param[in] pageno Page no
@return true if page is encrypted AND OK, false otherwise */
UNIV_INTERN UNIV_INTERN
bool bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size)
fil_space_verify_crypt_checksum(
byte* page,
ulint zip_size,
#ifndef UNIV_INNOCHECKSUM
const fil_space_t* space,
#else
const void* space,
#endif
ulint pageno)
{ {
uint key_version = mach_read_from_4(page+ FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION); ut_ad(mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION));
/* If page is not encrypted, return false */
if (key_version == 0) {
return(false);
}
srv_checksum_algorithm_t algorithm =
static_cast<srv_checksum_algorithm_t>(srv_checksum_algorithm);
/* If no checksum is used, can't continue checking. */
if (algorithm == SRV_CHECKSUM_ALGORITHM_NONE) {
return(true);
}
/* Read stored post encryption checksum. */
ib_uint32_t checksum = mach_read_from_4(
page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
/* Declare empty pages non-corrupted */
if (checksum == 0
&& *reinterpret_cast<const ib_uint64_t*>(page + FIL_PAGE_LSN) == 0
&& buf_page_is_zeroes(page, zip_size)) {
return(true);
}
/* Compressed and encrypted pages do not have checksum. Assume not /* Compressed and encrypted pages do not have checksum. Assume not
corrupted. Page verification happens after decompression in corrupted. Page verification happens after decompression in
buf_page_io_complete() using buf_page_is_corrupted(). */ buf_page_io_complete() using buf_page_is_corrupted(). */
if (mach_read_from_2(page+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) { if (mach_read_from_2(page + FIL_PAGE_TYPE)
return (true); == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
return true;
} }
ib_uint32_t cchecksum1 = 0; /* Read stored post encryption checksum. */
ib_uint32_t cchecksum2 = 0; const ib_uint32_t checksum = mach_read_from_4(
page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
/* Calculate checksums */
if (zip_size) {
cchecksum1 = page_zip_calc_checksum(
page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32);
cchecksum2 = (cchecksum1 == checksum)
? 0
: page_zip_calc_checksum(
page, zip_size,
SRV_CHECKSUM_ALGORITHM_INNODB);
} else {
cchecksum1 = buf_calc_page_crc32(page);
cchecksum2 = (cchecksum1 == checksum)
? 0
: buf_calc_page_new_checksum(page);
}
/* If stored checksum matches one of the calculated checksums /* If stored checksum matches one of the calculated checksums
page is not corrupted. */ page is not corrupted. */
srv_checksum_algorithm_t algorithm = srv_checksum_algorithm_t(
srv_checksum_algorithm);
switch (algorithm) {
case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
if (zip_size) {
return checksum == page_zip_calc_checksum(
page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32);
}
bool encrypted = (checksum == cchecksum1 || checksum == cchecksum2 return checksum == buf_calc_page_crc32(page);
|| checksum == BUF_NO_CHECKSUM_MAGIC); case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
if (zip_size) {
/* MySQL 5.6 and MariaDB 10.0 and 10.1 will write an LSN to the return checksum == page_zip_calc_checksum(
first page of each system tablespace file at page, zip_size, SRV_CHECKSUM_ALGORITHM_INNODB);
FIL_PAGE_FILE_FLUSH_LSN offset. On other pages and in other files,
the field might have been uninitialized until MySQL 5.5. In MySQL 5.7
(and MariaDB Server 10.2.2) WL#7990 stopped writing the field for other
than page 0 of the system tablespace.
Starting from MariaDB 10.1 the field has been repurposed for
encryption key_version.
Starting with MySQL 5.7 (and MariaDB Server 10.2), the
field has been repurposed for SPATIAL INDEX pages for
FIL_RTREE_SPLIT_SEQ_NUM.
Note that FIL_PAGE_FILE_FLUSH_LSN is not included in the InnoDB page
checksum.
Thus, FIL_PAGE_FILE_FLUSH_LSN could contain any value. While the
field would usually be 0 for pages that are not encrypted, we cannot
assume that a nonzero value means that the page is encrypted.
Therefore we must validate the page both as encrypted and unencrypted
when FIL_PAGE_FILE_FLUSH_LSN does not contain 0.
*/
uint32_t checksum1 = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
uint32_t checksum2;
bool valid = false;
if (zip_size) {
valid = (checksum1 == cchecksum1);
checksum2 = checksum1;
} else {
checksum2 = mach_read_from_4(
page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM);
switch (algorithm) {
case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
valid = buf_page_is_checksum_valid_crc32(page, checksum1,
checksum2);
break;
case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
valid = buf_page_is_checksum_valid_innodb(page, checksum1,
checksum2);
break;
case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
valid = buf_page_is_checksum_valid_none(page, checksum1,
checksum2);
break;
case SRV_CHECKSUM_ALGORITHM_CRC32:
case SRV_CHECKSUM_ALGORITHM_INNODB:
valid = buf_page_is_checksum_valid_crc32(
page, checksum1, checksum2)
|| buf_page_is_checksum_valid_innodb(
page, checksum1, checksum2);
break;
case SRV_CHECKSUM_ALGORITHM_NONE:
ut_error;
} }
} return checksum == buf_calc_page_new_checksum(page);
case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
return checksum == BUF_NO_CHECKSUM_MAGIC;
case SRV_CHECKSUM_ALGORITHM_NONE:
return true;
case SRV_CHECKSUM_ALGORITHM_INNODB:
case SRV_CHECKSUM_ALGORITHM_CRC32:
if (checksum == BUF_NO_CHECKSUM_MAGIC) {
return true;
}
if (zip_size) {
if (checksum == page_zip_calc_checksum(
page, zip_size, algorithm)) {
return true;
}
if (encrypted && valid) { algorithm = algorithm == SRV_CHECKSUM_ALGORITHM_INNODB
/* If page is encrypted and traditional checksums match, ? SRV_CHECKSUM_ALGORITHM_CRC32
page could be still encrypted, or not encrypted and valid or : SRV_CHECKSUM_ALGORITHM_INNODB;
corrupted. */ return checksum == page_zip_calc_checksum(
#ifndef UNIV_INNOCHECKSUM page, zip_size, algorithm);
ib_logf(IB_LOG_LEVEL_ERROR,
" Page " ULINTPF " in space %s (" ULINTPF ") maybe corrupted."
" Post encryption checksum %u stored [%u:%u] key_version %u",
pageno,
space ? space->name : "N/A",
mach_read_from_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID),
checksum, checksum1, checksum2, key_version);
#else
if (log_file) {
fprintf(log_file,
"Page " ULINTPF ":" ULINTPF " may be corrupted."
" Post encryption checksum %u"
" stored [%u:%u] key_version %u\n",
pageno,
mach_read_from_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID),
checksum, checksum1, checksum2,
key_version);
} }
#endif /* UNIV_INNOCHECKSUM */
encrypted = false; return checksum == buf_calc_page_crc32(page)
|| checksum == buf_calc_page_new_checksum(page);
} }
return(encrypted);
} }
...@@ -692,13 +692,6 @@ buf_page_is_corrupted( ...@@ -692,13 +692,6 @@ buf_page_is_corrupted(
ulint zip_size, ulint zip_size,
const fil_space_t* space) const fil_space_t* space)
MY_ATTRIBUTE((warn_unused_result)); MY_ATTRIBUTE((warn_unused_result));
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
UNIV_INTERN
bool
buf_page_is_zeroes(const byte* read_buf, ulint zip_size);
#ifndef UNIV_HOTBACKUP #ifndef UNIV_HOTBACKUP
/**********************************************************************//** /**********************************************************************//**
Gets the space id, page offset, and byte offset within page of a Gets the space id, page offset, and byte offset within page of a
......
...@@ -409,16 +409,9 @@ encrypted, or corrupted. ...@@ -409,16 +409,9 @@ encrypted, or corrupted.
@param[in] page Page to verify @param[in] page Page to verify
@param[in] zip_size zip size @param[in] zip_size zip size
@param[in] space Tablespace @return whether the encrypted page is OK */
@param[in] pageno Page no
@return true if page is encrypted AND OK, false otherwise */
UNIV_INTERN UNIV_INTERN
bool bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size)
fil_space_verify_crypt_checksum(
byte* page,
ulint zip_size,
const fil_space_t* space,
ulint pageno)
MY_ATTRIBUTE((warn_unused_result)); MY_ATTRIBUTE((warn_unused_result));
/********************************************************************* /*********************************************************************
......
...@@ -3537,8 +3537,7 @@ fil_iterate( ...@@ -3537,8 +3537,7 @@ fil_iterate(
} }
} else { } else {
if (!fil_space_verify_crypt_checksum( if (!fil_space_verify_crypt_checksum(
src, callback.get_zip_size(), src, callback.get_zip_size())) {
NULL, block->page.offset)) {
goto page_corrupted; goto page_corrupted;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment