Commit 054f1036 authored by Marko Mäkelä

Merge 10.4 into 10.5

parents 20512a68 3280edda
......@@ -29,6 +29,7 @@ Created 03/11/2014 Shaohua Wang
#include "btr0cur.h"
#include "btr0pcur.h"
#include "ibuf0ibuf.h"
#include "page0page.h"
#include "trx0trx.h"
/** Innodb B-tree index fill factor for bulk load. */
......@@ -123,7 +124,6 @@ PageBulk::init()
}
m_block = new_block;
m_block->skip_flush_check = true;
m_page = new_page;
m_page_no = new_page_no;
m_cur_rec = page_get_infimum_rec(new_page);
......@@ -142,7 +142,11 @@ PageBulk::init()
srv_page_size - dict_index_zip_pad_optimal_page_size(m_index);
m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);
/* Temporarily reset PAGE_DIRECTION_B from PAGE_NO_DIRECTION to 0,
without writing redo log, to ensure that needs_finish() will hold
on an empty page. */
ut_ad(m_page[PAGE_HEADER + PAGE_DIRECTION_B] == PAGE_NO_DIRECTION);
m_page[PAGE_HEADER + PAGE_DIRECTION_B] = 0;
ut_d(m_total_data = 0);
return(DB_SUCCESS);
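The zeroed byte is a pure in-memory sentinel: PAGE_NO_DIRECTION is nonzero, so without this reset a freshly created page would already satisfy the "finished" test. A minimal sketch of the resulting invariant, using only names from the hunks above (the assertions are illustrative, not part of the patch):

/* immediately after PageBulk::init(): */
ut_ad(m_page[PAGE_HEADER + PAGE_DIRECTION_B] == 0); /* sentinel: unfinished */
ut_ad(needs_finish());
/* immediately after PageBulk::finish(): */
ut_ad(m_page[PAGE_HEADER + PAGE_DIRECTION_B] == PAGE_NO_DIRECTION);
ut_ad(!needs_finish());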
......@@ -358,7 +362,6 @@ and set page header members.
template<PageBulk::format fmt>
inline void PageBulk::finishPage()
{
ut_ad(m_rec_no > 0);
ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED));
ut_ad((fmt != REDUNDANT) == m_is_comp);
......@@ -374,7 +377,7 @@ inline void PageBulk::finishPage()
ut_ad(offset >= PAGE_NEW_SUPREMUM - PAGE_NEW_INFIMUM);
offset= static_cast<uint16_t>(offset + PAGE_NEW_INFIMUM);
/* Set owner & dir. */
do
while (offset != PAGE_NEW_SUPREMUM)
{
ut_ad(offset >= PAGE_NEW_SUPREMUM);
ut_ad(offset < page_offset(slot));
......@@ -401,7 +404,6 @@ inline void PageBulk::finishPage()
ut_ad(next);
offset= next;
}
while (offset != PAGE_NEW_SUPREMUM);
if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <=
PAGE_DIR_SLOT_MAX_N_OWNED))
......@@ -431,7 +433,7 @@ inline void PageBulk::finishPage()
mach_read_from_2(PAGE_OLD_INFIMUM - REC_NEXT + m_page);
/* Set owner & dir. */
do
while (insert_rec != m_page + PAGE_OLD_SUPREMUM)
{
count++;
n_recs++;
......@@ -446,7 +448,6 @@ inline void PageBulk::finishPage()
insert_rec= m_page + mach_read_from_2(insert_rec - REC_NEXT);
}
while (insert_rec != m_page + PAGE_OLD_SUPREMUM);
if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <=
PAGE_DIR_SLOT_MAX_N_OWNED))
......@@ -465,9 +466,8 @@ inline void PageBulk::finishPage()
false, &m_mtr);
}
ut_ad(!m_index->is_spatial());
if (fmt != COMPRESSED)
if (!m_rec_no);
else if (fmt != COMPRESSED)
{
static_assert(PAGE_N_DIR_SLOTS == 0, "compatibility");
alignas(8) byte page_header[PAGE_N_HEAP + 2];
......@@ -496,45 +496,75 @@ inline void PageBulk::finishPage()
}
}
inline bool PageBulk::needs_finish() const
{
ut_ad(page_align(m_cur_rec) == m_block->frame);
ut_ad(m_page == m_block->frame);
if (!m_page[PAGE_HEADER + PAGE_DIRECTION_B])
return true;
ulint heap_no, n_heap= page_header_get_field(m_page, PAGE_N_HEAP);
ut_ad((n_heap & 0x7fff) >= PAGE_HEAP_NO_USER_LOW);
if (n_heap & 0x8000)
{
n_heap&= 0x7fff;
heap_no= rec_get_heap_no_new(m_cur_rec);
if (heap_no == PAGE_HEAP_NO_INFIMUM &&
page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_NEW_SUPREMUM_END)
return false;
}
else
{
heap_no= rec_get_heap_no_old(m_cur_rec);
if (heap_no == PAGE_HEAP_NO_INFIMUM &&
page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_OLD_SUPREMUM_END)
return false;
}
return heap_no != n_heap - 1;
}
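For reference, the heap numbers compared above are allocated as follows: the infimum record is heap number 0 (PAGE_HEAP_NO_INFIMUM), the supremum is 1, and user records start at 2 (PAGE_HEAP_NO_USER_LOW); the 0x8000 bit of PAGE_N_HEAP only flags the compact record format. A worked example (values chosen for illustration):

/* A ROW_FORMAT=DYNAMIC page holding 3 bulk-inserted user records:  */
/* PAGE_N_HEAP = 0x8005 = compact-format flag | 5 heap numbers      */
/* heap numbers: infimum=0, supremum=1, user records=2,3,4          */
/* m_cur_rec is the last insert, so heap_no == 4 == n_heap - 1 and  */
/* needs_finish() reduces to the PAGE_DIRECTION_B sentinel check.   */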
/** Mark end of insertion to the page. Scan all records to set page dirs,
and set page header members.
@tparam compressed whether the page is in ROW_FORMAT=COMPRESSED */
inline void PageBulk::finish()
{
if (UNIV_LIKELY_NULL(m_page_zip))
ut_ad(!m_index->is_spatial());
if (!needs_finish());
else if (UNIV_LIKELY_NULL(m_page_zip))
finishPage<COMPRESSED>();
else if (m_is_comp)
finishPage<DYNAMIC>();
else
finishPage<REDUNDANT>();
/* In MariaDB 10.2, 10.3, 10.4, we would initialize
PAGE_DIRECTION_B, PAGE_N_DIRECTION, PAGE_LAST_INSERT
in the same way as we would during normal INSERT operations.
Starting with MariaDB Server 10.5, bulk insert will not
touch those fields. */
ut_ad(!m_page[PAGE_HEADER + PAGE_INSTANT]);
/* Restore the temporary change of PageBulk::init() that was necessary to
ensure that PageBulk::needs_finish() holds on an empty page. */
m_page[PAGE_HEADER + PAGE_DIRECTION_B]= PAGE_NO_DIRECTION;
ut_ad(!page_header_get_field(m_page, PAGE_FREE));
ut_ad(!page_header_get_field(m_page, PAGE_GARBAGE));
ut_ad(!page_header_get_field(m_page, PAGE_LAST_INSERT));
ut_ad(page_header_get_field(m_page, PAGE_INSTANT) == PAGE_NO_DIRECTION);
ut_ad(!page_header_get_field(m_page, PAGE_N_DIRECTION));
ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no) <=
page_get_free_space_of_empty(m_is_comp));
m_block->skip_flush_check= false;
ut_ad(!needs_finish());
ut_ad(page_validate(m_page, m_index));
}
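Because needs_finish() guards the body, finish() is now idempotent, which is what allows release(), commit(), and storeExt() below to invoke it unconditionally. A hypothetical caller-side sketch of the lifecycle (the insert() signature is inferred, not shown in this diff):

PageBulk page_bulk(index, trx_id, FIL_NULL, level); /* as in pageSplit()  */
dberr_t err = page_bulk.init(); /* create page; zero PAGE_DIRECTION_B     */
/* ... page_bulk.insert(rec, offsets) for each record, in key order ...   */
page_bulk.commit(true); /* finish(): rebuild the page directory, restore
                           PAGE_DIRECTION_B; then set the ibuf bitmap for
                           secondary-index leaf pages and commit the mtr  */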
/** Commit inserts done to the page
@param[in] success Flag whether all inserts succeed. */
void
PageBulk::commit(
bool success)
void PageBulk::commit(bool success)
{
if (success) {
ut_ad(page_validate(m_page, m_index));
/* Set no free space left and no buffered changes in ibuf. */
if (!dict_index_is_clust(m_index) && page_is_leaf(m_page)) {
ibuf_set_bitmap_for_bulk_load(
m_block, innobase_fill_factor == 100);
}
}
m_mtr.commit();
finish();
if (success && !dict_index_is_clust(m_index) && page_is_leaf(m_page))
ibuf_set_bitmap_for_bulk_load(m_block, innobase_fill_factor == 100);
m_mtr.commit();
}
/** Compress a page of compressed table
......@@ -775,7 +805,9 @@ PageBulk::storeExt(
const big_rec_t* big_rec,
rec_offs* offsets)
{
/* Note: not all fileds are initialized in btr_pcur. */
finish();
/* Note: not all fields are initialized in btr_pcur. */
btr_pcur_t btr_pcur;
btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
btr_pcur.latch_mode = BTR_MODIFY_LEAF;
......@@ -804,7 +836,7 @@ Note: log_free_check requires holding no lock/latch in current thread. */
void
PageBulk::release()
{
ut_ad(!dict_index_is_spatial(m_index));
finish();
/* We fix the block because we will re-pin it soon. */
buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);
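The buffer-fix raises the block's pin count so that the page cannot be evicted while its latch is dropped around log_free_check(). A sketch of the pattern (the unpinning side is an assumption; PageBulk::latch() is not shown in this excerpt):

buf_block_buf_fix_inc(m_block, __FILE__, __LINE__); /* pin the block      */
/* release the page latch and commit m_mtr, so that log_free_check()
   can run without this thread holding any latch                          */
/* later, PageBulk::latch() re-latches the page and drops the extra pin   */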
......@@ -856,12 +888,11 @@ BtrBulk::pageSplit(
{
ut_ad(page_bulk->getPageZip() != NULL);
/* 1. Check if we have only one user record on the page. */
if (page_bulk->getRecNo() <= 1) {
return(DB_TOO_BIG_RECORD);
}
/* 2. create a new page. */
/* Initialize a new page */
PageBulk new_page_bulk(m_index, m_trx->id, FIL_NULL,
page_bulk->getLevel());
dberr_t err = new_page_bulk.init();
......@@ -869,19 +900,18 @@ BtrBulk::pageSplit(
return(err);
}
/* 3. copy the upper half to new page. */
/* Copy the upper half to the new page. */
rec_t* split_rec = page_bulk->getSplitRec();
new_page_bulk.copyIn(split_rec);
page_bulk->copyOut(split_rec);
/* 4. commit the splitted page. */
/* Commit the pages after split. */
err = pageCommit(page_bulk, &new_page_bulk, true);
if (err != DB_SUCCESS) {
pageAbort(&new_page_bulk);
return(err);
}
/* 5. commit the new page. */
err = pageCommit(&new_page_bulk, next_page_bulk, true);
if (err != DB_SUCCESS) {
pageAbort(&new_page_bulk);
......@@ -1103,11 +1133,9 @@ BtrBulk::insert(
ut_ad(page_bulk->getLevel() == 0);
ut_ad(page_bulk == m_page_bulks.at(0));
/* Release all latched but leaf node. */
/* Release all pages above the leaf level */
for (ulint level = 1; level <= m_root_level; level++) {
PageBulk* page_bulk = m_page_bulks.at(level);
page_bulk->release();
m_page_bulks.at(level)->release();
}
err = page_bulk->storeExt(big_rec, offsets);
......@@ -1190,6 +1218,7 @@ BtrBulk::finish(dberr_t err)
return(err);
}
root_page_bulk.copyIn(first_rec);
root_page_bulk.finish();
/* Remove last page. */
btr_page_free(m_index, last_block, &mtr);
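The added root_page_bulk.finish() matters here: btr_page_free() is about to release a page whose records were just copied away, and (plausibly the reason the skip_flush_check workaround below can be removed) any page reachable by the flush machinery should now be in a state that page_validate() accepts. Condensed flow, with only the three code lines taken verbatim from the hunk above:

/* pull the last page's records up into the preallocated root       */
root_page_bulk.copyIn(first_rec);
root_page_bulk.finish();                  /* build a valid page directory */
btr_page_free(m_index, last_block, &mtr); /* emptied page can be recycled */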
......
......@@ -1306,8 +1306,6 @@ buf_block_init(buf_block_t* block, byte* frame)
#ifdef BTR_CUR_HASH_ADAPT
block->index = NULL;
#endif /* BTR_CUR_HASH_ADAPT */
block->skip_flush_check = false;
ut_d(block->in_unzip_LRU_list = false);
ut_d(block->in_withdraw_list = false);
......@@ -2801,7 +2799,6 @@ buf_block_init_low(
/*===============*/
buf_block_t* block) /*!< in: block to init */
{
block->skip_flush_check = false;
#ifdef BTR_CUR_HASH_ADAPT
/* No adaptive hash index entries may point to a previously
unused (and now freshly allocated) block. */
......
......@@ -784,10 +784,6 @@ buf_dblwr_check_block(
{
ut_ad(block->page.state() == BUF_BLOCK_FILE_PAGE);
if (block->skip_flush_check) {
return;
}
switch (fil_page_get_type(block->frame)) {
case FIL_PAGE_INDEX:
case FIL_PAGE_TYPE_INSTANT:
......
......@@ -760,12 +760,11 @@ buf_block_t* buf_LRU_get_free_block(bool have_mutex)
/* If there is a block in the free list, take it */
block = buf_LRU_get_free_only();
if (block != NULL) {
if (block) {
if (!have_mutex) {
mutex_exit(&buf_pool.mutex);
}
memset(&block->page.zip, 0, sizeof block->page.zip);
block->skip_flush_check = false;
return(block);
}
......
......@@ -1784,12 +1784,6 @@ fil_crypt_rotate_page(
mtr.write<1,mtr_t::FORCED>(*block,
&frame[FIL_PAGE_SPACE_ID],
frame[FIL_PAGE_SPACE_ID]);
/* This may be a freed page. Until
MDEV-21347 has been fixed, a page on which
BtrBulk::finish() invoked btr_page_free() may
be an inconsistent B-tree page. For now,
let us disable the flush-time check. */
block->skip_flush_check = true;
/* statistics */
state->crypt_stat.pages_modified++;
......
......@@ -118,6 +118,9 @@ class PageBulk
dirs, and set page header members. */
inline void finish();
/** @return whether finish() actually needs to do something */
inline bool needs_finish() const;
/** Commit mtr for a page
@param[in] success Flag whether all inserts succeed. */
void commit(bool success);
......
......@@ -1186,9 +1186,6 @@ struct buf_block_t{
# define assert_block_ahi_empty_on_init(block) /* nothing */
# define assert_block_ahi_valid(block) /* nothing */
#endif /* BTR_CUR_HASH_ADAPT */
bool skip_flush_check;
/*!< Skip check in buf_dblwr_check_block
during bulk load, protected by lock.*/
# ifdef UNIV_DEBUG
/** @name Debug fields */
/* @{ */
......
......@@ -1608,10 +1608,9 @@ page_simple_validate_old(
n_slots = page_dir_get_n_slots(page);
if (UNIV_UNLIKELY(n_slots > srv_page_size / 4)) {
ib::error() << "Nonsensical number " << n_slots
<< " of page dir slots";
if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
ib::error() << "Nonsensical number of page dir slots: "
<< n_slots;
goto func_exit;
}
......@@ -1808,10 +1807,9 @@ page_simple_validate_new(
n_slots = page_dir_get_n_slots(page);
if (UNIV_UNLIKELY(n_slots > srv_page_size / 4)) {
ib::error() << "Nonsensical number " << n_slots
<< " of page dir slots";
if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
ib::error() << "Nonsensical number of page dir slots: "
<< n_slots;
goto func_exit;
}
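Both bounds are cheap sanity limits: every valid page has at least two directory slots because the infimum and the supremum records each own one, and since each slot occupies PAGE_DIR_SLOT_SIZE = 2 bytes, more than srv_page_size / 4 slots would make the directory alone consume over half of the page. Worked arithmetic for the default page size (illustrative only):

/* srv_page_size = 16384: the accepted n_slots range is [2, 4096]   */
/* at n_slots = 4096 the directory already occupies 4096 * 2        */
/* = 8192 bytes, i.e. half the page                                 */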
......@@ -2023,6 +2021,7 @@ bool page_validate(const page_t* page, const dict_index_t* index)
<< " of table " << index->table->name;
return FALSE;
}
if (page_is_comp(page)) {
if (UNIV_UNLIKELY(!page_simple_validate_new(page))) {
goto func_exit2;
......