Commit 36df41ed authored by unknown's avatar unknown

MDEV-4506: Parallel replication.

Add another test case.
parent 39794dc7
...@@ -80,7 +80,6 @@ INSERT INTO t2 VALUES (foo(11, ...@@ -80,7 +80,6 @@ INSERT INTO t2 VALUES (foo(11,
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3', 'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3',
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4')); 'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4'));
SET gtid_domain_id=0; SET gtid_domain_id=0;
SET binlog_format=@old_format;
SELECT * FROM t2 WHERE a >= 10 ORDER BY a; SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
a a
10 10
...@@ -108,6 +107,110 @@ slave-bin.000002 # Query # # use `test`; INSERT INTO t2 VALUES (foo(10, ...@@ -108,6 +107,110 @@ slave-bin.000002 # Query # # use `test`; INSERT INTO t2 VALUES (foo(10,
'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1', 'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1',
'commit_after_release_LOCK_prepare_ordered SIGNAL ready2')) 'commit_after_release_LOCK_prepare_ordered SIGNAL ready2'))
slave-bin.000002 # Xid # # COMMIT /* XID */ slave-bin.000002 # Xid # # COMMIT /* XID */
FLUSH LOGS;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET debug_sync='RESET';
include/start_slave.inc
*** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
FLUSH LOGS;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
SET binlog_format=@old_format;
BEGIN;
INSERT INTO t3 VALUES (2,102);
BEGIN;
INSERT INTO t3 VALUES (4,104);
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
SET binlog_format=statement;
INSERT INTO t3 VALUES (2, foo(12,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1',
''));
SET debug_sync='now WAIT_FOR master_queued1';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
SET binlog_format=statement;
INSERT INTO t3 VALUES (4, foo(14,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2',
''));
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
SET binlog_format=statement;
INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3',
''));
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
SELECT * FROM t3 ORDER BY a;
a b
1 1
2 12
3 3
4 14
5 5
6 16
7 7
show binlog events in 'master-bin.000002' from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000002 # Binlog_checkpoint # # master-bin.000002
master-bin.000002 # Gtid # # GTID #-#-#
master-bin.000002 # Query # # use `test`; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB
master-bin.000002 # Gtid # # BEGIN GTID #-#-#
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7)
master-bin.000002 # Xid # # COMMIT /* XID */
master-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (2, foo(12,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1',
''))
master-bin.000002 # Xid # # COMMIT /* XID */
master-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (4, foo(14,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2',
''))
master-bin.000002 # Xid # # COMMIT /* XID */
master-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3',
''))
master-bin.000002 # Xid # # COMMIT /* XID */
SET debug_sync='now WAIT_FOR slave_queued3';
ROLLBACK;
SET debug_sync='now WAIT_FOR slave_queued1';
ROLLBACK;
SET debug_sync='now WAIT_FOR slave_queued2';
SET debug_sync='now SIGNAL slave_cont1';
SELECT * FROM t3 ORDER BY a;
a b
1 1
2 12
3 3
4 14
5 5
6 16
7 7
show binlog events in 'slave-bin.000003' from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
slave-bin.000003 # Binlog_checkpoint # # slave-bin.000003
slave-bin.000003 # Gtid # # GTID #-#-#
slave-bin.000003 # Query # # use `test`; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB
slave-bin.000003 # Gtid # # BEGIN GTID #-#-#
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7)
slave-bin.000003 # Xid # # COMMIT /* XID */
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (2, foo(12,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1',
''))
slave-bin.000003 # Xid # # COMMIT /* XID */
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (4, foo(14,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2',
''))
slave-bin.000003 # Xid # # COMMIT /* XID */
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3',
''))
slave-bin.000003 # Xid # # COMMIT /* XID */
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL binlog_format=@old_format; SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=0;
...@@ -117,5 +220,5 @@ include/stop_slave.inc ...@@ -117,5 +220,5 @@ include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc include/start_slave.inc
DROP function foo; DROP function foo;
DROP TABLE t1,t2; DROP TABLE t1,t2,t3;
include/rpl_end.inc include/rpl_end.inc
...@@ -29,7 +29,7 @@ INSERT INTO t2 VALUES (1); ...@@ -29,7 +29,7 @@ INSERT INTO t2 VALUES (1);
--sync_with_master --sync_with_master
# Block the table t1 to simulate a replicated query taking a long time. # Block the table t1 to simulate a replicated query taking a long time.
--connect (con_temp,127.0.0.1,root,,test,$SERVER_MYPORT_2,) --connect (con_temp1,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
LOCK TABLE t1 WRITE; LOCK TABLE t1 WRITE;
--connection server_1 --connection server_1
...@@ -53,7 +53,7 @@ INSERT INTO t2 VALUES (6); ...@@ -53,7 +53,7 @@ INSERT INTO t2 VALUES (6);
SELECT * FROM t2 ORDER by a; SELECT * FROM t2 ORDER by a;
--connection con_temp --connection con_temp1
SELECT * FROM t1; SELECT * FROM t1;
UNLOCK TABLES; UNLOCK TABLES;
...@@ -125,7 +125,6 @@ INSERT INTO t2 VALUES (foo(11, ...@@ -125,7 +125,6 @@ INSERT INTO t2 VALUES (foo(11,
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3', 'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3',
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4')); 'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4'));
SET gtid_domain_id=0; SET gtid_domain_id=0;
SET binlog_format=@old_format;
SELECT * FROM t2 WHERE a >= 10 ORDER BY a; SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
--connection server_2 --connection server_2
...@@ -148,7 +147,124 @@ SELECT * FROM t2 WHERE a >= 10 ORDER BY a; ...@@ -148,7 +147,124 @@ SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
# BINLOG output). # BINLOG output).
--let $binlog_file= slave-bin.000002 --let $binlog_file= slave-bin.000002
--source include/show_binlog_events.inc --source include/show_binlog_events.inc
FLUSH LOGS;
# Restart all the slave parallel worker threads, to clear all debug_sync actions.
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET debug_sync='RESET';
--source include/start_slave.inc
--echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
--connection server_1
FLUSH LOGS;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
# Create some sentinel rows so that the rows inserted in parallel fall into
# separate gaps and do not cause gap lock conflicts.
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
SET binlog_format=@old_format;
--save_master_pos
--connection server_2
--sync_with_master
# We want to test that the transactions can execute out-of-order on
# the slave, but still end up committing in-order, and in a single
# group commit.
#
# The idea is to group-commit three transactions together on the master:
# A, B, and C. On the slave, C will execute the insert first, then A,
# and then B. But B manages to complete before A has time to commit, so
# all three end up committing together.
#
# So we start by setting up some row locks that will block transactions
# A and B from executing, allowing C to run first.
--connection con_temp1
BEGIN;
INSERT INTO t3 VALUES (2,102);
--connect (con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
BEGIN;
INSERT INTO t3 VALUES (4,104);
# On the master, queue three INSERT transactions as a single group commit.
--connect (con_temp3,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
SET binlog_format=statement;
send INSERT INTO t3 VALUES (2, foo(12,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1',
''));
--connection server_1
SET debug_sync='now WAIT_FOR master_queued1';
--connect (con_temp4,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
SET binlog_format=statement;
send INSERT INTO t3 VALUES (4, foo(14,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2',
''));
--connection server_1
SET debug_sync='now WAIT_FOR master_queued2';
--connect (con_temp5,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
SET binlog_format=statement;
send INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3',
''));
--connection server_1
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
--connection con_temp3
REAP;
--connection con_temp4
REAP;
--connection con_temp5
REAP;
--connection server_1
SELECT * FROM t3 ORDER BY a;
--let $binlog_file= master-bin.000002
--source include/show_binlog_events.inc
# First, wait until insert 3 is ready to queue up for group commit, but is
# waiting for insert 2 to commit before it can do so itself.
--connection server_2
SET debug_sync='now WAIT_FOR slave_queued3';
# Next, let insert 1 proceed, and allow it to queue up as the group commit
# leader, but let it wait for insert 2 to also queue up before proceeding.
--connection con_temp1
ROLLBACK;
--connection server_2
SET debug_sync='now WAIT_FOR slave_queued1';
# Now let insert 2 proceed and queue up.
--connection con_temp2
ROLLBACK;
--connection server_2
SET debug_sync='now WAIT_FOR slave_queued2';
# And finally, we can let insert 1 proceed and do the group commit with all
# three insert transactions together.
SET debug_sync='now SIGNAL slave_cont1';
# Wait for the commit to complete and check that all three transactions
# group-committed together (will be seen in the binlog as all three having
# cid=# on their GTID event).
--let $wait_condition= SELECT COUNT(*) = 3 FROM t3 WHERE a IN (2,4,6)
--source include/wait_condition.inc
SELECT * FROM t3 ORDER BY a;
--let $binlog_file= slave-bin.000003
--source include/show_binlog_events.inc
--connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format; SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=0;
...@@ -163,6 +279,6 @@ SET GLOBAL slave_parallel_threads=@old_parallel_threads; ...@@ -163,6 +279,6 @@ SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--connection server_1 --connection server_1
DROP function foo; DROP function foo;
DROP TABLE t1,t2; DROP TABLE t1,t2,t3;
--source include/rpl_end.inc --source include/rpl_end.inc
...@@ -6573,11 +6573,12 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, ...@@ -6573,11 +6573,12 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
*/ */
bool bool
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry) MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{ {
group_commit_entry *orig_queue; group_commit_entry *entry, *orig_queue;
wait_for_commit *list, *cur, *last; wait_for_commit *list, *cur, *last;
wait_for_commit *wfc; wait_for_commit *wfc;
DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
/* /*
Check if we need to wait for another transaction to commit before us. Check if we need to wait for another transaction to commit before us.
...@@ -6587,8 +6588,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry) ...@@ -6587,8 +6588,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry)
another safe check under lock, to avoid the race where the other another safe check under lock, to avoid the race where the other
transaction wakes us up between the check and the wait. transaction wakes us up between the check and the wait.
*/ */
wfc= entry->thd->wait_for_commit_ptr; wfc= orig_entry->thd->wait_for_commit_ptr;
entry->queued_by_other= false; orig_entry->queued_by_other= false;
if (wfc && wfc->waiting_for_commit) if (wfc && wfc->waiting_for_commit)
{ {
mysql_mutex_lock(&wfc->LOCK_wait_commit); mysql_mutex_lock(&wfc->LOCK_wait_commit);
...@@ -6604,12 +6605,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry) ...@@ -6604,12 +6605,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry)
get us included in its own group commit. If this happens, the get us included in its own group commit. If this happens, the
queued_by_other flag is set. queued_by_other flag is set.
*/ */
wfc->opaque_pointer= entry; wfc->opaque_pointer= orig_entry;
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
do do
{ {
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
} while (wfc->waiting_for_commit); } while (wfc->waiting_for_commit);
wfc->opaque_pointer= NULL; wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other));
} }
mysql_mutex_unlock(&wfc->LOCK_wait_commit); mysql_mutex_unlock(&wfc->LOCK_wait_commit);
} }
...@@ -6619,12 +6623,12 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry) ...@@ -6619,12 +6623,12 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry)
commit queue (and possibly already done the entire binlog commit for us), commit queue (and possibly already done the entire binlog commit for us),
then there is nothing else to do. then there is nothing else to do.
*/ */
if (entry->queued_by_other) if (orig_entry->queued_by_other)
return false; DBUG_RETURN(false);
/* Now enqueue ourselves in the group commit queue. */ /* Now enqueue ourselves in the group commit queue. */
DEBUG_SYNC(entry->thd, "commit_before_enqueue"); DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
entry->thd->clear_wakeup_ready(); orig_entry->thd->clear_wakeup_ready();
mysql_mutex_lock(&LOCK_prepare_ordered); mysql_mutex_lock(&LOCK_prepare_ordered);
orig_queue= group_commit_queue; orig_queue= group_commit_queue;
...@@ -6657,6 +6661,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry) ...@@ -6657,6 +6661,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry)
list= wfc; list= wfc;
cur= list; cur= list;
last= list; last= list;
entry= orig_entry;
for (;;) for (;;)
{ {
/* Add the entry to the group commit queue. */ /* Add the entry to the group commit queue. */
...@@ -6783,9 +6788,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry) ...@@ -6783,9 +6788,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry)
if (opt_binlog_commit_wait_count > 0) if (opt_binlog_commit_wait_count > 0)
mysql_cond_signal(&COND_prepare_ordered); mysql_cond_signal(&COND_prepare_ordered);
mysql_mutex_unlock(&LOCK_prepare_ordered); mysql_mutex_unlock(&LOCK_prepare_ordered);
DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered"); DEBUG_SYNC(orig_entry->thd, "commit_after_release_LOCK_prepare_ordered");
return orig_queue == NULL; DBUG_PRINT("info", ("Queued for group commit as %s\n",
(orig_queue == NULL) ? "leader" : "participant"));
DBUG_RETURN(orig_queue == NULL);
} }
bool bool
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment