Commit 9bb989a9 authored by unknown's avatar unknown

MDEV-26: Global transaction ID.

Fix MDEV-4275 - I/O thread restart duplicates events in the relay log.
The first time we connect to master after CHANGE MASTER or restart, we connect
from the GTID position. But then subsequent reconnects or IO thread restarts
reconnect with the old-style file/offset binlog pos from where it left off at
last disconnect. This is necessary to avoid duplicate events in the relay
logs, as there is nothing that synchronises the SQL thread update of GTID
state (multiple threads in case of multi-source) with IO thread reconnects.

Test cases.

Some small cleanups and fixes.
parent 9d9ddad7
include/rpl_init.inc [topology=1->2]
*** Test crashing master, causing slave IO thread to reconnect while SQL thread is running ***
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
include/stop_slave.inc
CHANGE MASTER TO master_host = '127.0.0.1', master_port = MASTER_PORT,
MASTER_GTID_POS=AUTO;
INSERT INTO t1 VALUES (2,1);
INSERT INTO t1 VALUES (3,1);
include/start_slave.inc
SET SESSION debug_dbug="+d,crash_dispatch_command_before";
SELECT 1;
Got one of the listed errors
INSERT INTO t1 VALUES (1000, 3);
DROP TABLE t1;
include/rpl_end.inc
......@@ -67,4 +67,25 @@ a
3
4
DROP TABLE t1;
*** MDEV-4275: I/O thread restart duplicates events in relay log ***
include/stop_slave.inc
RESET SLAVE ALL;
RESET MASTER;
RESET MASTER;
CHANGE MASTER TO master_host='127.0.0.1', master_port=MASTER_PORT, master_user='root', master_gtid_pos='';
include/start_slave.inc
CREATE TABLE t1 (a INT PRIMARY KEY);
INSERT INTO t1 VALUES (1);
SELECT * FROM t1;
a
1
include/stop_slave_io.inc
START SLAVE IO_THREAD;
include/wait_for_slave_io_to_start.inc
INSERT INTO t1 VALUES (2);
SELECT * FROM t1 ORDER BY a;
a
1
2
DROP TABLE t1;
include/rpl_end.inc
--skip-stack-trace --skip-core-file
--source include/have_innodb.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
--echo *** Test crashing master, causing slave IO thread to reconnect while SQL thread is running ***
--connection server_1
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
--save_master_pos
--connection server_2
--sync_with_master
--source include/stop_slave.inc
--replace_result $MASTER_MYPORT MASTER_PORT
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
MASTER_GTID_POS=AUTO;
--connection server_1
INSERT INTO t1 VALUES (2,1);
INSERT INTO t1 VALUES (3,1);
--connection server_2
--source include/start_slave.inc
--connection server_1
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
wait-rpl_gtid_crash.test
EOF
let $1=200;
--disable_query_log
while ($1)
{
eval INSERT INTO t1 VALUES ($1 + 10, 2);
dec $1;
}
--enable_query_log
SET SESSION debug_dbug="+d,crash_dispatch_command_before";
--error 2006,2013
SELECT 1;
--remove_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
restart-rpl_gtid_crash.test
EOF
--enable_reconnect
--source include/wait_until_connected_again.inc
--connection server_2
--let $wait_condition= SELECT COUNT(*) = 200 FROM t1 WHERE b=2
--source include/wait_condition.inc
--connection server_1
INSERT INTO t1 VALUES (1000, 3);
--connection server_2
--let $wait_condition= SELECT COUNT(*) = 1 FROM t1 WHERE b=3
--source include/wait_condition.inc
--connection server_1
DROP TABLE t1;
--connection default
--enable_reconnect
--source include/wait_until_connected_again.inc
--source include/rpl_end.inc
......@@ -98,9 +98,54 @@ START SLAVE;
--source include/wait_condition.inc
SELECT * FROM t1 ORDER by a;
# Clean up.
--connection server_1
DROP TABLE t1;
--save_master_pos
--connection server_2
--sync_with_master
--echo *** MDEV-4275: I/O thread restart duplicates events in relay log ***
--connection server_2
--source include/stop_slave.inc
RESET SLAVE ALL;
RESET MASTER;
--connection server_1
RESET MASTER;
--connection server_2
--replace_result $MASTER_MYPORT MASTER_PORT
eval CHANGE MASTER TO master_host='127.0.0.1', master_port=$MASTER_MYPORT, master_user='root', master_gtid_pos='';
--source include/start_slave.inc
--connection server_1
CREATE TABLE t1 (a INT PRIMARY KEY);
INSERT INTO t1 VALUES (1);
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t1;
--source include/stop_slave_io.inc
START SLAVE IO_THREAD;
--source include/wait_for_slave_io_to_start.inc
--connection server_1
INSERT INTO t1 VALUES (2);
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t1 ORDER BY a;
# Clean up.
--connection server_1
DROP TABLE t1;
--source include/rpl_end.inc
......@@ -3702,6 +3702,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log)
const char* save_name;
DBUG_ENTER("reset_logs");
if (thd)
ha_reset_logs(thd);
/*
We need to get both locks to be sure that no one is trying to
......@@ -8486,7 +8487,9 @@ binlog_background_thread(void *arg __attribute__((unused)))
#ifdef HAVE_REPLICATION
if (rpl_load_gtid_slave_state(thd))
sql_print_warning("Failed to load slave replication state from table "
"%s.%s", "mysql", rpl_gtid_slave_state_table_name.str);
"%s.%s: %u: %s", "mysql",
rpl_gtid_slave_state_table_name.str,
thd->stmt_da->sql_errno(), thd->stmt_da->message());
#endif
mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
......
......@@ -1319,6 +1319,35 @@ public:
return do_shall_skip(rli);
}
/*
Check if an event is non-final part of a stand-alone event group,
such as Intvar_log_event (such events should be processed as part
of the following event group, not individually).
*/
static bool is_part_of_group(enum Log_event_type ev_type)
{
switch (ev_type)
{
case GTID_EVENT:
case INTVAR_EVENT:
case RAND_EVENT:
case USER_VAR_EVENT:
case TABLE_MAP_EVENT:
case ANNOTATE_ROWS_EVENT:
return true;
case DELETE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case WRITE_ROWS_EVENT:
/*
ToDo: also check for non-final Rows_log_event (though such events
are usually in a BEGIN-COMMIT group).
*/
default:
return false;
}
}
protected:
/**
......
......@@ -31,6 +31,11 @@ struct rpl_gtid
};
enum enum_gtid_skip_type {
GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
};
/*
Replication slave state.
......
......@@ -37,7 +37,7 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF),
connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0),
slave_running(0), slave_run_id(0), sync_counter(0),
heartbeat_period(0), received_heartbeats(0), master_id(0), gtid_pos_auto(0)
heartbeat_period(0), received_heartbeats(0), master_id(0), using_gtid(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
......@@ -436,8 +436,8 @@ file '%s')", fname);
*/
while (!init_strvar_from_file(buf, sizeof(buf), &mi->file, 0))
{
if (0 == strncmp(buf, STRING_WITH_LEN("gtid_pos_auto=")))
mi->gtid_pos_auto= (0 != atoi(buf + sizeof("gtid_pos_auto")));
if (0 == strncmp(buf, STRING_WITH_LEN("using_gtid=")))
mi->using_gtid= (0 != atoi(buf + sizeof("using_gtid")));
}
}
}
......@@ -581,14 +581,14 @@ int flush_master_info(Master_info* mi,
my_b_printf(file,
"%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n"
"\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n"
"gtid_pos_auto=%d\n",
"using_gtid=%d\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
mi->password, mi->port, mi->connect_retry,
(int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
heartbeat_buf, "", ignore_server_ids_buf, mi->gtid_pos_auto);
heartbeat_buf, "", ignore_server_ids_buf, mi->using_gtid);
my_free(ignore_server_ids_buf);
err= flush_io_cache(file);
if (sync_masterinfo_period && !err &&
......
......@@ -126,8 +126,11 @@ class Master_info : public Slave_reporting_capability
ulonglong received_heartbeats; // counter of received heartbeat events
DYNAMIC_ARRAY ignore_server_ids;
ulong master_id;
/* If last CHANGE MASTER was MASTER_GTID_POS=AUTO. */
bool gtid_pos_auto;
/*
True if slave position is set using GTID state rather than old-style
file/offset binlog position.
*/
bool using_gtid;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
const char* slave_info_fname,
......
......@@ -399,32 +399,6 @@ int init_recovery(Master_info* mi, const char** errmsg)
}
/*
When connecting a slave to a master with GTID, we reset the relay log
coordinates of the SQL thread and clear the master coordinates of SQL and IO
threads.
This way we ensure that we start from the correct place even after a change
to new master or a crash where relay log coordinates may be wrong (GTID
state is crash safe but master.info is not). And we get the correct master
coordinates set upon reading the initial fake rotate event sent from master.
*/
static void
reset_coordinates_for_gtid(Master_info *mi, Relay_log_info *rli)
{
mi->master_log_pos= 0;
mi->master_log_name[0]= 0;
rli->group_master_log_pos= 0;
rli->group_master_log_name[0]= 0;
rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE;
strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
sizeof(rli->group_relay_log_name)-1);
rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
sizeof(mi->rli.event_relay_log_name)-1);
}
/**
Convert slave skip errors bitmap into a printable string.
*/
......@@ -811,6 +785,7 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0;
mysql_cond_t* cond_io=0, *cond_sql=0;
int error=0;
const char *errmsg;
DBUG_ENTER("start_slave_threads");
if (need_slave_mutex)
......@@ -826,6 +801,22 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
lock_cond_sql = &mi->rli.run_lock;
}
/*
If we are using GTID and both SQL and IO threads are stopped, then get
rid of all relay logs.
Relay logs are not very useful when using GTID, except as a buffer
between the fetch in the IO thread and the apply in SQL thread. However
while one of the threads is running, they are in use and cannot be
removed.
*/
if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running)
{
purge_relay_logs(&mi->rli, NULL, 0, &errmsg);
mi->master_log_name[0]= 0;
mi->master_log_pos= 0;
}
if (thread_mask & SLAVE_IO)
error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
......@@ -1813,9 +1804,17 @@ past_checksum:
after_set_capability:
#endif
/* Request dump start from slave replication GTID state. */
/*
Request dump start from slave replication GTID state.
Only request GTID position the first time we connect after CHANGE MASTER
or after starting both IO or SQL thread.
if (mi->gtid_pos_auto)
Otherwise, if the IO thread was ahead of the SQL thread before the
restart or reconnect, we might end up re-fetching and hence re-applying
the same event(s) again.
*/
if (mi->using_gtid && !mi->master_log_name[0])
{
int rc;
char str_buf[256];
......@@ -1866,7 +1865,7 @@ after_set_capability:
}
}
}
else
if (!mi->using_gtid)
{
/*
If we are not using GTID to connect this time, then instead request
......@@ -2435,7 +2434,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
}
// Master_Server_id
protocol->store((uint32) mi->master_id);
protocol->store((uint32) (mi->gtid_pos_auto != 0));
protocol->store((uint32) (mi->using_gtid != 0));
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
......@@ -3412,8 +3411,6 @@ connected:
if (ret == 1)
/* Fatal error */
goto err;
if (mi->gtid_pos_auto)
reset_coordinates_for_gtid(mi, rli);
if (ret == 2)
{
......
......@@ -806,8 +806,9 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
requested by the slave, then we still give error (below, after
the loop).
*/
if (!(missing_domains++))
missing_domain_gtid= domain_gtid;
if (!missing_domains)
missing_domain_gtid= *slave_gtid;
++missing_domains;
continue;
}
*errormsg= "Requested slave GTID state not found in binlog";
......@@ -1176,10 +1177,6 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
}
enum enum_gtid_skip_type {
GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
};
/*
Helper function for mysql_binlog_send() to write an event down the slave
connection.
......@@ -1234,12 +1231,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
switch (*gtid_skip_group)
{
case GTID_SKIP_STANDALONE:
if (event_type != GTID_EVENT &&
event_type != INTVAR_EVENT &&
event_type != RAND_EVENT &&
event_type != USER_VAR_EVENT &&
event_type != TABLE_MAP_EVENT &&
event_type != ANNOTATE_ROWS_EVENT)
if (!Log_event::is_part_of_group(event_type))
*gtid_skip_group= GTID_SKIP_NOT;
return NULL;
case GTID_SKIP_TRANSACTION:
......@@ -2713,11 +2705,11 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
}
if (lex_mi->gtid_pos_auto || lex_mi->gtid_pos_str.str)
mi->gtid_pos_auto= true;
mi->using_gtid= true;
else if (lex_mi->gtid_pos_str.str ||
lex_mi->log_file_name || lex_mi->pos ||
lex_mi->relay_log_name || lex_mi->relay_log_pos)
mi->gtid_pos_auto= false;
mi->using_gtid= false;
/*
If user did specify neither host nor port nor any log name nor any log
......@@ -2783,6 +2775,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
ret= TRUE;
goto err;
}
if (mi->using_gtid)
{
/*
Clear the position in the master binlogs, so that we request the
correct GTID position.
*/
mi->master_log_name[0]= 0;
mi->master_log_pos= 0;
}
}
else
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment