Commit 641feed4 authored by unknown's avatar unknown

MDEV-5764: START SLAVE UNTIL does not work with parallel replication

With parallel replication, there can be any number of events queued on
in-memory lists in the worker threads.

For normal STOP SLAVE, we want to skip executing any remaining events on those
lists and stop as quickly as possible.

However, for START SLAVE UNTIL, when the UNTIL position is reached in the SQL
driver thread, we must _not_ stop until all already queued events for the
workers have been executed - otherwise we would stop too early, before the
actual UNTIL position had been completely reached.

The code did not handle UNTIL correctly, stopping too early due to not
executing the queued events to completion. Fix this, and also implement that
an explicit STOP SLAVE in the middle (when the SQL driver thread has reached
the UNTIL position but the workers have not) _will_ cause an immediate stop.
parent e90f68c0
...@@ -6649,7 +6649,7 @@ Gtid_list_log_event::write(IO_CACHE *file) ...@@ -6649,7 +6649,7 @@ Gtid_list_log_event::write(IO_CACHE *file)
int int
Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{ {
Relay_log_info const *rli= rgi->rli; Relay_log_info *rli= const_cast<Relay_log_info*>(rgi->rli);
int ret; int ret;
if (gl_flags & FLAG_IGN_GTIDS) if (gl_flags & FLAG_IGN_GTIDS)
{ {
...@@ -6669,10 +6669,11 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -6669,10 +6669,11 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{ {
char str_buf[128]; char str_buf[128];
String str(str_buf, sizeof(str_buf), system_charset_info); String str(str_buf, sizeof(str_buf), system_charset_info);
const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str); rli->until_gtid_pos.to_string(&str);
sql_print_information("Slave SQL thread stops because it reached its" sql_print_information("Slave SQL thread stops because it reached its"
" UNTIL master_gtid_pos %s", str.c_ptr_safe()); " UNTIL master_gtid_pos %s", str.c_ptr_safe());
const_cast<Relay_log_info*>(rli)->abort_slave= true; rli->abort_slave= true;
rli->stop_for_until= true;
} }
return ret; return ret;
} }
......
...@@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi) ...@@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
rgi->is_error= true; rgi->is_error= true;
rgi->cleanup_context(thd, true); rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true; rgi->rli->abort_slave= true;
rgi->rli->stop_for_until= false;
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock()); mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock()); mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
rgi->rli->relay_log.signal_update(); rgi->rli->relay_log.signal_update();
...@@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id) ...@@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id)
void void
rpl_parallel::wait_for_done(THD *thd) rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
{ {
struct rpl_parallel_entry *e; struct rpl_parallel_entry *e;
rpl_parallel_thread *rpt; rpl_parallel_thread *rpt;
...@@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd) ...@@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd)
started executing yet. So we set e->stop_count here and use it to started executing yet. So we set e->stop_count here and use it to
decide in the worker threads whether to continue executing an event decide in the worker threads whether to continue executing an event
group or whether to skip it, when force_abort is set. group or whether to skip it, when force_abort is set.
If we stop due to reaching the START SLAVE UNTIL condition, then we
need to continue executing any queued events up to that point.
*/ */
e->force_abort= true; e->force_abort= true;
e->stop_count= e->count_committing_event_groups; e->stop_count= rli->stop_for_until ?
e->count_queued_event_groups : e->count_committing_event_groups;
mysql_mutex_unlock(&e->LOCK_parallel_entry); mysql_mutex_unlock(&e->LOCK_parallel_entry);
for (j= 0; j < e->rpl_thread_max; ++j) for (j= 0; j < e->rpl_thread_max; ++j)
{ {
...@@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd) ...@@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd)
} }
/*
This function handles the case where the SQL driver thread reached the
START SLAVE UNTIL position; we stop queueing more events but continue
processing remaining, already queued events; then use executes manual
STOP SLAVE; then this function signals to worker threads that they
should stop the processing of any remaining queued events.
*/
void
rpl_parallel::stop_during_until()
{
struct rpl_parallel_entry *e;
uint32 i;
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
if (e->force_abort)
e->stop_count= e->count_committing_event_groups;
mysql_mutex_unlock(&e->LOCK_parallel_entry);
}
}
bool bool
rpl_parallel::workers_idle() rpl_parallel::workers_idle()
{ {
......
...@@ -222,7 +222,8 @@ struct rpl_parallel { ...@@ -222,7 +222,8 @@ struct rpl_parallel {
~rpl_parallel(); ~rpl_parallel();
void reset(); void reset();
rpl_parallel_entry *find(uint32 domain_id); rpl_parallel_entry *find(uint32 domain_id);
void wait_for_done(THD *thd); void wait_for_done(THD *thd, Relay_log_info *rli);
void stop_during_until();
bool workers_idle(); bool workers_idle();
bool do_event(rpl_group_info *serial_rgi, Log_event *ev, bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
ulonglong event_size); ulonglong event_size);
......
...@@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), inited(0), abort_slave(0), stop_for_until(0),
slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0), until_log_pos(0), retried_trans(0), executed_entries(0),
m_flags(0) m_flags(0)
{ {
......
...@@ -262,6 +262,7 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -262,6 +262,7 @@ class Relay_log_info : public Slave_reporting_capability
*/ */
volatile bool inited; volatile bool inited;
volatile bool abort_slave; volatile bool abort_slave;
volatile bool stop_for_until;
volatile uint slave_running; volatile uint slave_running;
/* /*
......
...@@ -615,6 +615,13 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) ...@@ -615,6 +615,13 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
{ {
DBUG_PRINT("info",("Terminating SQL thread")); DBUG_PRINT("info",("Terminating SQL thread"));
if (opt_slave_parallel_threads > 0 &&
mi->rli.abort_slave && mi->rli.stop_for_until)
{
mi->rli.stop_for_until= false;
mi->rli.parallel.stop_during_until();
}
else
mi->rli.abort_slave=1; mi->rli.abort_slave=1;
if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock, if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock,
&mi->rli.stop_cond, &mi->rli.stop_cond,
...@@ -3414,6 +3421,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, ...@@ -3414,6 +3421,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
message about error in query execution to be printed. message about error in query execution to be printed.
*/ */
rli->abort_slave= 1; rli->abort_slave= 1;
rli->stop_for_until= true;
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
delete ev; delete ev;
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -4356,6 +4364,7 @@ pthread_handler_t handle_slave_sql(void *arg) ...@@ -4356,6 +4364,7 @@ pthread_handler_t handle_slave_sql(void *arg)
Seconds_Behind_Master grows. No big deal. Seconds_Behind_Master grows. No big deal.
*/ */
rli->abort_slave = 0; rli->abort_slave = 0;
rli->stop_for_until= false;
mysql_mutex_unlock(&rli->run_lock); mysql_mutex_unlock(&rli->run_lock);
mysql_cond_broadcast(&rli->start_cond); mysql_cond_broadcast(&rli->start_cond);
...@@ -4526,7 +4535,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ...@@ -4526,7 +4535,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
} }
if (opt_slave_parallel_threads > 0) if (opt_slave_parallel_threads > 0)
rli->parallel.wait_for_done(thd); rli->parallel.wait_for_done(thd, rli);
/* Thread stopped. Print the current replication position to the log */ /* Thread stopped. Print the current replication position to the log */
{ {
...@@ -4552,7 +4561,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ...@@ -4552,7 +4561,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
get the correct position printed.) get the correct position printed.)
*/ */
if (opt_slave_parallel_threads > 0) if (opt_slave_parallel_threads > 0)
rli->parallel.wait_for_done(thd); rli->parallel.wait_for_done(thd, rli);
/* /*
Some events set some playgrounds, which won't be cleared because thread Some events set some playgrounds, which won't be cleared because thread
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment