Commit bc2903e7 authored by Kristian Nielsen's avatar Kristian Nielsen

Merge branch 'gtid_table_garbage_rows' into 10.1

parents 3c3c4ae2 2f4a0c5b
......@@ -571,4 +571,10 @@ SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE t1, t2, t3;
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
Check that no more than the expected last two GTIDs are in mysql.gtid_slave_pos
select count(*) from mysql.gtid_slave_pos order by domain_id, sub_id;
count(*)
2
include/rpl_end.inc
......@@ -549,5 +549,22 @@ SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--connection server_1
DROP TABLE t1, t2, t3;
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
# Check for left-over rows in table mysql.gtid_slave_pos (MDEV-12147).
#
# There was a bug when a transaction got a conflict and was rolled back. It
# might have also handled deletion of some old rows, and these deletions would
# then also be rolled back. And since the deletes were never re-tried, old no
# longer needed rows would accumulate in the table without limit.
#
# The earlier part of this test file have plenty of transactions being rolled
# back. But the last DROP TABLE statement runs on its own and should never
# conflict, thus at this point the mysql.gtid_slave_pos table should be clean.
--echo Check that no more than the expected last two GTIDs are in mysql.gtid_slave_pos
select count(*) from mysql.gtid_slave_pos order by domain_id, sub_id;
--connection server_1
--source include/rpl_end.inc
......@@ -4429,7 +4429,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
gtid= rgi->current_gtid;
if (rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id,
true, false))
rgi, false))
{
int errcode= thd->get_stmt_da()->sql_errno();
if (!is_parallel_retry_error(rgi, errcode))
......@@ -7132,7 +7132,7 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{
if ((ret= rpl_global_gtid_slave_state->record_gtid(thd, &list[i],
sub_id_list[i],
false, false)))
NULL, false)))
return ret;
rpl_global_gtid_slave_state->update_state_hash(sub_id_list[i], &list[i],
NULL);
......@@ -7639,7 +7639,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
rgi->gtid_pending= false;
gtid= rgi->current_gtid;
err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, true,
err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, rgi,
false);
if (err)
{
......
......@@ -77,7 +77,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
rgi->gtid_pending= false;
if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
{
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false))
DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid, rgi);
}
......@@ -328,6 +328,8 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
}
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
rgi->pending_gtid_deletes_clear();
}
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
......@@ -377,15 +379,24 @@ int
rpl_slave_state::put_back_list(uint32 domain_id, list_element *list)
{
element *e;
int err= 0;
mysql_mutex_lock(&LOCK_slave_state);
if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
return 1;
{
err= 1;
goto end;
}
while (list)
{
list_element *next= list->next;
e->add(list);
list= next;
}
return 0;
end:
mysql_mutex_unlock(&LOCK_slave_state);
return err;
}
......@@ -468,12 +479,12 @@ gtid_check_rpl_slave_state_table(TABLE *table)
/*
Write a gtid to the replication slave state table.
Do it as part of the transaction, to get slave crash safety, or as a separate
transaction if !in_transaction (eg. MyISAM or DDL).
gtid The global transaction id for this event group.
sub_id Value allocated within the sub_id when the event group was
read (sub_id must be consistent with commit order in master binlog).
rgi rpl_group_info context, if we are recording the gtid transactionally
as part of replicating a transactional event. NULL if called from
outside of a replicated transaction.
Note that caller must later ensure that the new gtid and sub_id is inserted
into the appropriate HASH element with rpl_slave_state.add(), so that it can
......@@ -481,13 +492,13 @@ gtid_check_rpl_slave_state_table(TABLE *table)
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement)
rpl_group_info *rgi, bool in_statement)
{
TABLE_LIST tlist;
int err= 0;
bool table_opened= false;
TABLE *table;
list_element *elist= 0, *next;
list_element *elist= 0, *cur, *next;
element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
......@@ -558,7 +569,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
thd->wsrep_ignore_table= true;
#endif
if (!in_transaction)
if (!rgi)
{
DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
thd->variables.option_bits&=
......@@ -601,9 +612,9 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
if ((elist= elem->grab_list()) != NULL)
{
/* Delete any old stuff, but keep around the most recent one. */
list_element *cur= elist;
uint64 best_sub_id= cur->sub_id;
uint64 best_sub_id= elist->sub_id;
list_element **best_ptr_ptr= &elist;
cur= elist;
while ((next= cur->next))
{
if (next->sub_id > best_sub_id)
......@@ -636,7 +647,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->file->print_error(err, MYF(0));
goto end;
}
while (elist)
cur = elist;
while (cur)
{
uchar key_buffer[4+8];
......@@ -646,9 +658,9 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
/* `break' does not work inside DBUG_EXECUTE_IF */
goto dbug_break; });
next= elist->next;
next= cur->next;
table->field[1]->store(elist->sub_id, true);
table->field[1]->store(cur->sub_id, true);
/* domain_id is already set in table->record[0] from write_row() above. */
key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
if (table->file->ha_index_read_map(table->record[1], key_buffer,
......@@ -662,8 +674,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
not want to endlessly error on the same element in case of table
corruption or such.
*/
my_free(elist);
elist= next;
cur= next;
if (err)
break;
}
......@@ -686,18 +697,31 @@ IF_DBUG(dbug_break:, )
*/
if (elist)
{
mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
mysql_mutex_unlock(&LOCK_slave_state);
elist = 0;
}
ha_rollback_trans(thd, FALSE);
}
close_thread_tables(thd);
if (in_transaction)
if (rgi)
{
thd->mdl_context.release_statement_locks();
/*
Save the list of old gtid entries we deleted. If this transaction
fails later for some reason and is rolled back, the deletion of those
entries will be rolled back as well, and we will need to put them back
on the to-be-deleted list so we can re-do the deletion. Otherwise
redundant rows in mysql.gtid_slave_pos may accumulate if transactions
are rolled back and retried after record_gtid().
*/
rgi->pending_gtid_deletes_save(gtid->domain_id, elist);
}
else
{
thd->mdl_context.release_transactional_locks();
rpl_group_info::pending_gtid_deletes_free(elist);
}
}
thd->lex->restore_backup_query_tables_list(&lex_backup);
thd->variables.option_bits= thd_saved_option;
......@@ -1080,7 +1104,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
record_gtid(thd, &gtid, sub_id, false, in_statement) ||
record_gtid(thd, &gtid, sub_id, NULL, in_statement) ||
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
return 1;
if (state_from_master == end)
......
......@@ -182,7 +182,7 @@ struct rpl_slave_state
uint64 seq_no, rpl_group_info *rgi);
int truncate_state_table(THD *thd);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement);
rpl_group_info *rgi, bool in_statement);
uint64 next_sub_id(uint32 domain_id);
int iterate(int (*cb)(rpl_gtid *, void *), void *data,
rpl_gtid *extra_gtids, uint32 num_extra,
......
......@@ -1680,6 +1680,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
long_find_row_note_printed= false;
did_mark_start_commit= false;
gtid_ev_flags2= 0;
pending_gtid_delete_list= NULL;
last_master_timestamp = 0;
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
......@@ -1804,6 +1805,12 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
erroneously update the GTID position.
*/
gtid_pending= false;
/*
Rollback will have undone any deletions of old rows we might have made
in mysql.gtid_slave_pos. Put those rows back on the list to be deleted.
*/
pending_gtid_deletes_put_back();
}
m_table_map.clear_tables();
slave_close_thread_tables(thd);
......@@ -2027,6 +2034,78 @@ rpl_group_info::unmark_start_commit()
}
/*
When record_gtid() has deleted any old rows from the table
mysql.gtid_slave_pos as part of a replicated transaction, save the list of
rows deleted here.
If later the transaction fails (eg. optimistic parallel replication), the
deletes will be undone when the transaction is rolled back. Then we can
put back the list of rows into the rpl_global_gtid_slave_state, so that
we can re-do the deletes and avoid accumulating old rows in the table.
*/
void
rpl_group_info::pending_gtid_deletes_save(uint32 domain_id,
rpl_slave_state::list_element *list)
{
/*
We should never get to a state where we try to save a new pending list of
gtid deletes while we still have an old one. But make sure we handle it
anyway just in case, so we avoid leaving stray entries in the
mysql.gtid_slave_pos table.
*/
DBUG_ASSERT(!pending_gtid_delete_list);
if (unlikely(pending_gtid_delete_list))
pending_gtid_deletes_put_back();
pending_gtid_delete_list= list;
pending_gtid_delete_list_domain= domain_id;
}
/*
Take the list recorded by pending_gtid_deletes_save() and put it back into
rpl_global_gtid_slave_state. This is needed if deletion of the rows was
rolled back due to transaction failure.
*/
void
rpl_group_info::pending_gtid_deletes_put_back()
{
if (pending_gtid_delete_list)
{
rpl_global_gtid_slave_state->put_back_list(pending_gtid_delete_list_domain,
pending_gtid_delete_list);
pending_gtid_delete_list= NULL;
}
}
/*
Free the list recorded by pending_gtid_deletes_save(). Done when the deletes
in the list have been permanently committed.
*/
void
rpl_group_info::pending_gtid_deletes_clear()
{
pending_gtid_deletes_free(pending_gtid_delete_list);
pending_gtid_delete_list= NULL;
}
void
rpl_group_info::pending_gtid_deletes_free(rpl_slave_state::list_element *list)
{
rpl_slave_state::list_element *next;
while (list)
{
next= list->next;
my_free(list);
list= next;
}
}
rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
: rpl_filter(filter)
{
......
......@@ -676,6 +676,11 @@ struct rpl_group_info
/* Needs room for "Gtid D-S-N\x00". */
char gtid_info_buf[5+10+1+10+1+20+1];
/* List of not yet committed deletions in mysql.gtid_slave_pos. */
rpl_slave_state::list_element *pending_gtid_delete_list;
/* Domain associated with pending_gtid_delete_list. */
uint32 pending_gtid_delete_list_domain;
/*
The timestamp, from the master, of the commit event.
Used to do delayed update of rli->last_master_timestamp, for getting
......@@ -817,6 +822,12 @@ struct rpl_group_info
char *gtid_info();
void unmark_start_commit();
static void pending_gtid_deletes_free(rpl_slave_state::list_element *list);
void pending_gtid_deletes_save(uint32 domain_id,
rpl_slave_state::list_element *list);
void pending_gtid_deletes_put_back();
void pending_gtid_deletes_clear();
time_t get_row_stmt_start_timestamp()
{
return row_stmt_start_timestamp;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment