Commit 96784eb1 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-7668: Intermediate master groups CREATE TEMPORARY with INSERT, causing...

MDEV-7668: Intermediate master groups CREATE TEMPORARY with INSERT, causing parallel replication failure

Parallel replication depends on locking (table locks, row locks, etc.) to
prevent two conflicting transactions from running and committing in parallel.
But temporary tables are designed to be visible only to one thread, and have
no such locking.

In the concrete issue, an intermediate master could commit a CREATE TEMPORARY
TABLE in the same group commit as in INSERT into that table. Thus, a
lower-level master could attempt to run them in parallel and get an error.

More generally, we need protection from parallel replication trying to run
transactions in parallel that access a common temporary table.

This patch simply causes use of a temporary table from parallel replication
to wait for all previous transactions to commit, serialising the replication
at that point.

(A more fine-grained locking could be added later, possibly. However,
using temporary tables in statement-based replication is in any case
normally undesirable; for example a restart of the server will lose
temporary tables and can break replication).

Note that row-based replication is not affected, as it does not do any
temporary tables on the slave-side.

This patch also cleans up the locking around protecting the list of
temporary tables in Relay_log_info. This used to take the
rli->data_lock at the end of every statement, which is very bad for
concurrency. With this patch, the lock is not taken unless temporary
tables (with statement-based binlogging) are in use on the slave.
parent 040027c8
include/rpl_init.inc [topology=1->2->3]
*** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure ***
SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
SET GLOBAL binlog_direct_non_transactional_updates=OFF;
SET SESSION binlog_direct_non_transactional_updates=OFF;
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
include/stop_slave.inc
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=10;
SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count;
SET GLOBAL binlog_commit_wait_count=2;
SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec;
SET GLOBAL binlog_commit_wait_usec=2000000;
SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
SET GLOBAL binlog_direct_non_transactional_updates=OFF;
SET SESSION binlog_direct_non_transactional_updates=OFF;
CHANGE MASTER TO master_use_gtid=current_pos;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=current_pos;
BEGIN;
CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY;
COMMIT;
INSERT INTO t2 VALUES (1);
INSERT INTO t1 SELECT a, a*10 FROM t2;
DROP TABLE t2;
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 10
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 10
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL binlog_commit_wait_count=@old_commit_count;
SET GLOBAL binlog_commit_wait_usec=@old_commit_usec;
SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
include/start_slave.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table");
DROP TABLE t1;
include/rpl_end.inc
!include ../my.cnf
[mysqld.1]
log-slave-updates
loose-innodb
[mysqld.2]
log-slave-updates
loose-innodb
[mysqld.3]
log-slave-updates
loose-innodb
[ENV]
SERVER_MYPORT_3= @mysqld.3.port
SERVER_MYSOCK_3= @mysqld.3.socket
--source include/have_innodb.inc
--let $rpl_topology=1->2->3
--source include/rpl_init.inc
--echo *** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure ***
--connection server_1
SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
SET GLOBAL binlog_direct_non_transactional_updates=OFF;
SET SESSION binlog_direct_non_transactional_updates=OFF;
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
--save_master_pos
--connection server_2
--sync_with_master
--save_master_pos
--source include/stop_slave.inc
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=10;
SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count;
SET GLOBAL binlog_commit_wait_count=2;
SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec;
SET GLOBAL binlog_commit_wait_usec=2000000;
SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
SET GLOBAL binlog_direct_non_transactional_updates=OFF;
SET SESSION binlog_direct_non_transactional_updates=OFF;
CHANGE MASTER TO master_use_gtid=current_pos;
--connection server_3
--sync_with_master
--save_master_pos
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=current_pos;
--connection server_1
BEGIN;
CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY;
COMMIT;
INSERT INTO t2 VALUES (1);
INSERT INTO t1 SELECT a, a*10 FROM t2;
DROP TABLE t2;
--source include/save_master_gtid.inc
--connection server_2
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
--connection server_3
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
# Clean up
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL binlog_commit_wait_count=@old_commit_count;
SET GLOBAL binlog_commit_wait_usec=@old_commit_usec;
SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
--source include/start_slave.inc
--connection server_3
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection server_1
SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table");
DROP TABLE t1;
--source include/rpl_end.inc
...@@ -661,6 +661,8 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd) ...@@ -661,6 +661,8 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
if (thd->temporary_tables)
{
thd->lock_temporary_tables(); thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables ; table ; table= table->next) for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
{ {
...@@ -676,6 +678,7 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd) ...@@ -676,6 +678,7 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd)
*/ */
thd->temporary_tables= 0; thd->temporary_tables= 0;
} }
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -5586,6 +5589,14 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton, ...@@ -5586,6 +5589,14 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton,
(uint) thd->variables.server_id, (uint) thd->variables.server_id,
(ulong) thd->variables.pseudo_thread_id)); (ulong) thd->variables.pseudo_thread_id));
if (add_to_temporary_tables_list)
{
/* Temporary tables are not safe for parallel replication. */
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
return NULL;
}
/* Create the cache_key for temporary tables */ /* Create the cache_key for temporary tables */
key_length= create_tmp_table_def_key(thd, cache_key, db, table_name); key_length= create_tmp_table_def_key(thd, cache_key, db, table_name);
...@@ -5822,6 +5833,25 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl) ...@@ -5822,6 +5833,25 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
/*
Temporary tables are not safe for parallel replication. They were
designed to be visible to one thread only, so have no table locking.
Thus there is no protection against two conflicting transactions
committing in parallel and things like that.
So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.
ToDo: We might be able to introduce a reference count or something
on temp tables, and have slave worker threads wait for it to reach
zero before being allowed to use the temp table. Might not be worth
it though, as statement-based replication using temporary tables is
in any case rather fragile.
*/
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
DBUG_RETURN(true);
#ifdef WITH_PARTITION_STORAGE_ENGINE #ifdef WITH_PARTITION_STORAGE_ENGINE
if (tl->partition_names) if (tl->partition_names)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment