Commit 0b87de12 authored by Jonas Oreland's avatar Jonas Oreland Committed by Kristian Nielsen

MDEV-162 Enhanced semisync replication

Implement --semi-sync-master-wait-point=AFTER_SYNC|AFTER_COMMIT.

When AFTER_SYNC, the semi-sync wait will be done earlier, before the storage
engine commit rather than after. This means that a transaction will not be
visible on the master until at least one slave has received it.
parent 4d8b346e
......@@ -3,11 +3,11 @@ SELECT variable_value INTO @commits FROM information_schema.global_status
WHERE variable_name = 'binlog_commits';
SELECT variable_value INTO @group_commits FROM information_schema.global_status
WHERE variable_name = 'binlog_group_commits';
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group1_running WAIT_FOR group2_queued";
INSERT INTO t1 VALUES ("con1");
set DEBUG_SYNC= "now WAIT_FOR group1_running";
SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2";
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group2_running";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group2_running";
SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed";
SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked";
INSERT INTO t1 VALUES ("con2");
......@@ -25,7 +25,7 @@ SET DEBUG_SYNC= "now SIGNAL group2_queued";
SELECT * FROM t1 ORDER BY a;
a
con1
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued";
set DEBUG_SYNC= "now WAIT_FOR group2_running";
INSERT INTO t1 VALUES ("con5");
......
......@@ -3,11 +3,11 @@ SELECT variable_value INTO @commits FROM information_schema.global_status
WHERE variable_name = 'binlog_commits';
SELECT variable_value INTO @group_commits FROM information_schema.global_status
WHERE variable_name = 'binlog_group_commits';
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group1_running WAIT_FOR group2_queued";
INSERT INTO t1 VALUES ("con1");
set DEBUG_SYNC= "now WAIT_FOR group1_running";
SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2";
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group2_running";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group2_running";
SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed";
INSERT INTO t1 VALUES ("con2");
SET DEBUG_SYNC= "now WAIT_FOR group2_con2";
......@@ -25,7 +25,7 @@ SET DEBUG_SYNC= "now SIGNAL group2_queued";
SELECT * FROM t1 ORDER BY a;
a
con1
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued";
set DEBUG_SYNC= "now WAIT_FOR group2_running";
INSERT INTO t1 VALUES ("con5");
......
......@@ -27,7 +27,7 @@ connect(con6,localhost,root,,);
# group2 to queue up before finishing.
connection con1;
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group1_running WAIT_FOR group2_queued";
send INSERT INTO t1 VALUES ("con1");
# Make group2 (with three threads) queue up.
......@@ -37,7 +37,7 @@ send INSERT INTO t1 VALUES ("con1");
connection con2;
set DEBUG_SYNC= "now WAIT_FOR group1_running";
SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2";
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group2_running";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group2_running";
SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed";
SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked";
send INSERT INTO t1 VALUES ("con2");
......@@ -69,7 +69,7 @@ connection default;
SELECT * FROM t1 ORDER BY a;
connection con5;
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued";
set DEBUG_SYNC= "now WAIT_FOR group2_running";
send INSERT INTO t1 VALUES ("con5");
......
......@@ -27,7 +27,7 @@ connect(con6,localhost,root,,);
# group2 to queue up before finishing.
connection con1;
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group1_running WAIT_FOR group2_queued";
send INSERT INTO t1 VALUES ("con1");
# Make group2 (with three threads) queue up.
......@@ -37,7 +37,7 @@ send INSERT INTO t1 VALUES ("con1");
connection con2;
set DEBUG_SYNC= "now WAIT_FOR group1_running";
SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2";
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group2_running";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group2_running";
SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed";
send INSERT INTO t1 VALUES ("con2");
connection con3;
......@@ -69,7 +69,7 @@ connection default;
SELECT * FROM t1 ORDER BY a;
connection con5;
SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_before_get_LOCK_after_binlog_sync SIGNAL group3_con5";
SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued";
set DEBUG_SYNC= "now WAIT_FOR group2_running";
send INSERT INTO t1 VALUES ("con5");
......
......@@ -11,9 +11,9 @@ wait/synch/mutex/sql/gtid_waiting::LOCK_gtid_waiting YES YES
wait/synch/mutex/sql/hash_filo::lock YES YES
wait/synch/mutex/sql/HA_DATA_PARTITION::LOCK_auto_inc YES YES
wait/synch/mutex/sql/LOCK_active_mi YES YES
wait/synch/mutex/sql/LOCK_after_binlog_sync YES YES
wait/synch/mutex/sql/LOCK_audit_mask YES YES
wait/synch/mutex/sql/LOCK_binlog_state YES YES
wait/synch/mutex/sql/LOCK_commit_ordered YES YES
select * from performance_schema.setup_instruments
where name like 'Wait/Synch/Rwlock/sql/%'
and name not in ('wait/synch/rwlock/sql/CRYPTO_dynlock_value::lock')
......
This diff is collapsed.
This diff is collapsed.
set global rpl_semi_sync_master_wait_point=AFTER_SYNC;
include/master-slave.inc
[connection master]
call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Semi-sync master .* waiting for slave reply");
call mtr.add_suppression("Read semi-sync reply");
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
call mtr.add_suppression("Master server does not support semi-sync");
call mtr.add_suppression("Semi-sync slave .* reply");
call mtr.add_suppression("Slave SQL.*Request to stop slave SQL Thread received while applying a group that has non-transactional changes; waiting for completion of the group");
set global rpl_semi_sync_master_enabled = 1;
include/stop_slave.inc
set global rpl_semi_sync_slave_enabled = 1;
include/start_slave.inc
SET GLOBAL event_scheduler = ON;
CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8)) ENGINE=ENGINE_TYPE;
INSERT INTO t1 (f) VALUES ('a'),('a'),('a'),('a'),('a');
INSERT INTO t1 SELECT i+5, f FROM t1;
INSERT INTO t1 SELECT i+10, f FROM t1;
CREATE EVENT ev1 ON SCHEDULE EVERY 1 SECOND
DO INSERT INTO t1 VALUES (SLEEP(5),CONCAT('ev1_',CONNECTION_ID()));
CREATE EVENT ev2 ON SCHEDULE EVERY 1 SECOND
DO INSERT INTO t1 VALUES (SLEEP(5),CONCAT('ev2_',CONNECTION_ID()));
STOP SLAVE IO_THREAD;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 20;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 19;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 18;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 17;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 16;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 15;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 14;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 13;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 12;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 11;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 10;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 9;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 8;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 7;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 6;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 5;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 4;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 3;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 2;
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 1;
SET GLOBAL event_scheduler = OFF;
include/stop_slave.inc
set global rpl_semi_sync_slave_enabled = 0;
set global rpl_semi_sync_master_enabled = 0;
include/start_slave.inc
DROP EVENT ev1;
DROP EVENT ev2;
DROP TABLE t1;
include/rpl_end.inc
set global rpl_semi_sync_master_wait_point=default;
#
# Preparation
#
CREATE TABLE t1 (i INT NOT NULL, PRIMARY KEY (i)) ENGINE=InnoDB;
RESET MASTER;
SET @@global.rpl_semi_sync_master_timeout = 60000;
SET @@global.rpl_semi_sync_master_wait_no_slave = 1;
# It's okay to see "Killed" but we should not see "Timeout" in the log.
call mtr.add_suppression("Killed waiting for reply of binlog");
call mtr.add_suppression("Run function 'after_commit' in plugin 'rpl_semi_sync_master' failed");
call mtr.add_suppression("Run function 'after_sync' in plugin 'rpl_semi_sync_master' failed");
#
# Test wait point = AFTER_COMMIT
#
SET @@global.rpl_semi_sync_master_wait_point = AFTER_COMMIT;
# Make another connection to INSERT from.
SET GLOBAL rpl_semi_sync_master_enabled = 1;
# Go ahead and send the INSERT; it should block.
INSERT INTO t1 (i) VALUES (1);
# The INSERT thread should now be waiting.
SELECT state AS should_be_waiting
FROM information_schema.processlist WHERE id = @other_connection_id;
should_be_waiting
Waiting for semi-sync ACK from slave
# The insert should be visible to other threads
SELECT * FROM t1 ORDER BY 1;
i
1
# Kill the waiting thread; it should die immediately.
KILL @other_connection_id;
# Collect the error from the INSERT thread; it should be disconnected.
Got one of the listed errors
# Wait for INSERT thread to actually disappear (KILL closes connection
# before thread actually finishes its processing).
# The INSERT thread should now be gone.
SELECT state AS should_be_empty_set
FROM information_schema.processlist WHERE id = @other_connection_id;
should_be_empty_set
# The insert is still there
SELECT * FROM t1 ORDER BY 1;
i
1
# Make another connection to INSERT from.
# Go ahead and send the INSERT; it should block.
INSERT INTO t1 (i) VALUES (2);
# The INSERT thread should now be waiting.
SELECT state AS should_be_waiting
FROM information_schema.processlist WHERE id = @other_connection_id;
should_be_waiting
Waiting for semi-sync ACK from slave
# The insert should be visible to other threads
SELECT * FROM t1 ORDER BY 1;
i
1
2
# Now restart server
# Done restarting server
# Reset setting that were lost in restart
SET @@global.rpl_semi_sync_master_timeout = 60000;
SET @@global.rpl_semi_sync_master_wait_no_slave = 1;
# Check that row is still there
SELECT * FROM t1 ORDER BY 1;
i
1
2
#
# Test wait point = AFTER_SYNC
#
SET @@global.rpl_semi_sync_master_wait_point = AFTER_SYNC;
# Make another connection to INSERT from.
SET GLOBAL rpl_semi_sync_master_enabled = 1;
# Go ahead and send the INSERT; it should block.
INSERT INTO t1 (i) VALUES (3);
# The INSERT thread should now be waiting.
SELECT state AS should_be_waiting
FROM information_schema.processlist WHERE id = @other_connection_id;
should_be_waiting
Waiting for semi-sync ACK from slave
# The insert should NOT be visible to other threads
SELECT * FROM t1 ORDER BY 1;
i
1
2
# Kill the waiting thread; it should die immediately.
KILL @other_connection_id;
# Collect the error from the INSERT thread; it should be disconnected.
Got one of the listed errors
# Wait for INSERT thread to actually disappear (KILL closes connection
# before thread actually finishes its processing).
# The INSERT thread should now be gone.
SELECT state AS should_be_empty_set
FROM information_schema.processlist WHERE id = @other_connection_id;
should_be_empty_set
# The row inserted is there
SELECT * FROM t1 ORDER BY 1;
i
1
2
3
# Make another connection to INSERT from.
# Go ahead and send the INSERT; it should block.
INSERT INTO t1 (i) VALUES (4);
# The INSERT thread should now be waiting.
SELECT state AS should_be_waiting
FROM information_schema.processlist WHERE id = @other_connection_id;
should_be_waiting
Waiting for semi-sync ACK from slave
# The insert should NOT be visible to other threads
SELECT * FROM t1 ORDER BY 1;
i
1
2
3
# Now restart server
# Done restarting server
# Reset setting that were lost in restart
SET @@global.rpl_semi_sync_master_timeout = 60000;
SET @@global.rpl_semi_sync_master_wait_no_slave = 1;
# But the row inserted is there
SELECT * FROM t1 ORDER BY 1;
i
1
2
3
4
#
# Cleanup
#
SET GLOBAL rpl_semi_sync_master_enabled = 0;
DROP TABLE t1;
SET @@global.rpl_semi_sync_master_timeout = 10000;
SET @@global.rpl_semi_sync_master_wait_no_slave = 1;
SET @@global.rpl_semi_sync_master_wait_point = AFTER_COMMIT;
......@@ -59,7 +59,6 @@ show variables like 'rpl_semi_sync_master_enabled';
echo [ status of semi-sync on master should be ON even without any semi-sync slaves ];
show status like 'Rpl_semi_sync_master_clients';
show status like 'Rpl_semi_sync_master_status';
--replace_result 305 304
show status like 'Rpl_semi_sync_master_yes_tx';
--echo #
......@@ -120,7 +119,6 @@ echo [ initial master state after the semi-sync slave connected ];
show status like 'Rpl_semi_sync_master_clients';
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
--replace_result 305 304
show status like 'Rpl_semi_sync_master_yes_tx';
replace_result $engine_type ENGINE_TYPE;
......@@ -129,7 +127,6 @@ eval create table t1(a int) engine = $engine_type;
echo [ master state after CREATE TABLE statement ];
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
--replace_result 305 304
show status like 'Rpl_semi_sync_master_yes_tx';
# After fix of BUG#45848, semi-sync slave should not create any extra
......@@ -153,7 +150,6 @@ insert t1 values (1);
echo [ master status after inserts ];
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
--replace_result 305 304
show status like 'Rpl_semi_sync_master_yes_tx';
sync_slave_with_master;
......@@ -302,14 +298,12 @@ source include/stop_slave.inc;
connection master;
echo [ Semi-sync master status variables before FLUSH STATUS ];
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
--replace_result 306 305
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
# Do not write the FLUSH STATUS to binlog, to make sure we'll get a
# clean status after this.
FLUSH NO_WRITE_TO_BINLOG STATUS;
echo [ Semi-sync master status variables after FLUSH STATUS ];
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
--replace_result 306 305
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
connection master;
......@@ -358,7 +352,6 @@ reset master;
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
--replace_result 306 305
show status like 'Rpl_semi_sync_master_yes_tx';
connection slave;
......@@ -410,7 +403,6 @@ echo [ on master ];
echo [ master semi-sync status should be ON ];
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
--replace_result 306 305
show status like 'Rpl_semi_sync_master_yes_tx';
--echo #
......@@ -460,7 +452,6 @@ echo [ master semi-sync should be ON ];
show status like 'Rpl_semi_sync_master_clients';
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
--replace_result 306 305
show status like 'Rpl_semi_sync_master_yes_tx';
insert into t1 values (4);
insert into t1 values (5);
......@@ -468,7 +459,6 @@ echo [ master semi-sync should be ON ];
show status like 'Rpl_semi_sync_master_clients';
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
--replace_result 306 305
show status like 'Rpl_semi_sync_master_yes_tx';
--echo #
......
--source include/have_binlog_format_statement.inc
set global rpl_semi_sync_master_wait_point=AFTER_SYNC;
source rpl_semi_sync.test;
set global rpl_semi_sync_master_wait_point=default;
--source include/have_binlog_format_row.inc
set global rpl_semi_sync_master_wait_point=AFTER_SYNC;
source rpl_semi_sync.test;
set global rpl_semi_sync_master_wait_point=default;
set global rpl_semi_sync_master_wait_point=AFTER_SYNC;
source rpl_semi_sync_event.test;
set global rpl_semi_sync_master_wait_point=default;
source include/have_semisync.inc;
source include/not_embedded.inc;
source include/have_innodb.inc;
#
# This test the rpl_semi_sync_master_wait_point functionality
# and illustrates the differences between the two values AFTER_COMMIT and
# AFTER_SYNC
#
--echo #
--echo # Preparation
--echo #
CREATE TABLE t1 (i INT NOT NULL, PRIMARY KEY (i)) ENGINE=InnoDB;
RESET MASTER;
let $save_timeout = `select @@global.rpl_semi_sync_master_timeout`;
let $save_wait_no_slave = `select @@global.rpl_semi_sync_master_wait_no_slave`;
let $save_wait_point = `select @@global.rpl_semi_sync_master_wait_point`;
SET @@global.rpl_semi_sync_master_timeout = 60000;
SET @@global.rpl_semi_sync_master_wait_no_slave = 1;
--echo # It's okay to see "Killed" but we should not see "Timeout" in the log.
call mtr.add_suppression("Killed waiting for reply of binlog");
call mtr.add_suppression("Run function 'after_commit' in plugin 'rpl_semi_sync_master' failed");
call mtr.add_suppression("Run function 'after_sync' in plugin 'rpl_semi_sync_master' failed");
--echo #
--echo # Test wait point = AFTER_COMMIT
--echo #
SET @@global.rpl_semi_sync_master_wait_point = AFTER_COMMIT;
--echo # Make another connection to INSERT from.
connect (other,localhost,root,,);
connection other;
let $other_connection_id = `SELECT CONNECTION_ID()`;
connection default;
--disable_query_log
eval SET @other_connection_id = $other_connection_id;
--enable_query_log
SET GLOBAL rpl_semi_sync_master_enabled = 1;
--echo # Go ahead and send the INSERT; it should block.
connection other;
send INSERT INTO t1 (i) VALUES (1);
connection default;
let $wait_condition =
SELECT COUNT(*) > 0 AS should_be_true
FROM information_schema.processlist
WHERE id = @other_connection_id
AND state = "Waiting for semi-sync ACK from slave";
--source include/wait_condition.inc
--echo # The INSERT thread should now be waiting.
SELECT state AS should_be_waiting
FROM information_schema.processlist WHERE id = @other_connection_id;
--echo # The insert should be visible to other threads
SELECT * FROM t1 ORDER BY 1;
--echo # Kill the waiting thread; it should die immediately.
KILL @other_connection_id;
--echo # Collect the error from the INSERT thread; it should be disconnected.
connection other;
--error 2013,ER_CONNECTION_KILLED
reap;
connection default;
--echo # Wait for INSERT thread to actually disappear (KILL closes connection
--echo # before thread actually finishes its processing).
let $wait_condition =
SELECT COUNT(*) = 0 AS should_be_true
FROM information_schema.processlist
WHERE id = @other_connection_id;
--source include/wait_condition.inc
--echo # The INSERT thread should now be gone.
SELECT state AS should_be_empty_set
FROM information_schema.processlist WHERE id = @other_connection_id;
--echo # The insert is still there
SELECT * FROM t1 ORDER BY 1;
connection default;
disconnect other;
--echo # Make another connection to INSERT from.
connect (other,localhost,root,,);
connection other;
let $other_connection_id = `SELECT CONNECTION_ID()`;
connection default;
--disable_query_log
eval SET @other_connection_id = $other_connection_id;
--enable_query_log
--echo # Go ahead and send the INSERT; it should block.
connection other;
send INSERT INTO t1 (i) VALUES (2);
connection default;
let $wait_condition =
SELECT COUNT(*) > 0 AS should_be_true
FROM information_schema.processlist
WHERE id = @other_connection_id
AND state = "Waiting for semi-sync ACK from slave";
--source include/wait_condition.inc
--echo # The INSERT thread should now be waiting.
SELECT state AS should_be_waiting
FROM information_schema.processlist WHERE id = @other_connection_id;
--echo # The insert should be visible to other threads
SELECT * FROM t1 ORDER BY 1;
--echo # Now restart server
--source include/restart_mysqld.inc
--echo # Done restarting server
--echo # Reset setting that were lost in restart
SET @@global.rpl_semi_sync_master_timeout = 60000;
SET @@global.rpl_semi_sync_master_wait_no_slave = 1;
--echo # Check that row is still there
SELECT * FROM t1 ORDER BY 1;
disconnect other;
--echo #
--echo # Test wait point = AFTER_SYNC
--echo #
SET @@global.rpl_semi_sync_master_wait_point = AFTER_SYNC;
--echo # Make another connection to INSERT from.
connect (other,localhost,root,,);
connection other;
let $other_connection_id = `SELECT CONNECTION_ID()`;
connection default;
--disable_query_log
eval SET @other_connection_id = $other_connection_id;
--enable_query_log
SET GLOBAL rpl_semi_sync_master_enabled = 1;
--echo # Go ahead and send the INSERT; it should block.
connection other;
send INSERT INTO t1 (i) VALUES (3);
connection default;
let $wait_condition =
SELECT COUNT(*) > 0 AS should_be_true
FROM information_schema.processlist
WHERE id = @other_connection_id
AND state = "Waiting for semi-sync ACK from slave";
--source include/wait_condition.inc
--echo # The INSERT thread should now be waiting.
SELECT state AS should_be_waiting
FROM information_schema.processlist WHERE id = @other_connection_id;
--echo # The insert should NOT be visible to other threads
SELECT * FROM t1 ORDER BY 1;
--echo # Kill the waiting thread; it should die immediately.
KILL @other_connection_id;
--echo # Collect the error from the INSERT thread; it should be disconnected.
connection other;
--error 2013,ER_CONNECTION_KILLED
reap;
connection default;
--echo # Wait for INSERT thread to actually disappear (KILL closes connection
--echo # before thread actually finishes its processing).
let $wait_condition =
SELECT COUNT(*) = 0 AS should_be_true
FROM information_schema.processlist
WHERE id = @other_connection_id;
--source include/wait_condition.inc
--echo # The INSERT thread should now be gone.
SELECT state AS should_be_empty_set
FROM information_schema.processlist WHERE id = @other_connection_id;
--echo # The row inserted is there
SELECT * FROM t1 ORDER BY 1;
connection default;
disconnect other;
--echo # Make another connection to INSERT from.
connect (other,localhost,root,,);
connection other;
let $other_connection_id = `SELECT CONNECTION_ID()`;
connection default;
--disable_query_log
eval SET @other_connection_id = $other_connection_id;
--enable_query_log
--echo # Go ahead and send the INSERT; it should block.
connection other;
send INSERT INTO t1 (i) VALUES (4);
connection default;
let $wait_condition =
SELECT COUNT(*) > 0 AS should_be_true
FROM information_schema.processlist
WHERE id = @other_connection_id
AND state = "Waiting for semi-sync ACK from slave";
--source include/wait_condition.inc
--echo # The INSERT thread should now be waiting.
SELECT state AS should_be_waiting
FROM information_schema.processlist WHERE id = @other_connection_id;
--echo # The insert should NOT be visible to other threads
SELECT * FROM t1 ORDER BY 1;
--echo # Now restart server
--source include/restart_mysqld.inc
--echo # Done restarting server
--echo # Reset setting that were lost in restart
SET @@global.rpl_semi_sync_master_timeout = 60000;
SET @@global.rpl_semi_sync_master_wait_no_slave = 1;
--echo # But the row inserted is there
SELECT * FROM t1 ORDER BY 1;
disconnect other;
--echo #
--echo # Cleanup
--echo #
SET GLOBAL rpl_semi_sync_master_enabled = 0;
DROP TABLE t1;
eval SET @@global.rpl_semi_sync_master_timeout = $save_timeout;
eval SET @@global.rpl_semi_sync_master_wait_no_slave = $save_wait_no_slave;
eval SET @@global.rpl_semi_sync_master_wait_point = $save_wait_point;
select @@global.rpl_semi_sync_master_wait_point;
@@global.rpl_semi_sync_master_wait_point
AFTER_COMMIT
SET @start_global_value = @@global.rpl_semi_sync_master_wait_point;
select @@session.rpl_semi_sync_master_wait_point;
ERROR HY000: Variable 'rpl_semi_sync_master_wait_point' is a GLOBAL variable
show global variables like 'rpl_semi_sync_master_wait_point';
Variable_name Value
rpl_semi_sync_master_wait_point AFTER_COMMIT
show session variables like 'rpl_semi_sync_master_wait_point';
Variable_name Value
rpl_semi_sync_master_wait_point AFTER_COMMIT
select * from information_schema.global_variables where variable_name='rpl_semi_sync_master_wait_point';
VARIABLE_NAME VARIABLE_VALUE
RPL_SEMI_SYNC_MASTER_WAIT_POINT AFTER_COMMIT
select * from information_schema.session_variables where variable_name='rpl_semi_sync_master_wait_point';
VARIABLE_NAME VARIABLE_VALUE
RPL_SEMI_SYNC_MASTER_WAIT_POINT AFTER_COMMIT
set global rpl_semi_sync_master_wait_point=AFTER_SYNC;
set session rpl_semi_sync_master_wait_point=AFTER_COMMIT;
ERROR HY000: Variable 'rpl_semi_sync_master_wait_point' is a GLOBAL variable and should be set with SET GLOBAL
select @@global.rpl_semi_sync_master_wait_point;
@@global.rpl_semi_sync_master_wait_point
AFTER_SYNC
select @@session.rpl_semi_sync_master_wait_point;
ERROR HY000: Variable 'rpl_semi_sync_master_wait_point' is a GLOBAL variable
show global variables like 'rpl_semi_sync_master_wait_point';
Variable_name Value
rpl_semi_sync_master_wait_point AFTER_SYNC
show session variables like 'rpl_semi_sync_master_wait_point';
Variable_name Value
rpl_semi_sync_master_wait_point AFTER_SYNC
select * from information_schema.global_variables where variable_name='rpl_semi_sync_master_wait_point';
VARIABLE_NAME VARIABLE_VALUE
RPL_SEMI_SYNC_MASTER_WAIT_POINT AFTER_SYNC
select * from information_schema.session_variables where variable_name='rpl_semi_sync_master_wait_point';
VARIABLE_NAME VARIABLE_VALUE
RPL_SEMI_SYNC_MASTER_WAIT_POINT AFTER_SYNC
set global rpl_semi_sync_master_wait_point=1.1;
ERROR 42000: Incorrect argument type to variable 'rpl_semi_sync_master_wait_point'
set global rpl_semi_sync_master_wait_point=1e1;
ERROR 42000: Incorrect argument type to variable 'rpl_semi_sync_master_wait_point'
SET @@global.rpl_semi_sync_master_wait_point = @start_global_value;
select @@global.rpl_semi_sync_master_wait_point;
@@global.rpl_semi_sync_master_wait_point
AFTER_COMMIT
source include/not_embedded.inc;
source include/have_semisync.inc;
select @@global.rpl_semi_sync_master_wait_point;
SET @start_global_value = @@global.rpl_semi_sync_master_wait_point;
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
select @@session.rpl_semi_sync_master_wait_point;
show global variables like 'rpl_semi_sync_master_wait_point';
show session variables like 'rpl_semi_sync_master_wait_point';
select * from information_schema.global_variables where variable_name='rpl_semi_sync_master_wait_point';
select * from information_schema.session_variables where variable_name='rpl_semi_sync_master_wait_point';
#
# show that it's writable
#
set global rpl_semi_sync_master_wait_point=AFTER_SYNC;
--error ER_GLOBAL_VARIABLE
set session rpl_semi_sync_master_wait_point=AFTER_COMMIT;
select @@global.rpl_semi_sync_master_wait_point;
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
select @@session.rpl_semi_sync_master_wait_point;
show global variables like 'rpl_semi_sync_master_wait_point';
show session variables like 'rpl_semi_sync_master_wait_point';
select * from information_schema.global_variables where variable_name='rpl_semi_sync_master_wait_point';
select * from information_schema.session_variables where variable_name='rpl_semi_sync_master_wait_point';
#
# incorrect types
#
--error ER_WRONG_TYPE_FOR_VAR
set global rpl_semi_sync_master_wait_point=1.1;
--error ER_WRONG_TYPE_FOR_VAR
set global rpl_semi_sync_master_wait_point=1e1;
#
# Cleanup
#
SET @@global.rpl_semi_sync_master_wait_point = @start_global_value;
select @@global.rpl_semi_sync_master_wait_point;
......@@ -24,6 +24,8 @@
/* This indicates whether semi-synchronous replication is enabled. */
char rpl_semi_sync_master_enabled;
unsigned long rpl_semi_sync_master_wait_point =
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT;
unsigned long rpl_semi_sync_master_timeout;
unsigned long rpl_semi_sync_master_trace_level;
char rpl_semi_sync_master_status = 0;
......
......@@ -594,9 +594,15 @@ class ReplSemiSyncMaster
int resetMaster();
};
enum rpl_semi_sync_master_wait_point_t {
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC,
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT,
};
/* System and status variables for the master component */
extern char rpl_semi_sync_master_enabled;
extern char rpl_semi_sync_master_status;
extern unsigned long rpl_semi_sync_master_wait_point;
extern unsigned long rpl_semi_sync_master_clients;
extern unsigned long rpl_semi_sync_master_timeout;
extern unsigned long rpl_semi_sync_master_trace_level;
......
......@@ -48,8 +48,27 @@ int repl_semi_request_commit(Trans_param *param)
return 0;
}
int repl_semi_report_binlog_sync(Binlog_storage_param *param,
const char *log_file,
my_off_t log_pos, uint32 flags)
{
int error= 0;
if (rpl_semi_sync_master_wait_point ==
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
{
error = repl_semisync.commitTrx(log_file, log_pos);
}
return error;
}
int repl_semi_report_commit(Trans_param *param)
{
if (rpl_semi_sync_master_wait_point !=
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
{
return 0;
}
bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
......@@ -175,6 +194,33 @@ static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
&fix_rpl_semi_sync_master_enabled, // update
0);
/* NOTE: must match order of rpl_semi_sync_master_wait_point_t */
static const char *rpl_semi_sync_master_wait_point_names[] =
{
"AFTER_SYNC",
"AFTER_COMMIT",
NullS
};
static TYPELIB rpl_semi_sync_master_wait_point_typelib =
{
array_elements(rpl_semi_sync_master_wait_point_names) - 1,
"",
rpl_semi_sync_master_wait_point_names,
NULL
};
static MYSQL_SYSVAR_ENUM(
wait_point,
rpl_semi_sync_master_wait_point,
PLUGIN_VAR_RQCMDARG,
"Should transaction wait for semi-sync ack after having synced binlog, "
"or after having committed in storeage engine.",
NULL, // check
NULL, // update
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT,
&rpl_semi_sync_master_wait_point_typelib);
static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
PLUGIN_VAR_OPCMDARG,
"The timeout value (in ms) for semi-synchronous replication in the master",
......@@ -198,6 +244,7 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
static SYS_VAR* semi_sync_master_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(wait_point),
MYSQL_SYSVAR(timeout),
MYSQL_SYSVAR(wait_no_slave),
MYSQL_SYSVAR(trace_level),
......@@ -256,6 +303,7 @@ Binlog_storage_observer storage_observer = {
sizeof(Binlog_storage_observer), // len
repl_semi_report_binlog_update, // report_update
repl_semi_report_binlog_sync, // after_sync
};
Binlog_transmit_observer transmit_observer = {
......
......@@ -32,6 +32,7 @@
#include "sql_acl.h" // SUPER_ACL
#include "sql_base.h" // free_io_cache
#include "discover.h" // extension_based_table_discovery, etc
#include "log.h" // for assert_LOCK_log_owner
#include "log_event.h" // *_rows_log_event
#include "create_options.h"
#include "rpl_filter.h"
......@@ -1479,6 +1480,12 @@ int ha_commit_trans(THD *thd, bool all)
done:
DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE(););
/* documentation of which mutexes are (not) owned */
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
assert_LOCK_log_owner(false);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
RUN_HOOK(transaction, after_commit, (thd, FALSE));
goto end;
......
......@@ -93,6 +93,7 @@ ulong opt_binlog_dbug_fsync_sleep= 0;
mysql_mutex_t LOCK_prepare_ordered;
mysql_cond_t COND_prepare_ordered;
mysql_mutex_t LOCK_after_binlog_sync;
mysql_mutex_t LOCK_commit_ordered;
static ulonglong binlog_status_var_num_commits;
......@@ -3938,7 +3939,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
Without binlog, we cannot XA recover prepared-but-not-committed
transactions in engines. So force a commit checkpoint first.
Note that we take and immediately release LOCK_commit_ordered. This has
Note that we take and immediately
release LOCK_after_binlog_sync/LOCK_commit_ordered. This has
the effect to ensure that any on-going group commit (in
trx_group_commit_leader()) has completed before we request the checkpoint,
due to the chaining of LOCK_log and LOCK_commit_ordered in that function.
......@@ -3949,7 +3951,10 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
commit_ordered() in the engine of some transaction, and then a crash
later would leave such transaction not recoverable.
*/
mysql_mutex_lock(&LOCK_after_binlog_sync);
mysql_mutex_lock(&LOCK_commit_ordered);
mysql_mutex_unlock(&LOCK_after_binlog_sync);
mysql_mutex_unlock(&LOCK_commit_ordered);
mark_xids_active(current_binlog_id, 1);
......@@ -6035,11 +6040,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
if ((error= flush_and_sync(&synced)))
{
}
else if ((error= RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, file->pos_in_file, synced))))
{
sql_print_error("Failed to run 'after_flush' hooks");
}
else
{
/* update binlog_end_pos so it can be read by dump thread
......@@ -6050,23 +6050,58 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
*/
update_binlog_end_pos(offset);
/* documentation of which mutexes are (not) owned */
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
bool first= true;
bool last= true;
if ((error= RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, file->pos_in_file,
synced, first, last))))
{
sql_print_error("Failed to run 'after_flush' hooks");
error= 1;
}
else
{
signal_update();
if ((error= rotate(false, &check_purge)))
check_purge= false;
}
}
}
status_var_add(thd->status_var.binlog_bytes_written,
offset - my_org_b_tell);
mysql_mutex_lock(&LOCK_after_binlog_sync);
mysql_mutex_unlock(&LOCK_log);
/* documentation of which mutexes are (not) owned */
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
mysql_mutex_assert_not_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
bool first= true;
bool last= true;
if (RUN_HOOK(binlog_storage, after_sync,
(thd, log_file_name, file->pos_in_file,
first, last)))
{
error=1;
/* error is already printed inside hook */
}
/*
Take mutex to protect against a reader seeing partial writes of 64-bit
offset on 32-bit CPUs.
*/
mysql_mutex_lock(&LOCK_commit_ordered);
mysql_mutex_unlock(&LOCK_after_binlog_sync);
last_commit_pos_offset= offset;
mysql_mutex_unlock(&LOCK_commit_ordered);
mysql_mutex_unlock(&LOCK_log);
if (check_purge)
checkpoint_and_purge(prev_binlog_id);
......@@ -7374,13 +7409,22 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{
bool any_error= false;
bool all_error= true;
/* documentation of which mutexes are (not) owned */
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
bool first= true, last;
for (current= queue; current != NULL; current= current->next)
{
last= current->next == NULL;
if (!current->error &&
RUN_HOOK(binlog_storage, after_flush,
(current->thd,
current->cache_mngr->last_commit_pos_file,
current->cache_mngr->last_commit_pos_offset, synced)))
current->cache_mngr->last_commit_pos_offset, synced,
first, last)))
{
current->error= ER_ERROR_ON_WRITE;
current->commit_errno= -1;
......@@ -7389,6 +7433,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
}
else
all_error= false;
first= false;
}
/* update binlog_end_pos so it can be read by dump thread
......@@ -7437,22 +7482,55 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
}
}
DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
mysql_mutex_lock(&LOCK_commit_ordered);
/**
* TODO(jonaso): Check with Kristian,
* if we rotate:d above, this offset is "wrong"
*/
last_commit_pos_offset= commit_offset;
DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_after_binlog_sync");
mysql_mutex_lock(&LOCK_after_binlog_sync);
/*
We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
We cannot unlock LOCK_log until we have locked LOCK_after_binlog_sync;
otherwise scheduling could allow the next group commit to run ahead of us,
messing up the order of commit_ordered() calls. But as soon as
LOCK_commit_ordered is obtained, we can let the next group commit start.
LOCK_after_binlog_sync is obtained, we can let the next group commit start.
*/
mysql_mutex_unlock(&LOCK_log);
DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
/*
Loop through threads and run the binlog_sync hook
*/
{
/* documentation of which mutexes are (not) owned */
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
mysql_mutex_assert_not_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
bool first= true, last;
for (current= queue; current != NULL; current= current->next)
{
last= current->next == NULL;
if (!current->error &&
RUN_HOOK(binlog_storage, after_sync,
(current->thd, log_file_name,
current->cache_mngr->last_commit_pos_offset,
first, last)))
{
/* error is already printed inside hook */
}
first= false;
}
}
DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
mysql_mutex_lock(&LOCK_commit_ordered);
last_commit_pos_offset= commit_offset;
/*
Unlock LOCK_after_binlog_sync only *after* LOCK_commit_ordered has been
acquired so that groups can not reorder for the different stages of
the group commit procedure.
*/
mysql_mutex_unlock(&LOCK_after_binlog_sync);
DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_after_binlog_sync");
++num_group_commits;
if (!opt_optimize_thread_scheduling)
......
......@@ -88,9 +88,11 @@ class TC_LOG
*/
extern mysql_mutex_t LOCK_prepare_ordered;
extern mysql_cond_t COND_prepare_ordered;
extern mysql_mutex_t LOCK_after_binlog_sync;
extern mysql_mutex_t LOCK_commit_ordered;
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
extern PSI_mutex_key key_LOCK_after_binlog_sync;
extern PSI_cond_key key_COND_prepare_ordered;
#endif
......@@ -1157,4 +1159,6 @@ static inline TC_LOG *get_tc_log_implementation()
void assert_LOCK_log_owner(bool owner);
void assert_LOCK_log_owner(bool owner);
#endif /* LOG_H */
......@@ -890,6 +890,7 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_after_binlog_sync;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_init;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;
......@@ -954,6 +955,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_TABLE_SHARE_LOCK_share, "TABLE_SHARE::LOCK_share", 0},
{ &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_after_binlog_sync, "LOCK_after_binlog_sync", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_init, "LOCK_slave_init", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
......@@ -2243,6 +2245,7 @@ static void clean_up_mutexes()
mysql_cond_destroy(&COND_server_started);
mysql_mutex_destroy(&LOCK_prepare_ordered);
mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_after_binlog_sync);
mysql_mutex_destroy(&LOCK_commit_ordered);
mysql_mutex_destroy(&LOCK_slave_init);
mysql_cond_destroy(&COND_slave_init);
......@@ -4535,6 +4538,8 @@ static int init_thread_environment()
mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL);
mysql_mutex_init(key_LOCK_after_binlog_sync, &LOCK_after_binlog_sync,
MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_slave_init, &LOCK_slave_init,
......
......@@ -81,6 +81,7 @@ typedef struct Trans_observer {
succeeded.
@note The return value is currently ignored by the server.
@note This hook is called wo/ any global mutex held
@param param The parameter for transaction observers
......@@ -103,6 +104,8 @@ typedef struct Trans_observer {
@param param The parameter for transaction observers
@note This hook is called wo/ any global mutex held
@retval 0 Sucess
@retval 1 Failure
*/
......@@ -114,7 +117,13 @@ typedef struct Trans_observer {
*/
enum Binlog_storage_flags {
/** Binary log was sync:ed */
BINLOG_STORAGE_IS_SYNCED = 1
BINLOG_STORAGE_IS_SYNCED = 1,
/** First(or alone) in a group commit */
BINLOG_GROUP_COMMIT_LEADER = 2,
/** Last(or alone) in a group commit */
BINLOG_GROUP_COMMIT_TRAILER = 4
};
/**
......@@ -137,6 +146,8 @@ typedef struct Binlog_storage_observer {
binary log file. Whether the binary log file is synchronized to
disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags.
@note: this hook is called with LOCK_log mutex held
@param param Observer common parameter
@param log_file Binlog file name been updated
@param log_pos Binlog position after update
......@@ -148,6 +159,26 @@ typedef struct Binlog_storage_observer {
int (*after_flush)(Binlog_storage_param *param,
const char *log_file, my_off_t log_pos,
uint32 flags);
/**
This callback is called after binlog has been synced
This callback is called after events flushed to disk has been sync:ed
("group committed").
@note: this hook is called with LOCK_after_binlog_sync mutex held
@param param Observer common parameter
@param log_file Binlog file name been updated
@param log_pos Binlog position after update
@param flags flags for binlog storage
@retval 0 Sucess
@retval 1 Failure
*/
int (*after_sync)(Binlog_storage_param *param,
const char *log_file, my_off_t log_pos,
uint32 flags);
} Binlog_storage_observer;
/**
......
......@@ -252,12 +252,18 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
int Binlog_storage_delegate::after_flush(THD *thd,
const char *log_file,
my_off_t log_pos,
bool synced)
bool synced,
bool first_in_group,
bool last_in_group)
{
Binlog_storage_param param;
uint32 flags=0;
if (synced)
flags |= BINLOG_STORAGE_IS_SYNCED;
if (first_in_group)
flags|= BINLOG_GROUP_COMMIT_LEADER;
if (last_in_group)
flags|= BINLOG_GROUP_COMMIT_TRAILER;
Trans_binlog_info *log_info=
my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
......@@ -279,6 +285,27 @@ int Binlog_storage_delegate::after_flush(THD *thd,
return ret;
}
int Binlog_storage_delegate::after_sync(THD *thd,
const char *log_file,
my_off_t log_pos,
bool first_in_group,
bool last_in_group)
{
Binlog_storage_param param;
uint32 flags=0;
if (first_in_group)
flags|= BINLOG_GROUP_COMMIT_LEADER;
if (last_in_group)
flags|= BINLOG_GROUP_COMMIT_TRAILER;
int ret= 0;
FOREACH_OBSERVER(ret, after_sync, thd,
(&param, log_file+dirname_length(log_file), log_pos, flags));
return ret;
}
#ifdef HAVE_REPLICATION
int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
const char *log_file,
......
......@@ -153,7 +153,10 @@ class Binlog_storage_delegate
public:
typedef Binlog_storage_observer Observer;
int after_flush(THD *thd, const char *log_file,
my_off_t log_pos, bool synced);
my_off_t log_pos, bool synced,
bool first_in_group, bool last_in_group);
int after_sync(THD *thd, const char *log_file, my_off_t log_pos,
bool first_in_group, bool last_in_group);
};
#ifdef HAVE_REPLICATION
......
......@@ -24,6 +24,7 @@
#include "rpl_handler.h"
#include "debug_sync.h" // DEBUG_SYNC
#include "sql_acl.h"
#include "log.h" // for assert_LOCK_log_owner
/* Conditions under which the transaction state must not change. */
static bool trans_check(THD *thd)
......@@ -232,6 +233,13 @@ bool trans_commit(THD *thd)
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
res= ha_commit_trans(thd, TRUE);
/* documentation of which mutexes are (not) owned */
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
assert_LOCK_log_owner(false);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
if (WSREP_ON)
wsrep_post_commit(thd, TRUE);
/*
......@@ -433,6 +441,12 @@ bool trans_commit_stmt(THD *thd)
}
}
/* documentation of which mutexes are (not) owned */
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
assert_LOCK_log_owner(false);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
/*
if res is non-zero, then ha_commit_trans has rolled back the
transaction, so the hooks for rollback will be called.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment