Commit 0f90728b authored by Marko Mäkelä

MDEV-16809 Allow full redo logging for ALTER TABLE

Introduce the configuration option innodb_log_optimize_ddl
for controlling whether native index creation or table rebuild
in InnoDB should keep optimizing (reducing) the redo log
and writing MLOG_INDEX_LOAD records, which make a concurrent
backup fail instead of silently producing a corrupted copy.

The default is innodb_log_optimize_ddl=ON, that is, the behaviour
that was introduced in MariaDB 10.2.2 (with the merge of InnoDB
from MySQL 5.7) remains unchanged.

BtrBulk::m_trx: Replaces m_trx_id. We must be able to check for
KILL QUERY even if !m_flush_observer (innodb_log_optimize_ddl=OFF).

page_cur_insert_rec_write_log(): Declare globally, so that this
can be called from PageBulk::insert().

row_merge_insert_index_tuples(): Remove the unused parameter trx_id.

row_merge_build_indexes(): Enable or disable redo logging based on
the innodb_log_optimize_ddl parameter.

PageBulk::init(), PageBulk::insert(), PageBulk::finish(): Write
redo log records if needed. For ROW_FORMAT=COMPRESSED, redo log
will be written in PageBulk::compress() unless we called
m_mtr.set_log_mode(MTR_LOG_NO_REDO).
parent 32eb5823
SET GLOBAL innodb_log_optimize_ddl=OFF;
CREATE TABLE tz(id BIGINT PRIMARY KEY, i INT)
ENGINE=InnoDB ROW_FORMAT=COMPRESSED;
INSERT INTO tz(id) select * from seq_1_to_10000;
CREATE TABLE tr(id BIGINT PRIMARY KEY, i INT)
ENGINE=InnoDB ROW_FORMAT=REDUNDANT;
INSERT INTO tr(id) select * from seq_1_to_10000;
CREATE TABLE td(id BIGINT PRIMARY KEY, i INT)
ENGINE=InnoDB;
INSERT INTO td(id) select * from seq_1_to_10000;
CREATE PROCEDURE a()
BEGIN
ALTER TABLE tz ADD INDEX(i);
ALTER TABLE tr ADD INDEX(i);
ALTER TABLE td ADD INDEX(i);
END //
call a();
# shutdown server
# remove datadir
# xtrabackup move back
# restart server
DROP PROCEDURE a;
CHECK TABLE tz,tr,td;
Table Op Msg_type Msg_text
test.tz check status OK
test.tr check status OK
test.td check status OK
SELECT COUNT(*) FROM tz;
COUNT(*)
10000
SELECT COUNT(*) FROM tr;
COUNT(*)
10000
SELECT COUNT(*) FROM td;
COUNT(*)
10000
DROP TABLE tz,tr,td;
# see unsupported_redo.test for the opposite (default) case
--source include/have_innodb.inc
--source include/have_sequence.inc
SET GLOBAL innodb_log_optimize_ddl=OFF;
CREATE TABLE tz(id BIGINT PRIMARY KEY, i INT)
ENGINE=InnoDB ROW_FORMAT=COMPRESSED;
INSERT INTO tz(id) select * from seq_1_to_10000;
CREATE TABLE tr(id BIGINT PRIMARY KEY, i INT)
ENGINE=InnoDB ROW_FORMAT=REDUNDANT;
INSERT INTO tr(id) select * from seq_1_to_10000;
CREATE TABLE td(id BIGINT PRIMARY KEY, i INT)
ENGINE=InnoDB;
INSERT INTO td(id) select * from seq_1_to_10000;
DELIMITER //;
CREATE PROCEDURE a()
BEGIN
ALTER TABLE tz ADD INDEX(i);
ALTER TABLE tr ADD INDEX(i);
ALTER TABLE td ADD INDEX(i);
END //
DELIMITER ;//
let $targetdir=$MYSQLTEST_VARDIR/tmp/backup;
send call a();
--disable_result_log
exec $XTRABACKUP --defaults-file=$MYSQLTEST_VARDIR/my.cnf --backup --target-dir=$targetdir;
--enable_result_log
exec $XTRABACKUP --prepare --target-dir=$targetdir;
reap;
-- source include/restart_and_restore.inc
--rmdir $targetdir
DROP PROCEDURE a;
CHECK TABLE tz,tr,td;
SELECT COUNT(*) FROM tz;
SELECT COUNT(*) FROM tr;
SELECT COUNT(*) FROM td;
DROP TABLE tz,tr,td;
......@@ -1798,6 +1798,20 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL
READ_ONLY YES
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME INNODB_LOG_OPTIMIZE_DDL
SESSION_VALUE NULL
GLOBAL_VALUE ON
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE ON
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BOOLEAN
VARIABLE_COMMENT Reduce redo logging when natively creating indexes or rebuilding tables. Setting this OFF avoids delay due to page flushing and allows concurrent backup.
NUMERIC_MIN_VALUE NULL
NUMERIC_MAX_VALUE NULL
NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST OFF,ON
READ_ONLY NO
COMMAND_LINE_ARGUMENT OPTIONAL
VARIABLE_NAME INNODB_LOG_WRITE_AHEAD_SIZE
SESSION_VALUE NULL
GLOBAL_VALUE 8192
......
......@@ -29,9 +29,12 @@ Created 03/11/2014 Shaohua Wang
#include "btr0cur.h"
#include "btr0pcur.h"
#include "ibuf0ibuf.h"
#include "trx0trx.h"
/** Innodb B-tree index fill factor for bulk load. */
long innobase_fill_factor;
/** whether to reduce redo logging during ALTER TABLE */
my_bool innodb_log_optimize_ddl;
/** Initialize members, allocate page if needed and start mtr.
Note: we commit all mtrs on failure.
......@@ -49,8 +52,12 @@ PageBulk::init()
m_mtr.start();
mtr_x_lock(&m_index->lock, &m_mtr);
m_mtr.set_log_mode(MTR_LOG_NO_REDO);
m_mtr.set_flush_observer(m_flush_observer);
if (m_flush_observer) {
m_mtr.set_log_mode(MTR_LOG_NO_REDO);
m_mtr.set_flush_observer(m_flush_observer);
} else {
m_mtr.set_named_space(m_index->space);
}
if (m_page_no == FIL_NULL) {
mtr_t alloc_mtr;
......@@ -59,8 +66,8 @@ PageBulk::init()
because we don't guarantee pages are committed following
the allocation order, and we will always generate redo log
for page allocation, even when creating a new tablespace. */
mtr_start(&alloc_mtr);
alloc_mtr.set_named_space(dict_index_get_space(m_index));
alloc_mtr.start();
alloc_mtr.set_named_space(m_index->space);
ulint n_reserved;
bool success;
......@@ -90,18 +97,29 @@ PageBulk::init()
if (new_page_zip) {
page_create_zip(new_block, m_index, m_level, 0,
NULL, &m_mtr);
memset(FIL_PAGE_PREV + new_page, 0xff, 8);
page_zip_write_header(new_page_zip,
FIL_PAGE_PREV + new_page,
8, &m_mtr);
mach_write_to_8(PAGE_HEADER + PAGE_INDEX_ID + new_page,
m_index->id);
page_zip_write_header(new_page_zip,
PAGE_HEADER + PAGE_INDEX_ID
+ new_page, 8, &m_mtr);
} else {
ut_ad(!dict_index_is_spatial(m_index));
page_create(new_block, &m_mtr,
dict_table_is_comp(m_index->table),
false);
btr_page_set_level(new_page, NULL, m_level, &m_mtr);
mlog_write_ulint(FIL_PAGE_PREV + new_page, FIL_NULL,
MLOG_4BYTES, &m_mtr);
mlog_write_ulint(FIL_PAGE_NEXT + new_page, FIL_NULL,
MLOG_4BYTES, &m_mtr);
mlog_write_ulint(PAGE_HEADER + PAGE_LEVEL + new_page,
m_level, MLOG_2BYTES, &m_mtr);
mlog_write_ull(PAGE_HEADER + PAGE_INDEX_ID + new_page,
m_index->id, &m_mtr);
}
btr_page_set_next(new_page, NULL, FIL_NULL, &m_mtr);
btr_page_set_prev(new_page, NULL, FIL_NULL, &m_mtr);
btr_page_set_index_id(new_page, NULL, m_index->id, &m_mtr);
} else {
page_id_t page_id(dict_index_get_space(m_index), m_page_no);
page_size_t page_size(dict_table_page_size(m_index->table));
......@@ -116,11 +134,12 @@ PageBulk::init()
ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
btr_page_set_level(new_page, NULL, m_level, &m_mtr);
btr_page_set_level(new_page, new_page_zip, m_level, &m_mtr);
}
if (!m_level && dict_index_is_sec_or_ibuf(m_index)) {
page_update_max_trx_id(new_block, NULL, m_trx_id, &m_mtr);
page_update_max_trx_id(new_block, new_page_zip, m_trx_id,
&m_mtr);
}
m_block = new_block;
......@@ -146,7 +165,9 @@ PageBulk::init()
m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);
ut_d(m_total_data = 0);
page_header_set_field(m_page, NULL, PAGE_HEAP_TOP, UNIV_PAGE_SIZE - 1);
/* See page_copy_rec_list_end_to_created_page() */
ut_d(page_header_set_field(m_page, NULL, PAGE_HEAP_TOP,
srv_page_size - 1));
return(DB_SUCCESS);
}
......@@ -213,6 +234,14 @@ PageBulk::insert(
m_free_space -= rec_size + slot_size;
m_heap_top += rec_size;
m_rec_no += 1;
if (!m_flush_observer && !m_page_zip) {
/* For ROW_FORMAT=COMPRESSED, redo log may be written
in PageBulk::compress(). */
page_cur_insert_rec_write_log(insert_rec, rec_size,
m_cur_rec, m_index, &m_mtr);
}
m_cur_rec = insert_rec;
}
......@@ -223,15 +252,10 @@ void
PageBulk::finish()
{
ut_ad(m_rec_no > 0);
#ifdef UNIV_DEBUG
ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no)
<= page_get_free_space_of_empty(m_is_comp));
/* To pass the debug tests we have to set these dummy values
in the debug version */
page_dir_set_n_slots(m_page, NULL, UNIV_PAGE_SIZE / 2);
#endif
/* See page_copy_rec_list_end_to_created_page() */
ut_d(page_dir_set_n_slots(m_page, NULL, srv_page_size / 2));
ulint count = 0;
ulint n_recs = 0;
......@@ -282,14 +306,43 @@ PageBulk::finish()
page_dir_slot_set_n_owned(slot, NULL, count + 1);
ut_ad(!dict_index_is_spatial(m_index));
page_dir_set_n_slots(m_page, NULL, 2 + slot_index);
page_header_set_ptr(m_page, NULL, PAGE_HEAP_TOP, m_heap_top);
page_dir_set_n_heap(m_page, NULL, PAGE_HEAP_NO_USER_LOW + m_rec_no);
page_header_set_field(m_page, NULL, PAGE_N_RECS, m_rec_no);
page_header_set_ptr(m_page, NULL, PAGE_LAST_INSERT, m_cur_rec);
page_header_set_field(m_page, NULL, PAGE_DIRECTION, PAGE_RIGHT);
page_header_set_field(m_page, NULL, PAGE_N_DIRECTION, 0);
if (!m_flush_observer && !m_page_zip) {
mlog_write_ulint(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
2 + slot_index, MLOG_2BYTES, &m_mtr);
mlog_write_ulint(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
ulint(m_heap_top - m_page),
MLOG_2BYTES, &m_mtr);
mlog_write_ulint(PAGE_HEADER + PAGE_N_HEAP + m_page,
(PAGE_HEAP_NO_USER_LOW + m_rec_no)
| ulint(m_is_comp) << 15,
MLOG_2BYTES, &m_mtr);
mlog_write_ulint(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no,
MLOG_2BYTES, &m_mtr);
mlog_write_ulint(PAGE_HEADER + PAGE_LAST_INSERT + m_page,
ulint(m_cur_rec - m_page),
MLOG_2BYTES, &m_mtr);
mlog_write_ulint(PAGE_HEADER + PAGE_DIRECTION + m_page,
PAGE_RIGHT, MLOG_2BYTES, &m_mtr);
mlog_write_ulint(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0,
MLOG_2BYTES, &m_mtr);
} else {
/* For ROW_FORMAT=COMPRESSED, redo log may be written
in PageBulk::compress(). */
mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
2 + slot_index);
mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
ulint(m_heap_top - m_page));
mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + m_page,
(PAGE_HEAP_NO_USER_LOW + m_rec_no)
| ulint(m_is_comp) << 15);
mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no);
mach_write_to_2(PAGE_HEADER + PAGE_LAST_INSERT + m_page,
ulint(m_cur_rec - m_page));
mach_write_to_2(PAGE_HEADER + PAGE_DIRECTION + m_page,
PAGE_RIGHT);
mach_write_to_2(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0);
}
m_block->skip_flush_check = false;
}
......@@ -470,20 +523,30 @@ PageBulk::copyOut(
/** Set next page
@param[in] next_page_no next page no */
void
PageBulk::setNext(
ulint next_page_no)
inline void PageBulk::setNext(ulint next_page_no)
{
btr_page_set_next(m_page, NULL, next_page_no, &m_mtr);
if (UNIV_LIKELY_NULL(m_page_zip)) {
/* For ROW_FORMAT=COMPRESSED, redo log may be written
in PageBulk::compress(). */
mach_write_to_4(m_page + FIL_PAGE_NEXT, next_page_no);
} else {
mlog_write_ulint(m_page + FIL_PAGE_NEXT, next_page_no,
MLOG_4BYTES, &m_mtr);
}
}
/** Set previous page
@param[in] prev_page_no previous page no */
void
PageBulk::setPrev(
ulint prev_page_no)
inline void PageBulk::setPrev(ulint prev_page_no)
{
btr_page_set_prev(m_page, NULL, prev_page_no, &m_mtr);
if (UNIV_LIKELY_NULL(m_page_zip)) {
/* For ROW_FORMAT=COMPRESSED, redo log may be written
in PageBulk::compress(). */
mach_write_to_4(m_page + FIL_PAGE_PREV, prev_page_no);
} else {
mlog_write_ulint(m_page + FIL_PAGE_PREV, prev_page_no,
MLOG_4BYTES, &m_mtr);
}
}
/** Check if required space is available in the page for the rec to be inserted.
......@@ -591,8 +654,12 @@ PageBulk::latch()
{
m_mtr.start();
mtr_x_lock(&m_index->lock, &m_mtr);
m_mtr.set_log_mode(MTR_LOG_NO_REDO);
m_mtr.set_flush_observer(m_flush_observer);
if (m_flush_observer) {
m_mtr.set_log_mode(MTR_LOG_NO_REDO);
m_mtr.set_flush_observer(m_flush_observer);
} else {
m_mtr.set_named_space(m_index->space);
}
/* In case the block is S-latched by page_cleaner. */
if (!buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock,
......@@ -635,7 +702,7 @@ BtrBulk::pageSplit(
}
/* 2. create a new page. */
PageBulk new_page_bulk(m_index, m_trx_id, FIL_NULL,
PageBulk new_page_bulk(m_index, m_trx->id, FIL_NULL,
page_bulk->getLevel(), m_flush_observer);
dberr_t err = new_page_bulk.init();
if (err != DB_SUCCESS) {
......@@ -714,8 +781,7 @@ BtrBulk::pageCommit(
}
/** Log free check */
void
BtrBulk::logFreeCheck()
inline void BtrBulk::logFreeCheck()
{
if (log_sys->check_flush_or_checkpoint) {
release();
......@@ -766,7 +832,7 @@ BtrBulk::insert(
/* Check if we need to create a PageBulk for the level. */
if (level + 1 > m_page_bulks.size()) {
PageBulk* new_page_bulk
= UT_NEW_NOKEY(PageBulk(m_index, m_trx_id, FIL_NULL,
= UT_NEW_NOKEY(PageBulk(m_index, m_trx->id, FIL_NULL,
level, m_flush_observer));
err = new_page_bulk->init();
if (err != DB_SUCCESS) {
......@@ -819,7 +885,7 @@ BtrBulk::insert(
if (!page_bulk->isSpaceAvailable(rec_size)) {
/* Create a sibling page_bulk. */
PageBulk* sibling_page_bulk;
sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx_id,
sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx->id,
FIL_NULL, level,
m_flush_observer));
err = sibling_page_bulk->init();
......@@ -845,8 +911,11 @@ BtrBulk::insert(
/* Important: log_free_check whether we need a checkpoint. */
if (page_is_leaf(sibling_page_bulk->getPage())) {
/* Check whether trx is interrupted */
if (m_flush_observer->check_interrupted()) {
if (trx_is_interrupted(m_trx)) {
if (m_flush_observer) {
m_flush_observer->interrupted();
}
err = DB_INTERRUPTED;
goto func_exit;
}
......@@ -944,7 +1013,7 @@ BtrBulk::finish(dberr_t err)
last_page_no);
page_size_t page_size(dict_table_page_size(m_index->table));
ulint root_page_no = dict_index_get_page(m_index);
PageBulk root_page_bulk(m_index, m_trx_id,
PageBulk root_page_bulk(m_index, m_trx->id,
root_page_no, m_root_level,
m_flush_observer);
......
......@@ -3735,18 +3735,12 @@ FlushObserver::~FlushObserver()
DBUG_LOG("flush", "~FlushObserver(): trx->id=" << m_trx->id);
}
/** Check whether trx is interrupted
@return true if trx is interrupted */
bool
FlushObserver::check_interrupted()
/** Check whether the operation has been interrupted */
void FlushObserver::check_interrupted()
{
if (trx_is_interrupted(m_trx)) {
interrupted();
return(true);
}
return(false);
}
/** Notify observer of a flush
......
......@@ -20359,6 +20359,13 @@ static MYSQL_SYSVAR_BOOL(log_compressed_pages, page_zip_log_pages,
" compression algorithm doesn't change.",
NULL, NULL, TRUE);
static MYSQL_SYSVAR_BOOL(log_optimize_ddl, innodb_log_optimize_ddl,
PLUGIN_VAR_OPCMDARG,
"Reduce redo logging when natively creating indexes or rebuilding tables."
" Setting this OFF avoids delay due to page flushing and"
" allows concurrent backup.",
NULL, NULL, TRUE);
static MYSQL_SYSVAR_ULONG(autoextend_increment,
sys_tablespace_auto_extend_increment,
PLUGIN_VAR_RQCMDARG,
......@@ -21278,6 +21285,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(log_write_ahead_size),
MYSQL_SYSVAR(log_group_home_dir),
MYSQL_SYSVAR(log_compressed_pages),
MYSQL_SYSVAR(log_optimize_ddl),
MYSQL_SYSVAR(max_dirty_pages_pct),
MYSQL_SYSVAR(max_dirty_pages_pct_lwm),
MYSQL_SYSVAR(adaptive_flushing_lwm),
......
......@@ -34,6 +34,8 @@ Created 03/11/2014 Shaohua Wang
/** Innodb B-tree index fill factor for bulk load. */
extern long innobase_fill_factor;
/** whether to reduce redo logging during ALTER TABLE */
extern my_bool innodb_log_optimize_ddl;
/*
The proper function call sequence of PageBulk is as below:
......@@ -146,11 +148,11 @@ class PageBulk
/** Set next page
@param[in] next_page_no next page no */
void setNext(ulint next_page_no);
inline void setNext(ulint next_page_no);
/** Set previous page
@param[in] prev_page_no previous page no */
void setPrev(ulint prev_page_no);
inline void setPrev(ulint prev_page_no);
/** Release block by committing mtr */
inline void release();
......@@ -257,7 +259,7 @@ class PageBulk
when the block is re-pinned */
ib_uint64_t m_modify_clock;
/** Flush observer */
/** Flush observer, or NULL if redo logging is enabled */
FlushObserver* m_flush_observer;
/** Operation result DB_SUCCESS or error code */
......@@ -272,19 +274,19 @@ class BtrBulk
public:
/** Constructor
@param[in] index B-tree index
@param[in] trx_id transaction id
@param[in] trx transaction
@param[in] observer flush observer */
BtrBulk(
dict_index_t* index,
trx_id_t trx_id,
const trx_t* trx,
FlushObserver* observer)
:
m_index(index),
m_trx_id(trx_id),
m_trx(trx),
m_flush_observer(observer)
{
ut_ad(m_flush_observer != NULL);
#ifdef UNIV_DEBUG
if (m_flush_observer)
fil_space_inc_redo_skipped_count(m_index->space);
#endif /* UNIV_DEBUG */
}
......@@ -293,6 +295,7 @@ class BtrBulk
~BtrBulk()
{
#ifdef UNIV_DEBUG
if (m_flush_observer)
fil_space_dec_redo_skipped_count(m_index->space);
#endif /* UNIV_DEBUG */
}
......@@ -354,20 +357,20 @@ class BtrBulk
}
/** Log free check */
void logFreeCheck();
inline void logFreeCheck();
private:
/** B-tree index */
dict_index_t* m_index;
dict_index_t*const m_index;
/** Transaction id */
trx_id_t m_trx_id;
/** Transaction */
const trx_t*const m_trx;
/** Root page level */
ulint m_root_level;
/** Flush observer */
FlushObserver* m_flush_observer;
/** Flush observer, or NULL if redo logging is enabled */
FlushObserver*const m_flush_observer;
/** Page cursor vector for all level */
page_bulk_vector m_page_bulks;
......
......@@ -371,9 +371,8 @@ class FlushObserver {
m_interrupted = true;
}
/** Check whether trx is interrupted
@return true if trx is interrupted */
bool check_interrupted();
/** Check whether the operation has been interrupted */
void check_interrupted();
/** Flush dirty pages. */
void flush();
......@@ -395,7 +394,7 @@ class FlushObserver {
const ulint m_space_id;
/** Trx instance */
trx_t* const m_trx;
const trx_t* const m_trx;
/** Performance schema accounting object, used by ALTER TABLE.
If not NULL, then stage->begin_phase_flush() will be called initially,
......
/*****************************************************************************
Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -330,6 +331,20 @@ page_cur_open_on_rnd_user_rec(
/*==========================*/
buf_block_t* block, /*!< in: page */
page_cur_t* cursor);/*!< out: page cursor */
/** Write a redo log record of inserting a record into an index page.
@param[in] insert_rec inserted record
@param[in] rec_size rec_get_size(insert_rec)
@param[in] cursor_rec predecessor of insert_rec
@param[in,out] index index tree
@param[in,out] mtr mini-transaction */
void
page_cur_insert_rec_write_log(
const rec_t* insert_rec,
ulint rec_size,
const rec_t* cursor_rec,
dict_index_t* index,
mtr_t* mtr)
MY_ATTRIBUTE((nonnull));
/***********************************************************//**
Parses a log record of a record insert on a page.
@return end of log record or NULL */
......
......@@ -2,6 +2,7 @@
Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
Copyright (c) 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -834,18 +835,19 @@ page_cur_open_on_rnd_user_rec(
} while (rnd--);
}
/***********************************************************//**
Writes the log record of a record insert on a page. */
static
/** Write a redo log record of inserting a record into an index page.
@param[in] insert_rec inserted record
@param[in] rec_size rec_get_size(insert_rec)
@param[in] cursor_rec predecessor of insert_rec
@param[in,out] index index tree
@param[in,out] mtr mini-transaction */
void
page_cur_insert_rec_write_log(
/*==========================*/
rec_t* insert_rec, /*!< in: inserted physical record */
ulint rec_size, /*!< in: insert_rec size */
rec_t* cursor_rec, /*!< in: record the
cursor is pointing to */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mini-transaction handle */
const rec_t* insert_rec,
ulint rec_size,
const rec_t* cursor_rec,
dict_index_t* index,
mtr_t* mtr)
{
ulint cur_rec_size;
ulint extra_size;
......
......@@ -1684,11 +1684,10 @@ row_fts_merge_insert(
dict_table_close(aux_table, FALSE, FALSE);
aux_index = dict_table_get_first_index(aux_table);
FlushObserver* observer;
observer = psort_info[0].psort_common->trx->flush_observer;
/* Create bulk load instance */
ins_ctx.btr_bulk = UT_NEW_NOKEY(BtrBulk(aux_index, trx->id, observer));
ins_ctx.btr_bulk = UT_NEW_NOKEY(
BtrBulk(aux_index, trx, psort_info[0].psort_common->trx
->flush_observer));
/* Create tuple for insert */
ins_ctx.tuple = dtuple_create(heap, dict_index_get_n_fields(aux_index));
......
......@@ -274,7 +274,6 @@ class index_tuple_info_t {
#define FTS_PENDING_DOC_MEMORY_LIMIT 1000000
/** Insert sorted data tuples to the index.
@param[in] trx_id transaction identifier
@param[in] index index to be inserted
@param[in] old_table old table
@param[in] fd file descriptor
......@@ -289,7 +288,6 @@ and then stage->inc() will be called for each record that is processed.
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
trx_id_t trx_id,
dict_index_t* index,
const dict_table_t* old_table,
int fd,
......@@ -2402,14 +2400,14 @@ row_merge_read_clustered_index(
if (clust_btr_bulk == NULL) {
clust_btr_bulk = UT_NEW_NOKEY(
BtrBulk(index[i],
trx->id,
observer));
trx,
observer/**/));
} else {
clust_btr_bulk->latch();
}
err = row_merge_insert_index_tuples(
trx->id, index[i], old_table,
index[i], old_table,
-1, NULL, buf, clust_btr_bulk,
table_total_rows,
curr_progress,
......@@ -2515,11 +2513,11 @@ row_merge_read_clustered_index(
trx->error_key_num = i;
goto all_done;);
BtrBulk btr_bulk(index[i], trx->id,
BtrBulk btr_bulk(index[i], trx,
observer);
err = row_merge_insert_index_tuples(
trx->id, index[i], old_table,
index[i], old_table,
-1, NULL, buf, &btr_bulk,
table_total_rows,
curr_progress,
......@@ -3375,7 +3373,6 @@ row_merge_mtuple_to_dtuple(
}
/** Insert sorted data tuples to the index.
@param[in] trx_id transaction identifier
@param[in] index index to be inserted
@param[in] old_table old table
@param[in] fd file descriptor
......@@ -3390,7 +3387,6 @@ and then stage->inc() will be called for each record that is processed.
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
trx_id_t trx_id,
dict_index_t* index,
const dict_table_t* old_table,
int fd,
......@@ -3428,7 +3424,6 @@ row_merge_insert_index_tuples(
ut_ad(!srv_read_only_mode);
ut_ad(!(index->type & DICT_FTS));
ut_ad(!dict_index_is_spatial(index));
ut_ad(trx_id);
if (stage != NULL) {
stage->begin_phase_insert();
......@@ -4632,11 +4627,15 @@ row_merge_build_indexes(
we use bulk load to create all types of indexes except spatial index,
for which redo logging is enabled. If we create only spatial indexes,
we don't need to flush dirty pages at all. */
bool need_flush_observer = (old_table != new_table);
bool need_flush_observer = bool(innodb_log_optimize_ddl);
for (i = 0; i < n_indexes; i++) {
if (!dict_index_is_spatial(indexes[i])) {
need_flush_observer = true;
if (need_flush_observer) {
need_flush_observer = old_table != new_table;
for (i = 0; i < n_indexes; i++) {
if (!dict_index_is_spatial(indexes[i])) {
need_flush_observer = true;
}
}
}
......@@ -4883,7 +4882,7 @@ row_merge_build_indexes(
os_thread_sleep(20000000);); /* 20 sec */
if (error == DB_SUCCESS) {
BtrBulk btr_bulk(sort_idx, trx->id,
BtrBulk btr_bulk(sort_idx, trx,
flush_observer);
pct_cost = (COST_BUILD_INDEX_STATIC +
......@@ -4903,7 +4902,7 @@ row_merge_build_indexes(
}
error = row_merge_insert_index_tuples(
trx->id, sort_idx, old_table,
sort_idx, old_table,
merge_files[i].fd, block, NULL,
&btr_bulk,
merge_files[i].n_rec, pct_progress, pct_cost,
......@@ -4936,14 +4935,16 @@ row_merge_build_indexes(
ut_ad(sort_idx->online_status
== ONLINE_INDEX_COMPLETE);
} else {
ut_ad(need_flush_observer);
if (flush_observer) {
flush_observer->flush();
row_merge_write_redo(indexes[i]);
}
if (global_system_variables.log_warnings > 2) {
sql_print_information(
"InnoDB: Online DDL : Applying"
" log to index");
}
flush_observer->flush();
row_merge_write_redo(indexes[i]);
DEBUG_SYNC_C("row_log_apply_before");
error = row_log_apply(trx, sort_idx, table, stage);
......