Commit bc323727 authored by Jan Lindström's avatar Jan Lindström

MDEV-10977: [ERROR] InnoDB: Block in space_id 0 in file ibdata1 encrypted.

MDEV-10394: Innodb system table space corrupted

Analysis: After we have read the page in buf_page_io_complete try to
find if the page is encrypted or corrupted. Encryption was determined
by reading FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION field from FIL-header
as a key_version. However, this field is not always zero even when
encryption is not used. Thus, incorrect key_version could lead situation where
decryption is tried to page that is not encrypted.

Fix: We still read key_version information from FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
field but also check if tablespace has encryption information before trying
encrypt the page.
parent c1bbedbd
...@@ -526,6 +526,14 @@ buf_page_is_checksum_valid_crc32( ...@@ -526,6 +526,14 @@ buf_page_is_checksum_valid_crc32(
{ {
ib_uint32_t crc32 = buf_calc_page_crc32(read_buf); ib_uint32_t crc32 = buf_calc_page_crc32(read_buf);
#ifdef UNIV_DEBUG_LEVEL2
if (!(checksum_field1 == crc32 && checksum_field2 == crc32)) {
ib_logf(IB_LOG_LEVEL_INFO,
"Page checksum crc32 not valid field1 %lu field2 %lu crc32 %lu.",
checksum_field1, checksum_field2, crc32);
}
#endif
return(checksum_field1 == crc32 && checksum_field2 == crc32); return(checksum_field1 == crc32 && checksum_field2 == crc32);
} }
...@@ -553,6 +561,13 @@ buf_page_is_checksum_valid_innodb( ...@@ -553,6 +561,13 @@ buf_page_is_checksum_valid_innodb(
if (checksum_field2 != mach_read_from_4(read_buf + FIL_PAGE_LSN) if (checksum_field2 != mach_read_from_4(read_buf + FIL_PAGE_LSN)
&& checksum_field2 != buf_calc_page_old_checksum(read_buf)) { && checksum_field2 != buf_calc_page_old_checksum(read_buf)) {
#ifdef UNIV_DEBUG_LEVEL2
ib_logf(IB_LOG_LEVEL_INFO,
"Page checksum innodb not valid field1 %lu field2 %lu crc32 %lu lsn %lu.",
checksum_field1, checksum_field2, buf_calc_page_old_checksum(read_buf),
mach_read_from_4(read_buf + FIL_PAGE_LSN)
);
#endif
return(false); return(false);
} }
...@@ -563,6 +578,13 @@ buf_page_is_checksum_valid_innodb( ...@@ -563,6 +578,13 @@ buf_page_is_checksum_valid_innodb(
if (checksum_field1 != 0 if (checksum_field1 != 0
&& checksum_field1 != buf_calc_page_new_checksum(read_buf)) { && checksum_field1 != buf_calc_page_new_checksum(read_buf)) {
#ifdef UNIV_DEBUG_LEVEL2
ib_logf(IB_LOG_LEVEL_INFO,
"Page checksum innodb not valid field1 %lu field2 %lu crc32 %lu lsn %lu.",
checksum_field1, checksum_field2, buf_calc_page_old_checksum(read_buf),
mach_read_from_4(read_buf + FIL_PAGE_LSN)
);
#endif
return(false); return(false);
} }
...@@ -581,6 +603,16 @@ buf_page_is_checksum_valid_none( ...@@ -581,6 +603,16 @@ buf_page_is_checksum_valid_none(
ulint checksum_field1, ulint checksum_field1,
ulint checksum_field2) ulint checksum_field2)
{ {
#ifdef UNIV_DEBUG_LEVEL2
if (!(checksum_field1 == checksum_field2 || checksum_field1 == BUF_NO_CHECKSUM_MAGIC)) {
ib_logf(IB_LOG_LEVEL_INFO,
"Page checksum none not valid field1 %lu field2 %lu crc32 %lu lsn %lu.",
checksum_field1, checksum_field2, buf_calc_page_old_checksum(read_buf),
mach_read_from_4(read_buf + FIL_PAGE_LSN)
);
}
#endif
return(checksum_field1 == checksum_field2 return(checksum_field1 == checksum_field2
&& checksum_field1 == BUF_NO_CHECKSUM_MAGIC); && checksum_field1 == BUF_NO_CHECKSUM_MAGIC);
} }
...@@ -598,9 +630,21 @@ buf_page_is_corrupted( ...@@ -598,9 +630,21 @@ buf_page_is_corrupted(
ulint zip_size) /*!< in: size of compressed page; ulint zip_size) /*!< in: size of compressed page;
0 for uncompressed pages */ 0 for uncompressed pages */
{ {
ulint page_encrypted = fil_page_is_encrypted(read_buf);
ulint checksum_field1; ulint checksum_field1;
ulint checksum_field2; ulint checksum_field2;
ulint space_id = mach_read_from_4(
read_buf + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
fil_space_crypt_t* crypt_data = fil_space_get_crypt_data(space_id);
bool page_encrypted = false;
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
if (crypt_data &&
crypt_data->type != CRYPT_SCHEME_UNENCRYPTED &&
fil_page_is_encrypted(read_buf)) {
page_encrypted = true;
}
if (!page_encrypted && !zip_size if (!page_encrypted && !zip_size
&& memcmp(read_buf + FIL_PAGE_LSN + 4, && memcmp(read_buf + FIL_PAGE_LSN + 4,
...@@ -610,6 +654,11 @@ buf_page_is_corrupted( ...@@ -610,6 +654,11 @@ buf_page_is_corrupted(
/* Stored log sequence numbers at the start and the end /* Stored log sequence numbers at the start and the end
of page do not match */ of page do not match */
ib_logf(IB_LOG_LEVEL_INFO,
"Log sequence number at the start %lu and the end %lu do not match.",
mach_read_from_4(read_buf + FIL_PAGE_LSN + 4),
mach_read_from_4(read_buf + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM + 4));
return(TRUE); return(TRUE);
} }
...@@ -676,6 +725,9 @@ buf_page_is_corrupted( ...@@ -676,6 +725,9 @@ buf_page_is_corrupted(
/* make sure that the page is really empty */ /* make sure that the page is really empty */
for (ulint i = 0; i < UNIV_PAGE_SIZE; i++) { for (ulint i = 0; i < UNIV_PAGE_SIZE; i++) {
if (read_buf[i] != 0) { if (read_buf[i] != 0) {
ib_logf(IB_LOG_LEVEL_INFO,
"Checksum fields zero but page is not empty.");
return(TRUE); return(TRUE);
} }
} }
...@@ -686,7 +738,7 @@ buf_page_is_corrupted( ...@@ -686,7 +738,7 @@ buf_page_is_corrupted(
DBUG_EXECUTE_IF("buf_page_is_corrupt_failure", return(TRUE); ); DBUG_EXECUTE_IF("buf_page_is_corrupt_failure", return(TRUE); );
ulint page_no = mach_read_from_4(read_buf + FIL_PAGE_OFFSET); ulint page_no = mach_read_from_4(read_buf + FIL_PAGE_OFFSET);
ulint space_id = mach_read_from_4(read_buf + FIL_PAGE_SPACE_ID);
const srv_checksum_algorithm_t curr_algo = const srv_checksum_algorithm_t curr_algo =
static_cast<srv_checksum_algorithm_t>(srv_checksum_algorithm); static_cast<srv_checksum_algorithm_t>(srv_checksum_algorithm);
...@@ -4531,7 +4583,7 @@ buf_page_io_complete( ...@@ -4531,7 +4583,7 @@ buf_page_io_complete(
if (io_type == BUF_IO_READ) { if (io_type == BUF_IO_READ) {
ulint read_page_no; ulint read_page_no;
ulint read_space_id; ulint read_space_id;
byte* frame; byte* frame = NULL;
if (!buf_page_decrypt_after_read(bpage)) { if (!buf_page_decrypt_after_read(bpage)) {
/* encryption error! */ /* encryption error! */
...@@ -4540,7 +4592,8 @@ buf_page_io_complete( ...@@ -4540,7 +4592,8 @@ buf_page_io_complete(
} else { } else {
frame = ((buf_block_t*) bpage)->frame; frame = ((buf_block_t*) bpage)->frame;
} }
goto corrupt;
goto database_corrupted;
} }
if (buf_page_get_zip_size(bpage)) { if (buf_page_get_zip_size(bpage)) {
...@@ -4551,7 +4604,7 @@ buf_page_io_complete( ...@@ -4551,7 +4604,7 @@ buf_page_io_complete(
FALSE)) { FALSE)) {
buf_pool->n_pend_unzip--; buf_pool->n_pend_unzip--;
goto corrupt; goto database_corrupted;
} }
buf_pool->n_pend_unzip--; buf_pool->n_pend_unzip--;
} else { } else {
...@@ -4614,7 +4667,7 @@ buf_page_io_complete( ...@@ -4614,7 +4667,7 @@ buf_page_io_complete(
} }
goto page_not_corrupt; goto page_not_corrupt;
;); ;);
corrupt: database_corrupted:
bool corrupted = buf_page_check_corrupt(bpage); bool corrupted = buf_page_check_corrupt(bpage);
if (corrupted) { if (corrupted) {
...@@ -6177,6 +6230,19 @@ buf_page_decrypt_after_read( ...@@ -6177,6 +6230,19 @@ buf_page_decrypt_after_read(
bool page_compressed_encrypted = fil_page_is_compressed_encrypted(dst_frame); bool page_compressed_encrypted = fil_page_is_compressed_encrypted(dst_frame);
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage); buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
bool success = true; bool success = true;
ulint space_id = mach_read_from_4(
dst_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
fil_space_crypt_t* crypt_data = fil_space_get_crypt_data(space_id);
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
if (!crypt_data ||
(crypt_data &&
crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
key_version != 0)) {
key_version = 0;
}
/* If page is encrypted read post-encryption checksum */ /* If page is encrypted read post-encryption checksum */
if (!page_compressed_encrypted && key_version != 0) { if (!page_compressed_encrypted && key_version != 0) {
......
...@@ -595,6 +595,14 @@ buf_page_is_checksum_valid_crc32( ...@@ -595,6 +595,14 @@ buf_page_is_checksum_valid_crc32(
{ {
ib_uint32_t crc32 = buf_calc_page_crc32(read_buf); ib_uint32_t crc32 = buf_calc_page_crc32(read_buf);
#ifdef UNIV_DEBUG_LEVEL2
if (!(checksum_field1 == crc32 && checksum_field2 == crc32)) {
ib_logf(IB_LOG_LEVEL_INFO,
"Page checksum crc32 not valid field1 %lu field2 %lu crc32 %lu.",
checksum_field1, checksum_field2, crc32);
}
#endif
return(checksum_field1 == crc32 && checksum_field2 == crc32); return(checksum_field1 == crc32 && checksum_field2 == crc32);
} }
...@@ -622,6 +630,13 @@ buf_page_is_checksum_valid_innodb( ...@@ -622,6 +630,13 @@ buf_page_is_checksum_valid_innodb(
if (checksum_field2 != mach_read_from_4(read_buf + FIL_PAGE_LSN) if (checksum_field2 != mach_read_from_4(read_buf + FIL_PAGE_LSN)
&& checksum_field2 != buf_calc_page_old_checksum(read_buf)) { && checksum_field2 != buf_calc_page_old_checksum(read_buf)) {
#ifdef UNIV_DEBUG_LEVEL2
ib_logf(IB_LOG_LEVEL_INFO,
"Page checksum innodb not valid field1 %lu field2 %lu crc32 %lu lsn %lu.",
checksum_field1, checksum_field2, buf_calc_page_old_checksum(read_buf),
mach_read_from_4(read_buf + FIL_PAGE_LSN)
);
#endif
return(false); return(false);
} }
...@@ -632,6 +647,13 @@ buf_page_is_checksum_valid_innodb( ...@@ -632,6 +647,13 @@ buf_page_is_checksum_valid_innodb(
if (checksum_field1 != 0 if (checksum_field1 != 0
&& checksum_field1 != buf_calc_page_new_checksum(read_buf)) { && checksum_field1 != buf_calc_page_new_checksum(read_buf)) {
#ifdef UNIV_DEBUG_LEVEL2
ib_logf(IB_LOG_LEVEL_INFO,
"Page checksum innodb not valid field1 %lu field2 %lu crc32 %lu lsn %lu.",
checksum_field1, checksum_field2, buf_calc_page_old_checksum(read_buf),
mach_read_from_4(read_buf + FIL_PAGE_LSN)
);
#endif
return(false); return(false);
} }
...@@ -650,6 +672,16 @@ buf_page_is_checksum_valid_none( ...@@ -650,6 +672,16 @@ buf_page_is_checksum_valid_none(
ulint checksum_field1, ulint checksum_field1,
ulint checksum_field2) ulint checksum_field2)
{ {
#ifdef UNIV_DEBUG_LEVEL2
if (!(checksum_field1 == checksum_field2 || checksum_field1 == BUF_NO_CHECKSUM_MAGIC)) {
ib_logf(IB_LOG_LEVEL_INFO,
"Page checksum none not valid field1 %lu field2 %lu crc32 %lu lsn %lu.",
checksum_field1, checksum_field2, buf_calc_page_old_checksum(read_buf),
mach_read_from_4(read_buf + FIL_PAGE_LSN)
);
}
#endif
return(checksum_field1 == checksum_field2 return(checksum_field1 == checksum_field2
&& checksum_field1 == BUF_NO_CHECKSUM_MAGIC); && checksum_field1 == BUF_NO_CHECKSUM_MAGIC);
} }
...@@ -667,9 +699,21 @@ buf_page_is_corrupted( ...@@ -667,9 +699,21 @@ buf_page_is_corrupted(
ulint zip_size) /*!< in: size of compressed page; ulint zip_size) /*!< in: size of compressed page;
0 for uncompressed pages */ 0 for uncompressed pages */
{ {
ulint page_encrypted = fil_page_is_encrypted(read_buf);
ulint checksum_field1; ulint checksum_field1;
ulint checksum_field2; ulint checksum_field2;
ulint space_id = mach_read_from_4(
read_buf + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
fil_space_crypt_t* crypt_data = fil_space_get_crypt_data(space_id);
bool page_encrypted = false;
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
if (crypt_data &&
crypt_data->type != CRYPT_SCHEME_UNENCRYPTED &&
fil_page_is_encrypted(read_buf)) {
page_encrypted = true;
}
if (!page_encrypted && !zip_size if (!page_encrypted && !zip_size
&& memcmp(read_buf + FIL_PAGE_LSN + 4, && memcmp(read_buf + FIL_PAGE_LSN + 4,
...@@ -679,6 +723,11 @@ buf_page_is_corrupted( ...@@ -679,6 +723,11 @@ buf_page_is_corrupted(
/* Stored log sequence numbers at the start and the end /* Stored log sequence numbers at the start and the end
of page do not match */ of page do not match */
ib_logf(IB_LOG_LEVEL_INFO,
"Log sequence number at the start %lu and the end %lu do not match.",
mach_read_from_4(read_buf + FIL_PAGE_LSN + 4),
mach_read_from_4(read_buf + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM + 4));
return(TRUE); return(TRUE);
} }
...@@ -745,6 +794,9 @@ buf_page_is_corrupted( ...@@ -745,6 +794,9 @@ buf_page_is_corrupted(
/* make sure that the page is really empty */ /* make sure that the page is really empty */
for (ulint i = 0; i < UNIV_PAGE_SIZE; i++) { for (ulint i = 0; i < UNIV_PAGE_SIZE; i++) {
if (read_buf[i] != 0) { if (read_buf[i] != 0) {
ib_logf(IB_LOG_LEVEL_INFO,
"Checksum fields zero but page is not empty.");
return(TRUE); return(TRUE);
} }
} }
...@@ -755,7 +807,7 @@ buf_page_is_corrupted( ...@@ -755,7 +807,7 @@ buf_page_is_corrupted(
DBUG_EXECUTE_IF("buf_page_is_corrupt_failure", return(TRUE); ); DBUG_EXECUTE_IF("buf_page_is_corrupt_failure", return(TRUE); );
ulint page_no = mach_read_from_4(read_buf + FIL_PAGE_OFFSET); ulint page_no = mach_read_from_4(read_buf + FIL_PAGE_OFFSET);
ulint space_id = mach_read_from_4(read_buf + FIL_PAGE_SPACE_ID);
const srv_checksum_algorithm_t curr_algo = const srv_checksum_algorithm_t curr_algo =
static_cast<srv_checksum_algorithm_t>(srv_checksum_algorithm); static_cast<srv_checksum_algorithm_t>(srv_checksum_algorithm);
...@@ -4645,7 +4697,7 @@ buf_page_io_complete( ...@@ -4645,7 +4697,7 @@ buf_page_io_complete(
if (io_type == BUF_IO_READ) { if (io_type == BUF_IO_READ) {
ulint read_page_no; ulint read_page_no;
ulint read_space_id; ulint read_space_id;
byte* frame; byte* frame = NULL;
if (!buf_page_decrypt_after_read(bpage)) { if (!buf_page_decrypt_after_read(bpage)) {
/* encryption error! */ /* encryption error! */
...@@ -4654,7 +4706,8 @@ buf_page_io_complete( ...@@ -4654,7 +4706,8 @@ buf_page_io_complete(
} else { } else {
frame = ((buf_block_t*) bpage)->frame; frame = ((buf_block_t*) bpage)->frame;
} }
goto corrupt;
goto database_corrupted;
} }
if (buf_page_get_zip_size(bpage)) { if (buf_page_get_zip_size(bpage)) {
...@@ -4666,7 +4719,8 @@ buf_page_io_complete( ...@@ -4666,7 +4719,8 @@ buf_page_io_complete(
os_atomic_decrement_ulint( os_atomic_decrement_ulint(
&buf_pool->n_pend_unzip, 1); &buf_pool->n_pend_unzip, 1);
goto corrupt;
goto database_corrupted;
} }
os_atomic_decrement_ulint(&buf_pool->n_pend_unzip, 1); os_atomic_decrement_ulint(&buf_pool->n_pend_unzip, 1);
} else { } else {
...@@ -4731,7 +4785,7 @@ buf_page_io_complete( ...@@ -4731,7 +4785,7 @@ buf_page_io_complete(
} }
goto page_not_corrupt; goto page_not_corrupt;
;); ;);
corrupt: database_corrupted:
bool corrupted = buf_page_check_corrupt(bpage); bool corrupted = buf_page_check_corrupt(bpage);
...@@ -6374,6 +6428,19 @@ buf_page_decrypt_after_read( ...@@ -6374,6 +6428,19 @@ buf_page_decrypt_after_read(
bool page_compressed_encrypted = fil_page_is_compressed_encrypted(dst_frame); bool page_compressed_encrypted = fil_page_is_compressed_encrypted(dst_frame);
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage); buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
bool success = true; bool success = true;
ulint space_id = mach_read_from_4(
dst_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
fil_space_crypt_t* crypt_data = fil_space_get_crypt_data(space_id);
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
if (!crypt_data ||
(crypt_data &&
crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
key_version != 0)) {
key_version = 0;
}
/* If page is encrypted read post-encryption checksum */ /* If page is encrypted read post-encryption checksum */
if (!page_compressed_encrypted && key_version != 0) { if (!page_compressed_encrypted && key_version != 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment