Commit b289e515 authored by He Zhenxing's avatar He Zhenxing

Auto merge 5.1-rep-semisync

parents 2151a821 dab1162b
...@@ -29,13 +29,13 @@ set global rpl_semi_sync_master_enabled = 1; ...@@ -29,13 +29,13 @@ set global rpl_semi_sync_master_enabled = 1;
show variables like 'rpl_semi_sync_master_enabled'; show variables like 'rpl_semi_sync_master_enabled';
Variable_name Value Variable_name Value
rpl_semi_sync_master_enabled ON rpl_semi_sync_master_enabled ON
[ status of semi-sync on master should be OFF without any semi-sync slaves ] [ status of semi-sync on master should be ON even without any semi-sync slaves ]
show status like 'Rpl_semi_sync_master_clients'; show status like 'Rpl_semi_sync_master_clients';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_clients 0 Rpl_semi_sync_master_clients 0
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_status OFF Rpl_semi_sync_master_status ON
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_yes_tx 0 Rpl_semi_sync_master_yes_tx 0
...@@ -81,7 +81,7 @@ Rpl_semi_sync_master_no_tx 0 ...@@ -81,7 +81,7 @@ Rpl_semi_sync_master_no_tx 0
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_yes_tx 0 Rpl_semi_sync_master_yes_tx 0
create table t1(n int) engine = ENGINE_TYPE; create table t1(a int) engine = ENGINE_TYPE;
[ master state after CREATE TABLE statement ] [ master state after CREATE TABLE statement ]
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
Variable_name Value Variable_name Value
...@@ -92,6 +92,9 @@ Rpl_semi_sync_master_no_tx 0 ...@@ -92,6 +92,9 @@ Rpl_semi_sync_master_no_tx 0
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_yes_tx 1 Rpl_semi_sync_master_yes_tx 1
select CONNECTIONS_NORMAL_SLAVE - CONNECTIONS_NORMAL_SLAVE as 'Should be 0';
Should be 0
0
[ insert records to table ] [ insert records to table ]
[ master status after inserts ] [ master status after inserts ]
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
...@@ -108,15 +111,19 @@ Rpl_semi_sync_master_yes_tx 301 ...@@ -108,15 +111,19 @@ Rpl_semi_sync_master_yes_tx 301
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_slave_status ON Rpl_semi_sync_slave_status ON
select count(distinct n) from t1; select count(distinct a) from t1;
count(distinct n) count(distinct a)
300 300
select min(n) from t1; select min(a) from t1;
min(n) min(a)
1 1
select max(n) from t1; select max(a) from t1;
max(n) max(a)
300 300
#
# Test semi-sync master will switch OFF after one transacton
# timeout waiting for slave reply.
#
include/stop_slave.inc include/stop_slave.inc
[ on master ] [ on master ]
[ master status should be ON ] [ master status should be ON ]
...@@ -134,7 +141,16 @@ Variable_name Value ...@@ -134,7 +141,16 @@ Variable_name Value
Rpl_semi_sync_master_clients 1 Rpl_semi_sync_master_clients 1
[ semi-sync replication of these transactions will fail ] [ semi-sync replication of these transactions will fail ]
insert into t1 values (500); insert into t1 values (500);
delete from t1 where n < 500; [ master status should be OFF ]
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
Rpl_semi_sync_master_status OFF
show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 1
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 301
insert into t1 values (100); insert into t1 values (100);
[ master status should be OFF ] [ master status should be OFF ]
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
...@@ -142,10 +158,13 @@ Variable_name Value ...@@ -142,10 +158,13 @@ Variable_name Value
Rpl_semi_sync_master_status OFF Rpl_semi_sync_master_status OFF
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_no_tx 3 Rpl_semi_sync_master_no_tx 302
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_yes_tx 301 Rpl_semi_sync_master_yes_tx 301
#
# Test semi-sync status on master will be ON again when slave catches up
#
[ on slave ] [ on slave ]
[ slave status should be OFF ] [ slave status should be OFF ]
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
...@@ -156,33 +175,53 @@ include/start_slave.inc ...@@ -156,33 +175,53 @@ include/start_slave.inc
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_slave_status ON Rpl_semi_sync_slave_status ON
select count(distinct n) from t1; select count(distinct a) from t1;
count(distinct n) count(distinct a)
2 2
select min(n) from t1; select min(a) from t1;
min(n) min(a)
100 100
select max(n) from t1; select max(a) from t1;
max(n) max(a)
500 500
[ on master ] [ on master ]
[ do something to activate semi-sync ] [ master status should be ON again after slave catches up ]
drop table t1;
[ master status should be ON again ]
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_status ON Rpl_semi_sync_master_status ON
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_no_tx 3 Rpl_semi_sync_master_no_tx 302
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_yes_tx 302 Rpl_semi_sync_master_yes_tx 301
show status like 'Rpl_semi_sync_master_clients'; show status like 'Rpl_semi_sync_master_clients';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_clients 1 Rpl_semi_sync_master_clients 1
#
# Test disable/enable master semi-sync on the fly.
#
drop table t1;
[ on slave ] [ on slave ]
include/stop_slave.inc include/stop_slave.inc
#
# Flush status
#
[ Semi-sync master status variables before FLUSH STATUS ]
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 302
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 302
FLUSH NO_WRITE_TO_BINLOG STATUS;
[ Semi-sync master status variables after FLUSH STATUS ]
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 0
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 0
[ on master ] [ on master ]
show master logs; show master logs;
Log_name master-bin.000001 Log_name master-bin.000001
...@@ -206,6 +245,9 @@ rpl_semi_sync_master_enabled ON ...@@ -206,6 +245,9 @@ rpl_semi_sync_master_enabled ON
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_status ON Rpl_semi_sync_master_status ON
#
# Test RESET MASTER/SLAVE
#
[ on slave ] [ on slave ]
include/start_slave.inc include/start_slave.inc
[ on master ] [ on master ]
...@@ -313,7 +355,7 @@ Variable_name Value ...@@ -313,7 +355,7 @@ Variable_name Value
Rpl_semi_sync_master_clients 0 Rpl_semi_sync_master_clients 0
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_status OFF Rpl_semi_sync_master_status ON
set global rpl_semi_sync_master_enabled= 0; set global rpl_semi_sync_master_enabled= 0;
[ on slave ] [ on slave ]
SHOW VARIABLES LIKE 'rpl_semi_sync_slave_enabled'; SHOW VARIABLES LIKE 'rpl_semi_sync_slave_enabled';
...@@ -322,17 +364,17 @@ rpl_semi_sync_slave_enabled ON ...@@ -322,17 +364,17 @@ rpl_semi_sync_slave_enabled ON
include/start_slave.inc include/start_slave.inc
[ on master ] [ on master ]
insert into t1 values (8); insert into t1 values (8);
[ master semi-sync clients should be 0, status should be OFF ] [ master semi-sync clients should be 1, status should be OFF ]
show status like 'Rpl_semi_sync_master_clients'; show status like 'Rpl_semi_sync_master_clients';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_clients 0 Rpl_semi_sync_master_clients 1
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_status OFF Rpl_semi_sync_master_status OFF
[ on slave ] [ on slave ]
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_slave_status OFF Rpl_semi_sync_slave_status ON
include/stop_slave.inc include/stop_slave.inc
[ on master ] [ on master ]
set sql_log_bin=0; set sql_log_bin=0;
......
...@@ -11,12 +11,17 @@ let $engine_type= InnoDB; ...@@ -11,12 +11,17 @@ let $engine_type= InnoDB;
disable_query_log; disable_query_log;
connection master; connection master;
call mtr.add_suppression("Timeout waiting for reply of binlog"); call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Read semi-sync reply");
connection slave; connection slave;
call mtr.add_suppression("Master server does not support"); call mtr.add_suppression("Master server does not support semi-sync");
# These will be removed after fix bug#45852 call mtr.add_suppression("Semi-sync slave .* reply");
call mtr.add_suppression("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed");
call mtr.add_suppression("Slave I/O: Fatal error: Failed to run 'after_queue_event' hook, Error_code: 1593");
enable_query_log; enable_query_log;
connection master;
# After fix of BUG#45848, semi-sync slave should not create any extra
# connections on master, save the count of connections before start
# semi-sync slave for comparison below.
let $_connections_normal_slave= query_get_value(SHOW STATUS LIKE 'Threads_connected', Value, 1);
--echo # --echo #
--echo # Uninstall semi-sync plugins on master and slave --echo # Uninstall semi-sync plugins on master and slave
...@@ -69,7 +74,7 @@ echo [ enable semi-sync on master ]; ...@@ -69,7 +74,7 @@ echo [ enable semi-sync on master ];
set global rpl_semi_sync_master_enabled = 1; set global rpl_semi_sync_master_enabled = 1;
show variables like 'rpl_semi_sync_master_enabled'; show variables like 'rpl_semi_sync_master_enabled';
echo [ status of semi-sync on master should be OFF without any semi-sync slaves ]; echo [ status of semi-sync on master should be ON even without any semi-sync slaves ];
show status like 'Rpl_semi_sync_master_clients'; show status like 'Rpl_semi_sync_master_clients';
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
...@@ -150,13 +155,20 @@ show status like 'Rpl_semi_sync_master_no_tx'; ...@@ -150,13 +155,20 @@ show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
replace_result $engine_type ENGINE_TYPE; replace_result $engine_type ENGINE_TYPE;
eval create table t1(n int) engine = $engine_type; eval create table t1(a int) engine = $engine_type;
echo [ master state after CREATE TABLE statement ]; echo [ master state after CREATE TABLE statement ];
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
# After fix of BUG#45848, semi-sync slave should not create any extra
# connections on master.
let $_connections_semisync_slave= query_get_value(SHOW STATUS LIKE 'Threads_connected', Value, 1);
replace_result $_connections_semisync_slave CONNECTIONS_SEMISYNC_SLAVE;
replace_result $_connections_normal_slave CONNECTIONS_NORMAL_SLAVE;
eval select $_connections_semisync_slave - $_connections_normal_slave as 'Should be 0';
let $i=300; let $i=300;
echo [ insert records to table ]; echo [ insert records to table ];
disable_query_log; disable_query_log;
...@@ -178,10 +190,15 @@ echo [ on slave ]; ...@@ -178,10 +190,15 @@ echo [ on slave ];
echo [ slave status after replicated inserts ]; echo [ slave status after replicated inserts ];
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
select count(distinct n) from t1; select count(distinct a) from t1;
select min(n) from t1; select min(a) from t1;
select max(n) from t1; select max(a) from t1;
--echo #
--echo # Test semi-sync master will switch OFF after one transacton
--echo # timeout waiting for slave reply.
--echo #
connection slave;
source include/stop_slave.inc; source include/stop_slave.inc;
connection master; connection master;
...@@ -197,8 +214,11 @@ show status like 'Rpl_semi_sync_master_clients'; ...@@ -197,8 +214,11 @@ show status like 'Rpl_semi_sync_master_clients';
echo [ semi-sync replication of these transactions will fail ]; echo [ semi-sync replication of these transactions will fail ];
insert into t1 values (500); insert into t1 values (500);
delete from t1 where n < 500;
insert into t1 values (100); # Wait for the semi-sync replication of this transaction to timeout
let $status_var= Rpl_semi_sync_master_status;
let $status_var_value= OFF;
source include/wait_for_status_var.inc;
# The second semi-sync check should be off because one transaction # The second semi-sync check should be off because one transaction
# times out during waiting. # times out during waiting.
...@@ -207,6 +227,28 @@ show status like 'Rpl_semi_sync_master_status'; ...@@ -207,6 +227,28 @@ show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
# Semi-sync status on master is now OFF, so all these transactions
# will be replicated asynchronously.
let $i=300;
disable_query_log;
while ($i)
{
eval delete from t1 where a=$i;
dec $i;
}
enable_query_log;
insert into t1 values (100);
echo [ master status should be OFF ];
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx';
--echo #
--echo # Test semi-sync status on master will be ON again when slave catches up
--echo #
# Save the master position for later use. # Save the master position for later use.
save_master_pos; save_master_pos;
...@@ -221,28 +263,44 @@ sync_with_master; ...@@ -221,28 +263,44 @@ sync_with_master;
echo [ slave status should be ON ]; echo [ slave status should be ON ];
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
select count(distinct n) from t1; select count(distinct a) from t1;
select min(n) from t1; select min(a) from t1;
select max(n) from t1; select max(a) from t1;
connection master; connection master;
echo [ on master ]; echo [ on master ];
echo [ do something to activate semi-sync ]; # The master semi-sync status should be on again after slave catches up.
drop table t1; echo [ master status should be ON again after slave catches up ];
# The third semi-sync check should be on again.
echo [ master status should be ON again ];
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
show status like 'Rpl_semi_sync_master_clients'; show status like 'Rpl_semi_sync_master_clients';
--echo #
--echo # Test disable/enable master semi-sync on the fly.
--echo #
drop table t1;
sync_slave_with_master; sync_slave_with_master;
echo [ on slave ]; echo [ on slave ];
source include/stop_slave.inc; source include/stop_slave.inc;
--echo #
--echo # Flush status
--echo #
connection master;
echo [ Semi-sync master status variables before FLUSH STATUS ];
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
# Do not write the FLUSH STATUS to binlog, to make sure we'll get a
# clean status after this.
FLUSH NO_WRITE_TO_BINLOG STATUS;
echo [ Semi-sync master status variables after FLUSH STATUS ];
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
connection master; connection master;
echo [ on master ]; echo [ on master ];
...@@ -259,6 +317,10 @@ set global rpl_semi_sync_master_enabled=1; ...@@ -259,6 +317,10 @@ set global rpl_semi_sync_master_enabled=1;
show variables like 'rpl_semi_sync_master_enabled'; show variables like 'rpl_semi_sync_master_enabled';
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
--echo #
--echo # Test RESET MASTER/SLAVE
--echo #
connection slave; connection slave;
echo [ on slave ]; echo [ on slave ];
...@@ -435,7 +497,10 @@ source include/start_slave.inc; ...@@ -435,7 +497,10 @@ source include/start_slave.inc;
connection master; connection master;
echo [ on master ]; echo [ on master ];
insert into t1 values (8); insert into t1 values (8);
echo [ master semi-sync clients should be 0, status should be OFF ]; let $status_var= Rpl_semi_sync_master_clients;
let $status_var_value= 1;
source include/wait_for_status_var.inc;
echo [ master semi-sync clients should be 1, status should be OFF ];
show status like 'Rpl_semi_sync_master_clients'; show status like 'Rpl_semi_sync_master_clients';
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
sync_slave_with_master; sync_slave_with_master;
...@@ -512,6 +577,8 @@ source include/start_slave.inc; ...@@ -512,6 +577,8 @@ source include/start_slave.inc;
connection master; connection master;
drop table t1; drop table t1;
sync_slave_with_master;
connection master;
drop user rpl@127.0.0.1; drop user rpl@127.0.0.1;
flush privileges; flush privileges;
sync_slave_with_master;
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
pkgplugindir = $(pkglibdir)/plugin pkgplugindir = $(pkglibdir)/plugin
INCLUDES = -I$(top_srcdir)/include \ INCLUDES = -I$(top_srcdir)/include \
-I$(top_srcdir)/sql \ -I$(top_srcdir)/sql \
-I$(top_srcdir)/regex \
-I$(srcdir) -I$(srcdir)
noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h
......
...@@ -18,25 +18,9 @@ ...@@ -18,25 +18,9 @@
#ifndef SEMISYNC_H #ifndef SEMISYNC_H
#define SEMISYNC_H #define SEMISYNC_H
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <sys/time.h>
#include <time.h>
#include <stdio.h>
#include <pthread.h>
#include <mysql.h>
typedef uint32_t uint32;
typedef unsigned long long my_off_t;
#define FN_REFLEN 512 /* Max length of full path-name */
void sql_print_error(const char *format, ...);
void sql_print_warning(const char *format, ...);
void sql_print_information(const char *format, ...);
extern unsigned long max_connections;
#define MYSQL_SERVER #define MYSQL_SERVER
#define HAVE_REPLICATION #define HAVE_REPLICATION
#include <mysql_priv.h>
#include <my_global.h> #include <my_global.h>
#include <my_pthread.h> #include <my_pthread.h>
#include <mysql/plugin.h> #include <mysql/plugin.h>
...@@ -92,4 +76,16 @@ public: ...@@ -92,4 +76,16 @@ public:
static const unsigned char kPacketFlagSync; static const unsigned char kPacketFlagSync;
}; };
/* The layout of a semisync slave reply packet:
1 byte for the magic num
8 bytes for the binlog positon
n bytes for the binlog filename, terminated with a '\0'
*/
#define REPLY_MAGIC_NUM_LEN 1
#define REPLY_BINLOG_POS_LEN 8
#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1)
#define REPLY_MAGIC_NUM_OFFSET 0
#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN)
#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN)
#endif /* SEMISYNC_H */ #endif /* SEMISYNC_H */
...@@ -30,16 +30,17 @@ unsigned long rpl_semi_sync_master_yes_transactions = 0; ...@@ -30,16 +30,17 @@ unsigned long rpl_semi_sync_master_yes_transactions = 0;
unsigned long rpl_semi_sync_master_no_transactions = 0; unsigned long rpl_semi_sync_master_no_transactions = 0;
unsigned long rpl_semi_sync_master_off_times = 0; unsigned long rpl_semi_sync_master_off_times = 0;
unsigned long rpl_semi_sync_master_timefunc_fails = 0; unsigned long rpl_semi_sync_master_timefunc_fails = 0;
unsigned long rpl_semi_sync_master_num_timeouts = 0; unsigned long rpl_semi_sync_master_wait_timeouts = 0;
unsigned long rpl_semi_sync_master_wait_sessions = 0; unsigned long rpl_semi_sync_master_wait_sessions = 0;
unsigned long rpl_semi_sync_master_back_wait_pos = 0; unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0;
unsigned long rpl_semi_sync_master_trx_wait_time = 0; unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_num = 0; unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
unsigned long rpl_semi_sync_master_net_wait_time = 0; unsigned long rpl_semi_sync_master_avg_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_net_wait_num = 0; unsigned long long rpl_semi_sync_master_net_wait_num = 0;
unsigned long rpl_semi_sync_master_clients = 0; unsigned long rpl_semi_sync_master_clients = 0;
unsigned long long rpl_semi_sync_master_net_wait_total_time = 0; unsigned long long rpl_semi_sync_master_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_total_time = 0; unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
char rpl_semi_sync_master_wait_no_slave = 1;
static int getWaitTime(const struct timeval& start_tv); static int getWaitTime(const struct timeval& start_tv);
...@@ -379,16 +380,6 @@ ReplSemiSyncMaster::ReplSemiSyncMaster() ...@@ -379,16 +380,6 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
master_enabled_(false), master_enabled_(false),
wait_timeout_(0L), wait_timeout_(0L),
state_(0), state_(0),
enabled_transactions_(0),
disabled_transactions_(0),
switched_off_times_(0),
timefunc_fails_(0),
wait_sessions_(0),
wait_backtraverse_(0),
total_trx_wait_num_(0),
total_trx_wait_time_(0),
total_net_wait_num_(0),
total_net_wait_time_(0),
max_transactions_(0L) max_transactions_(0L)
{ {
strcpy(reply_file_name_, ""); strcpy(reply_file_name_, "");
...@@ -535,6 +526,14 @@ void ReplSemiSyncMaster::remove_slave() ...@@ -535,6 +526,14 @@ void ReplSemiSyncMaster::remove_slave()
{ {
lock(); lock();
rpl_semi_sync_master_clients--; rpl_semi_sync_master_clients--;
/* If user has chosen not to wait if no semi-sync slave available
and the last semi-sync slave exits, turn off semi-sync on master
immediately.
*/
if (!rpl_semi_sync_master_wait_no_slave &&
rpl_semi_sync_master_clients == 0)
switch_off();
unlock(); unlock();
} }
...@@ -546,19 +545,6 @@ bool ReplSemiSyncMaster::is_semi_sync_slave() ...@@ -546,19 +545,6 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
return val; return val;
} }
int ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_pos)
{
char log_name[FN_REFLEN];
char *endptr;
my_off_t log_pos= strtoull(log_file_pos, &endptr, 10);
if (!log_pos || !endptr || *endptr != ':' )
return 1;
endptr++; // skip the ':' seperator
strncpy(log_name, endptr, FN_REFLEN);
uint32 server_id= 0;
return reportReplyBinlog(server_id, log_name, log_pos);
}
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
const char *log_file_name, const char *log_file_name,
my_off_t log_file_pos) my_off_t log_file_pos)
...@@ -624,7 +610,7 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, ...@@ -624,7 +610,7 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
log_file_name, (unsigned long)log_file_pos); log_file_name, (unsigned long)log_file_pos);
} }
if (wait_sessions_ > 0) if (rpl_semi_sync_master_wait_sessions > 0)
{ {
/* Let us check if some of the waiting threads doing a trx /* Let us check if some of the waiting threads doing a trx
* commit can now proceed. * commit can now proceed.
...@@ -679,7 +665,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -679,7 +665,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"Waiting for semi-sync ACK from slave"); "Waiting for semi-sync ACK from slave");
/* This is the real check inside the mutex. */ /* This is the real check inside the mutex. */
if (!getMasterEnabled() || !is_on() || !rpl_semi_sync_master_clients) if (!getMasterEnabled() || !is_on())
goto l_end; goto l_end;
if (trace_level_ & kTraceDetail) if (trace_level_ & kTraceDetail)
...@@ -691,17 +677,20 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -691,17 +677,20 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
while (is_on()) while (is_on())
{ {
int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, if (reply_file_name_inited_)
trx_wait_binlog_name, trx_wait_binlog_pos);
if (cmp >= 0)
{ {
/* We have already sent the relevant binlog to the slave: no need to int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
* wait here. trx_wait_binlog_name, trx_wait_binlog_pos);
*/ if (cmp >= 0)
if (trace_level_ & kTraceDetail) {
sql_print_information("%s: Binlog reply is ahead (%s, %lu),", /* We have already sent the relevant binlog to the slave: no need to
kWho, reply_file_name_, (unsigned long)reply_file_pos_); * wait here.
break; */
if (trace_level_ & kTraceDetail)
sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
kWho, reply_file_name_, (unsigned long)reply_file_pos_);
break;
}
} }
/* Let us update the info about the minimum binlog position of waiting /* Let us update the info about the minimum binlog position of waiting
...@@ -709,15 +698,15 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -709,15 +698,15 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
*/ */
if (wait_file_name_inited_) if (wait_file_name_inited_)
{ {
cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos, int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
wait_file_name_, wait_file_pos_); wait_file_name_, wait_file_pos_);
if (cmp <= 0) if (cmp <= 0)
{ {
/* This thd has a lower position, let's update the minimum info. */ /* This thd has a lower position, let's update the minimum info. */
strcpy(wait_file_name_, trx_wait_binlog_name); strcpy(wait_file_name_, trx_wait_binlog_name);
wait_file_pos_ = trx_wait_binlog_pos; wait_file_pos_ = trx_wait_binlog_pos;
wait_backtraverse_++; rpl_semi_sync_master_wait_pos_backtraverse++;
if (trace_level_ & kTraceDetail) if (trace_level_ & kTraceDetail)
sql_print_information("%s: move back wait position (%s, %lu),", sql_print_information("%s: move back wait position (%s, %lu),",
kWho, wait_file_name_, (unsigned long)wait_file_pos_); kWho, wait_file_name_, (unsigned long)wait_file_pos_);
...@@ -762,7 +751,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -762,7 +751,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
* when replication has progressed far enough, we will release * when replication has progressed far enough, we will release
* these waiting threads. * these waiting threads.
*/ */
wait_sessions_++; rpl_semi_sync_master_wait_sessions++;
if (trace_level_ & kTraceDetail) if (trace_level_ & kTraceDetail)
sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)", sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
...@@ -770,7 +759,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -770,7 +759,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
wait_file_name_, (unsigned long)wait_file_pos_); wait_file_name_, (unsigned long)wait_file_pos_);
wait_result = cond_timewait(&abstime); wait_result = cond_timewait(&abstime);
wait_sessions_--; rpl_semi_sync_master_wait_sessions--;
if (wait_result != 0) if (wait_result != 0)
{ {
...@@ -779,7 +768,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -779,7 +768,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"semi-sync up to file %s, position %lu.", "semi-sync up to file %s, position %lu.",
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos, trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
reply_file_name_, (unsigned long)reply_file_pos_); reply_file_name_, (unsigned long)reply_file_pos_);
total_wait_timeouts_++; rpl_semi_sync_master_wait_timeouts++;
/* switch semi-sync off */ /* switch semi-sync off */
switch_off(); switch_off();
...@@ -798,12 +787,12 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -798,12 +787,12 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"wait position (%s, %lu)", "wait position (%s, %lu)",
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos); trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
} }
timefunc_fails_++; rpl_semi_sync_master_timefunc_fails++;
} }
else else
{ {
total_trx_wait_num_++; rpl_semi_sync_master_trx_wait_num++;
total_trx_wait_time_ += wait_time; rpl_semi_sync_master_trx_wait_time += wait_time;
} }
} }
} }
...@@ -816,7 +805,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -816,7 +805,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"wait position (%s, %lu)", "wait position (%s, %lu)",
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos); trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
} }
timefunc_fails_++; rpl_semi_sync_master_timefunc_fails++;
/* switch semi-sync off */ /* switch semi-sync off */
switch_off(); switch_off();
...@@ -824,11 +813,18 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -824,11 +813,18 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
} }
l_end: l_end:
/*
At this point, the binlog file and position of this transaction
must have been removed from ActiveTranx.
*/
assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
trx_wait_binlog_pos));
/* Update the status counter. */ /* Update the status counter. */
if (is_on() && rpl_semi_sync_master_clients) if (is_on())
enabled_transactions_++; rpl_semi_sync_master_yes_transactions++;
else else
disabled_transactions_++; rpl_semi_sync_master_no_transactions++;
/* The lock held will be released by thd_exit_cond, so no need to /* The lock held will be released by thd_exit_cond, so no need to
call unlock() here */ call unlock() here */
...@@ -868,7 +864,7 @@ int ReplSemiSyncMaster::switch_off() ...@@ -868,7 +864,7 @@ int ReplSemiSyncMaster::switch_off()
assert(active_tranxs_ != NULL); assert(active_tranxs_ != NULL);
result = active_tranxs_->clear_active_tranx_nodes(NULL, 0); result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
switched_off_times_++; rpl_semi_sync_master_off_times++;
wait_file_name_inited_ = false; wait_file_name_inited_ = false;
reply_file_name_inited_ = false; reply_file_name_inited_ = false;
sql_print_information("Semi-sync replication switched OFF."); sql_print_information("Semi-sync replication switched OFF.");
...@@ -1045,7 +1041,9 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, ...@@ -1045,7 +1041,9 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
* reserve the packet header. * reserve the packet header.
*/ */
if (sync) if (sync)
{
(packet)[2] = kPacketFlagSync; (packet)[2] = kPacketFlagSync;
}
return function_exit(kWho, 0); return function_exit(kWho, 0);
} }
...@@ -1089,7 +1087,7 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, ...@@ -1089,7 +1087,7 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
commit_file_name_inited_ = true; commit_file_name_inited_ = true;
} }
if (is_on() && rpl_semi_sync_master_clients) if (is_on())
{ {
assert(active_tranxs_ != NULL); assert(active_tranxs_ != NULL);
if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)) if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
...@@ -1098,8 +1096,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, ...@@ -1098,8 +1096,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
if insert tranx_node failed, print a warning message if insert tranx_node failed, print a warning message
and turn off semi-sync and turn off semi-sync
*/ */
sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %ul", sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
log_file_name, log_file_pos); log_file_name, (ulong)log_file_pos);
switch_off(); switch_off();
} }
} }
...@@ -1110,6 +1108,113 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, ...@@ -1110,6 +1108,113 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
return function_exit(kWho, result); return function_exit(kWho, result);
} }
int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
const char *event_buf)
{
const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
const unsigned char *packet;
char log_file_name[FN_REFLEN];
my_off_t log_file_pos;
ulong packet_len;
int result = -1;
struct timeval start_tv;
int start_time_err= 0;
ulong trc_level = trace_level_;
function_enter(kWho);
assert((unsigned char)event_buf[1] == kPacketMagicNum);
if ((unsigned char)event_buf[2] != kPacketFlagSync)
{
/* current event does not require reply */
result = 0;
goto l_end;
}
if (trc_level & kTraceNetWait)
start_time_err = gettimeofday(&start_tv, 0);
/* We flush to make sure that the current event is sent to the network,
* instead of being buffered in the TCP/IP stack.
*/
if (net_flush(net))
{
sql_print_error("Semi-sync master failed on net_flush() "
"before waiting for slave reply");
goto l_end;
}
net_clear(net, 0);
if (trc_level & kTraceDetail)
sql_print_information("%s: Wait for replica's reply", kWho);
/* Wait for the network here. Though binlog dump thread can indefinitely wait
* here, transactions would not wait indefintely.
* Transactions wait on binlog replies detected by binlog dump threads. If
* binlog dump threads wait too long, transactions will timeout and continue.
*/
packet_len = my_net_read(net);
if (trc_level & kTraceNetWait)
{
if (start_time_err != 0)
{
sql_print_error("Semi-sync master wait for reply "
"gettimeofday fail to get start time");
rpl_semi_sync_master_timefunc_fails++;
}
else
{
int wait_time;
wait_time = getWaitTime(start_tv);
if (wait_time < 0)
{
sql_print_error("Semi-sync master wait for reply "
"gettimeofday fail to get wait time.");
rpl_semi_sync_master_timefunc_fails++;
}
else
{
rpl_semi_sync_master_net_wait_num++;
rpl_semi_sync_master_net_wait_time += wait_time;
}
}
}
if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
{
if (packet_len == packet_error)
sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
net->last_error, net->last_errno);
else
sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
net->last_error, net->last_errno);
goto l_end;
}
packet = net->read_pos;
if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
{
sql_print_error("Read semi-sync reply magic number error");
goto l_end;
}
log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
strcpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET);
if (trc_level & kTraceDetail)
sql_print_information("%s: Got reply (%s, %lu)",
kWho, log_file_name, (ulong)log_file_pos);
result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
l_end:
return function_exit(kWho, result);
}
int ReplSemiSyncMaster::resetMaster() int ReplSemiSyncMaster::resetMaster()
{ {
const char *kWho = "ReplSemiSyncMaster::resetMaster"; const char *kWho = "ReplSemiSyncMaster::resetMaster";
...@@ -1126,16 +1231,16 @@ int ReplSemiSyncMaster::resetMaster() ...@@ -1126,16 +1231,16 @@ int ReplSemiSyncMaster::resetMaster()
reply_file_name_inited_ = false; reply_file_name_inited_ = false;
commit_file_name_inited_ = false; commit_file_name_inited_ = false;
enabled_transactions_ = 0; rpl_semi_sync_master_yes_transactions = 0;
disabled_transactions_ = 0; rpl_semi_sync_master_no_transactions = 0;
switched_off_times_ = 0; rpl_semi_sync_master_off_times = 0;
timefunc_fails_ = 0; rpl_semi_sync_master_timefunc_fails = 0;
wait_sessions_ = 0; rpl_semi_sync_master_wait_sessions = 0;
wait_backtraverse_ = 0; rpl_semi_sync_master_wait_pos_backtraverse = 0;
total_trx_wait_num_ = 0; rpl_semi_sync_master_trx_wait_num = 0;
total_trx_wait_time_ = 0; rpl_semi_sync_master_trx_wait_time = 0;
total_net_wait_num_ = 0; rpl_semi_sync_master_net_wait_num = 0;
total_net_wait_time_ = 0; rpl_semi_sync_master_net_wait_time = 0;
unlock(); unlock();
...@@ -1146,27 +1251,15 @@ void ReplSemiSyncMaster::setExportStats() ...@@ -1146,27 +1251,15 @@ void ReplSemiSyncMaster::setExportStats()
{ {
lock(); lock();
rpl_semi_sync_master_status = state_ && rpl_semi_sync_master_clients; rpl_semi_sync_master_status = state_;
rpl_semi_sync_master_yes_transactions = enabled_transactions_; rpl_semi_sync_master_avg_trx_wait_time=
rpl_semi_sync_master_no_transactions = disabled_transactions_; ((rpl_semi_sync_master_trx_wait_num) ?
rpl_semi_sync_master_off_times = switched_off_times_; (unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
rpl_semi_sync_master_timefunc_fails = timefunc_fails_; ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
rpl_semi_sync_master_num_timeouts = total_wait_timeouts_; rpl_semi_sync_master_avg_net_wait_time=
rpl_semi_sync_master_wait_sessions = wait_sessions_; ((rpl_semi_sync_master_net_wait_num) ?
rpl_semi_sync_master_back_wait_pos = wait_backtraverse_; (unsigned long)((double)rpl_semi_sync_master_net_wait_time /
rpl_semi_sync_master_trx_wait_num = total_trx_wait_num_; ((double)rpl_semi_sync_master_net_wait_num)) : 0);
rpl_semi_sync_master_trx_wait_time =
((total_trx_wait_num_) ?
(unsigned long)((double)total_trx_wait_time_ /
((double)total_trx_wait_num_)) : 0);
rpl_semi_sync_master_net_wait_num = total_net_wait_num_;
rpl_semi_sync_master_net_wait_time =
((total_net_wait_num_) ?
(unsigned long)((double)total_net_wait_time_ /
((double)total_net_wait_num_)) : 0);
rpl_semi_sync_master_net_wait_total_time = total_net_wait_time_;
rpl_semi_sync_master_trx_wait_total_time = total_trx_wait_time_;
unlock(); unlock();
} }
......
...@@ -81,7 +81,7 @@ public: ...@@ -81,7 +81,7 @@ public:
/* Insert an active transaction node with the specified position. /* Insert an active transaction node with the specified position.
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
...@@ -91,7 +91,7 @@ public: ...@@ -91,7 +91,7 @@ public:
* list and the hash table will be reset to empty. * list and the hash table will be reset to empty.
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int clear_active_tranx_nodes(const char *log_file_name, int clear_active_tranx_nodes(const char *log_file_name,
my_off_t log_file_pos); my_off_t log_file_pos);
...@@ -175,19 +175,7 @@ class ReplSemiSyncMaster ...@@ -175,19 +175,7 @@ class ReplSemiSyncMaster
volatile bool master_enabled_; /* semi-sync is enabled on the master */ volatile bool master_enabled_; /* semi-sync is enabled on the master */
unsigned long wait_timeout_; /* timeout period(ms) during tranx wait */ unsigned long wait_timeout_; /* timeout period(ms) during tranx wait */
/* All status variables. */
bool state_; /* whether semi-sync is switched */ bool state_; /* whether semi-sync is switched */
unsigned long enabled_transactions_; /* semi-sync'ed tansactions */
unsigned long disabled_transactions_; /* non-semi-sync'ed tansactions */
unsigned long switched_off_times_; /* how many times are switched off? */
unsigned long timefunc_fails_; /* how many time function fails? */
unsigned long total_wait_timeouts_; /* total number of wait timeouts */
unsigned long wait_sessions_; /* how many sessions wait for replies? */
unsigned long wait_backtraverse_; /* wait position back traverses */
unsigned long long total_trx_wait_num_; /* total trx waits: non-timeout ones */
unsigned long long total_trx_wait_time_; /* total trx wait time: in us */
unsigned long long total_net_wait_num_; /* total network waits */
unsigned long long total_net_wait_time_; /* total network wait time */
/* The number of maximum active transactions. This should be the same as /* The number of maximum active transactions. This should be the same as
* maximum connections because MySQL does not do connection sharing now. * maximum connections because MySQL does not do connection sharing now.
...@@ -253,8 +241,6 @@ class ReplSemiSyncMaster ...@@ -253,8 +241,6 @@ class ReplSemiSyncMaster
/* Is the slave servered by the thread requested semi-sync */ /* Is the slave servered by the thread requested semi-sync */
bool is_semi_sync_slave(); bool is_semi_sync_slave();
int reportReplyBinlog(const char *log_file_pos);
/* In semi-sync replication, reports up to which binlog position we have /* In semi-sync replication, reports up to which binlog position we have
* received replies from the slave indicating that it already get the events. * received replies from the slave indicating that it already get the events.
* *
...@@ -265,7 +251,7 @@ class ReplSemiSyncMaster ...@@ -265,7 +251,7 @@ class ReplSemiSyncMaster
* the replies from the slave * the replies from the slave
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int reportReplyBinlog(uint32 server_id, int reportReplyBinlog(uint32 server_id,
const char* log_file_name, const char* log_file_name,
...@@ -284,7 +270,7 @@ class ReplSemiSyncMaster ...@@ -284,7 +270,7 @@ class ReplSemiSyncMaster
* trx_wait_binlog_pos - (IN) ending position's file offset * trx_wait_binlog_pos - (IN) ending position's file offset
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int commitTrx(const char* trx_wait_binlog_name, int commitTrx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos); my_off_t trx_wait_binlog_pos);
...@@ -313,7 +299,7 @@ class ReplSemiSyncMaster ...@@ -313,7 +299,7 @@ class ReplSemiSyncMaster
* server_id - (IN) master server id number * server_id - (IN) master server id number
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int updateSyncHeader(unsigned char *packet, int updateSyncHeader(unsigned char *packet,
const char *log_file_name, const char *log_file_name,
...@@ -330,10 +316,23 @@ class ReplSemiSyncMaster ...@@ -330,10 +316,23 @@ class ReplSemiSyncMaster
* log_file_pos - (IN) transaction ending position's file offset * log_file_pos - (IN) transaction ending position's file offset
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos); int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
/* Read the slave's reply so that we know how much progress the slave makes
* on receive replication events.
*
* Input:
* net - (IN) the connection to master
* server_id - (IN) master server id number
* event_buf - (IN) pointer to the event packet
*
* Return:
* 0: success; non-zero: error
*/
int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
/* Export internal statistics for semi-sync replication. */ /* Export internal statistics for semi-sync replication. */
void setExportStats(); void setExportStats();
...@@ -345,22 +344,31 @@ class ReplSemiSyncMaster ...@@ -345,22 +344,31 @@ class ReplSemiSyncMaster
/* System and status variables for the master component */ /* System and status variables for the master component */
extern char rpl_semi_sync_master_enabled; extern char rpl_semi_sync_master_enabled;
extern char rpl_semi_sync_master_status;
extern unsigned long rpl_semi_sync_master_clients;
extern unsigned long rpl_semi_sync_master_timeout; extern unsigned long rpl_semi_sync_master_timeout;
extern unsigned long rpl_semi_sync_master_trace_level; extern unsigned long rpl_semi_sync_master_trace_level;
extern char rpl_semi_sync_master_status;
extern unsigned long rpl_semi_sync_master_yes_transactions; extern unsigned long rpl_semi_sync_master_yes_transactions;
extern unsigned long rpl_semi_sync_master_no_transactions; extern unsigned long rpl_semi_sync_master_no_transactions;
extern unsigned long rpl_semi_sync_master_off_times; extern unsigned long rpl_semi_sync_master_off_times;
extern unsigned long rpl_semi_sync_master_wait_timeouts;
extern unsigned long rpl_semi_sync_master_timefunc_fails; extern unsigned long rpl_semi_sync_master_timefunc_fails;
extern unsigned long rpl_semi_sync_master_num_timeouts; extern unsigned long rpl_semi_sync_master_num_timeouts;
extern unsigned long rpl_semi_sync_master_wait_sessions; extern unsigned long rpl_semi_sync_master_wait_sessions;
extern unsigned long rpl_semi_sync_master_back_wait_pos; extern unsigned long rpl_semi_sync_master_wait_pos_backtraverse;
extern unsigned long rpl_semi_sync_master_trx_wait_time; extern unsigned long rpl_semi_sync_master_avg_trx_wait_time;
extern unsigned long rpl_semi_sync_master_net_wait_time; extern unsigned long rpl_semi_sync_master_avg_net_wait_time;
extern unsigned long long rpl_semi_sync_master_net_wait_num; extern unsigned long long rpl_semi_sync_master_net_wait_num;
extern unsigned long long rpl_semi_sync_master_trx_wait_num; extern unsigned long long rpl_semi_sync_master_trx_wait_num;
extern unsigned long long rpl_semi_sync_master_net_wait_total_time; extern unsigned long long rpl_semi_sync_master_net_wait_time;
extern unsigned long long rpl_semi_sync_master_trx_wait_total_time; extern unsigned long long rpl_semi_sync_master_trx_wait_time;
extern unsigned long rpl_semi_sync_master_clients;
/*
This indicates whether we should keep waiting if no semi-sync slave
is available.
0 : stop waiting if detected no avaialable semi-sync slave.
1 (default) : keep waiting until timeout even no available semi-sync slave.
*/
extern char rpl_semi_sync_master_wait_no_slave;
#endif /* SEMISYNC_MASTER_H */ #endif /* SEMISYNC_MASTER_H */
...@@ -69,8 +69,16 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param, ...@@ -69,8 +69,16 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
bool semi_sync_slave= repl_semisync.is_semi_sync_slave(); bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
if (semi_sync_slave) if (semi_sync_slave)
{
/* One more semi-sync slave */ /* One more semi-sync slave */
repl_semisync.add_slave(); repl_semisync.add_slave();
/*
Let's assume this semi-sync slave has already received all
binlog events before the filename and position it requests.
*/
repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
}
sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
semi_sync_slave ? "semi-sync" : "asynchronous", semi_sync_slave ? "semi-sync" : "asynchronous",
param->server_id, log_file, (unsigned long)log_pos); param->server_id, log_file, (unsigned long)log_pos);
...@@ -114,6 +122,18 @@ int repl_semi_before_send_event(Binlog_transmit_param *param, ...@@ -114,6 +122,18 @@ int repl_semi_before_send_event(Binlog_transmit_param *param,
int repl_semi_after_send_event(Binlog_transmit_param *param, int repl_semi_after_send_event(Binlog_transmit_param *param,
const char *event_buf, unsigned long len) const char *event_buf, unsigned long len)
{ {
if (repl_semisync.is_semi_sync_slave())
{
THD *thd= current_thd;
/*
Possible errors in reading slave reply are ignored deliberately
because we do not want dump thread to quit on this. Error
messages are already reported.
*/
(void) repl_semisync.readSlaveReply(&thd->net,
param->server_id, event_buf);
thd->clear_error();
}
return 0; return 0;
} }
...@@ -142,11 +162,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, ...@@ -142,11 +162,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
void *ptr, void *ptr,
const void *val); const void *val);
static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val);
static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled, static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
PLUGIN_VAR_OPCMDARG, PLUGIN_VAR_OPCMDARG,
"Enable semi-synchronous replication master (disabled by default). ", "Enable semi-synchronous replication master (disabled by default). ",
...@@ -161,6 +176,13 @@ static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout, ...@@ -161,6 +176,13 @@ static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
fix_rpl_semi_sync_master_timeout, // update fix_rpl_semi_sync_master_timeout, // update
10000, 0, ~0L, 1); 10000, 0, ~0L, 1);
static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
PLUGIN_VAR_OPCMDARG,
"Wait until timeout when no semi-synchronous replication slave available (enabled by default). ",
NULL, // check
NULL, // update
1);
static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level, static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
PLUGIN_VAR_OPCMDARG, PLUGIN_VAR_OPCMDARG,
"The tracing level for semi-sync replication.", "The tracing level for semi-sync replication.",
...@@ -168,22 +190,11 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level, ...@@ -168,22 +190,11 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
&fix_rpl_semi_sync_master_trace_level, // update &fix_rpl_semi_sync_master_trace_level, // update
32, 0, ~0L, 1); 32, 0, ~0L, 1);
/*
Use a SESSION instead of GLOBAL variable for slave to send reply to
avoid requiring SUPER privilege.
*/
static MYSQL_THDVAR_STR(reply_log_file_pos,
PLUGIN_VAR_NOCMDOPT,
"The log filename and position slave has queued to relay log.",
NULL, // check
&fix_rpl_semi_sync_master_reply_log_file_pos,
"");
static SYS_VAR* semi_sync_master_system_vars[]= { static SYS_VAR* semi_sync_master_system_vars[]= {
MYSQL_SYSVAR(enabled), MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(timeout), MYSQL_SYSVAR(timeout),
MYSQL_SYSVAR(wait_no_slave),
MYSQL_SYSVAR(trace_level), MYSQL_SYSVAR(trace_level),
MYSQL_SYSVAR(reply_log_file_pos),
NULL, NULL,
}; };
...@@ -228,19 +239,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, ...@@ -228,19 +239,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
return; return;
} }
static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val)
{
const char *log_file_pos= *(char **)val;
if (repl_semisync.reportReplyBinlog(log_file_pos))
sql_print_error("report slave binlog reply failed.");
return;
}
Trans_observer trans_observer = { Trans_observer trans_observer = {
sizeof(Trans_observer), // len sizeof(Trans_observer), // len
...@@ -278,45 +276,60 @@ Binlog_transmit_observer transmit_observer = { ...@@ -278,45 +276,60 @@ Binlog_transmit_observer transmit_observer = {
return 0; \ return 0; \
} }
DEF_SHOW_FUNC(clients, SHOW_LONG)
DEF_SHOW_FUNC(net_wait_time, SHOW_LONG)
DEF_SHOW_FUNC(net_wait_total_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(off_times, SHOW_LONG)
DEF_SHOW_FUNC(no_transactions, SHOW_LONG)
DEF_SHOW_FUNC(status, SHOW_BOOL) DEF_SHOW_FUNC(status, SHOW_BOOL)
DEF_SHOW_FUNC(timefunc_fails, SHOW_LONG) DEF_SHOW_FUNC(clients, SHOW_LONG)
DEF_SHOW_FUNC(trx_wait_time, SHOW_LONG) DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(trx_wait_total_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG) DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(back_wait_pos, SHOW_LONG) DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(wait_sessions, SHOW_LONG) DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(yes_transactions, SHOW_LONG) DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
/* plugin status variables */ /* plugin status variables */
static SHOW_VAR semi_sync_master_status_vars[]= { static SHOW_VAR semi_sync_master_status_vars[]= {
{"Rpl_semi_sync_master_clients", (char*) &SHOW_FNAME(clients), SHOW_FUNC}, {"Rpl_semi_sync_master_status",
{"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(status),
(char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC}, SHOW_FUNC},
{"Rpl_semi_sync_master_net_wait_time", {"Rpl_semi_sync_master_clients",
(char*) &SHOW_FNAME(net_wait_total_time), SHOW_FUNC}, (char*) &SHOW_FNAME(clients),
{"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC}, SHOW_FUNC},
{"Rpl_semi_sync_master_no_times", (char*) &SHOW_FNAME(off_times), SHOW_FUNC}, {"Rpl_semi_sync_master_yes_tx",
{"Rpl_semi_sync_master_no_tx", (char*) &SHOW_FNAME(no_transactions), SHOW_FUNC}, (char*) &rpl_semi_sync_master_yes_transactions,
{"Rpl_semi_sync_master_status", (char*) &SHOW_FNAME(status), SHOW_FUNC}, SHOW_LONG},
{"Rpl_semi_sync_master_no_tx",
(char*) &rpl_semi_sync_master_no_transactions,
SHOW_LONG},
{"Rpl_semi_sync_master_wait_sessions",
(char*) &rpl_semi_sync_master_wait_sessions,
SHOW_LONG},
{"Rpl_semi_sync_master_no_times",
(char*) &rpl_semi_sync_master_off_times,
SHOW_LONG},
{"Rpl_semi_sync_master_timefunc_failures", {"Rpl_semi_sync_master_timefunc_failures",
(char*) &SHOW_FNAME(timefunc_fails), SHOW_FUNC}, (char*) &rpl_semi_sync_master_timefunc_fails,
{"Rpl_semi_sync_master_tx_avg_wait_time", SHOW_LONG},
(char*) &SHOW_FNAME(trx_wait_time), SHOW_FUNC},
{"Rpl_semi_sync_master_tx_wait_time",
(char*) &SHOW_FNAME(trx_wait_total_time), SHOW_FUNC},
{"Rpl_semi_sync_master_tx_waits", (char*) &SHOW_FNAME(trx_wait_num), SHOW_FUNC},
{"Rpl_semi_sync_master_wait_pos_backtraverse", {"Rpl_semi_sync_master_wait_pos_backtraverse",
(char*) &SHOW_FNAME(back_wait_pos), SHOW_FUNC}, (char*) &rpl_semi_sync_master_wait_pos_backtraverse,
{"Rpl_semi_sync_master_wait_sessions", SHOW_LONG},
(char*) &SHOW_FNAME(wait_sessions), SHOW_FUNC}, {"Rpl_semi_sync_master_tx_wait_time",
{"Rpl_semi_sync_master_yes_tx", (char*) &SHOW_FNAME(yes_transactions), SHOW_FUNC}, (char*) &SHOW_FNAME(trx_wait_time),
SHOW_FUNC},
{"Rpl_semi_sync_master_tx_waits",
(char*) &SHOW_FNAME(trx_wait_num),
SHOW_FUNC},
{"Rpl_semi_sync_master_tx_avg_wait_time",
(char*) &SHOW_FNAME(avg_trx_wait_time),
SHOW_FUNC},
{"Rpl_semi_sync_master_net_wait_time",
(char*) &SHOW_FNAME(net_wait_time),
SHOW_FUNC},
{"Rpl_semi_sync_master_net_waits",
(char*) &SHOW_FNAME(net_wait_num),
SHOW_FUNC},
{"Rpl_semi_sync_master_net_avg_wait_time",
(char*) &SHOW_FNAME(avg_net_wait_time),
SHOW_FUNC},
{NULL, NULL, SHOW_LONG}, {NULL, NULL, SHOW_LONG},
}; };
......
...@@ -39,16 +39,6 @@ int ReplSemiSyncSlave::initObject() ...@@ -39,16 +39,6 @@ int ReplSemiSyncSlave::initObject()
return result; return result;
} }
int ReplSemiSyncSlave::slaveReplyConnect()
{
if (!mysql_reply && !(mysql_reply= rpl_connect_master(NULL)))
{
sql_print_error("Semisync slave connect to master for reply failed");
return 1;
}
return 0;
}
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
unsigned long total_len, unsigned long total_len,
bool *need_reply, bool *need_reply,
...@@ -104,19 +94,45 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param) ...@@ -104,19 +94,45 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
return 0; return 0;
} }
int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos) int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
const char *binlog_filename,
my_off_t binlog_filepos)
{ {
char query[FN_REFLEN + 100]; const char *kWho = "ReplSemiSyncSlave::slaveReply";
sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'", NET *net= &mysql->net;
(unsigned long long)log_pos, log_name); uchar reply_buffer[REPLY_MAGIC_NUM_LEN
if (mysql_real_query(mysql_reply, query, strlen(query))) + REPLY_BINLOG_POS_LEN
+ REPLY_BINLOG_NAME_LEN];
int reply_res, name_len = strlen(binlog_filename);
function_enter(kWho);
/* Prepare the buffer of the reply. */
reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
binlog_filename,
name_len + 1 /* including trailing '\0' */);
if (trace_level_ & kTraceDetail)
sql_print_information("%s: reply (%s, %lu)", kWho,
binlog_filename, (ulong)binlog_filepos);
net_clear(net, 0);
/* Send the reply. */
reply_res = my_net_write(net, reply_buffer,
name_len + REPLY_BINLOG_NAME_OFFSET);
if (!reply_res)
{ {
sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed"); reply_res = net_flush(net);
mysql_free_result(mysql_store_result(mysql_reply)); if (reply_res)
mysql_close(mysql_reply); sql_print_error("Semi-sync slave net_flush() reply failed");
mysql_reply= 0;
return 1;
} }
mysql_free_result(mysql_store_result(mysql_reply)); else
return 0; {
sql_print_error("Semi-sync slave send reply failed: %s (%d)",
net->last_error, net->last_errno);
}
return function_exit(kWho, reply_res);
} }
...@@ -57,7 +57,7 @@ public: ...@@ -57,7 +57,7 @@ public:
* payload_len - (IN) payload length * payload_len - (IN) payload length
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
const char **payload, unsigned long *payload_len); const char **payload, unsigned long *payload_len);
...@@ -67,19 +67,16 @@ public: ...@@ -67,19 +67,16 @@ public:
* binlog position. * binlog position.
* *
* Input: * Input:
* log_name - (IN) the reply point's binlog file name * mysql - (IN) the mysql network connection
* log_pos - (IN) the reply point's binlog file offset * binlog_filename - (IN) the reply point's binlog file name
* binlog_filepos - (IN) the reply point's binlog file offset
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int slaveReply(const char *log_name, my_off_t log_pos); int slaveReply(MYSQL *mysql, const char *binlog_filename,
my_off_t binlog_filepos);
/*
Connect to master for sending reply
*/
int slaveReplyConnect();
int slaveStart(Binlog_relay_IO_param *param); int slaveStart(Binlog_relay_IO_param *param);
int slaveStop(Binlog_relay_IO_param *param); int slaveStop(Binlog_relay_IO_param *param);
......
...@@ -45,13 +45,6 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, ...@@ -45,13 +45,6 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
if (!repl_semisync.getSlaveEnabled()) if (!repl_semisync.getSlaveEnabled())
return 0; return 0;
/*
Create the connection that is used to send slave ACK replies to
master
*/
if (repl_semisync.slaveReplyConnect())
return 1;
/* Check if master server has semi-sync plugin installed */ /* Check if master server has semi-sync plugin installed */
query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"; query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
if (mysql_real_query(mysql, query, strlen(query)) || if (mysql_real_query(mysql, query, strlen(query)) ||
...@@ -63,10 +56,11 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, ...@@ -63,10 +56,11 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
} }
row= mysql_fetch_row(res); row= mysql_fetch_row(res);
if (!row || strcmp(row[1], "ON")) if (!row)
{ {
/* Master does not support or not configured semi-sync */ /* Master does not support semi-sync */
sql_print_warning("Master server does not support or not configured semi-sync replication, fallback to asynchronous"); sql_print_warning("Master server does not support semi-sync, "
"fallback to asynchronous replication");
rpl_semi_sync_slave_status= 0; rpl_semi_sync_slave_status= 0;
return 0; return 0;
} }
...@@ -106,8 +100,16 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param, ...@@ -106,8 +100,16 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
uint32 flags) uint32 flags)
{ {
if (rpl_semi_sync_slave_status && semi_sync_need_reply) if (rpl_semi_sync_slave_status && semi_sync_need_reply)
return repl_semisync.slaveReply(param->master_log_name, {
/*
We deliberately ignore the error in slaveReply, such error
should not cause the slave IO thread to stop, and the error
messages are already reported.
*/
(void) repl_semisync.slaveReply(param->mysql,
param->master_log_name,
param->master_log_pos); param->master_log_pos);
}
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment