Commit 3b961347 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-7888, MDEV-7929: Parallel replication hangs sometimes on ANALYZE TABLE or DDL

The hangs occur when the group_commit_orderer object is freed before the last
mark_start_commit() call on it - this loses the wakeup to other waiting worker
threads, causing them to hang until killed manually.

The object was freed because wakeup_subsequent_commits() was called two early
in two places. For MDEV-7888, during ANALYZE TABLE, and for MDEV-7929 during
record_gtid() after processing a DDL event. The group_commit_orderer object
can be freed when its last transaction has called wait_for_prior_commit().

Fix by implementing a suspend/resume mechanism for wakeup_subsequent_commits()
that can be used in places where a transaction is committed without this being
the commit of the actual replication event group.

Also add a protection mechanism (that asserts in debug builds) which can
prevent the too-early free and hang if other similar bugs should remain in
other parts of the code.
parent 880f2273
...@@ -1421,6 +1421,64 @@ a b ...@@ -1421,6 +1421,64 @@ a b
99 99 99 99
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_transaction_retries= @old_retries; SET GLOBAL slave_transaction_retries= @old_retries;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
*** MDEV-7888: ANALYZE TABLE does wakeup_subsequent_commits(), causing wrong binlog order and parallel replication hang ***
include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug= '+d,inject_analyze_table_sleep';
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
ANALYZE TABLE t2;
Table Op Msg_type Msg_text
test.t2 analyze status OK
INSERT INTO t3 VALUES (120, 0);
SET @commit_id= 10001;
INSERT INTO t3 VALUES (121, 0);
SET SESSION debug_dbug=@old_dbug;
SELECT * FROM t3 WHERE a >= 120 ORDER BY a;
a b
120 0
121 0
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t3 WHERE a >= 120 ORDER BY a;
a b
120 0
121 0
include/stop_slave.inc
SET GLOBAL debug_dbug= @old_debug;
include/start_slave.inc
*** MDEV-7929: record_gtid() for non-transactional event group calls wakeup_subsequent_commits() too early, causing slave hang. ***
include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug= '+d,inject_record_gtid_serverid_100_sleep';
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @old_server_id= @@SESSION.server_id;
SET SESSION server_id= 100;
SET @commit_id= 10010;
ALTER TABLE t1 COMMENT "Hulubulu!";
SET SESSION server_id= @old_server_id;
INSERT INTO t3 VALUES (130, 0);
SET @commit_id= 10011;
INSERT INTO t3 VALUES (131, 0);
SET SESSION debug_dbug=@old_dbug;
SELECT * FROM t3 WHERE a >= 130 ORDER BY a;
a b
130 0
131 0
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t3 WHERE a >= 130 ORDER BY a;
a b
130 0
131 0
include/stop_slave.inc
SET GLOBAL debug_dbug= @old_debug;
include/start_slave.inc include/start_slave.inc
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
......
...@@ -2042,6 +2042,100 @@ SELECT * FROM t8 ORDER BY a; ...@@ -2042,6 +2042,100 @@ SELECT * FROM t8 ORDER BY a;
--source include/stop_slave.inc --source include/stop_slave.inc
SET GLOBAL slave_transaction_retries= @old_retries; SET GLOBAL slave_transaction_retries= @old_retries;
SET GLOBAL slave_parallel_threads=10;
--source include/start_slave.inc
--echo *** MDEV-7888: ANALYZE TABLE does wakeup_subsequent_commits(), causing wrong binlog order and parallel replication hang ***
--connection server_2
--source include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug= '+d,inject_analyze_table_sleep';
--connection server_1
# Inject two group commits. The bug was that ANALYZE TABLE would call
# wakeup_subsequent_commits() too early, allowing the following transaction
# in the same group to run ahead and binlog and free the GCO. Then we get
# wrong binlog order and later access freed GCO, which causes lost wakeup
# of following GCO and thus replication hang.
# We injected a small sleep in ANALYZE to make the race easier to hit (this
# can only cause false negatives in versions with the bug, not false positives,
# so sleep is ok here. And it's in general not possible to trigger reliably
# the race with debug_sync, since the bugfix makes the race impossible).
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
# Group commit with cid=10000, two event groups.
SET @commit_id= 10000;
ANALYZE TABLE t2;
INSERT INTO t3 VALUES (120, 0);
# Group commit with cid=10001, one event group.
SET @commit_id= 10001;
INSERT INTO t3 VALUES (121, 0);
SET SESSION debug_dbug=@old_dbug;
SELECT * FROM t3 WHERE a >= 120 ORDER BY a;
--source include/save_master_gtid.inc
--connection server_2
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t3 WHERE a >= 120 ORDER BY a;
--source include/stop_slave.inc
SET GLOBAL debug_dbug= @old_debug;
--source include/start_slave.inc
--echo *** MDEV-7929: record_gtid() for non-transactional event group calls wakeup_subsequent_commits() too early, causing slave hang. ***
--connection server_2
--source include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug= '+d,inject_record_gtid_serverid_100_sleep';
--connection server_1
# Inject two group commits. The bug was that record_gtid for a
# non-transactional event group would commit its own transaction, which would
# cause ha_commit_trans() to call wakeup_subsequent_commits() too early. This
# in turn lead to access to freed group_commit_orderer object, losing a wakeup
# and causing slave threads to hang.
# We inject a small sleep in the corresponding record_gtid() to make the race
# easier to hit.
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
# Group commit with cid=10010, two event groups.
SET @old_server_id= @@SESSION.server_id;
SET SESSION server_id= 100;
SET @commit_id= 10010;
ALTER TABLE t1 COMMENT "Hulubulu!";
SET SESSION server_id= @old_server_id;
INSERT INTO t3 VALUES (130, 0);
# Group commit with cid=10011, one event group.
SET @commit_id= 10011;
INSERT INTO t3 VALUES (131, 0);
SET SESSION debug_dbug=@old_dbug;
SELECT * FROM t3 WHERE a >= 130 ORDER BY a;
--source include/save_master_gtid.inc
--connection server_2
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t3 WHERE a >= 130 ORDER BY a;
--source include/stop_slave.inc
SET GLOBAL debug_dbug= @old_debug;
--source include/start_slave.inc --source include/start_slave.inc
......
...@@ -5850,6 +5850,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -5850,6 +5850,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
if (direct) if (direct)
{ {
int res; int res;
uint64 commit_id= 0;
DBUG_PRINT("info", ("direct is set")); DBUG_PRINT("info", ("direct is set"));
if ((res= thd->wait_for_prior_commit())) if ((res= thd->wait_for_prior_commit()))
DBUG_RETURN(res); DBUG_RETURN(res);
...@@ -5857,7 +5858,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -5857,7 +5858,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
my_org_b_tell= my_b_tell(file); my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log); mysql_mutex_lock(&LOCK_log);
prev_binlog_id= current_binlog_id; prev_binlog_id= current_binlog_id;
if (write_gtid_event(thd, true, using_trans, 0)) DBUG_EXECUTE_IF("binlog_force_commit_id",
{
const LEX_STRING name= { C_STRING_WITH_LEN("commit_id") };
bool null_value;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&thd->user_vars,
(uchar*) name.str, name.length);
commit_id= entry->val_int(&null_value);
});
if (write_gtid_event(thd, true, using_trans, commit_id))
goto err; goto err;
} }
else else
......
...@@ -515,6 +515,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ...@@ -515,6 +515,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
element *elem; element *elem;
ulonglong thd_saved_option= thd->variables.option_bits; ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup; Query_tables_list lex_backup;
wait_for_commit* suspended_wfc;
DBUG_ENTER("record_gtid"); DBUG_ENTER("record_gtid");
if (unlikely(!loaded)) if (unlikely(!loaded))
...@@ -538,6 +539,28 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ...@@ -538,6 +539,28 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
DBUG_RETURN(1); DBUG_RETURN(1);
} ); } );
/*
If we are applying a non-transactional event group, we will be committing
here a transaction, but that does not imply that the event group has
completed or has been binlogged. So we should not trigger
wakeup_subsequent_commits() here.
Note: An alternative here could be to put a call to mark_start_commit() in
stmt_done() before the call to record_and_update_gtid(). This would
prevent later calling mark_start_commit() after we have run
wakeup_subsequent_commits() from committing the GTID update transaction
(which must be avoided to avoid accessing freed group_commit_orderer
object). It would also allow following event groups to start slightly
earlier. And in the cases where record_gtid() is called without an active
transaction, the current statement should have been binlogged already, so
binlog order is preserved.
But this is rather subtle, and potentially fragile. And it does not really
seem worth it; non-transactional loads are unlikely to benefit much from
parallel replication in any case. So for now, we go with the simple
suspend/resume of wakeup_subsequent_commits() here in record_gtid().
*/
suspended_wfc= thd->suspend_subsequent_commits();
thd->lex->reset_n_backup_query_tables_list(&lex_backup); thd->lex->reset_n_backup_query_tables_list(&lex_backup);
tlist.init_one_table(STRING_WITH_LEN("mysql"), tlist.init_one_table(STRING_WITH_LEN("mysql"),
rpl_gtid_slave_state_table_name.str, rpl_gtid_slave_state_table_name.str,
...@@ -689,6 +712,12 @@ IF_DBUG(dbug_break:, ) ...@@ -689,6 +712,12 @@ IF_DBUG(dbug_break:, )
} }
thd->lex->restore_backup_query_tables_list(&lex_backup); thd->lex->restore_backup_query_tables_list(&lex_backup);
thd->variables.option_bits= thd_saved_option; thd->variables.option_bits= thd_saved_option;
thd->resume_subsequent_commits(suspended_wfc);
DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
{
if (gtid->server_id == 100)
my_sleep(500000);
});
DBUG_RETURN(err); DBUG_RETURN(err);
} }
......
...@@ -171,8 +171,24 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id, ...@@ -171,8 +171,24 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
/* Now free any GCOs in which all transactions have committed. */ /* Now free any GCOs in which all transactions have committed. */
group_commit_orderer *tmp_gco= rgi->gco; group_commit_orderer *tmp_gco= rgi->gco;
while (tmp_gco && while (tmp_gco &&
(!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id)) (!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id ||
tmp_gco->next_gco->wait_count > entry->count_committing_event_groups))
{
/*
We must not free a GCO before the wait_count of the following GCO has
been reached and wakeup has been sent. Otherwise we will lose the
wakeup and hang (there were several such bugs in the past).
The intention is that this is ensured already since we only free when
the last event group in the GCO has committed
(tmp_gco->last_sub_id <= sub_id). However, if we have a bug, we have
extra check on next_gco->wait_count to hopefully avoid hanging; we
have here an assertion in debug builds that this check does not in
fact trigger.
*/
DBUG_ASSERT(!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id);
tmp_gco= tmp_gco->prev_gco; tmp_gco= tmp_gco->prev_gco;
}
while (tmp_gco) while (tmp_gco)
{ {
group_commit_orderer *prev_gco= tmp_gco->prev_gco; group_commit_orderer *prev_gco= tmp_gco->prev_gco;
......
...@@ -320,6 +320,7 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables, ...@@ -320,6 +320,7 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables,
int result_code; int result_code;
int compl_result_code; int compl_result_code;
bool need_repair_or_alter= 0; bool need_repair_or_alter= 0;
wait_for_commit* suspended_wfc;
DBUG_ENTER("mysql_admin_table"); DBUG_ENTER("mysql_admin_table");
DBUG_PRINT("enter", ("extra_open_options: %u", extra_open_options)); DBUG_PRINT("enter", ("extra_open_options: %u", extra_open_options));
...@@ -337,6 +338,13 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables, ...@@ -337,6 +338,13 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
/*
This function calls trans_commit() during its operation, but that does not
imply that the operation is complete or binlogged. So we have to suspend
temporarily the wakeup_subsequent_commits() calls (if used).
*/
suspended_wfc= thd->suspend_subsequent_commits();
mysql_ha_rm_tables(thd, tables); mysql_ha_rm_tables(thd, tables);
/* /*
...@@ -464,7 +472,7 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables, ...@@ -464,7 +472,7 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables,
if (!table->table->part_info) if (!table->table->part_info)
{ {
my_error(ER_PARTITION_MGMT_ON_NONPARTITIONED, MYF(0)); my_error(ER_PARTITION_MGMT_ON_NONPARTITIONED, MYF(0));
DBUG_RETURN(TRUE); goto err2;
} }
if (set_part_state(alter_info, table->table->part_info, PART_ADMIN)) if (set_part_state(alter_info, table->table->part_info, PART_ADMIN))
{ {
...@@ -1045,6 +1053,8 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables, ...@@ -1045,6 +1053,8 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables,
} }
my_eof(thd); my_eof(thd);
thd->resume_subsequent_commits(suspended_wfc);
DBUG_EXECUTE_IF("inject_analyze_table_sleep", my_sleep(500000););
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
err: err:
...@@ -1058,6 +1068,8 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables, ...@@ -1058,6 +1068,8 @@ static bool mysql_admin_table(THD* thd, TABLE_LIST* tables,
} }
close_thread_tables(thd); // Shouldn't be needed close_thread_tables(thd); // Shouldn't be needed
thd->mdl_context.release_transactional_locks(); thd->mdl_context.release_transactional_locks();
err2:
thd->resume_subsequent_commits(suspended_wfc);
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
......
...@@ -3682,6 +3682,15 @@ class THD :public Statement, ...@@ -3682,6 +3682,15 @@ class THD :public Statement,
if (wait_for_commit_ptr) if (wait_for_commit_ptr)
wait_for_commit_ptr->wakeup_subsequent_commits(wakeup_error); wait_for_commit_ptr->wakeup_subsequent_commits(wakeup_error);
} }
wait_for_commit *suspend_subsequent_commits() {
wait_for_commit *suspended= wait_for_commit_ptr;
wait_for_commit_ptr= NULL;
return suspended;
}
void resume_subsequent_commits(wait_for_commit *suspended) {
DBUG_ASSERT(!wait_for_commit_ptr);
wait_for_commit_ptr= suspended;
}
private: private:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment