Commit 3ef0b9b2 authored by Kristian Nielsen's avatar Kristian Nielsen

Merge MDEV-6589 and MDEV-6403 into 10.0.

parents e33b48ae 78c74dbe
include/master-slave.inc
[connection master]
*** MDEV-6403: Temporary tables lost at STOP SLAVE in GTID mode if master has not rotated binlog since restart ***
CREATE TABLE t1 (a INT PRIMARY KEY);
include/stop_slave.inc
SET sql_log_bin= 0;
INSERT INTO t1 VALUES (1);
SET sql_log_bin= 1;
CHANGE MASTER TO master_use_gtid= current_pos;
CREATE TEMPORARY TABLE t2 LIKE t1;
INSERT INTO t2 VALUE (1);
INSERT INTO t1 SELECT * FROM t2;
DROP TEMPORARY TABLE t2;
START SLAVE;
include/wait_for_slave_sql_error.inc [errno=1062]
STOP SLAVE IO_THREAD;
SET sql_log_bin= 0;
DELETE FROM t1 WHERE a=1;
SET sql_log_bin= 1;
include/start_slave.inc
SELECT * FROM t1 ORDER BY a;
a
1
DROP TABLE t1;
include/rpl_end.inc
include/master-slave.inc
[connection master]
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=current_pos;
include/start_slave.inc
*** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication ***
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (1);
SELECT * FROM t1;
a
1
SELECT * FROM t2;
a
1
SET sql_log_bin=0;
BEGIN;
INSERT INTO t2 VALUES (5);
SET gtid_domain_id=0;
INSERT INTO t1 VALUES (2);
INSERT INTO t2 VALUES (3);
FLUSH LOGS;
INSERT INTO t1 VALUES (4);
SET gtid_domain_id=1;
INSERT INTO t2 VALUES (5);
SET gtid_domain_id=0;
INSERT INTO t1 VALUES (6);
INSERT INTO t1 VALUES (7);
SET gtid_domain_id=2;
INSERT INTO t2 VALUES (8);
INSERT INTO t1 VALUES (9);
FLUSH LOGS;
SET gtid_domain_id=3;
INSERT INTO t2 VALUES (10);
INSERT INTO t1 VALUES (11);
SET gtid_domain_id=1;
INSERT INTO t1 VALUES (12);
INSERT INTO t2 VALUES (13);
SET gtid_domain_id=0;
INSERT INTO t2 VALUES (14);
FLUSH LOGS;
SET gtid_domain_id=3;
INSERT INTO t2 VALUES (15);
SET gtid_domain_id=2;
INSERT INTO t2 VALUES (16);
SET gtid_domain_id=0;
INSERT INTO t1 VALUES (17);
SET @gtid0 = @@last_gtid;
SET gtid_domain_id=2;
INSERT INTO t1 VALUES (18);
SET @gtid2 = @@last_gtid;
SET gtid_domain_id=3;
INSERT INTO t1 VALUES (19);
SET @gtid3 = @@last_gtid;
SELECT * FROM t1 ORDER BY a;
a
1
2
4
6
7
9
11
12
17
18
19
SELECT * FROM t2 ORDER BY a;
a
1
3
5
8
10
13
14
15
16
include/save_master_gtid.inc
SELECT MASTER_GTID_WAIT('WAIT_POS');
MASTER_GTID_WAIT('WAIT_POS')
0
COMMIT;
SET sql_log_bin=1;
include/wait_for_slave_sql_error.inc [errno=1062]
SELECT * FROM t1 ORDER BY a;
a
1
2
4
6
7
9
11
17
18
19
SELECT * FROM t2 ORDER BY a;
a
1
3
5
8
10
14
15
16
SET sql_log_bin=0;
DELETE FROM t2 WHERE a=5;
SET sql_log_bin=1;
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a
1
2
4
6
7
9
11
12
17
18
19
SELECT * FROM t2 ORDER BY a;
a
1
3
5
8
10
13
14
15
16
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
SET DEBUG_SYNC= 'RESET';
DROP TABLE t1,t2;
SET DEBUG_SYNC= 'RESET';
include/rpl_end.inc
# Regression test for MDEV-6403: in GTID mode, temporary tables on the slave
# must not be lost when the slave is stopped and restarted while the master
# has not rotated its binlog since it was started.
--source include/master-slave.inc
--echo *** MDEV-6403: Temporary tables lost at STOP SLAVE in GTID mode if master has not rotated binlog since restart ***
--connection master
CREATE TABLE t1 (a INT PRIMARY KEY);
--sync_slave_with_master
--connection slave
--source include/stop_slave.inc
# Inject a duplicate key error that will make the slave stop in the middle of
# a sequence of transactions that use a temporary table.
# The conflicting row is inserted with sql_log_bin=0 so that the insert is not
# written to the slave's own binlog.
SET sql_log_bin= 0;
INSERT INTO t1 VALUES (1);
SET sql_log_bin= 1;
CHANGE MASTER TO master_use_gtid= current_pos;
--connection master
# Make some queries that use a temporary table.
CREATE TEMPORARY TABLE t2 LIKE t1;
INSERT INTO t2 VALUE (1);
INSERT INTO t1 SELECT * FROM t2;
DROP TEMPORARY TABLE t2;
# Remember the master position; it is waited for after the slave is restarted.
--save_master_pos
--connection slave
START SLAVE;
# The INSERT ... SELECT into t1 collides with the row injected above.
--let $slave_sql_errno=1062
--source include/wait_for_slave_sql_error.inc
# Restart the slave.
# The bug was that the IO thread would receive again the restart
# format_description event at the start of the master's binlog, and this
# event would cause the SQL thread to discard all active temporary tables.
STOP SLAVE IO_THREAD;
# Remove the conflicting row (again without binlogging) so that the failed
# statement can succeed when it is retried.
SET sql_log_bin= 0;
DELETE FROM t1 WHERE a=1;
SET sql_log_bin= 1;
--source include/start_slave.inc
--sync_with_master
# The row copied through the temporary table must now be present on the slave.
SELECT * FROM t1 ORDER BY a;
--connection master
DROP TABLE t1;
--source include/rpl_end.inc
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--source include/master-slave.inc
--connection server_2
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=current_pos;
--source include/start_slave.inc
--echo *** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication ***
--connection server_1
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (1);
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t1;
SELECT * FROM t2;
# Block one domain, which we will later cause to give an error. And let some
# other domains proceed so we can check that after restart, the slave is able
# to correctly restart each domain in a separate position.
--connect (con_temp1,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
SET sql_log_bin=0;
BEGIN;
INSERT INTO t2 VALUES (5);
--connection server_1
SET gtid_domain_id=0;
INSERT INTO t1 VALUES (2);
INSERT INTO t2 VALUES (3);
FLUSH LOGS;
INSERT INTO t1 VALUES (4);
SET gtid_domain_id=1;
# This query will be blocked on the slave, and later give a duplicate key error.
INSERT INTO t2 VALUES (5);
SET gtid_domain_id=0;
INSERT INTO t1 VALUES (6);
INSERT INTO t1 VALUES (7);
SET gtid_domain_id=2;
INSERT INTO t2 VALUES (8);
INSERT INTO t1 VALUES (9);
FLUSH LOGS;
SET gtid_domain_id=3;
INSERT INTO t2 VALUES (10);
INSERT INTO t1 VALUES (11);
# These cannot be replicated before the error, as a prior commit is blocked.
SET gtid_domain_id=1;
INSERT INTO t1 VALUES (12);
INSERT INTO t2 VALUES (13);
SET gtid_domain_id=0;
INSERT INTO t2 VALUES (14);
FLUSH LOGS;
SET gtid_domain_id=3;
INSERT INTO t2 VALUES (15);
SET gtid_domain_id=2;
INSERT INTO t2 VALUES (16);
SET gtid_domain_id=0;
INSERT INTO t1 VALUES (17);
SET @gtid0 = @@last_gtid;
SET gtid_domain_id=2;
INSERT INTO t1 VALUES (18);
SET @gtid2 = @@last_gtid;
SET gtid_domain_id=3;
INSERT INTO t1 VALUES (19);
SET @gtid3 = @@last_gtid;
--let $wait_pos= `SELECT CONCAT(@gtid0, ",", @gtid2, ",", @gtid3)`
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
--source include/save_master_gtid.inc
--connection server_2
# First wait for domains 0, 2, and 3 to complete.
--replace_result $wait_pos WAIT_POS
eval SELECT MASTER_GTID_WAIT('$wait_pos');
# Then release the row lock, and wait for the domain 1 to fail with
# duplicate key error.
--connection con_temp1
COMMIT;
SET sql_log_bin=1;
--connection server_2
--let $slave_sql_errno= 1062
--source include/wait_for_slave_sql_error.inc
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
SET sql_log_bin=0;
DELETE FROM t2 WHERE a=5;
SET sql_log_bin=1;
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
# Clean up.
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
SET DEBUG_SYNC= 'RESET';
--connection server_1
DROP TABLE t1,t2;
SET DEBUG_SYNC= 'RESET';
--source include/rpl_end.inc
...@@ -4134,8 +4134,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) ...@@ -4134,8 +4134,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
included= 1; included= 1;
to_purge_if_included= my_strdup(ir->name, MYF(0)); to_purge_if_included= my_strdup(ir->name, MYF(0));
} }
my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock); rli->free_inuse_relaylog(ir);
my_free(ir);
ir= next; ir= next;
} }
rli->inuse_relaylog_list= ir; rli->inuse_relaylog_list= ir;
......
...@@ -1089,6 +1089,27 @@ rpl_binlog_state::load(struct rpl_gtid *list, uint32 count) ...@@ -1089,6 +1089,27 @@ rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
} }
/*
  Callback handed to rpl_slave_state::iterate() by rpl_binlog_state::load().

  `data` carries the rpl_binlog_state being loaded; the GTID is added to it
  in non-strict mode without taking the lock (the caller already holds it).
  The result of update_nolock() is passed straight back to the iterator.
*/
static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
{
  rpl_binlog_state *target= static_cast<rpl_binlog_state *>(data);
  const bool strict= false;

  return target->update_nolock(gtid, strict);
}
/*
  Re-initialise this binlog state from a slave GTID state.

  The current contents are discarded and every GTID that
  slave_pos->iterate() passes to rpl_binlog_state_load_cb() is added.
  All of this is done while holding LOCK_binlog_state.

  Returns false on success, true if the iteration reported an error.
*/
bool
rpl_binlog_state::load(rpl_slave_state *slave_pos)
{
  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  const bool failed=
    slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0) != 0;
  mysql_mutex_unlock(&LOCK_binlog_state);

  return failed;
}
rpl_binlog_state::~rpl_binlog_state() rpl_binlog_state::~rpl_binlog_state()
{ {
free(); free();
...@@ -1848,6 +1869,31 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) ...@@ -1848,6 +1869,31 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
} }
/*
Check if the GTID position has been reached, for mysql_binlog_send().
The position has not been reached if we have anything in the state, unless
it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not
belong to this master at all), or the START_OWN_SLAVE_POS (which means that
we start on an old position from when the server was a slave with
--log-slave-updates=0).
*/
bool
slave_connection_state::is_pos_reached()
{
uint32 i;
for (i= 0; i < hash.records; ++i)
{
entry *e= (entry *)my_hash_element(&hash, i);
if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN)))
return false;
}
return true;
}
/* /*
Execute a MASTER_GTID_WAIT(). Execute a MASTER_GTID_WAIT().
The position to wait for is in gtid_str in string form. The position to wait for is in gtid_str in string form.
......
...@@ -235,6 +235,7 @@ struct rpl_binlog_state ...@@ -235,6 +235,7 @@ struct rpl_binlog_state
void reset(); void reset();
void free(); void free();
bool load(struct rpl_gtid *list, uint32 count); bool load(struct rpl_gtid *list, uint32 count);
bool load(rpl_slave_state *slave_pos);
int update_nolock(const struct rpl_gtid *gtid, bool strict); int update_nolock(const struct rpl_gtid *gtid, bool strict);
int update(const struct rpl_gtid *gtid, bool strict); int update(const struct rpl_gtid *gtid, bool strict);
int update_with_next_gtid(uint32 domain_id, uint32 server_id, int update_with_next_gtid(uint32 domain_id, uint32 server_id,
...@@ -287,6 +288,7 @@ struct slave_connection_state ...@@ -287,6 +288,7 @@ struct slave_connection_state
int to_string(String *out_str); int to_string(String *out_str);
int append_to_string(String *out_str); int append_to_string(String *out_str);
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size); int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
bool is_pos_reached();
}; };
......
...@@ -1832,6 +1832,41 @@ rpl_parallel::wait_for_workers_idle(THD *thd) ...@@ -1832,6 +1832,41 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
} }
/*
  Process a GTID seen while the SQL thread is restarting in GTID mode.

  When the slave stopped with different replication domains at different
  positions in the relay log, event groups of the domains that had progressed
  further must be skipped on restart. rli->restart_gtid_pos holds, per domain,
  the position that still has to be reached.

  The state is updated with the GTID just seen: once a domain reaches its
  start position its entry is removed, so later events in that domain are
  processed normally.

  Returns true if the event group of this GTID should be skipped, false
  otherwise.
*/
bool
process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid)
{
  slave_connection_state *pending= &rli->restart_gtid_pos;

  /* Usual case: no domain is waiting to reach a restart position. */
  if (likely(pending->count() == 0))
    return false;

  slave_connection_state::entry *start= pending->find_entry(gtid->domain_id);
  if (!start)
    return false;

  /* A GTID from another server_id in a still-pending domain is skipped. */
  if (gtid->server_id != start->gtid.server_id)
    return true;

  /*
    Read both sequence numbers up front; the entry is not looked at again
    once it has been removed from the state.
  */
  const uint64 start_seq_no= start->gtid.seq_no;
  const uint64 seen_seq_no= gtid->seq_no;

  if (seen_seq_no >= start_seq_no)
    pending->remove(&start->gtid);

  /* Everything up to and including the start position is skipped. */
  return seen_seq_no <= start_seq_no;
}
/* /*
This is used when we get an error during processing in do_event(); This is used when we get an error during processing in do_event();
We will not queue any event to the thread, but we still need to wake it up We will not queue any event to the thread, but we still need to wake it up
...@@ -1893,13 +1928,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1893,13 +1928,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return -1; return -1;
/* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */ /* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
if (unlikely(!current) && typ != GTID_EVENT) is_group_event= Log_event::is_group_event(typ);
if (unlikely(!current) && typ != GTID_EVENT &&
!(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event))
return -1; return -1;
/* ToDo: what to do with this lock?!? */ /* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
if (typ == FORMAT_DESCRIPTION_EVENT) if (unlikely(typ == FORMAT_DESCRIPTION_EVENT))
{ {
Format_description_log_event *fdev= Format_description_log_event *fdev=
static_cast<Format_description_log_event *>(ev); static_cast<Format_description_log_event *>(ev);
...@@ -1925,6 +1962,19 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1925,6 +1962,19 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
} }
} }
} }
else if (unlikely(typ == GTID_LIST_EVENT))
{
Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev);
rpl_gtid *list= glev->list;
uint32 count= glev->count;
rli->update_relay_log_state(list, count);
while (count)
{
process_gtid_for_restart_pos(rli, list);
++list;
--count;
}
}
/* /*
Stop queueing additional event groups once the SQL thread is requested to Stop queueing additional event groups once the SQL thread is requested to
...@@ -1934,7 +1984,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1934,7 +1984,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
been partially queued, but after that we will just ignore any further been partially queued, but after that we will just ignore any further
events the SQL driver thread may try to queue, and eventually it will stop. events the SQL driver thread may try to queue, and eventually it will stop.
*/ */
is_group_event= Log_event::is_group_event(typ);
if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave) if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
sql_thread_stopping= true; sql_thread_stopping= true;
if (sql_thread_stopping) if (sql_thread_stopping)
...@@ -1947,8 +1996,34 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1947,8 +1996,34 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 0; return 0;
} }
if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)
{
if (typ == GTID_EVENT)
rli->gtid_skip_flag= GTID_SKIP_NOT;
else
{
if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE)
{
if (!Log_event::is_part_of_group(typ))
rli->gtid_skip_flag= GTID_SKIP_NOT;
}
else
{
DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
if (typ == XID_EVENT ||
(typ == QUERY_EVENT &&
(((Query_log_event *)ev)->is_commit() ||
((Query_log_event *)ev)->is_rollback())))
rli->gtid_skip_flag= GTID_SKIP_NOT;
}
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
return 0;
}
}
if (typ == GTID_EVENT) if (typ == GTID_EVENT)
{ {
rpl_gtid gtid;
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
0 : gtid_ev->domain_id); 0 : gtid_ev->domain_id);
...@@ -1959,6 +2034,23 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1959,6 +2034,23 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 1; return 1;
} }
current= e; current= e;
gtid.domain_id= gtid_ev->domain_id;
gtid.server_id= gtid_ev->server_id;
gtid.seq_no= gtid_ev->seq_no;
rli->update_relay_log_state(&gtid, 1);
if (process_gtid_for_restart_pos(rli, &gtid))
{
/*
This domain has progressed further into the relay log before the last
SQL thread restart. So we need to skip this event group to not doubly
apply it.
*/
rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
return 0;
}
} }
else else
e= current; e= current;
......
...@@ -317,5 +317,6 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool; ...@@ -317,5 +317,6 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count, uint32 new_count,
bool skip_check= false); bool skip_check= false);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
#endif /* RPL_PARALLEL_H */ #endif /* RPL_PARALLEL_H */
...@@ -62,7 +62,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -62,7 +62,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), stop_for_until(0), gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
slave_running(0), until_condition(UNTIL_NONE), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0), until_log_pos(0), retried_trans(0), executed_entries(0),
m_flags(0) m_flags(0)
...@@ -100,18 +100,9 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -100,18 +100,9 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
Relay_log_info::~Relay_log_info() Relay_log_info::~Relay_log_info()
{ {
inuse_relaylog *cur;
DBUG_ENTER("Relay_log_info::~Relay_log_info"); DBUG_ENTER("Relay_log_info::~Relay_log_info");
cur= inuse_relaylog_list; reset_inuse_relaylog();
while (cur)
{
DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
inuse_relaylog *next= cur->next;
my_atomic_rwlock_destroy(&cur->inuse_relaylog_atomic_lock);
my_free(cur);
cur= next;
}
mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock); mysql_mutex_destroy(&log_space_lock);
...@@ -1384,14 +1375,34 @@ int ...@@ -1384,14 +1375,34 @@ int
Relay_log_info::alloc_inuse_relaylog(const char *name) Relay_log_info::alloc_inuse_relaylog(const char *name)
{ {
inuse_relaylog *ir; inuse_relaylog *ir;
uint32 gtid_count;
rpl_gtid *gtid_list;
if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
{ {
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
return 1; return 1;
} }
gtid_count= relay_log_state.count();
if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count,
MYF(MY_WME))))
{
my_free(ir);
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
return 1;
}
if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
{
my_free(gtid_list);
my_free(ir);
DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
return 1;
}
ir->rli= this; ir->rli= this;
strmake_buf(ir->name, name); strmake_buf(ir->name, name);
ir->relay_log_state= gtid_list;
ir->relay_log_state_count= gtid_count;
if (!inuse_relaylog_list) if (!inuse_relaylog_list)
inuse_relaylog_list= ir; inuse_relaylog_list= ir;
...@@ -1407,6 +1418,45 @@ Relay_log_info::alloc_inuse_relaylog(const char *name) ...@@ -1407,6 +1418,45 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
} }
/*
  Release one inuse_relaylog element: its saved relay log GTID state array,
  its atomic rwlock, and finally the element itself. The pointer must not be
  used by the caller afterwards.
*/
void
Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
{
  rpl_gtid *saved_state= ir->relay_log_state;

  my_free(saved_state);
  my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock);
  my_free(ir);
}
void
Relay_log_info::reset_inuse_relaylog()
{
inuse_relaylog *cur= inuse_relaylog_list;
while (cur)
{
DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
inuse_relaylog *next= cur->next;
free_inuse_relaylog(cur);
cur= next;
}
inuse_relaylog_list= last_inuse_relaylog= NULL;
}
/*
  Record `count` GTIDs from the array `gtid_list` in relay_log_state, using
  the non-locking, non-strict update.

  All GTIDs are attempted even if one of them fails. Returns 0 if every
  update succeeded and 1 if at least one reported an error.
*/
int
Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
{
  int res= 0;

  for (uint32 idx= 0; idx < count; ++idx)
  {
    if (relay_log_state.update_nolock(&gtid_list[idx], false))
      res= 1;
  }
  return res;
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int int
rpl_load_gtid_slave_state(THD *thd) rpl_load_gtid_slave_state(THD *thd)
......
...@@ -269,6 +269,8 @@ public: ...@@ -269,6 +269,8 @@ public:
int events_till_abort; int events_till_abort;
#endif #endif
enum_gtid_skip_type gtid_skip_flag;
/* /*
inited changes its value within LOCK_active_mi-guarded critical inited changes its value within LOCK_active_mi-guarded critical
sections at times of start_slave_threads() (0->1) and end_slave() (1->0). sections at times of start_slave_threads() (0->1) and end_slave() (1->0).
...@@ -344,6 +346,21 @@ public: ...@@ -344,6 +346,21 @@ public:
size_t slave_patternload_file_size; size_t slave_patternload_file_size;
rpl_parallel parallel; rpl_parallel parallel;
/*
The relay_log_state keeps track of the current binlog state of the execution
of the relay log. This is used to know where to resume current GTID position
if the slave thread is stopped and restarted.
It is only accessed from the SQL thread, so it does not need any locking.
*/
rpl_binlog_state relay_log_state;
/*
The restart_gtid_pos is used when the SQL thread restarts on a relay log
in GTID mode. In multi-domain parallel replication, each domain may have a
separate position, so some events in more progressed domains may need to be
skipped. This keeps track of the domains that have not yet reached their
starting event.
*/
slave_connection_state restart_gtid_pos;
Relay_log_info(bool is_slave_recovery); Relay_log_info(bool is_slave_recovery);
~Relay_log_info(); ~Relay_log_info();
...@@ -408,6 +425,9 @@ public: ...@@ -408,6 +425,9 @@ public:
time_t event_creation_time, THD *thd, time_t event_creation_time, THD *thd,
rpl_group_info *rgi); rpl_group_info *rgi);
int alloc_inuse_relaylog(const char *name); int alloc_inuse_relaylog(const char *name);
void free_inuse_relaylog(inuse_relaylog *ir);
void reset_inuse_relaylog();
int update_relay_log_state(rpl_gtid *gtid_list, uint32 count);
/** /**
Is the replication inside a group? Is the replication inside a group?
...@@ -497,6 +517,12 @@ private: ...@@ -497,6 +517,12 @@ private:
struct inuse_relaylog { struct inuse_relaylog {
inuse_relaylog *next; inuse_relaylog *next;
Relay_log_info *rli; Relay_log_info *rli;
/*
relay_log_state holds the binlog state corresponding to the start of this
relay log file. It is an array with relay_log_state_count elements.
*/
rpl_gtid *relay_log_state;
uint32 relay_log_state_count;
/* Number of events in this relay log queued for worker threads. */ /* Number of events in this relay log queued for worker threads. */
int64 queued_count; int64 queued_count;
/* Number of events completed by worker threads. */ /* Number of events completed by worker threads. */
......
...@@ -943,6 +943,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, ...@@ -943,6 +943,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
Master_info::USE_GTID_CURRENT_POS); Master_info::USE_GTID_CURRENT_POS);
mi->events_queued_since_last_gtid= 0; mi->events_queued_since_last_gtid= 0;
mi->gtid_reconnect_event_skip_count= 0; mi->gtid_reconnect_event_skip_count= 0;
mi->rli.restart_gtid_pos.reset();
} }
if (!error && (thread_mask & SLAVE_IO)) if (!error && (thread_mask & SLAVE_IO))
...@@ -4504,6 +4506,16 @@ pthread_handler_t handle_slave_sql(void *arg) ...@@ -4504,6 +4506,16 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->gtid_sub_id= 0; serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false; serial_rgi->gtid_pending= false;
if (mi->using_gtid != Master_info::USE_GTID_NO)
{
/*
We initialize the relay log state from the known starting position.
It will then be updated as required by GTID and GTID_LIST events found
while applying events read from relay logs.
*/
rli->relay_log_state.load(&rpl_global_gtid_slave_state);
}
rli->gtid_skip_flag = GTID_SKIP_NOT;
if (init_relay_log_pos(rli, if (init_relay_log_pos(rli,
rli->group_relay_log_name, rli->group_relay_log_name,
rli->group_relay_log_pos, rli->group_relay_log_pos,
...@@ -4514,6 +4526,7 @@ pthread_handler_t handle_slave_sql(void *arg) ...@@ -4514,6 +4526,7 @@ pthread_handler_t handle_slave_sql(void *arg)
"Error initializing relay log position: %s", errmsg); "Error initializing relay log position: %s", errmsg);
goto err; goto err;
} }
rli->reset_inuse_relaylog();
if (rli->alloc_inuse_relaylog(rli->group_relay_log_name)) if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
goto err; goto err;
...@@ -4718,7 +4731,49 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ...@@ -4718,7 +4731,49 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
thd->reset_query(); thd->reset_query();
thd->reset_db(NULL, 0); thd->reset_db(NULL, 0);
if (rli->mi->using_gtid != Master_info::USE_GTID_NO) if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
{
ulong domain_count;
flush_relay_log_info(rli); flush_relay_log_info(rli);
if (opt_slave_parallel_threads > 0)
{
/*
In parallel replication GTID mode, we may stop with different domains
at different positions in the relay log.
To handle this when we restart the SQL thread, mark the current
per-domain position in the Relay_log_info.
*/
mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
domain_count= rpl_global_gtid_slave_state.count();
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (domain_count > 1)
{
inuse_relaylog *ir;
/*
Load the starting GTID position, so that we can skip already applied
GTIDs when we restart the SQL thread. And set the start position in
the relay log back to a known safe place to start (prior to any not
yet applied transaction in any domain).
*/
rli->restart_gtid_pos.load(&rpl_global_gtid_slave_state, NULL, 0);
if ((ir= rli->inuse_relaylog_list))
{
rpl_gtid *gtid= ir->relay_log_state;
uint32 count= ir->relay_log_state_count;
while (count > 0)
{
process_gtid_for_restart_pos(rli, gtid);
++gtid;
--count;
}
strmake_buf(rli->group_relay_log_name, ir->name);
rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE;
}
}
}
}
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global(); thd->add_status_to_global();
mysql_mutex_lock(&rli->run_lock); mysql_mutex_lock(&rli->run_lock);
...@@ -4731,6 +4786,7 @@ err_during_init: ...@@ -4731,6 +4786,7 @@ err_during_init:
/* Forget the relay log's format */ /* Forget the relay log's format */
delete rli->relay_log.description_event_for_exec; delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= 0; rli->relay_log.description_event_for_exec= 0;
rli->reset_inuse_relaylog();
/* Wake up master_pos_wait() */ /* Wake up master_pos_wait() */
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions")); DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
......
...@@ -2377,6 +2377,31 @@ impossible position"; ...@@ -2377,6 +2377,31 @@ impossible position";
info.fdev= tmp; info.fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
if (info.using_gtid_state)
{
/*
If this event has the field `created' set, then it will cause the
slave to delete all active temporary tables. This must not happen
if the slave received any later GTIDs in a previous connect, as
those GTIDs might have created new temporary tables that are still
needed.
So here, we check if the starting GTID position was already
reached before this format description event. If not, we clear the
`created' flag to preserve temporary tables on the slave. (If the
slave connects at a position past this event, it means that it
already received and handled it in a previous connect).
*/
if (!info.gtid_state.is_pos_reached())
{
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
fix_checksum(packet, ev_offset);
}
}
} }
#ifndef DBUG_OFF #ifndef DBUG_OFF
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment