Commit bb328a2a authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-24188 Hang in buf_page_create() after reusing a previously freed page

The fix of MDEV-23456 (commit b1009ae5)
introduced a livelock between page flushing and a thread that is
executing buf_page_create().

buf_page_create(): If the current mini-transaction is holding
an exclusive latch on the page, do not attempt to acquire another
one, and do not care about any I/O fix.

mtr_t::have_x_latch(): Replaces mtr_t::get_fix_count().

dyn_buf_t::for_each_block(const Functor&) const: A new variant.

rw_lock_own(): Add a const qualifier.

Reviewed by: Thirunarayanan Balathandayuthapani
parent 190e8a4c
...@@ -5511,6 +5511,7 @@ buf_page_create( ...@@ -5511,6 +5511,7 @@ buf_page_create(
&& !buf_pool_watch_is_sentinel(buf_pool, &block->page)) { && !buf_pool_watch_is_sentinel(buf_pool, &block->page)) {
ut_d(block->page.file_page_was_freed = FALSE); ut_d(block->page.file_page_was_freed = FALSE);
buf_page_state page_state = buf_block_get_state(block); buf_page_state page_state = buf_block_get_state(block);
bool have_x_latch = false;
#ifdef BTR_CUR_HASH_ADAPT #ifdef BTR_CUR_HASH_ADAPT
const dict_index_t *drop_hash_entry= NULL; const dict_index_t *drop_hash_entry= NULL;
#endif #endif
...@@ -5563,26 +5564,26 @@ buf_page_create( ...@@ -5563,26 +5564,26 @@ buf_page_create(
free_block = NULL; free_block = NULL;
break; break;
case BUF_BLOCK_FILE_PAGE: case BUF_BLOCK_FILE_PAGE:
buf_block_fix(block); have_x_latch = mtr->have_x_latch(*block);
const int32_t num_fix_count = if (!have_x_latch) {
mtr->get_fix_count(block) + 1; buf_block_fix(block);
buf_page_mutex_enter(block);
while (buf_block_get_io_fix(block) != BUF_IO_NONE
|| (num_fix_count
!= block->page.buf_fix_count)) {
buf_page_mutex_exit(block);
buf_pool_mutex_exit(buf_pool);
rw_lock_x_unlock(hash_lock);
os_thread_yield();
buf_pool_mutex_enter(buf_pool);
rw_lock_x_lock(hash_lock);
buf_page_mutex_enter(block); buf_page_mutex_enter(block);
while (buf_block_get_io_fix(block)
!= BUF_IO_NONE
|| block->page.buf_fix_count != 1) {
buf_page_mutex_exit(block);
buf_pool_mutex_exit(buf_pool);
rw_lock_x_unlock(hash_lock);
os_thread_sleep(1000);
buf_pool_mutex_enter(buf_pool);
rw_lock_x_lock(hash_lock);
buf_page_mutex_enter(block);
}
rw_lock_x_lock(&block->lock);
buf_page_mutex_exit(block);
} }
rw_lock_x_lock(&block->lock);
buf_page_mutex_exit(block);
#ifdef BTR_CUR_HASH_ADAPT #ifdef BTR_CUR_HASH_ADAPT
drop_hash_entry = block->index; drop_hash_entry = block->index;
#endif #endif
...@@ -5601,16 +5602,17 @@ buf_page_create( ...@@ -5601,16 +5602,17 @@ buf_page_create(
} }
#endif /* BTR_CUR_HASH_ADAPT */ #endif /* BTR_CUR_HASH_ADAPT */
if (!have_x_latch) {
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
if (!fsp_is_system_temporary(page_id.space())) { if (!fsp_is_system_temporary(page_id.space())) {
rw_lock_s_lock_nowait( rw_lock_s_lock_nowait(
&block->debug_latch, &block->debug_latch,
__FILE__, __LINE__); __FILE__, __LINE__);
} }
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
mtr_memo_push(mtr, block, MTR_MEMO_PAGE_X_FIX); mtr_memo_push(mtr, block, MTR_MEMO_PAGE_X_FIX);
}
return block; return block;
} }
......
/***************************************************************************** /*****************************************************************************
Copyright (c) 2013, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2013, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2019, MariaDB Corporation. Copyright (c) 2019, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -347,6 +347,24 @@ class dyn_buf_t { ...@@ -347,6 +347,24 @@ class dyn_buf_t {
return(true); return(true);
} }
/**
Iterate over each block and call the functor.
@return false if iteration was terminated. */
template <typename Functor>
bool for_each_block(const Functor& functor) const
{
for (typename list_t::iterator it = m_list.begin(),
end = m_list.end();
it != end; ++it) {
if (!functor(&*it)) {
return false;
}
}
return(true);
}
/** /**
Iterate over all the blocks in reverse and call the iterator Iterate over all the blocks in reverse and call the iterator
@return false if iteration was terminated. */ @return false if iteration was terminated. */
......
...@@ -434,10 +434,9 @@ struct mtr_t { ...@@ -434,10 +434,9 @@ struct mtr_t {
static inline bool is_block_dirtied(const buf_block_t* block) static inline bool is_block_dirtied(const buf_block_t* block)
MY_ATTRIBUTE((warn_unused_result)); MY_ATTRIBUTE((warn_unused_result));
/** Get the buffer fix count for the block added by this mtr. /** Check if we are holding a block latch in exclusive mode
@param[in] block block to be checked @param block buffer pool block to search for */
@return number of buffer count added by this mtr */ bool have_x_latch(const buf_block_t& block) const;
int32_t get_fix_count(buf_block_t *block);
private: private:
/** Look up the system tablespace. */ /** Look up the system tablespace. */
void lookup_sys_space(); void lookup_sys_space();
......
...@@ -508,7 +508,7 @@ the pass value == 0. */ ...@@ -508,7 +508,7 @@ the pass value == 0. */
ibool ibool
rw_lock_own( rw_lock_own(
/*========*/ /*========*/
rw_lock_t* lock, /*!< in: rw-lock */ const rw_lock_t*lock, /*!< in: rw-lock */
ulint lock_type) /*!< in: lock type: RW_LOCK_S, ulint lock_type) /*!< in: lock type: RW_LOCK_S,
RW_LOCK_X */ RW_LOCK_X */
MY_ATTRIBUTE((warn_unused_result)); MY_ATTRIBUTE((warn_unused_result));
......
...@@ -308,24 +308,6 @@ struct DebugCheck { ...@@ -308,24 +308,6 @@ struct DebugCheck {
}; };
#endif #endif
/** Find buffer fix count of the given block acquired by the
mini-transaction */
struct FindBlock
{
int32_t num_fix;
const buf_block_t *const block;
FindBlock(const buf_block_t *block_buf): num_fix(0), block(block_buf) {}
bool operator()(const mtr_memo_slot_t* slot)
{
if (slot->object == block)
ut_d(if (slot->type != MTR_MEMO_MODIFY))
num_fix++;
return true;
}
};
/** Release a resource acquired by the mini-transaction. */ /** Release a resource acquired by the mini-transaction. */
struct ReleaseBlocks { struct ReleaseBlocks {
/** Release specific object */ /** Release specific object */
...@@ -822,12 +804,48 @@ mtr_t::release_free_extents(ulint n_reserved) ...@@ -822,12 +804,48 @@ mtr_t::release_free_extents(ulint n_reserved)
space->release_free_extents(n_reserved); space->release_free_extents(n_reserved);
} }
int32_t mtr_t::get_fix_count(buf_block_t *block) /** Find out whether a block was X-latched by the mini-transaction */
struct FindBlockX
{ {
Iterate<FindBlock> iteration((FindBlock(block))); const buf_block_t &block;
if (m_memo.for_each_block(iteration))
return iteration.functor.num_fix; FindBlockX(const buf_block_t &block): block(block) {}
return 0;
/** @return whether the block was not found x-latched */
bool operator()(const mtr_memo_slot_t *slot) const
{
return slot->object != &block || slot->type == MTR_MEMO_PAGE_X_FIX;
}
};
#ifdef UNIV_DEBUG
/** Assert that the block is not present in the mini-transaction */
struct FindNoBlock
{
const buf_block_t &block;
FindNoBlock(const buf_block_t &block): block(block) {}
/** @return whether the block was not found */
bool operator()(const mtr_memo_slot_t *slot) const
{
return slot->object != &block;
}
};
#endif /* UNIV_DEBUG */
bool mtr_t::have_x_latch(const buf_block_t &block) const
{
if (m_memo.for_each_block(CIterate<FindBlockX>(FindBlockX(block))))
{
ut_ad(m_memo.for_each_block(CIterate<FindNoBlock>(FindNoBlock(block))));
ut_ad(!memo_contains_flagged(&block,
MTR_MEMO_PAGE_S_FIX | MTR_MEMO_PAGE_SX_FIX |
MTR_MEMO_BUF_FIX | MTR_MEMO_MODIFY));
return false;
}
ut_ad(rw_lock_own(&block.lock, RW_LOCK_X));
return true;
} }
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
...@@ -844,15 +862,17 @@ mtr_t::memo_contains( ...@@ -844,15 +862,17 @@ mtr_t::memo_contains(
return(false); return(false);
} }
const rw_lock_t *lock = static_cast<const rw_lock_t*>(object);
switch (type) { switch (type) {
case MTR_MEMO_X_LOCK: case MTR_MEMO_X_LOCK:
ut_ad(rw_lock_own((rw_lock_t*) object, RW_LOCK_X)); ut_ad(rw_lock_own(lock, RW_LOCK_X));
break; break;
case MTR_MEMO_SX_LOCK: case MTR_MEMO_SX_LOCK:
ut_ad(rw_lock_own((rw_lock_t*) object, RW_LOCK_SX)); ut_ad(rw_lock_own(lock, RW_LOCK_SX));
break; break;
case MTR_MEMO_S_LOCK: case MTR_MEMO_S_LOCK:
ut_ad(rw_lock_own((rw_lock_t*) object, RW_LOCK_S)); ut_ad(rw_lock_own(lock, RW_LOCK_S));
break; break;
} }
......
...@@ -990,7 +990,7 @@ the pass value == 0. ...@@ -990,7 +990,7 @@ the pass value == 0.
ibool ibool
rw_lock_own( rw_lock_own(
/*========*/ /*========*/
rw_lock_t* lock, /*!< in: rw-lock */ const rw_lock_t*lock, /*!< in: rw-lock */
ulint lock_type) /*!< in: lock type: RW_LOCK_S, ulint lock_type) /*!< in: lock type: RW_LOCK_S,
RW_LOCK_X */ RW_LOCK_X */
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment