MDEV-23456 fil_space_crypt_t::write_page0() is accessing an uninitialized page

buf_page_create() is invoked when page is initialized. So that
previous contents of the page ignored. In few cases, it calls
buf_page_get_gen() is called to fetch the page from buffer pool.
It should take x-latch on the page. If other thread uses the block
or block io state is different from BUF_IO_NONE then release the
mutex and check the state and buffer fix count again. For compressed
page, use the existing free block from LRU list to create new page.
Retry to fetch the compressed page if it is in flush list

fseg_create(), fseg_create_general(): Introduce block as a parameter
where segment header is placed. It is used to avoid repetitive
x-latch on the same page

Change the assert to check whether the page has SX latch and
X latch in all callee function of buf_page_create()

mtr_t::get_fix_count(): Get the buffer fix count of the given
block added by the mtr

FindBlock is added to find the buffer fix count of the given
block acquired by the mini-transaction
parent f99cace7
...@@ -1086,8 +1086,7 @@ btr_create( ...@@ -1086,8 +1086,7 @@ btr_create(
if (type & DICT_IBUF) { if (type & DICT_IBUF) {
/* Allocate first the ibuf header page */ /* Allocate first the ibuf header page */
buf_block_t* ibuf_hdr_block = fseg_create( buf_block_t* ibuf_hdr_block = fseg_create(
space, 0, space, IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);
IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);
if (ibuf_hdr_block == NULL) { if (ibuf_hdr_block == NULL) {
return(FIL_NULL); return(FIL_NULL);
...@@ -1118,7 +1117,7 @@ btr_create( ...@@ -1118,7 +1117,7 @@ btr_create(
flst_init(block->frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, flst_init(block->frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
mtr); mtr);
} else { } else {
block = fseg_create(space, 0, block = fseg_create(space,
PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr); PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);
if (block == NULL) { if (block == NULL) {
...@@ -1127,8 +1126,9 @@ btr_create( ...@@ -1127,8 +1126,9 @@ btr_create(
buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW); buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
if (!fseg_create(space, block->page.id.page_no(), if (!fseg_create(space,
PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr)) { PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr,
block)) {
/* Not enough space for new segment, free root /* Not enough space for new segment, free root
segment before return. */ segment before return. */
btr_free_root(block, mtr, btr_free_root(block, mtr,
......
...@@ -5564,14 +5564,13 @@ buf_page_create( ...@@ -5564,14 +5564,13 @@ buf_page_create(
buf_frame_t* frame; buf_frame_t* frame;
buf_block_t* block; buf_block_t* block;
buf_block_t* free_block = NULL; buf_block_t* free_block = NULL;
buf_pool_t* buf_pool = buf_pool_get(page_id); buf_pool_t* buf_pool= buf_pool_get(page_id);
rw_lock_t* hash_lock; rw_lock_t* hash_lock;
ut_ad(mtr->is_active()); ut_ad(mtr->is_active());
ut_ad(page_id.space() != 0 || !page_size.is_compressed()); ut_ad(page_id.space() != 0 || !page_size.is_compressed());
loop:
free_block = buf_LRU_get_free_block(buf_pool); free_block = buf_LRU_get_free_block(buf_pool);
buf_pool_mutex_enter(buf_pool); buf_pool_mutex_enter(buf_pool);
hash_lock = buf_page_hash_lock_get(buf_pool, page_id); hash_lock = buf_page_hash_lock_get(buf_pool, page_id);
...@@ -5583,20 +5582,67 @@ buf_page_create( ...@@ -5583,20 +5582,67 @@ buf_page_create(
&& buf_page_in_file(&block->page) && buf_page_in_file(&block->page)
&& !buf_pool_watch_is_sentinel(buf_pool, &block->page)) { && !buf_pool_watch_is_sentinel(buf_pool, &block->page)) {
ut_d(block->page.file_page_was_freed = FALSE); ut_d(block->page.file_page_was_freed = FALSE);
buf_page_state page_state = buf_block_get_state(block);
#ifdef BTR_CUR_HASH_ADAPT #ifdef BTR_CUR_HASH_ADAPT
bool drop_hash_entry = const dict_index_t *drop_hash_entry= NULL;
(block->page.state == BUF_BLOCK_FILE_PAGE #endif
&& block->index); switch (page_state) {
default:
ut_ad(0);
break;
case BUF_BLOCK_ZIP_PAGE:
case BUF_BLOCK_ZIP_DIRTY:
buf_block_init_low(free_block);
mutex_enter(&buf_pool->zip_mutex);
if (drop_hash_entry) { buf_page_mutex_enter(free_block);
mutex_enter(&block->mutex); if (buf_page_get_io_fix(&block->page) != BUF_IO_NONE) {
/* Avoid a hang if I/O is going on. Release mutex_exit(&buf_pool->zip_mutex);
the buffer pool mutex and page hash lock rw_lock_x_unlock(hash_lock);
and wait for I/O to complete */ buf_LRU_block_free_non_file_page(free_block);
while (buf_block_get_io_fix(block) != BUF_IO_NONE) { buf_pool_mutex_exit(buf_pool);
buf_page_mutex_exit(free_block);
goto loop;
}
rw_lock_x_lock(&free_block->lock);
buf_relocate(&block->page, &free_block->page);
if (page_state == BUF_BLOCK_ZIP_DIRTY) {
ut_ad(block->page.in_flush_list);
ut_ad(block->page.oldest_modification > 0);
buf_flush_relocate_on_flush_list(
&block->page, &free_block->page);
} else {
ut_ad(block->page.oldest_modification == 0);
ut_ad(!block->page.in_flush_list);
#ifdef UNIV_DEBUG
UT_LIST_REMOVE(
buf_pool->zip_clean, &block->page);
#endif
}
free_block->page.state = BUF_BLOCK_FILE_PAGE;
mutex_exit(&buf_pool->zip_mutex);
free_block->lock_hash_val = lock_rec_hash(
page_id.space(), page_id.page_no());
buf_unzip_LRU_add_block(free_block, false);
buf_page_free_descriptor(&block->page);
block = free_block;
buf_block_fix(block); buf_block_fix(block);
mutex_exit(&block->mutex); buf_page_mutex_exit(free_block);
free_block = NULL;
break;
case BUF_BLOCK_FILE_PAGE:
buf_block_fix(block);
const int32_t num_fix_count =
mtr->get_fix_count(block) + 1;
buf_page_mutex_enter(block);
while (buf_block_get_io_fix(block) != BUF_IO_NONE
|| (num_fix_count
!= block->page.buf_fix_count)) {
buf_page_mutex_exit(block);
buf_pool_mutex_exit(buf_pool); buf_pool_mutex_exit(buf_pool);
rw_lock_x_unlock(hash_lock); rw_lock_x_unlock(hash_lock);
...@@ -5604,33 +5650,39 @@ buf_page_create( ...@@ -5604,33 +5650,39 @@ buf_page_create(
buf_pool_mutex_enter(buf_pool); buf_pool_mutex_enter(buf_pool);
rw_lock_x_lock(hash_lock); rw_lock_x_lock(hash_lock);
mutex_enter(&block->mutex); buf_page_mutex_enter(block);
buf_block_unfix(block);
} }
rw_lock_x_lock(&block->lock); rw_lock_x_lock(&block->lock);
mutex_exit(&block->mutex); buf_page_mutex_exit(block);
} #ifdef BTR_CUR_HASH_ADAPT
drop_hash_entry = block->index;
#endif #endif
break;
}
/* Page can be found in buf_pool */ /* Page can be found in buf_pool */
buf_pool_mutex_exit(buf_pool); buf_pool_mutex_exit(buf_pool);
rw_lock_x_unlock(hash_lock); rw_lock_x_unlock(hash_lock);
if (free_block) {
buf_block_free(free_block); buf_block_free(free_block);
}
#ifdef BTR_CUR_HASH_ADAPT #ifdef BTR_CUR_HASH_ADAPT
if (drop_hash_entry) { if (drop_hash_entry) {
btr_search_drop_page_hash_index(block); btr_search_drop_page_hash_index(block);
rw_lock_x_unlock(&block->lock);
} }
#endif /* BTR_CUR_HASH_ADAPT */ #endif /* BTR_CUR_HASH_ADAPT */
if (!recv_recovery_is_on()) { #ifdef UNIV_DEBUG
return buf_page_get_with_no_latch(page_id, page_size, if (!fsp_is_system_temporary(page_id.space())) {
mtr); rw_lock_s_lock_nowait(
&block->debug_latch,
__FILE__, __LINE__);
} }
#endif /* UNIV_DEBUG */
mtr_memo_push(mtr, block, MTR_MEMO_PAGE_X_FIX);
mutex_exit(&recv_sys->mutex);
block = buf_page_get_with_no_latch(page_id, page_size, mtr);
mutex_enter(&recv_sys->mutex);
return block; return block;
} }
...@@ -5645,6 +5697,8 @@ buf_page_create( ...@@ -5645,6 +5697,8 @@ buf_page_create(
buf_page_init(buf_pool, page_id, page_size, block); buf_page_init(buf_pool, page_id, page_size, block);
rw_lock_x_lock(&block->lock);
rw_lock_x_unlock(hash_lock); rw_lock_x_unlock(hash_lock);
/* The block must be put to the LRU list */ /* The block must be put to the LRU list */
...@@ -5662,7 +5716,6 @@ buf_page_create( ...@@ -5662,7 +5716,6 @@ buf_page_create(
by IO-fixing and X-latching the block. */ by IO-fixing and X-latching the block. */
buf_page_set_io_fix(&block->page, BUF_IO_READ); buf_page_set_io_fix(&block->page, BUF_IO_READ);
rw_lock_x_lock(&block->lock);
buf_page_mutex_exit(block); buf_page_mutex_exit(block);
/* buf_pool->mutex may be released and reacquired by /* buf_pool->mutex may be released and reacquired by
...@@ -5684,12 +5737,11 @@ buf_page_create( ...@@ -5684,12 +5737,11 @@ buf_page_create(
buf_unzip_LRU_add_block(block, FALSE); buf_unzip_LRU_add_block(block, FALSE);
buf_page_set_io_fix(&block->page, BUF_IO_NONE); buf_page_set_io_fix(&block->page, BUF_IO_NONE);
rw_lock_x_unlock(&block->lock);
} }
buf_pool_mutex_exit(buf_pool); buf_pool_mutex_exit(buf_pool);
mtr_memo_push(mtr, block, MTR_MEMO_BUF_FIX); mtr_memo_push(mtr, block, MTR_MEMO_PAGE_X_FIX);
buf_page_set_accessed(&block->page); buf_page_set_accessed(&block->page);
......
...@@ -170,6 +170,7 @@ buf_dblwr_create() ...@@ -170,6 +170,7 @@ buf_dblwr_create()
{ {
buf_block_t* block2; buf_block_t* block2;
buf_block_t* new_block; buf_block_t* new_block;
buf_block_t* trx_sys_block;
byte* doublewrite; byte* doublewrite;
byte* fseg_header; byte* fseg_header;
ulint page_no; ulint page_no;
...@@ -209,9 +210,14 @@ buf_dblwr_create() ...@@ -209,9 +210,14 @@ buf_dblwr_create()
} }
} }
block2 = fseg_create(TRX_SYS_SPACE, TRX_SYS_PAGE_NO, trx_sys_block = buf_page_get(
page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
page_size_t(srv_page_size, srv_page_size, 0), RW_X_LATCH,
&mtr);
block2 = fseg_create(TRX_SYS_SPACE,
TRX_SYS_DOUBLEWRITE TRX_SYS_DOUBLEWRITE
+ TRX_SYS_DOUBLEWRITE_FSEG, &mtr); + TRX_SYS_DOUBLEWRITE_FSEG, &mtr, trx_sys_block);
if (block2 == NULL) { if (block2 == NULL) {
too_small: too_small:
......
...@@ -179,7 +179,7 @@ dict_hdr_create( ...@@ -179,7 +179,7 @@ dict_hdr_create(
/* Create the dictionary header file block in a new, allocated file /* Create the dictionary header file block in a new, allocated file
segment in the system tablespace */ segment in the system tablespace */
block = fseg_create(DICT_HDR_SPACE, 0, block = fseg_create(DICT_HDR_SPACE,
DICT_HDR + DICT_HDR_FSEG_HEADER, mtr); DICT_HDR + DICT_HDR_FSEG_HEADER, mtr);
ut_a(DICT_HDR_PAGE_NO == block->page.id.page_no()); ut_a(DICT_HDR_PAGE_NO == block->page.id.page_no());
......
This diff is collapsed.
...@@ -407,42 +407,52 @@ fsp_header_inc_size( ...@@ -407,42 +407,52 @@ fsp_header_inc_size(
ulint space_id, /*!< in: space id */ ulint space_id, /*!< in: space id */
ulint size_inc, /*!< in: size increment in pages */ ulint size_inc, /*!< in: size increment in pages */
mtr_t* mtr); /*!< in/out: mini-transaction */ mtr_t* mtr); /*!< in/out: mini-transaction */
/**********************************************************************//**
Creates a new segment. /** Creates a new segment.
@param[in] space space id
@param[in] byte_offset byte offset of the created segment header
on the page
@param[in,out] mtr mini-transaction
@param[in,out] block block where segment header is placed;
If it is null then new page will be
allocated and it will belong to
the created segment
@return the block where the segment header is placed, x-latched, NULL @return the block where the segment header is placed, x-latched, NULL
if could not create segment because of lack of space */ if could not create segment because of lack of space */
buf_block_t* buf_block_t*
fseg_create( fseg_create(
/*========*/ ulint space,
ulint space_id,/*!< in: space id */ ulint byte_offset,
ulint page, /*!< in: page where the segment header is placed: if mtr_t* mtr,
this is != 0, the page must belong to another segment, buf_block_t* block=NULL);
if this is 0, a new page will be allocated and it
will belong to the created segment */ /** Creates a new segment.
ulint byte_offset, /*!< in: byte offset of the created segment header @param[in] space_id space_id
on the page */ @param[in] byte_offset byte offset of the created segment
mtr_t* mtr); /*!< in/out: mini-transaction */ header on the page
/**********************************************************************//** @param[in] has_done_reservation TRUE if the caller has already
Creates a new segment. done the reservation for the pages
with fsp_reserve_free_externts
(at least 2 extents: one for
the inode and the other for the
segment) then there is no need to do
the check for this individual
operation
@param[in,out] mtr mini-transaction
@param[in] block block where the segment header is
placed. If it is null then new page
will be allocated and it will belong
to the created segment
@return the block where the segment header is placed, x-latched, NULL @return the block where the segment header is placed, x-latched, NULL
if could not create segment because of lack of space */ if could not create segment because of lack of space */
buf_block_t* buf_block_t*
fseg_create_general( fseg_create_general(
/*================*/ ulint space_id,
ulint space_id,/*!< in: space id */ ulint byte_offset,
ulint page, /*!< in: page where the segment header is placed: if ibool has_done_reservation,
this is != 0, the page must belong to another segment, mtr_t* mtr,
if this is 0, a new page will be allocated and it buf_block_t* block);
will belong to the created segment */
ulint byte_offset, /*!< in: byte offset of the created segment header
on the page */
ibool has_done_reservation, /*!< in: TRUE if the caller has already
done the reservation for the pages with
fsp_reserve_free_extents (at least 2 extents: one for
the inode and the other for the segment) then there is
no need to do the check for this individual
operation */
mtr_t* mtr); /*!< in/out: mini-transaction */
/**********************************************************************//** /**********************************************************************//**
Calculates the number of pages reserved by a segment, and how many pages are Calculates the number of pages reserved by a segment, and how many pages are
currently used. currently used.
......
...@@ -434,6 +434,10 @@ struct mtr_t { ...@@ -434,6 +434,10 @@ struct mtr_t {
static inline bool is_block_dirtied(const buf_block_t* block) static inline bool is_block_dirtied(const buf_block_t* block)
MY_ATTRIBUTE((warn_unused_result)); MY_ATTRIBUTE((warn_unused_result));
/** Get the buffer fix count for the block added by this mtr.
@param[in] block block to be checked
@return number of buffer count added by this mtr */
int32_t get_fix_count(buf_block_t *block);
private: private:
/** Look up the system tablespace. */ /** Look up the system tablespace. */
void lookup_sys_space(); void lookup_sys_space();
......
...@@ -2362,7 +2362,6 @@ static buf_block_t* recv_recovery_create_page_low(const page_id_t page_id, ...@@ -2362,7 +2362,6 @@ static buf_block_t* recv_recovery_create_page_low(const page_id_t page_id,
{ {
i.created = true; i.created = true;
buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK); buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
mtr.x_latch_at_savepoint(0, block);
recv_recover_page(block, mtr, recv_addr, i.lsn); recv_recover_page(block, mtr, recv_addr, i.lsn);
ut_ad(mtr.has_committed()); ut_ad(mtr.has_committed());
} }
......
...@@ -308,6 +308,32 @@ struct DebugCheck { ...@@ -308,6 +308,32 @@ struct DebugCheck {
}; };
#endif #endif
/** Find buffer fix count of the given block acquired by the
mini-transaction */
struct FindBlock
{
int32_t num_fix;
buf_block_t *block;
FindBlock(buf_block_t *block_buf): num_fix(0), block(block_buf) {}
bool operator()(const mtr_memo_slot_t* slot)
{
if (slot->object != NULL)
{
buf_block_t *mtr_block= reinterpret_cast<buf_block_t*>(slot->object);
if (mtr_block == block)
num_fix++;
}
return true;
}
int32_t get_num_fix()
{
return num_fix;
}
};
/** Release a resource acquired by the mini-transaction. */ /** Release a resource acquired by the mini-transaction. */
struct ReleaseBlocks { struct ReleaseBlocks {
/** Release specific object */ /** Release specific object */
...@@ -804,6 +830,15 @@ mtr_t::release_free_extents(ulint n_reserved) ...@@ -804,6 +830,15 @@ mtr_t::release_free_extents(ulint n_reserved)
space->release_free_extents(n_reserved); space->release_free_extents(n_reserved);
} }
int32_t mtr_t::get_fix_count(buf_block_t *block)
{
struct FindBlock find_block(block);
Iterate<FindBlock> iteration(find_block);
if (m_memo.for_each_block(iteration))
return iteration.functor.get_num_fix();
return 0;
}
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/** Check if memo contains the given item. /** Check if memo contains the given item.
@return true if contains */ @return true if contains */
......
...@@ -57,7 +57,7 @@ trx_rseg_header_create( ...@@ -57,7 +57,7 @@ trx_rseg_header_create(
MTR_MEMO_SPACE_X_LOCK)); MTR_MEMO_SPACE_X_LOCK));
/* Allocate a new file segment for the rollback segment */ /* Allocate a new file segment for the rollback segment */
block = fseg_create(space, 0, TRX_RSEG + TRX_RSEG_FSEG_HEADER, mtr); block = fseg_create(space, TRX_RSEG + TRX_RSEG_FSEG_HEADER, mtr);
if (block == NULL) { if (block == NULL) {
/* No space left */ /* No space left */
......
...@@ -422,7 +422,7 @@ trx_sysf_create( ...@@ -422,7 +422,7 @@ trx_sysf_create(
mtr_x_lock_space(TRX_SYS_SPACE, mtr); mtr_x_lock_space(TRX_SYS_SPACE, mtr);
/* Create the trx sys file block in a new allocated file segment */ /* Create the trx sys file block in a new allocated file segment */
block = fseg_create(TRX_SYS_SPACE, 0, TRX_SYS + TRX_SYS_FSEG_HEADER, block = fseg_create(TRX_SYS_SPACE, TRX_SYS + TRX_SYS_FSEG_HEADER,
mtr); mtr);
buf_block_dbg_add_level(block, SYNC_TRX_SYS_HEADER); buf_block_dbg_add_level(block, SYNC_TRX_SYS_HEADER);
......
...@@ -446,9 +446,9 @@ trx_undo_seg_create( ...@@ -446,9 +446,9 @@ trx_undo_seg_create(
} }
/* Allocate a new file segment for the undo log */ /* Allocate a new file segment for the undo log */
block = fseg_create_general(space, 0, block = fseg_create_general(space,
TRX_UNDO_SEG_HDR TRX_UNDO_SEG_HDR
+ TRX_UNDO_FSEG_HEADER, TRUE, mtr); + TRX_UNDO_FSEG_HEADER, TRUE, mtr, NULL);
fil_space_release_free_extents(space, n_reserved); fil_space_release_free_extents(space, n_reserved);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment