Commit d3837c69 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-12179: Per-engine mysql.gtid_slave_pos table.

Intermediate commit.

On server start, look for and read all tables mysql.gtid_slave_pos* to
restore the GTID position.

Simple test case that moves the data to a new
mysql.gtid_slave_pos_innodb table and verifies that the new table is
read at server start.
parent ebce6825
include/rpl_init.inc [topology=1->2]
include/stop_slave.inc
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
CREATE TABLE t1 (a INT PRIMARY KEY);
INSERT INTO t1 VALUES (1);
SELECT * FROM t1 ORDER BY a;
a
1
SELECT * FROM t1 ORDER BY a;
a
1
include/stop_slave.inc
SET sql_log_bin=0;
CREATE TABLE mysql.gtid_slave_pos_innodb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb ENGINE=InnoDB;
INSERT INTO mysql.gtid_slave_pos_innodb SELECT * FROM mysql.gtid_slave_pos;
TRUNCATE mysql.gtid_slave_pos;
SET sql_log_bin=1;
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
SELECT * FROM t1 ORDER BY a;
a
1
2
3
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a
1
2
3
SET sql_log_bin=0;
DROP TABLE mysql.gtid_slave_pos_innodb;
SET sql_log_bin=1;
DROP TABLE t1;
include/rpl_end.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
--source include/have_innodb.inc
--connection server_2
--source include/stop_slave.inc
CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/start_slave.inc
--connection server_1
CREATE TABLE t1 (a INT PRIMARY KEY);
INSERT INTO t1 VALUES (1);
SELECT * FROM t1 ORDER BY a;
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t1 ORDER BY a;
--source include/stop_slave.inc
SET sql_log_bin=0;
CREATE TABLE mysql.gtid_slave_pos_innodb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb ENGINE=InnoDB;
INSERT INTO mysql.gtid_slave_pos_innodb SELECT * FROM mysql.gtid_slave_pos;
TRUNCATE mysql.gtid_slave_pos;
SET sql_log_bin=1;
# Restart the slave mysqld server, and verify that the GTID position is
# read correctly from the new mysql.gtid_slave_pos_innodb table.
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.2.expect
wait
EOF
--shutdown_server 30
--source include/wait_until_disconnected.inc
--connection server_1
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
SELECT * FROM t1 ORDER BY a;
--source include/save_master_gtid.inc
# Let the slave mysqld server start again.
--append_file $MYSQLTEST_VARDIR/tmp/mysqld.2.expect
restart: --skip-slave-start=0
EOF
--connection server_2
--enable_reconnect
--source include/wait_until_connected_again.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
--connection server_2
SET sql_log_bin=0;
DROP TABLE mysql.gtid_slave_pos_innodb;
SET sql_log_bin=1;
--connection server_1
DROP TABLE t1;
--source include/rpl_end.inc
......@@ -32,6 +32,8 @@
#include "slave.h"
#include <mysql/plugin.h>
#include <mysql/service_thd_wait.h>
#include "lock.h"
#include "sql_table.h"
static int count_relay_log_space(Relay_log_info* rli);
......@@ -1466,41 +1468,22 @@ Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
rpl_load_gtid_slave_state(THD *thd)
struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; };
static int
scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
LEX_STRING *tablename)
{
TABLE_LIST tlist;
TABLE *table;
bool table_opened= false;
bool table_scanned= false;
bool array_inited= false;
struct local_element { uint64 sub_id; rpl_gtid gtid; };
struct local_element tmp_entry, *entry;
HASH hash;
DYNAMIC_ARRAY array;
struct gtid_pos_element tmp_entry, *entry;
int err= 0;
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state->loaded;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);
my_hash_init(&hash, &my_charset_bin, 32,
offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0))))
goto end;
array_inited= true;
thd->reset_for_next_command();
tlist.init_one_table(STRING_WITH_LEN("mysql"),
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length,
NULL, TL_READ);
tlist.init_one_table(STRING_WITH_LEN("mysql"), tablename->str,
tablename->length, NULL, TL_READ);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
......@@ -1546,15 +1529,15 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.gtid.domain_id= domain_id;
tmp_entry.gtid.server_id= server_id;
tmp_entry.gtid.seq_no= seq_no;
if ((err= insert_dynamic(&array, (uchar *)&tmp_entry)))
if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0)))
{
entry= (struct local_element *)rec;
entry= (struct gtid_pos_element *)rec;
if (entry->sub_id >= sub_id)
continue;
entry->sub_id= sub_id;
......@@ -1564,7 +1547,7 @@ rpl_load_gtid_slave_state(THD *thd)
}
else
{
if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry),
MYF(MY_WME))))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
......@@ -1575,7 +1558,7 @@ rpl_load_gtid_slave_state(THD *thd)
entry->gtid.domain_id= domain_id;
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
if ((err= my_hash_insert(&hash, (uchar *)entry)))
if ((err= my_hash_insert(hash, (uchar *)entry)))
{
my_free(entry);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
......@@ -1583,6 +1566,104 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
}
err= 0; /* Clear HA_ERR_END_OF_FILE */
end:
if (table_scanned)
{
table->file->ha_index_or_rnd_end();
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
}
if (table_opened)
{
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
}
return err;
}
/*
Look for all tables mysql.gtid_slave_pos*. Read all rows from each such
table found into ARRAY. For each domain id, put the row with highest sub_id
into HASH.
*/
static int
scan_all_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array)
{
static LEX_STRING mysql_db_name= {C_STRING_WITH_LEN("mysql")};
char path[FN_REFLEN];
MY_DIR *dirp;
thd->reset_for_next_command();
if (lock_schema_name(thd, mysql_db_name.str))
return 1;
build_table_filename(path, sizeof(path) - 1, mysql_db_name.str, "", "", 0);
if (!(dirp= my_dir(path, MYF(MY_DONT_SORT))))
{
my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno);
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
return 1;
}
else
{
size_t i;
Dynamic_array<LEX_STRING*> files(dirp->number_of_files);
Discovered_table_list tl(thd, &files);
int err;
err= ha_discover_table_names(thd, &mysql_db_name, dirp, &tl, false);
my_dirend(dirp);
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
if (err)
return err;
for (i = 0; i < files.elements(); ++i)
{
if (strncmp(files.at(i)->str,
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length) == 0)
{
if ((err= scan_one_gtid_slave_pos_table(thd, hash, array, files.at(i))))
return err;
}
}
}
return 0;
}
int
rpl_load_gtid_slave_state(THD *thd)
{
bool array_inited= false;
struct gtid_pos_element tmp_entry, *entry;
HASH hash;
DYNAMIC_ARRAY array;
int err= 0;
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state->loaded;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);
my_hash_init(&hash, &my_charset_bin, 32,
offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0))))
goto end;
array_inited= true;
if ((err= scan_all_gtid_slave_pos_table(thd, &hash, &array)))
goto end;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (rpl_global_gtid_slave_state->loaded)
......@@ -1608,7 +1689,7 @@ rpl_load_gtid_slave_state(THD *thd)
for (i= 0; i < hash.records; ++i)
{
entry= (struct local_element *)my_hash_element(&hash, i);
entry= (struct gtid_pos_element *)my_hash_element(&hash, i);
if (opt_bin_log &&
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
......@@ -1622,20 +1703,7 @@ rpl_load_gtid_slave_state(THD *thd)
rpl_global_gtid_slave_state->loaded= true;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
err= 0; /* Clear HA_ERR_END_OF_FILE */
end:
if (table_scanned)
{
table->file->ha_index_or_rnd_end();
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
}
if (table_opened)
{
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
}
if (array_inited)
delete_dynamic(&array);
my_hash_free(&hash);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment