Commit a83c7ab1 authored by Brandon Nesterenko's avatar Brandon Nesterenko

MDEV-11853: semisync thread can be killed after sync binlog but before ACK in the sync state

Problem:
========
If a primary is shut down while it has an active semi-sync connection
and is awaiting an ACK, the primary
hard kills the active communication thread and does not ensure the
transaction was received by a replica. This can lead to an
inconsistent replication state.

Solution:
========
During shutdown, the primary should wait for an ACK or timeout
before hard killing a thread which is awaiting a communication. We
extend the `SHUTDOWN WAIT FOR SLAVES` logic to identify and ignore
any threads waiting for a semi-sync ACK in phase 1. Then, before
stopping the ack receiver thread, the shutdown is delayed until all
waiting semi-sync connections receive an ACK or time out. The
connections are then killed in phase 2.

Notes:
 1) There remains an unresolved corner case that affects this
patch. MDEV-28141: Slave crashes with Packets out of order when
connecting to a shutting down master. Specifically, if a slave is
connecting to a master which is actively shutting down, the slave
can crash with a "Packets out of order" assertion error. To get
around this issue in the MTR tests, the primary will wait a small
amount of time before killing threads in phase 1 to let the replicas
safely stop (if applicable).
 2) This patch also fixes MDEV-28114: Semi-sync Master ACK Receiver
Thread Can Error on COM_QUIT

Reviewed By
============
Andrei Elkin <andrei.elkin@mariadb.com>
parent 807945f2
This diff is collapsed.
!include ../my.cnf
[mysqld.1]
log_warnings=9
[mysqld.2]
log_warnings=9
[mysqld.3]
log_warnings=9
[ENV]
SERVER_MYPORT_3= @mysqld.3.port
SERVER_MYSOCK_3= @mysqld.3.socket
#
# Helper file to ensure that a primary waits for all ACKS (or timeout) from its
# replicas before shutting down.
#
# Parameters:
# server_1_dbug (string) Debug setting for primary (server 1)
# server_2_dbug (string) Debug setting to simulate delay or error on
# the first replica (server 2)
# server_3_dbug (string) Debug setting to simulate delay or error on
# the second replica (server 3)
# semisync_timeout (int) Rpl_semi_sync_master_timeout to use
# server_2_expect_row_count (int) The number of rows expected on the first
# replica after the shutdown
# server_3_expect_row_count (int) The number of rows expected on the second
# replica after the shutdown
#
# Remember the primary's error log path; it is searched after the shutdown
# for the "Delaying shutdown to await semi-sync ACK" message.
--connection server_1
let $log_error_file= `SELECT @@GLOBAL.log_error`;
--echo #--
--echo #-- Semi-sync Setup
# Record the primary's binlog position so each replica can sync to it below.
--connection server_1
--save_master_pos
# NOTE(review): the following `echo` has no leading `--`, so mysqltest
# presumably reads the command up to the `;` on the next line. If so,
# `let slave_last= 3` is echoed rather than executed and $slave_last is
# whatever the including test set. Confirm against the recorded result
# file before changing this.
echo #-- Enable semi-sync on slaves
let slave_last= 3;
--let i= 2
# Visit server_2 .. server_$slave_last in turn.
while (`SELECT $i <= $slave_last`)
{
--connection server_$i
--sync_with_master
set global rpl_semi_sync_slave_enabled = 1;
# Restart the slave threads after enabling semi-sync (presumably so the
# new setting applies to the replication connection -- TODO confirm).
source include/stop_slave.inc;
source include/start_slave.inc;
show status like 'Rpl_semi_sync_slave_status';
--inc $i
}
--echo #-- Enable semi-sync on master
--connection server_1
SET @@GLOBAL.rpl_semi_sync_master_enabled = 1;
--eval set @@global.rpl_semi_sync_master_timeout= $semisync_timeout
--echo #-- Wait for master to recognize semi-sync slaves
--connection server_1
let $status_var= Rpl_semi_sync_master_clients;
let $status_var_value= 2;
source include/wait_for_status_var.inc;
--echo #-- Master should have semi-sync enabled with 2 connections
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_clients';
--echo #-- Prepare servers to simulate delay or error
--connection server_1
--eval SET @@GLOBAL.debug_dbug= $server_1_dbug
--connection server_2
--eval SET @@GLOBAL.debug_dbug= $server_2_dbug
--connection server_3
--eval SET @@GLOBAL.debug_dbug= $server_3_dbug
--echo #--
--echo #-- Test begins
# The INSERT is sent asynchronously on server_1; its result is reaped
# further down, after the shutdown has been issued from server_1_con2.
--connection server_1
--echo #-- Begin semi-sync transaction
--send INSERT INTO t1 VALUES (1)
# server_1_con2 is a second connection to the primary, used to observe the
# wait and to issue the shutdown while server_1 is blocked on the INSERT.
--connection server_1_con2
--echo #-- Wait until master recognizes a connection is awaiting semi-sync ACK
let $status_var= Rpl_semi_sync_master_wait_sessions;
let $status_var_value= 1;
source include/wait_for_status_var.inc;
show status like 'Rpl_semi_sync_master_wait_sessions';
# Write "wait" to the expect file for mysqld.1 (presumably this stops MTR
# from restarting the server on its own -- TODO confirm). "restart" is
# appended to the same file later, when the primary is brought back up.
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
wait
EOF
--echo #-- Give enough time after timeout/ack received to query yes_tx/no_tx
# This debug keyword is set on the primary before the shutdown so that the
# status variables can still be queried once the semi-sync wait is over.
SET @@GLOBAL.debug_dbug= "+d,delay_shutdown_phase_2_after_semisync_wait";
--echo #-- Begin master shutdown
--send SHUTDOWN WAIT FOR ALL SLAVES
# Collect the result of the INSERT that was sent on server_1 above.
--connection server_1
--reap
--echo #-- Ensure either ACK was received (yes_tx=1) or timeout (no_tx=1)
show status like 'Rpl_semi_sync_master_yes_tx';
show status like 'Rpl_semi_sync_master_no_tx';
# Collect the result of the SHUTDOWN, then wait for the connection to drop.
--connection server_1_con2
--reap
--source include/wait_until_disconnected.inc
--echo # Check logs to ensure shutdown was delayed
--let SEARCH_FILE=$log_error_file
--let SEARCH_PATTERN=Delaying shutdown to await semi-sync ACK
--source include/search_pattern_in_file.inc
--echo # Validate slave data is in correct state
--connection server_2
--eval select count(*)=$server_2_expect_row_count from t1
--connection server_3
--eval select count(*)=$server_3_expect_row_count from t1
--echo #
--echo #-- Re-synchronize slaves with master and disable semi-sync
--echo #-- Stop slaves
--connection server_2
--eval SET @@GLOBAL.debug_dbug= "$sav_server_2_dbug"
--eval SET @@GLOBAL.rpl_semi_sync_slave_enabled= 0
source include/stop_slave.inc;
--connection server_3
--eval SET @@GLOBAL.debug_dbug= "$sav_server_3_dbug"
--eval SET @@GLOBAL.rpl_semi_sync_slave_enabled= 0
source include/stop_slave.inc;
--echo #-- Bring the master back up
--connection server_1_con2
--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
restart
EOF
--enable_reconnect
--source include/wait_until_connected_again.inc
--connection default
--enable_reconnect
--source include/wait_until_connected_again.inc
--connection server_1
--enable_reconnect
--source include/wait_until_connected_again.inc
--eval SET @@GLOBAL.debug_dbug= "$sav_master_dbug"
let $status_var= Rpl_semi_sync_master_clients;
let $status_var_value= 0;
source include/wait_for_status_var.inc;
--eval SET @@GLOBAL.rpl_semi_sync_master_enabled = 0
show status like 'Rpl_semi_sync_master_status';
TRUNCATE TABLE t1;
--save_master_pos
--echo #-- Bring slaves back up
--let i= 2
while (`SELECT $i <= $slave_last`)
{
--connection server_$i
source include/start_slave.inc;
show status like 'Rpl_semi_sync_slave_status';
--sync_with_master
SELECT COUNT(*)=0 from t1;
--inc $i
}
#
# Purpose:
# This test validates that data is consistent between a primary and replica
# in semi-sync mode when the primary is issued `SHUTDOWN WAIT FOR SLAVES`
# during an active communication. More specifically, the primary should not
# kill the connection until it is sure a replica has received all binlog
# data, i.e. once the primary receives the ACK. If a primary is issued a
# shutdown before receiving an ACK, it should wait until either 1) the ACK is
# received, or 2) the configured timeout (rpl_semi_sync_master_timeout) is
# reached.
#
# Methodology:
# Using a topology consisting of one primary with two replicas, all in
# semi-sync mode, we use DEBUG_DBUG to simulate an error or delay on the
# replicas during an active communication while the primary is issued
# `SHUTDOWN WAIT FOR SLAVES`. We create four test cases to ensure the primary
# will correctly wait for the communication to finish, and use the semi-sync
# status variables Rpl_semi_sync_master_yes_tx and Rpl_semi_sync_master_no_tx
# to ensure the connection was not prematurely killed due to the shutdown.
# Test Case 1) If both replicas simulate a delay that is within the allowed
# timeout, the primary should delay killing the suspended thread
# until an ACK is received (Rpl_semi_sync_master_yes_tx should
# be 1).
# Test Case 2) If both replicas simulate an error before sending an ACK, the
# primary should delay killing the suspended thread until the
# timeout is reached (Rpl_semi_sync_master_no_tx should be
# 1).
# Test Case 3) If one replica simulates a delay within the allowed timeout
# and the other simulates an error before sending an ACK, the
# primary should delay killing the suspended thread until it
# receives an ACK from the delayed slave
# (Rpl_semi_sync_master_yes_tx should be 1).
# Test Case 4) If a replica errors before sending an ACK, it will cause the
# IO thread to stop and handle the error. During error handling,
# if semi-sync is active, the replica will form a new connection
# with the primary to kill the active connection. However, if
# the primary is shutting down, it may kill the new connection,
# thereby leaving the active semi-sync connection intact. The
# slave should notice this, and not issue a `QUIT` command to
# the primary, which would otherwise be sent to kill an active
# connection. This test case validates that the slave does not
# send a `QUIT` in this case (Rpl_semi_sync_master_yes_tx should
# be 1 because server_3 will send the ACK within a valid
# timeout).
#
# References:
# MDEV-11853: semisync thread can be killed after sync binlog but before ACK
# in the sync state
# MDEV-28114: Semi-sync Master ACK Receiver Thread Can Error on COM_QUIT
#
--echo #############################
--echo # Common setup for all tests
--echo #############################
--echo # Note: Simulated slave delay is hardcoded to 800 milliseconds
--echo # Note: Simulated master shutdown delay is hardcoded to 500 milliseconds
--source include/have_debug.inc
--let $rpl_topology=1->2, 1->3
--source include/rpl_init.inc
--connection server_1
--echo # Slaves which simulate an error will produce a timeout on the primary
call mtr.add_suppression("Timeout waiting");
call mtr.add_suppression("did not exit");
--let $sav_master_timeout= `SELECT @@global.rpl_semi_sync_master_timeout`
--let $sav_enabled_master= `SELECT @@GLOBAL.rpl_semi_sync_master_enabled`
--let $sav_master_dbug= `SELECT @@GLOBAL.debug_dbug`
--echo # Suppress slave errors related to the simulated error
--connection server_2
call mtr.add_suppression("reply failed");
call mtr.add_suppression("Replication event checksum verification");
call mtr.add_suppression("Relay log write failure");
call mtr.add_suppression("Failed to kill the active semi-sync connection");
--let $sav_enabled_server_2=`SELECT @@GLOBAL.rpl_semi_sync_slave_enabled`
--let $sav_server_2_dbug= `SELECT @@GLOBAL.debug_dbug`
--connection server_3
call mtr.add_suppression("reply failed");
call mtr.add_suppression("Replication event checksum verification");
call mtr.add_suppression("Relay log write failure");
call mtr.add_suppression("Failed to kill the active semi-sync connection");
--let $sav_enabled_server_3=`SELECT @@GLOBAL.rpl_semi_sync_slave_enabled`
--let $sav_server_3_dbug= `SELECT @@GLOBAL.debug_dbug`
--connection server_1
CREATE TABLE t1 (a int);
--save_master_pos
--let i= 2
--let slave_last= 3
while (`SELECT $i <= $slave_last`)
{
--connection server_$i
--sync_with_master
--inc $i
}
# Set up the connection used to issue the shutdown
--connect(server_1_con2, localhost, root,,)
--echo #############################
--echo # Test cases
--echo #############################
--echo #
--echo # Test Case 1) If both replicas simulate a delay that is within the
--echo # allowed timeout, the primary should delay killing the suspended thread
--echo # until an ACK is received (Rpl_semi_sync_master_yes_tx should be 1).
--echo #
--let server_1_dbug= ""
--let server_2_dbug= "+d,simulate_delay_semisync_slave_reply"
--let server_3_dbug= "+d,simulate_delay_semisync_slave_reply"
--let semisync_timeout= 1600
--let server_2_expect_row_count= 1
--let server_3_expect_row_count= 1
--source rpl_semi_sync_shutdown_await_ack.inc
--echo #
--echo # Test Case 2) If both replicas simulate an error before sending an ACK,
--echo # the primary should delay killing the suspended thread until the
--echo # timeout is reached (Rpl_semi_sync_master_no_tx should be 1).
--echo #
--let server_1_dbug= "+d,mysqld_delay_kill_threads_phase_1"
--let server_2_dbug= "+d,corrupt_queue_event"
--let server_3_dbug= "+d,corrupt_queue_event"
--let semisync_timeout= 500
--let server_2_expect_row_count= 0
--let server_3_expect_row_count= 0
--source rpl_semi_sync_shutdown_await_ack.inc
--echo #
--echo # Test Case 3) If one replica simulates a delay within the allowed
--echo # timeout and the other simulates an error before sending an ACK, the
--echo # primary should delay killing the suspended thread until it receives an
--echo # ACK from the delayed slave (Rpl_semi_sync_master_yes_tx should be 1).
--echo #
--let server_1_dbug= "+d,mysqld_delay_kill_threads_phase_1"
--let server_2_dbug= "+d,corrupt_queue_event"
--let server_3_dbug= "+d,simulate_delay_semisync_slave_reply"
--let semisync_timeout= 1600
--let server_2_expect_row_count= 0
--let server_3_expect_row_count= 1
--source rpl_semi_sync_shutdown_await_ack.inc
--echo #
--echo # Test Case 4) If a replica errors before sending an ACK, it will cause
--echo # the IO thread to stop and handle the error. During error handling, if
--echo # semi-sync is active, the replica will form a new connection with the
--echo # primary to kill the active connection. However, if the primary is
--echo # shutting down, it may kill the new connection, thereby leaving the
--echo # active semi-sync connection in-tact. The slave should notice this, and
--echo # not issue a `QUIT` command to the primary, which would otherwise be
--echo # sent to kill an active connection. This test case validates that the
--echo # slave does not send a `QUIT` in this case (Rpl_semi_sync_master_yes_tx
--echo # should be 1 because server_3 will send the ACK within a valid timeout).
--echo #
# mysqld_delay_kill_threads_phase_1 ensures that server_2 will have enough time
# to start a new connection that has the intent to kill the active semi-sync
# connection
--let server_1_dbug= "+d,mysqld_delay_kill_threads_phase_1"
# slave_delay_killing_semisync_connection delays server_2 so that the primary
# has force killed server_2's new connection before server_2 is able to issue
# `KILL` on it
--let server_2_dbug= "+d,corrupt_queue_event,slave_delay_killing_semisync_connection"
--let server_3_dbug= "+d,simulate_delay_semisync_slave_reply"
--let semisync_timeout= 1600
--let server_2_expect_row_count= 0
--let server_3_expect_row_count= 1
--source rpl_semi_sync_shutdown_await_ack.inc
--echo #############################
--echo # Cleanup
--echo #############################
--connection server_2
source include/stop_slave.inc;
source include/start_slave.inc;
--disable_query_log
--eval SET @@GLOBAL.rpl_semi_sync_slave_enabled = $sav_enabled_server_2
--eval SET @@GLOBAL.debug_dbug= "$sav_server_2_dbug"
--enable_query_log
--connection server_3
source include/stop_slave.inc;
source include/start_slave.inc;
--disable_query_log
--eval SET @@GLOBAL.rpl_semi_sync_slave_enabled = $sav_enabled_server_3
--eval SET @@GLOBAL.debug_dbug= "$sav_server_3_dbug"
--enable_query_log
--connection server_1
let $status_var= Rpl_semi_sync_master_clients;
let $status_var_value= 0;
source include/wait_for_status_var.inc;
--disable_query_log
--eval SET @@GLOBAL.rpl_semi_sync_master_timeout= $sav_master_timeout
--eval SET @@GLOBAL.rpl_semi_sync_master_enabled= $sav_enabled_master
--eval SET @@GLOBAL.debug_dbug= "$sav_master_dbug"
--enable_query_log
drop table t1;
--source include/rpl_end.inc
...@@ -2453,6 +2453,8 @@ int run_plugin_auth(MYSQL *mysql, char *data, uint data_len, ...@@ -2453,6 +2453,8 @@ int run_plugin_auth(MYSQL *mysql, char *data, uint data_len,
mpvio.db= db; mpvio.db= db;
mpvio.plugin= auth_plugin; mpvio.plugin= auth_plugin;
DBUG_EXECUTE_IF("client_delay_run_plugin_auth", my_sleep(400000););
res= auth_plugin->authenticate_user((struct st_plugin_vio *)&mpvio, mysql); res= auth_plugin->authenticate_user((struct st_plugin_vio *)&mpvio, mysql);
DBUG_PRINT ("info", ("authenticate_user returned %s", DBUG_PRINT ("info", ("authenticate_user returned %s",
res == CR_OK ? "CR_OK" : res == CR_OK ? "CR_OK" :
......
...@@ -1524,11 +1524,15 @@ static void kill_thread(THD *thd) ...@@ -1524,11 +1524,15 @@ static void kill_thread(THD *thd)
/** /**
First shutdown everything but slave threads and binlog dump connections First shutdown everything but slave threads and binlog dump connections
*/ */
static my_bool kill_thread_phase_1(THD *thd, void *) static my_bool kill_thread_phase_1(THD *thd, int *n_threads_awaiting_ack)
{ {
DBUG_PRINT("quit", ("Informing thread %ld that it's time to die", DBUG_PRINT("quit", ("Informing thread %ld that it's time to die",
(ulong) thd->thread_id)); (ulong) thd->thread_id));
if (thd->slave_thread || thd->is_binlog_dump_thread())
if (thd->slave_thread || thd->is_binlog_dump_thread() ||
(shutdown_wait_for_slaves &&
repl_semisync_master.is_thd_awaiting_semisync_ack(thd) &&
++(*n_threads_awaiting_ack)))
return 0; return 0;
if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0)) if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0))
...@@ -1546,7 +1550,7 @@ static my_bool kill_thread_phase_1(THD *thd, void *) ...@@ -1546,7 +1550,7 @@ static my_bool kill_thread_phase_1(THD *thd, void *)
*/ */
static my_bool kill_thread_phase_2(THD *thd, void *) static my_bool kill_thread_phase_2(THD *thd, void *)
{ {
if (shutdown_wait_for_slaves) if (shutdown_wait_for_slaves && thd->is_binlog_dump_thread())
{ {
thd->set_killed(KILL_SERVER); thd->set_killed(KILL_SERVER);
} }
...@@ -1729,7 +1733,29 @@ static void close_connections(void) ...@@ -1729,7 +1733,29 @@ static void close_connections(void)
This will give the threads some time to gracefully abort their This will give the threads some time to gracefully abort their
statements and inform their clients that the server is about to die. statements and inform their clients that the server is about to die.
*/ */
server_threads.iterate(kill_thread_phase_1); DBUG_EXECUTE_IF("mysqld_delay_kill_threads_phase_1", my_sleep(200000););
int n_threads_awaiting_ack= 0;
server_threads.iterate(kill_thread_phase_1, &n_threads_awaiting_ack);
/*
If we are waiting on any ACKs, delay killing the thread until either an ACK
is received or the timeout is hit.
Wait at most once per waiting session; if all ACKs have been received in
fewer iterations, quit early.
*/
if (shutdown_wait_for_slaves && repl_semisync_master.get_master_enabled())
{
int waiting_threads= repl_semisync_master.sync_get_master_wait_sessions();
if (waiting_threads)
sql_print_information("Delaying shutdown to await semi-sync ACK");
while (waiting_threads-- > 0)
repl_semisync_master.await_slave_reply();
}
DBUG_EXECUTE_IF("delay_shutdown_phase_2_after_semisync_wait",
my_sleep(500000););
Events::deinit(); Events::deinit();
slave_prepare_for_shutdown(); slave_prepare_for_shutdown();
...@@ -1752,7 +1778,10 @@ static void close_connections(void) ...@@ -1752,7 +1778,10 @@ static void close_connections(void)
*/ */
DBUG_PRINT("info", ("THD_count: %u", THD_count::value())); DBUG_PRINT("info", ("THD_count: %u", THD_count::value()));
for (int i= 0; (THD_count::value() - binlog_dump_thread_count) && i < 1000; i++) for (int i= 0; (THD_count::value() - binlog_dump_thread_count -
n_threads_awaiting_ack) &&
i < 1000;
i++)
my_sleep(20000); my_sleep(20000);
if (global_system_variables.log_warnings) if (global_system_variables.log_warnings)
...@@ -1766,9 +1795,12 @@ static void close_connections(void) ...@@ -1766,9 +1795,12 @@ static void close_connections(void)
wsrep_sst_auth_free(); wsrep_sst_auth_free();
#endif #endif
/* All threads has now been aborted */ /* All threads has now been aborted */
DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)", THD_count::value())); DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)",
THD_count::value() - binlog_dump_thread_count -
n_threads_awaiting_ack));
while (THD_count::value() - binlog_dump_thread_count) while (THD_count::value() - binlog_dump_thread_count -
n_threads_awaiting_ack)
my_sleep(1000); my_sleep(1000);
/* Kill phase 2 */ /* Kill phase 2 */
......
...@@ -463,6 +463,37 @@ void Repl_semi_sync_master::cleanup() ...@@ -463,6 +463,37 @@ void Repl_semi_sync_master::cleanup()
delete m_active_tranxs; delete m_active_tranxs;
} }
/*
  Returns the current value of rpl_semi_sync_master_wait_sessions, read
  between lock() and unlock().
*/
int Repl_semi_sync_master::sync_get_master_wait_sessions()
{
  lock();
  const int n_waiting= rpl_semi_sync_master_wait_sessions;
  unlock();

  return n_waiting;
}
/*
  Computes the absolute time that lies m_wait_timeout (milliseconds) after a
  base time and stores it in *out.

  @param[out] out        receives the computed absolute time
  @param[in]  start_arg  base time to add the timeout to; when NULL, the
                         base is the time produced by set_timespec(now, 0)
*/
void Repl_semi_sync_master::create_timeout(struct timespec *out,
                                           struct timespec *start_arg)
{
  struct timespec now_ts;
  struct timespec *base_ts= start_arg;

  /* No explicit start given: fall back to a freshly taken timestamp */
  if (!base_ts)
  {
    set_timespec(now_ts, 0);
    base_ts= &now_ts;
  }

  /* Split the millisecond timeout into whole seconds and nanoseconds */
  long add_secs= (long) (m_wait_timeout / TIME_THOUSAND);
  long add_nsecs= (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION);

  /* Carry any nanosecond overflow into the seconds field */
  long total_nsecs= base_ts->tv_nsec + add_nsecs;
  out->tv_sec= base_ts->tv_sec + add_secs + total_nsecs / TIME_BILLION;
  out->tv_nsec= total_nsecs % TIME_BILLION;
}
void Repl_semi_sync_master::lock() void Repl_semi_sync_master::lock()
{ {
mysql_mutex_lock(&LOCK_binlog); mysql_mutex_lock(&LOCK_binlog);
...@@ -862,13 +893,6 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, ...@@ -862,13 +893,6 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
m_wait_file_name, (ulong)m_wait_file_pos)); m_wait_file_name, (ulong)m_wait_file_pos));
} }
/* Calcuate the waiting period. */
long diff_secs = (long) (m_wait_timeout / TIME_THOUSAND);
long diff_nsecs = (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION);
long nsecs = start_ts.tv_nsec + diff_nsecs;
abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION;
abstime.tv_nsec = nsecs % TIME_BILLION;
/* In semi-synchronous replication, we wait until the binlog-dump /* In semi-synchronous replication, we wait until the binlog-dump
* thread has received the reply on the relevant binlog segment from the * thread has received the reply on the relevant binlog segment from the
* replication slave. * replication slave.
...@@ -879,12 +903,20 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, ...@@ -879,12 +903,20 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
*/ */
rpl_semi_sync_master_wait_sessions++; rpl_semi_sync_master_wait_sessions++;
/* We keep track of when this thread is awaiting an ack to ensure it is
* not killed while awaiting an ACK if a shutdown is issued.
*/
set_thd_awaiting_semisync_ack(thd, TRUE);
DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)", DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)",
"Repl_semi_sync_master::commit_trx", "Repl_semi_sync_master::commit_trx",
m_wait_timeout, m_wait_timeout,
m_wait_file_name, (ulong)m_wait_file_pos)); m_wait_file_name, (ulong)m_wait_file_pos));
create_timeout(&abstime, &start_ts);
wait_result = cond_timewait(&abstime); wait_result = cond_timewait(&abstime);
set_thd_awaiting_semisync_ack(thd, FALSE);
rpl_semi_sync_master_wait_sessions--; rpl_semi_sync_master_wait_sessions--;
if (wait_result != 0) if (wait_result != 0)
...@@ -1320,6 +1352,25 @@ void Repl_semi_sync_master::set_export_stats() ...@@ -1320,6 +1352,25 @@ void Repl_semi_sync_master::set_export_stats()
unlock(); unlock();
} }
/*
  Blocks the calling thread until the ack_receiver either receives an ACK
  or the semi-sync timeout elapses.

  Returns immediately (after taking and releasing the lock) when
  rpl_semi_sync_master_wait_sessions is zero, i.e. when there is nothing
  to wait for. The timeout is measured from the moment of the call, since
  create_timeout() is given no start time.
*/
void Repl_semi_sync_master::await_slave_reply()
{
  struct timespec abstime;

  DBUG_ENTER("Repl_semi_sync_master::await_slave_reply");
  lock();

  /* Only wait while at least one session is still awaiting an ACK */
  if (rpl_semi_sync_master_wait_sessions)
  {
    create_timeout(&abstime, NULL);
    cond_timewait(&abstime);
  }

  unlock();
  DBUG_VOID_RETURN;
}
/* Get the waiting time given the wait's starting time. /* Get the waiting time given the wait's starting time.
* *
* Return: * Return:
......
...@@ -472,6 +472,21 @@ class Repl_semi_sync_master ...@@ -472,6 +472,21 @@ class Repl_semi_sync_master
m_wait_timeout = wait_timeout; m_wait_timeout = wait_timeout;
} }
int sync_get_master_wait_sessions();
/*
Calculates a timeout that is m_wait_timeout after start_arg and saves it
in out. If start_arg is NULL, the timeout is m_wait_timeout after the
current system time.
*/
void create_timeout(struct timespec *out, struct timespec *start_arg);
/*
Blocks the calling thread until the ack_receiver either receives an ACK
or times out (from rpl_semi_sync_master_timeout)
*/
void await_slave_reply();
/*set the ACK point, after binlog sync or after transaction commit*/ /*set the ACK point, after binlog sync or after transaction commit*/
void set_wait_point(unsigned long ack_point) void set_wait_point(unsigned long ack_point)
{ {
...@@ -620,6 +635,30 @@ class Repl_semi_sync_master ...@@ -620,6 +635,30 @@ class Repl_semi_sync_master
void check_and_switch(); void check_and_switch();
/*
  Reports whether the given thread is currently flagged as awaiting a
  semi-sync ACK.

  The flag is read between lock() and unlock(), so the caller does not
  need to take this class's lock itself.

  @param thd  thread whose is_awaiting_semisync_ack flag is read
  @return the flag's value at the time of the read
*/
my_bool is_thd_awaiting_semisync_ack(THD *thd)
{
  my_bool awaiting_ack;

  lock();
  awaiting_ack= thd->is_awaiting_semisync_ack;
  unlock();

  return awaiting_ack;
}
/*
Sets thd->is_awaiting_semisync_ack to the given value.

The caller must already hold LOCK_binlog (from this class); ownership is
checked with mysql_mutex_assert_owner() before the flag is written.

@param thd                        thread whose flag is updated
@param _is_awaiting_semisync_ack  new value for the flag
*/
void set_thd_awaiting_semisync_ack(THD *thd,
my_bool _is_awaiting_semisync_ack)
{
mysql_mutex_assert_owner(&LOCK_binlog);
thd->is_awaiting_semisync_ack= _is_awaiting_semisync_ack;
}
mysql_mutex_t LOCK_rpl_semi_sync_master_enabled; mysql_mutex_t LOCK_rpl_semi_sync_master_enabled;
}; };
......
...@@ -114,10 +114,12 @@ int Repl_semi_sync_slave::slave_start(Master_info *mi) ...@@ -114,10 +114,12 @@ int Repl_semi_sync_slave::slave_start(Master_info *mi)
int Repl_semi_sync_slave::slave_stop(Master_info *mi) int Repl_semi_sync_slave::slave_stop(Master_info *mi)
{ {
if (rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 0;
if (get_slave_enabled()) if (get_slave_enabled())
kill_connection(mi->mysql); kill_connection(mi->mysql);
if (rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 0;
return 0; return 0;
} }
...@@ -133,6 +135,8 @@ void Repl_semi_sync_slave::kill_connection(MYSQL *mysql) ...@@ -133,6 +135,8 @@ void Repl_semi_sync_slave::kill_connection(MYSQL *mysql)
char kill_buffer[30]; char kill_buffer[30];
MYSQL *kill_mysql = NULL; MYSQL *kill_mysql = NULL;
size_t kill_buffer_length;
kill_mysql = mysql_init(kill_mysql); kill_mysql = mysql_init(kill_mysql);
mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &m_kill_conn_timeout); mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &m_kill_conn_timeout);
mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &m_kill_conn_timeout); mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &m_kill_conn_timeout);
...@@ -144,13 +148,35 @@ void Repl_semi_sync_slave::kill_connection(MYSQL *mysql) ...@@ -144,13 +148,35 @@ void Repl_semi_sync_slave::kill_connection(MYSQL *mysql)
{ {
sql_print_information("cannot connect to master to kill slave io_thread's " sql_print_information("cannot connect to master to kill slave io_thread's "
"connection"); "connection");
mysql_close(kill_mysql); goto failed_graceful_kill;
return;
} }
size_t kill_buffer_length = my_snprintf(kill_buffer, 30, "KILL %lu",
mysql->thread_id); DBUG_EXECUTE_IF("slave_delay_killing_semisync_connection", my_sleep(400000););
mysql_real_query(kill_mysql, kill_buffer, (ulong)kill_buffer_length);
kill_buffer_length= my_snprintf(kill_buffer, 30, "KILL %lu",
mysql->thread_id);
if (mysql_real_query(kill_mysql, kill_buffer, (ulong)kill_buffer_length))
{
sql_print_information(
"Failed to gracefully kill our active semi-sync connection with "
"primary. Silently closing the connection.");
goto failed_graceful_kill;
}
end:
mysql_close(kill_mysql); mysql_close(kill_mysql);
return;
failed_graceful_kill:
/*
If we fail to issue `KILL` on the primary to kill the active semi-sync
connection; we need to locally clean up our side of the connection. This
is because mysql_close will send COM_QUIT on the active semi-sync
connection, causing the primary to error.
*/
net_clear(&(mysql->net), 0);
end_server(mysql);
goto end;
} }
int Repl_semi_sync_slave::request_transmit(Master_info *mi) int Repl_semi_sync_slave::request_transmit(Master_info *mi)
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "sql_priv.h" #include "sql_priv.h"
#include "rpl_mi.h" #include "rpl_mi.h"
#include "mysql.h" #include "mysql.h"
#include <sql_common.h>
class Master_info; class Master_info;
......
...@@ -4859,6 +4859,7 @@ Stopping slave I/O thread due to out-of-memory error from master"); ...@@ -4859,6 +4859,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
not cause the slave IO thread to stop, and the error messages are not cause the slave IO thread to stop, and the error messages are
already reported. already reported.
*/ */
DBUG_EXECUTE_IF("simulate_delay_semisync_slave_reply", my_sleep(800000););
(void)repl_semisync_slave.slave_reply(mi); (void)repl_semisync_slave.slave_reply(mi);
} }
......
...@@ -640,7 +640,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) ...@@ -640,7 +640,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
, ,
current_linfo(0), current_linfo(0),
slave_info(0) slave_info(0),
is_awaiting_semisync_ack(0)
#endif #endif
#ifdef WITH_WSREP #ifdef WITH_WSREP
, ,
......
...@@ -4838,6 +4838,14 @@ class THD: public THD_count, /* this must be first */ ...@@ -4838,6 +4838,14 @@ class THD: public THD_count, /* this must be first */
bool is_binlog_dump_thread(); bool is_binlog_dump_thread();
#endif #endif
/*
Indicates if this thread is suspended due to awaiting an ACK from a
replica. True if suspended, false otherwise.
Note that this variable is protected by Repl_semi_sync_master::LOCK_binlog
*/
bool is_awaiting_semisync_ack;
inline ulong wsrep_binlog_format() const inline ulong wsrep_binlog_format() const
{ {
return WSREP_BINLOG_FORMAT(variables.binlog_format); return WSREP_BINLOG_FORMAT(variables.binlog_format);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment