Commit 39df665a authored by unknown's avatar unknown

MDEV-5206: Incorrect slave old-style position in MDEV-4506, parallel replication.

In parallel replication, there are two kinds of events which are
executed in different ways.

Normal events that are part of event groups/transactions are executed
asynchroneously by being queued for a worker thread.

Other events like format description and rotate and such are executed
directly in the driver SQL thread.

If the direct execution of the other events were to update the old-style
position, then the position gets updated too far ahead, before the normal
events that have been queued for a worker thread have been executed. So
this patch adds some special cases to prevent such position updates ahead
of time, and instead queues dummy events for the worker threads, so that
they will at an appropriate time do the position updates instead.

(Also fix a race in a test case that happened to trigger while running
tests for this patch).
parent 9c8da4ed
......@@ -92,6 +92,7 @@ INSERT INTO t2 VALUES (foo(10,
--connection server_2
FLUSH LOGS;
--source include/wait_for_binlog_checkpoint.inc
SET sql_log_bin=0;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
......@@ -148,6 +149,7 @@ SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
--let $binlog_file= slave-bin.000002
--source include/show_binlog_events.inc
FLUSH LOGS;
--source include/wait_for_binlog_checkpoint.inc
# Restart all the slave parallel worker threads, to clear all debug_sync actions.
--connection server_2
......@@ -161,6 +163,7 @@ SET debug_sync='RESET';
--echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
--connection server_1
FLUSH LOGS;
--source include/wait_for_binlog_checkpoint.inc
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
# Create some sentinel rows so that the rows inserted in parallel fall into
# separate gaps and do not cause gap lock conflicts.
......
......@@ -966,11 +966,17 @@ int Log_event::do_update_pos(rpl_group_info *rgi)
if (debug_not_change_ts_if_art_event == 1
&& is_artificial_event())
debug_not_change_ts_if_art_event= 0; );
rli->stmt_done(log_pos,
(is_artificial_event() &&
IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ?
0 : when),
thd, rgi);
/*
In parallel execution, delay position update for the events that are
not part of event groups (format description, rotate, and such) until
the actual event execution reaches that point.
*/
if (!rgi->is_parallel_exec || is_group_event(get_type_code()))
rli->stmt_done(log_pos,
(is_artificial_event() &&
IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ?
0 : when),
thd, rgi);
DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp",
if (debug_not_change_ts_if_art_event == 0)
debug_not_change_ts_if_art_event= 2; );
......
......@@ -56,6 +56,48 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
}
static void
handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
{
int cmp;
Relay_log_info *rli;
/*
Events that are not part of an event group, such as Format Description,
Stop, GTID List and such, are executed directly in the driver SQL thread,
to keep the relay log state up-to-date. But the associated position update
is done here, in sync with other normal events as they are queued to
worker threads.
*/
if ((thd->variables.option_bits & OPTION_BEGIN) &&
opt_using_transactions)
return;
rli= qev->rgi->rli;
mysql_mutex_lock(&rli->data_lock);
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
if (cmp < 0)
{
rli->group_relay_log_pos= qev->future_event_relay_log_pos;
strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name);
rli->notify_group_relay_log_name_update();
} else if (cmp == 0 &&
rli->group_relay_log_pos < qev->future_event_relay_log_pos)
rli->group_relay_log_pos= qev->future_event_relay_log_pos;
cmp= strcmp(rli->group_master_log_name, qev->future_event_master_log_name);
if (cmp < 0)
{
strcpy(rli->group_master_log_name, qev->future_event_master_log_name);
rli->notify_group_master_log_name_update();
rli->group_master_log_pos= qev->future_event_master_log_pos;
}
else if (cmp == 0
&& rli->group_master_log_pos < qev->future_event_master_log_pos)
rli->group_master_log_pos= qev->future_event_master_log_pos;
mysql_mutex_unlock(&rli->data_lock);
mysql_cond_broadcast(&rli->data_cond);
}
static bool
sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
{
......@@ -142,16 +184,24 @@ handle_rpl_parallel_thread(void *arg)
while (events)
{
struct rpl_parallel_thread::queued_event *next= events->next;
Log_event_type event_type= events->ev->get_type_code();
Log_event_type event_type;
rpl_group_info *rgi= events->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
uint64 wait_for_sub_id;
uint64 wait_start_sub_id;
bool end_of_group;
if (!events->ev)
{
handle_queued_pos_update(thd, events);
my_free(events);
events= next;
continue;
}
err= 0;
/* Handle a new event group, which will be initiated by a GTID event. */
if (event_type == GTID_EVENT)
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
{
in_event_group= true;
/*
......@@ -794,13 +844,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
e->last_commit_id= 0;
}
e->current_group_info= rgi;
qev->rgi= e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id;
current= rgi->parallel_entry= e;
}
else if (!is_group_event || !current)
{
my_off_t log_pos;
int err;
bool tmp;
/*
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
Same for events not preceeded by GTID (we should not see those normally,
......@@ -824,11 +876,52 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
}
}
tmp= serial_rgi->is_parallel_exec;
serial_rgi->is_parallel_exec= true;
err= rpt_handle_event(qev, NULL);
serial_rgi->is_parallel_exec= tmp;
log_pos= qev->ev->log_pos;
delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev);
my_free(qev);
return (err != 0);
if (err)
{
my_free(qev);
return true;
}
qev->ev= NULL;
qev->future_event_master_log_pos= log_pos;
if (!current)
{
handle_queued_pos_update(rli->sql_driver_thd, qev);
my_free(qev);
return false;
}
/*
Queue an empty event, so that the position will be updated in a
reasonable way relative to other events:
- If the currently executing events are queued serially for a single
thread, the position will only be updated when everything before has
completed.
- If we are executing multiple independent events in parallel, then at
least the position will not be updated until one of them has reached
the current point.
*/
cur_thread= current->rpl_thread;
if (cur_thread)
{
mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
if (cur_thread->current_entry != current)
{
/* Not ours anymore, we need to grab a new one. */
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
cur_thread= NULL;
}
}
if (!cur_thread)
cur_thread= current->rpl_thread=
global_rpl_thread_pool.get_thread(current);
}
else
{
......@@ -848,8 +941,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
cur_thread= current->rpl_thread=
global_rpl_thread_pool.get_thread(current);
}
qev->rgi= current->current_group_info;
}
qev->rgi= current->current_group_info;
/*
Queue the event for processing.
......
......@@ -27,6 +27,7 @@ struct rpl_parallel_thread {
char event_relay_log_name[FN_REFLEN];
char future_event_master_log_name[FN_REFLEN];
ulonglong event_relay_log_pos;
my_off_t future_event_master_log_pos;
size_t event_size;
} *event_queue, *last_in_queue;
uint64 queued_size;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment