Commit 9b80f930 authored by Sujatha's avatar Sujatha

MDEV-20645: Replication consistency is broken as workers miss the error...

MDEV-20645: Replication consistency is broken as workers miss the error notification from an earlier failed group.

Analysis:
========
In general if there are three groups.
1 - Inserts 32 which fails due to local entry '32' on slave.
2 - Inserts 33
3 - Inserts 34

Each group considers itself as a waiter and it waits for prior group 'waitee'.
This is done in 'register_wait_for_prior_event_group_commit'. If there is no
other parallel group being scheduled then no waitee will be there.

Let us assume 3 groups are being scheduled in parallel.

3-> waits for 2-> waits for->1

'1' upon completion it checks is there any registered subsequent waiter. If
so it wakes up the subsequent waiter with its execution status. This execution
status is stored in wakeup_error.

If '1' failed then it sends corresponding wakeup_error to 2. Then '2' aborts
and it propagates error to '3'.  So all further commits are aborted.  This
mechanism works only when all transactions reach a stage where they are
waiting for their prior commit to complete.

In case of optimistic following scenario occurs.

1,2,3 are scheduled in parallel.

3 - Reaches group_commit_code waits for 2 to complete.
1 - errors out sets stop_on_error_sub_id=1.

When a group execution results in error its corresponding sub_id is set to
'stop_on_error_sub_id'. Any new groups queued for execution will check if
their sub_id is > stop_on_error_sub_id.  If it is true their execution will be
skipped as prior group execution failed.  'skip_event_group=1' will be set.
Since the execution of SQL thread is about to stop we just skip execution of
all the following event groups.  We still do all the normal waiting and wakeup
processing between the event groups as a simple way to ensure that everything
is stopped and cleaned up correctly.

Upon error '1' transaction checks for registered waiters. Since no one is
there it simply goes away.

2 - Starts the execution. It checks do I have a waitee.

Since wait_commit_sub_id == entry->last_committed_sub_id no waitee is set.

Secondly: 'entry->stop_on_error_sub_id' is set by '1'st execution.  Now
'handle_parallel_thread' code checks if the current group 'sub_id' is greater
than the 'sub_id' set within 'stop_on_error_sub_id'.

Since the above is true 'skip_event_group=true' is set.  Simply call
'wait_for_prior_commit' to wakeup all waiters.  Group '2' didn't had any
waitee and its execution is skipped.  Hence its wakeup_error=0.It sends a
positive wakeup signal to '3'. Which commits. This results in a missed
transaction. i.e 33 is missed and 34 is committed.

Fix:
===
When a worker learns that an earlier transaction execution has failed, and it
should not proceed for further execution, it should mark its own execution
status as failed so that it alerts its followers to abort as well.
parent 677cc644
include/master-slave.inc
[connection master]
connection server_2;
include/stop_slave.inc
connection server_2;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL slave_parallel_mode='optimistic';
SET GLOBAL slave_parallel_threads= 3;
CHANGE MASTER TO master_use_gtid=slave_pos;
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
include/start_slave.inc
connection server_2;
connection server_1;
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=InnoDB;
include/save_master_gtid.inc
connection server_1;
connection server_2;
include/sync_with_master_gtid.inc
connection server_2;
connect con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,;
BEGIN;
INSERT INTO t1 VALUES (32);
connection server_1;
INSERT INTO t1 VALUES (32);
connection server_2;
SET GLOBAL debug_dbug="+d,hold_worker_on_schedule";
SET debug_sync="debug_sync_action SIGNAL reached_pause WAIT_FOR continue_worker";
connection server_1;
SET gtid_seq_no=100;
INSERT INTO t1 VALUES (33);
connection server_2;
SET debug_sync='now WAIT_FOR reached_pause';
connection server_1;
INSERT INTO t1 VALUES (34);
connection server_2;
connection con_temp2;
COMMIT;
connection server_2;
include/stop_slave.inc
connection server_2;
include/assert.inc [table t1 should have zero rows where a>32]
connection server_2;
SELECT * FROM t1 WHERE a>32;
a
DELETE FROM t1 WHERE a=32;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL debug_dbug=@old_debug;
SET DEBUG_SYNC= 'RESET';
include/start_slave.inc
connection server_2;
connection server_1;
DROP TABLE t1;
include/rpl_end.inc
--source suite/rpl/include/rpl_parallel_ignored_errors.inc
# ==== Purpose ====
#
# Test verifies that, in parallel replication, transaction failure notification
# is propagated to all the workers. Workers should abort the execution of
# transaction event groups, whose event positions are higher than the failing
# transaction group.
#
# ==== Implementation ====
#
# Steps:
# 0 - Create a table t1 on master which has a primary key. Enable parallel
# replication on slave with slave_parallel_mode='optimistic' and
# slave_parallel_threads=3.
# 1 - On slave start a transaction and execute a local INSERT statement
# which will insert value 32. This is done to block the INSERT coming
# from master.
# 2 - On master execute an INSERT statement with value 32, so that it is
# blocked on slave.
# 3 - On slave enable a debug sync point such that it holds the worker thread
# execution as soon as work is scheduled to it.
# 4 - INSERT value 33 on master. It will be held on slave by other worker
# thread due to debug simulation.
# 5 - INSERT value 34 on master.
# 6 - On slave, enusre that INSERT 34 has reached a state where it waits for
# its prior transactions to commit.
# 7 - Commit the local INSERT 32 on slave server so that first worker will
# error out.
# 8 - Now send a continue signal to second worker processing 33. It should
# wakeup and propagate the error to INSERT 34.
# 9 - Upon slave stop due to error, check that no rows are found after the
# failed INSERT 32.
#
# ==== References ====
#
# MDEV-20645: Replication consistency is broken as workers miss the error
# notification from an earlier failed group.
#
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--source include/have_binlog_format_statement.inc
--source include/master-slave.inc
--enable_connect_log
--connection server_2
--source include/stop_slave.inc
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL slave_parallel_mode='optimistic';
SET GLOBAL slave_parallel_threads= 3;
CHANGE MASTER TO master_use_gtid=slave_pos;
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
--source include/start_slave.inc
--connection server_1
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=InnoDB;
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
--connect (con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
BEGIN;
INSERT INTO t1 VALUES (32);
--connection server_1
INSERT INTO t1 VALUES (32);
--connection server_2
--let $wait_condition= SELECT COUNT(*) = 1 FROM information_schema.processlist WHERE info like "INSERT INTO t1 VALUES (32)"
--source include/wait_condition.inc
SET GLOBAL debug_dbug="+d,hold_worker_on_schedule";
SET debug_sync="debug_sync_action SIGNAL reached_pause WAIT_FOR continue_worker";
--connection server_1
SET gtid_seq_no=100;
INSERT INTO t1 VALUES (33);
--connection server_2
SET debug_sync='now WAIT_FOR reached_pause';
--connection server_1
INSERT INTO t1 VALUES (34);
--connection server_2
--let $wait_condition= SELECT COUNT(*) = 1 FROM information_schema.processlist WHERE state like "Waiting for prior transaction to commit"
--source include/wait_condition.inc
--connection con_temp2
COMMIT;
# Clean up.
--connection server_2
--source include/stop_slave.inc
--let $assert_cond= COUNT(*) = 0 FROM t1 WHERE a>32
--let $assert_text= table t1 should have zero rows where a>32
--source include/assert.inc
SELECT * FROM t1 WHERE a>32;
DELETE FROM t1 WHERE a=32;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL debug_dbug=@old_debug;
SET DEBUG_SYNC= 'RESET';
--source include/start_slave.inc
--connection server_1
DROP TABLE t1;
--disable_connect_log
--source include/rpl_end.inc
include/master-slave.inc
[connection master]
connection server_2;
include/stop_slave.inc
connection server_2;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL slave_parallel_mode='optimistic';
SET GLOBAL slave_parallel_threads= 3;
CHANGE MASTER TO master_use_gtid=slave_pos;
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
include/start_slave.inc
connection server_2;
connection server_1;
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=InnoDB;
include/save_master_gtid.inc
connection server_1;
connection server_2;
include/sync_with_master_gtid.inc
connection server_2;
connect con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,;
BEGIN;
INSERT INTO t1 VALUES (32);
connection server_1;
INSERT INTO t1 VALUES (32);
connection server_2;
SET GLOBAL debug_dbug="+d,hold_worker_on_schedule";
SET debug_sync="debug_sync_action SIGNAL reached_pause WAIT_FOR continue_worker";
connection server_1;
SET gtid_seq_no=100;
INSERT INTO t1 VALUES (33);
connection server_2;
SET debug_sync='now WAIT_FOR reached_pause';
connection server_1;
INSERT INTO t1 VALUES (34);
connection server_2;
connection con_temp2;
COMMIT;
connection server_2;
include/stop_slave.inc
connection server_2;
include/assert.inc [table t1 should have zero rows where a>32]
connection server_2;
SELECT * FROM t1 WHERE a>32;
a
DELETE FROM t1 WHERE a=32;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL debug_dbug=@old_debug;
SET DEBUG_SYNC= 'RESET';
include/start_slave.inc
connection server_2;
connection server_1;
DROP TABLE t1;
include/rpl_end.inc
--source include/rpl_parallel_ignored_errors.inc
...@@ -228,6 +228,12 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id, ...@@ -228,6 +228,12 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX) entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
entry->stop_on_error_sub_id= sub_id; entry->stop_on_error_sub_id= sub_id;
mysql_mutex_unlock(&entry->LOCK_parallel_entry); mysql_mutex_unlock(&entry->LOCK_parallel_entry);
DBUG_EXECUTE_IF("hold_worker_on_schedule", {
if (entry->stop_on_error_sub_id < (uint64)ULONGLONG_MAX)
{
debug_sync_set_action(thd, STRING_WITH_LEN("now SIGNAL continue_worker"));
}
});
if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING) if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
wait_for_pending_deadlock_kill(thd, rgi); wait_for_pending_deadlock_kill(thd, rgi);
...@@ -1096,6 +1102,13 @@ handle_rpl_parallel_thread(void *arg) ...@@ -1096,6 +1102,13 @@ handle_rpl_parallel_thread(void *arg)
bool did_enter_cond= false; bool did_enter_cond= false;
PSI_stage_info old_stage; PSI_stage_info old_stage;
DBUG_EXECUTE_IF("hold_worker_on_schedule", {
if (rgi->current_gtid.domain_id == 0 &&
rgi->current_gtid.seq_no == 100) {
debug_sync_set_action(thd,
STRING_WITH_LEN("now SIGNAL reached_pause WAIT_FOR continue_worker"));
}
});
DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", { DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
if (rgi->current_gtid.domain_id == 0 && if (rgi->current_gtid.domain_id == 0 &&
rgi->current_gtid.seq_no == 100) { rgi->current_gtid.seq_no == 100) {
...@@ -1137,7 +1150,10 @@ handle_rpl_parallel_thread(void *arg) ...@@ -1137,7 +1150,10 @@ handle_rpl_parallel_thread(void *arg)
skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage); skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
{
skip_event_group= true; skip_event_group= true;
rgi->worker_error= 1;
}
if (likely(!skip_event_group)) if (likely(!skip_event_group))
do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment