Commit f619d79b authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-25491: Race condition between DROP TABLE and purge of SYS_INDEXES record

btr_free_if_exists(): Always use the BUF_GET_POSSIBLY_FREED mode
when accessing pages, because due to MDEV-24589 the function
fil_space_t::set_stopping(true) can be called at any time during
the execution of this function.

mtr_t::m_freeing_tree: New data member for debugging purposes.

buf_page_get_low(): Assert that the BUF_GET mode is not being used
anywhere during the execution of btr_free_if_exists().

In all code related to freeing or allocating pages, we will add some
robustness, by making more use of BUF_GET_POSSIBLY_FREED and by
reporting an error instead of crashing in some cases of corruption.
parent a81aec15
......@@ -939,9 +939,11 @@ static void btr_free_root(buf_block_t *block, mtr_t *mtr)
#endif /* UNIV_BTR_DEBUG */
/* Free the entire segment in small steps. */
ut_d(mtr->freeing_tree());
while (!fseg_free_step(PAGE_HEADER + PAGE_BTR_SEG_TOP + block->frame, mtr));
}
MY_ATTRIBUTE((warn_unused_result))
/** Prepare to free a B-tree.
@param[in] page_id page id
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
......@@ -949,33 +951,29 @@ static void btr_free_root(buf_block_t *block, mtr_t *mtr)
@param[in,out] mtr mini-transaction
@return root block, to invoke btr_free_but_not_root() and btr_free_root()
@retval NULL if the page is no longer a matching B-tree page */
static MY_ATTRIBUTE((warn_unused_result))
buf_block_t*
btr_free_root_check(
const page_id_t page_id,
ulint zip_size,
index_id_t index_id,
mtr_t* mtr)
static
buf_block_t *btr_free_root_check(const page_id_t page_id, ulint zip_size,
index_id_t index_id, mtr_t *mtr)
{
ut_ad(page_id.space() != SRV_TMP_SPACE_ID);
ut_ad(index_id != BTR_FREED_INDEX_ID);
buf_block_t* block = buf_page_get(
page_id, zip_size, RW_X_LATCH, mtr);
if (block) {
if (fil_page_index_page_check(block->frame)
&& index_id == btr_page_get_index_id(block->frame)) {
/* This should be a root page.
It should not be possible to reassign the same
index_id for some other index in the tablespace. */
ut_ad(!page_has_siblings(block->frame));
} else {
block = NULL;
}
}
ut_ad(page_id.space() != SRV_TMP_SPACE_ID);
ut_ad(index_id != BTR_FREED_INDEX_ID);
buf_block_t *block= buf_page_get_gen(page_id, zip_size, RW_X_LATCH,
nullptr, BUF_GET_POSSIBLY_FREED, mtr);
if (!block);
else if (block->page.status == buf_page_t::FREED)
block= nullptr;
else if (fil_page_index_page_check(block->frame) &&
index_id == btr_page_get_index_id(block->frame))
/* This should be a root page. It should not be possible to
reassign the same index_id for some other index in the
tablespace. */
ut_ad(!page_has_siblings(block->frame));
else
block= nullptr;
return(block);
return block;
}
/** Initialize the root page of the b-tree
......@@ -1131,6 +1129,7 @@ btr_free_but_not_root(
ut_ad(!page_has_siblings(block->frame));
leaf_loop:
mtr_start(&mtr);
ut_d(mtr.freeing_tree());
mtr_set_log_mode(&mtr, log_mode);
mtr.set_named_space_id(block->page.id().space());
......@@ -1230,23 +1229,20 @@ void dict_index_t::clear(que_thr_t *thr)
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
@param[in] index_id PAGE_INDEX_ID contents
@param[in,out] mtr mini-transaction */
void
btr_free_if_exists(
const page_id_t page_id,
ulint zip_size,
index_id_t index_id,
mtr_t* mtr)
void btr_free_if_exists(const page_id_t page_id, ulint zip_size,
index_id_t index_id, mtr_t *mtr)
{
buf_block_t* root = btr_free_root_check(
page_id, zip_size, index_id, mtr);
if (root == NULL) {
return;
}
btr_free_but_not_root(root, mtr->get_log_mode());
mtr->set_named_space_id(page_id.space());
btr_free_root(root, mtr);
if (fil_space_t *space= fil_space_t::get(page_id.space()))
{
if (buf_block_t *root= btr_free_root_check(page_id, zip_size, index_id,
mtr))
{
btr_free_but_not_root(root, mtr->get_log_mode());
mtr->set_named_space(space);
btr_free_root(root, mtr);
}
space->release();
}
}
/** Free an index tree in a temporary tablespace.
......
......@@ -2585,6 +2585,7 @@ buf_page_get_low(
/* fall through */
case BUF_GET:
case BUF_GET_IF_IN_POOL_OR_WATCH:
ut_ad(!mtr->is_freeing_tree());
fil_space_t* s = fil_space_get(page_id.space());
ut_ad(s);
ut_ad(s->zip_size() == zip_size);
......
This diff is collapsed.
/*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2019, 2020, MariaDB Corporation.
Copyright (c) 2019, 2021, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -150,10 +150,10 @@ static void flst_insert_after(buf_block_t *base, uint16_t boffset,
else
{
buf_block_t *block;
flst_node_t *next= fut_get_ptr(add->page.id().space(), add->zip_size(),
next_addr, RW_SX_LATCH, mtr, &block);
flst_write_addr(*block, next + FLST_PREV,
add->page.id().page_no(), aoffset, mtr);
if (flst_node_t *next= fut_get_ptr(add->page.id().space(), add->zip_size(),
next_addr, RW_SX_LATCH, mtr, &block))
flst_write_addr(*block, next + FLST_PREV,
add->page.id().page_no(), aoffset, mtr);
}
flst_write_addr(*cur, cur->frame + coffset + FLST_NEXT,
......@@ -201,10 +201,10 @@ static void flst_insert_before(buf_block_t *base, uint16_t boffset,
else
{
buf_block_t *block;
flst_node_t *prev= fut_get_ptr(add->page.id().space(), add->zip_size(),
prev_addr, RW_SX_LATCH, mtr, &block);
flst_write_addr(*block, prev + FLST_NEXT,
add->page.id().page_no(), aoffset, mtr);
if (flst_node_t *prev= fut_get_ptr(add->page.id().space(), add->zip_size(),
prev_addr, RW_SX_LATCH, mtr, &block))
flst_write_addr(*block, prev + FLST_NEXT,
add->page.id().page_no(), aoffset, mtr);
}
flst_write_addr(*cur, cur->frame + coffset + FLST_PREV,
......@@ -254,9 +254,10 @@ void flst_add_last(buf_block_t *base, uint16_t boffset,
? add->frame + addr.boffset
: fut_get_ptr(add->page.id().space(), add->zip_size(), addr,
RW_SX_LATCH, mtr, &cur);
flst_insert_after(base, boffset, cur,
static_cast<uint16_t>(c - cur->frame),
add, aoffset, mtr);
if (c)
flst_insert_after(base, boffset, cur,
static_cast<uint16_t>(c - cur->frame),
add, aoffset, mtr);
}
}
......@@ -287,9 +288,10 @@ void flst_add_first(buf_block_t *base, uint16_t boffset,
? add->frame + addr.boffset
: fut_get_ptr(add->page.id().space(), add->zip_size(), addr,
RW_SX_LATCH, mtr, &cur);
flst_insert_before(base, boffset, cur,
static_cast<uint16_t>(c - cur->frame),
add, aoffset, mtr);
if (c)
flst_insert_before(base, boffset, cur,
static_cast<uint16_t>(c - cur->frame),
add, aoffset, mtr);
}
}
......@@ -318,12 +320,12 @@ void flst_remove(buf_block_t *base, uint16_t boffset,
else
{
buf_block_t *block= cur;
flst_node_t *prev= prev_addr.page == cur->page.id().page_no()
? cur->frame + prev_addr.boffset
: fut_get_ptr(cur->page.id().space(), cur->zip_size(), prev_addr,
RW_SX_LATCH, mtr, &block);
flst_write_addr(*block, prev + FLST_NEXT,
next_addr.page, next_addr.boffset, mtr);
if (flst_node_t *prev= prev_addr.page == cur->page.id().page_no()
? cur->frame + prev_addr.boffset
: fut_get_ptr(cur->page.id().space(), cur->zip_size(), prev_addr,
RW_SX_LATCH, mtr, &block))
flst_write_addr(*block, prev + FLST_NEXT,
next_addr.page, next_addr.boffset, mtr);
}
if (next_addr.page == FIL_NULL)
......@@ -332,12 +334,12 @@ void flst_remove(buf_block_t *base, uint16_t boffset,
else
{
buf_block_t *block= cur;
flst_node_t *next= next_addr.page == cur->page.id().page_no()
? cur->frame + next_addr.boffset
: fut_get_ptr(cur->page.id().space(), cur->zip_size(), next_addr,
RW_SX_LATCH, mtr, &block);
flst_write_addr(*block, next + FLST_PREV,
prev_addr.page, prev_addr.boffset, mtr);
if (flst_node_t *next= next_addr.page == cur->page.id().page_no()
? cur->frame + next_addr.boffset
: fut_get_ptr(cur->page.id().space(), cur->zip_size(), next_addr,
RW_SX_LATCH, mtr, &block))
flst_write_addr(*block, next + FLST_PREV,
prev_addr.page, prev_addr.boffset, mtr);
}
byte *len= &base->frame[boffset + FLST_LEN];
......@@ -369,6 +371,7 @@ void flst_validate(const buf_block_t *base, uint16_t boffset, mtr_t *mtr)
const flst_node_t *node= fut_get_ptr(base->page.id().space(),
base->zip_size(), addr,
RW_SX_LATCH, &mtr2);
ut_ad(node);
addr= flst_get_next_addr(node);
mtr2.commit();
}
......@@ -383,6 +386,7 @@ void flst_validate(const buf_block_t *base, uint16_t boffset, mtr_t *mtr)
const flst_node_t *node= fut_get_ptr(base->page.id().space(),
base->zip_size(), addr,
RW_SX_LATCH, &mtr2);
ut_ad(node);
addr= flst_get_prev_addr(node);
mtr2.commit();
}
......
/*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2019, MariaDB Corporation.
Copyright (c) 2019, 2021, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -39,7 +39,7 @@ Created 12/13/1995 Heikki Tuuri
@param[in,out] mtr mini-transaction
@return pointer to a byte in (*ptr_block)->frame; the *ptr_block is
bufferfixed and latched */
UNIV_INLINE
inline
byte*
fut_get_ptr(
ulint space,
......@@ -57,10 +57,15 @@ fut_get_ptr(
|| (rw_latch == RW_X_LATCH)
|| (rw_latch == RW_SX_LATCH));
block = buf_page_get(page_id_t(space, addr.page), zip_size,
rw_latch, mtr);
ptr = buf_block_get_frame(block) + addr.boffset;
block = buf_page_get_gen(page_id_t(space, addr.page), zip_size,
rw_latch, nullptr, BUF_GET_POSSIBLY_FREED,
mtr);
if (!block) {
} else if (block->page.status == buf_page_t::FREED) {
block = nullptr;
} else {
ptr = buf_block_get_frame(block) + addr.boffset;
}
if (ptr_block != NULL) {
*ptr_block = block;
......
......@@ -633,11 +633,17 @@ struct mtr_t {
{ ut_ad(!m_commit || m_start); return m_start && !m_commit; }
/** @return whether the mini-transaction has been committed */
bool has_committed() const { ut_ad(!m_commit || m_start); return m_commit; }
/** @return whether the mini-transaction is freeing an index tree */
bool is_freeing_tree() const { return m_freeing_tree; }
/** Notify that the mini-transaction is freeing an index tree */
void freeing_tree() { m_freeing_tree= true; }
private:
/** whether start() has been called */
bool m_start= false;
/** whether commit() has been called */
bool m_commit= false;
/** whether freeing_tree() has been called */
bool m_freeing_tree= false;
#endif
/** The page of the most recent m_log record written, or NULL */
......
......@@ -362,6 +362,7 @@ void mtr_t::start()
ut_d(m_start= true);
ut_d(m_commit= false);
ut_d(m_freeing_tree= false);
m_last= nullptr;
m_last_offset= 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment