Commit c4d69f17 authored by unknown's avatar unknown

MWL#234: Support for marking binlog events to not be replicated, and for...

MWL#234: Support for marking binlog events to not be replicated, and for telling slaves not to replicate events with such mark
parent 51c7723e
...@@ -670,6 +670,31 @@ print_use_stmt(PRINT_EVENT_INFO* pinfo, const Query_log_event *ev) ...@@ -670,6 +670,31 @@ print_use_stmt(PRINT_EVENT_INFO* pinfo, const Query_log_event *ev)
} }
/**
Print "SET do_not_replicate=..." statement when needed.
Not all servers support this (only MariaDB from some version on). So we
mark the SET to only execute from the version of MariaDB that supports it,
and also only output it if we actually see events with the flag set, to not
get spurious errors on MySQL@Oracle servers of higher version that do not
support the flag.
So we start out assuming @@do_not_replicate is 0, and only output a SET
statement when it changes.
*/
static void
print_do_not_replicate_statement(PRINT_EVENT_INFO *pinfo, const Log_event *ev)
{
int cur_val;
cur_val= (ev->flags & LOG_EVENT_DO_NOT_REPLICATE_F) != 0;
if (cur_val == pinfo->do_not_replicate)
return; /* Not changed. */
fprintf(result_file, "/*!50400 SET do_not_replicate=%d*/%s\n",
cur_val, pinfo->delimiter);
pinfo->do_not_replicate= cur_val;
}
/** /**
Prints the given event in base64 format. Prints the given event in base64 format.
...@@ -802,7 +827,10 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -802,7 +827,10 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
goto end; goto end;
} }
else else
{
print_do_not_replicate_statement(print_event_info, ev);
ev->print(result_file, print_event_info); ev->print(result_file, print_event_info);
}
break; break;
} }
...@@ -832,7 +860,10 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -832,7 +860,10 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
goto end; goto end;
} }
else else
{
print_do_not_replicate_statement(print_event_info, ev);
ce->print(result_file, print_event_info, TRUE); ce->print(result_file, print_event_info, TRUE);
}
// If this binlog is not 3.23 ; why this test?? // If this binlog is not 3.23 ; why this test??
if (glob_description_event->binlog_version >= 3) if (glob_description_event->binlog_version >= 3)
...@@ -927,6 +958,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -927,6 +958,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
if (!shall_skip_database(exlq->db)) if (!shall_skip_database(exlq->db))
{ {
print_use_stmt(print_event_info, exlq); print_use_stmt(print_event_info, exlq);
print_do_not_replicate_statement(print_event_info, ev);
if (fname) if (fname)
{ {
convert_path_to_forward_slashes(fname); convert_path_to_forward_slashes(fname);
...@@ -1030,6 +1062,12 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -1030,6 +1062,12 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
} }
/* FALL THROUGH */ /* FALL THROUGH */
} }
case INTVAR_EVENT:
case RAND_EVENT:
case USER_VAR_EVENT:
case XID_EVENT:
print_do_not_replicate_statement(print_event_info, ev);
/* Fall through ... */
default: default:
ev->print(result_file, print_event_info); ev->print(result_file, print_event_info);
} }
......
...@@ -13,7 +13,7 @@ dnl When changing the major version number please also check the switch ...@@ -13,7 +13,7 @@ dnl When changing the major version number please also check the switch
dnl statement in mysqlbinlog::check_master_version(). You may also need dnl statement in mysqlbinlog::check_master_version(). You may also need
dnl to update version.c in ndb. dnl to update version.c in ndb.
AC_INIT([MariaDB Server], [5.2.7-MariaDB], [], [mysql]) AC_INIT([MariaDB Server], [5.4.0-MariaDB], [], [mysql])
AC_CONFIG_SRCDIR([sql/mysqld.cc]) AC_CONFIG_SRCDIR([sql/mysqld.cc])
AC_CANONICAL_SYSTEM AC_CANONICAL_SYSTEM
......
include/master-slave.inc
[connection master]
CREATE USER 'nonsuperuser'@'127.0.0.1';
GRANT ALTER,CREATE,DELETE,DROP,EVENT,INSERT,PROCESS,REPLICATION SLAVE,
SELECT,UPDATE ON *.* TO 'nonsuperuser'@'127.0.0.1';
SET GLOBAL replicate_ignore_do_not_replicate=1;
ERROR 42000: Access denied; you need the SUPER privilege for this operation
DROP USER'nonsuperuser'@'127.0.0.1';
SET GLOBAL replicate_ignore_do_not_replicate=1;
ERROR HY000: This operation cannot be performed with a running slave; run STOP SLAVE first
STOP SLAVE;
SET GLOBAL replicate_ignore_do_not_replicate=1;
START SLAVE;
SET do_not_replicate=0;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=innodb;
INSERT INTO t1(a) VALUES (1);
INSERT INTO t2(a) VALUES (1);
SET do_not_replicate=1;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
INSERT INTO t1(a) VALUES (2);
INSERT INTO t2(a) VALUES (2);
FLUSH NO_WRITE_TO_BINLOG LOGS;
SHOW TABLES;
Tables_in_test
t1
t2
SELECT * FROM t1;
a b
1 NULL
SELECT * FROM t2;
a b
1 NULL
DROP TABLE t3;
FLUSH NO_WRITE_TO_BINLOG LOGS;
STOP SLAVE;
SET GLOBAL replicate_ignore_do_not_replicate=0;
START SLAVE;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
INSERT INTO t3(a) VALUES(2);
SELECT * FROM t3;
a b
2 NULL
DROP TABLE t3;
TRUNCATE t1;
RESET MASTER;
SET do_not_replicate=0;
INSERT INTO t1 VALUES (1,0);
SET do_not_replicate=1;
INSERT INTO t1 VALUES (2,0);
SET do_not_replicate=0;
INSERT INTO t1 VALUES (3,0);
SELECT * FROM t1 ORDER by a;
a b
1 0
2 0
3 0
STOP SLAVE;
SET GLOBAL replicate_ignore_do_not_replicate=1;
TRUNCATE t1;
SELECT * FROM t1 ORDER by a;
a b
1 0
2 0
3 0
START SLAVE;
SELECT * FROM t1 ORDER by a;
a b
1 0
3 0
TRUNCATE t1;
STOP SLAVE;
SET GLOBAL sql_slave_skip_counter=2;
SET GLOBAL replicate_ignore_do_not_replicate=1;
START SLAVE;
SET @old_binlog_format= @@binlog_format;
SET binlog_format= statement;
SET do_not_replicate=0;
INSERT INTO t1 VALUES (1,5);
SET do_not_replicate=1;
INSERT INTO t1 VALUES (2,5);
SET do_not_replicate=0;
INSERT INTO t1 VALUES (3,5);
INSERT INTO t1 VALUES (4,5);
SET binlog_format= @old_binlog_format;
SELECT * FROM t1;
a b
4 5
TRUNCATE t1;
BINLOG '66I6Tg8BAAAAZgAAAGoAAAABAAQANS40LjAtTWFyaWFEQi12YWxncmluZC1tYXgtZGVidWctbG9n
AAAAAAAAAAAAAAAAAADrojpOEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC';
BINLOG 'HaM6ThMBAAAAKgAAANgAAAAAgA8AAAAAAAEABHRlc3QAAnQxAAIDAwAC
HaM6ThcBAAAAJgAAAP4AAAAAgA8AAAAAAAEAAv/8AQAAAAgAAAA=';
BINLOG 'JqM6ThMBAAAAKgAAALEBAAAAAA8AAAAAAAEABHRlc3QAAnQxAAIDAwAC
JqM6ThcBAAAAJgAAANcBAAAAAA8AAAAAAAEAAv/8AgAAAAgAAAA=';
SELECT * FROM t1 ORDER BY a;
a b
1 8
2 8
SELECT * FROM t1 ORDER by a;
a b
2 8
SET do_not_replicate=0;
BEGIN;
SET do_not_replicate=0;
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
SET do_not_replicate=1;
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
ROLLBACK;
SET do_not_replicate=1;
BEGIN;
SET do_not_replicate=0;
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
SET do_not_replicate=1;
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
COMMIT;
SET autocommit=0;
INSERT INTO t2(a) VALUES(100);
SET do_not_replicate=1;
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
ROLLBACK;
SET autocommit=1;
SET do_not_replicate=1;
CREATE FUNCTION foo (x INT) RETURNS INT BEGIN SET SESSION do_not_replicate=x; RETURN x; END|
CREATE PROCEDURE bar(x INT) BEGIN SET SESSION do_not_replicate=x; END|
CREATE FUNCTION baz (x INT) RETURNS INT BEGIN CALL bar(x); RETURN x; END|
SELECT foo(0);
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
SELECT baz(0);
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
SET @a= foo(1);
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
SET @a= baz(1);
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
UPDATE t2 SET b=foo(0);
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
UPDATE t2 SET b=baz(0);
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
INSERT INTO t1 VALUES (101, foo(1));
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
INSERT INTO t1 VALUES (101, baz(0));
ERROR HY000: Can't execute the given command because you have active locked tables or an active transaction
SELECT @@do_not_replicate;
@@do_not_replicate
1
CALL bar(0);
SELECT @@do_not_replicate;
@@do_not_replicate
0
CALL bar(1);
SELECT @@do_not_replicate;
@@do_not_replicate
1
DROP FUNCTION foo;
DROP PROCEDURE bar;
DROP FUNCTION baz;
SET do_not_replicate=0;
DROP TABLE t1,t2;
STOP SLAVE;
SET GLOBAL replicate_ignore_do_not_replicate=0;
START SLAVE;
include/rpl_end.inc
--source include/master-slave.inc
--source include/have_innodb.inc
connection slave;
# Test that SUPER is required to change @@replicate_ignore_do_not_replicate.
CREATE USER 'nonsuperuser'@'127.0.0.1';
GRANT ALTER,CREATE,DELETE,DROP,EVENT,INSERT,PROCESS,REPLICATION SLAVE,
SELECT,UPDATE ON *.* TO 'nonsuperuser'@'127.0.0.1';
connect(nonpriv, 127.0.0.1, nonsuperuser,, test, $SLAVE_MYPORT,);
connection nonpriv;
--error ER_SPECIFIC_ACCESS_DENIED_ERROR
SET GLOBAL replicate_ignore_do_not_replicate=1;
disconnect nonpriv;
connection slave;
DROP USER'nonsuperuser'@'127.0.0.1';
--error ER_SLAVE_MUST_STOP
SET GLOBAL replicate_ignore_do_not_replicate=1;
STOP SLAVE;
SET GLOBAL replicate_ignore_do_not_replicate=1;
START SLAVE;
connection master;
SET do_not_replicate=0;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=innodb;
INSERT INTO t1(a) VALUES (1);
INSERT INTO t2(a) VALUES (1);
SET do_not_replicate=1;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
INSERT INTO t1(a) VALUES (2);
INSERT INTO t2(a) VALUES (2);
# Inject a rotate event in the binlog stream sent to slave (otherwise we will
# fail sync_slave_with_master as the last event on the master is not present
# on the slave).
FLUSH NO_WRITE_TO_BINLOG LOGS;
sync_slave_with_master;
connection slave;
SHOW TABLES;
SELECT * FROM t1;
SELECT * FROM t2;
connection master;
DROP TABLE t3;
FLUSH NO_WRITE_TO_BINLOG LOGS;
sync_slave_with_master;
connection slave;
STOP SLAVE;
SET GLOBAL replicate_ignore_do_not_replicate=0;
START SLAVE;
connection master;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
INSERT INTO t3(a) VALUES(2);
sync_slave_with_master;
connection slave;
SELECT * FROM t3;
connection master;
DROP TABLE t3;
#
# Test that the slave will preserve the @@do_not_replicate flag in its
# own binlog.
#
TRUNCATE t1;
sync_slave_with_master;
connection slave;
RESET MASTER;
connection master;
SET do_not_replicate=0;
INSERT INTO t1 VALUES (1,0);
SET do_not_replicate=1;
INSERT INTO t1 VALUES (2,0);
SET do_not_replicate=0;
INSERT INTO t1 VALUES (3,0);
sync_slave_with_master;
connection slave;
# Since slave has @@replicate_ignore_do_not_replicate=0, it should have
# applied all events.
SELECT * FROM t1 ORDER by a;
STOP SLAVE;
SET GLOBAL replicate_ignore_do_not_replicate=1;
let $SLAVE_DATADIR= `select @@datadir`;
connection master;
TRUNCATE t1;
# Now apply the slave binlog to the master, to check that both the slave
# and mysqlbinlog will preserve the @@do_not_replicate flag.
--exec $MYSQL_BINLOG $SLAVE_DATADIR/slave-bin.000001 > $MYSQLTEST_VARDIR/tmp/rpl_do_not_replicate.binlog
--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/rpl_do_not_replicate.binlog
# The master should have all three events.
SELECT * FROM t1 ORDER by a;
# The slave should be missing event 2, which is marked with the
# @@do_not_replicate flag.
connection slave;
START SLAVE;
connection master;
sync_slave_with_master;
connection slave;
SELECT * FROM t1 ORDER by a;
#
# Test that @@sql_slave_skip_counter does not count skipped @@do_not_replicate
# events.
#
connection master;
TRUNCATE t1;
sync_slave_with_master;
connection slave;
STOP SLAVE;
SET GLOBAL sql_slave_skip_counter=2;
SET GLOBAL replicate_ignore_do_not_replicate=1;
START SLAVE;
connection master;
# Need to fix @@binlog_format to get consistent event count.
SET @old_binlog_format= @@binlog_format;
SET binlog_format= statement;
SET do_not_replicate=0;
INSERT INTO t1 VALUES (1,5);
SET do_not_replicate=1;
INSERT INTO t1 VALUES (2,5);
SET do_not_replicate=0;
INSERT INTO t1 VALUES (3,5);
INSERT INTO t1 VALUES (4,5);
SET binlog_format= @old_binlog_format;
sync_slave_with_master;
connection slave;
# The slave should have skipped the first three inserts (number 1 and 3 due
# to @@sql_slave_skip_counter=2, number 2 due to
# @@replicate_ignore_do_not_replicate=1). So only number 4 should be left.
SELECT * FROM t1;
#
# Check that BINLOG statement preserves the @@do_not_replicate flag.
#
connection master;
TRUNCATE t1;
# Format description log event.
BINLOG '66I6Tg8BAAAAZgAAAGoAAAABAAQANS40LjAtTWFyaWFEQi12YWxncmluZC1tYXgtZGVidWctbG9n
AAAAAAAAAAAAAAAAAADrojpOEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC';
# INSERT INTO t1 VALUES (1,8) # with @@do_not_replicate=1
BINLOG 'HaM6ThMBAAAAKgAAANgAAAAAgA8AAAAAAAEABHRlc3QAAnQxAAIDAwAC
HaM6ThcBAAAAJgAAAP4AAAAAgA8AAAAAAAEAAv/8AQAAAAgAAAA=';
# INSERT INTO t1 VALUES (2,8) # with @@do_not_replicate=0
BINLOG 'JqM6ThMBAAAAKgAAALEBAAAAAA8AAAAAAAEABHRlc3QAAnQxAAIDAwAC
JqM6ThcBAAAAJgAAANcBAAAAAA8AAAAAAAEAAv/8AgAAAAgAAAA=';
SELECT * FROM t1 ORDER BY a;
sync_slave_with_master;
connection slave;
# Slave should have only the second insert, the first should be ignored due to
# the @@do_not_replicate flag.
SELECT * FROM t1 ORDER by a;
# Test that it is not possible to d change @@do_not_replicate inside a
# transaction or statement, thereby replicating only parts of statements
# or transactions.
connection master;
SET do_not_replicate=0;
BEGIN;
--error ER_LOCK_OR_ACTIVE_TRANSACTION
SET do_not_replicate=0;
--error ER_LOCK_OR_ACTIVE_TRANSACTION
SET do_not_replicate=1;
ROLLBACK;
SET do_not_replicate=1;
BEGIN;
--error ER_LOCK_OR_ACTIVE_TRANSACTION
SET do_not_replicate=0;
--error ER_LOCK_OR_ACTIVE_TRANSACTION
SET do_not_replicate=1;
COMMIT;
SET autocommit=0;
INSERT INTO t2(a) VALUES(100);
--error ER_LOCK_OR_ACTIVE_TRANSACTION
SET do_not_replicate=1;
ROLLBACK;
SET autocommit=1;
SET do_not_replicate=1;
--delimiter |
CREATE FUNCTION foo (x INT) RETURNS INT BEGIN SET SESSION do_not_replicate=x; RETURN x; END|
CREATE PROCEDURE bar(x INT) BEGIN SET SESSION do_not_replicate=x; END|
CREATE FUNCTION baz (x INT) RETURNS INT BEGIN CALL bar(x); RETURN x; END|
--delimiter ;
--error ER_LOCK_OR_ACTIVE_TRANSACTION
SELECT foo(0);
--error ER_LOCK_OR_ACTIVE_TRANSACTION
SELECT baz(0);
--error ER_LOCK_OR_ACTIVE_TRANSACTION
SET @a= foo(1);
--error ER_LOCK_OR_ACTIVE_TRANSACTION
SET @a= baz(1);
--error ER_LOCK_OR_ACTIVE_TRANSACTION
UPDATE t2 SET b=foo(0);
--error ER_LOCK_OR_ACTIVE_TRANSACTION
UPDATE t2 SET b=baz(0);
--error ER_LOCK_OR_ACTIVE_TRANSACTION
INSERT INTO t1 VALUES (101, foo(1));
--error ER_LOCK_OR_ACTIVE_TRANSACTION
INSERT INTO t1 VALUES (101, baz(0));
SELECT @@do_not_replicate;
CALL bar(0);
SELECT @@do_not_replicate;
CALL bar(1);
SELECT @@do_not_replicate;
DROP FUNCTION foo;
DROP PROCEDURE bar;
DROP FUNCTION baz;
# Clean up.
connection master;
SET do_not_replicate=0;
DROP TABLE t1,t2;
connection slave;
STOP SLAVE;
SET GLOBAL replicate_ignore_do_not_replicate=0;
START SLAVE;
--source include/rpl_end.inc
...@@ -665,11 +665,13 @@ const char* Log_event::get_type_str() ...@@ -665,11 +665,13 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg) :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg)
{ {
server_id= thd->server_id; server_id= thd->server_id;
when= thd->start_time; when= thd->start_time;
cache_stmt= using_trans; cache_stmt= using_trans;
flags= flags_arg |
(thd->options & OPTION_DO_NOT_REPLICATE ? LOG_EVENT_DO_NOT_REPLICATE_F : 0);
} }
...@@ -825,7 +827,9 @@ Log_event::do_shall_skip(Relay_log_info *rli) ...@@ -825,7 +827,9 @@ Log_event::do_shall_skip(Relay_log_info *rli)
rli->replicate_same_server_id, rli->replicate_same_server_id,
rli->slave_skip_counter)); rli->slave_skip_counter));
if ((server_id == ::server_id && !rli->replicate_same_server_id) || if ((server_id == ::server_id && !rli->replicate_same_server_id) ||
(rli->slave_skip_counter == 1 && rli->is_in_group())) (rli->slave_skip_counter == 1 && rli->is_in_group()) ||
(flags & LOG_EVENT_DO_NOT_REPLICATE_F
&& opt_replicate_ignore_do_not_replicate))
return EVENT_SKIP_IGNORE; return EVENT_SKIP_IGNORE;
if (rli->slave_skip_counter > 0) if (rli->slave_skip_counter > 0)
return EVENT_SKIP_COUNT; return EVENT_SKIP_COUNT;
...@@ -3483,6 +3487,14 @@ Query_log_event::do_shall_skip(Relay_log_info *rli) ...@@ -3483,6 +3487,14 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len)); DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len));
DBUG_ASSERT(query && q_len > 0); DBUG_ASSERT(query && q_len > 0);
/*
An event skipped due to @@do_not_replicate must not be counted towards the
number of events to be skipped due to @@sql_slave_skip_counter.
*/
if (flags & LOG_EVENT_DO_NOT_REPLICATE_F &&
opt_replicate_ignore_do_not_replicate)
DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE);
if (rli->slave_skip_counter > 0) if (rli->slave_skip_counter > 0)
{ {
if (strcmp("BEGIN", query) == 0) if (strcmp("BEGIN", query) == 0)
...@@ -9780,7 +9792,7 @@ st_print_event_info::st_print_event_info() ...@@ -9780,7 +9792,7 @@ st_print_event_info::st_print_event_info()
auto_increment_increment(0),auto_increment_offset(0), charset_inited(0), auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
lc_time_names_number(~0), lc_time_names_number(~0),
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER), charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
thread_id(0), thread_id_printed(false), thread_id(0), thread_id_printed(false), do_not_replicate(0),
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE) base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
{ {
/* /*
......
...@@ -490,6 +490,19 @@ struct sql_ex_info ...@@ -490,6 +490,19 @@ struct sql_ex_info
*/ */
#define LOG_EVENT_RELAY_LOG_F 0x40 #define LOG_EVENT_RELAY_LOG_F 0x40
/**
@def LOG_EVENT_DO_NOT_REPLICATE_F
Flag set by application creating the event (with @@do_not_replicate); the
slave will skip replication of such events if
--replicate-ignore-do-not-replicate is set.
This is a MariaDB flag; we allocate it from the end of the available
values to reduce risk of conflict with new MySQL flags.
*/
#define LOG_EVENT_DO_NOT_REPLICATE_F 0x8000
/** /**
@def OPTIONS_WRITTEN_TO_BIN_LOG @def OPTIONS_WRITTEN_TO_BIN_LOG
...@@ -656,6 +669,11 @@ typedef struct st_print_event_info ...@@ -656,6 +669,11 @@ typedef struct st_print_event_info
uint charset_database_number; uint charset_database_number;
uint thread_id; uint thread_id;
bool thread_id_printed; bool thread_id_printed;
/*
Track when @@do_not_replicate changes so we need to output a SET
statement for it.
*/
int do_not_replicate;
st_print_event_info(); st_print_event_info();
...@@ -910,8 +928,8 @@ class Log_event ...@@ -910,8 +928,8 @@ class Log_event
/** /**
Some 16 flags. See the definitions above for LOG_EVENT_TIME_F, Some 16 flags. See the definitions above for LOG_EVENT_TIME_F,
LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F, and LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F,
LOG_EVENT_SUPPRESS_USE_F for notes. LOG_EVENT_SUPPRESS_USE_F, and LOG_EVENT_DO_NOT_REPLICATE_F for notes.
*/ */
uint16 flags; uint16 flags;
...@@ -3915,6 +3933,8 @@ class Incident_log_event : public Log_event { ...@@ -3915,6 +3933,8 @@ class Incident_log_event : public Log_event {
DBUG_PRINT("enter", ("m_incident: %d", m_incident)); DBUG_PRINT("enter", ("m_incident: %d", m_incident));
m_message.str= NULL; /* Just as a precaution */ m_message.str= NULL; /* Just as a precaution */
m_message.length= 0; m_message.length= 0;
/* Replicate the incident irregardless of @@do_not_replicate. */
flags&= ~LOG_EVENT_DO_NOT_REPLICATE_F;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -3924,6 +3944,8 @@ class Incident_log_event : public Log_event { ...@@ -3924,6 +3944,8 @@ class Incident_log_event : public Log_event {
DBUG_ENTER("Incident_log_event::Incident_log_event"); DBUG_ENTER("Incident_log_event::Incident_log_event");
DBUG_PRINT("enter", ("m_incident: %d", m_incident)); DBUG_PRINT("enter", ("m_incident: %d", m_incident));
m_message= msg; m_message= msg;
/* Replicate the incident irregardless of @@do_not_replicate. */
flags&= ~LOG_EVENT_DO_NOT_REPLICATE_F;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#endif #endif
......
...@@ -504,6 +504,7 @@ class Default_object_creation_ctx : public Object_creation_ctx ...@@ -504,6 +504,7 @@ class Default_object_creation_ctx : public Object_creation_ctx
*/ */
#define TMP_TABLE_FORCE_MYISAM (ULL(1) << 32) #define TMP_TABLE_FORCE_MYISAM (ULL(1) << 32)
#define OPTION_PROFILING (ULL(1) << 33) #define OPTION_PROFILING (ULL(1) << 33)
#define OPTION_DO_NOT_REPLICATE (ULL(1) << 34) // THD, user
...@@ -2064,6 +2065,7 @@ extern my_bool opt_old_style_user_limits, trust_function_creators; ...@@ -2064,6 +2065,7 @@ extern my_bool opt_old_style_user_limits, trust_function_creators;
extern uint opt_crash_binlog_innodb; extern uint opt_crash_binlog_innodb;
extern char *shared_memory_base_name, *mysqld_unix_port; extern char *shared_memory_base_name, *mysqld_unix_port;
extern my_bool opt_enable_shared_memory; extern my_bool opt_enable_shared_memory;
extern my_bool opt_replicate_ignore_do_not_replicate;
extern char *default_tz_name; extern char *default_tz_name;
#endif /* MYSQL_SERVER */ #endif /* MYSQL_SERVER */
#if defined MYSQL_SERVER || defined INNODB_COMPATIBILITY_HOOKS #if defined MYSQL_SERVER || defined INNODB_COMPATIBILITY_HOOKS
......
...@@ -553,6 +553,8 @@ uint opt_large_page_size= 0; ...@@ -553,6 +553,8 @@ uint opt_large_page_size= 0;
uint opt_debug_sync_timeout= 0; uint opt_debug_sync_timeout= 0;
#endif /* defined(ENABLED_DEBUG_SYNC) */ #endif /* defined(ENABLED_DEBUG_SYNC) */
my_bool opt_old_style_user_limits= 0, trust_function_creators= 0; my_bool opt_old_style_user_limits= 0, trust_function_creators= 0;
my_bool opt_replicate_ignore_do_not_replicate;
/* /*
True if there is at least one per-hour limit for some user, so we should True if there is at least one per-hour limit for some user, so we should
check them before each query (and possibly reset counters when hour is check them before each query (and possibly reset counters when hour is
...@@ -6085,7 +6087,8 @@ enum options_mysqld ...@@ -6085,7 +6087,8 @@ enum options_mysqld
OPT_IGNORE_BUILTIN_INNODB, OPT_IGNORE_BUILTIN_INNODB,
OPT_BINLOG_DIRECT_NON_TRANS_UPDATE, OPT_BINLOG_DIRECT_NON_TRANS_UPDATE,
OPT_DEFAULT_CHARACTER_SET_OLD, OPT_DEFAULT_CHARACTER_SET_OLD,
OPT_MAX_LONG_DATA_SIZE OPT_MAX_LONG_DATA_SIZE,
OPT_REPLICATE_IGNORE_DO_NOT_REPLICATE
}; };
...@@ -6782,6 +6785,11 @@ each time the SQL thread starts.", ...@@ -6782,6 +6785,11 @@ each time the SQL thread starts.",
"cross database updates. If you need cross database updates to work, " "cross database updates. If you need cross database updates to work, "
"make sure you have 3.23.28 or later, and use replicate-wild-ignore-" "make sure you have 3.23.28 or later, and use replicate-wild-ignore-"
"table=db_name.%. ", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, "table=db_name.%. ", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-ignore-do-not-replicate", OPT_REPLICATE_IGNORE_DO_NOT_REPLICATE,
"Tells the slave thread not to replicate events that were created with"
"@@do_not_replicat=1.", &opt_replicate_ignore_do_not_replicate,
&opt_replicate_ignore_do_not_replicate, 0, GET_BOOL, NO_ARG,
0, 0, 0 ,0, 0, 0},
{"replicate-ignore-table", OPT_REPLICATE_IGNORE_TABLE, {"replicate-ignore-table", OPT_REPLICATE_IGNORE_TABLE,
"Tells the slave thread to not replicate to the specified table. To specify " "Tells the slave thread to not replicate to the specified table. To specify "
"more than one table to ignore, use the directive multiple times, once for " "more than one table to ignore, use the directive multiple times, once for "
......
...@@ -117,6 +117,7 @@ static bool set_option_log_bin_bit(THD *thd, set_var *var); ...@@ -117,6 +117,7 @@ static bool set_option_log_bin_bit(THD *thd, set_var *var);
static bool set_option_autocommit(THD *thd, set_var *var); static bool set_option_autocommit(THD *thd, set_var *var);
static int check_log_update(THD *thd, set_var *var); static int check_log_update(THD *thd, set_var *var);
static bool set_log_update(THD *thd, set_var *var); static bool set_log_update(THD *thd, set_var *var);
static int check_do_not_replicate(THD *thd, set_var *var);
static int check_pseudo_thread_id(THD *thd, set_var *var); static int check_pseudo_thread_id(THD *thd, set_var *var);
void fix_binlog_format_after_update(THD *thd, enum_var_type type); void fix_binlog_format_after_update(THD *thd, enum_var_type type);
static void fix_low_priority_updates(THD *thd, enum_var_type type); static void fix_low_priority_updates(THD *thd, enum_var_type type);
...@@ -830,6 +831,10 @@ static sys_var_thd_bit sys_profiling(&vars, "profiling", NULL, ...@@ -830,6 +831,10 @@ static sys_var_thd_bit sys_profiling(&vars, "profiling", NULL,
static sys_var_thd_ulong sys_profiling_history_size(&vars, "profiling_history_size", static sys_var_thd_ulong sys_profiling_history_size(&vars, "profiling_history_size",
&SV::profiling_history_size); &SV::profiling_history_size);
#endif #endif
static sys_var_thd_bit sys_do_not_replicate(&vars, "do_not_replicate",
check_do_not_replicate,
set_option_bit,
OPTION_DO_NOT_REPLICATE);
/* Local state variables */ /* Local state variables */
...@@ -906,6 +911,12 @@ static sys_var_thd_set sys_log_slow_verbosity(&vars, ...@@ -906,6 +911,12 @@ static sys_var_thd_set sys_log_slow_verbosity(&vars,
"log_slow_verbosity", "log_slow_verbosity",
&SV::log_slow_verbosity, &SV::log_slow_verbosity,
&log_slow_verbosity_typelib); &log_slow_verbosity_typelib);
#ifdef HAVE_REPLICATION
static sys_var_replicate_ignore_do_not_replicate
sys_replicate_ignore_do_not_replicate(&vars,
"replicate_ignore_do_not_replicate",
&opt_replicate_ignore_do_not_replicate);
#endif
/* Global read-only variable containing hostname */ /* Global read-only variable containing hostname */
static sys_var_const_str sys_hostname(&vars, "hostname", glob_hostname); static sys_var_const_str sys_hostname(&vars, "hostname", glob_hostname);
...@@ -3268,6 +3279,25 @@ static bool set_log_update(THD *thd, set_var *var) ...@@ -3268,6 +3279,25 @@ static bool set_log_update(THD *thd, set_var *var)
} }
static int check_do_not_replicate(THD *thd, set_var *var)
{
/*
We must not change @@do_not_replicate in the middle of a transaction or
statement, as that could result in only part of the transaction / statement
being replicated.
(This would be particularly serious if we were to replicate eg.
Rows_log_event without Table_map_log_event or transactional updates without
the COMMIT).
*/
if (thd->locked_tables || thd->active_transaction())
{
my_error(ER_LOCK_OR_ACTIVE_TRANSACTION, MYF(0));
return 1;
}
return 0;
}
static int check_pseudo_thread_id(THD *thd, set_var *var) static int check_pseudo_thread_id(THD *thd, set_var *var)
{ {
var->save_result.ulonglong_value= var->value->val_int(); var->save_result.ulonglong_value= var->value->val_int();
...@@ -4412,6 +4442,32 @@ sys_var_event_scheduler::update(THD *thd, set_var *var) ...@@ -4412,6 +4442,32 @@ sys_var_event_scheduler::update(THD *thd, set_var *var)
} }
#ifdef HAVE_REPLICATION
bool sys_var_replicate_ignore_do_not_replicate::update(THD *thd, set_var *var)
{
bool result;
int thread_mask;
DBUG_ENTER("sys_var_replicate_ignore_do_not_replicate::update");
/* Slave threads must be stopped to change the variable. */
pthread_mutex_lock(&LOCK_active_mi);
lock_slave_threads(active_mi);
init_thread_mask(&thread_mask, active_mi, 0 /*not inverse*/);
if (thread_mask) // We refuse if any slave thread is running
{
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
result= TRUE;
}
else
result= sys_var_bool_ptr::update(thd, var);
unlock_slave_threads(active_mi);
pthread_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(result);
}
#endif
uchar *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type, uchar *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type,
LEX_STRING *base) LEX_STRING *base)
{ {
......
...@@ -1283,6 +1283,25 @@ class sys_var_thd_binlog_format :public sys_var_thd_enum ...@@ -1283,6 +1283,25 @@ class sys_var_thd_binlog_format :public sys_var_thd_enum
bool is_readonly() const; bool is_readonly() const;
}; };
#ifdef HAVE_REPLICATION
/**
Handler for setting the system variable --replicate-ignore-do-not-replicate.
*/
class sys_var_replicate_ignore_do_not_replicate :public sys_var_bool_ptr
{
public:
sys_var_replicate_ignore_do_not_replicate(sys_var_chain *chain,
const char *name_arg,
my_bool *value_arg) :
sys_var_bool_ptr(chain, name_arg, value_arg) {};
~sys_var_replicate_ignore_do_not_replicate() {};
bool update(THD *thd, set_var *var);
};
#endif
/**************************************************************************** /****************************************************************************
Classes for parsing of the SET command Classes for parsing of the SET command
****************************************************************************/ ****************************************************************************/
......
...@@ -1176,6 +1176,38 @@ when it try to get the value of TIME_ZONE global variable from master."; ...@@ -1176,6 +1176,38 @@ when it try to get the value of TIME_ZONE global variable from master.";
} }
} }
/*
Request the master to filter away events with the @@do_not_replicate flag
set, if we are running with --replicate-ignore-do_not_replicate=1.
*/
if (opt_replicate_ignore_do_not_replicate)
{
if (!mysql_real_query(mysql, STRING_WITH_LEN("SET do_not_replicate=1")))
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
mi->report(ERROR_LEVEL, err_code,
"Setting master-side filtering of @@do_not_replicate failed "
"with error: %s", mysql_error(mysql));
goto network_err;
}
else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE)
{
/*
The master is older than the slave and does not support the
@@do_not_replicate feature.
This is not a problem, as such master will not generate events with
the @@do_not_replicate flag set in the first place. We will still
do slave-side filtering of such events though, to handle the (rare)
case of downgrading a master and receiving old events generated from
before the downgrade with the @@do_not_replicate flag set.
*/
DBUG_PRINT("info", ("Old master does not support master-side filtering "
"of @@do_not_replicate events."));
}
}
}
err: err:
if (errmsg) if (errmsg)
{ {
...@@ -2114,6 +2146,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) ...@@ -2114,6 +2146,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
thd->lex->current_select= 0; thd->lex->current_select= 0;
if (!ev->when) if (!ev->when)
ev->when= my_time(0); ev->when= my_time(0);
thd->options= (thd->options & ~OPTION_DO_NOT_REPLICATE) |
(ev->flags & LOG_EVENT_DO_NOT_REPLICATE_F ? OPTION_DO_NOT_REPLICATE : 0);
ev->thd = thd; // because up to this point, ev->thd == 0 ev->thd = thd; // because up to this point, ev->thd == 0
int reason= ev->shall_skip(rli); int reason= ev->shall_skip(rli);
...@@ -3582,6 +3616,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -3582,6 +3616,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{ {
int error= 0; int error= 0;
ulong inc_pos; ulong inc_pos;
ulong event_pos;
Relay_log_info *rli= &mi->rli; Relay_log_info *rli= &mi->rli;
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock(); pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
DBUG_ENTER("queue_event"); DBUG_ENTER("queue_event");
...@@ -3666,6 +3701,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -3666,6 +3701,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
break; break;
} }
/*
If we filter events master-side (eg. @@do_not_replicate), we will see holes
in the event positions from the master. If we see such a hole, adjust
mi->master_log_pos accordingly so we maintain the correct position (for
reconnect, MASTER_POS_WAIT(), etc.)
*/
if (inc_pos > 0 &&
event_len >= LOG_POS_OFFSET+4 &&
(event_pos= uint4korr(buf+LOG_POS_OFFSET)) > mi->master_log_pos + inc_pos)
{
inc_pos= event_pos - mi->master_log_pos;
DBUG_PRINT("info", ("Adjust master_log_pos %lu->%lu to account for "
"master-side filtering",
(unsigned long)(mi->master_log_pos + inc_pos),
event_pos));
}
/* /*
If this event is originating from this server, don't queue it. If this event is originating from this server, don't queue it.
We don't check this for 3.23 events because it's simpler like this; 3.23 We don't check this for 3.23 events because it's simpler like this; 3.23
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
void mysql_client_binlog_statement(THD* thd) void mysql_client_binlog_statement(THD* thd)
{ {
ulonglong save_do_not_replicate;
DBUG_ENTER("mysql_client_binlog_statement"); DBUG_ENTER("mysql_client_binlog_statement");
DBUG_PRINT("info",("binlog base64: '%*s'", DBUG_PRINT("info",("binlog base64: '%*s'",
(int) (thd->lex->comment.length < 2048 ? (int) (thd->lex->comment.length < 2048 ?
...@@ -213,7 +214,15 @@ void mysql_client_binlog_statement(THD* thd) ...@@ -213,7 +214,15 @@ void mysql_client_binlog_statement(THD* thd)
reporting. reporting.
*/ */
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
save_do_not_replicate= thd->options & OPTION_DO_NOT_REPLICATE;
thd->options= (thd->options & ~OPTION_DO_NOT_REPLICATE) |
(ev->flags & LOG_EVENT_DO_NOT_REPLICATE_F ?
OPTION_DO_NOT_REPLICATE : 0);
err= ev->apply_event(rli); err= ev->apply_event(rli);
thd->options= (thd->options & ~OPTION_DO_NOT_REPLICATE) |
save_do_not_replicate;
#else #else
err= 0; err= 0;
#endif #endif
......
...@@ -337,6 +337,41 @@ Increase max_allowed_packet on master"; ...@@ -337,6 +337,41 @@ Increase max_allowed_packet on master";
} }
/*
Helper function for mysql_binlog_send() to write an event down the slave
connection.
Returns NULL on success, error message string on error.
*/
static const char *
send_event_to_slave(THD *thd, NET *net, String* const packet)
{
thd_proc_info(thd, "Sending binlog event to slave");
/*
Skip events with the @@do_not_replicate flag set, if slave requested
skipping of such events.
*/
if (thd->options & OPTION_DO_NOT_REPLICATE)
{
uint16 flags= uint2korr(&((*packet)[FLAGS_OFFSET+1]));
if (flags & LOG_EVENT_DO_NOT_REPLICATE_F)
return NULL;
}
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
return "Failed on my_net_write()";
DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{
if (send_file(thd))
return "failed in send_file()";
}
return NULL; /* Success */
}
/* /*
TODO: Clean up loop to only have one call to send_file() TODO: Clean up loop to only have one call to send_file()
*/ */
...@@ -349,9 +384,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ...@@ -349,9 +384,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
char search_file_name[FN_REFLEN], *name; char search_file_name[FN_REFLEN], *name;
IO_CACHE log; IO_CACHE log;
File file = -1; File file = -1;
String* packet = &thd->packet; String* const packet = &thd->packet;
int error; int error;
const char *errmsg = "Unknown error"; const char *errmsg = "Unknown error", *tmp_msg;
NET* net = &thd->net; NET* net = &thd->net;
pthread_mutex_t *log_lock; pthread_mutex_t *log_lock;
bool binlog_can_be_corrupted= FALSE; bool binlog_can_be_corrupted= FALSE;
...@@ -588,9 +623,9 @@ impossible position"; ...@@ -588,9 +623,9 @@ impossible position";
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
binlog_can_be_corrupted= FALSE; binlog_can_be_corrupted= FALSE;
if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) if ((tmp_msg= send_event_to_slave(thd, net, packet)))
{ {
errmsg = "Failed on my_net_write()"; errmsg = tmp_msg;
my_errno= ER_UNKNOWN_ERROR; my_errno= ER_UNKNOWN_ERROR;
goto err; goto err;
} }
...@@ -603,17 +638,6 @@ impossible position"; ...@@ -603,17 +638,6 @@ impossible position";
} }
}); });
DBUG_PRINT("info", ("log event code %d",
(*packet)[LOG_EVENT_OFFSET+1] ));
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{
if (send_file(thd))
{
errmsg = "failed in send_file()";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
}
packet->set("\0", 1, &my_charset_bin); packet->set("\0", 1, &my_charset_bin);
} }
...@@ -713,23 +737,12 @@ impossible position"; ...@@ -713,23 +737,12 @@ impossible position";
if (read_packet) if (read_packet)
{ {
thd_proc_info(thd, "Sending binlog event to slave"); if ((tmp_msg= send_event_to_slave(thd, net, packet)))
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) {
{ errmsg = tmp_msg;
errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR;
my_errno= ER_UNKNOWN_ERROR; goto err;
goto err; }
}
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{
if (send_file(thd))
{
errmsg = "failed in send_file()";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
}
packet->set("\0", 1, &my_charset_bin); packet->set("\0", 1, &my_charset_bin);
/* /*
No need to net_flush because we will get to flush later when No need to net_flush because we will get to flush later when
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment