Commit f9db747b authored by Igor Babaev's avatar Igor Babaev

Merge

parents 8bc5045e 0b690fdf
MYSQL_VERSION_MAJOR=5 MYSQL_VERSION_MAJOR=5
MYSQL_VERSION_MINOR=5 MYSQL_VERSION_MINOR=5
MYSQL_VERSION_PATCH=20 MYSQL_VERSION_PATCH=21
MYSQL_VERSION_EXTRA= MYSQL_VERSION_EXTRA=
...@@ -750,6 +750,31 @@ print_use_stmt(PRINT_EVENT_INFO* pinfo, const Query_log_event *ev) ...@@ -750,6 +750,31 @@ print_use_stmt(PRINT_EVENT_INFO* pinfo, const Query_log_event *ev)
} }
/**
Print "SET skip_replication=..." statement when needed.
Not all servers support this (only MariaDB from some version on). So we
mark the SET to only execute from the version of MariaDB that supports it,
and also only output it if we actually see events with the flag set, to not
get spurious errors on MySQL@Oracle servers of higher version that do not
support the flag.
So we start out assuming @@skip_replication is 0, and only output a SET
statement when it changes.
*/
static void
print_skip_replication_statement(PRINT_EVENT_INFO *pinfo, const Log_event *ev)
{
int cur_val;
cur_val= (ev->flags & LOG_EVENT_SKIP_REPLICATION_F) != 0;
if (cur_val == pinfo->skip_replication)
return; /* Not changed. */
fprintf(result_file, "/*!50521 SET skip_replication=%d*/%s\n",
cur_val, pinfo->delimiter);
pinfo->skip_replication= cur_val;
}
/** /**
Prints the given event in base64 format. Prints the given event in base64 format.
...@@ -893,7 +918,10 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -893,7 +918,10 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
goto end; goto end;
} }
else else
{
print_skip_replication_statement(print_event_info, ev);
ev->print(result_file, print_event_info); ev->print(result_file, print_event_info);
}
break; break;
} }
...@@ -923,7 +951,10 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -923,7 +951,10 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
goto end; goto end;
} }
else else
{
print_skip_replication_statement(print_event_info, ev);
ce->print(result_file, print_event_info, TRUE); ce->print(result_file, print_event_info, TRUE);
}
// If this binlog is not 3.23 ; why this test?? // If this binlog is not 3.23 ; why this test??
if (glob_description_event->binlog_version >= 3) if (glob_description_event->binlog_version >= 3)
...@@ -1027,6 +1058,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -1027,6 +1058,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
if (fname) if (fname)
{ {
convert_path_to_forward_slashes(fname); convert_path_to_forward_slashes(fname);
print_skip_replication_statement(print_event_info, ev);
exlq->print(result_file, print_event_info, fname); exlq->print(result_file, print_event_info, fname);
} }
else else
...@@ -1156,6 +1188,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -1156,6 +1188,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
/* FALL THROUGH */ /* FALL THROUGH */
} }
default: default:
print_skip_replication_statement(print_event_info, ev);
ev->print(result_file, print_event_info); ev->print(result_file, print_event_info);
} }
} }
......
...@@ -11,6 +11,10 @@ ...@@ -11,6 +11,10 @@
--source include/have_udf.inc --source include/have_udf.inc
--source include/master-slave.inc --source include/master-slave.inc
disable_query_log;
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT");
enable_query_log;
# #
# To run this tests the "sql/udf_example.c" need to be compiled into # To run this tests the "sql/udf_example.c" need to be compiled into
# udf_example.so and LD_LIBRARY_PATH should be setup to point out where # udf_example.so and LD_LIBRARY_PATH should be setup to point out where
......
...@@ -620,6 +620,14 @@ The following options may be given as the first argument: ...@@ -620,6 +620,14 @@ The following options may be given as the first argument:
directive multiple times, once for each table. This will directive multiple times, once for each table. This will
work for cross-database updates, in contrast to work for cross-database updates, in contrast to
replicate-do-db. replicate-do-db.
--replicate-events-marked-for-skip=name
Whether the slave should replicate events that were
created with @@skip_replication=1 on the master. Default
REPLICATE (no events are skipped). Other values are
FILTER_ON_SLAVE (events will be sent by the master but
ignored by the slave) and FILTER_ON_MASTER (events marked
with @@skip_replication=1 will be filtered on the master
and never be sent to the slave).
--replicate-ignore-db=name --replicate-ignore-db=name
Tells the slave thread to not replicate to the specified Tells the slave thread to not replicate to the specified
database. To specify more than one database to ignore, database. To specify more than one database to ignore,
...@@ -1029,6 +1037,7 @@ relay-log-purge TRUE ...@@ -1029,6 +1037,7 @@ relay-log-purge TRUE
relay-log-recovery FALSE relay-log-recovery FALSE
relay-log-space-limit 0 relay-log-space-limit 0
replicate-annotate-row-events FALSE replicate-annotate-row-events FALSE
replicate-events-marked-for-skip replicate
replicate-same-server-id FALSE replicate-same-server-id FALSE
report-host (No default value) report-host (No default value)
report-password (No default value) report-password (No default value)
......
...@@ -620,6 +620,14 @@ The following options may be given as the first argument: ...@@ -620,6 +620,14 @@ The following options may be given as the first argument:
directive multiple times, once for each table. This will directive multiple times, once for each table. This will
work for cross-database updates, in contrast to work for cross-database updates, in contrast to
replicate-do-db. replicate-do-db.
--replicate-events-marked-for-skip=name
Whether the slave should replicate events that were
created with @@skip_replication=1 on the master. Default
REPLICATE (no events are skipped). Other values are
FILTER_ON_SLAVE (events will be sent by the master but
ignored by the slave) and FILTER_ON_MASTER (events marked
with @@skip_replication=1 will be filtered on the master
and never be sent to the slave).
--replicate-ignore-db=name --replicate-ignore-db=name
Tells the slave thread to not replicate to the specified Tells the slave thread to not replicate to the specified
database. To specify more than one database to ignore, database. To specify more than one database to ignore,
...@@ -1021,6 +1029,7 @@ relay-log-purge TRUE ...@@ -1021,6 +1029,7 @@ relay-log-purge TRUE
relay-log-recovery FALSE relay-log-recovery FALSE
relay-log-space-limit 0 relay-log-space-limit 0
replicate-annotate-row-events FALSE replicate-annotate-row-events FALSE
replicate-events-marked-for-skip replicate
replicate-same-server-id FALSE replicate-same-server-id FALSE
report-host (No default value) report-host (No default value)
report-password (No default value) report-password (No default value)
......
...@@ -18,11 +18,11 @@ SELECT col_int_key FROM t1; ...@@ -18,11 +18,11 @@ SELECT col_int_key FROM t1;
--send DELETE IGNORE FROM t1; --send DELETE IGNORE FROM t1;
--connection con1 --connection con1
--error 0,1213 --error 0,ER_LOCK_DEADLOCK,ER_LOCK_WAIT_TIMEOUT
DELETE FROM t1 WHERE col_int_key IN (1, 40000000); DELETE FROM t1 WHERE col_int_key IN (1, 40000000);
--connection default --connection default
--error 0,1213 --error 0,ER_LOCK_DEADLOCK,ER_LOCK_WAIT_TIMEOUT
--reap --reap
--disconnect con1 --disconnect con1
......
include/master-slave.inc
[connection master]
CREATE USER 'nonsuperuser'@'127.0.0.1';
GRANT ALTER,CREATE,DELETE,DROP,EVENT,INSERT,PROCESS,REPLICATION SLAVE,
SELECT,UPDATE ON *.* TO 'nonsuperuser'@'127.0.0.1';
SET GLOBAL replicate_events_marked_for_skip=FILTER_ON_MASTER;
ERROR 42000: Access denied; you need (at least one of) the SUPER privilege(s) for this operation
DROP USER'nonsuperuser'@'127.0.0.1';
SELECT @@global.replicate_events_marked_for_skip;
@@global.replicate_events_marked_for_skip
replicate
SET GLOBAL replicate_events_marked_for_skip=FILTER_ON_SLAVE;
ERROR HY000: This operation cannot be performed with a running slave; run STOP SLAVE first
SELECT @@global.replicate_events_marked_for_skip;
@@global.replicate_events_marked_for_skip
replicate
STOP SLAVE;
SET SESSION replicate_events_marked_for_skip=FILTER_ON_MASTER;
ERROR HY000: Variable 'replicate_events_marked_for_skip' is a GLOBAL variable and should be set with SET GLOBAL
SELECT @@global.replicate_events_marked_for_skip;
@@global.replicate_events_marked_for_skip
replicate
SET GLOBAL replicate_events_marked_for_skip=FILTER_ON_MASTER;
SELECT @@global.replicate_events_marked_for_skip;
@@global.replicate_events_marked_for_skip
filter_on_master
START SLAVE;
SELECT @@skip_replication;
@@skip_replication
0
SET GLOBAL skip_replication=1;
ERROR HY000: Variable 'skip_replication' is a SESSION variable and can't be used with SET GLOBAL
SELECT @@skip_replication;
@@skip_replication
0
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=innodb;
INSERT INTO t1(a) VALUES (1);
INSERT INTO t2(a) VALUES (1);
SET skip_replication=1;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
INSERT INTO t1(a) VALUES (2);
INSERT INTO t2(a) VALUES (2);
FLUSH NO_WRITE_TO_BINLOG LOGS;
SHOW TABLES;
Tables_in_test
t1
t2
SELECT * FROM t1;
a b
1 NULL
SELECT * FROM t2;
a b
1 NULL
DROP TABLE t3;
FLUSH NO_WRITE_TO_BINLOG LOGS;
STOP SLAVE;
SET GLOBAL replicate_events_marked_for_skip=FILTER_ON_SLAVE;
START SLAVE;
SET skip_replication=1;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
INSERT INTO t1(a) VALUES (3);
INSERT INTO t2(a) VALUES (3);
FLUSH NO_WRITE_TO_BINLOG LOGS;
SHOW TABLES;
Tables_in_test
t1
t2
SELECT * FROM t1;
a b
1 NULL
SELECT * FROM t2;
a b
1 NULL
DROP TABLE t3;
FLUSH NO_WRITE_TO_BINLOG LOGS;
STOP SLAVE;
SET GLOBAL replicate_events_marked_for_skip=REPLICATE;
START SLAVE;
SET skip_replication=1;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
INSERT INTO t3(a) VALUES(2);
SELECT * FROM t3;
a b
2 NULL
DROP TABLE t3;
TRUNCATE t1;
RESET MASTER;
SET skip_replication=0;
INSERT INTO t1 VALUES (1,0);
SET skip_replication=1;
INSERT INTO t1 VALUES (2,0);
SET skip_replication=0;
INSERT INTO t1 VALUES (3,0);
SELECT * FROM t1 ORDER by a;
a b
1 0
2 0
3 0
STOP SLAVE;
SET GLOBAL replicate_events_marked_for_skip=FILTER_ON_MASTER;
TRUNCATE t1;
SELECT * FROM t1 ORDER by a;
a b
1 0
2 0
3 0
START SLAVE;
SELECT * FROM t1 ORDER by a;
a b
1 0
3 0
TRUNCATE t1;
STOP SLAVE;
SET GLOBAL sql_slave_skip_counter=6;
SET GLOBAL replicate_events_marked_for_skip=FILTER_ON_SLAVE;
START SLAVE;
SET @old_binlog_format= @@binlog_format;
SET binlog_format= statement;
SET skip_replication=0;
INSERT INTO t1 VALUES (1,5);
SET skip_replication=1;
INSERT INTO t1 VALUES (2,5);
SET skip_replication=0;
INSERT INTO t1 VALUES (3,5);
INSERT INTO t1 VALUES (4,5);
SET binlog_format= @old_binlog_format;
SELECT * FROM t1;
a b
4 5
include/stop_slave.inc
SET @old_slave_binlog_format= @@global.binlog_format;
SET GLOBAL binlog_format= row;
include/start_slave.inc
TRUNCATE t1;
SET @old_binlog_format= @@binlog_format;
SET binlog_format= row;
BINLOG 'wlZOTw8BAAAA8QAAAPUAAAAAAAQANS41LjIxLU1hcmlhREItZGVidWctbG9nAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAEzgNAAgAEgAEBAQEEgAA2QAEGggAAAAICAgCAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAA371saA==';
BINLOG 'wlZOTxMBAAAAKgAAAGMBAAAAgCkAAAAAAAEABHRlc3QAAnQxAAIDAwAC
wlZOTxcBAAAAJgAAAIkBAAAAgCkAAAAAAAEAAv/8AQAAAAgAAAA=';
BINLOG 'wlZOTxMBAAAAKgAAADwCAAAAACkAAAAAAAEABHRlc3QAAnQxAAIDAwAC
wlZOTxcBAAAAJgAAAGICAAAAACkAAAAAAAEAAv/8AgAAAAgAAAA=';
SET binlog_format= @old_binlog_format;
SELECT * FROM t1 ORDER BY a;
a b
1 8
2 8
SELECT * FROM t1 ORDER by a;
a b
2 8
include/stop_slave.inc
SET GLOBAL binlog_format= @old_slave_binlog_format;
include/start_slave.inc
SET skip_replication=0;
BEGIN;
SET skip_replication=0;
ERROR HY000: Cannot modify @@session.skip_replication inside a transaction
SET skip_replication=1;
ERROR HY000: Cannot modify @@session.skip_replication inside a transaction
ROLLBACK;
SET skip_replication=1;
BEGIN;
SET skip_replication=0;
ERROR HY000: Cannot modify @@session.skip_replication inside a transaction
SET skip_replication=1;
ERROR HY000: Cannot modify @@session.skip_replication inside a transaction
COMMIT;
SET autocommit=0;
INSERT INTO t2(a) VALUES(100);
SET skip_replication=1;
ERROR HY000: Cannot modify @@session.skip_replication inside a transaction
ROLLBACK;
SET autocommit=1;
SET skip_replication=1;
CREATE FUNCTION foo (x INT) RETURNS INT BEGIN SET SESSION skip_replication=x; RETURN x; END|
CREATE PROCEDURE bar(x INT) BEGIN SET SESSION skip_replication=x; END|
CREATE FUNCTION baz (x INT) RETURNS INT BEGIN CALL bar(x); RETURN x; END|
SELECT foo(0);
ERROR HY000: Cannot modify @@session.skip_replication inside a stored function or trigger
SELECT baz(0);
ERROR HY000: Cannot modify @@session.skip_replication inside a stored function or trigger
SET @a= foo(1);
ERROR HY000: Cannot modify @@session.skip_replication inside a stored function or trigger
SET @a= baz(1);
ERROR HY000: Cannot modify @@session.skip_replication inside a stored function or trigger
UPDATE t2 SET b=foo(0);
ERROR HY000: Cannot modify @@session.skip_replication inside a stored function or trigger
UPDATE t2 SET b=baz(0);
ERROR HY000: Cannot modify @@session.skip_replication inside a stored function or trigger
INSERT INTO t1 VALUES (101, foo(1));
ERROR HY000: Cannot modify @@session.skip_replication inside a stored function or trigger
INSERT INTO t1 VALUES (101, baz(0));
ERROR HY000: Cannot modify @@session.skip_replication inside a stored function or trigger
SELECT @@skip_replication;
@@skip_replication
1
CALL bar(0);
SELECT @@skip_replication;
@@skip_replication
0
CALL bar(1);
SELECT @@skip_replication;
@@skip_replication
1
DROP FUNCTION foo;
DROP PROCEDURE bar;
DROP FUNCTION baz;
SET skip_replication= 0;
TRUNCATE t1;
STOP SLAVE;
SET GLOBAL replicate_events_marked_for_skip=FILTER_ON_MASTER;
START SLAVE IO_THREAD;
SET skip_replication= 1;
INSERT INTO t1(a) VALUES (1);
SET skip_replication= 0;
INSERT INTO t1(a) VALUES (2);
include/save_master_pos.inc
include/sync_io_with_master.inc
STOP SLAVE IO_THREAD;
SET GLOBAL replicate_events_marked_for_skip=REPLICATE;
START SLAVE;
SELECT * FROM t1;
a b
2 NULL
SET skip_replication= 0;
TRUNCATE t1;
STOP SLAVE;
SET GLOBAL replicate_events_marked_for_skip=FILTER_ON_SLAVE;
START SLAVE IO_THREAD;
SET skip_replication= 1;
INSERT INTO t1(a) VALUES (1);
SET skip_replication= 0;
INSERT INTO t1(a) VALUES (2);
include/save_master_pos.inc
include/sync_io_with_master.inc
STOP SLAVE IO_THREAD;
SET GLOBAL replicate_events_marked_for_skip=REPLICATE;
START SLAVE;
SELECT * FROM t1 ORDER BY a;
a b
1 NULL
2 NULL
SET skip_replication=0;
DROP TABLE t1,t2;
STOP SLAVE;
SET GLOBAL replicate_events_marked_for_skip=REPLICATE;
START SLAVE;
include/rpl_end.inc
This diff is collapsed.
...@@ -5,10 +5,6 @@ ...@@ -5,10 +5,6 @@
# statement based format. This tests work completed in WL#3629. # # statement based format. This tests work completed in WL#3629. #
################################################################### ###################################################################
disable_query_log;
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT");
enable_query_log;
let $engine_type= MyISAM; let $engine_type= MyISAM;
--source include/rpl_udf.inc --source include/rpl_udf.inc
......
#
# Basic testing of replicate_events_marked_for_skip.
#
SET @save_replicate_events_marked_for_skip = @@GLOBAL.replicate_events_marked_for_skip;
SELECT @save_replicate_events_marked_for_skip;
@save_replicate_events_marked_for_skip
replicate
# Scope.
SET @@SESSION.replicate_events_marked_for_skip = "";
ERROR HY000: Variable 'replicate_events_marked_for_skip' is a GLOBAL variable and should be set with SET GLOBAL
SELECT @@SESSION.replicate_events_marked_for_skip;
ERROR HY000: Variable 'replicate_events_marked_for_skip' is a GLOBAL variable
# Argument syntax.
SET @@GLOBAL.replicate_events_marked_for_skip=filter_on_master;
SELECT @@GLOBAL.replicate_events_marked_for_skip;
@@GLOBAL.replicate_events_marked_for_skip
filter_on_master
SET @@GLOBAL.replicate_events_marked_for_skip=filter_on_slave;
SELECT @@GLOBAL.replicate_events_marked_for_skip;
@@GLOBAL.replicate_events_marked_for_skip
filter_on_slave
SET @@GLOBAL.replicate_events_marked_for_skip=replicate;
SELECT @@GLOBAL.replicate_events_marked_for_skip;
@@GLOBAL.replicate_events_marked_for_skip
replicate
SET @@GLOBAL.replicate_events_marked_for_skip=filter;
ERROR 42000: Variable 'replicate_events_marked_for_skip' can't be set to the value of 'filter'
SELECT @@GLOBAL.replicate_events_marked_for_skip;
@@GLOBAL.replicate_events_marked_for_skip
replicate
SELECT * FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES WHERE VARIABLE_NAME='replicate_events_marked_for_skip';
VARIABLE_NAME VARIABLE_VALUE
REPLICATE_EVENTS_MARKED_FOR_SKIP replicate
# Cleanup.
SET @@GLOBAL.replicate_events_marked_for_skip = @save_replicate_events_marked_for_skip;
select @@global.skip_replication;
ERROR HY000: Variable 'skip_replication' is a SESSION variable
select @@session.skip_replication between 1 and 10000;
@@session.skip_replication between 1 and 10000
0
should be empty
show global variables like 'skip_replication';
Variable_name Value
show session variables like 'skip_replication';
Variable_name Value
skip_replication OFF
should be empty
select * from information_schema.global_variables where variable_name='skip_replication';
VARIABLE_NAME VARIABLE_VALUE
select @@session.skip_replication = variable_value from information_schema.session_variables where variable_name='skip_replication';
@@session.skip_replication = variable_value
1
Warnings:
Warning 1292 Truncated incorrect DOUBLE value: 'OFF'
set session skip_replication=0;
select @@session.skip_replication;
@@session.skip_replication
0
set session skip_replication=1;
select @@session.skip_replication;
@@session.skip_replication
1
select * from information_schema.global_variables where variable_name='skip_replication';
VARIABLE_NAME VARIABLE_VALUE
select variable_value from information_schema.session_variables where variable_name='skip_replication';
variable_value
ON
set global skip_replication=1;
ERROR HY000: Variable 'skip_replication' is a SESSION variable and can't be used with SET GLOBAL
source include/not_embedded.inc;
--echo #
--echo # Basic testing of replicate_events_marked_for_skip.
--echo #
SET @save_replicate_events_marked_for_skip = @@GLOBAL.replicate_events_marked_for_skip;
SELECT @save_replicate_events_marked_for_skip;
--echo # Scope.
--error ER_GLOBAL_VARIABLE
SET @@SESSION.replicate_events_marked_for_skip = "";
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@SESSION.replicate_events_marked_for_skip;
--echo # Argument syntax.
SET @@GLOBAL.replicate_events_marked_for_skip=filter_on_master;
SELECT @@GLOBAL.replicate_events_marked_for_skip;
SET @@GLOBAL.replicate_events_marked_for_skip=filter_on_slave;
SELECT @@GLOBAL.replicate_events_marked_for_skip;
SET @@GLOBAL.replicate_events_marked_for_skip=replicate;
SELECT @@GLOBAL.replicate_events_marked_for_skip;
--error ER_WRONG_VALUE_FOR_VAR
SET @@GLOBAL.replicate_events_marked_for_skip=filter;
SELECT @@GLOBAL.replicate_events_marked_for_skip;
SELECT * FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES WHERE VARIABLE_NAME='replicate_events_marked_for_skip';
--echo # Cleanup.
SET @@GLOBAL.replicate_events_marked_for_skip = @save_replicate_events_marked_for_skip;
# exists as a session only
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
select @@global.skip_replication;
# Check the variable has a valid numeric value (assumed to be less then 10000)
select @@session.skip_replication between 1 and 10000;
--echo should be empty
show global variables like 'skip_replication';
show session variables like 'skip_replication';
# Global I_S variable is empty
--echo should be empty
select * from information_schema.global_variables where variable_name='skip_replication';
# Check that I_S value is same as variable
select @@session.skip_replication = variable_value from information_schema.session_variables where variable_name='skip_replication';
#
# show that it's writable
#
set session skip_replication=0;
select @@session.skip_replication;
set session skip_replication=1;
select @@session.skip_replication;
select * from information_schema.global_variables where variable_name='skip_replication';
select variable_value from information_schema.session_variables where variable_name='skip_replication';
--error ER_LOCAL_VARIABLE
set global skip_replication=1;
...@@ -729,7 +729,7 @@ const char* Log_event::get_type_str() ...@@ -729,7 +729,7 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), :log_pos(0), temp_buf(0), exec_time(0),
crc(0), thd(thd_arg), crc(0), thd(thd_arg),
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{ {
...@@ -741,6 +741,9 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) ...@@ -741,6 +741,9 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
cache_type= Log_event::EVENT_TRANSACTIONAL_CACHE; cache_type= Log_event::EVENT_TRANSACTIONAL_CACHE;
else else
cache_type= Log_event::EVENT_STMT_CACHE; cache_type= Log_event::EVENT_STMT_CACHE;
flags= flags_arg |
(thd->variables.option_bits & OPTION_SKIP_REPLICATION ?
LOG_EVENT_SKIP_REPLICATION_F : 0);
} }
/** /**
...@@ -891,7 +894,9 @@ Log_event::do_shall_skip(Relay_log_info *rli) ...@@ -891,7 +894,9 @@ Log_event::do_shall_skip(Relay_log_info *rli)
rli->replicate_same_server_id, rli->replicate_same_server_id,
rli->slave_skip_counter)); rli->slave_skip_counter));
if ((server_id == ::server_id && !rli->replicate_same_server_id) || if ((server_id == ::server_id && !rli->replicate_same_server_id) ||
(rli->slave_skip_counter == 1 && rli->is_in_group())) (rli->slave_skip_counter == 1 && rli->is_in_group()) ||
(flags & LOG_EVENT_SKIP_REPLICATION_F &&
opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE))
return EVENT_SKIP_IGNORE; return EVENT_SKIP_IGNORE;
if (rli->slave_skip_counter > 0) if (rli->slave_skip_counter > 0)
return EVENT_SKIP_COUNT; return EVENT_SKIP_COUNT;
...@@ -3901,6 +3906,14 @@ Query_log_event::do_shall_skip(Relay_log_info *rli) ...@@ -3901,6 +3906,14 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len)); DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len));
DBUG_ASSERT(query && q_len > 0); DBUG_ASSERT(query && q_len > 0);
/*
An event skipped due to @@skip_replication must not be counted towards the
number of events to be skipped due to @@sql_slave_skip_counter.
*/
if (flags & LOG_EVENT_SKIP_REPLICATION_F &&
opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE)
DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE);
if (rli->slave_skip_counter > 0) if (rli->slave_skip_counter > 0)
{ {
if (strcmp("BEGIN", query) == 0) if (strcmp("BEGIN", query) == 0)
...@@ -10806,7 +10819,7 @@ st_print_event_info::st_print_event_info() ...@@ -10806,7 +10819,7 @@ st_print_event_info::st_print_event_info()
auto_increment_increment(0),auto_increment_offset(0), charset_inited(0), auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
lc_time_names_number(~0), lc_time_names_number(~0),
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER), charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
thread_id(0), thread_id_printed(false), thread_id(0), thread_id_printed(false), skip_replication(0),
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE) base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
{ {
/* /*
......
...@@ -504,6 +504,19 @@ struct sql_ex_info ...@@ -504,6 +504,19 @@ struct sql_ex_info
*/ */
#define LOG_EVENT_RELAY_LOG_F 0x40 #define LOG_EVENT_RELAY_LOG_F 0x40
/**
@def LOG_EVENT_SKIP_REPLICATION_F
Flag set by application creating the event (with @@skip_replication); the
slave will skip replication of such events if
--replicate-events-marked-for-skip is not set to REPLICATE.
This is a MariaDB flag; we allocate it from the end of the available
values to reduce risk of conflict with new MySQL flags.
*/
#define LOG_EVENT_SKIP_REPLICATION_F 0x8000
/** /**
@def OPTIONS_WRITTEN_TO_BIN_LOG @def OPTIONS_WRITTEN_TO_BIN_LOG
...@@ -701,6 +714,11 @@ typedef struct st_print_event_info ...@@ -701,6 +714,11 @@ typedef struct st_print_event_info
uint charset_database_number; uint charset_database_number;
uint thread_id; uint thread_id;
bool thread_id_printed; bool thread_id_printed;
/*
Track when @@skip_replication changes so we need to output a SET
statement for it.
*/
int skip_replication;
st_print_event_info(); st_print_event_info();
...@@ -993,8 +1011,8 @@ class Log_event ...@@ -993,8 +1011,8 @@ class Log_event
/** /**
Some 16 flags. See the definitions above for LOG_EVENT_TIME_F, Some 16 flags. See the definitions above for LOG_EVENT_TIME_F,
LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F, and LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F,
LOG_EVENT_SUPPRESS_USE_F for notes. LOG_EVENT_SUPPRESS_USE_F, and LOG_EVENT_SKIP_REPLICATION_F for notes.
*/ */
uint16 flags; uint16 flags;
...@@ -4143,6 +4161,8 @@ class Incident_log_event : public Log_event { ...@@ -4143,6 +4161,8 @@ class Incident_log_event : public Log_event {
m_message.str= NULL; /* Just as a precaution */ m_message.str= NULL; /* Just as a precaution */
m_message.length= 0; m_message.length= 0;
set_direct_logging(); set_direct_logging();
/* Replicate the incident irregardless of @@skip_replication. */
flags&= ~LOG_EVENT_SKIP_REPLICATION_F;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -4153,6 +4173,8 @@ class Incident_log_event : public Log_event { ...@@ -4153,6 +4173,8 @@ class Incident_log_event : public Log_event {
DBUG_PRINT("enter", ("m_incident: %d", m_incident)); DBUG_PRINT("enter", ("m_incident: %d", m_incident));
m_message= msg; m_message= msg;
set_direct_logging(); set_direct_logging();
/* Replicate the incident irregardless of @@skip_replication. */
flags&= ~LOG_EVENT_SKIP_REPLICATION_F;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#endif #endif
......
...@@ -437,6 +437,8 @@ uint opt_large_page_size= 0; ...@@ -437,6 +437,8 @@ uint opt_large_page_size= 0;
MYSQL_PLUGIN_IMPORT uint opt_debug_sync_timeout= 0; MYSQL_PLUGIN_IMPORT uint opt_debug_sync_timeout= 0;
#endif /* defined(ENABLED_DEBUG_SYNC) */ #endif /* defined(ENABLED_DEBUG_SYNC) */
my_bool opt_old_style_user_limits= 0, trust_function_creators= 0; my_bool opt_old_style_user_limits= 0, trust_function_creators= 0;
ulong opt_replicate_events_marked_for_skip;
/* /*
True if there is at least one per-hour limit for some user, so we should True if there is at least one per-hour limit for some user, so we should
check them before each query (and possibly reset counters when hour is check them before each query (and possibly reset counters when hour is
......
...@@ -109,6 +109,7 @@ extern my_bool opt_old_style_user_limits, trust_function_creators; ...@@ -109,6 +109,7 @@ extern my_bool opt_old_style_user_limits, trust_function_creators;
extern uint opt_crash_binlog_innodb; extern uint opt_crash_binlog_innodb;
extern char *shared_memory_base_name, *mysqld_unix_port; extern char *shared_memory_base_name, *mysqld_unix_port;
extern my_bool opt_enable_shared_memory; extern my_bool opt_enable_shared_memory;
extern ulong opt_replicate_events_marked_for_skip;
extern char *default_tz_name; extern char *default_tz_name;
extern Time_zone *default_tz; extern Time_zone *default_tz;
extern char *default_storage_engine; extern char *default_storage_engine;
......
...@@ -171,6 +171,7 @@ class sys_var ...@@ -171,6 +171,7 @@ class sys_var
#include "sql_plugin.h" /* SHOW_HA_ROWS, SHOW_MY_BOOL */ #include "sql_plugin.h" /* SHOW_HA_ROWS, SHOW_MY_BOOL */
/**************************************************************************** /****************************************************************************
Classes for parsing of the SET command Classes for parsing of the SET command
****************************************************************************/ ****************************************************************************/
......
...@@ -6557,4 +6557,7 @@ ER_CONNECTION_KILLED 70100 ...@@ -6557,4 +6557,7 @@ ER_CONNECTION_KILLED 70100
eng "Connection was killed" eng "Connection was killed"
ER_INTERNAL_ERROR ER_INTERNAL_ERROR
eng "Internal error: '%-.192s'" eng "Internal error: '%-.192s'"
ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_SKIP_REPLICATION
eng "Cannot modify @@session.skip_replication inside a transaction"
ER_STORED_FUNCTION_PREVENTS_SWITCH_SKIP_REPLICATION
eng "Cannot modify @@session.skip_replication inside a stored function or trigger"
...@@ -1710,6 +1710,48 @@ when it try to get the value of TIME_ZONE global variable from master."; ...@@ -1710,6 +1710,48 @@ when it try to get the value of TIME_ZONE global variable from master.";
past_checksum: past_checksum:
#endif #endif
/*
Request the master to filter away events with the @@skip_replication flag
set, if we are running with
--replicate-events-marked-for-skip=FILTER_ON_MASTER.
*/
if (opt_replicate_events_marked_for_skip == RPL_SKIP_FILTER_ON_MASTER)
{
if (mysql_real_query(mysql, STRING_WITH_LEN("SET skip_replication=1")))
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
mi->report(ERROR_LEVEL, err_code,
"Setting master-side filtering of @@skip_replication failed "
"with error: %s", mysql_error(mysql));
goto network_err;
}
else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE)
{
/*
The master is older than the slave and does not support the
@@skip_replication feature.
This is not a problem, as such master will not generate events with
the @@skip_replication flag set in the first place. We will still
do slave-side filtering of such events though, to handle the (rare)
case of downgrading a master and receiving old events generated from
before the downgrade with the @@skip_replication flag set.
*/
DBUG_PRINT("info", ("Old master does not support master-side filtering "
"of @@skip_replication events."));
}
else
{
/* Fatal error */
errmsg= "The slave I/O thread stops because a fatal error is "
"encountered when it tries to request filtering of events marked "
"with the @@skip_replication flag.";
sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
goto err;
}
}
}
err: err:
if (errmsg) if (errmsg)
{ {
...@@ -2498,6 +2540,9 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) ...@@ -2498,6 +2540,9 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
ev->when= hrtime_to_my_time(hrtime); ev->when= hrtime_to_my_time(hrtime);
ev->when_sec_part= hrtime_sec_part(hrtime); ev->when_sec_part= hrtime_sec_part(hrtime);
} }
thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
ev->thd = thd; // because up to this point, ev->thd == 0 ev->thd = thd; // because up to this point, ev->thd == 0
int reason= ev->shall_skip(rli); int reason= ev->shall_skip(rli);
...@@ -4062,6 +4107,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -4062,6 +4107,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
int error= 0; int error= 0;
String error_msg; String error_msg;
ulong inc_pos; ulong inc_pos;
ulong event_pos;
Relay_log_info *rli= &mi->rli; Relay_log_info *rli= &mi->rli;
mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
ulong s_id; ulong s_id;
...@@ -4134,7 +4180,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -4134,7 +4180,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
(uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len)); DBUG_RETURN(queue_old_event(mi,buf,event_len));
LINT_INIT(inc_pos);
mysql_mutex_lock(&mi->data_lock); mysql_mutex_lock(&mi->data_lock);
switch ((uchar)buf[EVENT_TYPE_OFFSET]) { switch ((uchar)buf[EVENT_TYPE_OFFSET]) {
...@@ -4326,6 +4371,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -4326,6 +4371,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
break; break;
} }
/*
If we filter events master-side (eg. @@skip_replication), we will see holes
in the event positions from the master. If we see such a hole, adjust
mi->master_log_pos accordingly so we maintain the correct position (for
reconnect, MASTER_POS_WAIT(), etc.)
*/
if (inc_pos > 0 &&
event_len >= LOG_POS_OFFSET+4 &&
(event_pos= uint4korr(buf+LOG_POS_OFFSET)) > mi->master_log_pos + inc_pos)
{
inc_pos= event_pos - mi->master_log_pos;
DBUG_PRINT("info", ("Adjust master_log_pos %lu->%lu to account for "
"master-side filtering",
(unsigned long)(mi->master_log_pos + inc_pos),
event_pos));
}
/* /*
If this event is originating from this server, don't queue it. If this event is originating from this server, don't queue it.
We don't check this for 3.23 events because it's simpler like this; 3.23 We don't check this for 3.23 events because it's simpler like this; 3.23
......
...@@ -152,6 +152,15 @@ extern ulonglong relay_log_space_limit; ...@@ -152,6 +152,15 @@ extern ulonglong relay_log_space_limit;
*/ */
#define SLAVE_FORCE_ALL 4 #define SLAVE_FORCE_ALL 4
/*
Values for the option --replicate-events-marked-for-skip.
Must match the names in replicate_events_marked_for_skip_names in sys_vars.cc
*/
#define RPL_SKIP_REPLICATE 0
#define RPL_SKIP_FILTER_ON_SLAVE 1
#define RPL_SKIP_FILTER_ON_MASTER 2
int init_slave(); int init_slave();
int init_recovery(Master_info* mi, const char** errmsg); int init_recovery(Master_info* mi, const char** errmsg);
void init_slave_skip_errors(const char* arg); void init_slave_skip_errors(const char* arg);
......
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
void mysql_client_binlog_statement(THD* thd) void mysql_client_binlog_statement(THD* thd)
{ {
ulonglong save_skip_replication;
DBUG_ENTER("mysql_client_binlog_statement"); DBUG_ENTER("mysql_client_binlog_statement");
DBUG_PRINT("info",("binlog base64: '%*s'", DBUG_PRINT("info",("binlog base64: '%*s'",
(int) (thd->lex->comment.length < 2048 ? (int) (thd->lex->comment.length < 2048 ?
...@@ -225,7 +226,17 @@ void mysql_client_binlog_statement(THD* thd) ...@@ -225,7 +226,17 @@ void mysql_client_binlog_statement(THD* thd)
reporting. reporting.
*/ */
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
save_skip_replication= thd->variables.option_bits&OPTION_SKIP_REPLICATION;
thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
OPTION_SKIP_REPLICATION : 0);
err= ev->apply_event(rli); err= ev->apply_event(rli);
thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
save_skip_replication;
#else #else
err= 0; err= 0;
#endif #endif
......
...@@ -151,6 +151,7 @@ ...@@ -151,6 +151,7 @@
Note! Reserved for use in MySQL Cluster Note! Reserved for use in MySQL Cluster
*/ */
#define OPTION_ALLOW_BATCH (ULL(1) << 36) // THD, intern (slave) #define OPTION_ALLOW_BATCH (ULL(1) << 36) // THD, intern (slave)
#define OPTION_SKIP_REPLICATION (ULL(1) << 37) // THD, user
/* The rest of the file is included in the server only */ /* The rest of the file is included in the server only */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
......
...@@ -556,8 +556,60 @@ static int send_heartbeat_event(NET* net, String* packet, ...@@ -556,8 +556,60 @@ static int send_heartbeat_event(NET* net, String* packet,
/* /*
TODO: Clean up loop to only have one call to send_file() Helper function for mysql_binlog_send() to write an event down the slave
connection.
Returns NULL on success, error message string on error.
*/ */
static const char *
send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Log_event_type event_type, char *log_file_name,
IO_CACHE *log)
{
my_off_t pos;
/* Do not send annotate_rows events unless slave requested it. */
if (event_type == ANNOTATE_ROWS_EVENT &&
!(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
return NULL;
/*
Skip events with the @@skip_replication flag set, if slave requested
skipping of such events.
*/
if (thd->variables.option_bits & OPTION_SKIP_REPLICATION)
{
/*
The first byte of the packet is a '\0' to distinguish it from an error
packet. So the actual event starts at offset +1.
*/
uint16 event_flags= uint2korr(&((*packet)[FLAGS_OFFSET+1]));
if (event_flags & LOG_EVENT_SKIP_REPLICATION_F)
return NULL;
}
thd_proc_info(thd, "Sending binlog event to slave");
pos= my_b_tell(log);
if (RUN_HOOK(binlog_transmit, before_send_event,
(thd, flags, packet, log_file_name, pos)))
return "run 'before_send_event' hook failed";
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
return "Failed on my_net_write()";
DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
if (event_type == LOAD_EVENT)
{
if (send_file(thd))
return "failed in send_file()";
}
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
return "Failed to run hook 'after_send_event'";
return NULL; /* Success */
}
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
ushort flags) ushort flags)
...@@ -570,9 +622,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ...@@ -570,9 +622,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
IO_CACHE log; IO_CACHE log;
File file = -1; File file = -1;
String* packet = &thd->packet; String* const packet = &thd->packet;
int error; int error;
const char *errmsg = "Unknown error"; const char *errmsg = "Unknown error", *tmp_msg;
const char *fmt= "%s; the last event was read from '%s' at %s, the last byte read was read from '%s' at %s."; const char *fmt= "%s; the last event was read from '%s' at %s, the last byte read was read from '%s' at %s.";
char llbuff1[22], llbuff2[22]; char llbuff1[22], llbuff2[22];
char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message() char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
...@@ -889,51 +941,21 @@ impossible position"; ...@@ -889,51 +941,21 @@ impossible position";
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
} }
if (event_type != ANNOTATE_ROWS_EVENT || if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) log_file_name, &log)))
{ {
pos = my_b_tell(&log); errmsg= tmp_msg;
if (RUN_HOOK(binlog_transmit, before_send_event, my_errno= ER_UNKNOWN_ERROR;
(thd, flags, packet, log_file_name, pos))) goto err;
{ }
my_errno= ER_UNKNOWN_ERROR;
errmsg= "run 'before_send_event' hook failed";
goto err;
}
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{
if (event_type == XID_EVENT)
{ {
if (event_type == XID_EVENT) net_flush(net);
{ }
net_flush(net); });
}
});
DBUG_PRINT("info", ("log event code %d", event_type));
if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
errmsg = "failed in send_file()";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
}
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
{
errmsg= "Failed to run hook 'after_send_event'";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
}
/* reset transmit packet for next loop */ /* reset transmit packet for next loop */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
...@@ -1078,43 +1100,13 @@ impossible position"; ...@@ -1078,43 +1100,13 @@ impossible position";
goto err; goto err;
} }
if (read_packet && if (read_packet &&
(event_type != ANNOTATE_ROWS_EVENT || (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))) log_file_name, &log)))
{ {
thd_proc_info(thd, "Sending binlog event to slave"); errmsg= tmp_msg;
pos = my_b_tell(&log); my_errno= ER_UNKNOWN_ERROR;
if (RUN_HOOK(binlog_transmit, before_send_event, goto err;
(thd, flags, packet, log_file_name, pos)))
{
my_errno= ER_UNKNOWN_ERROR;
errmsg= "run 'before_send_event' hook failed";
goto err;
}
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
errmsg = "failed in send_file()";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
}
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
{
my_errno= ER_UNKNOWN_ERROR;
errmsg= "Failed to run hook 'after_send_event'";
goto err;
}
} }
log.error=0; log.error=0;
......
...@@ -231,6 +231,35 @@ static Sys_var_ulonglong Sys_binlog_stmt_cache_size( ...@@ -231,6 +231,35 @@ static Sys_var_ulonglong Sys_binlog_stmt_cache_size(
CMD_LINE(REQUIRED_ARG), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(IO_SIZE, ULONGLONG_MAX), DEFAULT(32768), BLOCK_SIZE(IO_SIZE)); VALID_RANGE(IO_SIZE, ULONGLONG_MAX), DEFAULT(32768), BLOCK_SIZE(IO_SIZE));
/*
Some variables like @sql_log_bin and @binlog_format change how/if binlogging
is done. We must not change them inside a running transaction or statement,
otherwise the event group eventually written to the binlog may become
incomplete or otherwise garbled.
This function does the appropriate check.
It returns true if an error is caused by incorrect usage, false if ok.
*/
static bool
error_if_in_trans_or_substatement(THD *thd, int in_substatement_error,
int in_transaction_error)
{
if (thd->in_sub_stmt)
{
my_error(in_substatement_error, MYF(0));
return true;
}
if (thd->in_active_multi_stmt_transaction())
{
my_error(in_transaction_error, MYF(0));
return true;
}
return false;
}
static bool check_has_super(sys_var *self, THD *thd, set_var *var) static bool check_has_super(sys_var *self, THD *thd, set_var *var)
{ {
DBUG_ASSERT(self->scope() != sys_var::GLOBAL);// don't abuse check_has_super() DBUG_ASSERT(self->scope() != sys_var::GLOBAL);// don't abuse check_has_super()
...@@ -271,22 +300,10 @@ static bool binlog_format_check(sys_var *self, THD *thd, set_var *var) ...@@ -271,22 +300,10 @@ static bool binlog_format_check(sys_var *self, THD *thd, set_var *var)
return true; return true;
} }
/* if (error_if_in_trans_or_substatement(thd,
if in a stored function/trigger, it's too late to change mode ER_STORED_FUNCTION_PREVENTS_SWITCH_BINLOG_FORMAT,
*/ ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_BINLOG_FORMAT))
if (thd->in_sub_stmt)
{
my_error(ER_STORED_FUNCTION_PREVENTS_SWITCH_BINLOG_FORMAT, MYF(0));
return true; return true;
}
/*
Make the session variable 'binlog_format' read-only inside a transaction.
*/
if (thd->in_active_multi_stmt_transaction())
{
my_error(ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_BINLOG_FORMAT, MYF(0));
return true;
}
return false; return false;
} }
...@@ -322,24 +339,10 @@ static bool binlog_direct_check(sys_var *self, THD *thd, set_var *var) ...@@ -322,24 +339,10 @@ static bool binlog_direct_check(sys_var *self, THD *thd, set_var *var)
if (var->type == OPT_GLOBAL) if (var->type == OPT_GLOBAL)
return false; return false;
/* if (error_if_in_trans_or_substatement(thd,
Makes the session variable 'binlog_direct_non_transactional_updates' ER_STORED_FUNCTION_PREVENTS_SWITCH_BINLOG_DIRECT,
read-only if within a procedure, trigger or function. ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_BINLOG_DIRECT))
*/
if (thd->in_sub_stmt)
{
my_error(ER_STORED_FUNCTION_PREVENTS_SWITCH_BINLOG_DIRECT, MYF(0));
return true;
}
/*
Makes the session variable 'binlog_direct_non_transactional_updates'
read-only inside a transaction.
*/
if (thd->in_active_multi_stmt_transaction())
{
my_error(ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_BINLOG_DIRECT, MYF(0));
return true; return true;
}
return false; return false;
} }
...@@ -2017,6 +2020,67 @@ static Sys_var_mybool Sys_master_verify_checksum( ...@@ -2017,6 +2020,67 @@ static Sys_var_mybool Sys_master_verify_checksum(
"SHOW BINLOG EVENTS", "SHOW BINLOG EVENTS",
GLOBAL_VAR(opt_master_verify_checksum), CMD_LINE(OPT_ARG), GLOBAL_VAR(opt_master_verify_checksum), CMD_LINE(OPT_ARG),
DEFAULT(FALSE)); DEFAULT(FALSE));
/* These names must match RPL_SKIP_XXX #defines in slave.h. */
static const char *replicate_events_marked_for_skip_names[]= {
"replicate", "filter_on_slave", "filter_on_master", 0
};
static bool
replicate_events_marked_for_skip_check(sys_var *self, THD *thd,
set_var *var)
{
int thread_mask;
DBUG_ENTER("sys_var_replicate_events_marked_for_skip_check");
/* Slave threads must be stopped to change the variable. */
mysql_mutex_lock(&LOCK_active_mi);
lock_slave_threads(active_mi);
init_thread_mask(&thread_mask, active_mi, 0 /*not inverse*/);
unlock_slave_threads(active_mi);
mysql_mutex_unlock(&LOCK_active_mi);
if (thread_mask) // We refuse if any slave thread is running
{
my_error(ER_SLAVE_MUST_STOP, MYF(0));
DBUG_RETURN(true);
}
DBUG_RETURN(false);
}
bool
Sys_var_replicate_events_marked_for_skip::global_update(THD *thd, set_var *var)
{
bool result;
int thread_mask;
DBUG_ENTER("Sys_var_replicate_events_marked_for_skip::global_update");
/* Slave threads must be stopped to change the variable. */
mysql_mutex_lock(&LOCK_active_mi);
lock_slave_threads(active_mi);
init_thread_mask(&thread_mask, active_mi, 0 /*not inverse*/);
if (thread_mask) // We refuse if any slave thread is running
{
my_error(ER_SLAVE_MUST_STOP, MYF(0));
result= true;
}
else
result= Sys_var_enum::global_update(thd, var);
unlock_slave_threads(active_mi);
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(result);
}
static Sys_var_replicate_events_marked_for_skip Replicate_events_marked_for_skip
("replicate_events_marked_for_skip",
"Whether the slave should replicate events that were created with "
"@@skip_replication=1 on the master. Default REPLICATE (no events are "
"skipped). Other values are FILTER_ON_SLAVE (events will be sent by the "
"master but ignored by the slave) and FILTER_ON_MASTER (events marked with "
"@@skip_replication=1 will be filtered on the master and never be sent to "
"the slave).",
GLOBAL_VAR(opt_replicate_events_marked_for_skip), CMD_LINE(REQUIRED_ARG),
replicate_events_marked_for_skip_names, DEFAULT(RPL_SKIP_REPLICATE),
NO_MUTEX_GUARD, NOT_IN_BINLOG,
ON_CHECK(replicate_events_marked_for_skip_check));
#endif #endif
...@@ -2571,18 +2635,10 @@ static bool check_sql_log_bin(sys_var *self, THD *thd, set_var *var) ...@@ -2571,18 +2635,10 @@ static bool check_sql_log_bin(sys_var *self, THD *thd, set_var *var)
if (var->type == OPT_GLOBAL) if (var->type == OPT_GLOBAL)
return FALSE; return FALSE;
/* If in a stored function/trigger, it's too late to change sql_log_bin. */ if (error_if_in_trans_or_substatement(thd,
if (thd->in_sub_stmt) ER_STORED_FUNCTION_PREVENTS_SWITCH_SQL_LOG_BIN,
{ ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_SQL_LOG_BIN))
my_error(ER_STORED_FUNCTION_PREVENTS_SWITCH_SQL_LOG_BIN, MYF(0));
return TRUE; return TRUE;
}
/* Make the session variable 'sql_log_bin' read-only inside a transaction. */
if (thd->in_active_multi_stmt_transaction())
{
my_error(ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_SQL_LOG_BIN, MYF(0));
return TRUE;
}
return FALSE; return FALSE;
} }
...@@ -2647,6 +2703,40 @@ static Sys_var_ulong Sys_profiling_history_size( ...@@ -2647,6 +2703,40 @@ static Sys_var_ulong Sys_profiling_history_size(
VALID_RANGE(0, 100), DEFAULT(15), BLOCK_SIZE(1)); VALID_RANGE(0, 100), DEFAULT(15), BLOCK_SIZE(1));
#endif #endif
/*
When this is set by a connection, binlogged events will be marked with a
corresponding flag. The slave can be configured to not replicate events
so marked.
In the binlog dump thread on the master, this variable is re-used for a
related purpose: The slave sets this flag when connecting to the master to
request that the master filter out (ie. not send) any events with the flag
set, thus saving network traffic on events that would be ignored by the
slave anyway.
*/
static bool check_skip_replication(sys_var *self, THD *thd, set_var *var)
{
/*
We must not change @@skip_replication in the middle of a transaction or
statement, as that could result in only part of the transaction / statement
being replicated.
(This would be particularly serious if we were to replicate eg.
Rows_log_event without Table_map_log_event or transactional updates without
the COMMIT).
*/
if (error_if_in_trans_or_substatement(thd,
ER_STORED_FUNCTION_PREVENTS_SWITCH_SKIP_REPLICATION,
ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_SKIP_REPLICATION))
return 1;
return 0;
}
static Sys_var_bit Sys_skip_replication(
"skip_replication", "skip_replication",
SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_SKIP_REPLICATION,
DEFAULT(FALSE), NO_MUTEX_GUARD, NOT_IN_BINLOG,
ON_CHECK(check_skip_replication));
static Sys_var_harows Sys_select_limit( static Sys_var_harows Sys_select_limit(
"sql_select_limit", "sql_select_limit",
"The maximum number of rows to return from SELECT statements", "The maximum number of rows to return from SELECT statements",
...@@ -3523,7 +3613,7 @@ static Sys_var_mybool Sys_query_cache_strip_comments( ...@@ -3523,7 +3613,7 @@ static Sys_var_mybool Sys_query_cache_strip_comments(
static ulonglong in_transaction(THD *thd) static ulonglong in_transaction(THD *thd)
{ {
return test(thd->server_status & SERVER_STATUS_IN_TRANS); return test(thd->in_active_multi_stmt_transaction());
} }
static Sys_var_session_special Sys_in_transaction( static Sys_var_session_special Sys_in_transaction(
"in_transaction", "Whether there is an active transaction", "in_transaction", "Whether there is an active transaction",
......
...@@ -1800,6 +1800,26 @@ class Sys_var_tx_isolation: public Sys_var_enum ...@@ -1800,6 +1800,26 @@ class Sys_var_tx_isolation: public Sys_var_enum
} }
}; };
/*
Class for replicate_events_marked_for_skip.
We need a custom update function that ensures the slave is stopped when
the update is happening.
*/
class Sys_var_replicate_events_marked_for_skip: public Sys_var_enum
{
public:
Sys_var_replicate_events_marked_for_skip(const char *name_arg,
const char *comment, int flag_args, ptrdiff_t off, size_t size,
CMD_LINE getopt,
const char *values[], uint def_val, PolyLock *lock,
enum binlog_status_enum binlog_status_arg,
on_check_function on_check_func)
:Sys_var_enum(name_arg, comment, flag_args, off, size, getopt,
values, def_val, lock, binlog_status_arg, on_check_func)
{}
bool global_update(THD *thd, set_var *var);
};
/**************************************************************************** /****************************************************************************
Used templates Used templates
****************************************************************************/ ****************************************************************************/
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment