Commit c995ecbe authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-12179: Per-engine mysql.gtid_slave_pos table

Intermediate commit.

For each GTID recorded in mysq.gtid_slave_pos, keep track of which
engine the update was made in.

This will be later used to know which rows can be deleted in the table
of a given engine.
parent 087cf023
......@@ -5027,6 +5027,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
int expected_error,actual_error= 0;
Schema_specification_st db_options;
uint64 sub_id= 0;
void *hton= NULL;
rpl_gtid gtid;
Relay_log_info const *rli= rgi->rli;
Rpl_filter *rpl_filter= rli->mi->rpl_filter;
......@@ -5197,7 +5198,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
gtid= rgi->current_gtid;
if (rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id,
true, false))
true, false, &hton))
{
int errcode= thd->get_stmt_da()->sql_errno();
if (!is_parallel_retry_error(rgi, errcode))
......@@ -5417,7 +5418,7 @@ START SLAVE; . Query: '%s'", expected_error, thd->query());
end:
if (sub_id && !thd->is_slave_error)
rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi);
rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi);
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
......@@ -7899,15 +7900,17 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
int ret;
if (gl_flags & FLAG_IGN_GTIDS)
{
void *hton= NULL;
uint32 i;
for (i= 0; i < count; ++i)
{
if ((ret= rpl_global_gtid_slave_state->record_gtid(thd, &list[i],
sub_id_list[i],
false, false)))
sub_id_list[i],
false, false, &hton)))
return ret;
rpl_global_gtid_slave_state->update_state_hash(sub_id_list[i], &list[i],
NULL);
hton, NULL);
}
}
ret= Log_event::do_apply_event(rgi);
......@@ -8388,6 +8391,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
rpl_gtid gtid;
uint64 sub_id= 0;
Relay_log_info const *rli= rgi->rli;
void *hton= NULL;
/*
XID_EVENT works like a COMMIT statement. And it also updates the
......@@ -8408,7 +8412,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
gtid= rgi->current_gtid;
err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, true,
false);
false, &hton);
if (err)
{
int ec= thd->get_stmt_da()->sql_errno();
......@@ -8441,7 +8445,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
thd->mdl_context.release_transactional_locks();
if (!res && sub_id)
rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi);
rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi);
/*
Increment the global status commit count variable
......
......@@ -33,7 +33,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
void
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
rpl_group_info *rgi)
{
int err;
......@@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
it is even committed.
*/
mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
......@@ -74,12 +74,14 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
if (rgi->gtid_pending)
{
uint64 sub_id= rgi->gtid_sub_id;
void *hton= NULL;
rgi->gtid_pending= false;
if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
{
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid, rgi);
update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
}
......@@ -287,11 +289,12 @@ rpl_slave_state::truncate_hash()
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
uint64 seq_no, rpl_group_info *rgi)
uint64 seq_no, void *hton, rpl_group_info *rgi)
{
element *elem= NULL;
list_element *list_elem= NULL;
DBUG_ASSERT(hton);
if (!(elem= get_element(domain_id)))
return 1;
......@@ -336,6 +339,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
list_elem->server_id= server_id;
list_elem->sub_id= sub_id;
list_elem->seq_no= seq_no;
list_elem->hton= hton;
elem->add(list_elem);
if (last_sub_id < sub_id)
......@@ -482,7 +486,8 @@ gtid_check_rpl_slave_state_table(TABLE *table)
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement)
bool in_transaction, bool in_statement,
void **out_hton)
{
TABLE_LIST tlist;
int err= 0;
......@@ -495,6 +500,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
wait_for_commit* suspended_wfc;
DBUG_ENTER("record_gtid");
*out_hton= NULL;
if (unlikely(!loaded))
{
/*
......@@ -582,6 +588,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->file->print_error(err, MYF(0));
goto end;
}
*out_hton= table->s->db_type();
if(opt_bin_log &&
(err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
......@@ -1078,11 +1085,12 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
{
rpl_gtid gtid;
uint64 sub_id;
void *hton= NULL;
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
record_gtid(thd, &gtid, sub_id, false, in_statement) ||
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
return 1;
if (state_from_master == end)
break;
......@@ -1144,7 +1152,7 @@ rpl_slave_state::set_gtid_pos_tables_list(struct rpl_slave_state::gtid_pos_table
struct rpl_slave_state::gtid_pos_table *
rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton)
rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton)
{
struct gtid_pos_table *p;
char *allocated_str;
......
......@@ -112,6 +112,7 @@ struct rpl_slave_state
uint64 sub_id;
uint64 seq_no;
uint32 server_id;
void *hton;
};
/* Elements in the HASH that hold the state for one domain_id. */
......@@ -158,7 +159,13 @@ struct rpl_slave_state
/* Descriptor for mysql.gtid_slave_posXXX table in specific engine. */
struct gtid_pos_table {
struct gtid_pos_table *next;
handlerton *table_hton;
/*
Use a void * here, rather than handlerton *, to make explicit that we
are not using the value to access any functionality in the engine. It
is just used as an opaque value to identify which engine we are using
for each GTID row.
*/
void *table_hton;
LEX_STRING table_name;
};
......@@ -179,10 +186,10 @@ struct rpl_slave_state
void truncate_hash();
ulong count() const { return hash.records; }
int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
uint64 seq_no, rpl_group_info *rgi);
uint64 seq_no, void *hton, rpl_group_info *rgi);
int truncate_state_table(THD *thd);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement);
bool in_transaction, bool in_statement, void **out_hton);
uint64 next_sub_id(uint32 domain_id);
int iterate(int (*cb)(rpl_gtid *, void *), void *data,
rpl_gtid *extra_gtids, uint32 num_extra,
......@@ -196,12 +203,13 @@ struct rpl_slave_state
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
void update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_group_info *rgi);
void update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
rpl_group_info *rgi);
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
void release_domain_owner(rpl_group_info *rgi);
void set_gtid_pos_tables_list(struct gtid_pos_table *new_list);
struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton);
struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, void *hton);
void free_gtid_pos_tables(struct gtid_pos_table *list);
};
......
......@@ -1468,11 +1468,11 @@ Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; };
struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
static int
scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
LEX_STRING *tablename, handlerton **out_hton)
LEX_STRING *tablename, void **out_hton)
{
TABLE_LIST tlist;
TABLE *table;
......@@ -1529,6 +1529,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
tmp_entry.gtid.domain_id= domain_id;
tmp_entry.gtid.server_id= server_id;
tmp_entry.gtid.seq_no= seq_no;
tmp_entry.hton= table->s->db_type();
if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
......@@ -1544,6 +1545,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
DBUG_ASSERT(entry->gtid.domain_id == domain_id);
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
entry->hton= table->s->db_type();
}
else
{
......@@ -1558,6 +1560,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
entry->gtid.domain_id= domain_id;
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
entry->hton= table->s->db_type();
if ((err= my_hash_insert(hash, (uchar *)entry)))
{
my_free(entry);
......@@ -1652,7 +1655,7 @@ load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg)
int err;
load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
struct rpl_slave_state::gtid_pos_table *p;
handlerton *hton;
void *hton;
if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
table_name, &hton)))
......@@ -1707,10 +1710,11 @@ rpl_load_gtid_slave_state(THD *thd)
{
get_dynamic(&array, (uchar *)&tmp_entry, i);
if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id,
tmp_entry.gtid.server_id,
tmp_entry.sub_id,
tmp_entry.gtid.seq_no,
NULL)))
tmp_entry.gtid.server_id,
tmp_entry.sub_id,
tmp_entry.gtid.seq_no,
tmp_entry.hton,
NULL)))
{
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment