Commit d00185c4 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-12353: Replace MLOG_PAGE_CREATE_RTREE, MLOG_PAGE_COMP_CREATE_RTREE

page_create(): Create normal B-tree pages. Callers that create
R-tree pages will set FIL_PAGE_TYPE and reset the split
sequence number afterwards.

The creation of ROW_FORMAT=COMPRESSED pages is unaffected;
they will be logged as compressed page images.

page_create_low(): Take const buf_block_t* as a parameter.
Let the callers invoke buf_block_modify_clock_inc().
parent b3d02a1f
......@@ -438,28 +438,32 @@ btr_page_create(
ulint level, /*!< in: the B-tree level of the page */
mtr_t* mtr) /*!< in: mtr */
{
page_t* page = buf_block_get_frame(block);
ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
byte *index_id= &page[PAGE_HEADER + PAGE_INDEX_ID];
byte *index_id= &block->frame[PAGE_HEADER + PAGE_INDEX_ID];
if (UNIV_LIKELY_NULL(page_zip)) {
page_create_zip(block, index, level, 0, mtr);
mach_write_to_8(index_id, index->id);
page_zip_write_header(page_zip, index_id, 8, mtr);
} else {
page_create(block, mtr, dict_table_is_comp(index->table),
dict_index_is_spatial(index));
page_create(block, mtr, dict_table_is_comp(index->table));
if (index->is_spatial()) {
static_assert(((FIL_PAGE_INDEX & 0xff00)
| byte(FIL_PAGE_RTREE))
== FIL_PAGE_RTREE, "compatibility");
mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
byte(FIL_PAGE_RTREE));
if (mach_read_from_8(block->frame
+ FIL_RTREE_SPLIT_SEQ_NUM)) {
mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM,
8, 0);
}
}
/* Set the level of the new index page */
mtr->write<2,mtr_t::OPT>(*block, PAGE_HEADER + PAGE_LEVEL
+ block->frame, level);
mtr->write<8,mtr_t::OPT>(*block, index_id, index->id);
}
/* For Spatial Index, initialize the Split Sequence Number */
if (dict_index_is_spatial(index)) {
page_set_ssn_id(block, page_zip, 0, mtr);
}
}
/**************************************************************//**
......@@ -1127,8 +1131,19 @@ btr_create(
memset_aligned<8>(FIL_PAGE_PREV + block->page.zip.data,
0xff, 8);
} else {
page_create(block, mtr, index->table->not_redundant(),
index->is_spatial());
page_create(block, mtr, index->table->not_redundant());
if (index->is_spatial()) {
static_assert(((FIL_PAGE_INDEX & 0xff00)
| byte(FIL_PAGE_RTREE))
== FIL_PAGE_RTREE, "compatibility");
mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
byte(FIL_PAGE_RTREE));
if (mach_read_from_8(block->frame
+ FIL_RTREE_SPLIT_SEQ_NUM)) {
mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM,
8, 0);
}
}
/* Set the level of the new index page */
mtr->write<2,mtr_t::OPT>(*block, PAGE_HEADER + PAGE_LEVEL
+ block->frame, 0U);
......@@ -1382,7 +1397,6 @@ static void btr_page_reorganize_low(page_cur_t *cursor, dict_index_t *index,
ulint max_ins_size1;
ulint max_ins_size2;
ulint pos;
bool is_spatial;
ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
ut_ad(!is_buf_block_get_page_zip(block));
......@@ -1400,11 +1414,6 @@ static void btr_page_reorganize_low(page_cur_t *cursor, dict_index_t *index,
MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);
/* This function can be called by log redo with a "dummy" index.
So we would trust more on the original page's type */
is_spatial = (fil_page_get_type(page) == FIL_PAGE_RTREE
|| dict_index_is_spatial(index));
/* Copy the old page to temporary space */
memcpy_aligned<UNIV_PAGE_SIZE_MIN>(temp_block->frame, block->frame,
srv_page_size);
......@@ -1419,7 +1428,17 @@ static void btr_page_reorganize_low(page_cur_t *cursor, dict_index_t *index,
/* Recreate the page: note that global data on page (possible
segment headers, next page-field, etc.) is preserved intact */
page_create(block, mtr, dict_table_is_comp(index->table), is_spatial);
page_create(block, mtr, index->table->not_redundant());
if (index->is_spatial()) {
static_assert(((FIL_PAGE_INDEX & 0xff00)
| byte(FIL_PAGE_RTREE))
== FIL_PAGE_RTREE, "compatibility");
mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
byte(FIL_PAGE_RTREE));
if (mach_read_from_8(block->frame + FIL_RTREE_SPLIT_SEQ_NUM)) {
mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM, 8, 0);
}
}
/* Copy the records from the temporary space to the recreated page;
do not copy the lock bits yet */
......@@ -1682,8 +1701,19 @@ btr_page_empty(
if (page_zip) {
page_create_zip(block, index, level, autoinc, mtr);
} else {
page_create(block, mtr, dict_table_is_comp(index->table),
dict_index_is_spatial(index));
page_create(block, mtr, index->table->not_redundant());
if (index->is_spatial()) {
static_assert(((FIL_PAGE_INDEX & 0xff00)
| byte(FIL_PAGE_RTREE))
== FIL_PAGE_RTREE, "compatibility");
mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
byte(FIL_PAGE_RTREE));
if (mach_read_from_8(block->frame
+ FIL_RTREE_SPLIT_SEQ_NUM)) {
mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM,
8, 0);
}
}
mtr->write<2,mtr_t::OPT>(*block, PAGE_HEADER + PAGE_LEVEL
+ block->frame, level);
if (autoinc) {
......
......@@ -100,8 +100,7 @@ PageBulk::init()
} else {
ut_ad(!m_index->is_spatial());
page_create(new_block, &m_mtr,
m_index->table->not_redundant(),
false);
m_index->table->not_redundant());
compile_time_assert(FIL_PAGE_NEXT
== FIL_PAGE_PREV + 4);
compile_time_assert(FIL_NULL == 0xffffffff);
......
......@@ -662,8 +662,7 @@ btr_scrub_free_page(
}
page_create(block, mtr,
dict_table_is_comp(scrub_data->current_table),
dict_index_is_spatial(scrub_data->current_index));
dict_table_is_comp(scrub_data->current_table));
mtr_commit(mtr);
......
......@@ -983,21 +983,14 @@ inline
uint16_t
page_get_instant(const page_t* page);
/** Create an uncompressed index page.
@param[in,out] block buffer block
@param[in,out] mtr mini-transaction
@param[in] comp set unless ROW_FORMAT=REDUNDANT */
void page_create(buf_block_t* block, mtr_t* mtr, bool comp);
/**********************************************************//**
Create an uncompressed B-tree index page.
@return pointer to the page */
page_t*
page_create(
/*========*/
buf_block_t* block, /*!< in: a buffer block where the
page is created */
mtr_t* mtr, /*!< in: mini-transaction handle */
ulint comp, /*!< in: nonzero=compact page format */
bool is_rtree); /*!< in: if creating R-tree page */
/**********************************************************//**
Create a compressed B-tree index page.
@return pointer to the page */
page_t*
Create a compressed B-tree index page. */
void
page_create_zip(
/*============*/
buf_block_t* block, /*!< in/out: a buffer frame
......@@ -1157,15 +1150,10 @@ page_parse_delete_rec_list(
buf_block_t* block, /*!< in/out: buffer block or NULL */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr); /*!< in: mtr or NULL */
/** Parses a redo log record of creating a page.
@param[in,out] block buffer block, or NULL
@param[in] comp nonzero=compact page format
@param[in] is_rtree whether it is rtree page */
void
page_parse_create(
buf_block_t* block,
ulint comp,
bool is_rtree);
/** Create an index page.
@param[in,out] block buffer block
@param[in] comp nonzero=compact page format */
void page_create_low(const buf_block_t* block, bool comp);
/************************************************************//**
Prints record contents including the data relevant only in
......
......@@ -1665,13 +1665,23 @@ recv_parse_or_apply_log_rec_body(
}
break;
case MLOG_PAGE_CREATE: case MLOG_COMP_PAGE_CREATE:
case MLOG_PAGE_CREATE_RTREE: case MLOG_COMP_PAGE_CREATE_RTREE:
/* Allow anything in page_type when creating a page. */
ut_a(!page_zip);
page_parse_create(block, type == MLOG_COMP_PAGE_CREATE, false);
if (!block) {
break;
case MLOG_PAGE_CREATE_RTREE: case MLOG_COMP_PAGE_CREATE_RTREE:
page_parse_create(block, type == MLOG_COMP_PAGE_CREATE_RTREE,
true);
}
page_create_low(block, type == MLOG_COMP_PAGE_CREATE
|| type == MLOG_COMP_PAGE_CREATE_RTREE);
if (type == MLOG_PAGE_CREATE_RTREE
|| type == MLOG_COMP_PAGE_CREATE_RTREE) {
static_assert(((FIL_PAGE_INDEX & 0xff00)
| byte(FIL_PAGE_RTREE))
== FIL_PAGE_RTREE, "compatibility");
page[FIL_PAGE_TYPE] = byte(FIL_PAGE_RTREE);
memset(page + FIL_RTREE_SPLIT_SEQ_NUM, 0, 8);
}
break;
case MLOG_UNDO_INSERT:
ut_ad(!page || page_type == FIL_PAGE_UNDO_LOG);
......@@ -2036,7 +2046,7 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
const recv_t* recv = static_cast<const recv_t*>(l);
ut_ad(recv->start_lsn);
ut_ad(recv_start_lsn < recv->start_lsn);
ut_ad(recv_start_lsn <= recv->start_lsn);
ut_d(recv_start_lsn = recv->start_lsn);
if (recv->start_lsn < page_lsn) {
......@@ -2120,11 +2130,12 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
#endif /* UNIV_ZIP_DEBUG */
if (start_lsn) {
buf_block_modify_clock_inc(block);
log_flush_order_mutex_enter();
buf_flush_note_modification(block, start_lsn, end_lsn);
log_flush_order_mutex_exit();
} else if (free_page && init) {
/* There have been no operations than MLOG_INIT_FREE_PAGE.
/* There have been no operations that modify the page.
Any buffered changes must not be merged. A subsequent
buf_page_create() from a user thread should discard
any buffered changes. */
......
......@@ -274,17 +274,10 @@ static const byte infimum_supremum_compact[] = {
's', 'u', 'p', 'r', 'e', 'm', 'u', 'm'
};
/**********************************************************//**
The index page creation function.
@return pointer to the page */
static
page_t*
page_create_low(
/*============*/
buf_block_t* block, /*!< in: a buffer block where the
page is created */
ulint comp, /*!< in: nonzero=compact page format */
bool is_rtree) /*!< in: if it is an R-Tree page */
/** Create an index page.
@param[in,out] block buffer block
@param[in] comp nonzero=compact page format */
void page_create_low(const buf_block_t* block, bool comp)
{
page_t* page;
......@@ -293,15 +286,9 @@ page_create_low(
compile_time_assert(PAGE_BTR_IBUF_FREE_LIST_NODE + FLST_NODE_SIZE
<= PAGE_DATA);
buf_block_modify_clock_inc(block);
page = buf_block_get_frame(block);
if (is_rtree) {
fil_page_set_type(page, FIL_PAGE_RTREE);
} else {
fil_page_set_type(page, FIL_PAGE_INDEX);
}
memset(page + PAGE_HEADER, 0, PAGE_HEADER_PRIV_END);
page[PAGE_HEADER + PAGE_N_DIR_SLOTS + 1] = 2;
......@@ -334,36 +321,13 @@ page_create_low(
page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
= PAGE_OLD_INFIMUM;
}
return(page);
}
/** Parses a redo log record of creating a page.
@param[in,out] block buffer block, or NULL
@param[in] comp nonzero=compact page format
@param[in] is_rtree whether it is rtree page */
void
page_parse_create(
buf_block_t* block,
ulint comp,
bool is_rtree)
{
if (block != NULL) {
page_create_low(block, comp, is_rtree);
}
}
/**********************************************************//**
Create an uncompressed B-tree or R-tree index page.
@return pointer to the page */
page_t*
page_create(
/*========*/
buf_block_t* block, /*!< in: a buffer block where the
page is created */
mtr_t* mtr, /*!< in: mini-transaction handle */
ulint comp, /*!< in: nonzero=compact page format */
bool is_rtree) /*!< in: whether it is a R-Tree page */
/** Create an uncompressed index page.
@param[in,out] block buffer block
@param[in,out] mtr mini-transaction
@param[in] comp set unless ROW_FORMAT=REDUNDANT */
void page_create(buf_block_t* block, mtr_t* mtr, bool comp)
{
ut_ad(mtr->is_named_space(block->page.id.space()));
mtr->set_modified();
......@@ -371,24 +335,21 @@ page_create(
ut_ad(mtr->get_log_mode() == MTR_LOG_NONE
|| mtr->get_log_mode() == MTR_LOG_NO_REDO);
} else {
mlog_id_t type = is_rtree
? (comp
? MLOG_COMP_PAGE_CREATE_RTREE
: MLOG_PAGE_CREATE_RTREE)
: (comp ? MLOG_COMP_PAGE_CREATE : MLOG_PAGE_CREATE);
mlog_id_t type = comp
? MLOG_COMP_PAGE_CREATE : MLOG_PAGE_CREATE;
byte *l= mtr->get_log()->open(11);
l = mlog_write_initial_log_record_low(
type, block->page.id.space(), block->page.id.page_no(),
l, mtr);
mlog_close(mtr, l);
}
return(page_create_low(block, comp, is_rtree));
buf_block_modify_clock_inc(block);
page_create_low(block, comp);
}
/**********************************************************//**
Create a compressed B-tree index page.
@return pointer to the page */
page_t*
Create a compressed B-tree index page. */
void
page_create_zip(
/*============*/
buf_block_t* block, /*!< in/out: a buffer frame
......@@ -401,8 +362,6 @@ page_create_zip(
mtr_t* mtr) /*!< in/out: mini-transaction
handle */
{
page_t* page;
ut_ad(block);
ut_ad(buf_block_get_page_zip(block));
ut_ad(dict_table_is_comp(index->table));
......@@ -423,17 +382,24 @@ page_create_zip(
|| !dict_index_is_sec_or_ibuf(index)
|| index->table->is_temporary());
page = page_create_low(block, TRUE, dict_index_is_spatial(index));
mach_write_to_2(PAGE_HEADER + PAGE_LEVEL + page, level);
mach_write_to_8(PAGE_HEADER + PAGE_MAX_TRX_ID + page, max_trx_id);
buf_block_modify_clock_inc(block);
page_create_low(block, true);
if (index->is_spatial()) {
mach_write_to_2(FIL_PAGE_TYPE + block->frame, FIL_PAGE_RTREE);
memset(block->frame + FIL_RTREE_SPLIT_SEQ_NUM, 0, 8);
memset(block->page.zip.data + FIL_RTREE_SPLIT_SEQ_NUM, 0, 8);
}
mach_write_to_2(PAGE_HEADER + PAGE_LEVEL + block->frame, level);
mach_write_to_8(PAGE_HEADER + PAGE_MAX_TRX_ID + block->frame,
max_trx_id);
if (!page_zip_compress(block, index, page_zip_level, mtr)) {
/* The compression of a newly created
page should always succeed. */
ut_error;
}
return(page);
}
/**********************************************************//**
......@@ -446,10 +412,9 @@ page_create_empty(
mtr_t* mtr) /*!< in/out: mini-transaction */
{
trx_id_t max_trx_id;
page_t* page = buf_block_get_frame(block);
page_zip_des_t* page_zip= buf_block_get_page_zip(block);
ut_ad(fil_page_index_page_check(page));
ut_ad(fil_page_index_page_check(block->frame));
ut_ad(!index->is_dummy);
ut_ad(block->page.id.space() == index->table->space->id);
......@@ -459,12 +424,12 @@ page_create_empty(
for MVCC. */
if (dict_index_is_sec_or_ibuf(index)
&& !index->table->is_temporary()
&& page_is_leaf(page)) {
max_trx_id = page_get_max_trx_id(page);
&& page_is_leaf(block->frame)) {
max_trx_id = page_get_max_trx_id(block->frame);
ut_ad(max_trx_id);
} else if (block->page.id.page_no() == index->page) {
/* Preserve PAGE_ROOT_AUTO_INC. */
max_trx_id = page_get_max_trx_id(page);
max_trx_id = page_get_max_trx_id(block->frame);
} else {
max_trx_id = 0;
}
......@@ -472,11 +437,23 @@ page_create_empty(
if (page_zip) {
ut_ad(!index->table->is_temporary());
page_create_zip(block, index,
page_header_get_field(page, PAGE_LEVEL),
page_header_get_field(block->frame,
PAGE_LEVEL),
max_trx_id, mtr);
} else {
page_create(block, mtr, index->table->not_redundant(),
index->is_spatial());
page_create(block, mtr, index->table->not_redundant());
if (index->is_spatial()) {
static_assert(((FIL_PAGE_INDEX & 0xff00)
| byte(FIL_PAGE_RTREE))
== FIL_PAGE_RTREE, "compatibility");
mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
byte(FIL_PAGE_RTREE));
if (mach_read_from_8(block->frame
+ FIL_RTREE_SPLIT_SEQ_NUM)) {
mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM,
8, 0);
}
}
if (max_trx_id) {
mtr->write<8>(*block, PAGE_HEADER + PAGE_MAX_TRX_ID
......
......@@ -4667,6 +4667,7 @@ page_zip_reorganize(
page_t* temp_page;
ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
ut_ad(block->page.zip.data);
ut_ad(page_is_comp(page));
ut_ad(!dict_index_is_ibuf(index));
ut_ad(!index->table->is_temporary());
......@@ -4689,7 +4690,14 @@ page_zip_reorganize(
/* Recreate the page: note that global data on page (possible
segment headers, next page-field, etc.) is preserved intact */
page_create(block, mtr, TRUE, dict_index_is_spatial(index));
page_create(block, mtr, true);
if (index->is_spatial()) {
mach_write_to_2(FIL_PAGE_TYPE + page, FIL_PAGE_RTREE);
memcpy_aligned<2>(block->page.zip.data + FIL_PAGE_TYPE,
page + FIL_PAGE_TYPE, 2);
memset(FIL_RTREE_SPLIT_SEQ_NUM + page, 0, 8);
memset(FIL_RTREE_SPLIT_SEQ_NUM + block->page.zip.data, 0, 8);
}
/* Copy the records from the temporary space to the recreated page;
do not copy the lock bits yet */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment