Commit e654be38 authored by unknown's avatar unknown

MDEV-4506: Parallel replication: Intermediate commit.

Impement options --binlog-commit-wait-count and
--binlog-commit-wait-usec.

These options permit the DBA to deliberately increase latency
of an individual commit to get more transactions in each
binlog group commit. This increases the opportunity for
parallel replication on the slave, and can also decrease I/O
load on the master.

The options also make it easier to test the parallel
replication with mysql-test-run.
parent b5a496a7
--source include/have_binlog_format_statement.inc
--source include/have_xtradb.inc
connect (m1,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (m2,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (m3,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (m4,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (s1,127.0.0.1,root,,test,$SLAVE_MYPORT,);
connect (s2,127.0.0.1,root,,test,$SLAVE_MYPORT,);
connect (s3,127.0.0.1,root,,test,$SLAVE_MYPORT,);
connect (s4,127.0.0.1,root,,test,$SLAVE_MYPORT,);
--connection m1
SELECT @@server_id;
SET sql_log_bin=0;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET sql_log_bin=1;
SET @old_count= @@GLOBAL.binlog_commit_wait_count;
SET @old_usec= @@GLOBAL.binlog_commit_wait_usec;
SET GLOBAL binlog_commit_wait_usec = 30*1000000;
--connection s1
SELECT @@server_id;
SET sql_log_bin=0;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET sql_log_bin=1;
--replace_result $MASTER_MYPORT MASTER_PORT
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
master_user='root', master_use_gtid=current_pos;
--connection m1
SET GLOBAL binlog_commit_wait_count = 4;
send INSERT INTO t1 VALUES (1);
--connection m2
send INSERT INTO t1 VALUES (2);
--connection m3
send INSERT INTO t1 VALUES (3);
--connection m4
INSERT INTO t1 VALUES (4);
--connection m1
reap;
--connection m2
reap;
--connection m3
reap;
--connection m1
SHOW BINLOG EVENTS;
--connection s1
--source include/start_slave.inc
SELECT * FROM t1;
--source include/stop_slave.inc
SELECT * FROM t1;
--connection m1
SET sql_log_bin=0;
DROP TABLE t1;
SET sql_log_bin=1;
SET GLOBAL binlog_commit_wait_count= @old_count;
SET GLOBAL binlog_commit_wait_usec= @old_usec;
--connection s1
RESET SLAVE ALL;
SET sql_log_bin=0;
DROP TABLE t1;
SET sql_log_bin=1;
...@@ -88,6 +88,7 @@ ulong opt_binlog_dbug_fsync_sleep= 0; ...@@ -88,6 +88,7 @@ ulong opt_binlog_dbug_fsync_sleep= 0;
#endif #endif
mysql_mutex_t LOCK_prepare_ordered; mysql_mutex_t LOCK_prepare_ordered;
mysql_cond_t COND_prepare_ordered;
mysql_mutex_t LOCK_commit_ordered; mysql_mutex_t LOCK_commit_ordered;
static ulonglong binlog_status_var_num_commits; static ulonglong binlog_status_var_num_commits;
...@@ -6679,6 +6680,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry, ...@@ -6679,6 +6680,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
} }
} }
if (opt_binlog_commit_wait_count > 0)
mysql_cond_signal(&COND_prepare_ordered);
mysql_mutex_unlock(&LOCK_prepare_ordered); mysql_mutex_unlock(&LOCK_prepare_ordered);
DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered"); DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
...@@ -6840,6 +6843,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) ...@@ -6840,6 +6843,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
binlog_id= current_binlog_id; binlog_id= current_binlog_id;
mysql_mutex_lock(&LOCK_prepare_ordered); mysql_mutex_lock(&LOCK_prepare_ordered);
if (opt_binlog_commit_wait_count)
wait_for_sufficient_commits();
current= group_commit_queue; current= group_commit_queue;
group_commit_queue= NULL; group_commit_queue= NULL;
mysql_mutex_unlock(&LOCK_prepare_ordered); mysql_mutex_unlock(&LOCK_prepare_ordered);
...@@ -7135,6 +7140,48 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, ...@@ -7135,6 +7140,48 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
return 0; return 0;
} }
void
MYSQL_BIN_LOG::wait_for_sufficient_commits()
{
size_t count;
group_commit_entry *e;
group_commit_entry *last_head;
struct timespec wait_until;
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_prepare_ordered);
count= 0;
for (e= last_head= group_commit_queue; e; e= e->next)
++count;
if (count >= opt_binlog_commit_wait_count)
return;
mysql_mutex_unlock(&LOCK_log);
set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec);
for (;;)
{
int err;
group_commit_entry *head;
err= mysql_cond_timedwait(&COND_prepare_ordered, &LOCK_prepare_ordered,
&wait_until);
if (err == ETIMEDOUT)
break;
head= group_commit_queue;
for (e= head; e && e != last_head; e= e->next)
++count;
if (count >= opt_binlog_commit_wait_count)
break;
last_head= head;
}
mysql_mutex_lock(&LOCK_log);
}
/** /**
Wait until we get a signal that the relay log has been updated. Wait until we get a signal that the relay log has been updated.
......
...@@ -85,9 +85,11 @@ protected: ...@@ -85,9 +85,11 @@ protected:
prepare_ordered() or commit_ordered() methods. prepare_ordered() or commit_ordered() methods.
*/ */
extern mysql_mutex_t LOCK_prepare_ordered; extern mysql_mutex_t LOCK_prepare_ordered;
extern mysql_cond_t COND_prepare_ordered;
extern mysql_mutex_t LOCK_commit_ordered; extern mysql_mutex_t LOCK_commit_ordered;
#ifdef HAVE_PSI_INTERFACE #ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered; extern PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
extern PSI_cond_key key_COND_prepare_ordered;
#endif #endif
class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging
...@@ -685,6 +687,7 @@ public: ...@@ -685,6 +687,7 @@ public:
} }
void set_max_size(ulong max_size_arg); void set_max_size(ulong max_size_arg);
void signal_update(); void signal_update();
void wait_for_sufficient_commits();
void wait_for_update_relay_log(THD* thd); void wait_for_update_relay_log(THD* thd);
int wait_for_update_bin_log(THD* thd, const struct timespec * timeout); int wait_for_update_bin_log(THD* thd, const struct timespec * timeout);
void init(ulong max_size); void init(ulong max_size);
......
...@@ -544,6 +544,8 @@ ulong rpl_recovery_rank=0; ...@@ -544,6 +544,8 @@ ulong rpl_recovery_rank=0;
ulong stored_program_cache_size= 0; ulong stored_program_cache_size= 0;
ulong opt_slave_parallel_threads= 0; ulong opt_slave_parallel_threads= 0;
ulong opt_binlog_commit_wait_count= 0;
ulong opt_binlog_commit_wait_usec= 0;
const double log_10[] = { const double log_10[] = {
1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009, 1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009,
...@@ -895,7 +897,7 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, ...@@ -895,7 +897,7 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool, PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry; key_COND_parallel_entry, key_COND_prepare_ordered;
static PSI_cond_info all_server_conds[]= static PSI_cond_info all_server_conds[]=
{ {
...@@ -940,7 +942,8 @@ static PSI_cond_info all_server_conds[]= ...@@ -940,7 +942,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_rpl_thread, "COND_rpl_thread", 0}, { &key_COND_rpl_thread, "COND_rpl_thread", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0}, { &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0} { &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0}
}; };
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
...@@ -2017,6 +2020,7 @@ static void clean_up_mutexes() ...@@ -2017,6 +2020,7 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_server_started); mysql_mutex_destroy(&LOCK_server_started);
mysql_cond_destroy(&COND_server_started); mysql_cond_destroy(&COND_server_started);
mysql_mutex_destroy(&LOCK_prepare_ordered); mysql_mutex_destroy(&LOCK_prepare_ordered);
mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_commit_ordered); mysql_mutex_destroy(&LOCK_commit_ordered);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -4117,6 +4121,7 @@ static int init_thread_environment() ...@@ -4117,6 +4121,7 @@ static int init_thread_environment()
&LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW); &LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered, mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered,
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered, mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
......
...@@ -177,6 +177,8 @@ extern ulong opt_binlog_rows_event_max_size; ...@@ -177,6 +177,8 @@ extern ulong opt_binlog_rows_event_max_size;
extern ulong rpl_recovery_rank, thread_cache_size; extern ulong rpl_recovery_rank, thread_cache_size;
extern ulong stored_program_cache_size; extern ulong stored_program_cache_size;
extern ulong opt_slave_parallel_threads; extern ulong opt_slave_parallel_threads;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
extern ulong back_log; extern ulong back_log;
extern ulong executed_events; extern ulong executed_events;
extern char language[FN_REFLEN]; extern char language[FN_REFLEN];
......
...@@ -210,8 +210,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -210,8 +210,7 @@ handle_rpl_parallel_thread(void *arg)
if (entry->last_committed_sub_id < event_gtid_sub_id) if (entry->last_committed_sub_id < event_gtid_sub_id)
{ {
entry->last_committed_sub_id= event_gtid_sub_id; entry->last_committed_sub_id= event_gtid_sub_id;
if (entry->need_signal) mysql_cond_broadcast(&entry->COND_parallel_entry);
mysql_cond_broadcast(&entry->COND_parallel_entry);
} }
mysql_mutex_unlock(&entry->LOCK_parallel_entry); mysql_mutex_unlock(&entry->LOCK_parallel_entry);
......
...@@ -50,7 +50,6 @@ struct rpl_parallel_entry { ...@@ -50,7 +50,6 @@ struct rpl_parallel_entry {
uint64 last_seq_no; uint64 last_seq_no;
uint64 last_commit_id; uint64 last_commit_id;
bool active; bool active;
bool need_signal;
rpl_parallel_thread *rpl_thread; rpl_parallel_thread *rpl_thread;
/* /*
The sub_id of the last transaction to commit within this domain_id. The sub_id of the last transaction to commit within this domain_id.
......
...@@ -1483,6 +1483,26 @@ static Sys_var_ulong Sys_slave_parallel_threads( ...@@ -1483,6 +1483,26 @@ static Sys_var_ulong Sys_slave_parallel_threads(
#endif #endif
static Sys_var_ulong Sys_binlog_commit_wait_count(
"binlog_commit_wait_count",
"If non-zero, binlog write will wait at most binlog_commit_wait_usec "
"microseconds for at least this many commits to queue up for group "
"commit to the binlog. This can reduce I/O on the binlog and provide "
"increased opportunity for parallel apply on the slave, but too high "
"a value will decrease commit throughput.",
GLOBAL_VAR(opt_binlog_commit_wait_count), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));
static Sys_var_ulong Sys_binlog_commit_wait_usec(
"binlog_commit_wait_usec",
"Maximum time, in microseconds, to wait for more commits to queue up "
" for binlog group commit. Only takes effect if the value of "
"binlog_commit_wait_count is non-zero.",
GLOBAL_VAR(opt_binlog_commit_wait_usec), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(100000), BLOCK_SIZE(1));
static bool fix_max_join_size(sys_var *self, THD *thd, enum_var_type type) static bool fix_max_join_size(sys_var *self, THD *thd, enum_var_type type)
{ {
SV *sv= type == OPT_GLOBAL ? &global_system_variables : &thd->variables; SV *sv= type == OPT_GLOBAL ? &global_system_variables : &thd->variables;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment