Commit 76e929a9 authored by unknown's avatar unknown

MDEV-4984: Implement MASTER_GTID_WAIT() and @@LAST_GTID.

Rewrite the gtid_waiting::wait_for_gtid() function.
The code was rubbish (and buggy). Now the logic is
much clearer.

Also fix a missing slave sync that could cause test failure.
parent 3c97d24f
...@@ -131,6 +131,7 @@ include/stop_slave.inc ...@@ -131,6 +131,7 @@ include/stop_slave.inc
CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_3; CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_3;
include/start_slave.inc include/start_slave.inc
DROP TABLE t1,t2; DROP TABLE t1,t2;
include/save_master_gtid.inc
*** A few more checks for BINLOG_GTID_POS function *** *** A few more checks for BINLOG_GTID_POS function ***
SELECT BINLOG_GTID_POS(); SELECT BINLOG_GTID_POS();
ERROR 42000: Incorrect parameter count in the call to native function 'BINLOG_GTID_POS' ERROR 42000: Incorrect parameter count in the call to native function 'BINLOG_GTID_POS'
...@@ -167,6 +168,7 @@ NULL ...@@ -167,6 +168,7 @@ NULL
Warnings: Warnings:
Warning 1916 Got overflow when converting '18446744073709551616' to INT. Value truncated. Warning 1916 Got overflow when converting '18446744073709551616' to INT. Value truncated.
*** Some tests of @@GLOBAL.gtid_binlog_state *** *** Some tests of @@GLOBAL.gtid_binlog_state ***
include/sync_with_master_gtid.inc
include/stop_slave.inc include/stop_slave.inc
SET @old_state= @@GLOBAL.gtid_binlog_state; SET @old_state= @@GLOBAL.gtid_binlog_state;
SET GLOBAL gtid_binlog_state = ''; SET GLOBAL gtid_binlog_state = '';
......
...@@ -139,6 +139,7 @@ eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_3; ...@@ -139,6 +139,7 @@ eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_3;
connection server_1; connection server_1;
DROP TABLE t1,t2; DROP TABLE t1,t2;
--source include/save_master_gtid.inc
--echo *** A few more checks for BINLOG_GTID_POS function *** --echo *** A few more checks for BINLOG_GTID_POS function ***
--let $valid_binlog_name = query_get_value(SHOW BINARY LOGS,Log_name,1) --let $valid_binlog_name = query_get_value(SHOW BINARY LOGS,Log_name,1)
...@@ -160,6 +161,7 @@ eval SELECT BINLOG_GTID_POS('$valid_binlog_name',18446744073709551616); ...@@ -160,6 +161,7 @@ eval SELECT BINLOG_GTID_POS('$valid_binlog_name',18446744073709551616);
--echo *** Some tests of @@GLOBAL.gtid_binlog_state *** --echo *** Some tests of @@GLOBAL.gtid_binlog_state ***
--connection server_2 --connection server_2
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc --source include/stop_slave.inc
--connection server_1 --connection server_1
......
...@@ -157,7 +157,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, ...@@ -157,7 +157,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (seq_no > elem->highest_seq_no) if (seq_no > elem->highest_seq_no)
elem->highest_seq_no= seq_no; elem->highest_seq_no= seq_no;
if (elem->min_wait_seq_no != 0 && elem->min_wait_seq_no <= seq_no) if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
{ {
/* /*
Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear. Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
...@@ -166,7 +166,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, ...@@ -166,7 +166,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
replication SQL thread. replication SQL thread.
*/ */
mysql_mutex_assert_owner(&LOCK_slave_state); mysql_mutex_assert_owner(&LOCK_slave_state);
elem->min_wait_seq_no= 0; elem->gtid_waiter= NULL;
mysql_cond_broadcast(&elem->COND_wait_gtid); mysql_cond_broadcast(&elem->COND_wait_gtid);
} }
...@@ -198,7 +198,7 @@ rpl_slave_state::get_element(uint32 domain_id) ...@@ -198,7 +198,7 @@ rpl_slave_state::get_element(uint32 domain_id)
elem->list= NULL; elem->list= NULL;
elem->domain_id= domain_id; elem->domain_id= domain_id;
elem->highest_seq_no= 0; elem->highest_seq_no= 0;
elem->min_wait_seq_no= 0; elem->gtid_waiter= NULL;
mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0); mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem)) if (my_hash_insert(&hash, (uchar *)elem))
{ {
...@@ -1732,8 +1732,7 @@ gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he) ...@@ -1732,8 +1732,7 @@ gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
if (queue_empty(&he->queue)) if (queue_empty(&he->queue))
return; return;
qe= (queue_element *)queue_top(&he->queue); qe= (queue_element *)queue_top(&he->queue);
qe->thd->wakeup_ready= true; qe->do_small_wait= true;
qe->wakeup_reason= queue_element::TAKEOVER;
mysql_cond_signal(&qe->thd->COND_wakeup_ready); mysql_cond_signal(&qe->thd->COND_wakeup_ready);
} }
...@@ -1747,14 +1746,14 @@ gtid_waiting::process_wait_hash(uint64 wakeup_seq_no, ...@@ -1747,14 +1746,14 @@ gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
{ {
queue_element *qe; queue_element *qe;
if (queue_first_element(&he->queue) > queue_last_element(&he->queue)) if (queue_empty(&he->queue))
break; break;
qe= (queue_element *)queue_top(&he->queue); qe= (queue_element *)queue_top(&he->queue);
if (qe->wait_seq_no > wakeup_seq_no) if (qe->wait_seq_no > wakeup_seq_no)
break; break;
DBUG_ASSERT(!qe->done);
queue_remove_top(&he->queue); queue_remove_top(&he->queue);
qe->thd->wakeup_ready= true; qe->done= true;;
qe->wakeup_reason= queue_element::DONE;
mysql_cond_signal(&qe->thd->COND_wakeup_ready); mysql_cond_signal(&qe->thd->COND_wakeup_ready);
} }
} }
...@@ -1775,15 +1774,29 @@ gtid_waiting::process_wait_hash(uint64 wakeup_seq_no, ...@@ -1775,15 +1774,29 @@ gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
If there is already a small waiter, a new thread will either replace the If there is already a small waiter, a new thread will either replace the
small waiter (if it needs to wait for an earlier sequence number), or small waiter (if it needs to wait for an earlier sequence number), or
instead to a "large" wait. instead do a "large" wait.
Once awoken on the small wait, the waiting thread releases the lock shared Once awoken on the small wait, the waiting thread releases the lock shared
with the SQL threads quickly, and then processes all waiters currently doing with the SQL threads quickly, and then processes all waiters currently doing
the large wait. the large wait using a different lock that does not impact replication.
This way, the SQL threads only need to do a single check + possibly a This way, the SQL threads only need to do a single check + possibly a
pthread_cond_signal() when updating the gtid_slave_state, and the time that pthread_cond_signal() when updating the gtid_slave_state, and the time that
non-SQL threads contend for the lock on gtid_slave_staste is minimized. non-SQL threads contend for the lock on gtid_slave_state is minimized.
There is always at least one thread that has the responsibility to ensure
that there is a small waiter; this thread has queue_element::do_small_wait
set to true. This thread will do the small wait until it is done, at which
point it will make sure to pass on the responsibility to another thread.
Normally only one thread has do_small_wait==true, but it can occasionally
happen that there is more than one, when threads race one another for the
lock on the small wait (this results in slightly increased activity on the
small lock but is otherwise harmless).
Returns:
0 Wait completed normally
-1 Wait completed due to timeout
1 An error (my_error() will have been called to set the error in the da)
*/ */
int int
gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
...@@ -1798,35 +1811,45 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, ...@@ -1798,35 +1811,45 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
rpl_slave_state::element *slave_state_elem= NULL; rpl_slave_state::element *slave_state_elem= NULL;
const char *old_msg= NULL; const char *old_msg= NULL;
bool did_enter_cond= false; bool did_enter_cond= false;
bool takeover= false;
elem.wait_seq_no= seq_no; elem.wait_seq_no= seq_no;
elem.thd= thd; elem.thd= thd;
/* elem.done= false;
Register on the large wait before checking the small wait.
This ensures that if we find another waiter already doing the small wait,
we are sure to be woken up by that one, and thus we will not need to take
the lock on the small wait more than once in this case.
*/
mysql_mutex_lock(&LOCK_gtid_waiting); mysql_mutex_lock(&LOCK_gtid_waiting);
if (!(he= register_in_wait_hash(thd, wait_gtid, &elem))) if (!(he= get_entry(wait_gtid->domain_id)))
{ {
mysql_mutex_unlock(&LOCK_gtid_waiting); mysql_mutex_unlock(&LOCK_gtid_waiting);
return 1; return 1;
} }
/* /*
Now check the small wait, and either do the large wait or the small one, If there is already another waiter with seq_no no larger than our own,
depending on whether there is already a suitable small waiter or not. we are sure that there is already a small waiter that will wake us up
(or later pass the small wait responsibility to us). So in this case, we
do not need to touch the small wait lock at all.
*/
elem.do_small_wait=
(queue_empty(&he->queue) ||
((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);
We may need to do this multiple times, as a previous small waiter may if (register_in_wait_queue(thd, wait_gtid, he, &elem))
complete and pass the small wait on to us. {
mysql_mutex_unlock(&LOCK_gtid_waiting);
return 1;
}
/*
Loop, doing either the small or large wait as appropriate, until either
the position waited for is reached, or we get a kill or timeout.
*/ */
for (;;) for (;;)
{ {
uint64 wakeup_seq_no, cur_wait_seq_no;
mysql_mutex_assert_owner(&LOCK_gtid_waiting); mysql_mutex_assert_owner(&LOCK_gtid_waiting);
if (elem.do_small_wait)
{
uint64 wakeup_seq_no;
queue_element *cur_waiter;
mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state); mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
/* /*
The elements in the gtid_slave_state_hash are never re-allocated once The elements in the gtid_slave_state_hash are never re-allocated once
...@@ -1837,7 +1860,11 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, ...@@ -1837,7 +1860,11 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
!(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id))) !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
{ {
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
remove_from_wait_hash(he, &elem); remove_from_wait_queue(he, &elem);
promote_new_waiter(he);
if (did_enter_cond)
thd->exit_cond(old_msg);
else
mysql_mutex_unlock(&LOCK_gtid_waiting); mysql_mutex_unlock(&LOCK_gtid_waiting);
my_error(ER_OUT_OF_RESOURCES, MYF(0)); my_error(ER_OUT_OF_RESOURCES, MYF(0));
return 1; return 1;
...@@ -1846,51 +1873,53 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, ...@@ -1846,51 +1873,53 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no) if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
{ {
/* /*
We do not have to wait. But we might need to wakeup other threads on We do not have to wait. (We will be removed from the wait queue when
the large wait (can happen if we were woken up to take over the small we call process_wait_hash() below.
wait, and SQL thread raced with us to reach the waited-for GTID.
*/ */
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
thd->wakeup_ready= 0; }
process_wait_hash(wakeup_seq_no, he); else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
slave_state_elem->min_wait_seq_no <= seq_no)
{
/* /*
Since we already checked wakeup_seq_no, we are sure that There is already a suitable small waiter, go do the large wait.
process_wait_hash() will mark us done. (Normally we would not have needed to check the small wait in this
case, but it can happen if we race with another thread for the small
lock).
*/ */
DBUG_ASSERT(thd->wakeup_ready); elem.do_small_wait= false;
if (thd->wakeup_ready) mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
{
if (takeover)
promote_new_waiter(he);
break;
}
} }
else if ((cur_wait_seq_no= slave_state_elem->min_wait_seq_no) == 0 || else
cur_wait_seq_no > seq_no)
{ {
/* /*
We have to do the small wait ourselves (stealing it from any thread that We have to do the small wait ourselves (stealing it from any thread
might already be waiting for a later seq_no). that might already be waiting for a later seq_no).
*/ */
slave_state_elem->gtid_waiter= &elem;
slave_state_elem->min_wait_seq_no= seq_no; slave_state_elem->min_wait_seq_no= seq_no;
if (cur_wait_seq_no != 0) if (cur_waiter)
{ {
/* We stole the wait, so wake up the old waiting thread. */ /* We stole the wait, so wake up the old waiting thread. */
mysql_cond_signal(&slave_state_elem->COND_wait_gtid); mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
} }
/* Do the small wait. */
/* Release the large lock, and do the small wait. */
if (did_enter_cond) if (did_enter_cond)
{
thd->exit_cond(old_msg); thd->exit_cond(old_msg);
did_enter_cond= false;
}
else else
mysql_mutex_unlock(&LOCK_gtid_waiting); mysql_mutex_unlock(&LOCK_gtid_waiting);
old_msg=
old_msg= thd->enter_cond(&slave_state_elem->COND_wait_gtid, thd->enter_cond(&slave_state_elem->COND_wait_gtid,
&rpl_global_gtid_slave_state.LOCK_slave_state, &rpl_global_gtid_slave_state.LOCK_slave_state,
"Waiting in MASTER_GTID_WAIT() (primary waiter)"); "Waiting in MASTER_GTID_WAIT() (primary waiter)");
do do
{ {
if (thd->check_killed()) if (thd->check_killed())
slave_state_elem->min_wait_seq_no = 0; break;
else if (wait_until) else if (wait_until)
{ {
int err= int err=
...@@ -1900,50 +1929,48 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, ...@@ -1900,50 +1929,48 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
if (err == ETIMEDOUT || err == ETIME) if (err == ETIMEDOUT || err == ETIME)
{ {
timed_out= true; timed_out= true;
slave_state_elem->min_wait_seq_no = 0; break;
} }
} }
else else
mysql_cond_wait(&slave_state_elem->COND_wait_gtid, mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
&rpl_global_gtid_slave_state.LOCK_slave_state); &rpl_global_gtid_slave_state.LOCK_slave_state);
} while (slave_state_elem->min_wait_seq_no == seq_no); } while (slave_state_elem->gtid_waiter == &elem);
/*
Check the new gtid_slave_state. We could be woken up because our seq_no
has been reached, or because someone else stole the small wait from us.
(Or because of kill/timeout).
*/
wakeup_seq_no= slave_state_elem->highest_seq_no; wakeup_seq_no= slave_state_elem->highest_seq_no;
/*
If we aborted due to timeout or kill, remove us as waiter.
If we were replaced by another waiter with a smaller seq_no, then we
no longer have responsibility for the small wait.
*/
if ((cur_waiter= slave_state_elem->gtid_waiter))
{
if (cur_waiter == &elem)
slave_state_elem->gtid_waiter= NULL;
else if (slave_state_elem->min_wait_seq_no <= seq_no)
elem.do_small_wait= false;
}
thd->exit_cond(old_msg); thd->exit_cond(old_msg);
mysql_mutex_lock(&LOCK_gtid_waiting); mysql_mutex_lock(&LOCK_gtid_waiting);
}
/* /*
Note that hash_entry pointers do not change once allocated, so we do Note that hash_entry pointers do not change once allocated, so we do
not need to lookup `he' again after re-aquiring the lock. not need to lookup `he' again after re-aquiring LOCK_gtid_waiting.
*/ */
thd->wakeup_ready= 0;
process_wait_hash(wakeup_seq_no, he); process_wait_hash(wakeup_seq_no, he);
if (thd->wakeup_ready)
promote_new_waiter(he);
else if (thd->killed || timed_out)
{
remove_from_wait_hash(he, &elem);
promote_new_waiter(he);
if (thd->killed)
thd->send_kill_message();
break;
}
} }
else else
{ {
/* We have to do the large wait. */ /* Do the large wait. */
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); if (!did_enter_cond)
thd->wakeup_ready= 0; {
}
takeover= false;
old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting, old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
"Waiting in MASTER_GTID_WAIT()"); "Waiting in MASTER_GTID_WAIT()");
while (!thd->wakeup_ready && !thd->check_killed() && !timed_out) did_enter_cond= true;
}
while (!elem.done && !thd->check_killed())
{ {
thd_wait_begin(thd, THD_WAIT_BINLOG); thd_wait_begin(thd, THD_WAIT_BINLOG);
if (wait_until) if (wait_until)
...@@ -1956,31 +1983,35 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, ...@@ -1956,31 +1983,35 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
else else
mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting); mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
thd_wait_end(thd); thd_wait_end(thd);
if (elem.do_small_wait || timed_out)
break;
}
} }
if (thd->killed || timed_out) if ((thd->killed || timed_out) && !elem.done)
{
/* Aborted, so remove ourselves from the hash. */
remove_from_wait_queue(he, &elem);
elem.done= true;
}
if (elem.done)
{ {
remove_from_wait_hash(he, &elem);
/* /*
If we got kill/timeout _and_ we were asked to takeover the small wait, If our wait is done, but we have (or were passed) responsibility for
we need to pass on that task to someone else. the small wait, then we need to pass on that task to someone else.
*/ */
if (thd->wakeup_ready && elem.wakeup_reason == queue_element::TAKEOVER) if (elem.do_small_wait)
promote_new_waiter(he); promote_new_waiter(he);
if (thd->killed)
thd->send_kill_message();
break; break;
} }
if (elem.wakeup_reason == queue_element::DONE)
break;
takeover= true;
} }
if (did_enter_cond) if (did_enter_cond)
thd->exit_cond(old_msg); thd->exit_cond(old_msg);
else else
mysql_mutex_unlock(&LOCK_gtid_waiting); mysql_mutex_unlock(&LOCK_gtid_waiting);
if (thd->killed)
thd->send_kill_message();
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
return timed_out ? -1 : 0; return timed_out ? -1 : 0;
} }
...@@ -2060,32 +2091,28 @@ gtid_waiting::get_entry(uint32 domain_id) ...@@ -2060,32 +2091,28 @@ gtid_waiting::get_entry(uint32 domain_id)
} }
gtid_waiting::hash_element * int
gtid_waiting::register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid, gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
gtid_waiting::hash_element *he,
gtid_waiting::queue_element *elem) gtid_waiting::queue_element *elem)
{ {
hash_element *e;
mysql_mutex_assert_owner(&LOCK_gtid_waiting); mysql_mutex_assert_owner(&LOCK_gtid_waiting);
if (!(e= get_entry(wait_gtid->domain_id))) if (queue_insert_safe(&he->queue, (uchar *)elem))
return NULL;
if (queue_insert_safe(&e->queue, (uchar *)elem))
{ {
my_error(ER_OUT_OF_RESOURCES, MYF(0)); my_error(ER_OUT_OF_RESOURCES, MYF(0));
return NULL; return 1;
} }
return e; return 0;
} }
void void
gtid_waiting::remove_from_wait_hash(gtid_waiting::hash_element *e, gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
gtid_waiting::queue_element *elem) gtid_waiting::queue_element *elem)
{ {
mysql_mutex_assert_owner(&LOCK_gtid_waiting); mysql_mutex_assert_owner(&LOCK_gtid_waiting);
queue_remove(&e->queue, elem->queue_idx); queue_remove(&he->queue, elem->queue_idx);
} }
...@@ -40,6 +40,57 @@ enum enum_gtid_skip_type { ...@@ -40,6 +40,57 @@ enum enum_gtid_skip_type {
}; };
/*
Structure to keep track of threads waiting in MASTER_GTID_WAIT().
Since replication is (mostly) single-threaded, we want to minimise the
performance impact on that from MASTER_GTID_WAIT(). To achieve this, we
are careful to keep the common lock between replication threads and
MASTER_GTID_WAIT threads held for as short as possible. We keep only
a single thread waiting to be notified by the replication threads; this
thread then handles all the (potentially heavy) lifting of dealing with
all current waiting threads.
*/
struct gtid_waiting {
/* Elements in the hash, basically a priority queue for each domain. */
struct hash_element {
QUEUE queue;
uint32 domain_id;
};
/* A priority queue to handle waiters in one domain in seq_no order. */
struct queue_element {
uint64 wait_seq_no;
THD *thd;
int queue_idx;
/*
do_small_wait is true if we have responsibility for ensuring that there
is a small waiter.
*/
bool do_small_wait;
/*
The flag `done' is set when the wait is completed (either due to reaching
the position waited for, or due to timeout or kill). The queue_element
is in the queue if and only if `done' is true.
*/
bool done;
};
mysql_mutex_t LOCK_gtid_waiting;
HASH hash;
void init();
void destroy();
hash_element *get_entry(uint32 domain_id);
int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
void promote_new_waiter(gtid_waiting::hash_element *he);
int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
int register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, hash_element *he,
queue_element *elem);
void remove_from_wait_queue(hash_element *he, queue_element *elem);
};
/* /*
Replication slave state. Replication slave state.
...@@ -68,9 +119,14 @@ struct rpl_slave_state ...@@ -68,9 +119,14 @@ struct rpl_slave_state
/* Highest seq_no seen so far in this domain. */ /* Highest seq_no seen so far in this domain. */
uint64 highest_seq_no; uint64 highest_seq_no;
/* /*
If min_wait_seq_no is non-zero, then it is the smallest seq_no in this If this is non-NULL, then it is the waiter responsible for the small
domain that someone is doing MASTER_GTID_WAIT() on. When we reach this wait in MASTER_GTID_WAIT().
seq_no, we need to signal the waiter on COND_wait_gtid. */
gtid_waiting::queue_element *gtid_waiter;
/*
If gtid_waiter is non-NULL, then this is the seq_no that its
MASTER_GTID_WAIT() is waiting on. When we reach this seq_no, we need to
signal the waiter on COND_wait_gtid.
*/ */
uint64 min_wait_seq_no; uint64 min_wait_seq_no;
mysql_cond_t COND_wait_gtid; mysql_cond_t COND_wait_gtid;
...@@ -215,48 +271,6 @@ struct slave_connection_state ...@@ -215,48 +271,6 @@ struct slave_connection_state
}; };
/*
Structure to keep track of threads waiting in MASTER_GTID_WAIT().
Since replication is (mostly) single-threaded, we want to minimise the
performance impact on that from MASTER_GTID_WAIT(). To achieve this, we
are careful to keep the common lock between replication threads and
MASTER_GTID_WAIT threads held for as short as possible. We keep only
a single thread waiting to be notified by the replication threads; this
thread then handles all the (potentially heavy) lifting of dealing with
all current waiting threads.
*/
struct gtid_waiting {
/* Elements in the hash, basically a priority queue for each domain. */
struct hash_element {
QUEUE queue;
uint32 domain_id;
};
/* A priority queue to handle waiters in one domain in seq_no order. */
struct queue_element {
uint64 wait_seq_no;
THD *thd;
int queue_idx;
enum { DONE, TAKEOVER } wakeup_reason;
};
mysql_mutex_t LOCK_gtid_waiting;
HASH hash;
void init();
void destroy();
hash_element *get_entry(uint32 domain_id);
int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
void promote_new_waiter(gtid_waiting::hash_element *he);
int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
hash_element *register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
queue_element *elem);
void remove_from_wait_hash(hash_element *e, queue_element *elem);
};
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
bool *first); bool *first);
extern int gtid_check_rpl_slave_state_table(TABLE *table); extern int gtid_check_rpl_slave_state_table(TABLE *table);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment