Commit f8155de0 authored by He Zhenxing's avatar He Zhenxing

Backport BUG#45848 Semisynchronous replication internals are visible in SHOW PROCESSLIST and logs

Semi-sync used an extra connection from slave to master to send
replies. This was a normal client connection that used a normal
SET query to set the reply information on the master, so it was
visible to users and could cause confusion and complaints.

This problem is fixed by sending the reply over the same
connection that the master dump thread uses to send the binlog
to the slave. Since the semi-sync plugins are now integrated
with the server code, it is not a problem to use the internal net
interfaces to do this.

The master dump thread marks an event as requiring a reply, and
waits for that reply, when the event just sent is the last event
of a transaction and semi-sync status is ON. The slave sends a
reply to the master when it receives an event that requires
a reply.
parent d8724a45
...@@ -81,7 +81,7 @@ Rpl_semi_sync_master_no_tx 0 ...@@ -81,7 +81,7 @@ Rpl_semi_sync_master_no_tx 0
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_yes_tx 0 Rpl_semi_sync_master_yes_tx 0
create table t1(n int) engine = ENGINE_TYPE; create table t1(a int) engine = ENGINE_TYPE;
[ master state after CREATE TABLE statement ] [ master state after CREATE TABLE statement ]
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
Variable_name Value Variable_name Value
...@@ -92,6 +92,9 @@ Rpl_semi_sync_master_no_tx 0 ...@@ -92,6 +92,9 @@ Rpl_semi_sync_master_no_tx 0
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_yes_tx 1 Rpl_semi_sync_master_yes_tx 1
select CONNECTIONS_NORMAL_SLAVE - CONNECTIONS_NORMAL_SLAVE as 'Should be 0';
Should be 0
0
[ insert records to table ] [ insert records to table ]
[ master status after inserts ] [ master status after inserts ]
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
...@@ -108,15 +111,19 @@ Rpl_semi_sync_master_yes_tx 301 ...@@ -108,15 +111,19 @@ Rpl_semi_sync_master_yes_tx 301
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_slave_status ON Rpl_semi_sync_slave_status ON
select count(distinct n) from t1; select count(distinct a) from t1;
count(distinct n) count(distinct a)
300 300
select min(n) from t1; select min(a) from t1;
min(n) min(a)
1 1
select max(n) from t1; select max(a) from t1;
max(n) max(a)
300 300
#
# Test semi-sync master will switch OFF after one transacton
# timeout waiting for slave reply.
#
include/stop_slave.inc include/stop_slave.inc
[ on master ] [ on master ]
[ master status should be ON ] [ master status should be ON ]
...@@ -134,7 +141,16 @@ Variable_name Value ...@@ -134,7 +141,16 @@ Variable_name Value
Rpl_semi_sync_master_clients 1 Rpl_semi_sync_master_clients 1
[ semi-sync replication of these transactions will fail ] [ semi-sync replication of these transactions will fail ]
insert into t1 values (500); insert into t1 values (500);
delete from t1 where n < 500; [ master status should be OFF ]
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
Rpl_semi_sync_master_status OFF
show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 1
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 301
insert into t1 values (100); insert into t1 values (100);
[ master status should be OFF ] [ master status should be OFF ]
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
...@@ -142,10 +158,13 @@ Variable_name Value ...@@ -142,10 +158,13 @@ Variable_name Value
Rpl_semi_sync_master_status OFF Rpl_semi_sync_master_status OFF
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_no_tx 3 Rpl_semi_sync_master_no_tx 302
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_yes_tx 301 Rpl_semi_sync_master_yes_tx 301
#
# Test semi-sync status on master will be ON again when slave catches up
#
[ on slave ] [ on slave ]
[ slave status should be OFF ] [ slave status should be OFF ]
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
...@@ -156,31 +175,33 @@ include/start_slave.inc ...@@ -156,31 +175,33 @@ include/start_slave.inc
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_slave_status ON Rpl_semi_sync_slave_status ON
select count(distinct n) from t1; select count(distinct a) from t1;
count(distinct n) count(distinct a)
2 2
select min(n) from t1; select min(a) from t1;
min(n) min(a)
100 100
select max(n) from t1; select max(a) from t1;
max(n) max(a)
500 500
[ on master ] [ on master ]
[ do something to activate semi-sync ] [ master status should be ON again after slave catches up ]
drop table t1;
[ master status should be ON again ]
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_status ON Rpl_semi_sync_master_status ON
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_no_tx 3 Rpl_semi_sync_master_no_tx 302
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_yes_tx 302 Rpl_semi_sync_master_yes_tx 301
show status like 'Rpl_semi_sync_master_clients'; show status like 'Rpl_semi_sync_master_clients';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_clients 1 Rpl_semi_sync_master_clients 1
#
# Test disable/enable master semi-sync on the fly.
#
drop table t1;
[ on slave ] [ on slave ]
include/stop_slave.inc include/stop_slave.inc
[ on master ] [ on master ]
...@@ -206,6 +227,9 @@ rpl_semi_sync_master_enabled ON ...@@ -206,6 +227,9 @@ rpl_semi_sync_master_enabled ON
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
Variable_name Value Variable_name Value
Rpl_semi_sync_master_status ON Rpl_semi_sync_master_status ON
#
# Test RESET MASTER/SLAVE
#
[ on slave ] [ on slave ]
include/start_slave.inc include/start_slave.inc
[ on master ] [ on master ]
......
...@@ -7,6 +7,11 @@ source include/master-slave.inc; ...@@ -7,6 +7,11 @@ source include/master-slave.inc;
let $engine_type= InnoDB; let $engine_type= InnoDB;
#let $engine_type= MyISAM; #let $engine_type= MyISAM;
# After the fix for BUG#45848, a semi-sync slave should not create any
# extra connections on the master. Save the connection count before
# starting the semi-sync slave, for comparison below.
let $_connections_normal_slave= query_get_value(SHOW STATUS LIKE 'Threads_connected', Value, 1);
# Suppress warnings that might be generated during the test # Suppress warnings that might be generated during the test
disable_query_log; disable_query_log;
connection master; connection master;
...@@ -150,13 +155,20 @@ show status like 'Rpl_semi_sync_master_no_tx'; ...@@ -150,13 +155,20 @@ show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
replace_result $engine_type ENGINE_TYPE; replace_result $engine_type ENGINE_TYPE;
eval create table t1(n int) engine = $engine_type; eval create table t1(a int) engine = $engine_type;
echo [ master state after CREATE TABLE statement ]; echo [ master state after CREATE TABLE statement ];
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
# After fix of BUG#45848, semi-sync slave should not create any extra
# connections on master.
let $_connections_semisync_slave= query_get_value(SHOW STATUS LIKE 'Threads_connected', Value, 1);
replace_result $_connections_semisync_slave CONNECTIONS_SEMISYNC_SLAVE;
replace_result $_connections_normal_slave CONNECTIONS_NORMAL_SLAVE;
eval select $_connections_semisync_slave - $_connections_normal_slave as 'Should be 0';
let $i=300; let $i=300;
echo [ insert records to table ]; echo [ insert records to table ];
disable_query_log; disable_query_log;
...@@ -178,10 +190,15 @@ echo [ on slave ]; ...@@ -178,10 +190,15 @@ echo [ on slave ];
echo [ slave status after replicated inserts ]; echo [ slave status after replicated inserts ];
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
select count(distinct n) from t1; select count(distinct a) from t1;
select min(n) from t1; select min(a) from t1;
select max(n) from t1; select max(a) from t1;
--echo #
--echo # Test semi-sync master will switch OFF after one transacton
--echo # timeout waiting for slave reply.
--echo #
connection slave;
source include/stop_slave.inc; source include/stop_slave.inc;
connection master; connection master;
...@@ -197,8 +214,11 @@ show status like 'Rpl_semi_sync_master_clients'; ...@@ -197,8 +214,11 @@ show status like 'Rpl_semi_sync_master_clients';
echo [ semi-sync replication of these transactions will fail ]; echo [ semi-sync replication of these transactions will fail ];
insert into t1 values (500); insert into t1 values (500);
delete from t1 where n < 500;
insert into t1 values (100); # Wait for the semi-sync replication of this transaction to timeout
let $status_var= Rpl_semi_sync_master_status;
let $status_var_value= OFF;
source include/wait_for_status_var.inc;
# The second semi-sync check should be off because one transaction # The second semi-sync check should be off because one transaction
# times out during waiting. # times out during waiting.
...@@ -207,6 +227,28 @@ show status like 'Rpl_semi_sync_master_status'; ...@@ -207,6 +227,28 @@ show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
# Semi-sync status on master is now OFF, so all these transactions
# will be replicated asynchronously.
let $i=300;
disable_query_log;
while ($i)
{
eval delete from t1 where a=$i;
dec $i;
}
enable_query_log;
insert into t1 values (100);
echo [ master status should be OFF ];
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx';
--echo #
--echo # Test semi-sync status on master will be ON again when slave catches up
--echo #
# Save the master position for later use. # Save the master position for later use.
save_master_pos; save_master_pos;
...@@ -221,23 +263,25 @@ sync_with_master; ...@@ -221,23 +263,25 @@ sync_with_master;
echo [ slave status should be ON ]; echo [ slave status should be ON ];
show status like 'Rpl_semi_sync_slave_status'; show status like 'Rpl_semi_sync_slave_status';
select count(distinct n) from t1; select count(distinct a) from t1;
select min(n) from t1; select min(a) from t1;
select max(n) from t1; select max(a) from t1;
connection master; connection master;
echo [ on master ]; echo [ on master ];
echo [ do something to activate semi-sync ]; # The master semi-sync status should be on again after slave catches up.
drop table t1; echo [ master status should be ON again after slave catches up ];
# The third semi-sync check should be on again.
echo [ master status should be ON again ];
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx'; show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx'; show status like 'Rpl_semi_sync_master_yes_tx';
show status like 'Rpl_semi_sync_master_clients'; show status like 'Rpl_semi_sync_master_clients';
--echo #
--echo # Test disable/enable master semi-sync on the fly.
--echo #
drop table t1;
sync_slave_with_master; sync_slave_with_master;
echo [ on slave ]; echo [ on slave ];
...@@ -259,6 +303,10 @@ set global rpl_semi_sync_master_enabled=1; ...@@ -259,6 +303,10 @@ set global rpl_semi_sync_master_enabled=1;
show variables like 'rpl_semi_sync_master_enabled'; show variables like 'rpl_semi_sync_master_enabled';
show status like 'Rpl_semi_sync_master_status'; show status like 'Rpl_semi_sync_master_status';
--echo #
--echo # Test RESET MASTER/SLAVE
--echo #
connection slave; connection slave;
echo [ on slave ]; echo [ on slave ];
...@@ -512,6 +560,8 @@ source include/start_slave.inc; ...@@ -512,6 +560,8 @@ source include/start_slave.inc;
connection master; connection master;
drop table t1; drop table t1;
sync_slave_with_master;
connection master;
drop user rpl@127.0.0.1; drop user rpl@127.0.0.1;
flush privileges; flush privileges;
sync_slave_with_master;
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
pkgplugindir = $(pkglibdir)/plugin pkgplugindir = $(pkglibdir)/plugin
INCLUDES = -I$(top_srcdir)/include \ INCLUDES = -I$(top_srcdir)/include \
-I$(top_srcdir)/sql \ -I$(top_srcdir)/sql \
-I$(top_srcdir)/regex \
-I$(srcdir) -I$(srcdir)
noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h
......
...@@ -18,25 +18,9 @@ ...@@ -18,25 +18,9 @@
#ifndef SEMISYNC_H #ifndef SEMISYNC_H
#define SEMISYNC_H #define SEMISYNC_H
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <sys/time.h>
#include <time.h>
#include <stdio.h>
#include <pthread.h>
#include <mysql.h>
typedef uint32_t uint32;
typedef unsigned long long my_off_t;
#define FN_REFLEN 512 /* Max length of full path-name */
void sql_print_error(const char *format, ...);
void sql_print_warning(const char *format, ...);
void sql_print_information(const char *format, ...);
extern unsigned long max_connections;
#define MYSQL_SERVER #define MYSQL_SERVER
#define HAVE_REPLICATION #define HAVE_REPLICATION
#include <mysql_priv.h>
#include <my_global.h> #include <my_global.h>
#include <my_pthread.h> #include <my_pthread.h>
#include <mysql/plugin.h> #include <mysql/plugin.h>
...@@ -92,4 +76,16 @@ class ReplSemiSyncBase ...@@ -92,4 +76,16 @@ class ReplSemiSyncBase
static const unsigned char kPacketFlagSync; static const unsigned char kPacketFlagSync;
}; };
/* The layout of a semisync slave reply packet:
1 byte for the magic num
8 bytes for the binlog position
n bytes for the binlog filename, terminated with a '\0'

The *_LEN macros give the size of each field and the *_OFFSET macros
give the byte offset of each field from the start of the packet; every
offset is derived from the preceding field's offset and length, so the
fields are laid out back to back with no padding.

REPLY_BINLOG_NAME_LEN is the maximum space reserved for the filename
field (FN_REFLEN plus one byte), not the length of a particular name.

NOTE(review): ReplSemiSyncSlave::slaveReply() copies the name with its
'\0' into the buffer but passes name_len + REPLY_BINLOG_NAME_OFFSET as
the write length, so the terminator appears not to be transmitted --
confirm before relying on it on the receiving side.
*/
#define REPLY_MAGIC_NUM_LEN 1
#define REPLY_BINLOG_POS_LEN 8
#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1)
#define REPLY_MAGIC_NUM_OFFSET 0
#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN)
#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN)
#endif /* SEMISYNC_H */ #endif /* SEMISYNC_H */
...@@ -546,19 +546,6 @@ bool ReplSemiSyncMaster::is_semi_sync_slave() ...@@ -546,19 +546,6 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
return val; return val;
} }
/*
  Parse a textual slave reply of the form "<position>:<binlog file name>"
  and forward it to reportReplyBinlog(server_id, log_file_name, log_file_pos).

  Input:
    log_file_pos - (IN) the reply string: a decimal position, a ':'
                        separator, then the binlog file name

  Return:
    1 if the position parses as 0 or is not followed by ':';
    otherwise the result of the three-argument reportReplyBinlog().
*/
int ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_pos)
{
char log_name[FN_REFLEN];
char *endptr;
/* Leading decimal digits are the position; endptr is left at the first
   character after them. */
my_off_t log_pos= strtoull(log_file_pos, &endptr, 10);
/* A zero position or a missing ':' separator is treated as malformed. */
if (!log_pos || !endptr || *endptr != ':' )
return 1;
endptr++; // skip the ':' separator
/* NOTE(review): strncpy() does not NUL-terminate log_name when the name
   is FN_REFLEN bytes or longer. */
strncpy(log_name, endptr, FN_REFLEN);
/* The reply string carries no server id, so 0 is passed on. */
uint32 server_id= 0;
return reportReplyBinlog(server_id, log_name, log_pos);
}
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
const char *log_file_name, const char *log_file_name,
my_off_t log_file_pos) my_off_t log_file_pos)
...@@ -679,7 +666,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -679,7 +666,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"Waiting for semi-sync ACK from slave"); "Waiting for semi-sync ACK from slave");
/* This is the real check inside the mutex. */ /* This is the real check inside the mutex. */
if (!getMasterEnabled() || !is_on() || !rpl_semi_sync_master_clients) if (!getMasterEnabled() || !is_on())
goto l_end; goto l_end;
if (trace_level_ & kTraceDetail) if (trace_level_ & kTraceDetail)
...@@ -690,6 +677,8 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -690,6 +677,8 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
} }
while (is_on()) while (is_on())
{
if (reply_file_name_inited_)
{ {
int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
trx_wait_binlog_name, trx_wait_binlog_pos); trx_wait_binlog_name, trx_wait_binlog_pos);
...@@ -703,13 +692,14 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -703,13 +692,14 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
kWho, reply_file_name_, (unsigned long)reply_file_pos_); kWho, reply_file_name_, (unsigned long)reply_file_pos_);
break; break;
} }
}
/* Let us update the info about the minimum binlog position of waiting /* Let us update the info about the minimum binlog position of waiting
* threads. * threads.
*/ */
if (wait_file_name_inited_) if (wait_file_name_inited_)
{ {
cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos, int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
wait_file_name_, wait_file_pos_); wait_file_name_, wait_file_pos_);
if (cmp <= 0) if (cmp <= 0)
{ {
...@@ -824,6 +814,13 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, ...@@ -824,6 +814,13 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
} }
l_end: l_end:
/*
At this point, the binlog file and position of this transaction
must have been removed from ActiveTranx.
*/
assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
trx_wait_binlog_pos));
/* Update the status counter. */ /* Update the status counter. */
if (is_on() && rpl_semi_sync_master_clients) if (is_on() && rpl_semi_sync_master_clients)
enabled_transactions_++; enabled_transactions_++;
...@@ -1045,7 +1042,9 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, ...@@ -1045,7 +1042,9 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
* reserve the packet header. * reserve the packet header.
*/ */
if (sync) if (sync)
{
(packet)[2] = kPacketFlagSync; (packet)[2] = kPacketFlagSync;
}
return function_exit(kWho, 0); return function_exit(kWho, 0);
} }
...@@ -1098,8 +1097,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, ...@@ -1098,8 +1097,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
if insert tranx_node failed, print a warning message if insert tranx_node failed, print a warning message
and turn off semi-sync and turn off semi-sync
*/ */
sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %ul", sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
log_file_name, log_file_pos); log_file_name, (ulong)log_file_pos);
switch_off(); switch_off();
} }
} }
...@@ -1110,6 +1109,113 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, ...@@ -1110,6 +1109,113 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
return function_exit(kWho, result); return function_exit(kWho, result);
} }
/*
  Read the slave's reply to the event that was just sent, if that event
  was flagged as requiring one, and report the acknowledged binlog
  position through reportReplyBinlog().

  Input:
    net       - (IN) the connection the event was sent on; the reply is
                     read from the same connection
    server_id - (IN) server id passed on to reportReplyBinlog()
    event_buf - (IN) the event packet just sent; byte 1 must hold the
                     semi-sync magic number and byte 2 the sync flag

  Return (through function_exit()):
    0 when the event does not require a reply;
    -1 when flushing, reading or validating the reply fails;
    otherwise the result of reportReplyBinlog().
*/
int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
                                       const char *event_buf)
{
  /*
    All locals are declared before the first 'goto l_end' so that no
    jump crosses an initialization.
  */
  const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
  const unsigned char *packet;
  char log_file_name[FN_REFLEN];
  my_off_t log_file_pos;
  ulong log_file_len = 0;
  ulong packet_len;
  int result = -1;
  struct timeval start_tv;
  int start_time_err= 0;
  ulong trc_level = trace_level_;

  function_enter(kWho);

  assert((unsigned char)event_buf[1] == kPacketMagicNum);
  if ((unsigned char)event_buf[2] != kPacketFlagSync)
  {
    /* current event does not require reply */
    result = 0;
    goto l_end;
  }

  if (trc_level & kTraceNetWait)
    start_time_err = gettimeofday(&start_tv, 0);

  /* We flush to make sure that the current event is sent to the network,
   * instead of being buffered in the TCP/IP stack.
   */
  if (net_flush(net))
  {
    sql_print_error("Semi-sync master failed on net_flush() "
                    "before waiting for slave reply");
    goto l_end;
  }

  net_clear(net, 0);
  if (trc_level & kTraceDetail)
    sql_print_information("%s: Wait for replica's reply", kWho);

  /* Wait for the network here. Though binlog dump thread can indefinitely wait
   * here, transactions would not wait indefinitely.
   * Transactions wait on binlog replies detected by binlog dump threads. If
   * binlog dump threads wait too long, transactions will timeout and continue.
   */
  packet_len = my_net_read(net);

  if (trc_level & kTraceNetWait)
  {
    if (start_time_err != 0)
    {
      sql_print_error("Semi-sync master wait for reply "
                      "gettimeofday fail to get start time");
      timefunc_fails_++;
    }
    else
    {
      int wait_time;

      wait_time = getWaitTime(start_tv);
      if (wait_time < 0)
      {
        sql_print_error("Semi-sync master wait for reply "
                        "gettimeofday fail to get wait time.");
        timefunc_fails_++;
      }
      else
      {
        total_net_wait_num_++;
        total_net_wait_time_ += wait_time;
      }
    }
  }

  if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
  {
    if (packet_len == packet_error)
      sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
                      net->last_error, net->last_errno);
    else
      sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
                      net->last_error, net->last_errno);
    goto l_end;
  }

  packet = net->read_pos;
  if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
  {
    sql_print_error("Read semi-sync reply magic number error");
    goto l_end;
  }

  /*
    The file name occupies the rest of the packet. Its length comes from
    the peer, so it must be checked against the local buffer before
    copying; an unchecked strcpy() here would let an oversized reply
    overrun log_file_name.
  */
  log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
  if (log_file_len >= sizeof(log_file_name))
  {
    sql_print_error("Read semi-sync reply binlog file length too large");
    goto l_end;
  }

  log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
  memcpy(log_file_name, packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
  /* Terminate explicitly instead of relying on a '\0' inside the packet. */
  log_file_name[log_file_len] = '\0';

  if (trc_level & kTraceDetail)
    sql_print_information("%s: Got reply (%s, %lu)",
                          kWho, log_file_name, (ulong)log_file_pos);

  result = reportReplyBinlog(server_id, log_file_name, log_file_pos);

 l_end:
  return function_exit(kWho, result);
}
int ReplSemiSyncMaster::resetMaster() int ReplSemiSyncMaster::resetMaster()
{ {
const char *kWho = "ReplSemiSyncMaster::resetMaster"; const char *kWho = "ReplSemiSyncMaster::resetMaster";
......
...@@ -81,7 +81,7 @@ class ActiveTranx ...@@ -81,7 +81,7 @@ class ActiveTranx
/* Insert an active transaction node with the specified position. /* Insert an active transaction node with the specified position.
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
...@@ -91,7 +91,7 @@ class ActiveTranx ...@@ -91,7 +91,7 @@ class ActiveTranx
* list and the hash table will be reset to empty. * list and the hash table will be reset to empty.
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int clear_active_tranx_nodes(const char *log_file_name, int clear_active_tranx_nodes(const char *log_file_name,
my_off_t log_file_pos); my_off_t log_file_pos);
...@@ -253,8 +253,6 @@ class ReplSemiSyncMaster ...@@ -253,8 +253,6 @@ class ReplSemiSyncMaster
/* Is the slave servered by the thread requested semi-sync */ /* Is the slave servered by the thread requested semi-sync */
bool is_semi_sync_slave(); bool is_semi_sync_slave();
int reportReplyBinlog(const char *log_file_pos);
/* In semi-sync replication, reports up to which binlog position we have /* In semi-sync replication, reports up to which binlog position we have
* received replies from the slave indicating that it already get the events. * received replies from the slave indicating that it already get the events.
* *
...@@ -265,7 +263,7 @@ class ReplSemiSyncMaster ...@@ -265,7 +263,7 @@ class ReplSemiSyncMaster
* the replies from the slave * the replies from the slave
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int reportReplyBinlog(uint32 server_id, int reportReplyBinlog(uint32 server_id,
const char* log_file_name, const char* log_file_name,
...@@ -284,7 +282,7 @@ class ReplSemiSyncMaster ...@@ -284,7 +282,7 @@ class ReplSemiSyncMaster
* trx_wait_binlog_pos - (IN) ending position's file offset * trx_wait_binlog_pos - (IN) ending position's file offset
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int commitTrx(const char* trx_wait_binlog_name, int commitTrx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos); my_off_t trx_wait_binlog_pos);
...@@ -313,7 +311,7 @@ class ReplSemiSyncMaster ...@@ -313,7 +311,7 @@ class ReplSemiSyncMaster
* server_id - (IN) master server id number * server_id - (IN) master server id number
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int updateSyncHeader(unsigned char *packet, int updateSyncHeader(unsigned char *packet,
const char *log_file_name, const char *log_file_name,
...@@ -330,10 +328,23 @@ class ReplSemiSyncMaster ...@@ -330,10 +328,23 @@ class ReplSemiSyncMaster
* log_file_pos - (IN) transaction ending position's file offset * log_file_pos - (IN) transaction ending position's file offset
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos); int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
/* Read the slave's reply so that we know how much progress the slave makes
* on receive replication events.
*
* Input:
* net - (IN) the connection to master
* server_id - (IN) master server id number
* event_buf - (IN) pointer to the event packet
*
* Return:
* 0: success; non-zero: error
*/
int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
/* Export internal statistics for semi-sync replication. */ /* Export internal statistics for semi-sync replication. */
void setExportStats(); void setExportStats();
......
...@@ -69,8 +69,16 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param, ...@@ -69,8 +69,16 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
bool semi_sync_slave= repl_semisync.is_semi_sync_slave(); bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
if (semi_sync_slave) if (semi_sync_slave)
{
/* One more semi-sync slave */ /* One more semi-sync slave */
repl_semisync.add_slave(); repl_semisync.add_slave();
/*
Let's assume this semi-sync slave has already received all
binlog events before the filename and position it requests.
*/
repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
}
sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
semi_sync_slave ? "semi-sync" : "asynchronous", semi_sync_slave ? "semi-sync" : "asynchronous",
param->server_id, log_file, (unsigned long)log_pos); param->server_id, log_file, (unsigned long)log_pos);
...@@ -114,6 +122,18 @@ int repl_semi_before_send_event(Binlog_transmit_param *param, ...@@ -114,6 +122,18 @@ int repl_semi_before_send_event(Binlog_transmit_param *param,
int repl_semi_after_send_event(Binlog_transmit_param *param, int repl_semi_after_send_event(Binlog_transmit_param *param,
const char *event_buf, unsigned long len) const char *event_buf, unsigned long len)
{ {
if (repl_semisync.is_semi_sync_slave())
{
THD *thd= current_thd;
/*
Possible errors in reading slave reply are ignored deliberately
because we do not want dump thread to quit on this. Error
messages are already reported.
*/
(void) repl_semisync.readSlaveReply(&thd->net,
param->server_id, event_buf);
thd->clear_error();
}
return 0; return 0;
} }
...@@ -142,11 +162,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, ...@@ -142,11 +162,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
void *ptr, void *ptr,
const void *val); const void *val);
static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val);
static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled, static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
PLUGIN_VAR_OPCMDARG, PLUGIN_VAR_OPCMDARG,
"Enable semi-synchronous replication master (disabled by default). ", "Enable semi-synchronous replication master (disabled by default). ",
...@@ -168,22 +183,10 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level, ...@@ -168,22 +183,10 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
&fix_rpl_semi_sync_master_trace_level, // update &fix_rpl_semi_sync_master_trace_level, // update
32, 0, ~0L, 1); 32, 0, ~0L, 1);
/*
Use a SESSION instead of GLOBAL variable for slave to send reply to
avoid requiring SUPER privilege.
*/
static MYSQL_THDVAR_STR(reply_log_file_pos,
PLUGIN_VAR_NOCMDOPT,
"The log filename and position slave has queued to relay log.",
NULL, // check
&fix_rpl_semi_sync_master_reply_log_file_pos,
"");
static SYS_VAR* semi_sync_master_system_vars[]= { static SYS_VAR* semi_sync_master_system_vars[]= {
MYSQL_SYSVAR(enabled), MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(timeout), MYSQL_SYSVAR(timeout),
MYSQL_SYSVAR(trace_level), MYSQL_SYSVAR(trace_level),
MYSQL_SYSVAR(reply_log_file_pos),
NULL, NULL,
}; };
...@@ -228,19 +231,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, ...@@ -228,19 +231,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
return; return;
} }
/*
  Update callback for the reply_log_file_pos session variable: hands the
  newly assigned string to repl_semisync.reportReplyBinlog() and logs an
  error if that call returns non-zero.

  Input:
    thd - (IN) unused here
    var - (IN) unused here
    ptr - (IN) unused here
    val - (IN) points to the new value, read as a 'char *'
*/
static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val)
{
/* val points at the string pointer, hence the extra dereference. */
const char *log_file_pos= *(char **)val;
/* A failure is only logged; nothing is returned to the caller. */
if (repl_semisync.reportReplyBinlog(log_file_pos))
sql_print_error("report slave binlog reply failed.");
return;
}
Trans_observer trans_observer = { Trans_observer trans_observer = {
sizeof(Trans_observer), // len sizeof(Trans_observer), // len
......
...@@ -104,19 +104,45 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param) ...@@ -104,19 +104,45 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
return 0; return 0;
} }
int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos) int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
const char *binlog_filename,
my_off_t binlog_filepos)
{ {
char query[FN_REFLEN + 100]; const char *kWho = "ReplSemiSyncSlave::slaveReply";
sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'", NET *net= &mysql->net;
(unsigned long long)log_pos, log_name); uchar reply_buffer[REPLY_MAGIC_NUM_LEN
if (mysql_real_query(mysql_reply, query, strlen(query))) + REPLY_BINLOG_POS_LEN
+ REPLY_BINLOG_NAME_LEN];
int reply_res, name_len = strlen(binlog_filename);
function_enter(kWho);
/* Prepare the buffer of the reply. */
reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
binlog_filename,
name_len + 1 /* including trailing '\0' */);
if (trace_level_ & kTraceDetail)
sql_print_information("%s: reply (%s, %lu)", kWho,
binlog_filename, (ulong)binlog_filepos);
net_clear(net, 0);
/* Send the reply. */
reply_res = my_net_write(net, reply_buffer,
name_len + REPLY_BINLOG_NAME_OFFSET);
if (!reply_res)
{ {
sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed"); reply_res = net_flush(net);
mysql_free_result(mysql_store_result(mysql_reply)); if (reply_res)
mysql_close(mysql_reply); sql_print_error("Semi-sync slave net_flush() reply failed");
mysql_reply= 0;
return 1;
} }
mysql_free_result(mysql_store_result(mysql_reply)); else
return 0; {
sql_print_error("Semi-sync slave send reply failed: %s (%d)",
net->last_error, net->last_errno);
}
return function_exit(kWho, reply_res);
} }
...@@ -57,7 +57,7 @@ class ReplSemiSyncSlave ...@@ -57,7 +57,7 @@ class ReplSemiSyncSlave
* payload_len - (IN) payload length * payload_len - (IN) payload length
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
const char **payload, unsigned long *payload_len); const char **payload, unsigned long *payload_len);
...@@ -67,13 +67,15 @@ class ReplSemiSyncSlave ...@@ -67,13 +67,15 @@ class ReplSemiSyncSlave
* binlog position. * binlog position.
* *
* Input: * Input:
* log_name - (IN) the reply point's binlog file name * mysql - (IN) the mysql network connection
* log_pos - (IN) the reply point's binlog file offset * binlog_filename - (IN) the reply point's binlog file name
* binlog_filepos - (IN) the reply point's binlog file offset
* *
* Return: * Return:
* 0: success; -1 or otherwise: error * 0: success; non-zero: error
*/ */
int slaveReply(const char *log_name, my_off_t log_pos); int slaveReply(MYSQL *mysql, const char *binlog_filename,
my_off_t binlog_filepos);
/* /*
Connect to master for sending reply Connect to master for sending reply
......
...@@ -45,13 +45,6 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, ...@@ -45,13 +45,6 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
if (!repl_semisync.getSlaveEnabled()) if (!repl_semisync.getSlaveEnabled())
return 0; return 0;
/*
Create the connection that is used to send slave ACK replies to
master
*/
if (repl_semisync.slaveReplyConnect())
return 1;
/* Check if master server has semi-sync plugin installed */ /* Check if master server has semi-sync plugin installed */
query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"; query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
if (mysql_real_query(mysql, query, strlen(query)) || if (mysql_real_query(mysql, query, strlen(query)) ||
...@@ -106,7 +99,8 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param, ...@@ -106,7 +99,8 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
uint32 flags) uint32 flags)
{ {
if (rpl_semi_sync_slave_status && semi_sync_need_reply) if (rpl_semi_sync_slave_status && semi_sync_need_reply)
return repl_semisync.slaveReply(param->master_log_name, return repl_semisync.slaveReply(param->mysql,
param->master_log_name,
param->master_log_pos); param->master_log_pos);
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment