Commit 3627dd7f authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-16416 Crash on IMPORT TABLESPACE of a ROW_FORMAT=COMPRESSED table

fil_iterate(): Invoke fil_encrypt_buf() correctly when
a ROW_FORMAT=COMPRESSED table with a physical page size of
innodb_page_size is being imported. Also, validate the page checksum
before decryption, and reduce the scope of some variables.

AbstractCallback::operator()(): Remove the parameter 'offset'.
The check for it in FetchIndexRootPages::operator() was basically
redundant and dead code since the previous refactoring.
parent 1d4e1d32
call mtr.add_suppression("InnoDB: Tablespace for table .* is set as discarded.");
call mtr.add_suppression("InnoDB: Cannot calculate statistics for table .* because the .ibd file is missing. Please refer to .* for how to resolve the issue.");
SET @innodb_file_format_orig = @@GLOBAL.innodb_file_format;
SET @innodb_file_per_table_orig = @@GLOBAL.innodb_file_per_table;
SET @innodb_compression_algo = @@GLOBAL.innodb_compression_algorithm;
SET GLOBAL innodb_file_format = `Barracuda`;
SET GLOBAL innodb_file_per_table = ON;
SET GLOBAL innodb_compression_algorithm = 1;
......@@ -134,3 +137,6 @@ NOT FOUND /tmpres/ in t3.ibd
NOT FOUND /mysql/ in t4.ibd
DROP PROCEDURE innodb_insert_proc;
DROP TABLE t1,t2,t3,t4;
SET GLOBAL innodb_file_format = @innodb_file_format_orig;
SET GLOBAL innodb_file_per_table = @innodb_file_per_table_orig;
SET GLOBAL innodb_compression_algorithm = @innodb_compression_algo;
-- source include/have_innodb.inc
-- source include/innodb_page_size_small.inc
-- source include/have_file_key_management_plugin.inc
# embedded does not support restart
-- source include/not_embedded.inc
-- source include/not_valgrind.inc
# Avoid CrashReporter popup on Mac
-- source include/not_crashrep.inc
#
# MDEV-8770: Incorrect error message when importing page compressed tablespace
......@@ -13,17 +9,13 @@
call mtr.add_suppression("InnoDB: Tablespace for table .* is set as discarded.");
call mtr.add_suppression("InnoDB: Cannot calculate statistics for table .* because the .ibd file is missing. Please refer to .* for how to resolve the issue.");
--disable_query_log
let $innodb_file_format_orig = `SELECT @@innodb_file_format`;
let $innodb_file_per_table_orig = `SELECT @@innodb_file_per_table`;
let $innodb_compression_algo = `SELECT @@innodb_compression_algorithm`;
--enable_query_log
SET @innodb_file_format_orig = @@GLOBAL.innodb_file_format;
SET @innodb_file_per_table_orig = @@GLOBAL.innodb_file_per_table;
SET @innodb_compression_algo = @@GLOBAL.innodb_compression_algorithm;
--disable_warnings
SET GLOBAL innodb_file_format = `Barracuda`;
SET GLOBAL innodb_file_per_table = ON;
SET GLOBAL innodb_compression_algorithm = 1;
--enable_warnings
--let $MYSQLD_TMPDIR = `SELECT @@tmpdir`
--let $MYSQLD_DATADIR = `SELECT @@datadir`
......@@ -37,7 +29,9 @@ create table t1(c1 bigint not null, b char(200)) engine=innodb encrypted=yes en
show warnings;
create table t2(c1 bigint not null, b char(200)) engine=innodb page_compressed=1 encrypted=yes encryption_key_id=4;
show warnings;
create table t3(c1 bigint not null, b char(200)) engine=innodb row_format=compressed encrypted=yes encryption_key_id=4;
let $kbs= `select floor(@@global.innodb_page_size/1024)`;
--replace_regex / key_block_size=\d+//i
eval create table t3(c1 bigint not null, b char(200)) engine=innodb row_format=compressed encrypted=yes encryption_key_id=4 key_block_size=$kbs;
show warnings;
create table t4(c1 bigint not null, b char(200)) engine=innodb page_compressed=1;
show warnings;
......@@ -98,6 +92,7 @@ ALTER TABLE t2 IMPORT TABLESPACE;
SHOW CREATE TABLE t2;
SELECT COUNT(*) FROM t2;
ALTER TABLE t3 IMPORT TABLESPACE;
--replace_regex / key_block_size=\d+//i
SHOW CREATE TABLE t3;
SELECT COUNT(*) FROM t3;
ALTER TABLE t4 IMPORT TABLESPACE;
......@@ -125,11 +120,6 @@ SELECT COUNT(*) FROM t4;
DROP PROCEDURE innodb_insert_proc;
DROP TABLE t1,t2,t3,t4;
# reset system
--disable_warnings
--disable_query_log
EVAL SET GLOBAL innodb_file_per_table = $innodb_file_per_table_orig;
EVAL SET GLOBAL innodb_file_format = $innodb_file_format_orig;
EVAL SET GLOBAL innodb_compression_algorithm = $innodb_compression_algo;
--enable_query_log
--enable_warnings
SET GLOBAL innodb_file_format = @innodb_file_format_orig;
SET GLOBAL innodb_file_per_table = @innodb_file_per_table_orig;
SET GLOBAL innodb_compression_algorithm = @innodb_compression_algo;
......@@ -427,12 +427,9 @@ class AbstractCallback
updated then its state must be set to BUF_PAGE_NOT_USED. For
compressed tables the page descriptor memory will be at offset:
block->frame + UNIV_PAGE_SIZE;
@param offset - physical offset within the file
@param block - block read from file, note it is not from the buffer pool
@param block block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
virtual dberr_t operator()(
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW = 0;
virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
/**
@return the space id of the tablespace */
......@@ -689,12 +686,9 @@ struct FetchIndexRootPages : public AbstractCallback {
/**
Called for each block as it is read from the file.
@param offset - physical offset in the file
@param block - block to convert, it is not from the buffer pool.
@param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
virtual dberr_t operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
/** Update the import configuration that will be used to import
the tablespace. */
......@@ -712,13 +706,9 @@ Called for each block as it is read from the file. Check index pages to
determine the exact row format. We can't get that from the tablespace
header flags alone.
@param offset - physical offset in the file
@param block - block to convert, it is not from the buffer pool.
@param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
dberr_t
FetchIndexRootPages::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
{
if (is_interrupted()) return DB_INTERRUPTED;
......@@ -726,15 +716,7 @@ FetchIndexRootPages::operator() (
ulint page_type = fil_page_get_type(page);
if (block->page.offset * m_page_size != offset) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Page offset doesn't match file offset: "
"page offset: %u, file offset: " ULINTPF,
block->page.offset,
(ulint) (offset / m_page_size));
return DB_CORRUPTION;
} else if (page_type == FIL_PAGE_TYPE_XDES) {
if (page_type == FIL_PAGE_TYPE_XDES) {
return set_current_xdes(block->page.offset, page);
} else if (page_type == FIL_PAGE_INDEX
&& !is_free(block->page.offset)
......@@ -877,12 +859,9 @@ class PageConverter : public AbstractCallback {
/**
Called for each block as it is read from the file.
@param offset - physical offset in the file
@param block - block to convert, it is not from the buffer pool.
@param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
virtual dberr_t operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
private:
/**
Update the page, set the space id, max trx id and index id.
......@@ -2039,10 +2018,9 @@ PageConverter::update_page(
/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED.
@param block - block read from file, note it is not from the buffer pool
@param block block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
dberr_t
PageConverter::operator() (os_offset_t, buf_block_t* block) UNIV_NOTHROW
dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
{
/* If we already had an old page with matching number
in the buffer pool, evict it now, because
......@@ -3430,26 +3408,15 @@ fil_iterate(
}
bool updated = false;
os_offset_t page_off = offset;
ulint n_pages_read = (ulint) n_bytes / iter.page_size;
const ulint size = iter.page_size;
block->page.offset = page_off / size;
block->page.offset = offset / size;
for (ulint i = 0; i < n_pages_read;
++i, page_off += size, block->frame += size,
block->page.offset++) {
bool decrypted = false;
dberr_t err = DB_SUCCESS;
++i, block->frame += size, block->page.offset++) {
byte* src = readptr + (i * size);
byte* dst = io_buffer + (i * size);
bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
const bool page_compressed
= page_type
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED;
const ulint page_no = page_get_page_no(src);
if (!page_no && page_off) {
if (!page_no && block->page.offset) {
const ulint* b = reinterpret_cast<const ulint*>
(src);
const ulint* const e = b + size / sizeof *b;
......@@ -3464,11 +3431,46 @@ fil_iterate(
continue;
}
if (page_no != page_off / size) {
goto page_corrupted;
if (page_no != block->page.offset) {
page_corrupted:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset "
UINT64PF " looks corrupted.",
callback.filename(),
ulong(offset / size), offset);
return DB_CORRUPTION;
}
if (encrypted) {
bool decrypted = false;
dberr_t err = DB_SUCCESS;
byte* dst = io_buffer + (i * size);
bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
const bool page_compressed
= page_type
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED;
if (!encrypted) {
} else if (!mach_read_from_4(
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ src)) {
not_encrypted:
if (!page_compressed
&& !block->page.zip.data) {
block->frame = src;
frame_changed = true;
} else {
ut_ad(dst != src);
memcpy(dst, src, size);
}
} else {
if (!fil_space_verify_crypt_checksum(
src, callback.get_zip_size(),
NULL, block->page.offset)) {
goto page_corrupted;
}
decrypted = fil_space_decrypt(
iter.crypt_data, dst,
iter.page_size, src, &err);
......@@ -3477,18 +3479,11 @@ fil_iterate(
return err;
}
if (decrypted) {
updated = true;
} else {
if (!page_compressed
&& !block->page.zip.data) {
block->frame = src;
frame_changed = true;
} else {
ut_ad(dst != src);
memcpy(dst, src, size);
}
if (!decrypted) {
goto not_encrypted;
}
updated = true;
}
/* If the original page is page_compressed, we need
......@@ -3502,16 +3497,10 @@ fil_iterate(
encrypted && !frame_changed
? dst : src,
callback.get_zip_size(), NULL)) {
page_corrupted:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset "
UINT64PF " looks corrupted.",
callback.filename(),
ulong(offset / size), offset);
return DB_CORRUPTION;
goto page_corrupted;
}
if ((err = callback(page_off, block)) != DB_SUCCESS) {
if ((err = callback(block)) != DB_SUCCESS) {
return err;
} else if (!updated) {
updated = buf_block_get_state(block)
......@@ -3584,19 +3573,15 @@ fil_iterate(
buffer pool is not being used at all! */
if (decrypted && encrypted) {
byte *dest = writeptr + (i * size);
ulint space = mach_read_from_4(
src + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
ulint offset = mach_read_from_4(src + FIL_PAGE_OFFSET);
ib_uint64_t lsn = mach_read_from_8(src + FIL_PAGE_LSN);
byte* tmp = fil_encrypt_buf(
iter.crypt_data,
space,
offset,
lsn,
src,
iter.page_size == UNIV_PAGE_SIZE ? 0 : iter.page_size,
dest);
iter.crypt_data,
callback.get_space_id(),
block->page.offset,
mach_read_from_8(src + FIL_PAGE_LSN),
src,
callback.get_zip_size(),
dest);
if (tmp == src) {
/* TODO: remove unnecessary memcpy's */
......
......@@ -427,12 +427,9 @@ class AbstractCallback
updated then its state must be set to BUF_PAGE_NOT_USED. For
compressed tables the page descriptor memory will be at offset:
block->frame + UNIV_PAGE_SIZE;
@param offset - physical offset within the file
@param block - block read from file, note it is not from the buffer pool
@param block block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
virtual dberr_t operator()(
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW = 0;
virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
/**
@return the space id of the tablespace */
......@@ -689,12 +686,9 @@ struct FetchIndexRootPages : public AbstractCallback {
/**
Called for each block as it is read from the file.
@param offset - physical offset in the file
@param block - block to convert, it is not from the buffer pool.
@param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
virtual dberr_t operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
/** Update the import configuration that will be used to import
the tablespace. */
......@@ -712,13 +706,9 @@ Called for each block as it is read from the file. Check index pages to
determine the exact row format. We can't get that from the tablespace
header flags alone.
@param offset - physical offset in the file
@param block - block to convert, it is not from the buffer pool.
@param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
dberr_t
FetchIndexRootPages::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
{
if (is_interrupted()) return DB_INTERRUPTED;
......@@ -726,15 +716,7 @@ FetchIndexRootPages::operator() (
ulint page_type = fil_page_get_type(page);
if (block->page.offset * m_page_size != offset) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Page offset doesn't match file offset: "
"page offset: %u, file offset: " ULINTPF,
block->page.offset,
(ulint) (offset / m_page_size));
return DB_CORRUPTION;
} else if (page_type == FIL_PAGE_TYPE_XDES) {
if (page_type == FIL_PAGE_TYPE_XDES) {
return set_current_xdes(block->page.offset, page);
} else if (page_type == FIL_PAGE_INDEX
&& !is_free(block->page.offset)
......@@ -877,12 +859,9 @@ class PageConverter : public AbstractCallback {
/**
Called for each block as it is read from the file.
@param offset - physical offset in the file
@param block - block to convert, it is not from the buffer pool.
@param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
virtual dberr_t operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
private:
/**
Update the page, set the space id, max trx id and index id.
......@@ -2038,10 +2017,9 @@ PageConverter::update_page(
/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED.
@param block - block read from file, note it is not from the buffer pool
@param block block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
dberr_t
PageConverter::operator() (os_offset_t, buf_block_t* block) UNIV_NOTHROW
dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
{
/* If we already had an old page with matching number
in the buffer pool, evict it now, because
......@@ -3429,26 +3407,15 @@ fil_iterate(
}
bool updated = false;
os_offset_t page_off = offset;
ulint n_pages_read = (ulint) n_bytes / iter.page_size;
const ulint size = iter.page_size;
block->page.offset = page_off / size;
block->page.offset = offset / size;
for (ulint i = 0; i < n_pages_read;
++i, page_off += size, block->frame += size,
block->page.offset++) {
bool decrypted = false;
dberr_t err = DB_SUCCESS;
++i, block->frame += size, block->page.offset++) {
byte* src = readptr + (i * size);
byte* dst = io_buffer + (i * size);
bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
const bool page_compressed
= page_type
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED;
const ulint page_no = page_get_page_no(src);
if (!page_no && page_off) {
if (!page_no && block->page.offset) {
const ulint* b = reinterpret_cast<const ulint*>
(src);
const ulint* const e = b + size / sizeof *b;
......@@ -3463,11 +3430,46 @@ fil_iterate(
continue;
}
if (page_no != page_off / size) {
goto page_corrupted;
if (page_no != block->page.offset) {
page_corrupted:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset "
UINT64PF " looks corrupted.",
callback.filename(),
ulong(offset / size), offset);
return DB_CORRUPTION;
}
if (encrypted) {
bool decrypted = false;
dberr_t err = DB_SUCCESS;
byte* dst = io_buffer + (i * size);
bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
const bool page_compressed
= page_type
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED;
if (!encrypted) {
} else if (!mach_read_from_4(
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ src)) {
not_encrypted:
if (!page_compressed
&& !block->page.zip.data) {
block->frame = src;
frame_changed = true;
} else {
ut_ad(dst != src);
memcpy(dst, src, size);
}
} else {
if (!fil_space_verify_crypt_checksum(
src, callback.get_zip_size(),
NULL, block->page.offset)) {
goto page_corrupted;
}
decrypted = fil_space_decrypt(
iter.crypt_data, dst,
iter.page_size, src, &err);
......@@ -3476,18 +3478,11 @@ fil_iterate(
return err;
}
if (decrypted) {
updated = true;
} else {
if (!page_compressed
&& !block->page.zip.data) {
block->frame = src;
frame_changed = true;
} else {
ut_ad(dst != src);
memcpy(dst, src, size);
}
if (!decrypted) {
goto not_encrypted;
}
updated = true;
}
/* If the original page is page_compressed, we need
......@@ -3501,16 +3496,10 @@ fil_iterate(
encrypted && !frame_changed
? dst : src,
callback.get_zip_size(), NULL)) {
page_corrupted:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset "
UINT64PF " looks corrupted.",
callback.filename(),
ulong(offset / size), offset);
return DB_CORRUPTION;
goto page_corrupted;
}
if ((err = callback(page_off, block)) != DB_SUCCESS) {
if ((err = callback(block)) != DB_SUCCESS) {
return err;
} else if (!updated) {
updated = buf_block_get_state(block)
......@@ -3583,19 +3572,15 @@ fil_iterate(
buffer pool is not being used at all! */
if (decrypted && encrypted) {
byte *dest = writeptr + (i * size);
ulint space = mach_read_from_4(
src + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
ulint offset = mach_read_from_4(src + FIL_PAGE_OFFSET);
ib_uint64_t lsn = mach_read_from_8(src + FIL_PAGE_LSN);
byte* tmp = fil_encrypt_buf(
iter.crypt_data,
space,
offset,
lsn,
src,
iter.page_size == UNIV_PAGE_SIZE ? 0 : iter.page_size,
dest);
iter.crypt_data,
callback.get_space_id(),
block->page.offset,
mach_read_from_8(src + FIL_PAGE_LSN),
src,
callback.get_zip_size(),
dest);
if (tmp == src) {
/* TODO: remove unnecessary memcpy's */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment