Commit 3280edda authored by Marko Mäkelä's avatar Marko Mäkelä

Merge 10.3 into 10.4

parents ced3ec4c 73aa31fb
/*****************************************************************************
Copyright (c) 2014, 2019, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2019, MariaDB Corporation.
Copyright (c) 2017, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -29,6 +29,7 @@ Created 03/11/2014 Shaohua Wang
#include "btr0cur.h"
#include "btr0pcur.h"
#include "ibuf0ibuf.h"
#include "page0page.h"
#include "trx0trx.h"
/** Innodb B-tree index fill factor for bulk load. */
......@@ -140,7 +141,6 @@ PageBulk::init()
}
m_block = new_block;
m_block->skip_flush_check = true;
m_page = new_page;
m_page_zip = new_page_zip;
m_page_no = new_page_no;
......@@ -160,7 +160,11 @@ PageBulk::init()
srv_page_size - dict_index_zip_pad_optimal_page_size(m_index);
m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);
/* Temporarily reset PAGE_DIRECTION_B from PAGE_NO_DIRECTION to 0,
without writing redo log, to ensure that needs_finish() will hold
on an empty page. */
ut_ad(m_page[PAGE_HEADER + PAGE_DIRECTION_B] == PAGE_NO_DIRECTION);
m_page[PAGE_HEADER + PAGE_DIRECTION_B] = 0;
ut_d(m_total_data = 0);
/* See page_copy_rec_list_end_to_created_page() */
ut_d(page_header_set_field(m_page, NULL, PAGE_HEAP_TOP,
......@@ -186,7 +190,7 @@ PageBulk::insert(
#ifdef UNIV_DEBUG
/* Check whether records are in order. */
if (!page_rec_is_infimum(m_cur_rec)) {
if (!page_rec_is_infimum_low(page_offset(m_cur_rec))) {
rec_t* old_rec = m_cur_rec;
rec_offs* old_offsets = rec_get_offsets(
old_rec, m_index, NULL, is_leaf,
......@@ -204,18 +208,21 @@ PageBulk::insert(
rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets);
/* 2. Insert the record in the linked list. */
rec_t* next_rec = page_rec_get_next(m_cur_rec);
page_rec_set_next(insert_rec, next_rec);
page_rec_set_next(m_cur_rec, insert_rec);
/* 3. Set the n_owned field in the inserted record to zero,
and set the heap_no field. */
if (m_is_comp) {
ulint next_offs = rec_get_next_offs(m_cur_rec, TRUE);
rec_set_next_offs_new(insert_rec, next_offs);
rec_set_next_offs_new(m_cur_rec, page_offset(insert_rec));
rec_set_n_owned_new(insert_rec, NULL, 0);
rec_set_heap_no_new(insert_rec,
PAGE_HEAP_NO_USER_LOW + m_rec_no);
} else {
ulint next_offs = rec_get_next_offs(m_cur_rec, FALSE);
rec_set_next_offs_old(insert_rec, next_offs);
rec_set_next_offs_old(m_cur_rec, page_offset(insert_rec));
rec_set_n_owned_old(insert_rec, 0);
rec_set_heap_no_old(insert_rec,
PAGE_HEAP_NO_USER_LOW + m_rec_no);
......@@ -243,17 +250,54 @@ PageBulk::insert(
m_cur_rec = insert_rec;
}
inline bool PageBulk::needs_finish() const
{
ut_ad(page_align(m_cur_rec) == m_block->frame);
ut_ad(m_page == m_block->frame);
if (!m_page[PAGE_HEADER + PAGE_DIRECTION_B])
return true;
ulint heap_no, n_heap= page_header_get_field(m_page, PAGE_N_HEAP);
ut_ad((n_heap & 0x7fff) >= PAGE_HEAP_NO_USER_LOW);
if (n_heap & 0x8000)
{
n_heap&= 0x7fff;
heap_no= rec_get_heap_no_new(m_cur_rec);
if (heap_no == PAGE_HEAP_NO_INFIMUM &&
page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_NEW_SUPREMUM_END)
return false;
}
else
{
heap_no= rec_get_heap_no_old(m_cur_rec);
if (heap_no == PAGE_HEAP_NO_INFIMUM &&
page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_OLD_SUPREMUM_END)
return false;
}
return heap_no != n_heap - 1;
}
/** Mark end of insertion to the page. Scan all records to set page dirs,
and set page header members.
Note: we refer to page_copy_rec_list_end_to_created_page. */
void
PageBulk::finish()
{
ut_ad(m_rec_no > 0);
ut_ad(!dict_index_is_spatial(m_index));
if (!needs_finish()) {
return;
}
ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no)
<= page_get_free_space_of_empty(m_is_comp));
#ifdef UNIV_DEBUG
/* See page_copy_rec_list_end_to_created_page() */
ut_d(page_dir_set_n_slots(m_page, NULL, srv_page_size / 2));
if (m_rec_no) {
page_dir_set_n_slots(m_page, NULL, srv_page_size / 2);
}
mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
ulint(m_heap_top - m_page));
#endif
ulint count = 0;
ulint n_recs = 0;
......@@ -262,8 +306,7 @@ PageBulk::finish()
page_dir_slot_t* slot = NULL;
/* Set owner & dir. */
do {
while (!page_rec_is_supremum(insert_rec)) {
count++;
n_recs++;
......@@ -280,7 +323,7 @@ PageBulk::finish()
}
insert_rec = page_rec_get_next(insert_rec);
} while (!page_rec_is_supremum(insert_rec));
}
if (slot_index > 0
&& (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
......@@ -303,10 +346,14 @@ PageBulk::finish()
page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page));
page_dir_slot_set_n_owned(slot, NULL, count + 1);
ut_ad(!dict_index_is_spatial(m_index));
ut_ad(!page_get_instant(m_page));
if (!m_flush_observer && !m_page_zip) {
if (!m_rec_no) {
/* Restore PAGE_DIRECTION_B from 0 to
PAGE_NO_DIRECTION like it should be on an empty page,
again without writing redo log. */
m_page[PAGE_HEADER + PAGE_DIRECTION_B] = PAGE_NO_DIRECTION;
} else if (!m_flush_observer && !m_page_zip) {
mlog_write_ulint(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
2 + slot_index, MLOG_2BYTES, &m_mtr);
mlog_write_ulint(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
......@@ -343,25 +390,17 @@ PageBulk::finish()
mach_write_to_2(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0);
}
m_block->skip_flush_check = false;
ut_ad(!needs_finish());
ut_ad(page_validate(m_page, m_index));
}
/** Commit inserts done to the page
@param[in] success Flag whether all inserts succeed. */
void
PageBulk::commit(
bool success)
void PageBulk::commit(bool success)
{
if (success) {
ut_ad(page_validate(m_page, m_index));
/* Set no free space left and no buffered changes in ibuf. */
if (!dict_index_is_clust(m_index) && page_is_leaf(m_page)) {
ibuf_set_bitmap_for_bulk_load(
m_block, innobase_fill_factor == 100);
}
}
finish();
if (success && !dict_index_is_clust(m_index) && page_is_leaf(m_page))
ibuf_set_bitmap_for_bulk_load(m_block, innobase_fill_factor == 100);
m_mtr.commit();
}
......@@ -605,7 +644,9 @@ PageBulk::storeExt(
const big_rec_t* big_rec,
rec_offs* offsets)
{
/* Note: not all fileds are initialized in btr_pcur. */
finish();
/* Note: not all fields are initialized in btr_pcur. */
btr_pcur_t btr_pcur;
btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
btr_pcur.latch_mode = BTR_MODIFY_LEAF;
......@@ -634,7 +675,7 @@ Note: log_free_check requires holding no lock/latch in current thread. */
void
PageBulk::release()
{
ut_ad(!dict_index_is_spatial(m_index));
finish();
/* We fix the block because we will re-pin it soon. */
buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);
......@@ -692,12 +733,11 @@ BtrBulk::pageSplit(
{
ut_ad(page_bulk->getPageZip() != NULL);
/* 1. Check if we have only one user record on the page. */
if (page_bulk->getRecNo() <= 1) {
return(DB_TOO_BIG_RECORD);
}
/* 2. create a new page. */
/* Initialize a new page */
PageBulk new_page_bulk(m_index, m_trx->id, FIL_NULL,
page_bulk->getLevel(), m_flush_observer);
dberr_t err = new_page_bulk.init();
......@@ -705,19 +745,18 @@ BtrBulk::pageSplit(
return(err);
}
/* 3. copy the upper half to new page. */
/* Copy the upper half to the new page. */
rec_t* split_rec = page_bulk->getSplitRec();
new_page_bulk.copyIn(split_rec);
page_bulk->copyOut(split_rec);
/* 4. commit the splitted page. */
/* Commit the pages after split. */
err = pageCommit(page_bulk, &new_page_bulk, true);
if (err != DB_SUCCESS) {
pageAbort(&new_page_bulk);
return(err);
}
/* 5. commit the new page. */
err = pageCommit(&new_page_bulk, next_page_bulk, true);
if (err != DB_SUCCESS) {
pageAbort(&new_page_bulk);
......@@ -943,11 +982,9 @@ BtrBulk::insert(
ut_ad(page_bulk->getLevel() == 0);
ut_ad(page_bulk == m_page_bulks.at(0));
/* Release all latched but leaf node. */
/* Release all pages above the leaf level */
for (ulint level = 1; level <= m_root_level; level++) {
PageBulk* page_bulk = m_page_bulks.at(level);
page_bulk->release();
m_page_bulks.at(level)->release();
}
err = page_bulk->storeExt(big_rec, offsets);
......@@ -1033,6 +1070,7 @@ BtrBulk::finish(dberr_t err)
return(err);
}
root_page_bulk.copyIn(first_rec);
root_page_bulk.finish();
/* Remove last page. */
btr_page_free(m_index, last_block, &mtr);
......
......@@ -1552,8 +1552,6 @@ buf_block_init(
#ifdef BTR_CUR_HASH_ADAPT
block->index = NULL;
#endif /* BTR_CUR_HASH_ADAPT */
block->skip_flush_check = false;
ut_d(block->page.in_page_hash = FALSE);
ut_d(block->page.in_zip_hash = FALSE);
ut_d(block->page.in_flush_list = FALSE);
......@@ -3854,7 +3852,6 @@ buf_block_init_low(
/*===============*/
buf_block_t* block) /*!< in: block to init */
{
block->skip_flush_check = false;
#ifdef BTR_CUR_HASH_ADAPT
/* No adaptive hash index entries may point to a previously
unused (and now freshly allocated) block. */
......
......@@ -858,10 +858,6 @@ buf_dblwr_check_block(
{
ut_ad(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
if (block->skip_flush_check) {
return;
}
switch (fil_page_get_type(block->frame)) {
case FIL_PAGE_INDEX:
case FIL_PAGE_TYPE_INSTANT:
......
......@@ -951,7 +951,6 @@ buf_LRU_get_free_block(
ut_ad(buf_pool_from_block(block) == buf_pool);
memset(&block->page.zip, 0, sizeof block->page.zip);
block->skip_flush_check = false;
block->page.flush_observer = NULL;
return(block);
}
......
/*****************************************************************************
Copyright (c) 2014, 2015, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2019, MariaDB Corporation.
Copyright (c) 2019, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -110,6 +110,9 @@ class PageBulk
dirs, and set page header members. */
void finish();
/** @return whether finish() actually needs to do something */
inline bool needs_finish() const;
/** Commit mtr for a page
@param[in] success Flag whether all inserts succeed. */
void commit(bool success);
......
......@@ -1813,9 +1813,6 @@ struct buf_block_t{
# define assert_block_ahi_empty_on_init(block) /* nothing */
# define assert_block_ahi_valid(block) /* nothing */
#endif /* BTR_CUR_HASH_ADAPT */
bool skip_flush_check;
/*!< Skip check in buf_dblwr_check_block
during bulk load, protected by lock.*/
# ifdef UNIV_DEBUG
/** @name Debug fields */
/* @{ */
......
......@@ -2002,10 +2002,9 @@ page_simple_validate_old(
n_slots = page_dir_get_n_slots(page);
if (UNIV_UNLIKELY(n_slots > srv_page_size / 4)) {
ib::error() << "Nonsensical number " << n_slots
<< " of page dir slots";
if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
ib::error() << "Nonsensical number of page dir slots: "
<< n_slots;
goto func_exit;
}
......@@ -2202,10 +2201,9 @@ page_simple_validate_new(
n_slots = page_dir_get_n_slots(page);
if (UNIV_UNLIKELY(n_slots > srv_page_size / 4)) {
ib::error() << "Nonsensical number " << n_slots
<< " of page dir slots";
if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
ib::error() << "Nonsensical number of page dir slots: "
<< n_slots;
goto func_exit;
}
......@@ -2417,6 +2415,7 @@ bool page_validate(const page_t* page, const dict_index_t* index)
<< " of table " << index->table->name;
return FALSE;
}
if (page_is_comp(page)) {
if (UNIV_UNLIKELY(!page_simple_validate_new(page))) {
goto func_exit2;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment