Commit 087cf023 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-12179: Per-engine mysql.gtid_slave_pos table

Intermediate commit.

Keep track of which mysql.gtid_slave_posXXX tables are available for each
engine, by searching for all tables in the mysql schema with names that
start with "gtid_slave_pos".

The list is computed at server start when the GTID position is loaded, and
it is re-computed on every START SLAVE command. This way, the DBA can
manually add a table for a new engine, and it will be automatically picked
up on next START SLAVE, so a full server restart is not needed.

The list is not yet actually used in the code.
parent 141a1b09
......@@ -243,7 +243,7 @@ rpl_slave_state_free_element(void *arg)
rpl_slave_state::rpl_slave_state()
: last_sub_id(0), loaded(false)
: last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
MY_MUTEX_INIT_SLOW);
......@@ -255,6 +255,7 @@ rpl_slave_state::rpl_slave_state()
rpl_slave_state::~rpl_slave_state()
{
free_gtid_pos_tables(gtid_pos_tables);
truncate_hash();
my_hash_free(&hash);
delete_dynamic(&gtid_sort_array);
......@@ -1115,6 +1116,56 @@ rpl_slave_state::is_empty()
}
void
rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list)
{
struct gtid_pos_table *cur, *next;
cur= list;
while (cur)
{
next= cur->next;
my_free(cur);
cur= next;
}
}
void
rpl_slave_state::set_gtid_pos_tables_list(struct rpl_slave_state::gtid_pos_table *new_list)
{
struct gtid_pos_table *old_list;
mysql_mutex_assert_owner(&LOCK_slave_state);
old_list= gtid_pos_tables;
gtid_pos_tables= new_list;
free_gtid_pos_tables(old_list);
}
struct rpl_slave_state::gtid_pos_table *
rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton)
{
struct gtid_pos_table *p;
char *allocated_str;
if (!my_multi_malloc(MYF(MY_WME),
&p, sizeof(*p),
&allocated_str, table_name->length+1,
NULL))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1));
return NULL;
}
memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0'
p->next = NULL;
p->table_hton= hton;
p->table_name.str= allocated_str;
p->table_name.length= table_name->length;
return p;
}
void rpl_binlog_state::init()
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
......
......@@ -155,6 +155,13 @@ struct rpl_slave_state
}
};
/* Descriptor for mysql.gtid_slave_posXXX table in specific engine. */
struct gtid_pos_table {
struct gtid_pos_table *next;
handlerton *table_hton;
LEX_STRING table_name;
};
/* Mapping from domain_id to its element. */
HASH hash;
/* Mutex protecting access to the state. */
......@@ -163,6 +170,7 @@ struct rpl_slave_state
DYNAMIC_ARRAY gtid_sort_array;
uint64 last_sub_id;
struct gtid_pos_table *gtid_pos_tables;
bool loaded;
rpl_slave_state();
......@@ -192,6 +200,9 @@ struct rpl_slave_state
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
void release_domain_owner(rpl_group_info *rgi);
void set_gtid_pos_tables_list(struct gtid_pos_table *new_list);
struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton);
void free_gtid_pos_tables(struct gtid_pos_table *list);
};
......
......@@ -1472,7 +1472,7 @@ struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; };
static int
scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
LEX_STRING *tablename)
LEX_STRING *tablename, handlerton **out_hton)
{
TABLE_LIST tlist;
TABLE *table;
......@@ -1577,6 +1577,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
}
if (table_opened)
{
*out_hton= table->s->db_type();
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
}
......@@ -1642,13 +1643,25 @@ scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_STRING *, void *),
struct load_gtid_state_cb_data {
HASH *hash;
DYNAMIC_ARRAY *array;
struct rpl_slave_state::gtid_pos_table *table_list;
};
static int
load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg)
{
int err;
load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
return scan_one_gtid_slave_pos_table(thd, data->hash, data->array, table_name);
struct rpl_slave_state::gtid_pos_table *p;
handlerton *hton;
if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
table_name, &hton)))
return err;
if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, hton)))
return 1;
p->next= data->table_list;
data->table_list= p;
return 0;
}
......@@ -1670,6 +1683,7 @@ rpl_load_gtid_slave_state(THD *thd)
if (loaded)
DBUG_RETURN(0);
cb_data.table_list= NULL;
my_hash_init(&hash, &my_charset_bin, 32,
offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
......@@ -1717,6 +1731,8 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list);
cb_data.table_list= NULL;
rpl_global_gtid_slave_state->loaded= true;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
......@@ -1724,10 +1740,87 @@ rpl_load_gtid_slave_state(THD *thd)
if (array_inited)
delete_dynamic(&array);
my_hash_free(&hash);
if (cb_data.table_list)
rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
DBUG_RETURN(err);
}
static int
find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg)
{
struct rpl_slave_state::gtid_pos_table **table_list_ptr=
static_cast<struct rpl_slave_state::gtid_pos_table **>(arg);
TABLE_LIST tlist;
TABLE *table= NULL;
int err;
struct rpl_slave_state::gtid_pos_table *p;
thd->reset_for_next_command();
tlist.init_one_table(STRING_WITH_LEN("mysql"), table_name->str,
table_name->length, NULL, TL_READ);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table= tlist.table;
if ((err= gtid_check_rpl_slave_state_table(table)))
goto end;
if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, table->s->db_type())))
err= 1;
else
{
p->next= *table_list_ptr;
*table_list_ptr= p;
}
end:
if (table)
{
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
}
return err;
}
/*
Re-compute the list of available mysql.gtid_slave_posXXX tables.
This is done at START SLAVE to pick up any newly created tables without
requiring server restart.
*/
int
find_gtid_slave_pos_tables(THD *thd)
{
int err= 0;
struct rpl_slave_state::gtid_pos_table *table_list;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state->loaded;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (!loaded)
return 0;
table_list= NULL;
if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &table_list)))
goto end;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
rpl_global_gtid_slave_state->set_gtid_pos_tables_list(table_list);
table_list= NULL;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
end:
if (table_list)
rpl_global_gtid_slave_state->free_gtid_pos_tables(table_list);
return err;
}
void
rpl_group_info::reinit(Relay_log_info *rli)
{
......
......@@ -959,6 +959,7 @@ extern struct rpl_slave_state *rpl_global_gtid_slave_state;
extern gtid_waiting rpl_global_gtid_waiting;
int rpl_load_gtid_slave_state(THD *thd);
int find_gtid_slave_pos_tables(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
void delete_or_keep_event_post_apply(rpl_group_info *rgi,
Log_event_type typ, Log_event *ev);
......
......@@ -3031,6 +3031,10 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
DBUG_RETURN(-1);
/* Re-load the set of mysql.gtid_slave_posXXX tables available. */
if (find_gtid_slave_pos_tables(thd))
DBUG_RETURN(-1);
create_logfile_name_with_suffix(master_info_file_tmp,
sizeof(master_info_file_tmp),
master_info_file, 0,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment