Commit ee8a8162 authored by unknown

MDEV-4506: Parallel replication.

Implement --slave-parallel-max-queued to limit the memory used by the
SQL thread's read-ahead in the relay log.
parent 96a4f1f6
@@ -794,6 +794,11 @@ The following options may be given as the first argument:
 --slave-net-timeout=#
 Number of seconds to wait for more data from any
 master/slave connection before aborting the read
+--slave-parallel-max-queued=#
+Limit on how much memory SQL threads should use per
+parallel replication thread when reading ahead in the
+relay log looking for opportunities for parallel
+replication. Only used when --slave-parallel-threads > 0.
 --slave-parallel-threads=#
 If non-zero, number of threads to spawn to apply in
 parallel events on the slave that were group-committed on
@@ -1148,6 +1153,7 @@ slave-compressed-protocol FALSE
 slave-exec-mode STRICT
 slave-max-allowed-packet 1073741824
 slave-net-timeout 3600
+slave-parallel-max-queued 131072
 slave-parallel-threads 0
 slave-skip-errors (No default value)
 slave-sql-verify-checksum TRUE
......
SET @save_slave_parallel_max_queued= @@GLOBAL.slave_parallel_max_queued;
SELECT @@GLOBAL.slave_parallel_max_queued as 'Check default';
Check default
131072
SELECT @@SESSION.slave_parallel_max_queued as 'no session var';
ERROR HY000: Variable 'slave_parallel_max_queued' is a GLOBAL variable
SET GLOBAL slave_parallel_max_queued= 0;
SET GLOBAL slave_parallel_max_queued= DEFAULT;
SET GLOBAL slave_parallel_max_queued= 65536;
SELECT @@GLOBAL.slave_parallel_max_queued;
@@GLOBAL.slave_parallel_max_queued
65536
SET GLOBAL slave_parallel_max_queued = @save_slave_parallel_max_queued;
--source include/not_embedded.inc
SET @save_slave_parallel_max_queued= @@GLOBAL.slave_parallel_max_queued;
SELECT @@GLOBAL.slave_parallel_max_queued as 'Check default';
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@SESSION.slave_parallel_max_queued as 'no session var';
SET GLOBAL slave_parallel_max_queued= 0;
SET GLOBAL slave_parallel_max_queued= DEFAULT;
SET GLOBAL slave_parallel_max_queued= 65536;
SELECT @@GLOBAL.slave_parallel_max_queued;
SET GLOBAL slave_parallel_max_queued = @save_slave_parallel_max_queued;
@@ -548,6 +548,7 @@ ulong stored_program_cache_size= 0;
 ulong opt_slave_parallel_threads= 0;
 ulong opt_binlog_commit_wait_count= 0;
 ulong opt_binlog_commit_wait_usec= 0;
+ulong opt_slave_parallel_max_queued= 131072;
 const double log_10[] = {
 1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009,
......
@@ -177,6 +177,7 @@ extern ulong opt_binlog_rows_event_max_size;
 extern ulong rpl_recovery_rank, thread_cache_size;
 extern ulong stored_program_cache_size;
 extern ulong opt_slave_parallel_threads;
+extern ulong opt_slave_parallel_max_queued;
 extern ulong opt_binlog_commit_wait_count;
 extern ulong opt_binlog_commit_wait_usec;
 extern ulong back_log;
......
@@ -14,10 +14,6 @@
 following transactions, so slave binlog position will be correct.
 And all the retry logic for temporary errors like deadlock.
-- We need some user-configurable limit on how far ahead the SQL thread will
-fetch and queue events for parallel execution (otherwise if slave gets
-behind we will fill up memory with pending malloc()'ed events).
 - In GTID replication, we should not need to update master.info and
 relay-log.info on disk at all except at slave thread stop. They are not
 used to know where to restart, the updates are not crash-safe, and it
@@ -32,6 +28,7 @@
 crashes in the middle of writing the event group to the binlog. The
 slave rolls back the transaction; parallel execution needs to be able
 to deal with this wrt. commit_orderer and such.
+See Format_description_log_event::do_apply_event().
 - Retry of failed transactions is not yet implemented for the parallel case.
 */
@@ -147,8 +144,9 @@ handle_rpl_parallel_thread(void *arg)
 "Waiting for work from SQL thread");
 while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
 mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
-rpt->event_queue= rpt->last_in_queue= NULL;
+rpt->dequeue(events);
 thd->exit_cond(old_msg);
+mysql_cond_signal(&rpt->COND_rpl_thread);
 more_events:
 while (events)
@@ -286,7 +284,7 @@ handle_rpl_parallel_thread(void *arg)
 This is faster than having to wakeup the pool manager thread to give us
 a new event.
 */
-rpt->event_queue= rpt->last_in_queue= NULL;
+rpt->dequeue(events);
 mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
 goto more_events;
 }
@@ -619,7 +617,8 @@ rpl_parallel::wait_for_done()
 */
 bool
-rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
+rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
+ulonglong event_size)
 {
 rpl_parallel_entry *e;
 rpl_parallel_thread *cur_thread;
@@ -653,6 +652,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
 return true;
 }
 qev->ev= ev;
+qev->event_size= event_size;
 qev->next= NULL;
 strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
 qev->event_relay_log_pos= rli->event_relay_log_pos;
@@ -715,17 +715,33 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
 if (cur_thread)
 {
 mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
+for (;;)
+{
 if (cur_thread->current_entry != e)
 {
 /*
 The worker thread became idle, and returned to the free list and
 possibly was allocated to a different request. This also means
-that everything previously queued has already been executed, else
-the worker thread would not have become idle. So we should
+that everything previously queued has already been executed,
+else the worker thread would not have become idle. So we should
 allocate a new worker thread.
 */
 mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
 e->rpl_thread= cur_thread= NULL;
+break;
+}
+else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
+break; // The thread is ready to queue into
+else
+{
+/*
+We have reached the limit of how much memory we are allowed to
+use for queuing events, so wait for the thread to consume some
+of its queue.
+*/
+mysql_cond_wait(&cur_thread->COND_rpl_thread,
+&cur_thread->LOCK_rpl_thread);
+}
+}
 }
 }
@@ -819,11 +835,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
 /*
 Queue the event for processing.
 */
-if (cur_thread->last_in_queue)
-cur_thread->last_in_queue->next= qev;
-else
-cur_thread->event_queue= qev;
-cur_thread->last_in_queue= qev;
+cur_thread->enqueue(qev);
 mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
 mysql_cond_signal(&cur_thread->COND_rpl_thread);
......
@@ -27,7 +27,29 @@ struct rpl_parallel_thread {
 char event_relay_log_name[FN_REFLEN];
 char future_event_master_log_name[FN_REFLEN];
 ulonglong event_relay_log_pos;
+size_t event_size;
 } *event_queue, *last_in_queue;
+uint64 queued_size;
+void enqueue(queued_event *qev)
+{
+if (last_in_queue)
+last_in_queue->next= qev;
+else
+event_queue= qev;
+last_in_queue= qev;
+queued_size+= qev->event_size;
+}
+void dequeue(queued_event *list)
+{
+queued_event *tmp;
+DBUG_ASSERT(list == event_queue);
+event_queue= last_in_queue= NULL;
+for (tmp= list; tmp; tmp= tmp->next)
+queued_size-= tmp->event_size;
+}
 };
@@ -87,7 +109,8 @@ struct rpl_parallel {
 void reset();
 rpl_parallel_entry *find(uint32 domain_id);
 void wait_for_done();
-bool do_event(rpl_group_info *serial_rgi, Log_event *ev);
+bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
+ulonglong event_size);
 };
......
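Taken together, the hunks above implement a simple flow-control scheme: the driver SQL thread charges each queued event's size against the worker's queued_size in rpl_parallel_thread::enqueue(), and in rpl_parallel::do_event() it waits on the worker's condition variable while queued_size exceeds opt_slave_parallel_max_queued; the worker gives the bytes back in dequeue() and then signals the driver from handle_rpl_parallel_thread(). The snippet below is a minimal, self-contained C++ sketch of just that scheme, using standard-library primitives in place of the server's mysql_mutex_t/mysql_cond_t wrappers; the names worker_queue, queued_event_sketch and max_queued are illustrative and do not exist in the server source.

// Minimal sketch of the per-worker flow control added in this commit.
// Standard-library primitives stand in for mysql_mutex_t / mysql_cond_t;
// all identifiers here are illustrative, not the server's actual names.
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

struct queued_event_sketch {
  size_t event_size;   // filled in by the driver, like qev->event_size
  // ... event payload omitted ...
};

class worker_queue {
public:
  explicit worker_queue(size_t max_queued) : max_queued_(max_queued) {}

  // Driver (SQL thread) side: corresponds to the wait loop in
  // rpl_parallel::do_event() followed by cur_thread->enqueue(qev).
  void enqueue(queued_event_sketch qev) {
    std::unique_lock<std::mutex> lock(mtx_);
    // Block while the worker already holds more than the configured limit.
    cond_.wait(lock, [&] { return queued_size_ <= max_queued_; });
    queued_size_ += qev.event_size;
    queue_.push_back(qev);
    cond_.notify_one();   // wake the worker, like mysql_cond_signal()
  }

  // Worker side: corresponds to rpt->dequeue(events) followed by the
  // mysql_cond_signal() added in handle_rpl_parallel_thread().
  std::deque<queued_event_sketch> dequeue_all() {
    std::unique_lock<std::mutex> lock(mtx_);
    cond_.wait(lock, [&] { return !queue_.empty(); });
    std::deque<queued_event_sketch> events;
    events.swap(queue_);
    for (const auto &ev : events)
      queued_size_ -= ev.event_size;   // give the accounted bytes back
    cond_.notify_one();   // let a blocked driver continue queuing
    return events;
  }

private:
  std::mutex mtx_;                        // plays the role of LOCK_rpl_thread
  std::condition_variable cond_;          // plays the role of COND_rpl_thread
  std::deque<queued_event_sketch> queue_;
  size_t queued_size_ = 0;                // like rpl_parallel_thread::queued_size
  size_t max_queued_;                     // like opt_slave_parallel_max_queued
};

As in the patch, a single condition variable per worker doubles as both the "work available" and "queue drained" signal, which is why the commit also adds a mysql_cond_signal() right after the worker's dequeue() call.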
@@ -156,7 +156,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
 bool suppress_warnings);
 static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
 bool reconnect, bool suppress_warnings);
-static Log_event* next_event(rpl_group_info* rgi);
+static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size);
 static int queue_event(Master_info* mi,const char* buf,ulong event_len);
 static int terminate_slave_thread(THD *thd,
 mysql_mutex_t *term_lock,
@@ -3273,6 +3273,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
 static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
 rpl_group_info *serial_rgi)
 {
+ulonglong event_size;
 DBUG_ENTER("exec_relay_log_event");
 /*
@@ -3282,7 +3283,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
 */
 mysql_mutex_lock(&rli->data_lock);
-Log_event * ev = next_event(serial_rgi);
+Log_event *ev= next_event(serial_rgi, &event_size);
 if (sql_slave_killed(serial_rgi))
 {
@@ -3344,7 +3345,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
 */
 if (opt_slave_parallel_threads > 0 && rli->slave_skip_counter == 0)
-DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev));
+DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, event_size));
 /*
 For GTID, allocate a new sub_id for the given domain_id.
@@ -5836,8 +5837,10 @@ static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
 @return The event read, or NULL on error. If an error occurs, the
 error is reported through the sql_print_information() or
 sql_print_error() functions.
+The size of the read event (in bytes) is returned in *event_size.
 */
-static Log_event* next_event(rpl_group_info *rgi)
+static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
 {
 Log_event* ev;
 Relay_log_info *rli= rgi->rli;
@@ -5848,6 +5851,7 @@ static Log_event* next_event(rpl_group_info *rgi)
 DBUG_ENTER("next_event");
 DBUG_ASSERT(thd != 0 && thd == rli->sql_driver_thd);
+*event_size= 0;
 #ifndef DBUG_OFF
 if (abort_slave_event_count && !rli->events_till_abort--)
@@ -5932,11 +5936,13 @@ static Log_event* next_event(rpl_group_info *rgi)
 opt_slave_sql_verify_checksum)))
 {
+ulonglong old_pos= rli->future_event_relay_log_pos;
 /*
 read it while we have a lock, to avoid a mutex lock in
 inc_event_relay_log_pos()
 */
 rli->future_event_relay_log_pos= my_b_tell(cur_log);
+*event_size= rli->future_event_relay_log_pos - old_pos;
 if (hot_log)
 mysql_mutex_unlock(log_lock);
......
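The hunks just above thread the event size from the relay-log reader up to the scheduler: next_event() now reports through the *event_size out-parameter how far the relay-log read position advanced while reading the event (my_b_tell(cur_log) minus the previous future_event_relay_log_pos), and exec_relay_log_event() passes that number on to rpl_parallel::do_event() so it can be charged against the worker's queue. Below is a small, hedged C++ sketch of just the size measurement; relay_reader_sketch and the byte offsets in main() are hypothetical stand-ins, not the server's API.

// Sketch of the size computation added around my_b_tell(cur_log): an event's
// size is simply how far the read position moved while reading it. The type
// and the offsets used in main() are made up for illustration.
#include <cassert>
#include <cstdint>

struct relay_reader_sketch {
  uint64_t future_event_pos = 0;   // plays the role of future_event_relay_log_pos

  // new_tell is what my_b_tell(cur_log) would return after reading one event.
  // Returns the value that the patch stores into *event_size.
  uint64_t record_event_size(uint64_t new_tell) {
    uint64_t old_pos = future_event_pos;   // like the added 'old_pos' local
    future_event_pos = new_tell;
    return future_event_pos - old_pos;
  }
};

int main() {
  relay_reader_sketch reader;
  reader.future_event_pos = 4;                           // e.g. just past the file header
  uint64_t event_size = reader.record_event_size(179);   // the read left us at offset 179
  assert(event_size == 175);                             // 175 bytes charged to the queue
  return 0;
}

The relay-log byte count serves as a proxy for the memory held by the queued, malloc()'ed events, which is the concern raised in the TODO item this commit removes.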
@@ -1479,6 +1479,16 @@ static Sys_var_ulong Sys_slave_parallel_threads(
 VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD,
 NOT_IN_BINLOG, ON_CHECK(check_slave_parallel_threads),
 ON_UPDATE(fix_slave_parallel_threads));
+static Sys_var_ulong Sys_slave_parallel_max_queued(
+"slave_parallel_max_queued",
+"Limit on how much memory SQL threads should use per parallel "
+"replication thread when reading ahead in the relay log looking for "
+"opportunities for parallel replication. Only used when "
+"--slave-parallel-threads > 0.",
+GLOBAL_VAR(opt_slave_parallel_max_queued), CMD_LINE(REQUIRED_ARG),
+VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1));
 #endif
......