Commit dbfe5f47 authored by unknown's avatar unknown

MDEV-5363: Make parallel replication waits killable

Add a test case for killing a waiting query in parallel replication.

Fix several bugs found:

 - We should not wakeup_subsequent_commits() in ha_rollback_trans(), since we
   do not know the right wakeup_error() to give.

 - When a wait_for_prior_commit() is killed, we must unregister from the
   waitee so we do not race and get an extra (non-kill) wakeup.

 - We need to deal with error propagation correctly in queue_for_group_commit
   when one thread is killed.

 - Fix one locking issue in queue_for_group_commit(), we could unlock the
   waitee lock too early and this end up processing wakeup() with insufficient
   locking.

 - Fix Xid_log_event::do_apply_event; if commit fails it must not update the
   in-memory @@gtid_slave_pos state.

 - Fix and cleanup some things in the rpl_parallel.cc error handling.

 - Add a missing check for killed in the slave sql driver thread, to avoid a
   race.
parent b7ae65ef
......@@ -7,6 +7,7 @@ SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
*** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
......@@ -259,6 +260,78 @@ SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
*** Test killing slave threads at various wait points ***
*** 1. Test killing transaction waiting in commit for previous transaction to commit ***
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
SET binlog_format=statement;
INSERT INTO t3 VALUES (31, foo(31,
'commit_before_prepare_ordered WAIT_FOR t2_waiting',
'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
SET debug_sync='now WAIT_FOR master_queued1';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
SET binlog_format=statement;
BEGIN;
INSERT INTO t3 VALUES (32, foo(32,
'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
''));
INSERT INTO t3 VALUES (33, foo(33,
'group_commit_waiting_for_prior SIGNAL t2_waiting',
'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
COMMIT;
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
SET binlog_format=statement;
INSERT INTO t3 VALUES (34, foo(34,
'',
''));
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
31 31
32 32
33 33
34 34
SET sql_log_bin=0;
CALL mtr.add_suppression("Query execution was interrupted");
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
SET sql_log_bin=1;
SET debug_sync='now WAIT_FOR t2_query';
SET debug_sync='now SIGNAL t2_cont';
SET debug_sync='now WAIT_FOR t1_ready';
KILL THD_ID;
SET debug_sync='now WAIT_FOR t2_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1963]
STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
31 31
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
DROP FUNCTION foo;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
SET sql_log_bin=1;
INSERT INTO t3 VALUES (39,0);
include/start_slave.inc
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
31 31
32 32
33 33
34 34
39 0
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
......
......@@ -19,6 +19,7 @@ CHANGE MASTER TO master_use_gtid=slave_pos;
--echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
--connection server_1
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
......@@ -333,6 +334,126 @@ SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
--connection server_2
# Respawn all worker threads to clear any left-over debug_sync or other stuff.
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
--source include/start_slave.inc
--echo *** Test killing slave threads at various wait points ***
--echo *** 1. Test killing transaction waiting in commit for previous transaction to commit ***
# Set up three transactions on the master that will be group-committed
# together so they can be replicated in parallel on the slave.
--connection con_temp3
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
SET binlog_format=statement;
send INSERT INTO t3 VALUES (31, foo(31,
'commit_before_prepare_ordered WAIT_FOR t2_waiting',
'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
--connection server_1
SET debug_sync='now WAIT_FOR master_queued1';
--connection con_temp4
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
SET binlog_format=statement;
BEGIN;
# This insert is just so we can get T2 to wait while a query is running that we
# can see in SHOW PROCESSLIST so we can get its thread_id to kill later.
INSERT INTO t3 VALUES (32, foo(32,
'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
''));
# This insert sets up debug_sync points so that T2 will tell when it is at its
# wait point where we want to kill it - and when it has been killed.
INSERT INTO t3 VALUES (33, foo(33,
'group_commit_waiting_for_prior SIGNAL t2_waiting',
'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
send COMMIT;
--connection server_1
SET debug_sync='now WAIT_FOR master_queued2';
--connection con_temp5
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
SET binlog_format=statement;
send INSERT INTO t3 VALUES (34, foo(34,
'',
''));
--connection server_1
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
--connection con_temp3
REAP;
--connection con_temp4
REAP;
--connection con_temp5
REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
--connection server_2
SET sql_log_bin=0;
CALL mtr.add_suppression("Query execution was interrupted");
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
SET sql_log_bin=1;
# Wait until T2 is inside executing its insert of 32, then find it in SHOW
# PROCESSLIST to know its thread id for KILL later.
SET debug_sync='now WAIT_FOR t2_query';
--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(32%' AND INFO NOT LIKE '%LIKE%'`
SET debug_sync='now SIGNAL t2_cont';
# Wait until T2 has entered its wait for T1 to commit, and T1 has
# progressed into its commit phase.
SET debug_sync='now WAIT_FOR t1_ready';
# Now kill the transaction T2.
--replace_result $thd_id THD_ID
eval KILL $thd_id;
# Wait until T2 has reacted on the kill.
SET debug_sync='now WAIT_FOR t2_killed';
# Now we can allow T1 to proceed.
SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1963
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
DROP FUNCTION foo;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
--delimiter ;
SET sql_log_bin=1;
--connection server_1
INSERT INTO t3 VALUES (39,0);
--save_master_pos
--connection server_2
--source include/start_slave.inc
--sync_with_master
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
--connection server_2
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
......
......@@ -1300,7 +1300,10 @@ int ha_commit_trans(THD *thd, bool all)
{
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
{
thd->transaction.cleanup();
thd->wakeup_subsequent_commits(error);
}
DBUG_RETURN(0);
}
......@@ -1334,6 +1337,7 @@ int ha_commit_trans(THD *thd, bool all)
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, all);
thd->wakeup_subsequent_commits(1);
DBUG_RETURN(1);
}
......@@ -1421,6 +1425,7 @@ done:
err:
error= 1; /* Transaction was rolled back */
ha_rollback_trans(thd, all);
thd->wakeup_subsequent_commits(error);
end:
if (rw_trans && mdl_request.ticket)
......@@ -1591,10 +1596,7 @@ int ha_rollback_trans(THD *thd, bool all)
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
{
thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup();
}
if (all)
thd->transaction_rollback_request= FALSE;
......
......@@ -6622,7 +6622,7 @@ int
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{
group_commit_entry *entry, *orig_queue;
wait_for_commit *list, *cur, *last;
wait_for_commit *cur, *last;
wait_for_commit *wfc;
DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
......@@ -6663,11 +6663,31 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other));
orig_entry->thd->exit_cond(old_msg);
if (wfc->waiting_for_commit)
{
/* Wait terminated due to kill. */
wait_for_commit *loc_waitee= wfc->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running ||
orig_entry->queued_by_other)
{
/* Our waitee is already waking us up, so ignore the kill. */
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
do
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
} while (wfc->waiting_for_commit);
}
else
{
/* We were killed, so remove us from the list of waitee. */
wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
orig_entry->thd->exit_cond(old_msg);
/* Interrupted by kill. */
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior_killed");
wfc->wakeup_error= orig_entry->thd->killed_errno();
if (wfc->wakeup_error)
wfc->wakeup_error= ER_QUERY_INTERRUPTED;
......@@ -6675,6 +6695,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
DBUG_RETURN(-1);
}
}
orig_entry->thd->exit_cond(old_msg);
}
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
......@@ -6729,9 +6751,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
used by the caller or any other function.
*/
list= wfc;
cur= list;
last= list;
cur= wfc;
last= wfc;
entry= orig_entry;
for (;;)
{
......@@ -6757,11 +6778,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
*/
if (cur->subsequent_commits_list)
{
bool have_lock;
wait_for_commit *waiter;
wait_for_commit *wakeup_list= NULL;
wait_for_commit **wakeup_next_ptr= &wakeup_list;
mysql_mutex_lock(&cur->LOCK_wait_commit);
have_lock= true;
/*
Grab the list, now safely under lock, and process it if still
non-empty.
......@@ -6802,18 +6823,68 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
For this, we need to set the "wakeup running" flag and release
the waitee lock to avoid a deadlock, see comments on
THD::wakeup_subsequent_commits2() for details.
So we need to put these on a list and delay the wakeup until we
have released the lock.
*/
if (have_lock)
*wakeup_next_ptr= waiter;
wakeup_next_ptr= &waiter->next_subsequent_commit;
}
waiter= next;
}
if (wakeup_list)
{
have_lock= false;
/* Now release our lock and do the wakeups that were delayed above. */
cur->wakeup_subsequent_commits_running= true;
mysql_mutex_unlock(&cur->LOCK_wait_commit);
for (;;)
{
wait_for_commit *next;
/*
ToDo: We wakeup the waiter here, so that it can have the chance to
reach its own commit state and queue up for this same group commit,
if it is still pending.
One problem with this is that if the waiter does not reach its own
commit state before this group commit starts, and then the group
commit fails (binlog write failure), we do not get to propagate
the error to the waiter.
A solution for this could be to delay the wakeup until commit is
successful. But then we need to set a flag in the waitee that it is
already queued for group commit, so that the waiter can check this
flag and queue itself if it _does_ reach the commit state in time.
(But error handling in case of binlog write failure is currently
broken in other ways, as well).
*/
if (&wakeup_list->next_subsequent_commit == wakeup_next_ptr)
{
/* The last one in the list. */
wakeup_list->wakeup(0);
break;
}
waiter->wakeup(0);
/*
Important: don't access wakeup_list->next after the wakeup() call,
it may be invalidated by the other thread.
*/
next= wakeup_list->next_subsequent_commit;
wakeup_list->wakeup(0);
wakeup_list= next;
}
waiter= next;
/*
We need a full memory barrier between walking the list and clearing
the flag wakeup_subsequent_commits_running. This barrier is needed
to ensure that no other thread will start to modify the list
pointers before we are done traversing the list.
But wait_for_commit::wakeup(), which was called above, does a full
memory barrier already (it locks a mutex).
*/
cur->wakeup_subsequent_commits_running= false;
}
if (have_lock)
else
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
if (cur == last)
......@@ -6827,29 +6898,6 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
DBUG_ASSERT(entry != NULL);
}
/*
Now we need to clear the wakeup_subsequent_commits_running flags.
We need a full memory barrier between walking the list above, and clearing
the flag wakeup_subsequent_commits_running below. This barrier is needed
to ensure that no other thread will start to modify the list pointers
before we are done traversing the list.
But wait_for_commit::wakeup(), which was called above for any other thread
that might modify the list in parallel, does a full memory barrier already
(it locks a mutex).
*/
if (list)
{
for (;;)
{
list->wakeup_subsequent_commits_running= false;
if (list == last)
break;
list= list->next_subsequent_commit;
}
}
if (opt_binlog_commit_wait_count > 0)
mysql_cond_signal(&COND_prepare_ordered);
mysql_mutex_unlock(&LOCK_prepare_ordered);
......
......@@ -7158,7 +7158,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks();
if (sub_id)
if (!res && sub_id)
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
/*
......
......@@ -24,7 +24,7 @@ static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
int err __attribute__((unused));
int err;
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
......@@ -172,6 +172,18 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
}
static void
signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
{
rgi->is_error= true;
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
rgi->rli->relay_log.signal_update();
}
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
......@@ -304,10 +316,8 @@ handle_rpl_parallel_thread(void *arg)
{
/* The thread got a kill signal. */
thd->send_kill_message();
rgi->is_error= true;
slave_output_error_info(rgi->rli, thd);
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
signal_error_to_sql_driver_thread(thd, rgi);
}
rgi->wait_start_sub_id= 0; /* No need to check again. */
}
......@@ -363,10 +373,8 @@ handle_rpl_parallel_thread(void *arg)
if (err)
{
rgi->is_error= true;
slave_output_error_info(rgi->rli, thd);
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
signal_error_to_sql_driver_thread(thd, rgi);
}
if (end_of_group)
{
......@@ -405,11 +413,9 @@ handle_rpl_parallel_thread(void *arg)
half-processed event group.
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
group_rgi->is_error= true;
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, &group_rgi->commit_orderer);
group_rgi->cleanup_context(thd, true);
group_rgi->rli->abort_slave= true;
signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
delete group_rgi;
group_rgi= NULL;
......
......@@ -6241,6 +6241,17 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
rli->ignore_log_space_limit= true;
}
/*
We have to check sql_slave_killed() here an extra time.
Otherwise we may miss a wakeup, since last check was done
without holding LOCK_log.
*/
if (sql_slave_killed(rgi))
{
mysql_mutex_unlock(log_lock);
break;
}
/*
If the I/O thread is blocked, unblock it. Ok to broadcast
after unlock, because the mutex is only destroyed in
......
......@@ -5786,25 +5786,49 @@ int
wait_for_commit::wait_for_prior_commit2(THD *thd)
{
const char *old_msg;
wait_for_commit *loc_waitee;
mysql_mutex_lock(&LOCK_wait_commit);
old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit,
"Waiting for prior transaction to commit");
while (waiting_for_commit && !thd->check_killed())
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
thd->exit_cond(old_msg);
waitee= NULL;
if (!waiting_for_commit)
{
if (wakeup_error)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
return wakeup_error;
goto end;
}
/*
Wait was interrupted by kill. We need to unregister our wait and give the
error. But if a wakeup is already in progress, then we must ignore the
kill and not give error, otherwise we get inconsistency between waitee and
waiter as to whether we succeed or fail (eg. we may roll back but waitee
might attempt to commit both us and any subsequent commits waiting for us).
*/
loc_waitee= this->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
/* We are being woken up; ignore the kill and just wait. */
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
do
{
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
} while (waiting_for_commit);
goto end;
}
/* Wait was interrupted by kill, so give the error. */
remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
wakeup_error= thd->killed_errno();
if (!wakeup_error)
wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wakeup_error, ER(wakeup_error), MYF(0));
end:
thd->exit_cond(old_msg);
waitee= NULL;
return wakeup_error;
}
......@@ -5891,7 +5915,6 @@ wait_for_commit::unregister_wait_for_prior_commit2()
if (waiting_for_commit)
{
wait_for_commit *loc_waitee= this->waitee;
wait_for_commit **next_ptr_ptr, *cur;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
......@@ -5909,17 +5932,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
else
{
/* Remove ourselves from the list in the waitee. */
next_ptr_ptr= &loc_waitee->subsequent_commits_list;
while ((cur= *next_ptr_ptr) != NULL)
{
if (cur == this)
{
*next_ptr_ptr= this->next_subsequent_commit;
break;
}
next_ptr_ptr= &cur->next_subsequent_commit;
}
waiting_for_commit= false;
remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
}
}
......
......@@ -1660,6 +1660,25 @@ struct wait_for_commit
if (waiting_for_commit)
unregister_wait_for_prior_commit2();
}
/*
Remove a waiter from the list in the waitee. Used to unregister a wait.
The caller must be holding the locks of both waiter and waitee.
*/
void remove_from_list(wait_for_commit **next_ptr_ptr)
{
wait_for_commit *cur;
while ((cur= *next_ptr_ptr) != NULL)
{
if (cur == this)
{
*next_ptr_ptr= this->next_subsequent_commit;
break;
}
next_ptr_ptr= &cur->next_subsequent_commit;
}
waiting_for_commit= false;
}
void wakeup(int wakeup_error);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment