Commit 903f8dc7 authored by Kristian Nielsen's avatar Kristian Nielsen

Merge MDEV-8147 into 10.1

parents 5bd25a9c e5f1e841
include/master-slave.inc
[connection master]
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,inject_wakeup_subsequent_commits_sleep";
SET GLOBAL slave_parallel_threads=8;
*** MDEV-8147: Assertion `m_lock_type == 2' failed in handler::ha_close() during parallel replication ***
CREATE TABLE E (
pk INTEGER AUTO_INCREMENT,
col_int_nokey INTEGER /*! NULL */,
col_int_key INTEGER /*! NULL */,
col_date_key DATE /*! NULL */,
col_date_nokey DATE /*! NULL */,
col_time_key TIME /*! NULL */,
col_time_nokey TIME /*! NULL */,
col_datetime_key DATETIME /*! NULL */,
col_datetime_nokey DATETIME /*! NULL */,
col_varchar_key VARCHAR(1) /*! NULL */,
col_varchar_nokey VARCHAR(1) /*! NULL */,
PRIMARY KEY (pk),
KEY (col_int_key),
KEY (col_date_key),
KEY (col_time_key),
KEY (col_datetime_key),
KEY (col_varchar_key, col_int_key)
) ENGINE=InnoDB;
ALTER TABLE `E` PARTITION BY KEY() PARTITIONS 5;
ALTER TABLE `E` REMOVE PARTITIONING;
CREATE TABLE t1 (a INT PRIMARY KEY);
include/start_slave.inc
include/stop_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=8;
include/start_slave.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE `E`;
DROP TABLE t1;
include/rpl_end.inc
--source include/have_partition.inc
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/master-slave.inc
--connection server_2
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,inject_wakeup_subsequent_commits_sleep";
SET GLOBAL slave_parallel_threads=8;
--echo *** MDEV-8147: Assertion `m_lock_type == 2' failed in handler::ha_close() during parallel replication ***
--connection server_1
CREATE TABLE E (
pk INTEGER AUTO_INCREMENT,
col_int_nokey INTEGER /*! NULL */,
col_int_key INTEGER /*! NULL */,
col_date_key DATE /*! NULL */,
col_date_nokey DATE /*! NULL */,
col_time_key TIME /*! NULL */,
col_time_nokey TIME /*! NULL */,
col_datetime_key DATETIME /*! NULL */,
col_datetime_nokey DATETIME /*! NULL */,
col_varchar_key VARCHAR(1) /*! NULL */,
col_varchar_nokey VARCHAR(1) /*! NULL */,
PRIMARY KEY (pk),
KEY (col_int_key),
KEY (col_date_key),
KEY (col_time_key),
KEY (col_datetime_key),
KEY (col_varchar_key, col_int_key)
) ENGINE=InnoDB;
ALTER TABLE `E` PARTITION BY KEY() PARTITIONS 5;
ALTER TABLE `E` REMOVE PARTITIONING;
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
wait
EOF
--shutdown_server 30
--source include/wait_until_disconnected.inc
--connection default
--source include/wait_until_disconnected.inc
--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
restart:
EOF
--enable_reconnect
--source include/wait_until_connected_again.inc
--connection server_1
--enable_reconnect
--source include/wait_until_connected_again.inc
CREATE TABLE t1 (a INT PRIMARY KEY);
--save_master_pos
--connection server_2
--source include/start_slave.inc
--sync_with_master
# Re-spawn worker threads to clear dbug injection.
--source include/stop_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=8;
--source include/start_slave.inc
# Clean up.
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection server_1
DROP TABLE `E`;
DROP TABLE t1;
--source include/rpl_end.inc
...@@ -9902,6 +9902,7 @@ PSI_stage_info stage_waiting_for_work_from_sql_thread= { 0, "Waiting for work fr ...@@ -9902,6 +9902,7 @@ PSI_stage_info stage_waiting_for_work_from_sql_thread= { 0, "Waiting for work fr
PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for prior transaction to commit", 0}; PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for prior transaction to commit", 0};
PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0}; PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0};
PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0}; PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
PSI_stage_info stage_waiting_for_workers_idle= { 0, "Waiting for worker threads to be idle", 0};
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0}; PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0}; PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0}; PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
......
...@@ -480,6 +480,7 @@ extern PSI_stage_info stage_waiting_for_work_from_sql_thread; ...@@ -480,6 +480,7 @@ extern PSI_stage_info stage_waiting_for_work_from_sql_thread;
extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit; extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit;
extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit; extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit;
extern PSI_stage_info stage_waiting_for_room_in_worker_thread; extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
extern PSI_stage_info stage_waiting_for_workers_idle;
extern PSI_stage_info stage_master_gtid_wait_primary; extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait; extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection; extern PSI_stage_info stage_gtid_wait_other_connection;
......
...@@ -167,6 +167,8 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id, ...@@ -167,6 +167,8 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
done and also no longer need waiting for. done and also no longer need waiting for.
*/ */
entry->last_committed_sub_id= sub_id; entry->last_committed_sub_id= sub_id;
if (entry->need_sub_id_signal)
mysql_cond_broadcast(&entry->COND_parallel_entry);
/* Now free any GCOs in which all transactions have committed. */ /* Now free any GCOs in which all transactions have committed. */
group_commit_orderer *tmp_gco= rgi->gco; group_commit_orderer *tmp_gco= rgi->gco;
...@@ -1928,27 +1930,30 @@ rpl_parallel::wait_for_workers_idle(THD *thd) ...@@ -1928,27 +1930,30 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
max_i= domain_hash.records; max_i= domain_hash.records;
for (i= 0; i < max_i; ++i) for (i= 0; i < max_i; ++i)
{ {
bool active; PSI_stage_info old_stage;
wait_for_commit my_orderer;
struct rpl_parallel_entry *e; struct rpl_parallel_entry *e;
int err= 0;
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry); mysql_mutex_lock(&e->LOCK_parallel_entry);
if ((active= (e->current_sub_id > e->last_committed_sub_id))) e->need_sub_id_signal= true;
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
&stage_waiting_for_workers_idle, &old_stage);
while (e->current_sub_id > e->last_committed_sub_id)
{ {
wait_for_commit *waitee= &e->current_group_info->commit_orderer; if (thd->check_killed())
my_orderer.register_wait_for_prior_commit(waitee);
thd->wait_for_commit_ptr= &my_orderer;
}
mysql_mutex_unlock(&e->LOCK_parallel_entry);
if (active)
{ {
int err= my_orderer.wait_for_prior_commit(thd); thd->send_kill_message();
thd->wait_for_commit_ptr= NULL; err= 1;
break;
}
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
}
e->need_sub_id_signal= false;
thd->EXIT_COND(&old_stage);
if (err) if (err)
return err; return err;
} }
}
return 0; return 0;
} }
......
...@@ -250,6 +250,12 @@ struct rpl_parallel_entry { ...@@ -250,6 +250,12 @@ struct rpl_parallel_entry {
waiting for event groups to complete. waiting for event groups to complete.
*/ */
bool force_abort; bool force_abort;
/*
Set in wait_for_workers_idle() to show that it is waiting, so that
finish_event_group knows to signal it when last_committed_sub_id is
increased.
*/
bool need_sub_id_signal;
/* /*
At STOP SLAVE (force_abort=true), we do not want to process all events in At STOP SLAVE (force_abort=true), we do not want to process all events in
the queue (which could unnecessarily delay stop, if a lot of events happen the queue (which could unnecessarily delay stop, if a lot of events happen
......
...@@ -6961,6 +6961,7 @@ wait_for_commit::wakeup_subsequent_commits2(int wakeup_error) ...@@ -6961,6 +6961,7 @@ wait_for_commit::wakeup_subsequent_commits2(int wakeup_error)
a mutex), so no extra explicit barrier is needed here. a mutex), so no extra explicit barrier is needed here.
*/ */
wakeup_subsequent_commits_running= false; wakeup_subsequent_commits_running= false;
DBUG_EXECUTE_IF("inject_wakeup_subsequent_commits_sleep", my_sleep(21000););
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment