Commit 370318f8 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-6386: Assertion `thd->transaction.stmt.is_empty() || thd->in_sub_stmt ||...

MDEV-6386: Assertion `thd->transaction.stmt.is_empty() || thd->in_sub_stmt || (thd->state_flags & Open_tables_state::BACKUPS_AVAIL)' fails with parallel replication

The direct cause of the assertion was missing error handling in
record_gtid(). If ha_commit_trans() fails for the statement commit, there was
missing code to catch the error and do ha_rollback_trans() in this case; this
caused close_thread_tables() to assert.

Normally, this error case is not hit, but in this case it was triggered due to
another bug: When a transaction T1 fails during parallel replication, the code
would signal following transactions that they could start to run without
properly marking the error condition. This caused subsequent transactions to
incorrectly start replicating, only to get an error later during their own
commit step. This was particularly serious if the subsequent transactions were
DDL or MyISAM updates, which cannot be rolled back and would leave replication
in an inconsistent state.

Fixed by 1) in case of error, only signal following transactions to continue
once the error has been properly marked and those transactions will know not
to start; and 2) implement proper error handling in record_gtid() in the case
that statement commit fails.
parent b9ddeeff
include/master-slave.inc
[connection master]
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
FLUSH LOGS;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) Engine=InnoDB;
include/stop_slave.inc
SET sql_log_bin= 0;
INSERT INTO t1 VALUES (1, 2);
SET sql_log_bin= 1;
CHANGE MASTER TO master_use_gtid= current_pos;
Contents on slave before:
SELECT * FROM t1 ORDER BY a;
a b
1 2
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=8;
CREATE TEMPORARY TABLE t2 LIKE t1;
INSERT INTO t2 VALUE (1, 1);
INSERT INTO t2 VALUE (2, 1);
INSERT INTO t2 VALUE (3, 1);
INSERT INTO t2 VALUE (4, 1);
INSERT INTO t2 VALUE (5, 1);
INSERT INTO t1 SELECT * FROM t2;
DROP TEMPORARY TABLE t2;
Contents on master:
SELECT * FROM t1 ORDER BY a;
a b
1 1
2 1
3 1
4 1
5 1
START SLAVE;
include/wait_for_slave_sql_error.inc [errno=1062]
STOP SLAVE IO_THREAD;
Contents on slave on slave error:
SELECT * FROM t1 ORDER BY a;
a b
1 2
SET sql_log_bin= 0;
DELETE FROM t1 WHERE a=1;
SET sql_log_bin= 1;
include/start_slave.inc
Contents on slave after:
SELECT * FROM t1 ORDER BY a;
a b
1 1
2 1
3 1
4 1
5 1
DROP TABLE t1;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
include/start_slave.inc
include/rpl_end.inc
--source include/have_innodb.inc
--source include/master-slave.inc
--connection master
# ToDo: Remove this FLUSH LOGS when MDEV-6403 is fixed.
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
FLUSH LOGS;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) Engine=InnoDB;
--sync_slave_with_master
--connection slave
--source include/stop_slave.inc
# Provoke a duplicate key error on replication.
SET sql_log_bin= 0;
INSERT INTO t1 VALUES (1, 2);
SET sql_log_bin= 1;
CHANGE MASTER TO master_use_gtid= current_pos;
--echo Contents on slave before:
SELECT * FROM t1 ORDER BY a;
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=8;
--connection master
CREATE TEMPORARY TABLE t2 LIKE t1;
INSERT INTO t2 VALUE (1, 1);
INSERT INTO t2 VALUE (2, 1);
INSERT INTO t2 VALUE (3, 1);
INSERT INTO t2 VALUE (4, 1);
INSERT INTO t2 VALUE (5, 1);
INSERT INTO t1 SELECT * FROM t2;
DROP TEMPORARY TABLE t2;
--save_master_pos
--echo Contents on master:
SELECT * FROM t1 ORDER BY a;
--connection slave
START SLAVE;
# The slave will stop with a duplicate key error.
# The bug was 1) that the next DROP TEMPORARY TABLE would be allowed to run
# anyway, and 2) that then record_gtid() would get an error during commit
# (since the prior commit failed), and this error was not correctly handled,
# which caused an assertion about closing tables while a statement was still
# active.
--let $slave_sql_errno=1062
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
--echo Contents on slave on slave error:
SELECT * FROM t1 ORDER BY a;
# Resolve the duplicate key error so replication can be resumed.
SET sql_log_bin= 0;
DELETE FROM t1 WHERE a=1;
SET sql_log_bin= 1;
--source include/start_slave.inc
--sync_with_master
--echo Contents on slave after:
SELECT * FROM t1 ORDER BY a;
--connection master
DROP TABLE t1;
--connection slave
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
--source include/start_slave.inc
--source include/rpl_end.inc
...@@ -666,7 +666,7 @@ end: ...@@ -666,7 +666,7 @@ end:
if (table_opened) if (table_opened)
{ {
if (err) if (err || (err= ha_commit_trans(thd, FALSE)))
{ {
/* /*
If error, we need to put any remaining elist back into the HASH so we If error, we need to put any remaining elist back into the HASH so we
...@@ -680,13 +680,8 @@ end: ...@@ -680,13 +680,8 @@ end:
} }
ha_rollback_trans(thd, FALSE); ha_rollback_trans(thd, FALSE);
close_thread_tables(thd);
}
else
{
ha_commit_trans(thd, FALSE);
close_thread_tables(thd);
} }
close_thread_tables(thd);
if (in_transaction) if (in_transaction)
thd->mdl_context.release_statement_locks(); thd->mdl_context.release_statement_locks();
else else
......
...@@ -486,7 +486,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -486,7 +486,7 @@ handle_rpl_parallel_thread(void *arg)
(event_type == QUERY_EVENT && (event_type == QUERY_EVENT &&
(((Query_log_event *)events->ev)->is_commit() || (((Query_log_event *)events->ev)->is_commit() ||
((Query_log_event *)events->ev)->is_rollback())); ((Query_log_event *)events->ev)->is_rollback()));
if (group_ending) if (group_ending && likely(!rgi->worker_error))
{ {
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
rgi->mark_start_commit(); rgi->mark_start_commit();
...@@ -498,7 +498,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -498,7 +498,7 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly. everything is stopped and cleaned up correctly.
*/ */
if (!rgi->worker_error && !skip_event_group) if (likely(!rgi->worker_error) && !skip_event_group)
err= rpt_handle_event(events, rpt); err= rpt_handle_event(events, rpt);
else else
err= thd->wait_for_prior_commit(); err= thd->wait_for_prior_commit();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment