Commit 5c68343d authored by seppo's avatar seppo Committed by Jan Lindström

MDEV-18497 CTAS async replication from mariadb master crashes galera nodes (#1410)

This PR contains an mtr test that reproduces a failure when replicating a CREATE TABLE ... AS SELECT (CTAS) statement through asynchronous MariaDB replication into a MariaDB Galera cluster.
The problem happens when the replicated CTAS consists of a create table statement followed by row events that populate the table. In that situation the Galera node operating as the MariaDB replication slave first replicates only the create table part into the cluster, and then performs a second replication containing both the create table and the row events. All other nodes then fail on the duplicate attempt to create the table, and crash because of this failure.

The PR also contains a fix, which detects that a CTAS is being replicated and scans further ahead in the async replication stream to see whether row events follow. The slave node then replicates either a single TOI, if the CTAS table is empty, or, if the CTAS table contains rows, a single bundled write set holding both the create table and the row events.

This fix should keep the master server's GTIDs for CTAS replication in sync with the GTIDs in the Galera cluster.
parent 29097256
START SLAVE;
SHOW VARIABLES LIKE 'binlog_format';
Variable_name Value
binlog_format ROW
CREATE TABLE source (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE target AS SELECT * FROM source;
DROP TABLE target;
INSERT INTO source VALUES(1);
CREATE TABLE target AS SELECT * FROM source;
DROP TABLE source;
DROP TABLE target;
STOP SLAVE;
RESET SLAVE ALL;
RESET MASTER;
!include ../galera_2nodes_as_slave.cnf
# make sure master server uses ROW format for replication
[mysqld]
binlog-format=row
#
# Test Galera as a slave to a MySQL master
#
# The galera/galera_2nodes_as_slave.cnf describes the setup of the nodes
# also, for this test, master server must have binlog_format=ROW
#
--source include/have_innodb.inc
# As node #1 is not a Galera node, we connect to node #2 in order to run include/galera_cluster.inc
--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
--source include/galera_cluster.inc
# Open a connection to node #3 so replication into the cluster can be checked there as well
--connect node_3, 127.0.0.1, root, , test, $NODE_MYPORT_3
# Point node #2 at node #1 as its replication master and start the slave.
# The CHANGE MASTER statement is kept out of the query log, presumably
# because the port number is not stable between runs.
--connection node_2
--disable_query_log
--eval CHANGE MASTER TO MASTER_HOST='127.0.0.1', MASTER_USER='root', MASTER_PORT=$NODE_MYPORT_1;
--enable_query_log
START SLAVE;
# make sure master server has binlog_format=ROW
--connection node_1
SHOW VARIABLES LIKE 'binlog_format';
#
# test phase one, issue CTAS with empty source table
#
--connection node_1
CREATE TABLE source (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE target AS SELECT * FROM source;
# Wait until the (empty) target table exists on node #2 ...
--connection node_2
--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'target';
--source include/wait_condition.inc
# ... and on node #3
--connection node_3
--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'target';
--source include/wait_condition.inc
#
# test phase two, issue CTAS with populated source table
#
--connection node_1
DROP TABLE target;
INSERT INTO source VALUES(1);
CREATE TABLE target AS SELECT * FROM source;
# This time the target table must hold exactly one row on node #2 ...
--connection node_2
--let $wait_condition = SELECT COUNT(*) = 1 FROM target;
--source include/wait_condition.inc
# ... and on node #3
--connection node_3
--let $wait_condition = SELECT COUNT(*) = 1 FROM target;
--source include/wait_condition.inc
# Cleanup: drop both tables on the master
--connection node_1
DROP TABLE source;
DROP TABLE target;
# Wait for the last DROP (target) to reach node #3 before stopping the slave
--connection node_3
--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'target';
--source include/wait_condition.inc
# Tear down the async replication channel on node #2
--connection node_2
STOP SLAVE;
RESET SLAVE ALL;
--connection node_1
RESET MASTER;
# NOTE(review): unconditional 20 second pause at the end of the test; its
# purpose is not evident from this file - confirm whether it is still needed.
--sleep 20
...@@ -4967,6 +4967,9 @@ bool event_that_should_be_ignored(const char *buf); ...@@ -4967,6 +4967,9 @@ bool event_that_should_be_ignored(const char *buf);
bool event_checksum_test(uchar *buf, ulong event_len, enum_binlog_checksum_alg alg); bool event_checksum_test(uchar *buf, ulong event_len, enum_binlog_checksum_alg alg);
enum enum_binlog_checksum_alg get_checksum_alg(const char* buf, ulong len); enum enum_binlog_checksum_alg get_checksum_alg(const char* buf, ulong len);
extern TYPELIB binlog_checksum_typelib; extern TYPELIB binlog_checksum_typelib;
#ifdef WITH_WSREP
enum Log_event_type wsrep_peak_event(rpl_group_info *rgi, ulonglong* event_size);
#endif /* WITH_WSREP */
/** /**
@} (end of group Replication) @} (end of group Replication)
......
...@@ -7220,7 +7220,33 @@ event(errno: %d cur_log->error: %d)", ...@@ -7220,7 +7220,33 @@ event(errno: %d cur_log->error: %d)",
sql_print_error("Error reading relay log event: %s", errmsg); sql_print_error("Error reading relay log event: %s", errmsg);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
#ifdef WITH_WSREP
/*
  Look ahead in the relay log: read the event located at
  rgi->future_event_relay_log_pos and report its type.

  The whole operation runs with rgi->rli->data_lock held. Before returning,
  both event_relay_log_pos fields (in rgi and in rgi->rli) are set back to
  the value rgi->event_relay_log_pos had on entry, and cur_log is repositioned
  to the future_event_relay_log_pos value sampled on entry.

  @param rgi         replication group info whose relay log is inspected
  @param event_size  handed through to next_event() unchanged
                     (presumably receives the size of the event read;
                     verify against next_event())

  @return type code of the event that was read, or UNKNOWN_EVENT when
          next_event() did not return an event
*/
enum Log_event_type wsrep_peak_event(rpl_group_info *rgi, ulonglong* event_size)
{
  enum Log_event_type peeked_type= UNKNOWN_EVENT;

  mysql_mutex_lock(&rgi->rli->data_lock);

  /* remember where we are so that the positions can be put back afterwards */
  const unsigned long long saved_pos= rgi->event_relay_log_pos;
  const unsigned long long next_pos= rgi->future_event_relay_log_pos;

  /* move the read position forward and fetch the event found there */
  my_b_seek(rgi->rli->cur_log, next_pos);
  rgi->rli->event_relay_log_pos= next_pos;
  rgi->event_relay_log_pos= next_pos;

  Log_event *peeked= next_event(rgi, event_size);
  if (peeked)
  {
    peeked_type= peeked->get_type_code();
  }
  /* only the type is of interest, the event itself is discarded */
  delete peeked;

  /* put the saved positions back and rewind the log cursor to next_pos */
  rgi->rli->event_relay_log_pos= saved_pos;
  rgi->event_relay_log_pos= saved_pos;
  my_b_seek(rgi->rli->cur_log, next_pos);

  mysql_mutex_unlock(&rgi->rli->data_lock);

  return peeked_type;
}
#endif /* WITH_WSREP */
/* /*
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
because of size is simpler because when we do it we already have all relevant because of size is simpler because when we do it we already have all relevant
......
...@@ -37,7 +37,6 @@ ...@@ -37,7 +37,6 @@
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
#include "log_event.h" #include "log_event.h"
#include <slave.h>
#include "sql_plugin.h" /* wsrep_plugins_pre_init() */ #include "sql_plugin.h" /* wsrep_plugins_pre_init() */
wsrep_t *wsrep = NULL; wsrep_t *wsrep = NULL;
...@@ -1502,6 +1501,39 @@ static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table, ...@@ -1502,6 +1501,39 @@ static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
{ {
return false; return false;
} }
/*
If mariadb master has replicated a CTAS, we should not replicate the create table
part separately as TOI, but to replicate both create table and following inserts
as one write set.
However, if CTAS creates an empty table, we should replicate the create table alone
as TOI. We have to do relay log event lookup to see if row events follow the
create table event.
*/
if (thd->slave_thread && !(thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_STANDALONE))
{
/* this is CTAS, either empty or populated table */
ulonglong event_size = 0;
enum Log_event_type ev_type= wsrep_peak_event(thd->rgi_slave, &event_size);
switch (ev_type)
{
case QUERY_EVENT:
/* CTAS with empty table, we replicate create table as TOI */
break;
case TABLE_MAP_EVENT:
WSREP_DEBUG("replicating CTAS of empty table as TOI");
// fall through
case WRITE_ROWS_EVENT:
/* CTAS with populated table, we replicate later at commit time */
WSREP_DEBUG("skipping create table of CTAS replication");
return false;
default:
WSREP_WARN("unexpected async replication event: %d", ev_type);
}
return true;
}
/* no next async replication event */
return true; return true;
case SQLCOM_CREATE_VIEW: case SQLCOM_CREATE_VIEW:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment