Commit 8a3e2f29 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-6718: Server crashed in Gtid_log_event::Gtid_log_event with parallel replication

The bug occured in parallel replication when re-trying transactions that
failed due to deadlock. In this case, the relay log file is re-opened and the
events are read out again. This reading requires a format description event of
the appropriate version. But the code was using a description event stored in
rli, which is not thread-safe. This could lead to various rare races if the
format description event was replaced by the SQL driver thread at the exact
moment where a worker thread was trying to use it.

The fix is to instead make the retry code create and maintain its own format
description event. When the relay log file is opened, we first read the format
description event from the start of the file, before seeking to the current
position. This now uses the same code as when the SQL driver threads starts
from a given relay log position. This also makes sure that the correct format
description event version will be used in cases where the version of the
binlog could change during replication.
parent a98a034c
...@@ -290,6 +290,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, ...@@ -290,6 +290,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
THD *thd= rgi->thd; THD *thd= rgi->thd;
rpl_parallel_entry *entry= rgi->parallel_entry; rpl_parallel_entry *entry= rgi->parallel_entry;
ulong retries= 0; ulong retries= 0;
Format_description_log_event *description_event= NULL;
do_retry: do_retry:
event_count= 0; event_count= 0;
...@@ -355,6 +356,14 @@ do_retry: ...@@ -355,6 +356,14 @@ do_retry:
goto err; goto err;
} }
cur_offset= rgi->retry_start_offset; cur_offset= rgi->retry_start_offset;
delete description_event;
description_event=
read_relay_log_description_event(&rlog, cur_offset, &errmsg);
if (!description_event)
{
err= 1;
goto err;
}
my_b_seek(&rlog, cur_offset); my_b_seek(&rlog, cur_offset);
do do
...@@ -367,8 +376,7 @@ do_retry: ...@@ -367,8 +376,7 @@ do_retry:
for (;;) for (;;)
{ {
old_offset= cur_offset; old_offset= cur_offset;
ev= Log_event::read_log_event(&rlog, 0, ev= Log_event::read_log_event(&rlog, 0, description_event,
rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
opt_slave_sql_verify_checksum); opt_slave_sql_verify_checksum);
cur_offset= my_b_tell(&rlog); cur_offset= my_b_tell(&rlog);
...@@ -416,7 +424,12 @@ do_retry: ...@@ -416,7 +424,12 @@ do_retry:
} }
event_type= ev->get_type_code(); event_type= ev->get_type_code();
if (!Log_event::is_group_event(event_type)) if (event_type == FORMAT_DESCRIPTION_EVENT)
{
delete description_event;
description_event= (Format_description_log_event *)ev;
continue;
} else if (!Log_event::is_group_event(event_type))
{ {
delete ev; delete ev;
continue; continue;
...@@ -472,6 +485,8 @@ do_retry: ...@@ -472,6 +485,8 @@ do_retry:
err: err:
if (description_event)
delete description_event;
if (fd >= 0) if (fd >= 0)
{ {
end_io_cache(&rlog); end_io_cache(&rlog);
......
...@@ -518,6 +518,90 @@ void Relay_log_info::clear_until_condition() ...@@ -518,6 +518,90 @@ void Relay_log_info::clear_until_condition()
} }
/*
Read the correct format description event for starting to replicate from
a given position in a relay log file.
*/
Format_description_log_event *
read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
const char **errmsg)
{
Log_event *ev;
Format_description_log_event *fdev;
bool found= false;
/*
By default the relay log is in binlog format 3 (4.0).
Even if format is 4, this will work enough to read the first event
(Format_desc) (remember that format 4 is just lenghtened compared to format
3; format 3 is a prefix of format 4).
*/
fdev= new Format_description_log_event(3);
while (!found)
{
Log_event_type typ;
/*
Read the possible Format_description_log_event; if position
was 4, no need, it will be read naturally.
*/
DBUG_PRINT("info",("looking for a Format_description_log_event"));
if (my_b_tell(cur_log) >= start_pos)
break;
if (!(ev= Log_event::read_log_event(cur_log, 0, fdev,
opt_slave_sql_verify_checksum)))
{
DBUG_PRINT("info",("could not read event, cur_log->error=%d",
cur_log->error));
if (cur_log->error) /* not EOF */
{
*errmsg= "I/O error reading event at position 4";
delete fdev;
return NULL;
}
break;
}
typ= ev->get_type_code();
if (typ == FORMAT_DESCRIPTION_EVENT)
{
DBUG_PRINT("info",("found Format_description_log_event"));
delete fdev;
fdev= (Format_description_log_event*) ev;
/*
As ev was returned by read_log_event, it has passed is_valid(), so
my_malloc() in ctor worked, no need to check again.
*/
/*
Ok, we found a Format_description event. But it is not sure that this
describes the whole relay log; indeed, one can have this sequence
(starting from position 4):
Format_desc (of slave)
Rotate (of master)
Format_desc (of master)
So the Format_desc which really describes the rest of the relay log
is the 3rd event (it can't be further than that, because we rotate
the relay log when we queue a Rotate event from the master).
But what describes the Rotate is the first Format_desc.
So what we do is:
go on searching for Format_description events, until you exceed the
position (argument 'pos') or until you find another event than Rotate
or Format_desc.
*/
}
else
{
DBUG_PRINT("info",("found event of another type=%d", typ));
found= (typ != ROTATE_EVENT);
delete ev;
}
}
return fdev;
}
/* /*
Open the given relay log Open the given relay log
...@@ -641,68 +725,13 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log, ...@@ -641,68 +725,13 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
*/ */
if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */ if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
{ {
Log_event* ev; if (look_for_description_event)
while (look_for_description_event)
{ {
/* Format_description_log_event *fdev;
Read the possible Format_description_log_event; if position if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg)))
was 4, no need, it will be read naturally. goto err;
*/ delete rli->relay_log.description_event_for_exec;
DBUG_PRINT("info",("looking for a Format_description_log_event")); rli->relay_log.description_event_for_exec= fdev;
if (my_b_tell(rli->cur_log) >= pos)
break;
/*
Because of we have rli->data_lock and log_lock, we can safely read an
event
*/
if (!(ev= Log_event::read_log_event(rli->cur_log, 0,
rli->relay_log.description_event_for_exec,
opt_slave_sql_verify_checksum)))
{
DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
rli->cur_log->error));
if (rli->cur_log->error) /* not EOF */
{
*errmsg= "I/O error reading event at position 4";
goto err;
}
break;
}
else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
{
DBUG_PRINT("info",("found Format_description_log_event"));
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
/*
As ev was returned by read_log_event, it has passed is_valid(), so
my_malloc() in ctor worked, no need to check again.
*/
/*
Ok, we found a Format_description event. But it is not sure that this
describes the whole relay log; indeed, one can have this sequence
(starting from position 4):
Format_desc (of slave)
Rotate (of master)
Format_desc (of master)
So the Format_desc which really describes the rest of the relay log
is the 3rd event (it can't be further than that, because we rotate
the relay log when we queue a Rotate event from the master).
But what describes the Rotate is the first Format_desc.
So what we do is:
go on searching for Format_description events, until you exceed the
position (argument 'pos') or until you find another event than Rotate
or Format_desc.
*/
}
else
{
DBUG_PRINT("info",("found event of another type=%d",
ev->get_type_code()));
look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
delete ev;
}
} }
my_b_seek(rli->cur_log,(off_t)pos); my_b_seek(rli->cur_log,(off_t)pos);
#ifndef DBUG_OFF #ifndef DBUG_OFF
......
...@@ -220,6 +220,10 @@ void end_relay_log_info(Relay_log_info* rli); ...@@ -220,6 +220,10 @@ void end_relay_log_info(Relay_log_info* rli);
void lock_slave_threads(Master_info* mi); void lock_slave_threads(Master_info* mi);
void unlock_slave_threads(Master_info* mi); void unlock_slave_threads(Master_info* mi);
void init_thread_mask(int* mask,Master_info* mi,bool inverse); void init_thread_mask(int* mask,Master_info* mi,bool inverse);
Format_description_log_event *
read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
const char **errmsg);
int init_relay_log_pos(Relay_log_info* rli,const char* log,ulonglong pos, int init_relay_log_pos(Relay_log_info* rli,const char* log,ulonglong pos,
bool need_data_lock, const char** errmsg, bool need_data_lock, const char** errmsg,
bool look_for_description_event); bool look_for_description_event);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment