Commit 4e6606ac authored by unknown's avatar unknown

MDEV-4984: Implement MASTER_GTID_WAIT() and @@LAST_GTID.

MASTER_GTID_WAIT() is similar to MASTER_POS_WAIT(), but works with a
GTID position rather than an old-style filename/offset.

@@LAST_GTID gives the GTID assigned to the last transaction written
into the binlog.

Together, the two can be used by applications to obtain the GTID of
an update on the master, and then do a MASTER_GTID_WAIT() for that
position on any read slave where it is important to get results that
are caught up with the master at least to the point of the update.

The implementation of MASTER_GTID_WAIT() is implemented in a way
that tries to minimise the performance impact on the SQL threads,
even in the presense of many waiters on single GTID positions (as
from @@LAST_GTID).
parent ce02738d
......@@ -915,6 +915,7 @@ static COMMANDS commands[] = {
{ "MAKE_SET", 0, 0, 0, ""},
{ "MAKEDATE", 0, 0, 0, ""},
{ "MAKETIME", 0, 0, 0, ""},
{ "MASTER_GTID_WAIT", 0, 0, 0, ""},
{ "MASTER_POS_WAIT", 0, 0, 0, ""},
{ "MAX", 0, 0, 0, ""},
{ "MBRCONTAINS", 0, 0, 0, ""},
......
......@@ -51,6 +51,7 @@ typedef struct st_queue {
#define queue_first_element(queue) 1
#define queue_last_element(queue) (queue)->elements
#define queue_empty(queue) ((queue)->elements == 0)
#define queue_top(queue) ((queue)->root[1])
#define queue_element(queue,index) ((queue)->root[index])
#define queue_end(queue) ((queue)->root[(queue)->elements])
......
# ==== Purpose ====
#
# Wait until the slave has reached a certain GTID position.
# Similar to --sync_with_master, but using GTID instead of old-style
# binlog file/offset coordinates.
#
#
# ==== Usage ====
#
# --let $master_pos= `SELECT @@GLOBAL.gtid_binlog_pos`
# [--let $slave_timeout= NUMBER]
# [--let $rpl_debug= 1]
# --source include/sync_with_master_gtid.inc
#
# Syncs slave to the specified GTID position.
#
# Must be called on the slave.
#
# Parameters:
# $master_pos
# The GTID position to sync to. Typically obtained from
# @@GLOBAL.gtid_binlog_pos on the master.
#
# $slave_timeout
# Timeout in seconds. The default is 2 minutes.
#
# $rpl_debug
# See include/rpl_init.inc
--let $include_filename= sync_with_master_gtid.inc
--source include/begin_include_file.inc
let $_slave_timeout= $slave_timeout;
if (!$_slave_timeout)
{
let $_slave_timeout= 120;
}
--let $_result= `SELECT master_gtid_wait('$master_pos', $_slave_timeout)`
if ($_result == -1)
{
--let $_current_gtid_pos= `SELECT @@GLOBAL.gtid_slave_pos`
--die Timeout in master_gtid_wait('$master_pos', $_slave_timeout), current slave GTID position is: $_current_gtid_pos.
}
--let $include_filename= sync_with_master_gtid.inc
--source include/end_include_file.inc
......@@ -37,6 +37,7 @@ wait/synch/mutex/mysys/THR_LOCK_threads
wait/synch/mutex/mysys/TMPDIR_mutex
wait/synch/mutex/sql/Cversion_lock
wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state
wait/synch/mutex/sql/gtid_waiting::LOCK_gtid_waiting
wait/synch/mutex/sql/hash_filo::lock
wait/synch/mutex/sql/LOCK_active_mi
wait/synch/mutex/sql/LOCK_audit_mask
......
......@@ -7,13 +7,13 @@ NAME ENABLED TIMED
wait/synch/mutex/sql/Cversion_lock YES YES
wait/synch/mutex/sql/Delayed_insert::mutex YES YES
wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state YES YES
wait/synch/mutex/sql/gtid_waiting::LOCK_gtid_waiting YES YES
wait/synch/mutex/sql/hash_filo::lock YES YES
wait/synch/mutex/sql/HA_DATA_PARTITION::LOCK_auto_inc YES YES
wait/synch/mutex/sql/LOCK_active_mi YES YES
wait/synch/mutex/sql/LOCK_audit_mask YES YES
wait/synch/mutex/sql/LOCK_binlog_state YES YES
wait/synch/mutex/sql/LOCK_commit_ordered YES YES
wait/synch/mutex/sql/LOCK_connection_count YES YES
select * from performance_schema.setup_instruments
where name like 'Wait/Synch/Rwlock/sql/%'
and name not in ('wait/synch/rwlock/sql/CRYPTO_dynlock_value::lock')
......
......@@ -197,9 +197,119 @@ CREATE TABLE t1 (a INT PRIMARY KEY);
SET gtid_seq_no=100;
INSERT INTO t1 VALUES (1);
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1;
a
1
Gtid_IO_Pos = '0-1-100'
*** Test @@LAST_GTID and MASTER_GTID_WAIT() ***
DROP TABLE t1;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
include/stop_slave.inc
SELECT @@last_gtid;
@@last_gtid
SET gtid_seq_no=110;
SELECT @@last_gtid;
@@last_gtid
BEGIN;
SELECT @@last_gtid;
@@last_gtid
INSERT INTO t1 VALUES (2);
SELECT @@last_gtid;
@@last_gtid
COMMIT;
SELECT @@last_gtid;
@@last_gtid
0-1-110
SET @pos= '0-1-110';
SELECT master_gtid_wait(NULL);
master_gtid_wait(NULL)
NULL
SELECT master_gtid_wait('', NULL);
master_gtid_wait('', NULL)
0
SELECT master_gtid_wait(@pos, 0.5);
master_gtid_wait(@pos, 0.5)
-1
SELECT * FROM t1 ORDER BY a;
a
SELECT master_gtid_wait(@pos);
include/start_slave.inc
master_gtid_wait(@pos)
0
SELECT * FROM t1 ORDER BY a;
a
2
include/stop_slave.inc
SET gtid_domain_id= 1;
INSERT INTO t1 VALUES (3);
SET @pos= '1-1-1,0-1-110';
SELECT master_gtid_wait(@pos, 0);
master_gtid_wait(@pos, 0)
-1
SELECT * FROM t1 WHERE a >= 3;
a
SELECT master_gtid_wait(@pos, -1);
include/start_slave.inc
master_gtid_wait(@pos, -1)
0
SELECT * FROM t1 WHERE a >= 3;
a
3
SELECT master_gtid_wait('1-1-1', 0);
master_gtid_wait('1-1-1', 0)
0
SELECT master_gtid_wait('2-1-1,1-1-4,0-1-110');
SELECT master_gtid_wait('0-1-1000', 0.5);
SELECT master_gtid_wait('0-1-2000');
SELECT master_gtid_wait('2-1-10');
SELECT master_gtid_wait('2-1-5', 1);
SELECT master_gtid_wait('2-1-5');
SELECT master_gtid_wait('2-1-10');
SELECT master_gtid_wait('2-1-5,1-1-4,0-1-110');
SELECT master_gtid_wait('2-1-2');
SELECT master_gtid_wait('1-1-1');
master_gtid_wait('1-1-1')
0
SELECT master_gtid_wait('0-1-109');
SELECT master_gtid_wait('2-1-2', 0.5);
master_gtid_wait('2-1-2', 0.5)
-1
KILL QUERY 22;
ERROR 70100: Query execution was interrupted
SET gtid_domain_id=2;
SET gtid_seq_no=2;
INSERT INTO t1 VALUES (4);
master_gtid_wait('2-1-2')
0
KILL CONNECTION 25;
ERROR HY000: Lost connection to MySQL server during query
SET gtid_domain_id=1;
SET gtid_seq_no=4;
INSERT INTO t1 VALUES (5);
SET gtid_domain_id=2;
SET gtid_seq_no=5;
INSERT INTO t1 VALUES (6);
master_gtid_wait('2-1-5,1-1-4,0-1-110')
0
master_gtid_wait('2-1-1,1-1-4,0-1-110')
0
master_gtid_wait('0-1-1000', 0.5)
-1
master_gtid_wait('2-1-5', 1)
0
master_gtid_wait('0-1-109')
0
SET gtid_domain_id=2;
SET gtid_seq_no=10;
INSERT INTO t1 VALUES (7);
master_gtid_wait('2-1-10')
0
master_gtid_wait('2-1-10')
0
DROP TABLE t1;
include/rpl_end.inc
......@@ -199,14 +199,174 @@ INSERT INTO t1 VALUES (1);
# We cannot just use sync_with_master as we've done RESET MASTER, so
# slave old-style position is wrong.
# So sync on gtid position instead.
--let $wait_condition= SELECT @@GLOBAL.gtid_binlog_pos = '$master_pos'
--source include/wait_condition.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1;
# Check that the IO gtid position in SHOW SLAVE STATUS is also correct.
--let $status_items= Gtid_IO_Pos
--source include/show_slave_status.inc
--echo *** Test @@LAST_GTID and MASTER_GTID_WAIT() ***
--connection server_1
DROP TABLE t1;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
--save_master_pos
--connection server_2
--sync_with_master
--source include/stop_slave.inc
--connect (m1,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
SELECT @@last_gtid;
SET gtid_seq_no=110;
SELECT @@last_gtid;
BEGIN;
SELECT @@last_gtid;
INSERT INTO t1 VALUES (2);
SELECT @@last_gtid;
COMMIT;
SELECT @@last_gtid;
--let $pos= `SELECT @@gtid_binlog_pos`
--connect (s1,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
eval SET @pos= '$pos';
# Check NULL argument.
SELECT master_gtid_wait(NULL);
# Check empty argument returns immediately.
SELECT master_gtid_wait('', NULL);
# Let's check that we get a timeout
SELECT master_gtid_wait(@pos, 0.5);
SELECT * FROM t1 ORDER BY a;
# Now actually wait until the slave reaches the position
send SELECT master_gtid_wait(@pos);
--connection server_2
--source include/start_slave.inc
--connection s1
reap;
SELECT * FROM t1 ORDER BY a;
# Test waiting on a domain that does not exist yet.
--source include/stop_slave.inc
--connection server_1
SET gtid_domain_id= 1;
INSERT INTO t1 VALUES (3);
--let $pos= `SELECT @@gtid_binlog_pos`
--connection s1
eval SET @pos= '$pos';
SELECT master_gtid_wait(@pos, 0);
SELECT * FROM t1 WHERE a >= 3;
send SELECT master_gtid_wait(@pos, -1);
--connection server_2
--source include/start_slave.inc
--connection s1
reap;
SELECT * FROM t1 WHERE a >= 3;
# Waiting for only part of the position.
SELECT master_gtid_wait('1-1-1', 0);
# Now test a lot of parallel master_gtid_wait() calls, completing in different
# order, and some of which time out or get killed on the way.
--connection s1
send SELECT master_gtid_wait('2-1-1,1-1-4,0-1-110');
--connect (s2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
# This will time out.
send SELECT master_gtid_wait('0-1-1000', 0.5);
--connect (s3,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
# This one we will kill
--let $kill1_id= `SELECT connection_id()`
send SELECT master_gtid_wait('0-1-2000');
--connect (s4,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
send SELECT master_gtid_wait('2-1-10');
--connect (s5,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
send SELECT master_gtid_wait('2-1-5', 1);
# This one we will kill also.
--connect (s6,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
--let $kill2_id= `SELECT connection_id()`
send SELECT master_gtid_wait('2-1-5');
--connect (s7,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
send SELECT master_gtid_wait('2-1-10');
--connect (s8,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
send SELECT master_gtid_wait('2-1-5,1-1-4,0-1-110');
--connect (s9,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
send SELECT master_gtid_wait('2-1-2');
--connection server_2
# This one completes immediately.
SELECT master_gtid_wait('1-1-1');
--connect (s10,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
send SELECT master_gtid_wait('0-1-109');
--connection server_2
# This one should time out.
SELECT master_gtid_wait('2-1-2', 0.5);
eval KILL QUERY $kill1_id;
--connection s3
--error ER_QUERY_INTERRUPTED
reap;
--connection server_1
SET gtid_domain_id=2;
SET gtid_seq_no=2;
INSERT INTO t1 VALUES (4);
--connection s9
reap;
--connection server_2
eval KILL CONNECTION $kill2_id;
--connection s6
--error 2013
reap;
--connection server_1
SET gtid_domain_id=1;
SET gtid_seq_no=4;
INSERT INTO t1 VALUES (5);
SET gtid_domain_id=2;
SET gtid_seq_no=5;
INSERT INTO t1 VALUES (6);
--connection s8
reap;
--connection s1
reap;
--connection s2
reap;
--connection s5
reap;
--connection s10
reap;
--connection server_1
SET gtid_domain_id=2;
SET gtid_seq_no=10;
INSERT INTO t1 VALUES (7);
--connection s4
reap;
--connection s7
reap;
--connection server_1
DROP TABLE t1;
......
SELECT @@global.last_gtid;
ERROR HY000: Variable 'last_gtid' is a SESSION variable
SET GLOBAL last_gtid= 10;
ERROR HY000: Variable 'last_gtid' is a read only variable
SET SESSION last_gtid= 20;
ERROR HY000: Variable 'last_gtid' is a read only variable
SELECT @@session.last_gtid;
@@session.last_gtid
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@global.last_gtid;
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SET GLOBAL last_gtid= 10;
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SET SESSION last_gtid= 20;
SELECT @@session.last_gtid;
......@@ -1783,6 +1783,19 @@ class Create_func_master_pos_wait : public Create_native_func
};
class Create_func_master_gtid_wait : public Create_native_func
{
public:
virtual Item *create_native(THD *thd, LEX_STRING name, List<Item> *item_list);
static Create_func_master_gtid_wait s_singleton;
protected:
Create_func_master_gtid_wait() {}
virtual ~Create_func_master_gtid_wait() {}
};
class Create_func_md5 : public Create_func_arg1
{
public:
......@@ -4590,6 +4603,47 @@ Create_func_master_pos_wait::create_native(THD *thd, LEX_STRING name,
}
Create_func_master_gtid_wait Create_func_master_gtid_wait::s_singleton;
Item*
Create_func_master_gtid_wait::create_native(THD *thd, LEX_STRING name,
List<Item> *item_list)
{
Item *func= NULL;
int arg_count= 0;
thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
if (item_list != NULL)
arg_count= item_list->elements;
if (arg_count < 1 || arg_count > 2)
{
my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
return func;
}
thd->lex->safe_to_cache_query= 0;
Item *param_1= item_list->pop();
switch (arg_count) {
case 1:
{
func= new (thd->mem_root) Item_master_gtid_wait(param_1);
break;
}
case 2:
{
Item *param_2= item_list->pop();
func= new (thd->mem_root) Item_master_gtid_wait(param_1, param_2);
break;
}
}
return func;
}
Create_func_md5 Create_func_md5::s_singleton;
Item*
......@@ -5536,6 +5590,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("MAKEDATE") }, BUILDER(Create_func_makedate)},
{ { C_STRING_WITH_LEN("MAKETIME") }, BUILDER(Create_func_maketime)},
{ { C_STRING_WITH_LEN("MAKE_SET") }, BUILDER(Create_func_make_set)},
{ { C_STRING_WITH_LEN("MASTER_GTID_WAIT") }, BUILDER(Create_func_master_gtid_wait)},
{ { C_STRING_WITH_LEN("MASTER_POS_WAIT") }, BUILDER(Create_func_master_pos_wait)},
{ { C_STRING_WITH_LEN("MBRCONTAINS") }, GEOM_BUILDER(Create_func_mbr_contains)},
{ { C_STRING_WITH_LEN("MBRDISJOINT") }, GEOM_BUILDER(Create_func_mbr_disjoint)},
......
......@@ -3989,6 +3989,34 @@ longlong Item_master_pos_wait::val_int()
}
longlong Item_master_gtid_wait::val_int()
{
DBUG_ASSERT(fixed == 1);
longlong result= 0;
if (args[0]->null_value)
{
null_value= 1;
return 0;
}
null_value=0;
#ifdef HAVE_REPLICATION
THD* thd= current_thd;
longlong timeout_us;
String *gtid_pos = args[0]->val_str(&value);
if (arg_count==2 && !args[1]->null_value)
timeout_us= (longlong)(1e6*args[1]->val_real());
else
timeout_us= (longlong)-1;
result= rpl_global_gtid_waiting.wait_for_pos(thd, gtid_pos, timeout_us);
#endif
return result;
}
/**
Enables a session to wait on a condition until a timeout or a network
disconnect occurs.
......
......@@ -1642,6 +1642,22 @@ class Item_master_pos_wait :public Item_int_func
};
class Item_master_gtid_wait :public Item_int_func
{
String value;
public:
Item_master_gtid_wait(Item *a) :Item_int_func(a) {}
Item_master_gtid_wait(Item *a,Item *b) :Item_int_func(a,b) {}
longlong val_int();
const char *func_name() const { return "master_gtid_wait"; }
void fix_length_and_dec() { max_length=10+1+10+1+20+1; maybe_null=0;}
bool check_vcol_func_processor(uchar *int_arg)
{
return trace_unsupported_by_check_vcol_func_processor(func_name());
}
};
/* Handling of user definable variables */
class user_var_entry;
......
......@@ -5446,6 +5446,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
}
if (err)
return true;
thd->last_commit_gtid= gtid;
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional,
......
......@@ -780,6 +780,7 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
......@@ -825,6 +826,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
{ &key_LOCK_gtid_waiting, "gtid_waiting::LOCK_gtid_waiting", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
......@@ -895,6 +897,7 @@ PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_prepare_ordered;
PSI_cond_key key_COND_wait_gtid;
static PSI_cond_info all_server_conds[]=
{
......@@ -940,7 +943,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0}
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
......@@ -1821,6 +1825,7 @@ static void mysqld_exit(int exit_code)
but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently.
*/
rpl_deinit_gtid_waiting();
rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end();
mysql_audit_finalize();
......@@ -4201,6 +4206,7 @@ static int init_thread_environment()
#ifdef HAVE_REPLICATION
rpl_init_gtid_slave_state();
rpl_init_gtid_waiting();
#endif
DBUG_RETURN(0);
......
......@@ -256,6 +256,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
extern PSI_mutex_key key_LOCK_gtid_waiting;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
......@@ -285,6 +286,7 @@ extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry;
extern PSI_cond_key key_COND_wait_gtid;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
......
......@@ -43,9 +43,9 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
there will not be an attempt to delete the corresponding table row before
it is even committed.
*/
lock();
mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
sql_print_warning("Slave: Out of memory during slave state maintenance. "
......@@ -82,11 +82,20 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
}
static void
rpl_slave_state_free_element(void *arg)
{
struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
mysql_cond_destroy(&elem->COND_wait_gtid);
my_free(elem);
}
rpl_slave_state::rpl_slave_state()
: last_sub_id(0), inited(false), loaded(false)
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
}
......@@ -146,6 +155,21 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (!(elem= get_element(domain_id)))
return 1;
if (seq_no > elem->highest_seq_no)
elem->highest_seq_no= seq_no;
if (elem->min_wait_seq_no != 0 && elem->min_wait_seq_no <= seq_no)
{
/*
Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
Signal (and remove) them. The waiter will handle all the processing
of all pending MASTER_GTID_WAIT(), so we do not slow down the
replication SQL thread.
*/
mysql_mutex_assert_owner(&LOCK_slave_state);
elem->min_wait_seq_no= 0;
mysql_cond_broadcast(&elem->COND_wait_gtid);
}
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
......@@ -173,6 +197,9 @@ rpl_slave_state::get_element(uint32 domain_id)
return NULL;
elem->list= NULL;
elem->domain_id= domain_id;
elem->highest_seq_no= 0;
elem->min_wait_seq_no= 0;
mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
......@@ -378,10 +405,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end;
}
lock();
mysql_mutex_lock(&LOCK_slave_state);
if ((elem= get_element(gtid->domain_id)) == NULL)
{
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto end;
......@@ -410,7 +437,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
cur->next= NULL;
elem->list= cur;
}
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
if (!elist)
goto end;
......@@ -470,9 +497,9 @@ IF_DBUG(dbug_break:, )
*/
if (elist)
{
lock();
mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
}
ha_rollback_trans(thd, FALSE);
......@@ -499,9 +526,9 @@ rpl_slave_state::next_sub_id(uint32 domain_id)
{
uint64 sub_id= 0;
lock();
mysql_mutex_lock(&LOCK_slave_state);
sub_id= ++last_sub_id;
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
return sub_id;
}
......@@ -541,7 +568,7 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
lock();
mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
......@@ -576,19 +603,19 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
memcpy(&best_gtid, gtid, sizeof(best_gtid));
if (my_hash_delete(&gtid_hash, rec))
{
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
if ((res= (*cb)(&best_gtid, data)))
{
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
......@@ -659,11 +686,11 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
list_element *list;
uint64 best_sub_id;
lock();
mysql_mutex_lock(&LOCK_slave_state);
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (!elem || !(list= elem->list))
{
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
return false;
}
......@@ -681,7 +708,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
out_gtid->seq_no= list->seq_no;
}
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
return true;
}
......@@ -811,7 +838,7 @@ rpl_slave_state::is_empty()
uint32 i;
bool result= true;
lock();
mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
......@@ -821,7 +848,7 @@ rpl_slave_state::is_empty()
break;
}
}
unlock();
mysql_mutex_unlock(&LOCK_slave_state);
return result;
}
......@@ -1647,3 +1674,418 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
return 0;
}
/*
Execute a MASTER_GTID_WAIT().
The position to wait for is in gtid_str in string form.
The timeout in microseconds is in timeout_us, zero means no timeout.
Returns:
1 for error.
0 for wait completed.
-1 for wait timed out.
*/
int
gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
{
int err;
rpl_gtid *wait_pos;
uint32 count, i;
struct timespec wait_until, *wait_until_ptr;
/* Wait for the empty position returns immediately. */
if (gtid_str->length() == 0)
return 0;
if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
&count)))
{
my_error(ER_INCORRECT_GTID_STATE, MYF(0));
return 1;
}
if (timeout_us >= 0)
{
set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
wait_until_ptr= &wait_until;
}
else
wait_until_ptr= NULL;
err= 0;
for (i= 0; i < count; ++i)
{
if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
break;
}
my_free(wait_pos);
return err;
}
void
gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
{
queue_element *qe;
mysql_mutex_assert_owner(&LOCK_gtid_waiting);
if (queue_empty(&he->queue))
return;
qe= (queue_element *)queue_top(&he->queue);
qe->thd->wakeup_ready= true;
qe->wakeup_reason= queue_element::TAKEOVER;
mysql_cond_signal(&qe->thd->COND_wakeup_ready);
}
void
gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
gtid_waiting::hash_element *he)
{
mysql_mutex_assert_owner(&LOCK_gtid_waiting);
for (;;)
{
queue_element *qe;
if (queue_first_element(&he->queue) > queue_last_element(&he->queue))
break;
qe= (queue_element *)queue_top(&he->queue);
if (qe->wait_seq_no > wakeup_seq_no)
break;
queue_remove_top(&he->queue);
qe->thd->wakeup_ready= true;
qe->wakeup_reason= queue_element::DONE;
mysql_cond_signal(&qe->thd->COND_wakeup_ready);
}
}
/*
Execute a MASTER_GTID_WAIT() for one specific domain.
The implementation is optimised primarily for (1) minimal performance impact
on the slave replication threads, and secondarily for (2) quick performance
of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
read to clients in an async replication read-scaleout scenario.
To achieve (1), we have a "small" wait and a "large" wait. The small wait
contends with the replication threads on the lock on the gtid_slave_pos, so
only minimal processing is done under that lock, and only a single waiter at
a time does the small wait.
If there is already a small waiter, a new thread will either replace the
small waiter (if it needs to wait for an earlier sequence number), or
instead to a "large" wait.
Once awoken on the small wait, the waiting thread releases the lock shared
with the SQL threads quickly, and then processes all waiters currently doing
the large wait.
This way, the SQL threads only need to do a single check + possibly a
pthread_cond_signal() when updating the gtid_slave_state, and the time that
non-SQL threads contend for the lock on gtid_slave_staste is minimized.
*/
int
gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
struct timespec *wait_until)
{
bool timed_out= false;
#ifdef HAVE_REPLICATION
queue_element elem;
uint32_t domain_id= wait_gtid->domain_id;
uint64 seq_no= wait_gtid->seq_no;
hash_element *he;
rpl_slave_state::element *slave_state_elem= NULL;
const char *old_msg= NULL;
bool did_enter_cond= false;
bool takeover= false;
elem.wait_seq_no= seq_no;
elem.thd= thd;
/*
Register on the large wait before checking the small wait.
This ensures that if we find another waiter already doing the small wait,
we are sure to be woken up by that one, and thus we will not need to take
the lock on the small wait more than once in this case.
*/
mysql_mutex_lock(&LOCK_gtid_waiting);
if (!(he= register_in_wait_hash(thd, wait_gtid, &elem)))
{
mysql_mutex_unlock(&LOCK_gtid_waiting);
return 1;
}
/*
Now check the small wait, and either do the large wait or the small one,
depending on whether there is already a suitable small waiter or not.
We may need to do this multiple times, as a previous small waiter may
complete and pass the small wait on to us.
*/
for (;;)
{
uint64 wakeup_seq_no, cur_wait_seq_no;
mysql_mutex_assert_owner(&LOCK_gtid_waiting);
mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
/*
The elements in the gtid_slave_state_hash are never re-allocated once
they enter the hash, so we do not need to re-do the lookup after releasing
and re-aquiring the lock.
*/
if (!slave_state_elem &&
!(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
{
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
remove_from_wait_hash(he, &elem);
mysql_mutex_unlock(&LOCK_gtid_waiting);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
return 1;
}
if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
{
/*
We do not have to wait. But we might need to wakeup other threads on
the large wait (can happen if we were woken up to take over the small
wait, and SQL thread raced with us to reach the waited-for GTID.
*/
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
thd->wakeup_ready= 0;
process_wait_hash(wakeup_seq_no, he);
/*
Since we already checked wakeup_seq_no, we are sure that
process_wait_hash() will mark us done.
*/
DBUG_ASSERT(thd->wakeup_ready);
if (thd->wakeup_ready)
{
if (takeover)
promote_new_waiter(he);
break;
}
}
else if ((cur_wait_seq_no= slave_state_elem->min_wait_seq_no) == 0 ||
cur_wait_seq_no > seq_no)
{
/*
We have to do the small wait ourselves (stealing it from any thread that
might already be waiting for a later seq_no).
*/
slave_state_elem->min_wait_seq_no= seq_no;
if (cur_wait_seq_no != 0)
{
/* We stole the wait, so wake up the old waiting thread. */
mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
}
/* Do the small wait. */
if (did_enter_cond)
thd->exit_cond(old_msg);
else
mysql_mutex_unlock(&LOCK_gtid_waiting);
old_msg= thd->enter_cond(&slave_state_elem->COND_wait_gtid,
&rpl_global_gtid_slave_state.LOCK_slave_state,
"Waiting in MASTER_GTID_WAIT() (primary waiter)");
do
{
if (thd->check_killed())
slave_state_elem->min_wait_seq_no = 0;
else if (wait_until)
{
int err=
mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
&rpl_global_gtid_slave_state.LOCK_slave_state,
wait_until);
if (err == ETIMEDOUT || err == ETIME)
{
timed_out= true;
slave_state_elem->min_wait_seq_no = 0;
}
}
else
mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
&rpl_global_gtid_slave_state.LOCK_slave_state);
} while (slave_state_elem->min_wait_seq_no == seq_no);
/*
Check the new gtid_slave_state. We could be woken up because our seq_no
has been reached, or because someone else stole the small wait from us.
(Or because of kill/timeout).
*/
wakeup_seq_no= slave_state_elem->highest_seq_no;
thd->exit_cond(old_msg);
mysql_mutex_lock(&LOCK_gtid_waiting);
/*
Note that hash_entry pointers do not change once allocated, so we do
not need to lookup `he' again after re-aquiring the lock.
*/
thd->wakeup_ready= 0;
process_wait_hash(wakeup_seq_no, he);
if (thd->wakeup_ready)
promote_new_waiter(he);
else if (thd->killed || timed_out)
{
remove_from_wait_hash(he, &elem);
promote_new_waiter(he);
if (thd->killed)
thd->send_kill_message();
break;
}
}
else
{
/* We have to do the large wait. */
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
thd->wakeup_ready= 0;
}
takeover= false;
old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
"Waiting in MASTER_GTID_WAIT()");
while (!thd->wakeup_ready && !thd->check_killed() && !timed_out)
{
thd_wait_begin(thd, THD_WAIT_BINLOG);
if (wait_until)
{
int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
&LOCK_gtid_waiting, wait_until);
if (err == ETIMEDOUT || err == ETIME)
timed_out= true;
}
else
mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
thd_wait_end(thd);
}
if (elem.wakeup_reason == queue_element::DONE)
break;
takeover= true;
if (thd->killed || timed_out)
{
remove_from_wait_hash(he, &elem);
/*
If we got kill/timeout _and_ we were asked to takeover the small wait,
we need to pass on that task to someone else.
*/
if (thd->wakeup_ready && elem.wakeup_reason == queue_element::TAKEOVER)
promote_new_waiter(he);
if (thd->killed)
thd->send_kill_message();
break;
}
}
if (did_enter_cond)
thd->exit_cond(old_msg);
else
mysql_mutex_unlock(&LOCK_gtid_waiting);
#endif /* HAVE_REPLICATION */
return timed_out ? -1 : 0;
}
static void
free_hash_element(void *p)
{
gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
delete_queue(&e->queue);
my_free(e);
}
void
gtid_waiting::init()
{
my_hash_init(&hash, &my_charset_bin, 32,
offsetof(hash_element, domain_id), sizeof(uint32), NULL,
free_hash_element, HASH_UNIQUE);
mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
}
void
gtid_waiting::destroy()
{
mysql_mutex_destroy(&LOCK_gtid_waiting);
my_hash_free(&hash);
}
static int
cmp_queue_elem(void *, uchar *a, uchar *b)
{
uint64 seq_no_a= *(uint64 *)a;
uint64 seq_no_b= *(uint64 *)b;
if (seq_no_a < seq_no_b)
return -1;
else if (seq_no_a == seq_no_b)
return 0;
else
return 1;
}
gtid_waiting::hash_element *
gtid_waiting::get_entry(uint32 domain_id)
{
hash_element *e;
if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
return e;
if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
{
my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*e));
return NULL;
}
if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
my_free(e);
return NULL;
}
e->domain_id= domain_id;
if (my_hash_insert(&hash, (uchar *)e))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
delete_queue(&e->queue);
my_free(e);
return NULL;
}
return e;
}
gtid_waiting::hash_element *
gtid_waiting::register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
gtid_waiting::queue_element *elem)
{
hash_element *e;
mysql_mutex_assert_owner(&LOCK_gtid_waiting);
if (!(e= get_entry(wait_gtid->domain_id)))
return NULL;
if (queue_insert_safe(&e->queue, (uchar *)elem))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
return NULL;
}
return e;
}
void
gtid_waiting::remove_from_wait_hash(gtid_waiting::hash_element *e,
gtid_waiting::queue_element *elem)
{
mysql_mutex_assert_owner(&LOCK_gtid_waiting);
queue_remove(&e->queue, elem->queue_idx);
}
......@@ -16,6 +16,10 @@
#ifndef RPL_GTID_H
#define RPL_GTID_H
#include "hash.h"
#include "queues.h"
/* Definitions for MariaDB global transaction ID (GTID). */
......@@ -61,6 +65,15 @@ struct rpl_slave_state
{
struct list_element *list;
uint32 domain_id;
/* Highest seq_no seen so far in this domain. */
uint64 highest_seq_no;
/*
If min_wait_seq_no is non-zero, then it is the smallest seq_no in this
domain that someone is doing MASTER_GTID_WAIT() on. When we reach this
seq_no, we need to signal the waiter on COND_wait_gtid.
*/
uint64 min_wait_seq_no;
mysql_cond_t COND_wait_gtid;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add(list_element *l)
......@@ -99,9 +112,6 @@ struct rpl_slave_state
bool in_statement);
bool is_empty();
void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
......@@ -204,6 +214,49 @@ struct slave_connection_state
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
};
/*
Structure to keep track of threads waiting in MASTER_GTID_WAIT().
Since replication is (mostly) single-threaded, we want to minimise the
performance impact on that from MASTER_GTID_WAIT(). To achieve this, we
are careful to keep the common lock between replication threads and
MASTER_GTID_WAIT threads held for as short as possible. We keep only
a single thread waiting to be notified by the replication threads; this
thread then handles all the (potentially heavy) lifting of dealing with
all current waiting threads.
*/
struct gtid_waiting {
/* Elements in the hash, basically a priority queue for each domain. */
struct hash_element {
QUEUE queue;
uint32 domain_id;
};
/* A priority queue to handle waiters in one domain in seq_no order. */
struct queue_element {
uint64 wait_seq_no;
THD *thd;
int queue_idx;
enum { DONE, TAKEOVER } wakeup_reason;
};
mysql_mutex_t LOCK_gtid_waiting;
HASH hash;
void init();
void destroy();
hash_element *get_entry(uint32 domain_id);
int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
void promote_new_waiter(gtid_waiting::hash_element *he);
int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
hash_element *register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
queue_element *elem);
void remove_from_wait_hash(hash_element *e, queue_element *elem);
};
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
bool *first);
extern int gtid_check_rpl_slave_state_table(TABLE *table);
......
......@@ -37,6 +37,8 @@ static int count_relay_log_space(Relay_log_info* rli);
domain).
*/
rpl_slave_state rpl_global_gtid_slave_state;
/* Object used for MASTER_GTID_WAIT(). */
gtid_waiting rpl_global_gtid_waiting;
// Defined in slave.cc
......@@ -1312,9 +1314,9 @@ rpl_load_gtid_slave_state(THD *thd)
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");
rpl_global_gtid_slave_state.lock();
mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state.loaded;
rpl_global_gtid_slave_state.unlock();
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);
......@@ -1414,10 +1416,10 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
rpl_global_gtid_slave_state.lock();
mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (rpl_global_gtid_slave_state.loaded)
{
rpl_global_gtid_slave_state.unlock();
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
goto end;
}
......@@ -1429,7 +1431,7 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.sub_id,
tmp_entry.gtid.seq_no)))
{
rpl_global_gtid_slave_state.unlock();
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
......@@ -1442,14 +1444,14 @@ rpl_load_gtid_slave_state(THD *thd)
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
{
rpl_global_gtid_slave_state.unlock();
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
}
rpl_global_gtid_slave_state.loaded= true;
rpl_global_gtid_slave_state.unlock();
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
err= 0; /* Clear HA_ERR_END_OF_FILE */
......
......@@ -702,6 +702,7 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
extern gtid_waiting rpl_global_gtid_waiting;
int rpl_load_gtid_slave_state(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
......
......@@ -6547,7 +6547,7 @@ ER_UNTIL_REQUIRES_USING_GTID
ER_GTID_STRICT_OUT_OF_ORDER
eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled."
ER_GTID_START_FROM_BINLOG_HOLE
eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled"
eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though a subsequent sequence number does exist), and GTID strict mode is enabled"
ER_SLAVE_UNEXPECTED_MASTER_SWITCH
eng "Unexpected GTID received from master after reconnect. This normally indicates that the master server was replaced without restarting the slave threads. %s"
ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
......
......@@ -1266,6 +1266,7 @@ void THD::init(void)
set_status_var_init();
bzero((char *) &org_status_var, sizeof(org_status_var));
start_bytes_received= 0;
last_commit_gtid.seq_no= 0;
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
......
......@@ -44,6 +44,7 @@
#include "thr_lock.h" /* thr_lock_type, THR_LOCK_DATA,
THR_LOCK_INFO */
#include "my_apc.h"
#include "rpl_gtid.h"
class Reprepare_observer;
class Relay_log_info;
......@@ -3405,6 +3406,12 @@ class THD :public Statement,
*/
LEX_STRING invoker_user;
LEX_STRING invoker_host;
/* Protect against add/delete of temporary tables in parallel replication */
void rgi_lock_temporary_tables();
void rgi_unlock_temporary_tables();
bool rgi_have_temporary_tables();
public:
/*
Flag, mutex and condition for a thread to wait for a signal from another
thread.
......@@ -3415,12 +3422,12 @@ class THD :public Statement,
bool wakeup_ready;
mysql_mutex_t LOCK_wakeup_ready;
mysql_cond_t COND_wakeup_ready;
/*
The GTID assigned to the last commit. If no GTID was assigned to any commit
so far, this is indicated by last_commit_gtid.seq_no == 0.
*/
rpl_gtid last_commit_gtid;
/* Protect against add/delete of temporary tables in parallel replication */
void rgi_lock_temporary_tables();
void rgi_unlock_temporary_tables();
bool rgi_have_temporary_tables();
public:
inline void lock_temporary_tables()
{
if (rgi_slave)
......
......@@ -3965,6 +3965,20 @@ rpl_deinit_gtid_slave_state()
}
void
rpl_init_gtid_waiting()
{
rpl_global_gtid_waiting.init();
}
void
rpl_deinit_gtid_waiting()
{
rpl_global_gtid_waiting.destroy();
}
/*
Format the current GTID state as a string, for returning the value of
@@global.gtid_slave_pos.
......
......@@ -70,6 +70,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
void rpl_init_gtid_waiting();
void rpl_deinit_gtid_waiting();
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
int rpl_append_gtid_state(String *dest, bool use_binlog);
int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
......
......@@ -1538,6 +1538,33 @@ static Sys_var_gtid_binlog_state Sys_gtid_binlog_state(
GLOBAL_VAR(opt_gtid_binlog_state_dummy), NO_CMD_LINE);
static Sys_var_last_gtid Sys_last_gtid(
"last_gtid", "The GTID of the last commit (if binlogging was enabled), "
"or the empty string if none.",
READ_ONLY sys_var::ONLY_SESSION, NO_CMD_LINE);
uchar *
Sys_var_last_gtid::session_value_ptr(THD *thd, LEX_STRING *base)
{
char buf[10+1+10+1+20+1];
String str(buf, sizeof(buf), system_charset_info);
char *p;
bool first= true;
str.length(0);
if ((thd->last_commit_gtid.seq_no > 0 &&
rpl_slave_state_tostring_helper(&str, &thd->last_commit_gtid, &first)) ||
!(p= thd->strmake(str.ptr(), str.length())))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
return NULL;
}
return (uchar *)p;
}
static bool
check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
{
......
......@@ -2211,3 +2211,53 @@ class Sys_var_gtid_binlog_state: public sys_var
}
uchar *global_value_ptr(THD *thd, LEX_STRING *base);
};
/**
Class for @@session.last_gtid.
*/
class Sys_var_last_gtid: public sys_var
{
public:
Sys_var_last_gtid(const char *name_arg,
const char *comment, int flag_args, CMD_LINE getopt)
: sys_var(&all_sys_vars, name_arg, comment, flag_args, 0, getopt.id,
getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
NULL, NULL, NULL)
{
option.var_type= GET_STR;
}
bool do_check(THD *thd, set_var *var)
{
DBUG_ASSERT(false);
return true;
}
bool session_update(THD *thd, set_var *var)
{
DBUG_ASSERT(false);
return true;
}
bool global_update(THD *thd, set_var *var)
{
DBUG_ASSERT(false);
return true;
}
bool check_update_type(Item_result type) {
DBUG_ASSERT(false);
return false;
}
void session_save_default(THD *thd, set_var *var)
{
DBUG_ASSERT(false);
}
void global_save_default(THD *thd, set_var *var)
{
DBUG_ASSERT(false);
}
uchar *session_value_ptr(THD *thd, LEX_STRING *base);
uchar *global_value_ptr(THD *thd, LEX_STRING *base)
{
DBUG_ASSERT(false);
return NULL;
}
};
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment