Commit 6a84473c authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-12179: Per-engine mysql.gtid_slave_pos table

Intermediate commit.

This commit implements that record_gtid() selects a gtid_slave_posXXX table
with a storage engine already in use by current transaction, if any.

The default table mysql.gtid_slave_pos is used if no match can be found on
storage engine, or for GTID position updates with no specific storage
engine.

Table discovery of mysql.gtid_slave_pos* happens on initial GTID state load
as well as on every START SLAVE. Some effort is made to make this possible
without additional locking. New tables are added using lock-free atomics.
Removing tables requires stopping all slaves first. A warning is given in
the error log when a table is removed but a non-stopped slave still has a
reference to it.

If multiple mysql.gtid_slave_posXXX tables with same storage engine exist,
one is chosen arbitrarily to be used, with a warning in the error log. GTID
data from all tables is still read, but only one among redundant tables with
same storage engine will be updated.
parent 3501a535
connect slave1,127.0.0.1,root,,,$SERVER_MYPORT_3;
connect master1,127.0.0.1,root,,,$SERVER_MYPORT_1;
connect master2,127.0.0.1,root,,,$SERVER_MYPORT_2;
connection slave1;
CHANGE MASTER 'slave1' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
CHANGE MASTER 'slave2' TO master_port=MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'slave1';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = 'slave2';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = '';
connection master1;
SET GLOBAL gtid_domain_id= 1;
SET SESSION gtid_domain_id= 1;
CREATE TABLE t3 (a INT PRIMARY KEY, b VARCHAR(10)) ENGINE=InnoDB;
CREATE TABLE t1 (a INT PRIMARY KEY, b VARCHAR(10));
INSERT INTO t1 VALUES (1, "initial");
INSERT INTO t3 VALUES (101, "initial 1");
include/save_master_gtid.inc
connection master2;
SET GLOBAL gtid_domain_id= 2;
SET SESSION gtid_domain_id= 2;
CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(10)) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1, "initial");
connection slave1;
include/sync_with_master_gtid.inc
connection master2;
include/save_master_gtid.inc
connection slave1;
include/sync_with_master_gtid.inc
*** Add an innodb gtid_slave_pos table. It is not used yet as slaves are already running ***
SET sql_log_bin=0;
CREATE TABLE mysql.gtid_slave_pos_innodb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb ENGINE=InnoDB;
SET sql_log_bin=0;
connection master1;
INSERT INTO t3 VALUES (102, "secondary");
include/save_master_gtid.inc
connection slave1;
include/sync_with_master_gtid.inc
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos GROUP BY domain_id;
domain_id max(seq_no)
1 5
2 2
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos_innodb GROUP BY domain_id;
domain_id max(seq_no)
*** Restart one slave thread, the other keeps running. Now the new table is used ***
connection slave1;
set default_master_connection = 'slave1';
STOP SLAVE;
include/wait_for_slave_to_stop.inc
START SLAVE;
include/wait_for_slave_to_start.inc
connection master2;
INSERT INTO t2 VALUES (2, "secondary2");
include/save_master_gtid.inc
connection slave1;
include/sync_with_master_gtid.inc
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos GROUP BY domain_id;
domain_id max(seq_no)
1 5
2 2
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos_innodb GROUP BY domain_id;
domain_id max(seq_no)
2 3
*** Remove a gtid_slave_posXXX table, restart one slave ***
*** Get a warning that the change is not yet picked up ***
*** See that updates fail due to trying to use the missing table ***
connection slave1;
SET sql_log_bin=0;
DROP TABLE mysql.gtid_slave_pos_innodb;
SET sql_log_bin=1;
set default_master_connection = 'slave2';
STOP SLAVE;
include/wait_for_slave_to_stop.inc
START SLAVE;
include/wait_for_slave_to_start.inc
CALL mtr.add_suppression("The table mysql.gtid_slave_pos_innodb was removed.");
connection master2;
INSERT INTO t2 VALUES (3, "tertiary 2");
include/save_master_gtid.inc
connection slave1;
include/wait_for_slave_sql_error.inc [errno=1942]
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos GROUP BY domain_id;
domain_id max(seq_no)
1 5
2 2
*** Stop both slaves, see that the drop of mysql.gtid_slave_pos_innodb is now picked up ***
connection slave1;
set default_master_connection = 'slave1';
STOP SLAVE;
include/wait_for_slave_to_stop.inc
set default_master_connection = 'slave2';
STOP SLAVE;
include/wait_for_slave_to_stop.inc
set default_master_connection = 'slave1';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = 'slave2';
START SLAVE;
include/wait_for_slave_to_start.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 initial
SELECT * FROM t2 ORDER BY a;
a b
1 initial
2 secondary2
3 tertiary 2
SELECT * FROM t3 ORDER BY a;
a b
101 initial 1
102 secondary
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos GROUP BY domain_id;
domain_id max(seq_no)
1 5
2 4
connection master1;
DROP TABLE t1;
DROP TABLE t3;
connection master2;
DROP TABLE t2;
connection slave1;
SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES;
Warnings:
Note 1938 SLAVE 'slave1' stopped
Note 1938 SLAVE 'slave2' stopped
include/reset_master_slave.inc
disconnect slave1;
connection master1;
SET GLOBAL gtid_domain_id=0;
include/reset_master_slave.inc
disconnect master1;
connection master2;
SET GLOBAL gtid_domain_id=0;
include/reset_master_slave.inc
disconnect master2;
--source include/not_embedded.inc
--source include/have_innodb.inc
#
# Test multiple mysql.gtid_slave_posXXX tables with multiple master connections
#
--connect (slave1,127.0.0.1,root,,,$SERVER_MYPORT_3)
--connect (master1,127.0.0.1,root,,,$SERVER_MYPORT_1)
--connect (master2,127.0.0.1,root,,,$SERVER_MYPORT_2)
--connection slave1
--replace_result $SERVER_MYPORT_1 MYPORT_1
eval CHANGE MASTER 'slave1' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
--replace_result $SERVER_MYPORT_2 MYPORT_2
eval CHANGE MASTER 'slave2' TO master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'slave1';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = 'slave2';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = '';
--connection master1
SET GLOBAL gtid_domain_id= 1;
SET SESSION gtid_domain_id= 1;
CREATE TABLE t3 (a INT PRIMARY KEY, b VARCHAR(10)) ENGINE=InnoDB;
CREATE TABLE t1 (a INT PRIMARY KEY, b VARCHAR(10));
INSERT INTO t1 VALUES (1, "initial");
INSERT INTO t3 VALUES (101, "initial 1");
--source include/save_master_gtid.inc
--connection master2
SET GLOBAL gtid_domain_id= 2;
SET SESSION gtid_domain_id= 2;
CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(10)) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1, "initial");
--connection slave1
--source include/sync_with_master_gtid.inc
--connection master2
--source include/save_master_gtid.inc
--connection slave1
--source include/sync_with_master_gtid.inc
--echo *** Add an innodb gtid_slave_pos table. It is not used yet as slaves are already running ***
SET sql_log_bin=0;
CREATE TABLE mysql.gtid_slave_pos_innodb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb ENGINE=InnoDB;
SET sql_log_bin=0;
--connection master1
INSERT INTO t3 VALUES (102, "secondary");
--source include/save_master_gtid.inc
--connection slave1
--source include/sync_with_master_gtid.inc
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos GROUP BY domain_id;
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos_innodb GROUP BY domain_id;
--echo *** Restart one slave thread, the other keeps running. Now the new table is used ***
--connection slave1
set default_master_connection = 'slave1';
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
START SLAVE;
--source include/wait_for_slave_to_start.inc
--connection master2
INSERT INTO t2 VALUES (2, "secondary2");
--source include/save_master_gtid.inc
--connection slave1
--source include/sync_with_master_gtid.inc
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos GROUP BY domain_id;
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos_innodb GROUP BY domain_id;
--echo *** Remove a gtid_slave_posXXX table, restart one slave ***
--echo *** Get a warning that the change is not yet picked up ***
--echo *** See that updates fail due to trying to use the missing table ***
--connection slave1
SET sql_log_bin=0;
DROP TABLE mysql.gtid_slave_pos_innodb;
SET sql_log_bin=1;
set default_master_connection = 'slave2';
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
START SLAVE;
--source include/wait_for_slave_to_start.inc
CALL mtr.add_suppression("The table mysql.gtid_slave_pos_innodb was removed.");
--connection master2
INSERT INTO t2 VALUES (3, "tertiary 2");
--source include/save_master_gtid.inc
--connection slave1
--let $slave_sql_errno= 1942
--source include/wait_for_slave_sql_error.inc
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos GROUP BY domain_id;
--echo *** Stop both slaves, see that the drop of mysql.gtid_slave_pos_innodb is now picked up ***
--connection slave1
set default_master_connection = 'slave1';
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
set default_master_connection = 'slave2';
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
set default_master_connection = 'slave1';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = 'slave2';
START SLAVE;
--source include/wait_for_slave_to_start.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
SELECT * FROM t3 ORDER BY a;
SELECT domain_id, max(seq_no) FROM mysql.gtid_slave_pos GROUP BY domain_id;
# Cleanup.
--connection master1
DROP TABLE t1;
DROP TABLE t3;
--connection master2
DROP TABLE t2;
--connection slave1
SET GLOBAL gtid_domain_id=0;
--let $wait_condition= SELECT COUNT(*)=0 FROM information_schema.tables WHERE table_name IN ("t1", "t2", "t3") AND table_schema = "test"
--source include/wait_condition.inc
--sorted_result
STOP ALL SLAVES;
--source include/reset_master_slave.inc
--disconnect slave1
--connection master1
SET GLOBAL gtid_domain_id=0;
--source include/reset_master_slave.inc
--disconnect master1
--connection master2
SET GLOBAL gtid_domain_id=0;
--source include/reset_master_slave.inc
--disconnect master2
--source include/have_innodb.inc
--let $rpl_topology=1->2 --let $rpl_topology=1->2
--source include/rpl_init.inc --source include/rpl_init.inc
--source include/have_innodb.inc
--connection server_2 --connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
......
...@@ -470,6 +470,48 @@ gtid_check_rpl_slave_state_table(TABLE *table) ...@@ -470,6 +470,48 @@ gtid_check_rpl_slave_state_table(TABLE *table)
} }
/*
Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
that is already in use by the current transaction, if any.
*/
void
rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename)
{
struct gtid_pos_table *list, *table_entry, *default_entry;
/*
See comments on rpl_slave_state::gtid_pos_tables for rules around proper
access to the list.
*/
list= my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE);
Ha_trx_info *ha_info= thd->transaction.all.ha_list;
while (ha_info)
{
void *trx_hton= ha_info->ht();
table_entry= list;
while (table_entry)
{
if (table_entry->table_hton == trx_hton)
{
*out_tablename= table_entry->table_name;
return;
}
table_entry= table_entry->next;
}
ha_info= ha_info->next();
}
/*
If we cannot find any table whose engine matches an engine that is
already active in the transaction, or if there is no current transaction
engines available, we return the default gtid_slave_pos table.
*/
default_entry= my_atomic_loadptr_explicit(&default_gtid_pos_table,
MY_MEMORY_ORDER_ACQUIRE);
*out_tablename= default_entry->table_name;
}
/* /*
Write a gtid to the replication slave state table. Write a gtid to the replication slave state table.
...@@ -500,6 +542,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ...@@ -500,6 +542,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
Query_tables_list lex_backup; Query_tables_list lex_backup;
wait_for_commit* suspended_wfc; wait_for_commit* suspended_wfc;
void *hton= NULL; void *hton= NULL;
LEX_STRING gtid_pos_table_name;
DBUG_ENTER("record_gtid"); DBUG_ENTER("record_gtid");
*out_hton= NULL; *out_hton= NULL;
...@@ -517,6 +560,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ...@@ -517,6 +560,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
if (!in_statement) if (!in_statement)
thd->reset_for_next_command(); thd->reset_for_next_command();
select_gtid_pos_table(thd, &gtid_pos_table_name);
DBUG_EXECUTE_IF("gtid_inject_record_gtid", DBUG_EXECUTE_IF("gtid_inject_record_gtid",
{ {
...@@ -547,10 +591,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ...@@ -547,10 +591,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
*/ */
suspended_wfc= thd->suspend_subsequent_commits(); suspended_wfc= thd->suspend_subsequent_commits();
thd->lex->reset_n_backup_query_tables_list(&lex_backup); thd->lex->reset_n_backup_query_tables_list(&lex_backup);
tlist.init_one_table(STRING_WITH_LEN("mysql"), tlist.init_one_table(STRING_WITH_LEN("mysql"), gtid_pos_table_name.str,
rpl_gtid_slave_state_table_name.str, gtid_pos_table_name.length, NULL, TL_WRITE);
rpl_gtid_slave_state_table_name.length,
NULL, TL_WRITE);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end; goto end;
table_opened= true; table_opened= true;
...@@ -1168,18 +1210,35 @@ rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *li ...@@ -1168,18 +1210,35 @@ rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *li
} }
/*
Replace the list of available mysql.gtid_slave_posXXX tables with a new list.
The caller must be holding LOCK_slave_state. Additionally, this function
must only be called while all SQL threads are stopped.
*/
void void
rpl_slave_state::set_gtid_pos_tables_list(struct rpl_slave_state::gtid_pos_table *new_list) rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
rpl_slave_state::gtid_pos_table *default_entry)
{ {
struct gtid_pos_table *old_list; gtid_pos_table *old_list;
mysql_mutex_assert_owner(&LOCK_slave_state); mysql_mutex_assert_owner(&LOCK_slave_state);
old_list= gtid_pos_tables; old_list= gtid_pos_tables;
gtid_pos_tables= new_list; my_atomic_storeptr_explicit(&gtid_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE);
my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry,
MY_MEMORY_ORDER_RELEASE);
free_gtid_pos_tables(old_list); free_gtid_pos_tables(old_list);
} }
void
rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
{
mysql_mutex_assert_owner(&LOCK_slave_state);
entry->next= gtid_pos_tables;
my_atomic_storeptr_explicit(&gtid_pos_tables, entry, MY_MEMORY_ORDER_RELEASE);
}
struct rpl_slave_state::gtid_pos_table * struct rpl_slave_state::gtid_pos_table *
rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton) rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton)
{ {
......
...@@ -177,7 +177,27 @@ struct rpl_slave_state ...@@ -177,7 +177,27 @@ struct rpl_slave_state
DYNAMIC_ARRAY gtid_sort_array; DYNAMIC_ARRAY gtid_sort_array;
uint64 last_sub_id; uint64 last_sub_id;
/*
List of tables available for durably storing the slave GTID position.
Accesses to this table is protected by LOCK_slave_state. However for
efficiency, there is also a provision for read access to it from a running
slave without lock.
An element can be added at the head of a list by storing the new
gtid_pos_tables pointer atomically with release semantics, to ensure that
the next pointer of the new element is visible to readers of the new list.
Other changes (like deleting or replacing elements) must happen only while
all SQL driver threads are stopped. LOCK_slave_state must be held in any
case.
The list can be read without lock by an SQL driver thread or worker thread
by reading the gtid_pos_tables pointer atomically with acquire semantics,
to ensure that it will see the correct next pointer of a new head element.
*/
struct gtid_pos_table *gtid_pos_tables; struct gtid_pos_table *gtid_pos_tables;
/* The default entry in gtid_pos_tables, mysql.gtid_slave_pos. */
struct gtid_pos_table *default_gtid_pos_table;
bool loaded; bool loaded;
rpl_slave_state(); rpl_slave_state();
...@@ -188,6 +208,7 @@ struct rpl_slave_state ...@@ -188,6 +208,7 @@ struct rpl_slave_state
int update(uint32 domain_id, uint32 server_id, uint64 sub_id, int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
uint64 seq_no, void *hton, rpl_group_info *rgi); uint64 seq_no, void *hton, rpl_group_info *rgi);
int truncate_state_table(THD *thd); int truncate_state_table(THD *thd);
void select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement, void **out_hton); bool in_transaction, bool in_statement, void **out_hton);
uint64 next_sub_id(uint32 domain_id); uint64 next_sub_id(uint32 domain_id);
...@@ -208,7 +229,9 @@ struct rpl_slave_state ...@@ -208,7 +229,9 @@ struct rpl_slave_state
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi); int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi); int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
void release_domain_owner(rpl_group_info *rgi); void release_domain_owner(rpl_group_info *rgi);
void set_gtid_pos_tables_list(struct gtid_pos_table *new_list); void set_gtid_pos_tables_list(gtid_pos_table *new_list,
gtid_pos_table *default_entry);
void add_gtid_pos_table(gtid_pos_table *entry);
struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, void *hton); struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, void *hton);
void free_gtid_pos_tables(struct gtid_pos_table *list); void free_gtid_pos_tables(struct gtid_pos_table *list);
}; };
......
...@@ -1647,24 +1647,70 @@ struct load_gtid_state_cb_data { ...@@ -1647,24 +1647,70 @@ struct load_gtid_state_cb_data {
HASH *hash; HASH *hash;
DYNAMIC_ARRAY *array; DYNAMIC_ARRAY *array;
struct rpl_slave_state::gtid_pos_table *table_list; struct rpl_slave_state::gtid_pos_table *table_list;
struct rpl_slave_state::gtid_pos_table *default_entry;
}; };
static int
process_gtid_pos_table(THD *thd, LEX_STRING *table_name, void *hton,
struct load_gtid_state_cb_data *data)
{
struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr;
bool is_default=
(strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0);
/*
Ignore tables with duplicate storage engine, with a warning.
Prefer the default mysql.gtid_slave_pos over another table
mysql.gtid_slave_posXXX with the same storage engine.
*/
next_ptr= &data->table_list;
entry= data->table_list;
while (entry)
{
if (entry->table_hton == hton)
{
static const char *warning_msg= "Ignoring redundant table mysql.%s "
"since mysql.%s has the same storage engine";
if (!is_default)
{
/* Ignore the redundant table. */
sql_print_warning(warning_msg, table_name->str, entry->table_name);
return 0;
}
else
{
sql_print_warning(warning_msg, entry->table_name, table_name->str);
/* Delete the redundant table, and proceed to add this one instead. */
*next_ptr= entry->next;
my_free(entry);
break;
}
}
next_ptr= &entry->next;
entry= entry->next;
}
if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, hton)))
return 1;
p->next= data->table_list;
data->table_list= p;
if (is_default)
data->default_entry= p;
return 0;
}
static int static int
load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg) load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg)
{ {
int err; int err;
load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
struct rpl_slave_state::gtid_pos_table *p;
void *hton; void *hton;
if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array, if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
table_name, &hton))) table_name, &hton)))
return err; return err;
if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, hton))) return process_gtid_pos_table(thd, table_name, hton, data);
return 1;
p->next= data->table_list;
data->table_list= p;
return 0;
} }
...@@ -1687,6 +1733,7 @@ rpl_load_gtid_slave_state(THD *thd) ...@@ -1687,6 +1733,7 @@ rpl_load_gtid_slave_state(THD *thd)
DBUG_RETURN(0); DBUG_RETURN(0);
cb_data.table_list= NULL; cb_data.table_list= NULL;
cb_data.default_entry= NULL;
my_hash_init(&hash, &my_charset_bin, 32, my_hash_init(&hash, &my_charset_bin, 32,
offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id), offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE); sizeof(uint32), NULL, my_free, HASH_UNIQUE);
...@@ -1706,6 +1753,23 @@ rpl_load_gtid_slave_state(THD *thd) ...@@ -1706,6 +1753,23 @@ rpl_load_gtid_slave_state(THD *thd)
goto end; goto end;
} }
if (!cb_data.table_list)
{
my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
rpl_gtid_slave_state_table_name.str);
err= 1;
goto end;
}
else if (!cb_data.default_entry)
{
/*
If the mysql.gtid_slave_pos table does not exist, but at least one other
table is available, arbitrarily pick the first in the list to use as
default.
*/
cb_data.default_entry= cb_data.table_list;
}
for (i= 0; i < array.elements; ++i) for (i= 0; i < array.elements; ++i)
{ {
get_dynamic(&array, (uchar *)&tmp_entry, i); get_dynamic(&array, (uchar *)&tmp_entry, i);
...@@ -1735,7 +1799,8 @@ rpl_load_gtid_slave_state(THD *thd) ...@@ -1735,7 +1799,8 @@ rpl_load_gtid_slave_state(THD *thd)
} }
} }
rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list); rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
cb_data.default_entry);
cb_data.table_list= NULL; cb_data.table_list= NULL;
rpl_global_gtid_slave_state->loaded= true; rpl_global_gtid_slave_state->loaded= true;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
...@@ -1753,12 +1818,10 @@ rpl_load_gtid_slave_state(THD *thd) ...@@ -1753,12 +1818,10 @@ rpl_load_gtid_slave_state(THD *thd)
static int static int
find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg) find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg)
{ {
struct rpl_slave_state::gtid_pos_table **table_list_ptr= load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
static_cast<struct rpl_slave_state::gtid_pos_table **>(arg);
TABLE_LIST tlist; TABLE_LIST tlist;
TABLE *table= NULL; TABLE *table= NULL;
int err; int err;
struct rpl_slave_state::gtid_pos_table *p;
thd->reset_for_next_command(); thd->reset_for_next_command();
tlist.init_one_table(STRING_WITH_LEN("mysql"), table_name->str, tlist.init_one_table(STRING_WITH_LEN("mysql"), table_name->str,
...@@ -1769,14 +1832,7 @@ find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg) ...@@ -1769,14 +1832,7 @@ find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg)
if ((err= gtid_check_rpl_slave_state_table(table))) if ((err= gtid_check_rpl_slave_state_table(table)))
goto end; goto end;
err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data);
if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, table->s->db_type())))
err= 1;
else
{
p->next= *table_list_ptr;
*table_list_ptr= p;
}
end: end:
if (table) if (table)
...@@ -1801,7 +1857,8 @@ int ...@@ -1801,7 +1857,8 @@ int
find_gtid_slave_pos_tables(THD *thd) find_gtid_slave_pos_tables(THD *thd)
{ {
int err= 0; int err= 0;
struct rpl_slave_state::gtid_pos_table *table_list; load_gtid_state_cb_data cb_data;
bool any_running;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state->loaded; bool loaded= rpl_global_gtid_slave_state->loaded;
...@@ -1809,18 +1866,95 @@ find_gtid_slave_pos_tables(THD *thd) ...@@ -1809,18 +1866,95 @@ find_gtid_slave_pos_tables(THD *thd)
if (!loaded) if (!loaded)
return 0; return 0;
table_list= NULL; cb_data.table_list= NULL;
if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &table_list))) cb_data.default_entry= NULL;
if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data)))
goto end;
if (!cb_data.table_list)
{
my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
rpl_gtid_slave_state_table_name.str);
err= 1;
goto end; goto end;
}
else if (!cb_data.default_entry)
{
/*
If the mysql.gtid_slave_pos table does not exist, but at least one other
table is available, arbitrarily pick the first in the list to use as
default.
*/
cb_data.default_entry= cb_data.table_list;
}
any_running= any_slave_sql_running();
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
rpl_global_gtid_slave_state->set_gtid_pos_tables_list(table_list); if (!any_running)
table_list= NULL; {
rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
cb_data.default_entry);
cb_data.table_list= NULL;
}
else
{
/*
If there are SQL threads running, we cannot safely remove the old list.
However we can add new entries, and warn about any tables that
disappeared, but may still be visible to running SQL threads.
*/
rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr;
old_entry= rpl_global_gtid_slave_state->gtid_pos_tables;
while (old_entry)
{
new_entry= cb_data.table_list;
while (new_entry)
{
if (new_entry->table_hton == old_entry->table_hton)
break;
new_entry= new_entry->next;
}
if (!new_entry)
sql_print_warning("The table mysql.%s was removed. "
"This change will not take full effect "
"until all SQL threads have been restarted",
old_entry->table_name.str);
old_entry= old_entry->next;
}
next_ptr_ptr= &cb_data.table_list;
new_entry= cb_data.table_list;
while (new_entry)
{
/* Check if we already have a table with this storage engine. */
old_entry= rpl_global_gtid_slave_state->gtid_pos_tables;
while (old_entry)
{
if (new_entry->table_hton == old_entry->table_hton)
break;
old_entry= old_entry->next;
}
if (old_entry)
{
/* This new_entry is already available in the list. */
next_ptr_ptr= &new_entry->next;
new_entry= new_entry->next;
}
else
{
/* Move this new_entry to the list. */
rpl_slave_state::gtid_pos_table *next= new_entry->next;
rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry);
*next_ptr_ptr= next;
new_entry= next;
}
}
}
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
end: end:
if (table_list) if (cb_data.table_list)
rpl_global_gtid_slave_state->free_gtid_pos_tables(table_list); rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
return err; return err;
} }
......
include/master-slave.inc
[connection master]
include/stop_slave.inc
CHANGE MASTER TO master_use_gtid=slave_pos;
SET sql_log_bin=0;
CREATE TABLE mysql.gtid_slave_pos_innodb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb ENGINE=InnoDB;
CREATE TABLE mysql.gtid_slave_pos_tokudb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_tokudb ENGINE=TokuDB;
CREATE TABLE mysql.gtid_slave_pos_myisam_redundant LIKE mysql.gtid_slave_pos;
CREATE TABLE mysql.gtid_slave_pos_innodb_redundant LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb_redundant ENGINE=InnoDB;
call mtr.add_suppression("Ignoring redundant table.*since.*has the same storage engine");
include/start_slave.inc
CREATE TABLE t1 (a INT PRIMARY KEY);
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE t3 (a INT PRIMARY KEY) ENGINE=TokuDB;
INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (1);
INSERT INTO t3 VALUES (1);
SELECT * FROM t1 ORDER BY a;
a
1
SELECT * FROM t2 ORDER BY a;
a
1
SELECT * FROM t3 ORDER BY a;
a
1
SELECT * FROM t1 ORDER BY a;
a
1
SELECT * FROM t2 ORDER BY a;
a
1
SELECT * FROM t3 ORDER BY a;
a
1
SELECT * FROM mysql.gtid_slave_pos ORDER BY sub_id;
domain_id sub_id server_id seq_no
0 3 1 3
0 4 1 4
SELECT * FROM ( SELECT * FROM mysql.gtid_slave_pos_innodb
UNION ALL SELECT * FROM mysql.gtid_slave_pos_innodb_redundant) inner_select
ORDER BY sub_id;
domain_id sub_id server_id seq_no
0 5 1 5
SELECT * FROM mysql.gtid_slave_pos_tokudb ORDER BY sub_id;
domain_id sub_id server_id seq_no
0 6 1 6
SET sql_log_bin=0;
DROP TABLE mysql.gtid_slave_pos_innodb;
DROP TABLE mysql.gtid_slave_pos_tokudb;
DROP TABLE mysql.gtid_slave_pos_myisam_redundant;
DROP TABLE mysql.gtid_slave_pos_innodb_redundant;
SET sql_log_bin=1;
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;
include/rpl_end.inc
--source include/have_tokudb.inc
--source include/have_innodb.inc
--source include/master-slave.inc
--connection server_2
--source include/stop_slave.inc
CHANGE MASTER TO master_use_gtid=slave_pos;
SET sql_log_bin=0;
CREATE TABLE mysql.gtid_slave_pos_innodb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb ENGINE=InnoDB;
CREATE TABLE mysql.gtid_slave_pos_tokudb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_tokudb ENGINE=TokuDB;
CREATE TABLE mysql.gtid_slave_pos_myisam_redundant LIKE mysql.gtid_slave_pos;
CREATE TABLE mysql.gtid_slave_pos_innodb_redundant LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb_redundant ENGINE=InnoDB;
call mtr.add_suppression("Ignoring redundant table.*since.*has the same storage engine");
--source include/start_slave.inc
--connection server_1
CREATE TABLE t1 (a INT PRIMARY KEY);
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE t3 (a INT PRIMARY KEY) ENGINE=TokuDB;
INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (1);
INSERT INTO t3 VALUES (1);
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
SELECT * FROM t3 ORDER BY a;
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
SELECT * FROM t3 ORDER BY a;
SELECT * FROM mysql.gtid_slave_pos ORDER BY sub_id;
SELECT * FROM ( SELECT * FROM mysql.gtid_slave_pos_innodb
UNION ALL SELECT * FROM mysql.gtid_slave_pos_innodb_redundant) inner_select
ORDER BY sub_id;
SELECT * FROM mysql.gtid_slave_pos_tokudb ORDER BY sub_id;
--connection server_2
SET sql_log_bin=0;
DROP TABLE mysql.gtid_slave_pos_innodb;
DROP TABLE mysql.gtid_slave_pos_tokudb;
DROP TABLE mysql.gtid_slave_pos_myisam_redundant;
DROP TABLE mysql.gtid_slave_pos_innodb_redundant;
SET sql_log_bin=1;
--connection server_1
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;
--source include/rpl_end.inc
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment