Commit 88733282 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-32050: Look up tables in the purge coordinator

The InnoDB table lookup in purge worker threads is a bottleneck that can
degrade a slow shutdown to utilize less than 2 threads. Let us fix that
bottleneck by constructing a local lookup table that does not require any
synchronization while the undo log records of the current batch
are being processed.

TRX_PURGE_TABLE_BUCKETS: The initial number of std::unordered_map
hash buckets used during a purge batch. This could avoid some
resizing and rehashing in trx_purge_attach_undo_recs().

purge_node_t::tables: A lookup table from table ID to an already
looked up and locked table. Replaces many fields.

trx_purge_attach_undo_recs(): Look up each table in the purge batch
only once.

trx_purge(): Close all tables and release MDL at the end of the batch.

trx_purge_table_open(), trx_purge_table_acquire(): Open a table in purge
and acquire a metadata lock on it. This replaces
dict_table_open_on_id<true>() and dict_acquire_mdl_shared().

purge_sys_t::close_and_reopen(): In case of an MDL conflict, close and
reopen all tables that are covered by the current purge batch.
It may be that some of the tables have been dropped meanwhile and can
be ignored. This replaces wait_SYS() and wait_FTS().

row_purge_parse_undo_rec(): Make purge_coordinator_task issue a
MDL warrant to any purge_worker_task which might need it
when innodb_purge_threads>1.

purge_node_t::end(): Clear the MDL warrant.

Reviewed by: Vladislav Lesin and Vladislav Vaintroub
parent 39bb5ebb
SET GLOBAL innodb_max_purge_lag_wait=0;
connect stop_purge,localhost,root;
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default;
CREATE TABLE t1(c1 INT NOT NULL, c2 INT, PRIMARY KEY(c1)) Engine=InnoDB;
SHOW CREATE TABLE t1;
Table Create Table
......@@ -143,6 +147,7 @@ ERROR 23000: Duplicate entry '0' for key 'i'
SET DEBUG_SYNC='now SIGNAL log2';
connection con1;
disconnect con1;
disconnect stop_purge;
connection default;
SET DEBUG_SYNC='RESET';
DROP TABLE t1;
......
......@@ -437,6 +437,7 @@ COUNT(c22f)
CHECK TABLE t1;
Table Op Msg_type Msg_text
test.t1 check status OK
SET GLOBAL innodb_max_purge_lag_wait=0;
ALTER TABLE t1 ADD UNIQUE INDEX c3p5(c3(5));
ERROR 23000: Duplicate entry 'NULL' for key 'c3p5'
UPDATE t1 SET c3 = NULL WHERE c3 = '';
......
......@@ -274,6 +274,7 @@ WHERE variable_name = 'innodb_encryption_n_rowlog_blocks_encrypted');
SET @rowlog_decrypt_1=
(SELECT variable_value FROM information_schema.global_status
WHERE variable_name = 'innodb_encryption_n_rowlog_blocks_decrypted');
SET GLOBAL innodb_max_purge_lag_wait=0;
SET DEBUG_SYNC = 'row_log_table_apply1_before SIGNAL rebuilt3 WAIT_FOR dml3_done';
ALTER TABLE t1 ADD PRIMARY KEY(c22f), CHANGE c2 c22f INT;
ERROR 42000: Multiple primary key defined
......
......@@ -42,14 +42,14 @@ TRUNCATE TABLE t2;
ERROR HY000: Table 't2' is read only
TRUNCATE TABLE t3;
ERROR HY000: Table 't3' is read only
# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0
# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 --skip-innodb-fast-shutdown
TRUNCATE TABLE t1;
TRUNCATE TABLE t2;
TRUNCATE TABLE t3;
corrupted SYS_TABLES.MIX_LEN for test/t1
corrupted SYS_TABLES.MIX_LEN for test/t2
corrupted SYS_TABLES.MIX_LEN for test/t3
# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0
# restart: --innodb-data-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-log-group-home-dir=MYSQLTEST_VARDIR/tmp/row_format_redundant --innodb-data-file-path=ibdata1:1M:autoextend --innodb-undo-tablespaces=0 --innodb-stats-persistent=0 --skip-innodb-fast-shutdown
TRUNCATE TABLE t1;
ERROR 42S02: Table 'test.t1' doesn't exist in engine
TRUNCATE TABLE t2;
......
......@@ -6,6 +6,10 @@
--source include/big_test.inc
let $MYSQLD_DATADIR= `select @@datadir`;
SET GLOBAL innodb_max_purge_lag_wait=0;
connect (stop_purge,localhost,root);
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default;
#
# Test for BUG# 12739098, check whether trx->error_status is reset on error.
......@@ -155,6 +159,7 @@ SET DEBUG_SYNC='now SIGNAL log2';
--connection con1
reap;
--disconnect con1
--disconnect stop_purge
--connection default
SET DEBUG_SYNC='RESET';
DROP TABLE t1;
......
......@@ -369,6 +369,8 @@ SELECT
connection con1;
SELECT COUNT(c22f) FROM t1;
CHECK TABLE t1;
# Avoid a strange DEBUG_SYNC timeout on c3p5_created.
SET GLOBAL innodb_max_purge_lag_wait=0;
# Create a column prefix index.
--error ER_DUP_ENTRY
......
......@@ -255,6 +255,7 @@ SET @rowlog_decrypt_1=
(SELECT variable_value FROM information_schema.global_status
WHERE variable_name = 'innodb_encryption_n_rowlog_blocks_decrypted');
SET GLOBAL innodb_max_purge_lag_wait=0;
# Accumulate and apply some modification log.
SET DEBUG_SYNC = 'row_log_table_apply1_before SIGNAL rebuilt3 WAIT_FOR dml3_done';
--error ER_MULTIPLE_PRI_KEY
......
......@@ -78,7 +78,7 @@ TRUNCATE TABLE t2;
--error ER_OPEN_AS_READONLY
TRUNCATE TABLE t3;
--let $restart_parameters= $d
--let $restart_parameters= $d --skip-innodb-fast-shutdown
--source include/restart_mysqld.inc
TRUNCATE TABLE t1;
......
......@@ -269,6 +269,7 @@ Table Op Msg_type Msg_text
test.t1 check status OK
connection default;
UNLOCK TABLES;
SET GLOBAL innodb_max_purge_lag_wait=0;
connection con2;
DROP TABLE t1;
connection default;
......@@ -348,6 +349,7 @@ SELECT * FROM t1;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
connection default;
UNLOCK TABLES;
SET GLOBAL innodb_max_purge_lag_wait=0;
connection con2;
DROP TABLE t1;
disconnect con2;
......
......@@ -114,6 +114,8 @@ CHECK TABLE t1;
--connection default
UNLOCK TABLES;
# Avoid a MDL conflict between purge and the DROP TABLE below
SET GLOBAL innodb_max_purge_lag_wait=0;
--connection con2
DROP TABLE t1;
......@@ -222,6 +224,8 @@ SELECT * FROM t1;
--connection default
UNLOCK TABLES;
# Avoid a MDL conflict between purge and the DROP TABLE below
SET GLOBAL innodb_max_purge_lag_wait=0;
--connection con2
DROP TABLE t1;
......
......@@ -662,7 +662,7 @@ dict_table_t::parse_name<>(char(&)[NAME_LEN + 1], char(&)[NAME_LEN + 1],
@param[in] table_op operation to perform when opening
@return table object after locking MDL shared
@retval nullptr if the table is not readable, or if trylock && MDL blocked */
template<bool trylock, bool purge_thd>
template<bool trylock>
dict_table_t*
dict_acquire_mdl_shared(dict_table_t *table,
THD *thd,
......@@ -678,7 +678,6 @@ dict_acquire_mdl_shared(dict_table_t *table,
if (trylock)
{
static_assert(!trylock || !purge_thd, "usage");
dict_sys.freeze(SRW_LOCK_CALL);
db_len= dict_get_db_name_len(table->name.m_name);
dict_sys.unfreeze();
......@@ -749,13 +748,7 @@ dict_acquire_mdl_shared(dict_table_t *table,
}
}
retry_table_open:
dict_sys.freeze(SRW_LOCK_CALL);
if (purge_thd && purge_sys.must_wait_FTS())
{
not_found= reinterpret_cast<dict_table_t*>(-1);
goto return_without_mdl;
}
table= dict_sys.find_table(table_id);
if (table)
table->acquire();
......@@ -763,11 +756,6 @@ dict_acquire_mdl_shared(dict_table_t *table,
{
dict_sys.unfreeze();
dict_sys.lock(SRW_LOCK_CALL);
if (purge_thd && purge_sys.must_wait_FTS())
{
dict_sys.unlock();
goto retry_table_open;
}
table= dict_load_table_on_id(table_id,
table_op == DICT_TABLE_OP_LOAD_TABLESPACE
? DICT_ERR_IGNORE_RECOVER_LOCK
......@@ -826,24 +814,21 @@ dict_acquire_mdl_shared(dict_table_t *table,
goto retry;
}
template dict_table_t* dict_acquire_mdl_shared<false, false>
template dict_table_t* dict_acquire_mdl_shared<false>
(dict_table_t*,THD*,MDL_ticket**,dict_table_op_t);
template dict_table_t* dict_acquire_mdl_shared<true, false>
template dict_table_t* dict_acquire_mdl_shared<true>
(dict_table_t*,THD*,MDL_ticket**,dict_table_op_t);
/** Look up a table by numeric identifier.
@tparam purge_thd Whether the function is called by purge thread
@param[in] table_id table identifier
@param[in] dict_locked data dictionary locked
@param[in] table_op operation to perform when opening
@param[in,out] thd background thread, or NULL to not acquire MDL
@param[out] mdl mdl ticket, or NULL
@return table, NULL if does not exist */
template <bool purge_thd>
dict_table_t*
dict_table_open_on_id(table_id_t table_id, bool dict_locked,
dict_table_op_t table_op, THD *thd,
MDL_ticket **mdl)
dict_table_t *dict_table_open_on_id(table_id_t table_id, bool dict_locked,
dict_table_op_t table_op, THD *thd,
MDL_ticket **mdl)
{
if (!dict_locked)
dict_sys.freeze(SRW_LOCK_CALL);
......@@ -852,16 +837,9 @@ dict_table_open_on_id(table_id_t table_id, bool dict_locked,
if (table)
{
if (purge_thd && purge_sys.must_wait_FTS())
{
table= reinterpret_cast<dict_table_t*>(-1);
goto func_exit;
}
table->acquire();
if (thd && !dict_locked)
table= dict_acquire_mdl_shared<false, purge_thd>(
table, thd, mdl, table_op);
table= dict_acquire_mdl_shared<false>(table, thd, mdl, table_op);
}
else if (table_op != DICT_TABLE_OP_OPEN_ONLY_IF_CACHED)
{
......@@ -875,44 +853,26 @@ dict_table_open_on_id(table_id_t table_id, bool dict_locked,
? DICT_ERR_IGNORE_RECOVER_LOCK
: DICT_ERR_IGNORE_FK_NOKEY);
if (table)
{
if (purge_thd && purge_sys.must_wait_FTS())
{
dict_sys.unlock();
return reinterpret_cast<dict_table_t*>(-1);
}
table->acquire();
}
if (!dict_locked)
{
dict_sys.unlock();
if (table && thd)
{
dict_sys.freeze(SRW_LOCK_CALL);
table= dict_acquire_mdl_shared<false, purge_thd>(
table, thd, mdl, table_op);
table= dict_acquire_mdl_shared<false>(table, thd, mdl, table_op);
dict_sys.unfreeze();
}
return table;
}
}
func_exit:
if (!dict_locked)
dict_sys.unfreeze();
return table;
}
template dict_table_t* dict_table_open_on_id<false>
(table_id_t table_id, bool dict_locked,
dict_table_op_t table_op, THD *thd,
MDL_ticket **mdl);
template dict_table_t* dict_table_open_on_id<true>
(table_id_t table_id, bool dict_locked,
dict_table_op_t table_op, THD *thd,
MDL_ticket **mdl);
/********************************************************************//**
Looks for column n position in the clustered index.
@return position in internal representation of the clustered index */
......
......@@ -1609,10 +1609,11 @@ and common table associated with the fts table.
already stopped*/
void purge_sys_t::stop_FTS(const dict_table_t &table, bool already_stopped)
{
dict_sys.lock(SRW_LOCK_CALL);
if (!already_stopped)
purge_sys.stop_FTS();
dict_sys.lock(SRW_LOCK_CALL);
fts_table_t fts_table;
char table_name[MAX_FULL_NAME_LEN];
......
......@@ -132,7 +132,7 @@ enum dict_table_op_t {
@param[in] table_op operation to perform when opening
@return table object after locking MDL shared
@retval NULL if the table is not readable, or if trylock && MDL blocked */
template<bool trylock, bool purge_thd= false>
template<bool trylock>
dict_table_t*
dict_acquire_mdl_shared(dict_table_t *table,
THD *thd,
......@@ -146,7 +146,6 @@ dict_acquire_mdl_shared(dict_table_t *table,
@param[in,out] thd background thread, or NULL to not acquire MDL
@param[out] mdl mdl ticket, or NULL
@return table, NULL if does not exist */
template<bool purge_thd= false>
dict_table_t*
dict_table_open_on_id(table_id_t table_id, bool dict_locked,
dict_table_op_t table_op, THD *thd= nullptr,
......
......@@ -24,8 +24,7 @@ Purge obsolete records
Created 3/14/1997 Heikki Tuuri
*******************************************************/
#ifndef row0purge_h
#define row0purge_h
#pragma once
#include "que0types.h"
#include "btr0types.h"
......@@ -35,6 +34,7 @@ Created 3/14/1997 Heikki Tuuri
#include "row0mysql.h"
#include "mysqld.h"
#include <queue>
#include <unordered_map>
class MDL_ticket;
/** Determines if it is possible to remove a secondary index entry.
......@@ -89,155 +89,70 @@ struct trx_purge_rec_t
roll_ptr_t roll_ptr;
};
/* Purge node structure */
struct purge_node_t{
que_common_t common; /*!< node type: QUE_NODE_PURGE */
/*----------------------*/
/* Local storage for this graph node */
roll_ptr_t roll_ptr;/* roll pointer to undo log record */
undo_no_t undo_no;/*!< undo number of the record */
byte rec_type;/*!< undo log record type: TRX_UNDO_INSERT_REC,
... */
byte cmpl_info;/* compiler analysis info of an update */
private:
/** latest unavailable table ID (do not bother looking up again) */
table_id_t unavailable_table_id;
/** the latest modification of the table definition identified by
unavailable_table_id, or TRX_ID_MAX */
trx_id_t def_trx_id;
public:
dict_table_t* table; /*!< table where purge is done */
upd_t* update; /*!< update vector for a clustered index
record */
const dtuple_t* ref; /*!< NULL, or row reference to the next row to
handle */
dtuple_t* row; /*!< NULL, or a copy (also fields copied to
heap) of the indexed fields of the row to
handle */
dict_index_t* index; /*!< NULL, or the next index whose record should
be handled */
mem_heap_t* heap; /*!< memory heap used as auxiliary storage for
row; this must be emptied after a successful
purge of a row */
ibool found_clust;/*!< whether the clustered index record
determined by ref was found in the clustered
index, and we were able to position pcur on
it */
btr_pcur_t pcur; /*!< persistent cursor used in searching the
clustered index record */
#ifdef UNIV_DEBUG
/** whether the operation is in progress */
bool in_progress;
#endif
trx_id_t trx_id; /*!< trx id for this purging record */
/** meta-data lock for the table name */
MDL_ticket* mdl_ticket;
/** table id of the previous undo log record */
table_id_t last_table_id;
/** purge thread */
THD* purge_thd;
/** metadata lock holds for this number of undo log recs */
int mdl_hold_recs;
/** Purge worker context */
struct purge_node_t
{
/** node type: QUE_NODE_PURGE */
que_common_t common;
/** Undo recs to purge */
std::queue<trx_purge_rec_t> undo_recs;
/** DB_TRX_ID of the undo log record */
trx_id_t trx_id;
/** DB_ROLL_PTR pointing to undo log record */
roll_ptr_t roll_ptr;
/** Constructor */
explicit purge_node_t(que_thr_t* parent) :
common(QUE_NODE_PURGE, parent),
unavailable_table_id(0),
table(NULL),
heap(mem_heap_create(256)),
/** undo number of the record */
undo_no_t undo_no;
/** record type: TRX_UNDO_INSERT_REC, ... */
byte rec_type;
/** compiler analysis info of an update */
byte cmpl_info;
/** whether the clustered index record determined by ref was found
in the clustered index of the table, and we were able to position
pcur on it */
bool found_clust;
#ifdef UNIV_DEBUG
in_progress(false),
/** whether the operation is in progress */
bool in_progress= false;
#endif
mdl_ticket(NULL),
last_table_id(0),
purge_thd(NULL),
mdl_hold_recs(0)
{
}
/** table where purge is done */
dict_table_t *table= nullptr;
/** update vector for a clustered index record */
upd_t *update;
/** row reference to the next row to handle, or nullptr */
const dtuple_t *ref;
/** nullptr, or a deep copy of the indexed fields of the row to handle */
dtuple_t *row;
/** nullptr, or the next index of table whose record should be handled */
dict_index_t *index;
/** memory heap used as auxiliary storage; must be emptied between rows */
mem_heap_t *heap;
/** persistent cursor to the clustered index record */
btr_pcur_t pcur;
/** Undo recs to purge */
std::queue<trx_purge_rec_t> undo_recs;
/** map of table identifiers to table handles and meta-data locks */
std::unordered_map<table_id_t, std::pair<dict_table_t*,MDL_ticket*>> tables;
/** Constructor */
explicit purge_node_t(que_thr_t *parent) :
common(QUE_NODE_PURGE, parent), heap(mem_heap_create(256)),
tables(TRX_PURGE_TABLE_BUCKETS) {}
#ifdef UNIV_DEBUG
/***********************************************************//**
Validate the persisent cursor. The purge node has two references
to the clustered index record - one via the ref member, and the
other via the persistent cursor. These two references must match
each other if the found_clust flag is set.
@return true if the persistent cursor is consistent with
the ref member.*/
bool validate_pcur();
/** Validate the persistent cursor. The purge node has two references
to the clustered index record: ref and pcur, which must match
each other if found_clust.
@return whether pcur is consistent with ref */
bool validate_pcur();
#endif
/** Determine if a table should be skipped in purge.
@param[in] table_id table identifier
@return whether to skip the table lookup and processing */
bool is_skipped(table_id_t id) const
{
return id == unavailable_table_id && trx_id <= def_trx_id;
}
/** Remember that a table should be skipped in purge.
@param[in] id table identifier
@param[in] limit last transaction for which to skip */
void skip(table_id_t id, trx_id_t limit)
{
DBUG_ASSERT(limit >= trx_id);
unavailable_table_id = id;
def_trx_id = limit;
}
/** Start processing an undo log record. */
inline void start();
/** Close the existing table and release the MDL for it. */
void close_table()
{
last_table_id= 0;
if (!table)
{
ut_ad(!mdl_ticket);
return;
}
innobase_reset_background_thd(purge_thd);
dict_table_close(table, false, purge_thd, mdl_ticket);
table= nullptr;
mdl_ticket= nullptr;
}
/** Retail mdl for the table id.
@param[in] table_id table id to be processed
@return true if retain mdl */
bool retain_mdl(table_id_t table_id)
{
ut_ad(table_id);
if (last_table_id == table_id && mdl_hold_recs < 100)
{
ut_ad(table);
mdl_hold_recs++;
return true;
}
mdl_hold_recs= 0;
close_table();
return false;
}
/** Reset the state at end
@return the query graph parent */
inline que_node_t *end();
inline que_node_t *end(THD *);
};
#endif
......@@ -135,7 +135,11 @@ class purge_sys_t
that are older than view will be removed. */
ReadViewBase view;
/** whether purge is enabled; protected by latch and std::atomic */
std::atomic<bool> m_enabled;
std::atomic<bool> m_enabled{false};
public:
/** whether purge is active (may hold table handles) */
std::atomic<bool> m_active{false};
private:
/** number of pending stop() calls without resume() */
Atomic_counter<uint32_t> m_paused;
/** number of stop_SYS() calls without resume_SYS() */
......@@ -252,33 +256,30 @@ class purge_sys_t
}
/** @return whether the purge tasks are active */
bool running() const;
static bool running();
/** Stop purge during FLUSH TABLES FOR EXPORT. */
void stop();
/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */
void resume();
/** Close and reopen all tables in case of a MDL conflict with DDL */
dict_table_t *close_and_reopen(table_id_t id, THD *thd, MDL_ticket **mdl);
private:
void wait_SYS();
void wait_FTS();
/** Suspend purge during a DDL operation on FULLTEXT INDEX tables */
void wait_FTS(bool also_sys);
public:
/** Suspend purge in data dictionary tables */
void stop_SYS();
void stop_SYS() { m_SYS_paused++; }
/** Resume purge in data dictionary tables */
static void resume_SYS(void *);
/** @return whether stop_SYS() is in effect */
bool must_wait_SYS() const { return m_SYS_paused; }
/** check stop_SYS() */
void check_stop_SYS() { if (must_wait_SYS()) wait_SYS(); }
/** Pause purge during a DDL operation that could drop FTS_ tables. */
void stop_FTS() { m_FTS_paused++; }
void stop_FTS();
/** Resume purge after stop_FTS(). */
void resume_FTS() { ut_d(const auto p=) m_FTS_paused--; ut_ad(p); }
/** @return whether stop_SYS() is in effect */
bool must_wait_FTS() const { return m_FTS_paused; }
/** check stop_SYS() */
void check_stop_FTS() { if (must_wait_FTS()) wait_FTS(); }
/** Determine if the history of a transaction is purgeable.
@param trx_id transaction identifier
......@@ -313,6 +314,8 @@ class purge_sys_t
template<bool also_end_view= false>
void clone_oldest_view()
{
if (!also_end_view)
wait_FTS(true);
latch.wr_lock(SRW_LOCK_CALL);
trx_sys.clone_oldest_view(&view);
if (also_end_view)
......
......@@ -109,6 +109,11 @@ typedef byte trx_undo_rec_t;
typedef std::vector<trx_id_t, ut_allocator<trx_id_t> > trx_ids_t;
/** Number of std::unordered_map hash buckets expected to be needed
for table IDs in a purge batch. GNU libstdc++ would default to 1 and
enlarge and rehash on demand. */
static constexpr size_t TRX_PURGE_TABLE_BUCKETS= 128;
/** The number of rollback segments; rollback segment id must fit in
the 7 bits reserved for it in DB_ROLL_PTR. */
static constexpr unsigned TRX_SYS_N_RSEGS= 128;
......
......@@ -48,6 +48,7 @@ Created 3/14/1997 Heikki Tuuri
#include "ha_innodb.h"
#include "fil0fil.h"
#include "debug_sync.h"
#include <mysql/service_thd_mdl.h>
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
......@@ -114,7 +115,6 @@ row_purge_remove_clust_if_poss_low(
if (table_id) {
retry:
purge_sys.check_stop_FTS();
dict_sys.lock(SRW_LOCK_CALL);
table = dict_sys.find_table(table_id);
if (!table) {
......@@ -189,7 +189,6 @@ row_purge_remove_clust_if_poss_low(
ibuf_delete_for_discarded_space(space_id);
}
purge_sys.check_stop_SYS();
mtr.start();
index->set_modified(mtr);
......@@ -640,25 +639,12 @@ row_purge_del_mark(
return result;
}
void purge_sys_t::wait_SYS()
{
while (must_wait_SYS())
std::this_thread::sleep_for(std::chrono::seconds(1));
}
void purge_sys_t::wait_FTS()
{
while (must_wait_FTS())
std::this_thread::sleep_for(std::chrono::seconds(1));
}
/** Reset DB_TRX_ID, DB_ROLL_PTR of a clustered index record
whose old history can no longer be observed.
@param[in,out] node purge node
@param[in,out] mtr mini-transaction (will be started and committed) */
static void row_purge_reset_trx_id(purge_node_t* node, mtr_t* mtr)
{
retry:
/* Reset DB_TRX_ID, DB_ROLL_PTR for old records. */
mtr->start();
......@@ -694,17 +680,6 @@ static void row_purge_reset_trx_id(purge_node_t* node, mtr_t* mtr)
ut_ad(!rec_get_deleted_flag(
rec, rec_offs_comp(offsets))
|| rec_is_alter_metadata(rec, *index));
switch (node->table->id) {
case DICT_TABLES_ID:
case DICT_COLUMNS_ID:
case DICT_INDEXES_ID:
if (purge_sys.must_wait_SYS()) {
mtr->commit();
purge_sys.check_stop_SYS();
goto retry;
}
}
DBUG_LOG("purge", "reset DB_TRX_ID="
<< ib::hex(row_get_rec_trx_id(
rec, index, offsets)));
......@@ -1045,10 +1020,7 @@ row_purge_parse_undo_rec(
case TRX_UNDO_EMPTY:
case TRX_UNDO_INSERT_METADATA:
case TRX_UNDO_INSERT_REC:
/* These records do not store any transaction identifier.
FIXME: Update SYS_TABLES.ID on both DISCARD TABLESPACE
and IMPORT TABLESPACE to get rid of the repeated lookups! */
/* These records do not store any transaction identifier. */
node->trx_id = TRX_ID_MAX;
break;
default:
......@@ -1064,58 +1036,30 @@ row_purge_parse_undo_rec(
break;
}
if (node->is_skipped(table_id)) {
auto &tables_entry= node->tables[table_id];
node->table = tables_entry.first;
if (!node->table) {
return false;
}
trx_id_t trx_id = TRX_ID_MAX;
if (node->retain_mdl(table_id)) {
ut_ad(node->table != NULL);
goto already_locked;
}
try_again:
purge_sys.check_stop_FTS();
node->table = dict_table_open_on_id<true>(
table_id, false, DICT_TABLE_OP_NORMAL, node->purge_thd,
&node->mdl_ticket);
if (node->table == reinterpret_cast<dict_table_t*>(-1)) {
/* purge stop signal */
goto try_again;
}
if (!node->table) {
/* The table has been dropped: no need to do purge and
release mdl happened as a part of open process itself */
goto err_exit;
#ifndef DBUG_OFF
if (MDL_ticket* mdl = tables_entry.second) {
static_cast<MDL_context*>(thd_mdl_context(current_thd))
->lock_warrant = mdl->get_ctx();
}
already_locked:
#endif
ut_ad(!node->table->is_temporary());
clust_index = dict_table_get_first_index(node->table);
if (!clust_index || clust_index->is_corrupted()) {
if (clust_index->is_corrupted()) {
/* The table was corrupt in the data dictionary.
dict_set_corrupted() works on an index, and
we do not have an index to call it with. */
DBUG_ASSERT(table_id == node->table->id);
trx_id = node->table->def_trx_id;
if (!trx_id) {
trx_id = TRX_ID_MAX;
}
err_exit:
node->close_table();
node->skip(table_id, trx_id);
return false;
}
node->last_table_id = table_id;
switch (type) {
case TRX_UNDO_INSERT_METADATA:
node->ref = &trx_undo_metadata;
......@@ -1265,19 +1209,19 @@ inline void purge_node_t::start()
found_clust= false;
rec_type= 0;
cmpl_info= 0;
if (!purge_thd)
purge_thd= current_thd;
}
/** Reset the state at end
@return the query graph parent */
inline que_node_t *purge_node_t::end()
inline que_node_t *purge_node_t::end(THD *thd)
{
DBUG_ASSERT(common.type == QUE_NODE_PURGE);
close_table();
ut_ad(undo_recs.empty());
ut_d(in_progress= false);
purge_thd= nullptr;
innobase_reset_background_thd(thd);
#ifndef DBUG_OFF
static_cast<MDL_context*>(thd_mdl_context(thd))->lock_warrant= nullptr;
#endif
mem_heap_empty(heap);
return common.parent;
}
......@@ -1305,7 +1249,7 @@ row_purge_step(
row_purge(node, purge_rec.undo_rec, thr);
}
thr->run_node = node->end();
thr->run_node = node->end(current_thd);
return(thr);
}
......
......@@ -1294,43 +1294,35 @@ void purge_sys_t::wake_if_not_active()
}
/** @return whether the purge tasks are active */
bool purge_sys_t::running() const
bool purge_sys_t::running()
{
return purge_coordinator_task.is_running();
}
/** Suspend purge in data dictionary tables */
void purge_sys_t::stop_SYS()
void purge_sys_t::stop_FTS()
{
latch.rd_lock(SRW_LOCK_CALL);
++m_SYS_paused;
m_FTS_paused++;
latch.rd_unlock();
while (m_active)
std::this_thread::sleep_for(std::chrono::seconds(1));
}
/** Stop purge during FLUSH TABLES FOR EXPORT */
void purge_sys_t::stop()
{
for (;;)
{
latch.wr_lock(SRW_LOCK_CALL);
if (!enabled())
{
/* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
ut_ad(!srv_undo_sources);
latch.wr_unlock();
return;
}
ut_ad(srv_n_purge_threads > 0);
if (!must_wait_SYS())
break;
latch.wr_lock(SRW_LOCK_CALL);
if (!enabled())
{
/* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
ut_ad(!srv_undo_sources);
latch.wr_unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
return;
}
ut_ad(srv_n_purge_threads > 0);
const auto paused= m_paused++;
latch.wr_unlock();
......@@ -1346,9 +1338,8 @@ void purge_sys_t::stop()
/** Resume purge in data dictionary tables */
void purge_sys_t::resume_SYS(void *)
{
ut_d(const auto s=)
purge_sys.m_SYS_paused--;
ut_ad(s);
ut_d(auto paused=) purge_sys.m_SYS_paused--;
ut_ad(paused);
}
/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */
......
......@@ -38,10 +38,10 @@ Created 3/26/1996 Heikki Tuuri
#include "trx0roll.h"
#include "trx0rseg.h"
#include "trx0trx.h"
#include "dict0load.h"
#include <mysql/service_thd_mdl.h>
#include <mysql/service_wsrep.h>
#include <unordered_map>
/** Maximum allowable purge history length. <=0 means 'infinite'. */
ulong srv_max_purge_lag = 0;
......@@ -169,7 +169,6 @@ void purge_sys_t::create()
ut_ad(!heap);
ut_ad(!enabled());
m_paused= 0;
m_SYS_paused= 0;
query= purge_graph_build();
next_stored= false;
rseg= NULL;
......@@ -1100,11 +1099,177 @@ trx_purge_fetch_next_rec(
return trx_purge_get_next_rec(n_pages_handled, heap);
}
/** Close all tables that were opened in a purge batch for a worker.
@param node purge task context
@param thd purge coordinator thread handle */
static void trx_purge_close_tables(purge_node_t *node, THD *thd)
{
for (auto &t : node->tables)
{
if (!t.second.first);
else if (t.second.first == reinterpret_cast<dict_table_t*>(-1));
else
{
dict_table_close(t.second.first, false, thd, t.second.second);
t.second.first= reinterpret_cast<dict_table_t*>(-1);
}
}
}
void purge_sys_t::wait_FTS(bool also_sys)
{
bool paused;
do
{
latch.wr_lock(SRW_LOCK_CALL);
paused= m_FTS_paused || (also_sys && m_SYS_paused);
latch.wr_unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
while (paused);
}
__attribute__((nonnull))
/** Aqcuire a metadata lock on a table.
@param table table handle
@param mdl_context metadata lock acquisition context
@param mdl metadata lcok
@return table handle
@retval nullptr if the table is not found or accessible
@retval -1 if the purge of history must be suspended due to DDL */
static dict_table_t *trx_purge_table_acquire(dict_table_t *table,
MDL_context *mdl_context,
MDL_ticket **mdl)
{
ut_ad(dict_sys.frozen_not_locked());
*mdl= nullptr;
if (!table->is_readable() || table->corrupted)
{
table->release();
return nullptr;
}
size_t db_len= dict_get_db_name_len(table->name.m_name);
if (db_len == 0)
return table; /* InnoDB system tables are not covered by MDL */
if (purge_sys.must_wait_FTS())
{
must_wait:
table->release();
return reinterpret_cast<dict_table_t*>(-1);
}
char db_buf[NAME_LEN + 1];
char tbl_buf[NAME_LEN + 1];
size_t tbl_len;
if (!table->parse_name<true>(db_buf, tbl_buf, &db_len, &tbl_len))
/* The name of an intermediate table starts with #sql */
return table;
{
MDL_request request;
MDL_REQUEST_INIT(&request,MDL_key::TABLE, db_buf, tbl_buf, MDL_SHARED,
MDL_EXPLICIT);
if (mdl_context->try_acquire_lock(&request))
goto must_wait;
*mdl= request.ticket;
if (!*mdl)
goto must_wait;
}
return table;
}
/** Open a table handle for the purge of committed transaction history
@param table_id InnoDB table identifier
@param mdl_context metadata lock acquisition context
@param mdl metadata lcok
@return table handle
@retval nullptr if the table is not found or accessible
@retval -1 if the purge of history must be suspended due to DDL */
static dict_table_t *trx_purge_table_open(table_id_t table_id,
MDL_context *mdl_context,
MDL_ticket **mdl)
{
dict_sys.freeze(SRW_LOCK_CALL);
dict_table_t *table= dict_sys.find_table(table_id);
if (table)
table->acquire();
else
{
dict_sys.unfreeze();
dict_sys.lock(SRW_LOCK_CALL);
table= dict_load_table_on_id(table_id, DICT_ERR_IGNORE_FK_NOKEY);
if (table)
table->acquire();
dict_sys.unlock();
if (!table)
return nullptr;
dict_sys.freeze(SRW_LOCK_CALL);
}
table= trx_purge_table_acquire(table, mdl_context, mdl);
dict_sys.unfreeze();
return table;
}
ATTRIBUTE_COLD
dict_table_t *purge_sys_t::close_and_reopen(table_id_t id, THD *thd,
MDL_ticket **mdl)
{
MDL_context *mdl_context= static_cast<MDL_context*>(thd_mdl_context(thd));
ut_ad(mdl_context);
retry:
ut_ad(m_active);
for (que_thr_t *thr= UT_LIST_GET_FIRST(purge_sys.query->thrs); thr;
thr= UT_LIST_GET_NEXT(thrs, thr))
{
purge_node_t *node= static_cast<purge_node_t*>(thr->child);
trx_purge_close_tables(node, thd);
}
m_active= false;
wait_FTS(false);
m_active= true;
dict_table_t *table= trx_purge_table_open(id, mdl_context, mdl);
if (table == reinterpret_cast<dict_table_t*>(-1))
goto retry;
for (que_thr_t *thr= UT_LIST_GET_FIRST(purge_sys.query->thrs); thr;
thr= UT_LIST_GET_NEXT(thrs, thr))
{
purge_node_t *node= static_cast<purge_node_t*>(thr->child);
for (auto &t : node->tables)
{
if (t.second.first)
{
t.second.first= trx_purge_table_open(t.first, mdl_context,
&t.second.second);
if (t.second.first == reinterpret_cast<dict_table_t*>(-1))
{
if (table)
dict_table_close(table, false, thd, *mdl);
goto retry;
}
}
}
}
return table;
}
/** Run a purge batch.
@param n_purge_threads number of purge threads
@return new purge_sys.head and the number of undo log pages handled */
static std::pair<purge_sys_t::iterator,ulint>
trx_purge_attach_undo_recs(ulint n_purge_threads)
trx_purge_attach_undo_recs(ulint n_purge_threads, THD *thd)
{
que_thr_t* thr;
ulint i;
......@@ -1146,8 +1311,14 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
i = 0;
std::unordered_map<table_id_t, purge_node_t*> table_id_map;
std::unordered_map<table_id_t, purge_node_t*>
table_id_map(TRX_PURGE_TABLE_BUCKETS);
mem_heap_empty(purge_sys.heap);
purge_sys.m_active = true;
MDL_context* const mdl_context
= static_cast<MDL_context*>(thd_mdl_context(thd));
ut_ad(mdl_context);
while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) {
trx_purge_rec_t purge_rec;
......@@ -1174,9 +1345,17 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
table_id_t table_id = trx_undo_rec_get_table_id(
purge_rec.undo_rec);
purge_node_t *& table_node = table_id_map[table_id];
purge_node_t*& table_node = table_id_map[table_id];
if (!table_node) {
std::pair<dict_table_t*,MDL_ticket*> p;
p.first = trx_purge_table_open(table_id, mdl_context,
&p.second);
if (p.first == reinterpret_cast<dict_table_t*>(-1)) {
p.first = purge_sys.close_and_reopen(
table_id, thd, &p.second);
}
thr = UT_LIST_GET_NEXT(thrs, thr);
if (!(++i % n_purge_threads)) {
......@@ -1186,15 +1365,24 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
table_node = static_cast<purge_node_t*>(thr->child);
ut_a(que_node_get_type(table_node) == QUE_NODE_PURGE);
ut_d(auto i=)
table_node->tables.emplace(table_id, p);
ut_ad(i.second);
if (p.first) {
goto enqueue;
}
} else if (table_node->tables[table_id].first) {
enqueue:
table_node->undo_recs.push(purge_rec);
}
table_node->undo_recs.push(purge_rec);
if (n_pages_handled >= srv_purge_batch_size) {
break;
}
}
purge_sys.m_active = false;
ut_ad(head <= purge_sys.tail);
return std::make_pair(head, n_pages_handled);
......@@ -1260,8 +1448,10 @@ TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size)
}
#endif /* UNIV_DEBUG */
THD* const thd = current_thd;
/* Fetch the UNDO recs that need to be purged. */
const auto n = trx_purge_attach_undo_recs(n_tasks);
const auto n = trx_purge_attach_undo_recs(n_tasks, thd);
{
ulint delay = n.second ? srv_max_purge_lag : 0;
......@@ -1297,6 +1487,13 @@ TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size)
trx_purge_wait_for_workers_to_complete();
for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs); thr;
thr = UT_LIST_GET_NEXT(thrs, thr)) {
purge_node_t* node = static_cast<purge_node_t*>(thr->child);
trx_purge_close_tables(node, thd);
node->tables.clear();
}
purge_sys.clone_end_view(n.first);
MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment