Commit b82c602d authored by Marko Mäkelä

MDEV-12602 InnoDB: Failing assertion: space->n_pending_ops == 0

This fixes a regression caused by MDEV-12428.
When we introduced a variant of fil_space_acquire() that could
increment space->n_pending_ops after space->stop_new_ops was set,
the logic of fil_check_pending_operations() was broken.

fil_space_t::n_pending_ios: A new field to track read or write
access from the buffer pool routines immediately before a block
write or after a block read in the file system.

fil_space_acquire_for_io(), fil_space_release_for_io(): Similar
to fil_space_acquire_silent() and fil_space_release(), but
modify fil_space_t::n_pending_ios instead of fil_space_t::n_pending_ops.

Adjust a number of places accordingly, and remove some redundant
tablespace lookups.

The following parts of this fix differ from the 10.2 version of this fix:

buf_page_check_corrupt(): Add a tablespace parameter.

In 10.2, we already had a two-phase process of freeing fil_space objects
(first, fil_space_detach(), then release fil_system->mutex, and finally
free the fil_space and fil_node objects).

fil_space_free_and_mutex_exit(): Renamed from fil_space_free().
Detach the tablespace from the fil_system cache, release the
fil_system->mutex, and then wait for space->n_pending_ios to reach 0,
to avoid accessing freed data in a concurrent thread.
During the wait, future calls to fil_space_acquire_for_io() will
not find this tablespace, and the count can only be decremented to 0,
at which point it is safe to free the objects.

fil_node_free_part1(), fil_node_free_part2(): Refactored from
fil_node_free().
parent 6935d660
......@@ -317,6 +317,15 @@ on the io_type */
? (counter##_READ) \
: (counter##_WRITTEN))
/** Decrypt a page after it has been read.
Forward declaration; the definition appears later in this file.
@param[in,out] bpage Page control block
@param[in,out] space tablespace that bpage belongs to; the definition
checks with ut_ad() that space->id == bpage->space and that
space->n_pending_ios > 0, so the caller is expected to have obtained
it from fil_space_acquire_for_io()
@return whether the operation was successful */
static
bool
buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
MY_ATTRIBUTE((nonnull));
/* prototypes for new functions added to ha_innodb.cc */
trx_t* innobase_get_trx();
......@@ -483,16 +492,13 @@ buf_block_alloc(
}
#endif /* !UNIV_HOTBACKUP */
/********************************************************************//**
Checks if a page is all zeroes.
@return TRUE if the page is all zeroes */
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
UNIV_INTERN
bool
buf_page_is_zeroes(
/*===============*/
const byte* read_buf, /*!< in: a database page */
const ulint zip_size) /*!< in: size of compressed page;
0 for uncompressed pages */
buf_page_is_zeroes(const byte* read_buf, ulint zip_size)
{
const ulint page_size = zip_size ? zip_size : UNIV_PAGE_SIZE;
......@@ -607,8 +613,7 @@ buf_page_is_checksum_valid_none(
&& checksum_field1 == BUF_NO_CHECKSUM_MAGIC);
}
/********************************************************************//**
Checks if a page is corrupt.
/** Check if a page is corrupt.
@param[in] check_lsn true if LSN should be checked
@param[in] read_buf Page to be checked
@param[in] zip_size compressed size or 0
......@@ -4439,34 +4444,30 @@ buf_mark_space_corrupt(
buf_pool_mutex_exit(buf_pool);
}
/********************************************************************//**
Check if page is maybe compressed, encrypted or both when we encounter
/** Check if page is maybe compressed, encrypted or both when we encounter
corrupted page. Note that we can't be 100% sure if page is corrupted
or decrypt/decompress just failed.
@param[in,out] bpage Page
@return DB_SUCCESS if page has been read and is not corrupted,
@retval DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
@param[in,out] bpage page
@param[in,out] space tablespace from fil_space_acquire_for_io()
@return whether the operation succeeded
@retval DB_SUCCESS if page has been read and is not corrupted
@retval DB_PAGE_CORRUPTED if page based on checksum check is corrupted
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match.
@retval DB_TABLESPACE_DELETED if accessed tablespace is not found */
@retval DB_TABLESPACE_DELETED if accessed tablespace is not found */
static
dberr_t
buf_page_check_corrupt(buf_page_t* bpage)
buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space)
{
ut_ad(space->n_pending_ios > 0);
ulint zip_size = buf_page_get_zip_size(bpage);
byte* dst_frame = (zip_size) ? bpage->zip.data :
((buf_block_t*) bpage)->frame;
FilSpace space(bpage->space, true);
bool still_encrypted = false;
dberr_t err = DB_SUCCESS;
bool corrupted = false;
fil_space_crypt_t* crypt_data = NULL;
if (!space()) {
return(DB_TABLESPACE_DELETED);
}
crypt_data = space()->crypt_data;
fil_space_crypt_t* crypt_data = space->crypt_data;
/* In buf_decrypt_after_read we have either decrypted the page if
page post encryption checksum matches and used key_id is found
......@@ -4478,11 +4479,12 @@ buf_page_check_corrupt(buf_page_t* bpage)
crypt_data->type != CRYPT_SCHEME_UNENCRYPTED &&
!bpage->encrypted &&
fil_space_verify_crypt_checksum(dst_frame, zip_size,
space(), bpage->offset));
space, bpage->offset));
if (!still_encrypted) {
/* If traditional checksums match, we assume that page is
not anymore encrypted. */
corrupted = buf_page_is_corrupted(true, dst_frame, zip_size, space());
corrupted = buf_page_is_corrupted(true, dst_frame, zip_size,
space);
if (!corrupted) {
bpage->encrypted = false;
......@@ -4505,7 +4507,7 @@ buf_page_check_corrupt(buf_page_t* bpage)
", page number=%u]"
" in file %s cannot be decrypted.",
bpage->space, bpage->offset,
space()->name);
space->name);
ib_logf(IB_LOG_LEVEL_INFO,
"However key management plugin or used key_version " ULINTPF
......@@ -4523,28 +4525,24 @@ buf_page_check_corrupt(buf_page_t* bpage)
return (err);
}
/********************************************************************//**
Completes an asynchronous read or write request of a file page to or from
the buffer pool.
/** Complete a read or write request of a file page to or from the buffer pool.
@param[in,out] bpage Page to complete
@param[in] evict whether or not to evict the page
from LRU list.
@return DB_SUCCESS if page has been read and is not corrupted,
DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match.
in write only DB_SUCCESS is possible. */
@return whether the operation succeeded
@retval DB_SUCCESS always when writing, or if a read page was OK
@retval DB_PAGE_CORRUPTED if the checksum fails on a page read
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does
not match
@retval DB_TABLESPACE_DELETED if the tablespace is not found on a page read */
UNIV_INTERN
dberr_t
buf_page_io_complete(
buf_page_t* bpage,
bool evict)
buf_page_io_complete(buf_page_t* bpage, bool evict)
{
enum buf_io_fix io_type;
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
const ibool uncompressed = (buf_page_get_state(bpage)
== BUF_BLOCK_FILE_PAGE);
fil_space_t* space = NULL;
byte* frame = NULL;
dberr_t err = DB_SUCCESS;
......@@ -4564,7 +4562,13 @@ buf_page_io_complete(
ulint read_space_id = 0;
uint key_version = 0;
buf_page_decrypt_after_read(bpage);
ut_ad(bpage->zip.data || ((buf_block_t*)bpage)->frame);
fil_space_t* space = fil_space_acquire_for_io(bpage->space);
if (!space) {
return(DB_TABLESPACE_DELETED);
}
buf_page_decrypt_after_read(bpage, space);
if (buf_page_get_zip_size(bpage)) {
frame = bpage->zip.data;
......@@ -4635,7 +4639,7 @@ buf_page_io_complete(
bpage->offset);
}
err = buf_page_check_corrupt(bpage);
err = buf_page_check_corrupt(bpage, space);
database_corrupted:
......@@ -4647,6 +4651,7 @@ buf_page_io_complete(
buf_mark_space_corrupt(bpage);
ib_logf(IB_LOG_LEVEL_INFO,
"Simulated page corruption");
fil_space_release_for_io(space);
return(err);
}
err = DB_SUCCESS;
......@@ -4654,9 +4659,6 @@ buf_page_io_complete(
);
if (err == DB_PAGE_CORRUPTED) {
fil_system_enter();
space = fil_space_get_by_id(bpage->space);
ib_logf(IB_LOG_LEVEL_ERROR,
"Database page corruption on disk"
" or a failed file read of tablespace %s"
......@@ -4667,8 +4669,6 @@ buf_page_io_complete(
space->name,
bpage->space, bpage->offset);
fil_system_exit();
buf_page_print(frame, buf_page_get_zip_size(bpage),
BUF_PAGE_PRINT_NO_CRASH);
......@@ -4693,6 +4693,7 @@ buf_page_io_complete(
table as corrupted instead of crashing server */
if (bpage->space > TRX_SYS_SPACE) {
buf_mark_space_corrupt(bpage);
fil_space_release_for_io(space);
return(err);
} else {
ib_logf(IB_LOG_LEVEL_FATAL,
......@@ -4730,6 +4731,8 @@ buf_page_io_complete(
}
}
fil_space_release_for_io(space);
} else {
/* io_type == BUF_IO_WRITE */
if (bpage->slot) {
......@@ -6156,16 +6159,17 @@ buf_page_encrypt_before_write(
return dst_frame;
}
/********************************************************************//**
Decrypt page after it has been read from disk
@param[in,out] bpage Page control block
@return true if successfull, false if something went wrong
*/
UNIV_INTERN
/** Decrypt a page.
@param[in,out] bpage Page control block
@param[in,out] space tablespace
@return whether the operation was successful */
static
bool
buf_page_decrypt_after_read(
buf_page_t* bpage)
buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
{
ut_ad(space->n_pending_ios > 0);
ut_ad(space->id == bpage->space);
ulint zip_size = buf_page_get_zip_size(bpage);
ulint size = (zip_size) ? zip_size : UNIV_PAGE_SIZE;
......@@ -6183,12 +6187,10 @@ buf_page_decrypt_after_read(
return (true);
}
FilSpace space(bpage->space, false, true);
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
if (!space() || !space()->crypt_data) {
if (!space->crypt_data) {
key_version = 0;
}
......@@ -6225,9 +6227,9 @@ buf_page_decrypt_after_read(
/* Mark page encrypted in case it should
be. */
if (key_version && space()->crypt_data &&
space()->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED) {
bpage->encrypted=true;
if (space->crypt_data->type
!= CRYPT_SCHEME_UNENCRYPTED) {
bpage->encrypted = true;
}
return (false);
......@@ -6241,12 +6243,8 @@ buf_page_decrypt_after_read(
#endif
/* decrypt using crypt_buf to dst_frame */
byte* res = fil_space_decrypt(space(),
slot->crypt_buf,
dst_frame,
&bpage->encrypted);
if (!res) {
if (!fil_space_decrypt(space, slot->crypt_buf,
dst_frame, &bpage->encrypted)) {
success = false;
}
......@@ -6277,5 +6275,6 @@ buf_page_decrypt_after_read(
}
}
ut_ad(space->n_pending_ios > 0);
return (success);
}
......@@ -831,7 +831,7 @@ buf_flush_write_block_low(
buf_flush_t flush_type, /*!< in: type of flush */
bool sync) /*!< in: true if sync IO request */
{
fil_space_t* space = fil_space_acquire(bpage->space, true);
fil_space_t* space = fil_space_acquire_for_io(bpage->space);
if (!space) {
return;
}
......@@ -956,6 +956,13 @@ buf_flush_write_block_low(
ut_ad(flush_type == BUF_FLUSH_SINGLE_PAGE);
fil_flush(space);
/* The tablespace could already have been dropped,
because fil_io(request, sync) would already have
decremented the node->n_pending. However,
buf_page_io_complete() only needs to look up the
tablespace during read requests, not during writes. */
ut_ad(buf_page_get_io_fix(bpage) == BUF_IO_WRITE);
/* true means we want to evict this page from the
LRU list as well. */
#ifdef UNIV_DEBUG
......@@ -966,7 +973,7 @@ buf_flush_write_block_low(
ut_ad(err == DB_SUCCESS);
}
fil_space_release(space);
fil_space_release_for_io(space);
/* Increment the counter of I/O operations used
for selecting LRU policy. */
......
......@@ -683,7 +683,7 @@ fil_space_encrypt(
}
fil_space_crypt_t* crypt_data = space->crypt_data;
ut_ad(space->n_pending_ops);
ut_ad(space->n_pending_ios > 0);
ulint zip_size = fsp_flags_get_zip_size(space->flags);
byte* tmp = fil_encrypt_buf(crypt_data, space->id, offset, lsn, src_frame, zip_size, dst_frame);
......@@ -860,7 +860,7 @@ fil_space_decrypt(
*decrypted = false;
ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
ut_ad(space->n_pending_ops > 0);
ut_ad(space->n_pending_ios > 0);
bool encrypted = fil_space_decrypt(
space->crypt_data,
......
This diff is collapsed.
/*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -679,13 +679,13 @@ buf_page_is_checksum_valid_none(
ulint checksum_field2)
MY_ATTRIBUTE((warn_unused_result));
/********************************************************************//**
Checks if a page is corrupt.
/** Check if a page is corrupt.
@param[in] check_lsn true if LSN should be checked
@param[in] read_buf Page to be checked
@param[in] zip_size compressed size or 0
@param[in] space Pointer to tablespace
@return true if corrupted, false if not */
UNIV_INTERN
bool
buf_page_is_corrupted(
bool check_lsn,
......@@ -693,15 +693,13 @@ buf_page_is_corrupted(
ulint zip_size,
const fil_space_t* space)
MY_ATTRIBUTE((warn_unused_result));
/********************************************************************//**
Checks if a page is all zeroes.
@return TRUE if the page is all zeroes */
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
UNIV_INTERN
bool
buf_page_is_zeroes(
/*===============*/
const byte* read_buf, /*!< in: a database page */
const ulint zip_size); /*!< in: size of compressed page;
0 for uncompressed pages */
buf_page_is_zeroes(const byte* read_buf, ulint zip_size);
#ifndef UNIV_HOTBACKUP
/**********************************************************************//**
Gets the space id, page offset, and byte offset within page of a
......@@ -1240,20 +1238,20 @@ buf_page_init_for_read(
version of the tablespace in case we have done
DISCARD + IMPORT */
ulint offset);/*!< in: page number */
/********************************************************************//**
Completes an asynchronous read or write request of a file page to or from
the buffer pool.
@param[in,out] bpage pointer to the block in question
@param[in] evict true if page should be evicted from LRU
@return DB_SUCCESS if page has been read and is not corrupted,
DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match.*/
/** Complete a read or write request of a file page to or from the buffer pool.
@param[in,out] bpage Page to complete
@param[in] evict whether or not to evict the page
from LRU list.
@return whether the operation succeeded
@retval DB_SUCCESS always when writing, or if a read page was OK
@retval DB_PAGE_CORRUPTED if the checksum fails on a page read
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does
not match
@retval DB_TABLESPACE_DELETED if the tablespace is not found on a page read */
UNIV_INTERN
dberr_t
buf_page_io_complete(
buf_page_t* bpage,
bool evict = false);
buf_page_io_complete(buf_page_t* bpage, bool evict = false)
MY_ATTRIBUTE((nonnull));
/********************************************************************//**
Calculates a folded value of a file page address to use in the page hash
......
......@@ -320,13 +320,21 @@ struct fil_space_t {
ulint n_pending_flushes; /*!< this is positive when flushing
the tablespace to disk; dropping of the
tablespace is forbidden if this is positive */
ulint n_pending_ops;/*!< this is positive when we
have pending operations against this
tablespace. The pending operations can
be ibuf merges or lock validation code
trying to read a block.
Dropping of the tablespace is forbidden
if this is positive */
/** Number of pending buffer pool operations accessing the tablespace
without holding a table lock or dict_operation_lock S-latch
that would prevent the table (and tablespace) from being
dropped. An example is change buffer merge.
The tablespace cannot be dropped while this is nonzero,
or while fil_node_t::n_pending is nonzero.
Protected by fil_system->mutex. */
ulint n_pending_ops;
/** Number of pending block read or write operations
(when a write is imminent or a read has recently completed).
The tablespace object cannot be freed while this is nonzero,
but it can be detached from fil_system.
Note that fil_node_t::n_pending tracks actual pending I/O requests.
Protected by fil_system->mutex. */
ulint n_pending_ios;
hash_node_t hash; /*!< hash chain node */
hash_node_t name_hash;/*!< hash chain the name_hash table */
#ifndef UNIV_HOTBACKUP
......@@ -646,13 +654,11 @@ Used by background threads that do not necessarily hold proper locks
for concurrency control.
@param[in] id tablespace ID
@param[in] silent whether to silently ignore missing tablespaces
@param[in] for_io whether to look up the tablespace while performing I/O
(possibly executing TRUNCATE)
@return the tablespace
@retval NULL if missing or being deleted or truncated */
UNIV_INTERN
fil_space_t*
fil_space_acquire_low(ulint id, bool silent, bool for_io = false)
fil_space_acquire_low(ulint id, bool silent)
MY_ATTRIBUTE((warn_unused_result));
/** Acquire a tablespace when it could be dropped concurrently.
......@@ -665,31 +671,45 @@ for concurrency control.
@retval NULL if missing or being deleted or truncated */
inline
fil_space_t*
fil_space_acquire(ulint id, bool for_io = false)
fil_space_acquire(ulint id)
{
return (fil_space_acquire_low(id, false, for_io));
return(fil_space_acquire_low(id, false));
}
/** Acquire a tablespace that may not exist.
Used by background threads that do not necessarily hold proper locks
for concurrency control.
@param[in] id tablespace ID
@param[in] for_io whether to look up the tablespace while performing I/O
(possibly executing TRUNCATE)
@return the tablespace
@retval NULL if missing or being deleted */
inline
fil_space_t*
fil_space_acquire_silent(ulint id, bool for_io = false)
fil_space_acquire_silent(ulint id)
{
return (fil_space_acquire_low(id, true, for_io));
return(fil_space_acquire_low(id, true));
}
/** Release a tablespace acquired with fil_space_acquire().
@param[in,out] space tablespace to release */
UNIV_INTERN
void
fil_space_release(fil_space_t* space);
/** Acquire a tablespace for reading or writing a block,
when it could be dropped concurrently.
Every non-NULL result must later be handed back with
fil_space_release_for_io().
NOTE(review): per the commit description this counts the reference in
fil_space_t::n_pending_ios rather than fil_space_t::n_pending_ops; the
definition is not visible here, so confirm against the implementation.
@param[in] id tablespace ID
@return the tablespace
@retval NULL if missing */
UNIV_INTERN
fil_space_t*
fil_space_acquire_for_io(ulint id);
/** Release a tablespace acquired with fil_space_acquire_for_io().
Callers should invoke this on every exit path that follows a successful
acquire.
NOTE(review): presumably this decrements fil_space_t::n_pending_ios; the
definition is not visible here, so confirm against the implementation.
@param[in,out] space tablespace to release */
UNIV_INTERN
void
fil_space_release_for_io(fil_space_t* space);
/** Return the next fil_space_t.
Once started, the caller must keep calling this until it returns NULL.
fil_space_acquire() and fil_space_release() are invoked here which
......@@ -698,6 +718,7 @@ blocks a concurrent operation from dropping the tablespace.
If NULL, use the first fil_space_t on fil_system->space_list.
@return pointer to the next fil_space_t.
@retval NULL if this was the last */
UNIV_INTERN
fil_space_t*
fil_space_next(
fil_space_t* prev_space)
......@@ -711,6 +732,7 @@ blocks a concurrent operation from dropping the tablespace.
If NULL, use the first fil_space_t on fil_system->space_list.
@return pointer to the next fil_space_t.
@retval NULL if this was the last*/
UNIV_INTERN
fil_space_t*
fil_space_keyrotate_next(
fil_space_t* prev_space)
......@@ -727,12 +749,9 @@ class FilSpace
/** Constructor: Look up the tablespace and increment the
reference count if found.
@param[in] space_id tablespace ID
@param[in] silent whether not print any errors
@param[in] for_io whether to look up the tablespace
while performing I/O
(possibly executing TRUNCATE) */
explicit FilSpace(ulint space_id, bool silent = false, bool for_io = false)
: m_space(fil_space_acquire_low(space_id, silent, for_io)) {}
@param[in] silent whether not to print any errors */
explicit FilSpace(ulint space_id, bool silent = false)
: m_space(fil_space_acquire_low(space_id, silent)) {}
/** Assignment operator: This assumes that fil_space_acquire()
has already been done for the fil_space_t. The caller must
......
......@@ -65,6 +65,15 @@ Created 11/5/1995 Heikki Tuuri
#include "fil0pagecompress.h"
#include "ha_prototypes.h"
/** Decrypt a page after it has been read.
Forward declaration; the definition appears later in this file.
@param[in,out] bpage Page control block
@param[in,out] space tablespace that bpage belongs to; the definition
checks with ut_ad() that space->id == bpage->space and that
space->n_pending_ios > 0, so the caller is expected to have obtained
it from fil_space_acquire_for_io()
@return whether the operation was successful */
static
bool
buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
MY_ATTRIBUTE((nonnull));
/* prototypes for new functions added to ha_innodb.cc */
trx_t* innobase_get_trx();
......@@ -548,16 +557,13 @@ buf_block_alloc(
}
#endif /* !UNIV_HOTBACKUP */
/********************************************************************//**
Checks if a page is all zeroes.
@return TRUE if the page is all zeroes */
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
UNIV_INTERN
bool
buf_page_is_zeroes(
/*===============*/
const byte* read_buf, /*!< in: a database page */
const ulint zip_size) /*!< in: size of compressed page;
0 for uncompressed pages */
buf_page_is_zeroes(const byte* read_buf, ulint zip_size)
{
const ulint page_size = zip_size ? zip_size : UNIV_PAGE_SIZE;
......@@ -673,8 +679,7 @@ buf_page_is_checksum_valid_none(
&& checksum_field1 == BUF_NO_CHECKSUM_MAGIC);
}
/********************************************************************//**
Checks if a page is corrupt.
/** Check if a page is corrupt.
@param[in] check_lsn true if LSN should be checked
@param[in] read_buf Page to be checked
@param[in] zip_size compressed size or 0
......@@ -4526,34 +4531,30 @@ buf_mark_space_corrupt(
mutex_exit(&buf_pool->LRU_list_mutex);
}
/********************************************************************//**
Check if page is maybe compressed, encrypted or both when we encounter
/** Check if page is maybe compressed, encrypted or both when we encounter
corrupted page. Note that we can't be 100% sure if page is corrupted
or decrypt/decompress just failed.
@param[in,out] bpage Page
@return DB_SUCCESS if page has been read and is not corrupted,
@retval DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
@param[in,out] bpage page
@param[in,out] space tablespace from fil_space_acquire_for_io()
@return whether the operation succeeded
@retval DB_SUCCESS if page has been read and is not corrupted
@retval DB_PAGE_CORRUPTED if page based on checksum check is corrupted
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match.
@retval DB_TABLESPACE_DELETED if accessed tablespace is not found */
@retval DB_TABLESPACE_DELETED if accessed tablespace is not found */
static
dberr_t
buf_page_check_corrupt(buf_page_t* bpage)
buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space)
{
ut_ad(space->n_pending_ios > 0);
ulint zip_size = buf_page_get_zip_size(bpage);
byte* dst_frame = (zip_size) ? bpage->zip.data :
((buf_block_t*) bpage)->frame;
FilSpace space(bpage->space, true);
bool still_encrypted = false;
dberr_t err = DB_SUCCESS;
bool corrupted = false;
fil_space_crypt_t* crypt_data = NULL;
if (!space()) {
return(DB_TABLESPACE_DELETED);
}
crypt_data = space()->crypt_data;
fil_space_crypt_t* crypt_data = space->crypt_data;
/* In buf_decrypt_after_read we have either decrypted the page if
page post encryption checksum matches and used key_id is found
......@@ -4565,12 +4566,12 @@ buf_page_check_corrupt(buf_page_t* bpage)
crypt_data->type != CRYPT_SCHEME_UNENCRYPTED &&
!bpage->encrypted &&
fil_space_verify_crypt_checksum(dst_frame, zip_size,
space(), bpage->offset));
space, bpage->offset));
if (!still_encrypted) {
/* If traditional checksums match, we assume that page is
not anymore encrypted. */
corrupted = buf_page_is_corrupted(true, dst_frame, zip_size, space());
corrupted = buf_page_is_corrupted(true, dst_frame, zip_size,
space);
if (!corrupted) {
bpage->encrypted = false;
......@@ -4593,7 +4594,7 @@ buf_page_check_corrupt(buf_page_t* bpage)
", page number=%u]"
" in file %s cannot be decrypted.",
bpage->space, bpage->offset,
space()->name);
space->name);
ib_logf(IB_LOG_LEVEL_INFO,
"However key management plugin or used key_version " ULINTPF
......@@ -4611,26 +4612,23 @@ buf_page_check_corrupt(buf_page_t* bpage)
return (err);
}
/********************************************************************//**
Completes an asynchronous read or write request of a file page to or from
the buffer pool.
/** Complete a read or write request of a file page to or from the buffer pool.
@param[in,out] bpage Page to complete
@return DB_SUCCESS if page has been read and is not corrupted,
DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match.
in write only DB_SUCCESS is possible. */
@return whether the operation succeeded
@retval DB_SUCCESS always when writing, or if a read page was OK
@retval DB_PAGE_CORRUPTED if the checksum fails on a page read
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does
not match
@retval DB_TABLESPACE_DELETED if the tablespace is not found on a page read */
UNIV_INTERN
dberr_t
buf_page_io_complete(
buf_page_t* bpage)
buf_page_io_complete(buf_page_t* bpage)
{
enum buf_io_fix io_type;
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
const ibool uncompressed = (buf_page_get_state(bpage)
== BUF_BLOCK_FILE_PAGE);
bool have_LRU_mutex = false;
fil_space_t* space = NULL;
byte* frame = NULL;
dberr_t err = DB_SUCCESS;
......@@ -4650,7 +4648,13 @@ buf_page_io_complete(
ulint read_space_id = 0;
uint key_version = 0;
buf_page_decrypt_after_read(bpage);
ut_ad(bpage->zip.data || ((buf_block_t*)bpage)->frame);
fil_space_t* space = fil_space_acquire_for_io(bpage->space);
if (!space) {
return(DB_TABLESPACE_DELETED);
}
buf_page_decrypt_after_read(bpage, space);
if (buf_page_get_zip_size(bpage)) {
frame = bpage->zip.data;
......@@ -4724,7 +4728,7 @@ buf_page_io_complete(
if (UNIV_LIKELY(!bpage->is_corrupt ||
!srv_pass_corrupt_table)) {
err = buf_page_check_corrupt(bpage);
err = buf_page_check_corrupt(bpage, space);
}
database_corrupted:
......@@ -4737,6 +4741,7 @@ buf_page_io_complete(
buf_mark_space_corrupt(bpage);
ib_logf(IB_LOG_LEVEL_INFO,
"Simulated page corruption");
fil_space_release_for_io(space);
return(err);
}
err = DB_SUCCESS;
......@@ -4744,9 +4749,6 @@ buf_page_io_complete(
);
if (err == DB_PAGE_CORRUPTED) {
fil_system_enter();
space = fil_space_get_by_id(bpage->space);
ib_logf(IB_LOG_LEVEL_ERROR,
"Database page corruption on disk"
" or a failed file read of tablespace %s"
......@@ -4757,8 +4759,6 @@ buf_page_io_complete(
space->name,
bpage->space, bpage->offset);
fil_system_exit();
buf_page_print(frame, buf_page_get_zip_size(bpage),
BUF_PAGE_PRINT_NO_CRASH);
......@@ -4795,6 +4795,7 @@ buf_page_io_complete(
table as corrupted instead of crashing server */
if (bpage->space > TRX_SYS_SPACE) {
buf_mark_space_corrupt(bpage);
fil_space_release_for_io(space);
return(err);
} else {
ib_logf(IB_LOG_LEVEL_FATAL,
......@@ -4833,6 +4834,8 @@ buf_page_io_complete(
}
}
fil_space_release_for_io(space);
} else {
/* io_type == BUF_IO_WRITE */
if (bpage->slot) {
......@@ -6303,16 +6306,17 @@ buf_page_encrypt_before_write(
return dst_frame;
}
/********************************************************************//**
Decrypt page after it has been read from disk
@param[in,out] bpage Page control block
@return true if successfull, false if something went wrong
*/
UNIV_INTERN
/** Decrypt a page.
@param[in,out] bpage Page control block
@param[in,out] space tablespace
@return whether the operation was successful */
static
bool
buf_page_decrypt_after_read(
buf_page_t* bpage)
buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
{
ut_ad(space->n_pending_ios > 0);
ut_ad(space->id == bpage->space);
ulint zip_size = buf_page_get_zip_size(bpage);
ulint size = (zip_size) ? zip_size : UNIV_PAGE_SIZE;
......@@ -6330,12 +6334,10 @@ buf_page_decrypt_after_read(
return (true);
}
FilSpace space(bpage->space, false, true);
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
if (!space() || !space()->crypt_data) {
if (!space->crypt_data) {
key_version = 0;
}
......@@ -6372,8 +6374,8 @@ buf_page_decrypt_after_read(
/* Mark page encrypted in case it should
be. */
if (key_version && space()->crypt_data &&
space()->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED) {
if (space->crypt_data->type
!= CRYPT_SCHEME_UNENCRYPTED) {
bpage->encrypted = true;
}
......@@ -6388,12 +6390,8 @@ buf_page_decrypt_after_read(
#endif
/* decrypt using crypt_buf to dst_frame */
byte* res = fil_space_decrypt(space(),
slot->crypt_buf,
dst_frame,
&bpage->encrypted);
if (!res) {
if (!fil_space_decrypt(space, slot->crypt_buf,
dst_frame, &bpage->encrypted)) {
success = false;
}
......@@ -6424,5 +6422,6 @@ buf_page_decrypt_after_read(
}
}
ut_ad(space->n_pending_ios > 0);
return (success);
}
......@@ -873,7 +873,7 @@ buf_flush_write_block_low(
buf_flush_t flush_type, /*!< in: type of flush */
bool sync) /*!< in: true if sync IO request */
{
fil_space_t* space = fil_space_acquire(bpage->space, true);
fil_space_t* space = fil_space_acquire_for_io(bpage->space);
if (!space) {
return;
}
......@@ -995,6 +995,13 @@ buf_flush_write_block_low(
ut_ad(flush_type == BUF_FLUSH_SINGLE_PAGE);
fil_flush(space);
/* The tablespace could already have been dropped,
because fil_io(request, sync) would already have
decremented the node->n_pending. However,
buf_page_io_complete() only needs to look up the
tablespace during read requests, not during writes. */
ut_ad(buf_page_get_io_fix_unlocked(bpage) == BUF_IO_WRITE);
#ifdef UNIV_DEBUG
dberr_t err =
#endif
......@@ -1003,7 +1010,7 @@ buf_flush_write_block_low(
ut_ad(err == DB_SUCCESS);
}
fil_space_release(space);
fil_space_release_for_io(space);
/* Increment the counter of I/O operations used
for selecting LRU policy. */
......
......@@ -683,7 +683,7 @@ fil_space_encrypt(
}
fil_space_crypt_t* crypt_data = space->crypt_data;
ut_ad(space->n_pending_ops);
ut_ad(space->n_pending_ios > 0);
ulint zip_size = fsp_flags_get_zip_size(space->flags);
byte* tmp = fil_encrypt_buf(crypt_data, space->id, offset, lsn, src_frame, zip_size, dst_frame);
......@@ -860,7 +860,7 @@ fil_space_decrypt(
*decrypted = false;
ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
ut_ad(space->n_pending_ops > 0);
ut_ad(space->n_pending_ios > 0);
bool encrypted = fil_space_decrypt(
space->crypt_data,
......
This diff is collapsed.
/*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -675,13 +675,13 @@ buf_page_is_checksum_valid_none(
ulint checksum_field2)
MY_ATTRIBUTE((warn_unused_result));
/********************************************************************//**
Checks if a page is corrupt.
/** Check if a page is corrupt.
@param[in] check_lsn true if LSN should be checked
@param[in] read_buf Page to be checked
@param[in] zip_size compressed size or 0
@param[in] space Pointer to tablespace
@return true if corrupted, false if not */
UNIV_INTERN
bool
buf_page_is_corrupted(
bool check_lsn,
......@@ -689,15 +689,13 @@ buf_page_is_corrupted(
ulint zip_size,
const fil_space_t* space)
MY_ATTRIBUTE((warn_unused_result));
/********************************************************************//**
Checks if a page is all zeroes.
@return TRUE if the page is all zeroes */
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@return whether the page is all zeroes */
UNIV_INTERN
bool
buf_page_is_zeroes(
/*===============*/
const byte* read_buf, /*!< in: a database page */
const ulint zip_size); /*!< in: size of compressed page;
0 for uncompressed pages */
buf_page_is_zeroes(const byte* read_buf, ulint zip_size);
#ifndef UNIV_HOTBACKUP
/**********************************************************************//**
Gets the space id, page offset, and byte offset within page of a
......@@ -1259,18 +1257,18 @@ buf_page_init_for_read(
version of the tablespace in case we have done
DISCARD + IMPORT */
ulint offset);/*!< in: page number */
/********************************************************************//**
Completes an asynchronous read or write request of a file page to or from
the buffer pool.
@param[in,out] bpage pointer to the block in question
@return DB_SUCCESS if page has been read and is not corrupted,
DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match.*/
/** Complete a read or write request of a file page to or from the buffer pool.
@param[in,out] bpage Page to complete
@return whether the operation succeeded
@retval DB_SUCCESS always when writing, or if a read page was OK
@retval DB_PAGE_CORRUPTED if the checksum fails on a page read
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does
not match */
UNIV_INTERN
dberr_t
buf_page_io_complete(
buf_page_t* bpage);
buf_page_io_complete(buf_page_t* bpage)
MY_ATTRIBUTE((nonnull));
/********************************************************************//**
Calculates a folded value of a file page address to use in the page hash
table.
......
......@@ -314,13 +314,21 @@ struct fil_space_t {
ulint n_pending_flushes; /*!< this is positive when flushing
the tablespace to disk; dropping of the
tablespace is forbidden if this is positive */
ulint n_pending_ops;/*!< this is positive when we
have pending operations against this
tablespace. The pending operations can
be ibuf merges or lock validation code
trying to read a block.
Dropping of the tablespace is forbidden
if this is positive */
/** Number of pending buffer pool operations accessing the tablespace
without holding a table lock or dict_operation_lock S-latch
that would prevent the table (and tablespace) from being
dropped. An example is change buffer merge.
The tablespace cannot be dropped while this is nonzero,
or while fil_node_t::n_pending is nonzero.
Protected by fil_system->mutex. */
ulint n_pending_ops;
/** Number of pending block read or write operations
(when a write is imminent or a read has recently completed).
The tablespace object cannot be freed while this is nonzero,
but it can be detached from fil_system.
Note that fil_node_t::n_pending tracks actual pending I/O requests.
Protected by fil_system->mutex. */
ulint n_pending_ios;
hash_node_t hash; /*!< hash chain node */
hash_node_t name_hash;/*!< hash chain the name_hash table */
#ifndef UNIV_HOTBACKUP
......@@ -652,13 +660,11 @@ Used by background threads that do not necessarily hold proper locks
for concurrency control.
@param[in] id tablespace ID
@param[in] silent whether to silently ignore missing tablespaces
@param[in] for_io whether to look up the tablespace while performing I/O
(possibly executing TRUNCATE)
@return the tablespace
@retval NULL if missing or being deleted or truncated */
UNIV_INTERN
fil_space_t*
fil_space_acquire_low(ulint id, bool silent, bool for_io = false)
fil_space_acquire_low(ulint id, bool silent)
MY_ATTRIBUTE((warn_unused_result));
/** Acquire a tablespace when it could be dropped concurrently.
......@@ -671,31 +677,45 @@ for concurrency control.
@retval NULL if missing or being deleted or truncated */
inline
fil_space_t*
fil_space_acquire(ulint id, bool for_io = false)
fil_space_acquire(ulint id)
{
return (fil_space_acquire_low(id, false, for_io));
return(fil_space_acquire_low(id, false));
}
/** Acquire a tablespace that may not exist.
Used by background threads that do not necessarily hold proper locks
for concurrency control.
@param[in] id tablespace ID
@param[in] for_io whether to look up the tablespace while performing I/O
(possibly executing TRUNCATE)
@return the tablespace
@retval NULL if missing or being deleted */
inline
fil_space_t*
fil_space_acquire_silent(ulint id, bool for_io = false)
fil_space_acquire_silent(ulint id)
{
return (fil_space_acquire_low(id, true, for_io));
return(fil_space_acquire_low(id, true));
}
/** Release a tablespace acquired with fil_space_acquire().
This is the counterpart for references that are counted in
fil_space_t::n_pending_ops. A reference obtained with
fil_space_acquire_for_io() is counted in fil_space_t::n_pending_ios
and must be released with fil_space_release_for_io() instead.
@param[in,out] space tablespace to release */
UNIV_INTERN
void
fil_space_release(fil_space_t* space);
/** Acquire a tablespace for reading or writing a block,
when it could be dropped concurrently.
The reference is tracked in fil_space_t::n_pending_ios rather than
in fil_space_t::n_pending_ops. Every successful call must be paired
with fil_space_release_for_io().
@param[in] id tablespace ID
@return the tablespace
@retval NULL if missing */
UNIV_INTERN
fil_space_t*
fil_space_acquire_for_io(ulint id);
/** Release a tablespace acquired with fil_space_acquire_for_io().
This gives back a reference that is counted in
fil_space_t::n_pending_ios. Do not use it for a reference obtained
with fil_space_acquire(); release that with fil_space_release().
@param[in,out] space tablespace to release */
UNIV_INTERN
void
fil_space_release_for_io(fil_space_t* space);
/** Return the next fil_space_t.
Once started, the caller must keep calling this until it returns NULL.
fil_space_acquire() and fil_space_release() are invoked here which
......@@ -704,6 +724,7 @@ blocks a concurrent operation from dropping the tablespace.
If NULL, use the first fil_space_t on fil_system->space_list.
@return pointer to the next fil_space_t.
@retval NULL if this was the last */
UNIV_INTERN
fil_space_t*
fil_space_next(
fil_space_t* prev_space)
......@@ -717,6 +738,7 @@ blocks a concurrent operation from dropping the tablespace.
If NULL, use the first fil_space_t on fil_system->space_list.
@return pointer to the next fil_space_t.
@retval NULL if this was the last */
UNIV_INTERN
fil_space_t*
fil_space_keyrotate_next(
fil_space_t* prev_space)
......@@ -733,12 +755,9 @@ class FilSpace
/** Constructor: Look up the tablespace and increment the
reference count if found.
@param[in] space_id tablespace ID
@param[in] silent whether not print any errors
@param[in] for_io whether to look up the tablespace
while performing I/O
(possibly executing TRUNCATE) */
explicit FilSpace(ulint space_id, bool silent = false, bool for_io = false)
: m_space(fil_space_acquire_low(space_id, silent, for_io)) {}
@param[in] silent whether not to print any errors */
explicit FilSpace(ulint space_id, bool silent = false)
: m_space(fil_space_acquire_low(space_id, silent)) {}
/** Assignment operator: This assumes that fil_space_acquire()
has already been done for the fil_space_t. The caller must
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment