Commit d2e649ae authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-29440 InnoDB instant ALTER TABLE recovery must use READ UNCOMMITTED

In commit 8f8ba758 (MDEV-27234)
the data dictionary recovery was changed to use READ COMMITTED
so that table-rebuild operations (OPTIMIZE TABLE, TRUNCATE TABLE,
some forms of ALTER TABLE) would be recovered correctly.

However, for operations that avoid a table rebuild thanks to
being able to instantly ADD, DROP or reorder columns, recovery
must use the READ UNCOMMITTED isolation level so that changes to
the hidden metadata record can be rolled back.

We will detect instant operations by detecting uncommitted changes
to SYS_COLUMNS in case there is no uncommitted change of SYS_TABLES.ID
for the table. In any table-rebuilding DDL operation, the SYS_TABLES.ID
(and likely also the table name) will be updated.

As part of rolling back the instant ALTER TABLE operation, after the
operation on the hidden metadata record has been rolled back, a rollback
of an INSERT into SYS_COLUMNS in row_undo_ins_remove_clust_rec() will
invoke trx_t::evict_table() to discard the READ UNCOMMITTED definition
of the table. After that, subsequent recovery steps will load and use
the correct table definition.

Reviewed by: Thirunarayanan Balathandayuthapani
Tested by: Matthias Leich
parent 0fa4dd07
......@@ -176,5 +176,25 @@ t3 CREATE TABLE `t3` (
PRIMARY KEY (`id`),
UNIQUE KEY `v2` (`v2`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
DROP TABLE t1,t2,t3;
DROP TABLE t2,t3;
#
# MDEV-29440 InnoDB instant ALTER TABLE recovery wrongly uses
# READ COMMITTED isolation level instead of READ UNCOMMITTED
#
CREATE TABLE t2(a INT UNSIGNED PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1),(2),(3),(4),(5),(6);
connect ddl, localhost, root;
SET DEBUG_SYNC='innodb_alter_inplace_before_commit SIGNAL ddl WAIT_FOR ever';
ALTER TABLE t2 ADD COLUMN b TINYINT UNSIGNED NOT NULL DEFAULT 42 FIRST;
connection default;
SET DEBUG_SYNC='now WAIT_FOR ddl';
SET GLOBAL innodb_flush_log_at_trx_commit=1;
DELETE FROM t1;
# Kill the server
disconnect ddl;
# restart
CHECK TABLE t2;
Table Op Msg_type Msg_text
test.t2 check status OK
DROP TABLE t1,t2;
db.opt
......@@ -198,6 +198,30 @@ disconnect ddl;
SHOW CREATE TABLE t1;
SHOW CREATE TABLE t2;
SHOW CREATE TABLE t3;
DROP TABLE t1,t2,t3;
DROP TABLE t2,t3;
--echo #
--echo # MDEV-29440 InnoDB instant ALTER TABLE recovery wrongly uses
--echo # READ COMMITTED isolation level instead of READ UNCOMMITTED
--echo #
CREATE TABLE t2(a INT UNSIGNED PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1),(2),(3),(4),(5),(6);
connect ddl, localhost, root;
SET DEBUG_SYNC='innodb_alter_inplace_before_commit SIGNAL ddl WAIT_FOR ever';
--send
ALTER TABLE t2 ADD COLUMN b TINYINT UNSIGNED NOT NULL DEFAULT 42 FIRST;
connection default;
SET DEBUG_SYNC='now WAIT_FOR ddl';
SET GLOBAL innodb_flush_log_at_trx_commit=1;
DELETE FROM t1;
--source include/kill_mysqld.inc
disconnect ddl;
--source include/start_mysqld.inc
CHECK TABLE t2;
DROP TABLE t1,t2;
--list_files $MYSQLD_DATADIR/test
......@@ -74,6 +74,8 @@ dict_load_index_low(
byte* table_id, /*!< in/out: table id (8 bytes),
an "in" value if mtr
and "out" when !mtr */
bool uncommitted, /*!< in: false=READ COMMITTED,
true=READ UNCOMMITTED */
mem_heap_t* heap, /*!< in/out: temporary memory heap */
const rec_t* rec, /*!< in: SYS_INDEXES record */
mtr_t* mtr, /*!< in/out: mini-transaction,
......@@ -83,30 +85,30 @@ dict_load_index_low(
dict_index_t** index); /*!< out,own: index, or NULL */
/** Load a table column definition from a SYS_COLUMNS record to dict_table_t.
@param table table, or nullptr if the output will be in column
@param use_uncommitted 0=READ COMMITTED, 1=detect, 2=READ UNCOMMITTED
@param heap memory heap for temporary storage
@param column pointer to output buffer, or nullptr if table!=nullptr
@param table_id table identifier
@param col_name column name
@param rec SYS_COLUMNS record
@param mtr mini-transaction
@param nth_v_col nullptr, or pointer to a counter of virtual columns
@return error message
@retval NULL on success */
static
const char*
dict_load_column_low(
dict_table_t* table, /*!< in/out: table, could be NULL
if we just populate a dict_column_t
struct with information from
a SYS_COLUMNS record */
mem_heap_t* heap, /*!< in/out: memory heap
for temporary storage */
dict_col_t* column, /*!< out: dict_column_t to fill,
or NULL if table != NULL */
table_id_t* table_id, /*!< out: table id */
const char** col_name, /*!< out: column name */
const rec_t* rec, /*!< in: SYS_COLUMNS record */
mtr_t* mtr, /*!< in/out: mini-transaction */
ulint* nth_v_col); /*!< out: if not NULL, this
records the "n" of "nth" virtual
column */
@retval nullptr on success */
static const char *dict_load_column_low(dict_table_t *table,
unsigned use_uncommitted,
mem_heap_t *heap, dict_col_t *column,
table_id_t *table_id,
const char **col_name,
const rec_t *rec,
mtr_t *mtr,
ulint *nth_v_col);
/** Load a virtual column "mapping" (to base columns) information
from a SYS_VIRTUAL record
@param[in,out] table table
@param[in] uncommitted false=READ COMMITTED, true=READ UNCOMMITTED
@param[in,out] column mapped base column's dict_column_t
@param[in,out] table_id table id
@param[in,out] pos virtual column position
......@@ -118,6 +120,7 @@ static
const char*
dict_load_virtual_low(
dict_table_t* table,
bool uncommitted,
dict_col_t** column,
table_id_t* table_id,
ulint* pos,
......@@ -133,6 +136,8 @@ dict_load_field_low(
byte* index_id, /*!< in/out: index id (8 bytes)
an "in" value if index != NULL
and "out" if index == NULL */
bool uncommitted, /*!< in: false=READ COMMITTED,
true=READ UNCOMMITTED */
dict_index_t* index, /*!< in/out: index, could be NULL
if we just populate a dict_field_t
struct with information from
......@@ -251,18 +256,16 @@ dict_process_sys_indexes_rec(
dict_index_t* index, /*!< out: index to be filled */
table_id_t* table_id) /*!< out: index table id */
{
const char* err_msg;
byte buf[8];
ut_d(index->is_dummy = true);
ut_d(index->in_instant_init = false);
/* Parse the record, and get "dict_index_t" struct filled */
err_msg = dict_load_index_low(buf, heap, rec, nullptr, nullptr, &index);
*table_id = mach_read_from_8(buf);
return(err_msg);
const char *err_msg= dict_load_index_low(buf, false, heap, rec,
nullptr, nullptr, &index);
*table_id= mach_read_from_8(buf);
return err_msg;
}
/********************************************************************//**
......@@ -283,7 +286,7 @@ dict_process_sys_columns_rec(
const char* err_msg;
/* Parse the record, and get "dict_col_t" struct filled */
err_msg = dict_load_column_low(NULL, heap, column,
err_msg = dict_load_column_low(NULL, 0, heap, column,
table_id, col_name, rec, nullptr,
nth_v_col);
......@@ -304,7 +307,8 @@ dict_process_sys_virtual_rec(
ulint* pos,
ulint* base_pos)
{
return dict_load_virtual_low(nullptr, nullptr, table_id, pos, base_pos, rec);
return dict_load_virtual_low(nullptr, false, nullptr, table_id,
pos, base_pos, rec);
}
/********************************************************************//**
......@@ -328,7 +332,7 @@ dict_process_sys_fields_rec(
mach_write_to_8(last_index_id, last_id);
err_msg = dict_load_field_low(buf, NULL, sys_field,
err_msg = dict_load_field_low(buf, false, nullptr, sys_field,
pos, last_index_id, heap, nullptr, rec);
*index_id = mach_read_from_8(buf);
......@@ -632,6 +636,7 @@ enum table_read_status { READ_OK= 0, READ_ERROR, READ_NOT_FOUND };
/** Read and return 5 integer fields from a SYS_TABLES record.
@param[in] rec A record of SYS_TABLES
@param[in] uncommitted true=use READ UNCOMMITTED, false=READ COMMITTED
@param[in] mtr mini-transaction
@param[out] table_id Pointer to the table_id for this table
@param[out] space_id Pointer to the space_id for this table
......@@ -646,6 +651,7 @@ static
table_read_status
dict_sys_tables_rec_read(
const rec_t* rec,
bool uncommitted,
mtr_t* mtr,
table_id_t* table_id,
ulint* space_id,
......@@ -663,7 +669,7 @@ dict_sys_tables_rec_read(
rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len);
ut_ad(len == 6 || len == UNIV_SQL_NULL);
trx_id_t id = len == 6 ? trx_read_trx_id(field) : 0;
if (id && trx_sys.find(nullptr, id, false)) {
if (id && !uncommitted && trx_sys.find(nullptr, id, false)) {
const auto savepoint = mtr->get_savepoint();
heap = mem_heap_create(1024);
dict_index_t* index = UT_LIST_GET_FIRST(
......@@ -906,7 +912,8 @@ void dict_check_tablespaces_and_store_max_id()
DBUG_PRINT("dict_check_sys_tables",
("name: %*.s", static_cast<int>(len), field));
if (dict_sys_tables_rec_read(rec, &mtr, &table_id, &space_id,
if (dict_sys_tables_rec_read(rec, false,
&mtr, &table_id, &space_id,
&n_cols, &flags, &flags2, nullptr)
!= READ_OK
|| space_id == TRX_SYS_SPACE) {
......@@ -976,29 +983,31 @@ void dict_check_tablespaces_and_store_max_id()
/** Error message for a delete-marked record in dict_load_column_low() */
static const char *dict_load_column_del= "delete-marked record in SYS_COLUMNS";
/** Error message for a missing record in dict_load_column_low() */
static const char *dict_load_column_none= "SYS_COLUMNS record not found";
/** Message for incomplete instant ADD/DROP in dict_load_column_low() */
static const char *dict_load_column_instant= "incomplete instant ADD/DROP";
/** Load a table column definition from a SYS_COLUMNS record to dict_table_t.
@param table table, or nullptr if the output will be in column
@param use_uncommitted 0=READ COMMITTED, 1=detect, 2=READ UNCOMMITTED
@param heap memory heap for temporary storage
@param column pointer to output buffer, or nullptr if table!=nullptr
@param table_id table identifier
@param col_name column name
@param rec SYS_COLUMNS record
@param mtr mini-transaction
@param nth_v_col nullptr, or pointer to a counter of virtual columns
@return error message
@retval NULL on success */
static
const char*
dict_load_column_low(
dict_table_t* table, /*!< in/out: table, could be NULL
if we just populate a dict_column_t
struct with information from
a SYS_COLUMNS record */
mem_heap_t* heap, /*!< in/out: memory heap
for temporary storage */
dict_col_t* column, /*!< out: dict_column_t to fill,
or NULL if table != NULL */
table_id_t* table_id, /*!< out: table id */
const char** col_name, /*!< out: column name */
const rec_t* rec, /*!< in: SYS_COLUMNS record */
mtr_t* mtr, /*!< in/out: mini-transaction */
ulint* nth_v_col) /*!< out: if not NULL, this
records the "n" of "nth" virtual
column */
@retval nullptr on success */
static const char *dict_load_column_low(dict_table_t *table,
unsigned use_uncommitted,
mem_heap_t *heap, dict_col_t *column,
table_id_t *table_id,
const char **col_name,
const rec_t *rec,
mtr_t *mtr,
ulint *nth_v_col)
{
char* name;
const byte* field;
......@@ -1044,7 +1053,11 @@ dict_load_column_low(
const trx_id_t trx_id = trx_read_trx_id(field);
if (trx_id && mtr && trx_sys.find(nullptr, trx_id, false)) {
if (trx_id && mtr && use_uncommitted < 2
&& trx_sys.find(nullptr, trx_id, false)) {
if (use_uncommitted) {
return dict_load_column_instant;
}
const auto savepoint = mtr->get_savepoint();
dict_index_t* index = UT_LIST_GET_FIRST(
dict_sys.sys_columns->indexes);
......@@ -1173,6 +1186,7 @@ static const char *dict_load_virtual_none= "SYS_VIRTUAL record not found";
/** Load a virtual column "mapping" (to base columns) information
from a SYS_VIRTUAL record
@param[in,out] table table
@param[in] uncommitted false=READ COMMITTED, true=READ UNCOMMITTED
@param[in,out] column mapped base column's dict_column_t
@param[in,out] table_id table id
@param[in,out] pos virtual column position
......@@ -1184,6 +1198,7 @@ static
const char*
dict_load_virtual_low(
dict_table_t* table,
bool uncommitted,
dict_col_t** column,
table_id_t* table_id,
ulint* pos,
......@@ -1247,7 +1262,8 @@ dict_load_virtual_low(
const trx_id_t trx_id = trx_read_trx_id(field);
if (trx_id && column && trx_sys.find(nullptr, trx_id, false)) {
if (trx_id && column && !uncommitted
&& trx_sys.find(nullptr, trx_id, false)) {
if (!rec_get_deleted_flag(rec, 0)) {
return dict_load_virtual_none;
}
......@@ -1263,16 +1279,17 @@ dict_load_virtual_low(
return(NULL);
}
/********************************************************************//**
Loads definitions for table columns. */
/** Load the definitions for table columns.
@param table table
@param use_uncommitted 0=READ COMMITTED, 1=detect, 2=READ UNCOMMITTED
@param heap memory heap for temporary storage
@return error code
@retval DB_SUCCESS on success
@retval DB_SUCCESS_LOCKED_REC on success if use_uncommitted=1
and instant ADD/DROP/reorder was detected */
MY_ATTRIBUTE((nonnull, warn_unused_result))
static
dberr_t
dict_load_columns(
/*==============*/
dict_table_t* table, /*!< in/out: table */
mem_heap_t* heap) /*!< in/out: memory heap
for temporary storage */
static dberr_t dict_load_columns(dict_table_t *table, unsigned use_uncommitted,
mem_heap_t *heap)
{
btr_pcur_t pcur;
mtr_t mtr;
......@@ -1320,7 +1337,8 @@ dict_load_columns(
const rec_t* rec = btr_pcur_get_rec(&pcur);
err_msg = btr_pcur_is_on_user_rec(&pcur)
? dict_load_column_low(table, heap, NULL, NULL,
? dict_load_column_low(table, use_uncommitted,
heap, NULL, NULL,
&name, rec, &mtr, &nth_v_col)
: dict_load_column_none;
......@@ -1328,6 +1346,9 @@ dict_load_columns(
} else if (err_msg == dict_load_column_del) {
n_skipped++;
goto next_rec;
} else if (err_msg == dict_load_column_instant) {
err = DB_SUCCESS_LOCKED_REC;
goto func_exit;
} else if (err_msg == dict_load_column_none
&& strstr(table->name.m_name,
"/" TEMP_FILE_PREFIX_INNODB)) {
......@@ -1384,13 +1405,13 @@ dict_load_columns(
}
/** Loads SYS_VIRTUAL info for one virtual column
@param[in,out] table table
@param[in] nth_v_col virtual column sequence num
*/
@param table table definition
@param uncommitted false=READ COMMITTED, true=READ UNCOMMITTED
@param nth_v_col virtual column position */
MY_ATTRIBUTE((nonnull, warn_unused_result))
static
dberr_t
dict_load_virtual_col(dict_table_t *table, ulint nth_v_col)
dict_load_virtual_col(dict_table_t *table, bool uncommitted, ulint nth_v_col)
{
const dict_v_col_t* v_col = dict_table_get_nth_v_col(table, nth_v_col);
......@@ -1440,7 +1461,7 @@ dict_load_virtual_col(dict_table_t *table, ulint nth_v_col)
ulint pos;
const char* err_msg
= btr_pcur_is_on_user_rec(&pcur)
? dict_load_virtual_low(table,
? dict_load_virtual_low(table, uncommitted,
&v_col->base_col[i - skipped],
NULL,
&pos, NULL,
......@@ -1470,12 +1491,13 @@ dict_load_virtual_col(dict_table_t *table, ulint nth_v_col)
}
/** Loads info from SYS_VIRTUAL for virtual columns.
@param[in,out] table table */
@param table table definition
@param uncommitted false=READ COMMITTED, true=READ UNCOMMITTED */
MY_ATTRIBUTE((nonnull, warn_unused_result))
static dberr_t dict_load_virtual(dict_table_t *table)
static dberr_t dict_load_virtual(dict_table_t *table, bool uncommitted)
{
for (ulint i= 0; i < table->n_v_cols; i++)
if (dberr_t err= dict_load_virtual_col(table, i))
if (dberr_t err= dict_load_virtual_col(table, uncommitted, i))
return err;
return DB_SUCCESS;
}
......@@ -1494,6 +1516,8 @@ dict_load_field_low(
byte* index_id, /*!< in/out: index id (8 bytes)
an "in" value if index != NULL
and "out" if index == NULL */
bool uncommitted, /*!< in: false=READ COMMITTED,
true=READ UNCOMMITTED */
dict_index_t* index, /*!< in/out: index, could be NULL
if we just populate a dict_field_t
struct with information from
......@@ -1585,7 +1609,8 @@ dict_load_field_low(
if (!trx_id) {
ut_ad(!rec_get_deleted_flag(rec, 0));
} else if (mtr && trx_sys.find(nullptr, trx_id, false)) {
} else if (!mtr || uncommitted) {
} else if (trx_sys.find(nullptr, trx_id, false)) {
const auto savepoint = mtr->get_savepoint();
dict_index_t* sys_field = UT_LIST_GET_FIRST(
dict_sys.sys_fields->indexes);
......@@ -1626,15 +1651,15 @@ dict_load_field_low(
return(NULL);
}
/********************************************************************//**
Loads definitions for index fields.
@return DB_SUCCESS if ok, DB_CORRUPTION if corruption */
static
ulint
dict_load_fields(
/*=============*/
dict_index_t* index, /*!< in/out: index whose fields to load */
mem_heap_t* heap) /*!< in: memory heap for temporary storage */
/**
Load definitions for index fields.
@param index index whose fields are to be loaded
@param uncommitted false=READ COMMITTED, true=READ UNCOMMITTED
@param heap memory heap for temporary storage
@return error code
@return DB_SUCCESS if the fields were loaded successfully */
static dberr_t dict_load_fields(dict_index_t *index, bool uncommitted,
mem_heap_t *heap)
{
btr_pcur_t pcur;
mtr_t mtr;
......@@ -1669,8 +1694,8 @@ dict_load_fields(
for (ulint i = 0; i < index->n_fields; i++) {
const char *err_msg = btr_pcur_is_on_user_rec(&pcur)
? dict_load_field_low(index_id, index,
NULL, NULL, NULL,
? dict_load_field_low(index_id, uncommitted, index,
nullptr, nullptr, nullptr,
heap, &mtr,
btr_pcur_get_rec(&pcur))
: dict_load_field_none;
......@@ -1717,6 +1742,8 @@ dict_load_index_low(
byte* table_id, /*!< in/out: table id (8 bytes),
an "in" value if mtr
and "out" when !mtr */
bool uncommitted, /*!< in: false=READ COMMITTED,
true=READ UNCOMMITTED */
mem_heap_t* heap, /*!< in/out: temporary memory heap */
const rec_t* rec, /*!< in: SYS_INDEXES record */
mtr_t* mtr, /*!< in/out: mini-transaction,
......@@ -1798,7 +1825,7 @@ dict_load_index_low(
const trx_id_t trx_id = trx_read_trx_id(field);
if (!trx_id) {
ut_ad(!rec_get_deleted_flag(rec, 0));
} else if (!mtr) {
} else if (!mtr || uncommitted) {
} else if (trx_sys.find(nullptr, trx_id, false)) {
const auto savepoint = mtr->get_savepoint();
dict_index_t* sys_index = UT_LIST_GET_FIRST(
......@@ -1876,20 +1903,18 @@ dict_load_index_low(
return(NULL);
}
/********************************************************************//**
Loads definitions for table indexes. Adds them to the data dictionary
cache.
@return DB_SUCCESS if ok, DB_CORRUPTION if corruption of dictionary
table or DB_UNSUPPORTED if table has unknown index type */
/** Load definitions for table indexes. Adds them to the data dictionary cache.
@param table table definition
@param uncommitted false=READ COMMITTED, true=READ UNCOMMITTED
@param heap memory heap for temporary storage
@param ignore_err errors to be ignored when loading the index definition
@return error code
@retval DB_SUCCESS if all indexes were successfully loaded
@retval DB_CORRUPTION if corruption of dictionary table
@retval DB_UNSUPPORTED if table has unknown index type */
static MY_ATTRIBUTE((nonnull))
dberr_t
dict_load_indexes(
/*==============*/
dict_table_t* table, /*!< in/out: table */
mem_heap_t* heap, /*!< in: memory heap for temporary storage */
dict_err_ignore_t ignore_err)
/*!< in: error to be ignored when
loading the index definition */
dberr_t dict_load_indexes(dict_table_t *table, bool uncommitted,
mem_heap_t *heap, dict_err_ignore_t ignore_err)
{
dict_index_t* sys_index;
btr_pcur_t pcur;
......@@ -1952,8 +1977,8 @@ dict_load_indexes(
}
}
err_msg = dict_load_index_low(table_id, heap, rec, &mtr, table,
&index);
err_msg = dict_load_index_low(table_id, uncommitted, heap, rec,
&mtr, table, &index);
ut_ad(!index == !!err_msg);
if (err_msg == dict_load_index_none) {
......@@ -2016,7 +2041,8 @@ dict_load_indexes(
} else if (index->page == FIL_NULL
&& table->is_readable()
&& (!(index->type & DICT_FTS))) {
if (ignore_err != DICT_ERR_IGNORE_DROP) {
if (!uncommitted
&& ignore_err != DICT_ERR_IGNORE_DROP) {
ib::error_or_warn(!(ignore_err
& DICT_ERR_IGNORE_INDEX))
<< "Index " << index->name
......@@ -2060,7 +2086,10 @@ dict_load_indexes(
of the database server */
dict_mem_index_free(index);
} else {
dict_load_fields(index, heap);
error = dict_load_fields(index, uncommitted, heap);
if (error != DB_SUCCESS) {
goto func_exit;
}
/* The data dictionary tables should never contain
invalid index definitions. If we ignored this error
......@@ -2123,11 +2152,12 @@ dict_load_indexes(
/** Load a table definition from a SYS_TABLES record to dict_table_t.
Do not load any columns or indexes.
@param[in,out] mtr mini-transaction
@param[in] uncommitted whether to use READ UNCOMMITTED isolation level
@param[in] rec SYS_TABLES record
@param[out,own] table table, or nullptr
@return error message
@retval nullptr on success */
const char *dict_load_table_low(mtr_t *mtr,
const char *dict_load_table_low(mtr_t *mtr, bool uncommitted,
const rec_t *rec, dict_table_t **table)
{
table_id_t table_id;
......@@ -2144,7 +2174,8 @@ const char *dict_load_table_low(mtr_t *mtr,
return(error_text);
}
if (auto r = dict_sys_tables_rec_read(rec, mtr, &table_id, &space_id,
if (auto r = dict_sys_tables_rec_read(rec, uncommitted, mtr,
&table_id, &space_id,
&t_num, &flags, &flags2,
&trx_id)) {
*table = NULL;
......@@ -2287,8 +2318,6 @@ static dict_table_t *dict_load_table_one(const span<const char> &name,
ut_ad(dict_sys.locked());
mtr.start();
dict_index_t *sys_index = dict_sys.sys_tables->indexes.start;
ut_ad(!dict_sys.sys_tables->not_redundant());
ut_ad(name_of_col_is(dict_sys.sys_tables, sys_index,
......@@ -2312,6 +2341,9 @@ static dict_table_t *dict_load_table_one(const span<const char> &name,
dfield_set_data(&dfield, name.data(), name.size());
dict_index_copy_types(&tuple, sys_index, 1);
bool uncommitted = false;
reload:
mtr.start();
dberr_t err = btr_pcur_open_on_user_rec(sys_index, &tuple, PAGE_CUR_GE,
BTR_SEARCH_LEAF, &pcur, &mtr);
......@@ -2331,7 +2363,8 @@ static dict_table_t *dict_load_table_one(const span<const char> &name,
}
dict_table_t* table;
if (const char* err_msg = dict_load_table_low(&mtr, rec, &table)) {
if (const char* err_msg =
dict_load_table_low(&mtr, uncommitted, rec, &table)) {
if (err_msg != dict_load_table_flags) {
ib::error() << err_msg;
}
......@@ -2341,15 +2374,32 @@ static dict_table_t *dict_load_table_one(const span<const char> &name,
goto err_exit;
}
const unsigned use_uncommitted = uncommitted
? 2
: table->id == mach_read_from_8(
rec + rec_get_field_start_offs(
rec, DICT_FLD__SYS_TABLES__ID));
mtr.commit();
mem_heap_t* heap = mem_heap_create(32000);
dict_load_tablespace(table, ignore_err);
if (dict_load_columns(table, heap) || dict_load_virtual(table)) {
evict:
dict_sys.remove(table);
switch (dict_load_columns(table, use_uncommitted, heap)) {
case DB_SUCCESS_LOCKED_REC:
ut_ad(!uncommitted);
uncommitted = true;
dict_mem_table_free(table);
mem_heap_free(heap);
goto reload;
case DB_SUCCESS:
if (!dict_load_virtual(table, uncommitted)) {
break;
}
/* fall through */
default:
dict_mem_table_free(table);
mem_heap_free(heap);
DBUG_RETURN(nullptr);
}
......@@ -2374,7 +2424,7 @@ static dict_table_t *dict_load_table_one(const span<const char> &name,
? DICT_ERR_IGNORE_ALL
: ignore_err;
err = dict_load_indexes(table, heap, index_load_err);
err = dict_load_indexes(table, uncommitted, heap, index_load_err);
if (err == DB_TABLE_CORRUPT) {
/* Refuse to load the table if the table has a corrupted
......@@ -2382,7 +2432,10 @@ static dict_table_t *dict_load_table_one(const span<const char> &name,
ut_ad(index_load_err != DICT_ERR_IGNORE_DROP);
ib::error() << "Refusing to load corrupted table "
<< table->name;
goto evict;
evict:
dict_sys.remove(table);
mem_heap_free(heap);
DBUG_RETURN(nullptr);
}
if (err != DB_SUCCESS || !table->is_readable()) {
......@@ -2437,14 +2490,12 @@ static dict_table_t *dict_load_table_one(const span<const char> &name,
changed when dict_load_foreigns() is called below */
table->fk_max_recusive_level = 0;
/* If the force recovery flag is set, we open the table irrespective
of the error condition, since the user may want to dump data from the
clustered index. However we load the foreign key information only if
/* We will load the foreign key information only if
all indexes were loaded. */
if (!table->is_readable()) {
/* Don't attempt to load the indexes from disk. */
} else if (err == DB_SUCCESS) {
err = dict_load_foreigns(table->name.m_name, nullptr,
err = dict_load_foreigns(table->name.m_name, nullptr, false,
0, true, ignore_err, fk_tables);
if (err != DB_SUCCESS) {
......@@ -2605,7 +2656,7 @@ dict_load_sys_table(
heap = mem_heap_create(1000);
dict_load_indexes(table, heap, DICT_ERR_IGNORE_NONE);
dict_load_indexes(table, false, heap, DICT_ERR_IGNORE_NONE);
mem_heap_free(heap);
}
......@@ -2775,6 +2826,8 @@ dberr_t
dict_load_foreign(
/*==============*/
const char* table_name, /*!< in: table name */
bool uncommitted, /*!< in: use READ UNCOMMITTED
transaction isolation level */
const char** col_names,
/*!< in: column names, or NULL
to use foreign->foreign_table->col_names */
......@@ -2862,7 +2915,8 @@ dict_load_foreign(
const trx_id_t tid = trx_read_trx_id(field);
if (tid && tid != trx_id && trx_sys.find(nullptr, tid, false)) {
if (tid && tid != trx_id && !uncommitted
&& trx_sys.find(nullptr, tid, false)) {
const auto savepoint = mtr.get_savepoint();
rec_offs* offsets = rec_get_offsets(
rec, sys_index, nullptr, true, ULINT_UNDEFINED, &heap);
......@@ -2991,6 +3045,8 @@ dict_load_foreigns(
const char* table_name, /*!< in: table name */
const char** col_names, /*!< in: column names, or NULL
to use table->col_names */
bool uncommitted, /*!< in: use READ UNCOMMITTED
transaction isolation level */
trx_id_t trx_id, /*!< in: DDL transaction id,
or 0 to check
recursive load of tables
......@@ -3105,7 +3161,7 @@ dict_load_foreigns(
/* Load the foreign constraint definition to the dictionary cache */
err = len < sizeof fk_id
? dict_load_foreign(table_name, col_names, trx_id,
? dict_load_foreign(table_name, uncommitted, col_names, trx_id,
check_recursive, check_charsets,
{fk_id, len}, ignore_err, fk_tables)
: DB_CORRUPTION;
......
......@@ -12839,7 +12839,7 @@ int create_table_info_t::create_table(bool create_fk)
if (err == DB_SUCCESS) {
/* Check that also referencing constraints are ok */
dict_names_t fk_tables;
err = dict_load_foreigns(m_table_name, nullptr,
err = dict_load_foreigns(m_table_name, nullptr, false,
m_trx->id, true,
DICT_ERR_IGNORE_NONE, fk_tables);
while (err == DB_SUCCESS && !fk_tables.empty()) {
......
......@@ -9881,7 +9881,7 @@ innobase_update_foreign_cache(
dict_names_t fk_tables;
err = dict_load_foreigns(user_table->name.m_name,
ctx->col_names, 1, true,
ctx->col_names, false, 1, true,
DICT_ERR_IGNORE_NONE,
fk_tables);
......@@ -9892,7 +9892,7 @@ innobase_update_foreign_cache(
loaded with "foreign_key checks" off,
so let's retry the loading with charset_check is off */
err = dict_load_foreigns(user_table->name.m_name,
ctx->col_names, 1, false,
ctx->col_names, false, 1, false,
DICT_ERR_IGNORE_NONE,
fk_tables);
......
......@@ -4772,7 +4772,7 @@ static const char *i_s_sys_tables_rec(const btr_pcur_t &pcur, mtr_t *mtr,
}
if (rec)
return dict_load_table_low(mtr, rec, table);
return dict_load_table_low(mtr, false, rec, table);
*table= dict_sys.load_table
(span<const char>{reinterpret_cast<const char*>(pcur.old_rec), len});
......
......@@ -89,6 +89,8 @@ dict_load_foreigns(
const char* table_name, /*!< in: table name */
const char** col_names, /*!< in: column names, or NULL
to use table->col_names */
bool uncommitted, /*!< in: use READ UNCOMMITTED
transaction isolation level */
trx_id_t trx_id, /*!< in: DDL transaction id,
or 0 to check
recursive load of tables
......@@ -125,11 +127,12 @@ dict_getnext_system(
/** Load a table definition from a SYS_TABLES record to dict_table_t.
Do not load any columns or indexes.
@param[in,out] mtr mini-transaction
@param[in] uncommitted whether to use READ UNCOMMITTED isolation level
@param[in] rec SYS_TABLES record
@param[out,own] table table, or nullptr
@return error message
@retval nullptr on success */
const char *dict_load_table_low(mtr_t *mtr,
const char *dict_load_table_low(mtr_t *mtr, bool uncommitted,
const rec_t *rec, dict_table_t **table)
MY_ATTRIBUTE((nonnull, warn_unused_result));
......
......@@ -2869,7 +2869,7 @@ row_rename_table_for_mysql(
dict_names_t fk_tables;
err = dict_load_foreigns(
new_name, nullptr, trx->id,
new_name, nullptr, false, trx->id,
!old_is_tmp || trx->check_foreigns,
use_fk
? DICT_ERR_IGNORE_NONE
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment