Commit 4b24467f authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-12602 InnoDB: Failing assertion: space->n_pending_ops == 0

This fixes a regression caused by MDEV-12428.
When we introduced a variant of fil_space_acquire() that could
increment space->n_pending_ops after space->stop_new_ops was set,
the logic of fil_check_pending_operations() was broken.

fil_space_t::n_pending_ios: A new field to track read or write
access from the buffer pool routines immediately before a block
write or after a block read in the file system.

fil_space_acquire_for_io(), fil_space_release_for_io(): Similar
to fil_space_acquire_silent() and fil_space_release(), but
modify fil_space_t::n_pending_ios instead of fil_space_t::n_pending_ops.

fil_space_free_low(): Wait for space->n_pending_ios to reach 0,
to avoid accessing freed data in a concurrent thread. Future
calls to fil_space_acquire_for_io() will not find this tablespace,
because it will already have been detached from fil_system.

Adjust a number of places accordingly, and remove some redundant
tablespace lookups.

FIXME: buf_page_check_corrupt() should take a tablespace from
fil_space_acquire_for_io() as a parameter. This will be done
in the 10.1 version of this patch and merged from there.
That depends on MDEV-12253, which has not been merged from 10.1 yet.
parent f740d23c
...@@ -405,10 +405,22 @@ buf_pool_register_chunk( ...@@ -405,10 +405,22 @@ buf_pool_register_chunk(
/** Decrypt a page. /** Decrypt a page.
@param[in,out] bpage Page control block @param[in,out] bpage Page control block
@param[in,out] space tablespace
@return whether the operation was successful */ @return whether the operation was successful */
static static
bool bool
buf_page_decrypt_after_read(buf_page_t* bpage); buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
MY_ATTRIBUTE((nonnull));
/** Check if page is maybe compressed, encrypted or both when we encounter
corrupted page. Note that we can't be 100% sure if page is corrupted
or decrypt/decompress just failed.
@param[in,out] bpage Page
@return true if page corrupted, false if not */
static
bool
buf_page_check_corrupt(buf_page_t* bpage)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/* prototypes for new functions added to ha_innodb.cc */ /* prototypes for new functions added to ha_innodb.cc */
trx_t* innobase_get_trx(); trx_t* innobase_get_trx();
...@@ -5844,16 +5856,14 @@ buf_mark_space_corrupt( ...@@ -5844,16 +5856,14 @@ buf_mark_space_corrupt(
return(ret); return(ret);
} }
/********************************************************************//** /** Check if page is maybe compressed, encrypted or both when we encounter
Check if page is maybe compressed, encrypted or both when we encounter
corrupted page. Note that we can't be 100% sure if page is corrupted corrupted page. Note that we can't be 100% sure if page is corrupted
or decrypt/decompress just failed. or decrypt/decompress just failed.
@param[in,out] bpage Page @param[in,out] bpage Page
@return true if page corrupted, false if not */ @return true if page corrupted, false if not */
UNIV_INTERN static
bool bool
buf_page_check_corrupt( buf_page_check_corrupt(buf_page_t* bpage)
buf_page_t* bpage)
{ {
byte* dst_frame = (bpage->zip.data) ? bpage->zip.data : byte* dst_frame = (bpage->zip.data) ? bpage->zip.data :
((buf_block_t*) bpage)->frame; ((buf_block_t*) bpage)->frame;
...@@ -5956,8 +5966,13 @@ buf_page_io_complete( ...@@ -5956,8 +5966,13 @@ buf_page_io_complete(
ulint read_space_id; ulint read_space_id;
ut_ad(bpage->zip.data != NULL || ((buf_block_t*)bpage)->frame != NULL); ut_ad(bpage->zip.data != NULL || ((buf_block_t*)bpage)->frame != NULL);
fil_space_t* space = fil_space_acquire_for_io(
bpage->id.space());
if (!space) {
return false;
}
buf_page_decrypt_after_read(bpage); buf_page_decrypt_after_read(bpage, space);
if (bpage->size.is_compressed()) { if (bpage->size.is_compressed()) {
frame = bpage->zip.data; frame = bpage->zip.data;
...@@ -6031,21 +6046,17 @@ buf_page_io_complete( ...@@ -6031,21 +6046,17 @@ buf_page_io_complete(
&& buf_mark_space_corrupt(bpage)) { && buf_mark_space_corrupt(bpage)) {
ib::info() << "Simulated IMPORT " ib::info() << "Simulated IMPORT "
"corruption"; "corruption";
fil_space_release_for_io(space);
return(true); return(true);
} }
goto page_not_corrupt; goto page_not_corrupt;
); );
if (!bpage->encrypted) { if (!bpage->encrypted) {
fil_system_enter();
fil_space_t* space = fil_space_get_by_id(bpage->id.space());
fil_system_exit();
ib::error() ib::error()
<< "Database page corruption on disk" << "Database page corruption on disk"
" or a failed file read of tablespace " " or a failed file read of tablespace "
<< (space->name ? space->name : "NULL") << space->name << " page " << bpage->id
<< " page " << bpage->id
<< ". You may have to recover from " << ". You may have to recover from "
<< "a backup."; << "a backup.";
...@@ -6074,6 +6085,7 @@ buf_page_io_complete( ...@@ -6074,6 +6085,7 @@ buf_page_io_complete(
if (bpage->id.space() > TRX_SYS_SPACE if (bpage->id.space() > TRX_SYS_SPACE
&& buf_mark_space_corrupt(bpage)) { && buf_mark_space_corrupt(bpage)) {
fil_space_release_for_io(space);
return(false); return(false);
} else { } else {
if (!bpage->encrypted) { if (!bpage->encrypted) {
...@@ -6099,6 +6111,7 @@ buf_page_io_complete( ...@@ -6099,6 +6111,7 @@ buf_page_io_complete(
ut_error; ut_error;
} }
fil_space_release_for_io(space);
return(false); return(false);
} }
} }
...@@ -6142,6 +6155,8 @@ buf_page_io_complete( ...@@ -6142,6 +6155,8 @@ buf_page_io_complete(
} }
} }
fil_space_release_for_io(space);
} else { } else {
/* io_type == BUF_IO_WRITE */ /* io_type == BUF_IO_WRITE */
if (bpage->slot) { if (bpage->slot) {
...@@ -7502,11 +7517,15 @@ buf_page_encrypt_before_write( ...@@ -7502,11 +7517,15 @@ buf_page_encrypt_before_write(
/** Decrypt a page. /** Decrypt a page.
@param[in,out] bpage Page control block @param[in,out] bpage Page control block
@param[in,out] space tablespace
@return whether the operation was successful */ @return whether the operation was successful */
static static
bool bool
buf_page_decrypt_after_read(buf_page_t* bpage) buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
{ {
ut_ad(space->n_pending_ios > 0);
ut_ad(space->id == bpage->id.space());
bool compressed = bpage->size.is_compressed(); bool compressed = bpage->size.is_compressed();
const page_size_t& size = bpage->size; const page_size_t& size = bpage->size;
byte* dst_frame = compressed ? bpage->zip.data : byte* dst_frame = compressed ? bpage->zip.data :
...@@ -7525,12 +7544,10 @@ buf_page_decrypt_after_read(buf_page_t* bpage) ...@@ -7525,12 +7544,10 @@ buf_page_decrypt_after_read(buf_page_t* bpage)
return (true); return (true);
} }
FilSpace space(bpage->id.space(), true);
/* Page is encrypted if encryption information is found from /* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */ also for pages first compressed and then encrypted. */
if (!space() || !space()->crypt_data) { if (!space->crypt_data) {
key_version = 0; key_version = 0;
} }
...@@ -7602,6 +7619,7 @@ buf_page_decrypt_after_read(buf_page_t* bpage) ...@@ -7602,6 +7619,7 @@ buf_page_decrypt_after_read(buf_page_t* bpage)
} }
} }
ut_ad(space->n_pending_ios > 0);
return (success); return (success);
} }
......
...@@ -1006,7 +1006,7 @@ buf_flush_write_block_low( ...@@ -1006,7 +1006,7 @@ buf_flush_write_block_low(
buf_flush_t flush_type, /*!< in: type of flush */ buf_flush_t flush_type, /*!< in: type of flush */
bool sync) /*!< in: true if sync IO request */ bool sync) /*!< in: true if sync IO request */
{ {
fil_space_t* space = fil_space_acquire(bpage->id.space(), true); fil_space_t* space = fil_space_acquire_for_io(bpage->id.space());
if (!space) { if (!space) {
return; return;
} }
...@@ -1121,10 +1121,16 @@ buf_flush_write_block_low( ...@@ -1121,10 +1121,16 @@ buf_flush_write_block_low(
/* true means we want to evict this page from the /* true means we want to evict this page from the
LRU list as well. */ LRU list as well. */
/* The tablespace could already have been dropped,
because fil_io(request, sync) would already have
decremented the node->n_pending. However,
buf_page_io_complete() only needs to look up the
tablespace during read requests, not during writes. */
ut_ad(buf_page_get_io_fix(bpage) == BUF_IO_WRITE);
buf_page_io_complete(bpage, true); buf_page_io_complete(bpage, true);
} }
fil_space_release(space); fil_space_release_for_io(space);
/* Increment the counter of I/O operations used /* Increment the counter of I/O operations used
for selecting LRU policy. */ for selecting LRU policy. */
......
...@@ -629,7 +629,7 @@ fil_space_encrypt( ...@@ -629,7 +629,7 @@ fil_space_encrypt(
fil_space_crypt_t* crypt_data = space->crypt_data; fil_space_crypt_t* crypt_data = space->crypt_data;
const page_size_t page_size(space->flags); const page_size_t page_size(space->flags);
ut_ad(space->n_pending_ops); ut_ad(space->n_pending_ios > 0);
byte* tmp = fil_encrypt_buf(crypt_data, space->id, offset, lsn, byte* tmp = fil_encrypt_buf(crypt_data, space->id, offset, lsn,
src_frame, page_size, dst_frame); src_frame, page_size, dst_frame);
...@@ -821,7 +821,7 @@ fil_space_decrypt( ...@@ -821,7 +821,7 @@ fil_space_decrypt(
*decrypted = false; *decrypted = false;
ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted()); ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
ut_ad(space->n_pending_ops > 0); ut_ad(space->n_pending_ios > 0);
bool encrypted = fil_space_decrypt(space->crypt_data, tmp_frame, bool encrypted = fil_space_decrypt(space->crypt_data, tmp_frame,
page_size, src_frame, &err); page_size, src_frame, &err);
......
...@@ -1506,6 +1506,13 @@ fil_space_free_low( ...@@ -1506,6 +1506,13 @@ fil_space_free_low(
ut_ad(srv_fast_shutdown == 2 || !srv_was_started ut_ad(srv_fast_shutdown == 2 || !srv_was_started
|| space->max_lsn == 0); || space->max_lsn == 0);
/* Wait for fil_space_release_for_io(); after
fil_space_detach(), the tablespace cannot be found, so
fil_space_acquire_for_io() would return NULL */
while (space->n_pending_ios) {
os_thread_sleep(100);
}
for (fil_node_t* node = UT_LIST_GET_FIRST(space->chain); for (fil_node_t* node = UT_LIST_GET_FIRST(space->chain);
node != NULL; ) { node != NULL; ) {
ut_d(space->size -= node->size); ut_d(space->size -= node->size);
...@@ -2267,13 +2274,11 @@ Used by background threads that do not necessarily hold proper locks ...@@ -2267,13 +2274,11 @@ Used by background threads that do not necessarily hold proper locks
for concurrency control. for concurrency control.
@param[in] id tablespace ID @param[in] id tablespace ID
@param[in] silent whether to silently ignore missing tablespaces @param[in] silent whether to silently ignore missing tablespaces
@param[in] for_io whether to look up the tablespace while performing I/O
(possibly executing TRUNCATE)
@return the tablespace @return the tablespace
@retval NULL if missing or being deleted or truncated */ @retval NULL if missing or being deleted or truncated */
inline inline
fil_space_t* fil_space_t*
fil_space_acquire_low(ulint id, bool silent, bool for_io = false) fil_space_acquire_low(ulint id, bool silent)
{ {
fil_space_t* space; fil_space_t* space;
...@@ -2286,7 +2291,7 @@ fil_space_acquire_low(ulint id, bool silent, bool for_io = false) ...@@ -2286,7 +2291,7 @@ fil_space_acquire_low(ulint id, bool silent, bool for_io = false)
ib::warn() << "Trying to access missing" ib::warn() << "Trying to access missing"
" tablespace " << id; " tablespace " << id;
} }
} else if (!for_io && space->is_stopping()) { } else if (space->is_stopping()) {
space = NULL; space = NULL;
} else { } else {
space->n_pending_ops++; space->n_pending_ops++;
...@@ -2301,14 +2306,12 @@ fil_space_acquire_low(ulint id, bool silent, bool for_io = false) ...@@ -2301,14 +2306,12 @@ fil_space_acquire_low(ulint id, bool silent, bool for_io = false)
Used by background threads that do not necessarily hold proper locks Used by background threads that do not necessarily hold proper locks
for concurrency control. for concurrency control.
@param[in] id tablespace ID @param[in] id tablespace ID
@param[in] for_io whether to look up the tablespace while performing I/O
(possibly executing TRUNCATE)
@return the tablespace @return the tablespace
@retval NULL if missing or being deleted or truncated */ @retval NULL if missing or being deleted or truncated */
fil_space_t* fil_space_t*
fil_space_acquire(ulint id, bool for_io) fil_space_acquire(ulint id)
{ {
return(fil_space_acquire_low(id, false, for_io)); return(fil_space_acquire_low(id, false));
} }
/** Acquire a tablespace that may not exist. /** Acquire a tablespace that may not exist.
...@@ -2335,6 +2338,39 @@ fil_space_release(fil_space_t* space) ...@@ -2335,6 +2338,39 @@ fil_space_release(fil_space_t* space)
mutex_exit(&fil_system->mutex); mutex_exit(&fil_system->mutex);
} }
/** Acquire a tablespace for reading or writing a block,
when it could be dropped concurrently.
@param[in] id tablespace ID
@return the tablespace
@retval NULL if missing */
fil_space_t*
fil_space_acquire_for_io(ulint id)
{
mutex_enter(&fil_system->mutex);
fil_space_t* space = fil_space_get_by_id(id);
if (space) {
space->n_pending_ios++;
}
mutex_exit(&fil_system->mutex);
return(space);
}
/** Release a tablespace acquired with fil_space_acquire_for_io().
@param[in,out] space tablespace to release */
void
fil_space_release_for_io(fil_space_t* space)
{
mutex_enter(&fil_system->mutex);
ut_ad(space->magic_n == FIL_SPACE_MAGIC_N);
ut_ad(space->n_pending_ios > 0);
space->n_pending_ios--;
mutex_exit(&fil_system->mutex);
}
/********************************************************//** /********************************************************//**
Creates the database directory for a table if it does not exist yet. */ Creates the database directory for a table if it does not exist yet. */
void void
...@@ -2836,15 +2872,15 @@ enum fil_operation_t { ...@@ -2836,15 +2872,15 @@ enum fil_operation_t {
@return 0 if no operations else count + 1. */ @return 0 if no operations else count + 1. */
static static
ulint ulint
fil_check_pending_ops( fil_check_pending_ops(const fil_space_t* space, ulint count)
fil_space_t* space,
ulint count)
{ {
ut_ad(mutex_own(&fil_system->mutex)); ut_ad(mutex_own(&fil_system->mutex));
const ulint n_pending_ops = space ? space->n_pending_ops : 0; if (space == NULL) {
return 0;
}
if (n_pending_ops) { if (ulint n_pending_ops = space->n_pending_ops) {
if (count > 5000) { if (count > 5000) {
ib::warn() << "Trying to close/delete/truncate" ib::warn() << "Trying to close/delete/truncate"
...@@ -5531,7 +5567,7 @@ fil_flush( ...@@ -5531,7 +5567,7 @@ fil_flush(
void void
fil_flush(fil_space_t* space) fil_flush(fil_space_t* space)
{ {
ut_ad(space->n_pending_ops > 0); ut_ad(space->n_pending_ios > 0);
ut_ad(space->purpose == FIL_TYPE_TABLESPACE ut_ad(space->purpose == FIL_TYPE_TABLESPACE
|| space->purpose == FIL_TYPE_IMPORT); || space->purpose == FIL_TYPE_IMPORT);
......
...@@ -631,11 +631,11 @@ fil_decompress_page( ...@@ -631,11 +631,11 @@ fil_decompress_page(
/* Note that as we have found the page is corrupted, so /* Note that as we have found the page is corrupted, so
all this could be incorrect. */ all this could be incorrect. */
ulint space_id = mach_read_from_4(buf+FIL_PAGE_SPACE_ID); ulint space_id = mach_read_from_4(buf+FIL_PAGE_SPACE_ID);
const FilSpace space(space_id, true); fil_space_t* space = fil_space_acquire_for_io(space_id);
ib::error() << "Corruption: Page is marked as compressed" ib::error() << "Corruption: Page is marked as compressed"
<< " space: " << space_id << " name: " << " space: " << space_id << " name: "
<< (space() ? space()->name : "NULL") << (space ? space->name : "NULL")
<< " but uncompress failed with error: " << err << " but uncompress failed with error: " << err
<< " size: " << actual_size << " size: " << actual_size
<< " len: " << len << " len: " << len
...@@ -643,4 +643,5 @@ fil_decompress_page( ...@@ -643,4 +643,5 @@ fil_decompress_page(
<< fil_get_compression_alg_name(compression_alg) << "."; << fil_get_compression_alg_name(compression_alg) << ".";
buf_page_print(buf, univ_page_size, 0); buf_page_print(buf, univ_page_size, 0);
fil_space_release_for_io(space);
} }
...@@ -835,17 +835,6 @@ buf_page_is_checksum_valid_none( ...@@ -835,17 +835,6 @@ buf_page_is_checksum_valid_none(
) )
MY_ATTRIBUTE((nonnull(1), warn_unused_result)); MY_ATTRIBUTE((nonnull(1), warn_unused_result));
/********************************************************************//**
Check if page is maybe compressed, encrypted or both when we encounter
corrupted page. Note that we can't be 100% sure if page is corrupted
or decrypt/decompress just failed.
@param[in] bpage Page
@return true if page corrupted, false if not */
bool
buf_page_check_corrupt(
buf_page_t* bpage) /*!< in/out: buffer page read from disk */
MY_ATTRIBUTE((nonnull, warn_unused_result));
/** Checks if a page contains only zeroes. /** Checks if a page contains only zeroes.
@param[in] read_buf database page @param[in] read_buf database page
@param[in] page_size page size @param[in] page_size page size
......
...@@ -142,14 +142,21 @@ struct fil_space_t { ...@@ -142,14 +142,21 @@ struct fil_space_t {
ulint n_pending_flushes; /*!< this is positive when flushing ulint n_pending_flushes; /*!< this is positive when flushing
the tablespace to disk; dropping of the the tablespace to disk; dropping of the
tablespace is forbidden if this is positive */ tablespace is forbidden if this is positive */
ulint n_pending_ops;/*!< this is positive when we /** Number of pending buffer pool operations accessing the tablespace
have pending operations against this without holding a table lock or dict_operation_lock S-latch
tablespace. The pending operations can that would prevent the table (and tablespace) from being
be ibuf merges or lock validation code dropped. An example is change buffer merge.
trying to read a block. The tablespace cannot be dropped while this is nonzero,
Dropping of the tablespace is forbidden or while fil_node_t::n_pending is nonzero.
if this is positive. Protected by fil_system->mutex. */
Protected by fil_system->mutex. */ ulint n_pending_ops;
/** Number of pending block read or write operations
(when a write is imminent or a read has recently completed).
The tablespace object cannot be freed while this is nonzero,
but it can be detached from fil_system.
Note that fil_node_t::n_pending tracks actual pending I/O requests.
Protected by fil_system->mutex. */
ulint n_pending_ios;
hash_node_t hash; /*!< hash chain node */ hash_node_t hash; /*!< hash chain node */
hash_node_t name_hash;/*!< hash chain the name_hash table */ hash_node_t name_hash;/*!< hash chain the name_hash table */
rw_lock_t latch; /*!< latch protecting the file space storage rw_lock_t latch; /*!< latch protecting the file space storage
...@@ -726,12 +733,10 @@ MY_ATTRIBUTE((warn_unused_result)); ...@@ -726,12 +733,10 @@ MY_ATTRIBUTE((warn_unused_result));
Used by background threads that do not necessarily hold proper locks Used by background threads that do not necessarily hold proper locks
for concurrency control. for concurrency control.
@param[in] id tablespace ID @param[in] id tablespace ID
@param[in] for_io whether to look up the tablespace while performing I/O
(possibly executing TRUNCATE)
@return the tablespace @return the tablespace
@retval NULL if missing or being deleted or truncated */ @retval NULL if missing or being deleted or truncated */
fil_space_t* fil_space_t*
fil_space_acquire(ulint id, bool for_io = false) fil_space_acquire(ulint id)
MY_ATTRIBUTE((warn_unused_result)); MY_ATTRIBUTE((warn_unused_result));
/** Acquire a tablespace that may not exist. /** Acquire a tablespace that may not exist.
...@@ -749,6 +754,19 @@ fil_space_acquire_silent(ulint id) ...@@ -749,6 +754,19 @@ fil_space_acquire_silent(ulint id)
void void
fil_space_release(fil_space_t* space); fil_space_release(fil_space_t* space);
/** Acquire a tablespace for reading or writing a block,
when it could be dropped concurrently.
@param[in] id tablespace ID
@return the tablespace
@retval NULL if missing */
fil_space_t*
fil_space_acquire_for_io(ulint id);
/** Release a tablespace acquired with fil_space_acquire_for_io().
@param[in,out] space tablespace to release */
void
fil_space_release_for_io(fil_space_t* space);
/** Return the next fil_space_t. /** Return the next fil_space_t.
Once started, the caller must keep calling this until it returns NULL. Once started, the caller must keep calling this until it returns NULL.
fil_space_acquire() and fil_space_release() are invoked here which fil_space_acquire() and fil_space_release() are invoked here which
...@@ -785,12 +803,9 @@ class FilSpace ...@@ -785,12 +803,9 @@ class FilSpace
/** Constructor: Look up the tablespace and increment the /** Constructor: Look up the tablespace and increment the
reference count if found. reference count if found.
@param[in] space_id tablespace ID @param[in] space_id tablespace ID */
@param[in] for_io whether to look up the tablespace explicit FilSpace(ulint space_id)
while performing I/O : m_space(fil_space_acquire(space_id)) {}
(possibly executing TRUNCATE) */
explicit FilSpace(ulint space_id, bool for_io = false)
: m_space(fil_space_acquire(space_id, for_io)) {}
/** Assignment operator: This assumes that fil_space_acquire() /** Assignment operator: This assumes that fil_space_acquire()
has already been done for the fil_space_t. The caller must has already been done for the fil_space_t. The caller must
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment