Commit 90a76ff2 authored by unknown's avatar unknown

Merge into 10.0-base: MDEV-5363, make parallel replication waits killable.

There are some places in parallel replication where transactions wait
for one another. Make sure those waits are killable by the (super)user.

Upon kill, unfinished transactions will be rolled back and the SQL thread
stops with an error.

Add a number of test cases to test the different cases. Also fix some
existing bugs found by those tests.
parents e1b2de5f cee92518
This diff is collapsed.
include/rpl_init.inc [topology=1->2]
*** Test killing transaction waiting in commit for previous transaction to commit, when not using 2-phase commit ***
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
SET sql_log_bin=0;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
SET sql_log_bin=1;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
SET sql_log_bin=0;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
IF d1 != '' THEN
SET debug_sync = d1;
END IF;
IF d2 != '' THEN
SET debug_sync = d2;
END IF;
RETURN x;
END
||
SET sql_log_bin=1;
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
SET binlog_format=statement;
INSERT INTO t3 VALUES (31, foo(31,
'ha_commit_one_phase WAIT_FOR t2_waiting',
'commit_one_phase_2 SIGNAL t1_ready WAIT_FOR t1_cont'));
SET debug_sync='now WAIT_FOR master_queued1';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
SET binlog_format=statement;
BEGIN;
INSERT INTO t3 VALUES (32, foo(32,
'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
''));
INSERT INTO t3 VALUES (33, foo(33,
'wait_for_prior_commit_waiting SIGNAL t2_waiting',
'wait_for_prior_commit_killed SIGNAL t2_killed'));
COMMIT;
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
SET binlog_format=statement;
INSERT INTO t3 VALUES (34, foo(34,
'',
''));
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
31 31
32 32
33 33
34 34
SET sql_log_bin=0;
CALL mtr.add_suppression("Query execution was interrupted");
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
CALL mtr.add_suppression("Slave: Connection was killed");
SET sql_log_bin=1;
SET debug_sync='now WAIT_FOR t2_query';
SET debug_sync='now SIGNAL t2_cont';
SET debug_sync='now WAIT_FOR t1_ready';
KILL THD_ID;
SET debug_sync='now WAIT_FOR t2_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
31 31
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
DROP FUNCTION foo;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
SET sql_log_bin=1;
INSERT INTO t3 VALUES (39,0);
include/start_slave.inc
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
31 31
32 32
33 33
34 34
39 0
SET sql_log_bin=0;
DROP FUNCTION foo;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
IF d1 != '' THEN
SET debug_sync = d1;
END IF;
IF d2 != '' THEN
SET debug_sync = d2;
END IF;
RETURN x;
END
||
SET sql_log_bin=1;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP function foo;
DROP TABLE t3;
include/rpl_end.inc
This diff is collapsed.
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--source include/have_binlog_format_statement.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
--echo *** Test killing transaction waiting in commit for previous transaction to commit, when not using 2-phase commit ***
--connection server_2
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/start_slave.inc
--connection server_1
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
# Use a stored function to inject a debug_sync into the appropriate THD.
# The function does nothing on the master, and on the slave it injects the
# desired debug_sync action(s).
SET sql_log_bin=0;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
--delimiter ;
SET sql_log_bin=1;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
--save_master_pos
--connection server_2
SET sql_log_bin=0;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
IF d1 != '' THEN
SET debug_sync = d1;
END IF;
IF d2 != '' THEN
SET debug_sync = d2;
END IF;
RETURN x;
END
||
--delimiter ;
SET sql_log_bin=1;
--sync_with_master
# Set up three transactions on the master that will be group-committed
# together so they can be replicated in parallel on the slave.
--connect (con_temp3,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
SET binlog_format=statement;
send INSERT INTO t3 VALUES (31, foo(31,
'ha_commit_one_phase WAIT_FOR t2_waiting',
'commit_one_phase_2 SIGNAL t1_ready WAIT_FOR t1_cont'));
--connection server_1
SET debug_sync='now WAIT_FOR master_queued1';
--connect (con_temp4,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
SET binlog_format=statement;
BEGIN;
# This insert is just so we can get T2 to wait while a query is running that we
# can see in SHOW PROCESSLIST so we can get its thread_id to kill later.
INSERT INTO t3 VALUES (32, foo(32,
'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
''));
# This insert sets up debug_sync points so that T2 will tell when it is at its
# wait point where we want to kill it - and when it has been killed.
INSERT INTO t3 VALUES (33, foo(33,
'wait_for_prior_commit_waiting SIGNAL t2_waiting',
'wait_for_prior_commit_killed SIGNAL t2_killed'));
send COMMIT;
--connection server_1
SET debug_sync='now WAIT_FOR master_queued2';
--connect (con_temp5,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
SET binlog_format=statement;
send INSERT INTO t3 VALUES (34, foo(34,
'',
''));
--connection server_1
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
--connection con_temp3
REAP;
--connection con_temp4
REAP;
--connection con_temp5
REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
--connection server_2
SET sql_log_bin=0;
CALL mtr.add_suppression("Query execution was interrupted");
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
CALL mtr.add_suppression("Slave: Connection was killed");
SET sql_log_bin=1;
# Wait until T2 is inside executing its insert of 32, then find it in SHOW
# PROCESSLIST to know its thread id for KILL later.
SET debug_sync='now WAIT_FOR t2_query';
--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(32%' AND INFO NOT LIKE '%LIKE%'`
SET debug_sync='now SIGNAL t2_cont';
# Wait until T2 has entered its wait for T1 to commit, and T1 has
# progressed into its commit phase.
SET debug_sync='now WAIT_FOR t1_ready';
# Now kill the transaction T2.
--replace_result $thd_id THD_ID
eval KILL $thd_id;
# Wait until T2 has reacted on the kill.
SET debug_sync='now WAIT_FOR t2_killed';
# Now we can allow T1 to proceed.
SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1927,1963
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
DROP FUNCTION foo;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
--delimiter ;
SET sql_log_bin=1;
--connection server_1
INSERT INTO t3 VALUES (39,0);
--save_master_pos
--connection server_2
--source include/start_slave.inc
--sync_with_master
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
# Restore the foo() function.
SET sql_log_bin=0;
DROP FUNCTION foo;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
IF d1 != '' THEN
SET debug_sync = d1;
END IF;
IF d2 != '' THEN
SET debug_sync = d2;
END IF;
RETURN x;
END
||
--delimiter ;
SET sql_log_bin=1;
--connection server_2
# Respawn all worker threads to clear any left-over debug_sync or other stuff.
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
--source include/start_slave.inc
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection server_1
DROP function foo;
DROP TABLE t3;
--source include/rpl_end.inc
...@@ -1300,7 +1300,10 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1300,7 +1300,10 @@ int ha_commit_trans(THD *thd, bool all)
{ {
/* Free resources and perform other cleanup even for 'empty' transactions. */ /* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans) if (is_real_trans)
{
thd->transaction.cleanup(); thd->transaction.cleanup();
thd->wakeup_subsequent_commits(error);
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1334,6 +1337,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1334,6 +1337,7 @@ int ha_commit_trans(THD *thd, bool all)
thd->variables.lock_wait_timeout)) thd->variables.lock_wait_timeout))
{ {
ha_rollback_trans(thd, all); ha_rollback_trans(thd, all);
thd->wakeup_subsequent_commits(1);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -1421,6 +1425,7 @@ done: ...@@ -1421,6 +1425,7 @@ done:
err: err:
error= 1; /* Transaction was rolled back */ error= 1; /* Transaction was rolled back */
ha_rollback_trans(thd, all); ha_rollback_trans(thd, all);
thd->wakeup_subsequent_commits(error);
end: end:
if (rw_trans && mdl_request.ticket) if (rw_trans && mdl_request.ticket)
...@@ -1466,8 +1471,12 @@ int ha_commit_one_phase(THD *thd, bool all) ...@@ -1466,8 +1471,12 @@ int ha_commit_one_phase(THD *thd, bool all)
bool is_real_trans=all || thd->transaction.all.ha_list == 0; bool is_real_trans=all || thd->transaction.all.ha_list == 0;
int res; int res;
DBUG_ENTER("ha_commit_one_phase"); DBUG_ENTER("ha_commit_one_phase");
if (is_real_trans && (res= thd->wait_for_prior_commit())) if (is_real_trans)
{
DEBUG_SYNC(thd, "ha_commit_one_phase");
if ((res= thd->wait_for_prior_commit()))
DBUG_RETURN(res); DBUG_RETURN(res);
}
res= commit_one_phase_2(thd, all, trans, is_real_trans); res= commit_one_phase_2(thd, all, trans, is_real_trans);
DBUG_RETURN(res); DBUG_RETURN(res);
} }
...@@ -1479,6 +1488,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) ...@@ -1479,6 +1488,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
int error= 0; int error= 0;
Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
DBUG_ENTER("commit_one_phase_2"); DBUG_ENTER("commit_one_phase_2");
if (is_real_trans)
DEBUG_SYNC(thd, "commit_one_phase_2");
if (ha_info) if (ha_info)
{ {
for (; ha_info; ha_info= ha_info_next) for (; ha_info; ha_info= ha_info_next)
...@@ -1591,10 +1602,7 @@ int ha_rollback_trans(THD *thd, bool all) ...@@ -1591,10 +1602,7 @@ int ha_rollback_trans(THD *thd, bool all)
/* Always cleanup. Even if nht==0. There may be savepoints. */ /* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans) if (is_real_trans)
{
thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup(); thd->transaction.cleanup();
}
if (all) if (all)
thd->transaction_rollback_request= FALSE; thd->transaction_rollback_request= FALSE;
......
...@@ -6612,16 +6612,17 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, ...@@ -6612,16 +6612,17 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
to commit. If so, we add those to the queue as well, transitively for all to commit. If so, we add those to the queue as well, transitively for all
waiters. waiters.
@retval TRUE If queued as the first entry in the queue (meaning this @retval < 0 Error
@retval > 0 If queued as the first entry in the queue (meaning this
is the leader) is the leader)
@retval FALSE Otherwise @retval 0 Otherwise (queued as participant, leader handles the commit)
*/ */
bool int
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{ {
group_commit_entry *entry, *orig_queue; group_commit_entry *entry, *orig_queue;
wait_for_commit *list, *cur, *last; wait_for_commit *cur, *last;
wait_for_commit *wfc; wait_for_commit *wfc;
DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit"); DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
...@@ -6641,6 +6642,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6641,6 +6642,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
/* Do an extra check here, this time safely under lock. */ /* Do an extra check here, this time safely under lock. */
if (wfc->waiting_for_commit) if (wfc->waiting_for_commit)
{ {
const char *old_msg;
/* /*
By setting wfc->opaque_pointer to our own entry, we mark that we are By setting wfc->opaque_pointer to our own entry, we mark that we are
ready to commit, but waiting for another transaction to commit before ready to commit, but waiting for another transaction to commit before
...@@ -6651,16 +6653,58 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6651,16 +6653,58 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
queued_by_other flag is set. queued_by_other flag is set.
*/ */
wfc->opaque_pointer= orig_entry; wfc->opaque_pointer= orig_entry;
old_msg=
orig_entry->thd->enter_cond(&wfc->COND_wait_commit,
&wfc->LOCK_wait_commit,
"Waiting for prior transaction to commit");
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
do while (wfc->waiting_for_commit && !orig_entry->thd->check_killed())
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
} while (wfc->waiting_for_commit);
wfc->opaque_pointer= NULL; wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d", DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other)); orig_entry->queued_by_other));
if (wfc->waiting_for_commit)
{
/* Wait terminated due to kill. */
wait_for_commit *loc_waitee= wfc->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running ||
orig_entry->queued_by_other)
{
/* Our waitee is already waking us up, so ignore the kill. */
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
do
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
} while (wfc->waiting_for_commit);
} }
else
{
/* We were killed, so remove us from the list of waitee. */
wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
orig_entry->thd->exit_cond(old_msg);
/* Interrupted by kill. */
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior_killed");
wfc->wakeup_error= orig_entry->thd->killed_errno();
if (wfc->wakeup_error)
wfc->wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
DBUG_RETURN(-1);
}
}
orig_entry->thd->exit_cond(old_msg);
}
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit); mysql_mutex_unlock(&wfc->LOCK_wait_commit);
if (wfc->wakeup_error)
{
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
DBUG_RETURN(-1);
}
} }
/* /*
...@@ -6669,7 +6713,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6669,7 +6713,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
then there is nothing else to do. then there is nothing else to do.
*/ */
if (orig_entry->queued_by_other) if (orig_entry->queued_by_other)
DBUG_RETURN(false); DBUG_RETURN(0);
/* Now enqueue ourselves in the group commit queue. */ /* Now enqueue ourselves in the group commit queue. */
DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue"); DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
...@@ -6707,9 +6751,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6707,9 +6751,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
used by the caller or any other function. used by the caller or any other function.
*/ */
list= wfc; cur= wfc;
cur= list; last= wfc;
last= list;
entry= orig_entry; entry= orig_entry;
for (;;) for (;;)
{ {
...@@ -6735,11 +6778,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6735,11 +6778,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
*/ */
if (cur->subsequent_commits_list) if (cur->subsequent_commits_list)
{ {
bool have_lock;
wait_for_commit *waiter; wait_for_commit *waiter;
wait_for_commit *wakeup_list= NULL;
wait_for_commit **wakeup_next_ptr= &wakeup_list;
mysql_mutex_lock(&cur->LOCK_wait_commit); mysql_mutex_lock(&cur->LOCK_wait_commit);
have_lock= true;
/* /*
Grab the list, now safely under lock, and process it if still Grab the list, now safely under lock, and process it if still
non-empty. non-empty.
...@@ -6780,18 +6823,68 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6780,18 +6823,68 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
For this, we need to set the "wakeup running" flag and release For this, we need to set the "wakeup running" flag and release
the waitee lock to avoid a deadlock, see comments on the waitee lock to avoid a deadlock, see comments on
THD::wakeup_subsequent_commits2() for details. THD::wakeup_subsequent_commits2() for details.
So we need to put these on a list and delay the wakeup until we
have released the lock.
*/ */
if (have_lock) *wakeup_next_ptr= waiter;
wakeup_next_ptr= &waiter->next_subsequent_commit;
}
waiter= next;
}
if (wakeup_list)
{ {
have_lock= false; /* Now release our lock and do the wakeups that were delayed above. */
cur->wakeup_subsequent_commits_running= true; cur->wakeup_subsequent_commits_running= true;
mysql_mutex_unlock(&cur->LOCK_wait_commit); mysql_mutex_unlock(&cur->LOCK_wait_commit);
for (;;)
{
wait_for_commit *next;
/*
ToDo: We wakeup the waiter here, so that it can have the chance to
reach its own commit state and queue up for this same group commit,
if it is still pending.
One problem with this is that if the waiter does not reach its own
commit state before this group commit starts, and then the group
commit fails (binlog write failure), we do not get to propagate
the error to the waiter.
A solution for this could be to delay the wakeup until commit is
successful. But then we need to set a flag in the waitee that it is
already queued for group commit, so that the waiter can check this
flag and queue itself if it _does_ reach the commit state in time.
(But error handling in case of binlog write failure is currently
broken in other ways, as well).
*/
if (&wakeup_list->next_subsequent_commit == wakeup_next_ptr)
{
/* The last one in the list. */
wakeup_list->wakeup(0);
break;
} }
waiter->wakeup(0); /*
Important: don't access wakeup_list->next after the wakeup() call,
it may be invalidated by the other thread.
*/
next= wakeup_list->next_subsequent_commit;
wakeup_list->wakeup(0);
wakeup_list= next;
} }
waiter= next; /*
We need a full memory barrier between walking the list and clearing
the flag wakeup_subsequent_commits_running. This barrier is needed
to ensure that no other thread will start to modify the list
pointers before we are done traversing the list.
But wait_for_commit::wakeup(), which was called above, does a full
memory barrier already (it locks a mutex).
*/
cur->wakeup_subsequent_commits_running= false;
} }
if (have_lock) else
mysql_mutex_unlock(&cur->LOCK_wait_commit); mysql_mutex_unlock(&cur->LOCK_wait_commit);
} }
if (cur == last) if (cur == last)
...@@ -6805,29 +6898,6 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6805,29 +6898,6 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
DBUG_ASSERT(entry != NULL); DBUG_ASSERT(entry != NULL);
} }
/*
Now we need to clear the wakeup_subsequent_commits_running flags.
We need a full memory barrier between walking the list above, and clearing
the flag wakeup_subsequent_commits_running below. This barrier is needed
to ensure that no other thread will start to modify the list pointers
before we are done traversing the list.
But wait_for_commit::wakeup(), which was called above for any other thread
that might modify the list in parallel, does a full memory barrier already
(it locks a mutex).
*/
if (list)
{
for (;;)
{
list->wakeup_subsequent_commits_running= false;
if (list == last)
break;
list= list->next_subsequent_commit;
}
}
if (opt_binlog_commit_wait_count > 0) if (opt_binlog_commit_wait_count > 0)
mysql_cond_signal(&COND_prepare_ordered); mysql_cond_signal(&COND_prepare_ordered);
mysql_mutex_unlock(&LOCK_prepare_ordered); mysql_mutex_unlock(&LOCK_prepare_ordered);
...@@ -6841,13 +6911,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6841,13 +6911,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
bool bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{ {
bool is_leader= queue_for_group_commit(entry); int is_leader= queue_for_group_commit(entry);
/* /*
The first in the queue handles group commit for all; the others just wait The first in the queue handles group commit for all; the others just wait
to be signalled when group commit is done. to be signalled when group commit is done.
*/ */
if (is_leader) if (is_leader < 0)
return true; /* Error */
else if (is_leader)
trx_group_commit_leader(entry); trx_group_commit_leader(entry);
else if (!entry->queued_by_other) else if (!entry->queued_by_other)
entry->thd->wait_for_wakeup_ready(); entry->thd->wait_for_wakeup_ready();
......
...@@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
void do_checkpoint_request(ulong binlog_id); void do_checkpoint_request(ulong binlog_id);
void purge(); void purge();
int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id); int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
bool queue_for_group_commit(group_commit_entry *entry); int queue_for_group_commit(group_commit_entry *entry);
bool write_transaction_to_binlog_events(group_commit_entry *entry); bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader); void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock(); bool is_xidlist_idle_nolock();
......
...@@ -7158,7 +7158,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -7158,7 +7158,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
res= trans_commit(thd); /* Automatically rolls back on error. */ res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks(); thd->mdl_context.release_transactional_locks();
if (sub_id) if (!res && sub_id)
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid); rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
/* /*
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "rpl_parallel.h" #include "rpl_parallel.h"
#include "slave.h" #include "slave.h"
#include "rpl_mi.h" #include "rpl_mi.h"
#include "debug_sync.h"
/* /*
...@@ -24,7 +25,7 @@ static int ...@@ -24,7 +25,7 @@ static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev, rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt) struct rpl_parallel_thread *rpt)
{ {
int err __attribute__((unused)); int err;
rpl_group_info *rgi= qev->rgi; rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli; Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd; THD *thd= rgi->thd;
...@@ -142,7 +143,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id, ...@@ -142,7 +143,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
if (err) if (err)
wfc->unregister_wait_for_prior_commit(); wfc->unregister_wait_for_prior_commit();
else else
wfc->wait_for_prior_commit(); err= wfc->wait_for_prior_commit(thd);
thd->wait_for_commit_ptr= NULL; thd->wait_for_commit_ptr= NULL;
/* /*
...@@ -172,6 +173,18 @@ finish_event_group(THD *thd, int err, uint64 sub_id, ...@@ -172,6 +173,18 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
} }
static void
signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
{
rgi->is_error= true;
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
rgi->rli->relay_log.signal_update();
}
pthread_handler_t pthread_handler_t
handle_rpl_parallel_thread(void *arg) handle_rpl_parallel_thread(void *arg)
{ {
...@@ -207,6 +220,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -207,6 +220,7 @@ handle_rpl_parallel_thread(void *arg)
thd->variables.log_slow_filter= global_system_variables.log_slow_filter; thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
set_slave_thread_options(thd); set_slave_thread_options(thd);
thd->client_capabilities = CLIENT_LOCAL_FILES; thd->client_capabilities = CLIENT_LOCAL_FILES;
thd->net.reading_or_writing= 0;
thd_proc_info(thd, "Waiting for work from main SQL threads"); thd_proc_info(thd, "Waiting for work from main SQL threads");
thd->set_time(); thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT; thd->variables.lock_wait_timeout= LONG_TIMEOUT;
...@@ -285,20 +299,41 @@ handle_rpl_parallel_thread(void *arg) ...@@ -285,20 +299,41 @@ handle_rpl_parallel_thread(void *arg)
wait_start_sub_id= rgi->wait_start_sub_id; wait_start_sub_id= rgi->wait_start_sub_id;
if (wait_for_sub_id || wait_start_sub_id) if (wait_for_sub_id || wait_start_sub_id)
{ {
bool did_enter_cond= false;
const char *old_msg= NULL;
mysql_mutex_lock(&entry->LOCK_parallel_entry); mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (wait_start_sub_id) if (wait_start_sub_id)
{ {
while (wait_start_sub_id > entry->last_committed_sub_id) old_msg= thd->enter_cond(&entry->COND_parallel_entry,
&entry->LOCK_parallel_entry,
"Waiting for prior transaction to commit "
"before starting next transaction");
did_enter_cond= true;
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
while (wait_start_sub_id > entry->last_committed_sub_id &&
!thd->check_killed())
mysql_cond_wait(&entry->COND_parallel_entry, mysql_cond_wait(&entry->COND_parallel_entry,
&entry->LOCK_parallel_entry); &entry->LOCK_parallel_entry);
if (wait_start_sub_id > entry->last_committed_sub_id)
{
/* The thread got a kill signal. */
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi);
} }
rgi->wait_start_sub_id= 0; /* No need to check again. */ rgi->wait_start_sub_id= 0; /* No need to check again. */
}
if (wait_for_sub_id > entry->last_committed_sub_id) if (wait_for_sub_id > entry->last_committed_sub_id)
{ {
wait_for_commit *waitee= wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer; &rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee); rgi->commit_orderer.register_wait_for_prior_commit(waitee);
} }
if (did_enter_cond)
thd->exit_cond(old_msg);
else
mysql_mutex_unlock(&entry->LOCK_parallel_entry); mysql_mutex_unlock(&entry->LOCK_parallel_entry);
} }
...@@ -342,10 +377,8 @@ handle_rpl_parallel_thread(void *arg) ...@@ -342,10 +377,8 @@ handle_rpl_parallel_thread(void *arg)
if (err) if (err)
{ {
rgi->is_error= true;
slave_output_error_info(rgi->rli, thd); slave_output_error_info(rgi->rli, thd);
rgi->cleanup_context(thd, true); signal_error_to_sql_driver_thread(thd, rgi);
rgi->rli->abort_slave= true;
} }
if (end_of_group) if (end_of_group)
{ {
...@@ -354,6 +387,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -354,6 +387,7 @@ handle_rpl_parallel_thread(void *arg)
&rgi->commit_orderer); &rgi->commit_orderer);
delete rgi; delete rgi;
group_rgi= rgi= NULL; group_rgi= rgi= NULL;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
} }
events= next; events= next;
...@@ -384,11 +418,9 @@ handle_rpl_parallel_thread(void *arg) ...@@ -384,11 +418,9 @@ handle_rpl_parallel_thread(void *arg)
half-processed event group. half-processed event group.
*/ */
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
group_rgi->is_error= true;
finish_event_group(thd, 1, group_rgi->gtid_sub_id, finish_event_group(thd, 1, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, &group_rgi->commit_orderer); group_rgi->parallel_entry, &group_rgi->commit_orderer);
group_rgi->cleanup_context(thd, true); signal_error_to_sql_driver_thread(thd, group_rgi);
group_rgi->rli->abort_slave= true;
in_event_group= false; in_event_group= false;
delete group_rgi; delete group_rgi;
group_rgi= NULL; group_rgi= NULL;
...@@ -753,6 +785,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -753,6 +785,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Relay_log_info *rli= serial_rgi->rli; Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ; enum Log_event_type typ;
bool is_group_event; bool is_group_event;
bool did_enter_cond= false;
const char *old_msg= NULL;
/* ToDo: what to do with this lock?!? */ /* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
...@@ -767,6 +801,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -767,6 +801,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
sql_thread_stopping= true; sql_thread_stopping= true;
if (sql_thread_stopping) if (sql_thread_stopping)
{ {
delete ev;
/* QQ: Need a better comment why we return false here */ /* QQ: Need a better comment why we return false here */
return false; return false;
} }
...@@ -775,6 +810,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -775,6 +810,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
MYF(0)))) MYF(0))))
{ {
my_error(ER_OUT_OF_RESOURCES, MYF(0)); my_error(ER_OUT_OF_RESOURCES, MYF(0));
delete ev;
return true; return true;
} }
qev->ev= ev; qev->ev= ev;
...@@ -797,6 +833,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -797,6 +833,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{ {
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
delete rgi; delete rgi;
my_free(qev);
delete ev;
return true; return true;
} }
rgi->is_parallel_exec = true; rgi->is_parallel_exec = true;
...@@ -814,6 +852,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -814,6 +852,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
However, the commit of this event must wait for the commit of the prior However, the commit of this event must wait for the commit of the prior
event, to preserve binlog commit order and visibility across all event, to preserve binlog commit order and visibility across all
servers in the replication hierarchy. servers in the replication hierarchy.
In addition, we must not start executing this event until we have
finished the previous collection of event groups that group-committed
together; we use rgi->wait_start_sub_id to control this.
*/ */
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
rgi->wait_commit_sub_id= e->current_sub_id; rgi->wait_commit_sub_id= e->current_sub_id;
...@@ -860,6 +902,21 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -860,6 +902,21 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
} }
else if (cur_thread->queued_size <= opt_slave_parallel_max_queued) else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
break; // The thread is ready to queue into break; // The thread is ready to queue into
else if (rli->sql_driver_thd->check_killed())
{
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
my_error(ER_CONNECTION_KILLED, MYF(0));
delete rgi;
my_free(qev);
delete ev;
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
{
debug_sync_set_action(rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
};);
slave_output_error_info(rli, rli->sql_driver_thd);
return true;
}
else else
{ {
/* /*
...@@ -867,6 +924,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -867,6 +924,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
use for queuing events, so wait for the thread to consume some use for queuing events, so wait for the thread to consume some
of its queue. of its queue.
*/ */
if (!did_enter_cond)
{
old_msg= rli->sql_driver_thd->enter_cond
(&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread,
"Waiting for room in worker thread event queue");
did_enter_cond= true;
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
{
debug_sync_set_action(rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
};);
}
mysql_cond_wait(&cur_thread->COND_rpl_thread, mysql_cond_wait(&cur_thread->COND_rpl_thread,
&cur_thread->LOCK_rpl_thread); &cur_thread->LOCK_rpl_thread);
} }
...@@ -1016,6 +1085,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1016,6 +1085,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/ */
rli->event_relay_log_pos= rli->future_event_relay_log_pos; rli->event_relay_log_pos= rli->future_event_relay_log_pos;
cur_thread->enqueue(qev); cur_thread->enqueue(qev);
if (did_enter_cond)
rli->sql_driver_thd->exit_cond(old_msg);
else
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
mysql_cond_signal(&cur_thread->COND_rpl_thread); mysql_cond_signal(&cur_thread->COND_rpl_thread);
......
...@@ -886,11 +886,11 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, ...@@ -886,11 +886,11 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
int cmp= strcmp(group_relay_log_name, event_relay_log_name); int cmp= strcmp(group_relay_log_name, event_relay_log_name);
if (cmp < 0) if (cmp < 0)
{ {
group_relay_log_pos= event_relay_log_pos; group_relay_log_pos= rgi->future_event_relay_log_pos;
strmake_buf(group_relay_log_name, event_relay_log_name); strmake_buf(group_relay_log_name, event_relay_log_name);
notify_group_relay_log_name_update(); notify_group_relay_log_name_update();
} else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos) } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
group_relay_log_pos= event_relay_log_pos; group_relay_log_pos= rgi->future_event_relay_log_pos;
/* /*
In the parallel case we need to update the master_log_name here, rather In the parallel case we need to update the master_log_name here, rather
......
...@@ -6181,6 +6181,17 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) ...@@ -6181,6 +6181,17 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
DBUG_RETURN(ev); DBUG_RETURN(ev);
} }
/*
We have to check sql_slave_killed() here an extra time.
Otherwise we may miss a wakeup, since last check was done
without holding LOCK_log.
*/
if (sql_slave_killed(rgi))
{
mysql_mutex_unlock(log_lock);
break;
}
/* /*
We can, and should release data_lock while we are waiting for We can, and should release data_lock while we are waiting for
update. If we do not, show slave status will block update. If we do not, show slave status will block
......
...@@ -5783,12 +5783,53 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) ...@@ -5783,12 +5783,53 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
returns immediately. returns immediately.
*/ */
int int
wait_for_commit::wait_for_prior_commit2() wait_for_commit::wait_for_prior_commit2(THD *thd)
{ {
const char *old_msg;
wait_for_commit *loc_waitee;
mysql_mutex_lock(&LOCK_wait_commit); mysql_mutex_lock(&LOCK_wait_commit);
while (waiting_for_commit) DEBUG_SYNC(thd, "wait_for_prior_commit_waiting");
old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit,
"Waiting for prior transaction to commit");
while (waiting_for_commit && !thd->check_killed())
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
mysql_mutex_unlock(&LOCK_wait_commit); if (!waiting_for_commit)
{
if (wakeup_error)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
goto end;
}
/*
Wait was interrupted by kill. We need to unregister our wait and give the
error. But if a wakeup is already in progress, then we must ignore the
kill and not give error, otherwise we get inconsistency between waitee and
waiter as to whether we succeed or fail (eg. we may roll back but waitee
might attempt to commit both us and any subsequent commits waiting for us).
*/
loc_waitee= this->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
/* We are being woken up; ignore the kill and just wait. */
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
do
{
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
} while (waiting_for_commit);
goto end;
}
remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
wakeup_error= thd->killed_errno();
if (!wakeup_error)
wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wakeup_error, ER(wakeup_error), MYF(0));
end:
thd->exit_cond(old_msg);
waitee= NULL; waitee= NULL;
return wakeup_error; return wakeup_error;
} }
...@@ -5876,7 +5917,6 @@ wait_for_commit::unregister_wait_for_prior_commit2() ...@@ -5876,7 +5917,6 @@ wait_for_commit::unregister_wait_for_prior_commit2()
if (waiting_for_commit) if (waiting_for_commit)
{ {
wait_for_commit *loc_waitee= this->waitee; wait_for_commit *loc_waitee= this->waitee;
wait_for_commit **next_ptr_ptr, *cur;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running) if (loc_waitee->wakeup_subsequent_commits_running)
{ {
...@@ -5894,17 +5934,7 @@ wait_for_commit::unregister_wait_for_prior_commit2() ...@@ -5894,17 +5934,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
else else
{ {
/* Remove ourselves from the list in the waitee. */ /* Remove ourselves from the list in the waitee. */
next_ptr_ptr= &loc_waitee->subsequent_commits_list; remove_from_list(&loc_waitee->subsequent_commits_list);
while ((cur= *next_ptr_ptr) != NULL)
{
if (cur == this)
{
*next_ptr_ptr= this->next_subsequent_commit;
break;
}
next_ptr_ptr= &cur->next_subsequent_commit;
}
waiting_for_commit= false;
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
} }
} }
......
...@@ -1627,14 +1627,14 @@ struct wait_for_commit ...@@ -1627,14 +1627,14 @@ struct wait_for_commit
bool wakeup_subsequent_commits_running; bool wakeup_subsequent_commits_running;
void register_wait_for_prior_commit(wait_for_commit *waitee); void register_wait_for_prior_commit(wait_for_commit *waitee);
int wait_for_prior_commit() int wait_for_prior_commit(THD *thd)
{ {
/* /*
Quick inline check, to avoid function call and locking in the common case Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled. where no wakeup is registered, or a registered wait was already signalled.
*/ */
if (waiting_for_commit) if (waiting_for_commit)
return wait_for_prior_commit2(); return wait_for_prior_commit2(thd);
else else
return wakeup_error; return wakeup_error;
} }
...@@ -1660,10 +1660,29 @@ struct wait_for_commit ...@@ -1660,10 +1660,29 @@ struct wait_for_commit
if (waiting_for_commit) if (waiting_for_commit)
unregister_wait_for_prior_commit2(); unregister_wait_for_prior_commit2();
} }
/*
Remove a waiter from the list in the waitee. Used to unregister a wait.
The caller must be holding the locks of both waiter and waitee.
*/
void remove_from_list(wait_for_commit **next_ptr_ptr)
{
wait_for_commit *cur;
while ((cur= *next_ptr_ptr) != NULL)
{
if (cur == this)
{
*next_ptr_ptr= this->next_subsequent_commit;
break;
}
next_ptr_ptr= &cur->next_subsequent_commit;
}
waiting_for_commit= false;
}
void wakeup(int wakeup_error); void wakeup(int wakeup_error);
int wait_for_prior_commit2(); int wait_for_prior_commit2(THD *thd);
void wakeup_subsequent_commits2(int wakeup_error); void wakeup_subsequent_commits2(int wakeup_error);
void unregister_wait_for_prior_commit2(); void unregister_wait_for_prior_commit2();
...@@ -3334,12 +3353,7 @@ public: ...@@ -3334,12 +3353,7 @@ public:
int wait_for_prior_commit() int wait_for_prior_commit()
{ {
if (wait_for_commit_ptr) if (wait_for_commit_ptr)
{ return wait_for_commit_ptr->wait_for_prior_commit(this);
int err= wait_for_commit_ptr->wait_for_prior_commit();
if (err)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
return err;
}
return 0; return 0;
} }
void wakeup_subsequent_commits(int wakeup_error) void wakeup_subsequent_commits(int wakeup_error)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment