Commit 623ed58c authored by He Zhenxing's avatar He Zhenxing

Backporting WL#4398 WL#1720

Backporting BUG#44058 BUG#42244 BUG#45672 BUG#45673
Backporting BUG#45819 BUG#45973 BUG#39012
parent e465d113
[MYSQL]
post_commit_to = "commits@lists.mysql.com"
post_push_to = "commits@lists.mysql.com"
tree_name = "mysql-5.1"
tree_name = "mysql-5.1-rep-semisync"
......@@ -16,6 +16,11 @@
#ifndef _my_plugin_h
#define _my_plugin_h
/* size_t */
#include <stdlib.h>
typedef struct st_mysql MYSQL;
/*
On Windows, exports from DLL need to be declared
......@@ -75,7 +80,8 @@ typedef struct st_mysql_xid MYSQL_XID;
#define MYSQL_FTPARSER_PLUGIN 2 /* Full-text parser plugin */
#define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */
#define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */
#define MYSQL_MAX_PLUGIN_TYPE_NUM 5 /* The number of plugin types */
#define MYSQL_REPLICATION_PLUGIN 5 /* The replication plugin type */
#define MYSQL_MAX_PLUGIN_TYPE_NUM 6 /* The number of plugin types */
/* We use the following strings to define licenses for plugins */
#define PLUGIN_LICENSE_PROPRIETARY 0
......@@ -650,6 +656,17 @@ struct st_mysql_information_schema
int interface_version;
};
/*
API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
*/
#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
/**
Replication plugin descriptor

Type-specific descriptor for plugins of type MYSQL_REPLICATION_PLUGIN.
*/
struct Mysql_replication {
/* NOTE(review): presumably MYSQL_REPLICATION_INTERFACE_VERSION - confirm */
int interface_version;
};
/*
st_mysql_value struct for reading values from mysqld.
......@@ -801,6 +818,64 @@ void mysql_query_cache_invalidate4(MYSQL_THD thd,
const char *key, unsigned int key_length,
int using_trx);
/**
Get the value of user variable as an integer.
This function will return the value of variable @a name as an
integer. If the original value of the variable is not an integer,
the value will be converted into an integer.
@param name user variable name
@param value pointer to return the value
@param null_value if not NULL, the function will set it to true if
the value of variable is null, set to false if not
@retval 0 Success
@retval 1 Variable not found
*/
int get_user_var_int(const char *name,
long long int *value, int *null_value);
/**
Get the value of user variable as a double precision float number.
This function will return the value of variable @a name as real
number. If the original value of the variable is not a real number,
the value will be converted into a real number.
@param name user variable name
@param value pointer to return the value
@param null_value if not NULL, the function will set it to true if
the value of variable is null, set to false if not
@retval 0 Success
@retval 1 Variable not found
*/
int get_user_var_real(const char *name,
double *value, int *null_value);
/**
Get the value of user variable as a string.
This function will return the value of variable @a name as
string. If the original value of the variable is not a string,
the value will be converted into a string.
@param name user variable name
@param value pointer to the value buffer
@param len length of the value buffer
@param precision precision of the value if it is a float number
@param null_value if not NULL, the function will set it to true if
the value of variable is null, set to false if not
@retval 0 Success
@retval 1 Variable not found
*/
int get_user_var_str(const char *name,
char *value, unsigned long len,
unsigned int precision, int *null_value);
#ifdef __cplusplus
}
#endif
......
#include <stdlib.h>
typedef struct st_mysql MYSQL;
struct st_mysql_lex_string
{
char *str;
......@@ -105,6 +107,9 @@ struct st_mysql_information_schema
{
int interface_version;
};
struct Mysql_replication {
int interface_version;
};
struct st_mysql_value
{
int (*value_type)(struct st_mysql_value *);
......@@ -137,3 +142,10 @@ void thd_get_xid(const void* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(void* thd,
const char *key, unsigned int key_length,
int using_trx);
int get_user_var_int(const char *name,
long long int *value, int *null_value);
int get_user_var_real(const char *name,
double *value, int *null_value);
int get_user_var_str(const char *name,
char *value, unsigned long len,
unsigned int precision, int *null_value);
......@@ -139,6 +139,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/time.cc ../sql/tztime.cc ../sql/uniques.cc ../sql/unireg.cc
../sql/partition_info.cc ../sql/sql_connect.cc
../sql/scheduler.cc ../sql/event_parse_data.cc
../sql/rpl_handler.cc
${GEN_SOURCES}
${LIB_SOURCES})
......
......@@ -76,7 +76,8 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \
rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \
sql_tablespace.cc \
rpl_injector.cc my_user.c partition_info.cc \
sql_servers.cc event_parse_data.cc
sql_servers.cc event_parse_data.cc \
rpl_handler.cc
libmysqld_int_a_SOURCES= $(libmysqld_sources)
nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)
......
......@@ -1815,6 +1815,26 @@ sub environment_setup {
$ENV{'EXAMPLE_PLUGIN_LOAD'}="--plugin_load=;EXAMPLE=".$plugin_filename.";";
}
# --------------------------------------------------------------------------
# Add the path where mysqld will find semisync plugins
# --------------------------------------------------------------------------
my $lib_semisync_master_plugin=
mtr_file_exists("$basedir/plugin/semisync/.libs/libsemisync_master.so");
my $lib_semisync_slave_plugin=
mtr_file_exists("$basedir/plugin/semisync/.libs/libsemisync_slave.so");
if ($lib_semisync_master_plugin && $lib_semisync_slave_plugin)
{
$ENV{'SEMISYNC_MASTER_PLUGIN'}= basename($lib_semisync_master_plugin);
$ENV{'SEMISYNC_SLAVE_PLUGIN'}= basename($lib_semisync_slave_plugin);
$ENV{'SEMISYNC_PLUGIN_OPT'}= "--plugin-dir=".dirname($lib_semisync_master_plugin);
}
else
{
$ENV{'SEMISYNC_MASTER_PLUGIN'}= "";
$ENV{'SEMISYNC_SLAVE_PLUGIN'}= "";
$ENV{'SEMISYNC_PLUGIN_OPT'}="";
}
# ----------------------------------------------------
# Add the path where mysqld will find mypluglib.so
# ----------------------------------------------------
......
......@@ -6,6 +6,7 @@ grant replication slave on *.* to replicate@localhost identified by 'aaaaaaaaaaa
grant replication slave on *.* to replicate@127.0.0.1 identified by 'aaaaaaaaaaaaaaab';
connection slave;
start slave;
source include/wait_for_slave_to_start.inc;
connection master;
--disable_warnings
drop table if exists t1;
......
# Copyright (C) 2006 MySQL AB
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
## Makefile.am for semi-synchronous replication
pkgplugindir = $(pkglibdir)/plugin
INCLUDES = -I$(top_srcdir)/include \
-I$(top_srcdir)/sql \
-I$(srcdir)
noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h
pkgplugin_LTLIBRARIES = libsemisync_master.la libsemisync_slave.la
libsemisync_master_la_LDFLAGS = -module
libsemisync_master_la_CXXFLAGS= $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_master_la_CFLAGS = $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_master_la_SOURCES = semisync.cc semisync_master.cc semisync_master_plugin.cc
libsemisync_slave_la_LDFLAGS = -module
libsemisync_slave_la_CXXFLAGS= $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_slave_la_CFLAGS = $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_slave_la_SOURCES = semisync.cc semisync_slave.cc semisync_slave_plugin.cc
# configure.in for semi-synchronous replication
AC_INIT(mysql-semi-sync-plugin, 0.2)
AM_INIT_AUTOMAKE
AC_DISABLE_STATIC
AC_PROG_LIBTOOL
AC_CONFIG_FILES([Makefile])
AC_OUTPUT
MYSQL_PLUGIN(semisync,[Semi-synchronous Replication Plugin],
[Semi-synchronous replication plugin.])
MYSQL_PLUGIN_DYNAMIC(semisync, [libsemisync_master.la libsemisync_slave.la])
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync.h"
/* Constants used in the semi-sync network packet header. */
const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;

/* Bit masks that are tested against Trace::trace_level_. */
const unsigned long Trace::kTraceGeneral  = 0x0001;
const unsigned long Trace::kTraceDetail   = 0x0010;
const unsigned long Trace::kTraceNetWait  = 0x0020;
const unsigned long Trace::kTraceFunction = 0x0040;

/*
  Default two byte header: the magic number followed by a zero byte.

  The magic number (0xef) does not fit in a signed char, so the conversion
  is spelled out. Without the cast the brace initializer is a narrowing
  conversion, which C++11 and later reject on platforms where plain char
  is signed.
*/
const char ReplSemiSyncBase::kSyncHeader[2] =
  {static_cast<char>(ReplSemiSyncBase::kPacketMagicNum), 0};
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef SEMISYNC_H
#define SEMISYNC_H
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <sys/time.h>
#include <time.h>
#include <stdio.h>
#include <pthread.h>
#include <mysql.h>
typedef uint32_t uint32;
typedef unsigned long long my_off_t;
#define FN_REFLEN 512 /* Max length of full path-name */
void sql_print_error(const char *format, ...);
void sql_print_warning(const char *format, ...);
void sql_print_information(const char *format, ...);
extern unsigned long max_connections;
#define MYSQL_SERVER
#define HAVE_REPLICATION
#include <my_global.h>
#include <my_pthread.h>
#include <mysql/plugin.h>
#include <replication.h>
typedef struct st_mysql_show_var SHOW_VAR;
typedef struct st_mysql_sys_var SYS_VAR;
/**
This class is used to trace function calls and other process
information
*/
class Trace {
public:
/* Bit masks tested against trace_level_ to decide what gets logged. */
static const unsigned long kTraceFunction;
static const unsigned long kTraceGeneral;
static const unsigned long kTraceDetail;
static const unsigned long kTraceNetWait;
unsigned long trace_level_; /* the level for tracing */
/* Log the entry into func_name when kTraceFunction is set in trace_level_. */
inline void function_enter(const char *func_name)
{
if (trace_level_ & kTraceFunction)
sql_print_information("---> %s enter", func_name);
}
/*
Log the exit from func_name together with exit_code when kTraceFunction
is set in trace_level_. Returns exit_code unchanged, so that callers can
write "return function_exit(name, code);".
*/
inline int function_exit(const char *func_name, int exit_code)
{
if (trace_level_ & kTraceFunction)
sql_print_information("<--- %s exit (%d)", func_name, exit_code);
return exit_code;
}
/* Default: no trace bit is set. */
Trace()
:trace_level_(0L)
{}
/* Start with the given trace bit mask. */
Trace(unsigned long trace_level)
:trace_level_(trace_level)
{}
};
/**
Base class for semi-sync master and slave classes
*/
class ReplSemiSyncBase
:public Trace {
public:
static const char kSyncHeader[2]; /* two byte packet header */
/* Constants in network packet header. */
static const unsigned char kPacketMagicNum;
static const unsigned char kPacketFlagSync;
};
#endif /* SEMISYNC_H */
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync_master.h"
#define TIME_THOUSAND 1000
#define TIME_MILLION 1000000
#define TIME_BILLION 1000000000
/* This indicates whether semi-synchronous replication is enabled. */
char rpl_semi_sync_master_enabled;
/* Handed to setWaitTimeout() by ReplSemiSyncMaster::initObject(). */
unsigned long rpl_semi_sync_master_timeout;
/* Handed to setTraceLevel() by ReplSemiSyncMaster::initObject(). */
unsigned long rpl_semi_sync_master_trace_level;
/*
NOTE(review): the variables below look like status values exported by the
plugin; apart from rpl_semi_sync_master_clients the code that updates them
is not visible in this part of the file - confirm against the plugin
descriptor.
*/
unsigned long rpl_semi_sync_master_status = 0;
unsigned long rpl_semi_sync_master_yes_transactions = 0;
unsigned long rpl_semi_sync_master_no_transactions = 0;
unsigned long rpl_semi_sync_master_off_times = 0;
unsigned long rpl_semi_sync_master_timefunc_fails = 0;
unsigned long rpl_semi_sync_master_num_timeouts = 0;
unsigned long rpl_semi_sync_master_wait_sessions = 0;
unsigned long rpl_semi_sync_master_back_wait_pos = 0;
unsigned long rpl_semi_sync_master_trx_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
unsigned long rpl_semi_sync_master_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_net_wait_num = 0;
/* Incremented by add_slave() and decremented by remove_slave(). */
unsigned long rpl_semi_sync_master_clients = 0;
unsigned long long rpl_semi_sync_master_net_wait_total_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_total_time = 0;
/*
Forward declaration. commitTrx() treats a negative return value as a
failure of the time function.
*/
static int getWaitTime(const struct timeval& start_tv);
/*******************************************************************************
*
* <ActiveTranx> class : manage all active transaction nodes
*
******************************************************************************/
/**
  Pre-allocate the pool of transaction nodes and the hash table used to
  look up a transaction by its binlog position.

  @param max_connections  number of transaction nodes to pre-allocate; the
                          hash table gets twice as many entries
  @param lock             mutex pointer stored in lock_
  @param trace_level      initial trace bit mask
*/
ActiveTranx::ActiveTranx(int max_connections,
                         pthread_mutex_t *lock,
                         unsigned long trace_level)
  : Trace(trace_level), num_transactions_(max_connections),
    num_entries_(max_connections << 1),
    lock_(lock)
{
  /* Allocate the memory for the array */
  node_array_ = new TranxNode[num_transactions_];
  for (int idx = 0; idx < num_transactions_; ++idx)
  {
    TranxNode *node = node_array_ + idx;

    node->log_pos_ = 0;
    node->hash_next_ = NULL;

    /*
      Chain every node to its successor and terminate the chain at the
      last node. Doing it here avoids touching node_array_[-1] when no
      node was requested.
    */
    node->next_ = (idx + 1 < num_transactions_) ? node + 1 : NULL;

    node->log_name_ = new char[FN_REFLEN];
    node->log_name_[0] = '\x0';
  }

  /*
    All nodes in the array go to the pool initially. Without any node the
    pool has to be empty instead of pointing at a zero-length array.
  */
  free_pool_ = (num_transactions_ > 0) ? node_array_ : NULL;

  /* No transactions are in the list initially. */
  trx_front_ = NULL;
  trx_rear_ = NULL;

  /* Create the hash table to find a transaction's ending event. */
  trx_htb_ = new TranxNode *[num_entries_];
  for (int idx = 0; idx < num_entries_; ++idx)
    trx_htb_[idx] = NULL;

  sql_print_information("Semi-sync replication initialized for %d "
                        "transactions.", num_transactions_);
}
/**
  Release the per-node log name buffers, the node array and the hash
  table, then reset the bookkeeping members.
*/
ActiveTranx::~ActiveTranx()
{
  int remaining = num_transactions_;
  TranxNode *node = node_array_;

  /* The name buffers are owned by the nodes, free them first. */
  while (remaining-- > 0)
  {
    delete [] node->log_name_;
    node->log_name_ = NULL;
    ++node;
  }

  delete [] node_array_;
  node_array_ = NULL;

  delete [] trx_htb_;
  trx_htb_ = NULL;

  num_transactions_ = 0;
  num_entries_ = 0;
}
/**
  Hash a byte sequence.

  The hash implementation comes from calc_hashnr() in mysys/hash.c.

  @param key     first byte of the sequence
  @param length  number of bytes to hash
  @return the hash value
*/
unsigned int ActiveTranx::calc_hash(const unsigned char *key,
                                    unsigned int length)
{
  unsigned int hash = 1;
  unsigned int step = 4;

  for (unsigned int i = 0; i < length; ++i)
  {
    const unsigned int byte = key[i];

    hash ^= ((hash & 63) + step) * byte + (hash << 8);
    step += 3;
  }
  return hash;
}
/**
  Map a binlog position to an index into the hash table.

  The file name and the raw bytes of the position are hashed separately
  and the sum is reduced modulo the number of hash table entries.

  @param log_file_name  NUL-terminated binlog file name
  @param log_file_pos   position within that file
  @return hash table index
*/
unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
                                         my_off_t log_file_pos)
{
  const unsigned char *name_bytes = (const unsigned char *)log_file_name;
  const unsigned char *pos_bytes = (const unsigned char *)(&log_file_pos);

  unsigned int combined = calc_hash(name_bytes, strlen(log_file_name));
  combined += calc_hash(pos_bytes, sizeof(log_file_pos));

  return combined % num_entries_;
}
/**
  Take one node from the free pool.

  @return the node with both link pointers cleared, or NULL when the pool
          is exhausted (only reachable when assertions are compiled out)
*/
ActiveTranx::TranxNode* ActiveTranx::alloc_tranx_node()
{
  if (free_pool_ == NULL)
  {
    /*
      free_pool should never be NULL here, because we have
      max_connections number of pre-allocated nodes.
    */
    sql_print_error("You have encountered a semi-sync bug (free_pool == NULL), "
                    "please report to http://bugs.mysql.com");
    assert(free_pool_);
    return NULL;
  }

  /* Unlink the head of the pool and hand it out detached. */
  TranxNode *node = free_pool_;
  free_pool_ = node->next_;
  node->next_ = NULL;
  node->hash_next_ = NULL;
  return node;
}
/**
  Order two binlog positions.

  File names are compared first with strcmp(); the positions only decide
  when both names are equal.

  @return the strcmp() result when the names differ, otherwise 1, -1 or 0
          when the first position is greater than, less than or equal to
          the second one
*/
int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
                         const char *log_file_name2, my_off_t log_file_pos2)
{
  const int name_order = strcmp(log_file_name1, log_file_name2);

  if (name_order != 0)
    return name_order;
  if (log_file_pos1 == log_file_pos2)
    return 0;
  return (log_file_pos1 > log_file_pos2) ? 1 : -1;
}
int ActiveTranx::insert_tranx_node(const char *log_file_name,
my_off_t log_file_pos)
{
const char *kWho = "ActiveTranx:insert_tranx_node";
TranxNode *ins_node;
int result = 0;
unsigned int hash_val;
function_enter(kWho);
ins_node = alloc_tranx_node();
if (!ins_node)
{
sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
kWho, log_file_name, (unsigned long)log_file_pos);
result = -1;
goto l_end;
}
/* insert the binlog position in the active transaction list. */
strcpy(ins_node->log_name_, log_file_name);
ins_node->log_pos_ = log_file_pos;
if (!trx_front_)
{
/* The list is empty. */
trx_front_ = trx_rear_ = ins_node;
}
else
{
int cmp = compare(ins_node, trx_rear_);
if (cmp > 0)
{
/* Compare with the tail first. If the transaction happens later in
* binlog, then make it the new tail.
*/
trx_rear_->next_ = ins_node;
trx_rear_ = ins_node;
}
else
{
/* Otherwise, it is an error because the transaction should hold the
* mysql_bin_log.LOCK_log when appending events.
*/
sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
"new node (%s, %lu)", kWho,
trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_,
ins_node->log_name_, (unsigned long)ins_node->log_pos_);
result = -1;
goto l_end;
}
}
hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
ins_node->hash_next_ = trx_htb_[hash_val];
trx_htb_[hash_val] = ins_node;
if (trace_level_ & kTraceDetail)
sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
ins_node->log_name_, (unsigned long)ins_node->log_pos_,
hash_val);
l_end:
return function_exit(kWho, result);
}
bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
my_off_t log_file_pos)
{
const char *kWho = "ActiveTranx::is_tranx_end_pos";
function_enter(kWho);
unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
TranxNode *entry = trx_htb_[hash_val];
while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;
entry = entry->hash_next_;
}
if (trace_level_ & kTraceDetail)
sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
log_file_name, (unsigned long)log_file_pos, hash_val);
function_exit(kWho, (entry != NULL));
return (entry != NULL);
}
/**
Return active transaction nodes to the free pool.

Every node whose position is not behind (log_file_name, log_file_pos) is
removed from the list and from the hash table. The first node that
compares greater than the given position becomes the new list front.

@param log_file_name binlog file name of the confirmation point, or NULL
to release every node
@param log_file_pos position within that file
@return always 0 (passed through function_exit)
*/
int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
my_off_t log_file_pos)
{
const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
TranxNode *new_front;
function_enter(kWho);
if (log_file_name != NULL)
{
/* Find the first node that lies behind the confirmation point. */
new_front = trx_front_;
while (new_front)
{
if (compare(new_front, log_file_name, log_file_pos) > 0)
break;
new_front = new_front->next_;
}
}
else
{
/* If log_file_name is NULL, clear everything. */
new_front = NULL;
}
if (new_front == NULL)
{
/* No active transaction nodes after the call. */
/* Clear the hash table. */
memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
/* Clear the active transaction list. */
if (trx_front_ != NULL)
{
/* Splice the whole list in front of the free pool in one step. */
trx_rear_->next_ = free_pool_;
free_pool_ = trx_front_;
trx_front_ = NULL;
trx_rear_ = NULL;
}
if (trace_level_ & kTraceDetail)
sql_print_information("%s: free all nodes back to free list", kWho);
}
else if (new_front != trx_front_)
{
TranxNode *curr_node, *next_node;
/* Delete all transaction nodes before the confirmation point. */
int n_frees = 0;
curr_node = trx_front_;
while (curr_node != new_front)
{
/* Remember the successor, next_ is reused for the pool link below. */
next_node = curr_node->next_;
/* Put the node in the memory pool. */
curr_node->next_ = free_pool_;
free_pool_ = curr_node;
n_frees++;
/* Remove the node from the hash table. */
unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
/* hash_ptr addresses the link that may point to curr_node. */
TranxNode **hash_ptr = &(trx_htb_[hash_val]);
while ((*hash_ptr) != NULL)
{
if ((*hash_ptr) == curr_node)
{
(*hash_ptr) = curr_node->hash_next_;
break;
}
hash_ptr = &((*hash_ptr)->hash_next_);
}
curr_node = next_node;
}
trx_front_ = new_front;
if (trace_level_ & kTraceDetail)
sql_print_information("%s: free %d nodes back until pos (%s, %lu)",
kWho, n_frees,
trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
}
return function_exit(kWho, 0);
}
/*******************************************************************************
*
* <ReplSemiSyncMaster> class: the basic code layer for sync-replication master.
* <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave.
*
* The most important functions used during semi-sync replication are listed below:
*
* Master:
* . reportReplyBinlog(): called by the binlog dump thread when it receives
* the slave's status information.
* . updateSyncHeader(): based on transaction waiting information, decide
* whether to request the slave to reply.
* . writeTraxInBinlog(): called by the transaction thread when it finishes
* writing all transaction events in binlog.
* . commitTrx(): transaction thread wait for the slave reply.
*
* Slave:
* . slaveReadSyncHeader(): read the semi-sync header from the master, get the
* sync status and get the payload for events.
* . slaveReply(): reply to the master about the replication progress.
*
******************************************************************************/
/**
  Create a disabled semi-sync master object.

  Only plain members are set up here. The mutex, the condition variable
  and the active transaction list are created later by initObject() and
  enableMaster().
*/
ReplSemiSyncMaster::ReplSemiSyncMaster()
  : active_tranxs_(NULL),
    init_done_(false),
    reply_file_name_inited_(false),
    reply_file_pos_(0L),
    wait_file_name_inited_(false),
    wait_file_pos_(0),
    master_enabled_(false),
    wait_timeout_(0L),
    state_(0),
    enabled_transactions_(0),
    disabled_transactions_(0),
    switched_off_times_(0),
    timefunc_fails_(0),
    wait_sessions_(0),
    wait_backtraverse_(0),
    total_trx_wait_num_(0),
    total_trx_wait_time_(0),
    total_net_wait_num_(0),
    total_net_wait_time_(0),
    max_transactions_(0L)
{
  strcpy(reply_file_name_, "");
  strcpy(wait_file_name_, "");

  /*
    try_switch_on() reads commit_file_name_inited_ and commit_file_pos_,
    and commitTrx() increments total_wait_timeouts_, so none of them may
    start out with an indeterminate value. They are assigned in the body
    because the declaration order of the members is not visible here.
  */
  commit_file_name_inited_ = false;
  commit_file_pos_ = 0;
  total_wait_timeouts_ = 0;
}
/**
  One-time initialization: take over the configured timeout and trace
  level, create the mutex and the condition variable, and enable or
  disable the master according to rpl_semi_sync_master_enabled.

  @retval 1  the object was already initialized
  @return otherwise the result of enableMaster() or disableMaster()
*/
int ReplSemiSyncMaster::initObject()
{
  const char *kWho = "ReplSemiSyncMaster::initObject";

  if (init_done_)
  {
    fprintf(stderr, "%s called twice\n", kWho);
    return 1;
  }
  init_done_ = true;

  /* References to the parameter works after set_options(). */
  setWaitTimeout(rpl_semi_sync_master_timeout);
  setTraceLevel(rpl_semi_sync_master_trace_level);
  max_transactions_ = (int)max_connections;

  /* Mutex initialization can only be done after MY_INIT(). */
  pthread_mutex_init(&LOCK_binlog_, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&COND_binlog_send_, NULL);

  return rpl_semi_sync_master_enabled ? enableMaster() : disableMaster();
}
int ReplSemiSyncMaster::enableMaster()
{
int result = 0;
/* Must have the lock when we do enable of disable. */
lock();
if (!getMasterEnabled())
{
active_tranxs_ = new ActiveTranx(max_connections,
&LOCK_binlog_,
trace_level_);
if (active_tranxs_ != NULL)
{
commit_file_name_inited_ = false;
reply_file_name_inited_ = false;
wait_file_name_inited_ = false;
set_master_enabled(true);
state_ = true;
sql_print_information("Semi-sync replication enabled on the master.");
}
else
{
sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
result = -1;
}
}
unlock();
return result;
}
/**
Disable semi-sync on the master. Nothing happens when it is already
disabled.

@return always 0
*/
int ReplSemiSyncMaster::disableMaster()
{
/* Must have the lock when we do enable or disable. */
lock();
if (getMasterEnabled())
{
/* Switch off the semi-sync first so that waiting transaction will be
* waken up.
*/
switch_off();
/* switch_off() still uses the list, so it is deleted only afterwards. */
assert(active_tranxs_ != NULL);
delete active_tranxs_;
active_tranxs_ = NULL;
reply_file_name_inited_ = false;
wait_file_name_inited_ = false;
commit_file_name_inited_ = false;
set_master_enabled(false);
sql_print_information("Semi-sync replication disabled on the master.");
}
unlock();
return 0;
}
/**
Destroy the mutex and the condition variable when initObject() created
them, and release the active transaction list.
*/
ReplSemiSyncMaster::~ReplSemiSyncMaster()
{
/* init_done_ is only set by initObject(), which creates both objects. */
if (init_done_)
{
pthread_mutex_destroy(&LOCK_binlog_);
pthread_cond_destroy(&COND_binlog_send_);
}
delete active_tranxs_;
}
/** Acquire LOCK_binlog_. */
void ReplSemiSyncMaster::lock()
{
pthread_mutex_lock(&LOCK_binlog_);
}
/** Release LOCK_binlog_. */
void ReplSemiSyncMaster::unlock()
{
pthread_mutex_unlock(&LOCK_binlog_);
}
/** Wake up every thread that waits on COND_binlog_send_. */
void ReplSemiSyncMaster::cond_broadcast()
{
pthread_cond_broadcast(&COND_binlog_send_);
}
/**
  Wait on COND_binlog_send_ with LOCK_binlog_ as the associated mutex.

  @param wait_time  absolute time at which the wait gives up
  @return the result of pthread_cond_timedwait()
*/
int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
{
  const char *kWho = "ReplSemiSyncMaster::cond_timewait()";

  function_enter(kWho);
  return function_exit(kWho,
                       pthread_cond_timedwait(&COND_binlog_send_,
                                              &LOCK_binlog_, wait_time));
}
/** Count one more client in rpl_semi_sync_master_clients, under the lock. */
void ReplSemiSyncMaster::add_slave()
{
lock();
rpl_semi_sync_master_clients++;
unlock();
}
/**
Count one client less in rpl_semi_sync_master_clients, under the lock.
NOTE(review): the counter is unsigned and not checked against zero; this
presumably relies on every call being paired with add_slave() - confirm.
*/
void ReplSemiSyncMaster::remove_slave()
{
lock();
rpl_semi_sync_master_clients--;
unlock();
}
/**
  Check the user variable "rpl_semi_sync_slave".

  @return true when the variable exists, is not NULL and has a non-zero
          integer value; false in every other case
*/
bool ReplSemiSyncMaster::is_semi_sync_slave()
{
  int null_value = 0;
  long long val = 0;

  /* A non-zero result means that the variable was not found. */
  if (get_user_var_int("rpl_semi_sync_slave", &val, &null_value) != 0)
    return false;

  return !null_value && val != 0;
}
/**
  Parse a reply of the form "<position>:<binlog file name>" and report it
  with server id 0.

  @param log_file_pos  reply text
  @retval 1  the text does not start with a non-zero decimal position
             followed by ':'
  @return otherwise the result of the three-argument reportReplyBinlog()
*/
int ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_pos)
{
  char log_name[FN_REFLEN];
  char *endptr;
  my_off_t log_pos= strtoull(log_file_pos, &endptr, 10);
  if (!log_pos || !endptr || *endptr != ':' )
    return 1;
  endptr++;          // skip the ':' separator

  /*
    strncpy() does not terminate the destination when the source has
    FN_REFLEN or more characters, so leave room for the terminator and
    write it explicitly.
  */
  strncpy(log_name, endptr, FN_REFLEN - 1);
  log_name[FN_REFLEN - 1] = '\0';

  uint32 server_id= 0;
  return reportReplyBinlog(server_id, log_name, log_pos);
}
/**
Record the binlog position that a slave has acknowledged.

Nothing happens while the master is disabled. Otherwise the reply
position is advanced unless the reported position is behind it, the
active transaction nodes up to that position are released, and waiting
threads are woken up when the reply position has reached the recorded
wait position.

@param server_id passed on to try_switch_on()
@param log_file_name acknowledged binlog file name
@param log_file_pos acknowledged position within that file
@return always 0
*/
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
const char *log_file_name,
my_off_t log_file_pos)
{
const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
int cmp;
bool can_release_threads = false;
bool need_copy_send_pos = true;
/* Cheap check without the lock; it is repeated below with the lock held. */
if (!(getMasterEnabled()))
return 0;
function_enter(kWho);
lock();
/* This is the real check inside the mutex. */
if (!getMasterEnabled())
goto l_end;
if (!is_on())
/* We check to see whether we can switch semi-sync ON. */
try_switch_on(server_id, log_file_name, log_file_pos);
/* The position should increase monotonically, if there is only one
* thread sending the binlog to the slave.
* In reality, to improve the transaction availability, we allow multiple
* sync replication slaves. So, if any one of them get the transaction,
* the transaction session in the primary can move forward.
*/
if (reply_file_name_inited_)
{
cmp = ActiveTranx::compare(log_file_name, log_file_pos,
reply_file_name_, reply_file_pos_);
/* If the requested position is behind the sending binlog position,
* would not adjust sending binlog position.
* We based on the assumption that there are multiple semi-sync slave,
* and at least one of them should be up to date.
* If all semi-sync slaves are behind, at least initially, the primary
* can find the situation after the waiting timeout. After that, some
* slaves should catch up quickly.
*/
if (cmp < 0)
{
/* If the position is behind, do not copy it. */
need_copy_send_pos = false;
}
}
if (need_copy_send_pos)
{
strcpy(reply_file_name_, log_file_name);
reply_file_pos_ = log_file_pos;
reply_file_name_inited_ = true;
/* Remove all active transaction nodes before this point. */
assert(active_tranxs_ != NULL);
active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
if (trace_level_ & kTraceDetail)
sql_print_information("%s: Got reply at (%s, %lu)", kWho,
log_file_name, (unsigned long)log_file_pos);
}
if (wait_sessions_ > 0)
{
/* Let us check if some of the waiting threads doing a trx
* commit can now proceed.
*/
cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
wait_file_name_, wait_file_pos_);
if (cmp >= 0)
{
/* Yes, at least one waiting thread can now proceed:
* let us release all waiting threads with a broadcast
*/
can_release_threads = true;
wait_file_name_inited_ = false;
}
}
l_end:
unlock();
/* The broadcast is sent after the lock has been released. */
if (can_release_threads)
{
if (trace_level_ & kTraceDetail)
sql_print_information("%s: signal all waiting threads.", kWho);
cond_broadcast();
}
return function_exit(kWho, 0);
}
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos)
{
const char *kWho = "ReplSemiSyncMaster::commitTrx";
function_enter(kWho);
if (getMasterEnabled() && trx_wait_binlog_name)
{
struct timeval start_tv;
struct timespec abstime;
int wait_result, start_time_err;
const char *old_msg= 0;
start_time_err = gettimeofday(&start_tv, 0);
/* Acquire the mutex. */
lock();
/* This must be called after acquired the lock */
old_msg= thd_enter_cond(NULL, &COND_binlog_send_, &LOCK_binlog_,
"Waiting for semi-sync ACK from slave");
/* This is the real check inside the mutex. */
if (!getMasterEnabled() || !is_on() || !rpl_semi_sync_master_clients)
goto l_end;
if (trace_level_ & kTraceDetail)
{
sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
(int)is_on());
}
while (is_on())
{
int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
trx_wait_binlog_name, trx_wait_binlog_pos);
if (cmp >= 0)
{
/* We have already sent the relevant binlog to the slave: no need to
* wait here.
*/
if (trace_level_ & kTraceDetail)
sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
kWho, reply_file_name_, (unsigned long)reply_file_pos_);
break;
}
/* Let us update the info about the minimum binlog position of waiting
* threads.
*/
if (wait_file_name_inited_)
{
cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
wait_file_name_, wait_file_pos_);
if (cmp <= 0)
{
/* This thd has a lower position, let's update the minimum info. */
strcpy(wait_file_name_, trx_wait_binlog_name);
wait_file_pos_ = trx_wait_binlog_pos;
wait_backtraverse_++;
if (trace_level_ & kTraceDetail)
sql_print_information("%s: move back wait position (%s, %lu),",
kWho, wait_file_name_, (unsigned long)wait_file_pos_);
}
}
else
{
strcpy(wait_file_name_, trx_wait_binlog_name);
wait_file_pos_ = trx_wait_binlog_pos;
wait_file_name_inited_ = true;
if (trace_level_ & kTraceDetail)
sql_print_information("%s: init wait position (%s, %lu),",
kWho, wait_file_name_, (unsigned long)wait_file_pos_);
}
if (start_time_err == 0)
{
int diff_usecs = start_tv.tv_usec + wait_timeout_ * TIME_THOUSAND;
/* Calcuate the waiting period. */
abstime.tv_sec = start_tv.tv_sec;
if (diff_usecs < TIME_MILLION)
{
abstime.tv_nsec = diff_usecs * TIME_THOUSAND;
}
else
{
while (diff_usecs >= TIME_MILLION)
{
abstime.tv_sec++;
diff_usecs -= TIME_MILLION;
}
abstime.tv_nsec = diff_usecs * TIME_THOUSAND;
}
/* In semi-synchronous replication, we wait until the binlog-dump
* thread has received the reply on the relevant binlog segment from the
* replication slave.
*
* Let us suspend this thread to wait on the condition;
* when replication has progressed far enough, we will release
* these waiting threads.
*/
wait_sessions_++;
if (trace_level_ & kTraceDetail)
sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
kWho, wait_timeout_,
wait_file_name_, (unsigned long)wait_file_pos_);
wait_result = cond_timewait(&abstime);
wait_sessions_--;
if (wait_result != 0)
{
/* This is a real wait timeout. */
sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
"semi-sync up to file %s, position %lu.",
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
reply_file_name_, (unsigned long)reply_file_pos_);
total_wait_timeouts_++;
/* switch semi-sync off */
switch_off();
}
else
{
int wait_time;
wait_time = getWaitTime(start_tv);
if (wait_time < 0)
{
if (trace_level_ & kTraceGeneral)
{
/* This is a time/gettimeofday function call error. */
sql_print_error("Replication semi-sync gettimeofday fail1 at "
"wait position (%s, %lu)",
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
}
timefunc_fails_++;
}
else
{
total_trx_wait_num_++;
total_trx_wait_time_ += wait_time;
}
}
}
else
{
if (trace_level_ & kTraceGeneral)
{
/* This is a gettimeofday function call error. */
sql_print_error("Replication semi-sync gettimeofday fail2 at "
"wait position (%s, %lu)",
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
}
timefunc_fails_++;
/* switch semi-sync off */
switch_off();
}
}
l_end:
/* Update the status counter. */
if (is_on() && rpl_semi_sync_master_clients)
enabled_transactions_++;
else
disabled_transactions_++;
/* The lock held will be released by thd_exit_cond, so no need to
call unlock() here */
thd_exit_cond(NULL, old_msg);
}
return function_exit(kWho, 0);
}
/* Mark semi-sync replication as switched OFF.
 *
 * Re-enabling later is the hard part: the replication protocol is
 * asynchronous, so while the master is not running the semi-sync protocol
 * the slave sends nothing back and the master cannot directly observe
 * that the slave has caught up.  The working assumption is that when the
 * master is sending the (N+1)-th event, the slave has already received
 * the N-th event and all earlier ones.
 *
 * While switched off, transactions keep recording the last binlog
 * position, but nobody waits for confirmations and the active transaction
 * list is not maintained.
 *
 * The callers visible in this file invoke this function while holding
 * the object's lock.
 *
 * Return:
 *  the status reported by ActiveTranx::clear_active_tranx_nodes()
 */
int ReplSemiSyncMaster::switch_off()
{
  const char *kWho = "ReplSemiSyncMaster::switch_off";

  function_enter(kWho);
  state_ = false;

  /* Drop every node of the active transaction list. */
  assert(active_tranxs_ != NULL);
  const int clear_status = active_tranxs_->clear_active_tranx_nodes(NULL, 0);

  /* Account for the switch and forget the wait/reply positions. */
  ++switched_off_times_;
  wait_file_name_inited_ = false;
  reply_file_name_inited_ = false;

  sql_print_information("Semi-sync replication switched OFF.");

  /* Wake up every thread blocked on the condition variable. */
  cond_broadcast();

  return function_exit(kWho, clear_status);
}
/* Switch semi-sync back ON if the given sending position has caught up.
 *
 * Input:
 *  server_id     - (IN)  id printed in the log message
 *  log_file_name - (IN)  binlog file name of the event being sent
 *  log_file_pos  - (IN)  binlog offset of the event being sent
 *
 * Return:
 *  always 0
 */
int ReplSemiSyncMaster::try_switch_on(int server_id,
                                      const char *log_file_name,
                                      my_off_t log_file_pos)
{
  const char *kWho = "ReplSemiSyncMaster::try_switch_on";
  function_enter(kWho);

  /* With no recorded commit position there is nothing to catch up with,
   * so semi-sync can be enabled right away.  Otherwise the position being
   * sent must be at or beyond the 'largest' committed position.
   */
  bool caught_up = true;
  if (commit_file_name_inited_)
  {
    caught_up = (ActiveTranx::compare(log_file_name, log_file_pos,
                                      commit_file_name_,
                                      commit_file_pos_) >= 0);
  }

  if (caught_up)
  {
    state_ = true;
    sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
                          "at (%s, %lu)",
                          server_id, log_file_name,
                          (unsigned long)log_file_pos);
  }

  return function_exit(kWho, 0);
}
/* Reserve room for the semi-sync header at the start of an event packet.
 *
 * Input:
 *  header - (OUT) buffer that receives a copy of kSyncHeader
 *  size   - (IN)  number of bytes available in header
 *
 * Return:
 *  number of bytes written to header: 0 when the slave served by this
 *  thread is not a semi-sync slave or when the buffer is too small
 *  (in which case the semi-sync master is also disabled), otherwise
 *  sizeof(kSyncHeader)
 */
int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
                                          unsigned long size)
{
  const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
  function_enter(kWho);

  int hlen = 0;

  if (is_semi_sync_slave())
  {
    /* Not enough space for the extra header: disable semi-sync master. */
    if (sizeof(kSyncHeader) > size)
    {
      sql_print_warning("No enough space in the packet "
                        "for semi-sync extra header, "
                        "semi-sync replication disabled");
      disableMaster();
      /* Leave through function_exit() so that every function_enter()
       * has a matching exit trace, as on the other return path.
       */
      return function_exit(kWho, 0);
    }

    /* Copy the header template.  updateSyncHeader() turns the sync flag
     * on later when a reply is wanted.
     */
    memcpy(header, kSyncHeader, sizeof(kSyncHeader));
    hlen = sizeof(kSyncHeader);
  }

  return function_exit(kWho, hlen);
}
/* Decide whether the event about to be sent needs a reply from the slave
 * and, if so, set the sync flag byte in the packet header.
 *
 * Input:
 *  packet        - (IN/OUT) packet whose byte [2] receives kPacketFlagSync
 *  log_file_name - (IN)  binlog file name of the event's end position
 *  log_file_pos  - (IN)  binlog offset of the event's end position
 *  server_id     - (IN)  id used only in the trace message
 *
 * Return:
 *  always 0
 */
int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
                                         const char *log_file_name,
                                         my_off_t log_file_pos,
                                         uint32 server_id)
{
  const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
  int order = 0;
  bool need_reply = false;

  /* Unlocked pre-check: nothing to do when the master is not enabled or
   * the slave is not a semi-sync target.
   */
  if (!getMasterEnabled() || !is_semi_sync_slave())
    return 0;

  function_enter(kWho);
  lock();

  /* The authoritative check, repeated under the mutex. */
  if (!getMasterEnabled())
    goto l_end;

  if (!is_on())
  {
    /* Semi-sync is OFF: request a reply once the event being sent is at
     * or beyond the 'largest' commit position, or unconditionally when
     * no commit position has been recorded.
     */
    need_reply = true;
    if (commit_file_name_inited_)
    {
      order = ActiveTranx::compare(log_file_name, log_file_pos,
                                   commit_file_name_, commit_file_pos_);
      need_reply = (order >= 0);
    }
  }
  else
  {
    /* Semi-sync is ON: no reply unless a transaction end is involved. */
    if (reply_file_name_inited_)
    {
      order = ActiveTranx::compare(log_file_name, log_file_pos,
                                   reply_file_name_, reply_file_pos_);
      /* A reply covering this event has already arrived. */
      if (order <= 0)
        goto l_end;
    }

    /* Compare against the wait position; with no wait position recorded
     * the event is treated as being ahead of it.
     */
    order = 1;
    if (wait_file_name_inited_)
    {
      order = ActiveTranx::compare(log_file_name, log_file_pos,
                                   wait_file_name_, wait_file_pos_);
    }

    /* Events before the wait position are not waited for.  Otherwise a
     * reply is requested only for a transaction's ending event.
     */
    if (order >= 0)
    {
      assert(active_tranxs_ != NULL);
      need_reply = active_tranxs_->is_tranx_end_pos(log_file_name,
                                                    log_file_pos);
    }
  }

  if (trace_level_ & kTraceDetail)
    sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
                          kWho, server_id, log_file_name,
                          (unsigned long)log_file_pos, need_reply,
                          (int)is_on());

l_end:
  unlock();

  /* The flag is only ever set here, never cleared: it is already 0 from
   * the time the packet header was reserved.
   */
  if (need_reply)
    packet[2] = kPacketFlagSync;

  return function_exit(kWho, 0);
}
/* Record that a transaction has been written to the binlog.
 *
 * Updates the 'largest' commit position and, while semi-sync is on and
 * at least one semi-sync client is connected, inserts the position into
 * the active transaction list.
 *
 * Input:
 *  log_file_name - (IN)  transaction ending position's file name
 *  log_file_pos  - (IN)  transaction ending position's file offset
 *
 * Return:
 *  always 0; a failed insert is reported with a warning and switches
 *  semi-sync off
 */
int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
                                           my_off_t log_file_pos)
{
  const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog";
  int result = 0;
  /* Declared before the first goto so the jump does not cross an
   * initialization.
   */
  bool is_new_max = true;

  function_enter(kWho);
  lock();

  /* This is the real check inside the mutex. */
  if (!getMasterEnabled())
    goto l_end;

  /* Track the 'largest' commit position even while semi-sync is switched
   * off: the catch-up checks compare the position being sent against
   * commit_file_*.
   */
  if (commit_file_name_inited_)
  {
    is_new_max = (ActiveTranx::compare(log_file_name, log_file_pos,
                                       commit_file_name_,
                                       commit_file_pos_) > 0);
  }

  if (is_new_max)
  {
    /* Bounded copy: commit_file_name_ is a fixed-size array, so never
     * write past it and always leave it NUL-terminated.
     */
    strncpy(commit_file_name_, log_file_name, sizeof(commit_file_name_) - 1);
    commit_file_name_[sizeof(commit_file_name_) - 1] = '\0';
    commit_file_pos_ = log_file_pos;
    commit_file_name_inited_ = true;
  }

  if (is_on() && rpl_semi_sync_master_clients)
  {
    assert(active_tranxs_ != NULL);
    if (active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
    {
      /* Inserting the node failed: warn and turn semi-sync off.  The
       * position is cast so that the argument matches the %lu conversion.
       */
      sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
                        log_file_name, (unsigned long)log_file_pos);
      switch_off();
    }
  }

l_end:
  unlock();
  return function_exit(kWho, result);
}
/* Reset the semi-sync master state and statistics.
 *
 * Under the lock, state_ is set to the current 'master enabled' setting,
 * the wait/reply/commit positions are marked as not initialized and the
 * counters listed below are zeroed.  total_wait_timeouts_ is not touched
 * here.
 *
 * Return:
 *  always 0
 */
int ReplSemiSyncMaster::resetMaster()
{
  const char *kWho = "ReplSemiSyncMaster::resetMaster";

  function_enter(kWho);
  lock();

  state_ = getMasterEnabled();

  /* Forget every recorded binlog position. */
  wait_file_name_inited_ = false;
  reply_file_name_inited_ = false;
  commit_file_name_inited_ = false;

  /* Transaction and switch counters. */
  enabled_transactions_ = 0;
  disabled_transactions_ = 0;
  switched_off_times_ = 0;
  timefunc_fails_ = 0;
  wait_sessions_ = 0;
  wait_backtraverse_ = 0;

  /* Wait-time accumulators. */
  total_trx_wait_num_ = 0;
  total_trx_wait_time_ = 0;
  total_net_wait_num_ = 0;
  total_net_wait_time_ = 0;

  unlock();

  return function_exit(kWho, 0);
}
/* Copy the internal counters into the exported rpl_semi_sync_master_*
 * status variables while holding the lock.  The two average wait times
 * are total / count, reported as 0 when the count is 0.
 */
void ReplSemiSyncMaster::setExportStats()
{
  lock();

  /* Averages, computed in double precision and truncated. */
  unsigned long avg_trx_wait = 0;
  if (total_trx_wait_num_)
  {
    avg_trx_wait = (unsigned long)((double)total_trx_wait_time_ /
                                   ((double)total_trx_wait_num_));
  }

  unsigned long avg_net_wait = 0;
  if (total_net_wait_num_)
  {
    avg_net_wait = (unsigned long)((double)total_net_wait_time_ /
                                   ((double)total_net_wait_num_));
  }

  /* Reported as on only when switched on AND there is a client. */
  rpl_semi_sync_master_status =
    (state_ && rpl_semi_sync_master_clients) ? 1 : 0;

  rpl_semi_sync_master_yes_transactions = enabled_transactions_;
  rpl_semi_sync_master_no_transactions = disabled_transactions_;
  rpl_semi_sync_master_off_times = switched_off_times_;
  rpl_semi_sync_master_timefunc_fails = timefunc_fails_;
  rpl_semi_sync_master_num_timeouts = total_wait_timeouts_;
  rpl_semi_sync_master_wait_sessions = wait_sessions_;
  rpl_semi_sync_master_back_wait_pos = wait_backtraverse_;

  rpl_semi_sync_master_trx_wait_num = total_trx_wait_num_;
  rpl_semi_sync_master_trx_wait_time = avg_trx_wait;
  rpl_semi_sync_master_net_wait_num = total_net_wait_num_;
  rpl_semi_sync_master_net_wait_time = avg_net_wait;

  rpl_semi_sync_master_net_wait_total_time = total_net_wait_time_;
  rpl_semi_sync_master_trx_wait_total_time = total_trx_wait_time_;

  unlock();
}
/* Get the waiting time given the wait's starting time.
 *
 * Input:
 *  start_tv - (IN)  time at which the wait started
 *
 * Return:
 *  >= 0: the waiting time in microseconds (us)
 *  <  0: error in gettimeofday or time back traverse
 */
static int getWaitTime(const struct timeval& start_tv)
{
  struct timeval end_tv;

  /* Check the call result first: end_tv is indeterminate when
   * gettimeofday() fails and must not be read in that case.
   */
  if (gettimeofday(&end_tv, 0) != 0)
    return -1;

  /* Widen the seconds before multiplying so the product is computed in
   * 64 bits even where time_t is only 32 bits wide.
   */
  const unsigned long long start_usecs =
    (unsigned long long)start_tv.tv_sec * TIME_MILLION + start_tv.tv_usec;
  const unsigned long long end_usecs =
    (unsigned long long)end_tv.tv_sec * TIME_MILLION + end_tv.tv_usec;

  /* The clock moved backwards. */
  if (end_usecs < start_usecs)
    return -1;

  return (int)(end_usecs - start_usecs);
}
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef SEMISYNC_MASTER_H
#define SEMISYNC_MASTER_H
#include "semisync.h"
/**
This class manages memory for the active transaction list.
We record each active transaction with a TranxNode. Because each
session can have only one open transaction, the total number of active
transaction nodes cannot exceed the maximum number of sessions.
Currently in MySQL, sessions are the same as connections.
*/
class ActiveTranx
:public Trace {
private:
/* One active transaction, identified by its binlog position. */
struct TranxNode {
char *log_name_;
my_off_t log_pos_;
struct TranxNode *next_; /* the next node in the sorted list */
struct TranxNode *hash_next_; /* the next node during hash collision */
};
/* The following data structure maintains an active transaction list. */
TranxNode *node_array_;
TranxNode *free_pool_;
/* These two record the active transaction list in sort order. */
TranxNode *trx_front_, *trx_rear_;
TranxNode **trx_htb_; /* A hash table on active transactions. */
int num_transactions_; /* maximum transactions */
int num_entries_; /* maximum hash table entries */
pthread_mutex_t *lock_; /* mutex lock */
inline void assert_lock_owner();
inline TranxNode* alloc_tranx_node();
inline unsigned int calc_hash(const unsigned char *key,unsigned int length);
unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
/* Convenience overloads: each one forwards to the static four-argument
 * compare(), taking one or both positions from a TranxNode.
 */
int compare(const char *log_file_name1, my_off_t log_file_pos1,
const TranxNode *node2) {
return compare(log_file_name1, log_file_pos1,
node2->log_name_, node2->log_pos_);
}
int compare(const TranxNode *node1,
const char *log_file_name2, my_off_t log_file_pos2) {
return compare(node1->log_name_, node1->log_pos_,
log_file_name2, log_file_pos2);
}
int compare(const TranxNode *node1, const TranxNode *node2) {
return compare(node1->log_name_, node1->log_pos_,
node2->log_name_, node2->log_pos_);
}
public:
ActiveTranx(int max_connections, pthread_mutex_t *lock,
unsigned long trace_level);
~ActiveTranx();
/* Insert an active transaction node with the specified position.
*
* Return:
* 0: success; -1 or otherwise: error
*/
int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
/* Clear the active transaction nodes up to and including the specified
* position.
* If log_file_name is NULL, everything will be cleared: the sorted
* list and the hash table will be reset to empty.
*
* Return:
* 0: success; -1 or otherwise: error
*/
int clear_active_tranx_nodes(const char *log_file_name,
my_off_t log_file_pos);
/* Given a position, check to see whether the position is an active
* transaction's ending position by probing the hash table.
*/
bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
/* Given two binlog positions, compare which one is bigger based on
* (file_name, file_position).
*/
static int compare(const char *log_file_name1, my_off_t log_file_pos1,
const char *log_file_name2, my_off_t log_file_pos2);
};
/**
The extension class for the master of semi-synchronous replication
*/
class ReplSemiSyncMaster
:public ReplSemiSyncBase {
private:
ActiveTranx *active_tranxs_; /* active transaction list: the list will
be cleared when semi-sync switches off. */
/* True when initObject has been called */
bool init_done_;
/* This cond variable is signaled when enough binlog has been sent to slave,
* so that a waiting trx can return the 'ok' to the client for a commit.
*/
pthread_cond_t COND_binlog_send_;
/* Mutex that protects the following state variables and the active
* transaction list.
* Under no circumstances may we acquire mysql_bin_log.LOCK_log while
* already holding LOCK_binlog_, because it can cause deadlocks.
*/
pthread_mutex_t LOCK_binlog_;
/* This is set to true when reply_file_name_ contains meaningful data. */
bool reply_file_name_inited_;
/* The binlog name up to which we have received replies from any slaves. */
char reply_file_name_[FN_REFLEN];
/* The position in that file up to which we have the reply from any slaves. */
my_off_t reply_file_pos_;
/* This is set to true when we know the 'smallest' wait position. */
bool wait_file_name_inited_;
/* The 'smallest' filename that a transaction is waiting for slave
* replies on; meaningful only while wait_file_name_inited_ is true.
*/
char wait_file_name_[FN_REFLEN];
/* The smallest position in that file that a trx is waiting for: the trx
* can proceed and send an 'ok' to the client when the master has got the
* reply from the slave indicating that it already got the binlog events.
*/
my_off_t wait_file_pos_;
/* This is set to true when we know the 'largest' transaction commit
* position in the binlog file.
* We always maintain the position no matter whether semi-sync is switched
* on or switched off. When a transaction wait timeout occurs, semi-sync
* will switch off. Binlog-dump thread can use the three fields to detect
* when slaves catch up on replication so that semi-sync can switch on again.
*/
bool commit_file_name_inited_;
/* The 'largest' binlog filename that a commit transaction is seeing. */
char commit_file_name_[FN_REFLEN];
/* The 'largest' position in that file that a commit transaction is seeing. */
my_off_t commit_file_pos_;
/* All global variables which can be set by parameters. */
volatile bool master_enabled_; /* semi-sync is enabled on the master */
unsigned long wait_timeout_; /* timeout period(ms) during tranx wait */
/* All status variables. */
bool state_; /* whether semi-sync is switched */
unsigned long enabled_transactions_; /* semi-sync'ed transactions */
unsigned long disabled_transactions_; /* non-semi-sync'ed transactions */
unsigned long switched_off_times_; /* how many times are switched off? */
unsigned long timefunc_fails_; /* how many time function fails? */
unsigned long total_wait_timeouts_; /* total number of wait timeouts */
unsigned long wait_sessions_; /* how many sessions wait for replies? */
unsigned long wait_backtraverse_; /* wait position back traverses */
unsigned long long total_trx_wait_num_; /* total trx waits: non-timeout ones */
unsigned long long total_trx_wait_time_; /* total trx wait time: in us */
unsigned long long total_net_wait_num_; /* total network waits */
unsigned long long total_net_wait_time_; /* total network wait time */
/* The number of maximum active transactions. This should be the same as
* maximum connections because MySQL does not do connection sharing now.
*/
int max_transactions_;
/* Helpers around the mutex and condition variable declared above;
* their definitions are outside this header.
*/
void lock();
void unlock();
void cond_broadcast();
int cond_timewait(struct timespec *wait_time);
/* Is semi-sync replication on? */
bool is_on() {
return (state_);
}
/* Record whether semi-sync is enabled on the master. */
void set_master_enabled(bool enabled) {
master_enabled_ = enabled;
}
/* Switch semi-sync off because of timeout in transaction waiting. */
int switch_off();
/* Switch semi-sync on when slaves catch up. */
int try_switch_on(int server_id,
const char *log_file_name, my_off_t log_file_pos);
public:
ReplSemiSyncMaster();
~ReplSemiSyncMaster();
/* Is semi-sync enabled on the master? */
bool getMasterEnabled() {
return master_enabled_;
}
/* Set the trace level on this object and, when it exists, on the
* active transaction list as well.
*/
void setTraceLevel(unsigned long trace_level) {
trace_level_ = trace_level;
if (active_tranxs_)
active_tranxs_->trace_level_ = trace_level;
}
/* Set the transaction wait timeout period, in milliseconds. */
void setWaitTimeout(unsigned long wait_timeout) {
wait_timeout_ = wait_timeout;
}
/* Initialize this class after MySQL parameters are initialized. This
* function should be called once at bootstrap time.
*/
int initObject();
/* Enable the object to enable semi-sync replication inside the master. */
int enableMaster();
/* Disable the object to disable semi-sync replication inside the master. */
int disableMaster();
/* Add a semi-sync replication slave */
void add_slave();
/* Remove a semi-sync replication slave */
void remove_slave();
/* Did the slave served by the current thread request semi-sync? */
bool is_semi_sync_slave();
/* Report a reply position given as a single string.
* NOTE(review): presumably the "<pos>:<file>" text a slave sends through
* the reply_log_file_pos session variable - confirm against the
* definition.
*/
int reportReplyBinlog(const char *log_file_pos);
/* In semi-sync replication, reports up to which binlog position we have
* received replies from the slave indicating that it already got the
* events.
*
* Input:
* server_id - (IN) master server id number
* log_file_name - (IN) binlog file name
* end_offset - (IN) the offset in the binlog file up to which we have
* the replies from the slave
*
* Return:
* 0: success; -1 or otherwise: error
*/
int reportReplyBinlog(uint32 server_id,
const char* log_file_name,
my_off_t end_offset);
/* Commit a transaction in the final step. This function is called from
* InnoDB before returning from the low commit. If semi-sync is switched on,
* the function will wait to see whether binlog-dump thread gets the reply
* for the events of the transaction. Remember that this is not a direct
* wait, instead, it waits to see whether the binlog-dump thread has reached
* the point. If the wait times out, semi-sync status will be switched off
* and all other transactions would not wait either.
*
* Input: (the transaction events' ending binlog position)
* trx_wait_binlog_name - (IN) ending position's file name
* trx_wait_binlog_pos - (IN) ending position's file offset
*
* Return:
* 0: success; -1 or otherwise: error
*/
int commitTrx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos);
/* Reserve space in the replication event packet header:
* . slave semi-sync off: 1 byte - (0)
* . slave semi-sync on: 3 byte - (0, 0xef, 0/1)
*
* Input:
* header - (IN) the header buffer
* size - (IN) size of the header buffer
*
* Return:
* size of the bytes reserved for header
*/
int reserveSyncHeader(unsigned char *header, unsigned long size);
/* Update the sync bit in the packet header to indicate to the slave whether
* the master will wait for the reply of the event. If semi-sync is switched
* off and we detect that the slave is catching up, we switch semi-sync on.
*
* Input:
* packet - (IN) the packet containing the replication event
* log_file_name - (IN) the event ending position's file name
* log_file_pos - (IN) the event ending position's file offset
* server_id - (IN) master server id number
*
* Return:
* 0: success; -1 or otherwise: error
*/
int updateSyncHeader(unsigned char *packet,
const char *log_file_name,
my_off_t log_file_pos,
uint32 server_id);
/* Called when a transaction finished writing binlog events.
* . update the 'largest' transactions' binlog event position
* . insert the ending position in the active transaction list if
* semi-sync is on
*
* Input: (the transaction events' ending binlog position)
* log_file_name - (IN) transaction ending position's file name
* log_file_pos - (IN) transaction ending position's file offset
*
* Return:
* 0: success; -1 or otherwise: error
*/
int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
/* Export internal statistics for semi-sync replication. */
void setExportStats();
/* 'reset master' command is issued from the user and semi-sync needs to
* go off for that.
*/
int resetMaster();
};
/* System and status variables for the master component.
 *
 * enabled, timeout and trace_level back the plugin's system variables.
 * The remaining values are status counters: all of them except
 * rpl_semi_sync_master_clients are refreshed from the ReplSemiSyncMaster
 * object by ReplSemiSyncMaster::setExportStats(), which only reads
 * rpl_semi_sync_master_clients.
 */
extern char rpl_semi_sync_master_enabled;
extern unsigned long rpl_semi_sync_master_timeout;
extern unsigned long rpl_semi_sync_master_trace_level;
extern unsigned long rpl_semi_sync_master_status;
extern unsigned long rpl_semi_sync_master_yes_transactions;
extern unsigned long rpl_semi_sync_master_no_transactions;
extern unsigned long rpl_semi_sync_master_off_times;
extern unsigned long rpl_semi_sync_master_timefunc_fails;
extern unsigned long rpl_semi_sync_master_num_timeouts;
extern unsigned long rpl_semi_sync_master_wait_sessions;
extern unsigned long rpl_semi_sync_master_back_wait_pos;
/* Average wait times (total / count). */
extern unsigned long rpl_semi_sync_master_trx_wait_time;
extern unsigned long rpl_semi_sync_master_net_wait_time;
/* Wait counts and accumulated wait times. */
extern unsigned long long rpl_semi_sync_master_net_wait_num;
extern unsigned long long rpl_semi_sync_master_trx_wait_num;
extern unsigned long long rpl_semi_sync_master_net_wait_total_time;
extern unsigned long long rpl_semi_sync_master_trx_wait_total_time;
extern unsigned long rpl_semi_sync_master_clients;
#endif /* SEMISYNC_MASTER_H */
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync_master.h"
/* The single semi-sync master object used by every callback in this file. */
ReplSemiSyncMaster repl_semisync;
/*
  Binlog storage callback: remember the binlog file name and position of
  what was just written, so that the commit path knows up to where the
  slave has to acknowledge.

  @param param     unused
  @param log_file  binlog file name
  @param log_pos   position in log_file
  @param flags     unused

  @return 0 when the semi-sync master is not enabled, otherwise the
          result of ReplSemiSyncMaster::writeTranxInBinlog()
*/
int repl_semi_report_binlog_update(Binlog_storage_param *param,
                                   const char *log_file,
                                   my_off_t log_pos, uint32 flags)
{
  if (!repl_semisync.getMasterEnabled())
    return 0;

  return repl_semisync.writeTranxInBinlog(log_file, log_pos);
}
/*
  Commit-request hook: does nothing and always reports success.
  It is not referenced by the observer tables defined in this file.
*/
int repl_semi_request_commit(Trans_param *param)
{
return 0;
}
/*
  after_commit callback: for a real transaction that has a binlog
  position, run the semi-sync commit step on that position.

  @param param  transaction parameters (flags, log_file, log_pos)

  @return 0 when there is nothing to do, otherwise the result of
          ReplSemiSyncMaster::commitTrx()
*/
int repl_semi_report_commit(Trans_param *param)
{
  const bool real_trans= param->flags & TRANS_IS_REAL_TRANS;

  /* Nothing to do unless this is a real transaction with a position. */
  if (!real_trans || !param->log_pos)
    return 0;

  return repl_semisync.commitTrx(param->log_file, param->log_pos);
}
/*
  after_rollback callback: handled exactly like a commit by delegating to
  repl_semi_report_commit().
*/
int repl_semi_report_rollback(Trans_param *param)
{
return repl_semi_report_commit(param);
}
/*
  Binlog dump start callback: count the slave as a semi-sync client when
  it asked for semi-sync, then log the start of the dump.

  @param param     transmit parameters (server_id is logged)
  @param log_file  binlog file the dump starts from
  @param log_pos   position in log_file

  @return always 0
*/
int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
                                const char *log_file,
                                my_off_t log_pos)
{
  const bool is_semi_sync= repl_semisync.is_semi_sync_slave();
  const char *mode= is_semi_sync ? "semi-sync" : "asynchronous";

  /* One more semi-sync slave */
  if (is_semi_sync)
    repl_semisync.add_slave();

  sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
                        mode, param->server_id, log_file,
                        (unsigned long)log_pos);
  return 0;
}
/*
  Binlog dump stop callback: log the end of the dump, then remove the
  slave from the semi-sync client count when it was a semi-sync slave.

  @param param  transmit parameters (server_id is logged)

  @return always 0
*/
int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
{
  const bool is_semi_sync= repl_semisync.is_semi_sync_slave();
  const char *mode= is_semi_sync ? "semi-sync" : "asynchronous";

  sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
                        mode, param->server_id);

  /* One less semi-sync slave */
  if (is_semi_sync)
    repl_semisync.remove_slave();

  return 0;
}
/*
  Reserve-header callback: let the semi-sync master write its header into
  the buffer and add the number of bytes it used to *len.

  @param param   unused
  @param header  header buffer
  @param size    size of header
  @param len     in/out: running header length, increased by the bytes
                 reserved

  @return always 0
*/
int repl_semi_reserve_header(Binlog_transmit_param *param,
                             unsigned char *header,
                             unsigned long size, unsigned long *len)
{
  const int reserved= repl_semisync.reserveSyncHeader(header, size);

  *len += reserved;
  return 0;
}
/*
  before_send_event callback: forwards the packet and the event position
  to ReplSemiSyncMaster::updateSyncHeader() and returns its result.
  The len argument is not used.
*/
int repl_semi_before_send_event(Binlog_transmit_param *param,
unsigned char *packet, unsigned long len,
const char *log_file, my_off_t log_pos)
{
return repl_semisync.updateSyncHeader(packet,
log_file,
log_pos,
param->server_id);
}
/*
  after_send_event callback: does nothing and always reports success.
*/
int repl_semi_after_send_event(Binlog_transmit_param *param,
const char *event_buf, unsigned long len)
{
return 0;
}
/*
  Reset callback: reset the semi-sync master state.

  @param param  unused

  @return 1 when ReplSemiSyncMaster::resetMaster() reports an error,
          0 otherwise
*/
int repl_semi_reset_master(Binlog_transmit_param *param)
{
  return repl_semisync.resetMaster() ? 1 : 0;
}
/*
semisync system variables
*/
/* Update callbacks, defined further down in this file. */
static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val);
static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val);
static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val);
static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val);
/* On/off switch of the semi-sync master; the update callback calls
enableMaster() or disableMaster(). */
static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
PLUGIN_VAR_OPCMDARG,
"Enable semi-synchronous replication master (disabled by default). ",
NULL, // check
&fix_rpl_semi_sync_master_enabled, // update
0);
/* Wait timeout in milliseconds; the update callback forwards it to
setWaitTimeout(). */
static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
PLUGIN_VAR_OPCMDARG,
"The timeout value (in ms) for semi-synchronous replication in the master",
NULL, // check
fix_rpl_semi_sync_master_timeout, // update
10000, 0, ~0L, 1);
/* Trace level; the update callback forwards it to setTraceLevel(). */
static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
PLUGIN_VAR_OPCMDARG,
"The tracing level for semi-sync replication.",
NULL, // check
&fix_rpl_semi_sync_master_trace_level, // update
32, 0, ~0L, 1);
/*
Use a SESSION instead of GLOBAL variable for slave to send reply to
avoid requiring SUPER privilege.
The update callback passes the assigned string to reportReplyBinlog().
*/
static MYSQL_THDVAR_STR(reply_log_file_pos,
PLUGIN_VAR_NOCMDOPT,
"The log filename and position slave has queued to relay log.",
NULL, // check
&fix_rpl_semi_sync_master_reply_log_file_pos,
"");
/* NULL-terminated list of the system variables, referenced by the plugin
descriptor. */
static SYS_VAR* semi_sync_master_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(timeout),
MYSQL_SYSVAR(trace_level),
MYSQL_SYSVAR(reply_log_file_pos),
NULL,
};
/*
  Update callback of the timeout system variable: store the new value and
  hand the timeout to the semi-sync master object.

  @param thd  unused
  @param var  unused
  @param ptr  storage of the variable
  @param val  new value
*/
static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
                                             SYS_VAR *var,
                                             void *ptr,
                                             const void *val)
{
  unsigned long *storage= (unsigned long *)ptr;
  const unsigned long new_timeout= *(unsigned long *)val;

  *storage= new_timeout;
  repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout);
}
/*
  Update callback of the trace_level system variable: store the new value
  and hand the trace level to the semi-sync master object.

  @param thd  unused
  @param var  unused
  @param ptr  storage of the variable
  @param val  new value
*/
static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
                                                 SYS_VAR *var,
                                                 void *ptr,
                                                 const void *val)
{
  unsigned long *storage= (unsigned long *)ptr;
  const unsigned long new_level= *(unsigned long *)val;

  *storage= new_level;
  repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level);
}
/*
  Update callback of the enabled system variable: store the new value,
  then enable or disable the semi-sync master accordingly.  When that
  call fails, the variable is set back to the opposite state.

  @param thd  unused
  @param var  unused
  @param ptr  storage of the variable
  @param val  new value
*/
static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
                                             SYS_VAR *var,
                                             void *ptr,
                                             const void *val)
{
  *(char *)ptr= *(char *)val;

  const bool want_enabled= rpl_semi_sync_master_enabled;
  const int rc= want_enabled ? repl_semisync.enableMaster()
                             : repl_semisync.disableMaster();

  /* The switch did not happen: reflect the state we are still in. */
  if (rc != 0)
    rpl_semi_sync_master_enabled = !want_enabled;
}
/*
  Update callback of the reply_log_file_pos session variable: pass the
  assigned string to the semi-sync master as a slave reply and log an
  error when it is rejected.  The value is not stored through ptr.

  @param thd  unused
  @param var  unused
  @param ptr  unused
  @param val  pointer to the new string value
*/
static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
                                                        SYS_VAR *var,
                                                        void *ptr,
                                                        const void *val)
{
  const char *reply= *(char **)val;
  const int rc= repl_semisync.reportReplyBinlog(reply);

  if (rc)
    sql_print_error("report slave binlog reply failed.");
}
/* Transaction observer: commit and rollback both end up in
   repl_semi_report_commit(). */
Trans_observer trans_observer = {
sizeof(Trans_observer), // len
repl_semi_report_commit, // after_commit
repl_semi_report_rollback, // after_rollback
};
/* Binlog storage observer: records the position of each binlog update. */
Binlog_storage_observer storage_observer = {
sizeof(Binlog_storage_observer), // len
repl_semi_report_binlog_update, // report_update
};
/* Binlog transmit observer: hooks of the binlog dump to a slave. */
Binlog_transmit_observer transmit_observer = {
sizeof(Binlog_transmit_observer), // len
repl_semi_binlog_dump_start, // start
repl_semi_binlog_dump_end, // stop
repl_semi_reserve_header, // reserve_header
repl_semi_before_send_event, // before_send_event
repl_semi_after_send_event, // after_send_event
repl_semi_reset_master, // reset
};
/* Name of the generated status function for a given counter. */
#define SHOW_FNAME(name) \
rpl_semi_sync_master_show_##name
/* Define a status function that refreshes every exported counter through
   setExportStats() and then points var at rpl_semi_sync_master_<name>
   with the given show type. */
#define DEF_SHOW_FUNC(name, show_type) \
static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \
{ \
repl_semisync.setExportStats(); \
var->type= show_type; \
var->value= (char *)&rpl_semi_sync_master_##name; \
return 0; \
}
DEF_SHOW_FUNC(clients, SHOW_LONG)
DEF_SHOW_FUNC(net_wait_time, SHOW_LONG)
DEF_SHOW_FUNC(net_wait_total_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(off_times, SHOW_LONG)
DEF_SHOW_FUNC(no_transactions, SHOW_LONG)
/* NOTE(review): rpl_semi_sync_master_status is declared as unsigned long
   but exported as SHOW_BOOL - confirm that the reader of a SHOW_BOOL value
   copes with the wider storage on every platform. */
DEF_SHOW_FUNC(status, SHOW_BOOL)
DEF_SHOW_FUNC(timefunc_fails, SHOW_LONG)
DEF_SHOW_FUNC(trx_wait_time, SHOW_LONG)
DEF_SHOW_FUNC(trx_wait_total_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(back_wait_pos, SHOW_LONG)
DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
DEF_SHOW_FUNC(yes_transactions, SHOW_LONG)
/* plugin status variables: each entry maps a status name to the function
   generated by DEF_SHOW_FUNC for the corresponding counter.  The list is
   terminated by an entry with a NULL name. */
static SHOW_VAR semi_sync_master_status_vars[]= {
{"Rpl_semi_sync_master_clients", (char*) &SHOW_FNAME(clients), SHOW_FUNC},
{"Rpl_semi_sync_master_net_avg_wait_time",
(char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC},
{"Rpl_semi_sync_master_net_wait_time",
(char*) &SHOW_FNAME(net_wait_total_time), SHOW_FUNC},
{"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC},
{"Rpl_semi_sync_master_no_times", (char*) &SHOW_FNAME(off_times), SHOW_FUNC},
{"Rpl_semi_sync_master_no_tx", (char*) &SHOW_FNAME(no_transactions), SHOW_FUNC},
{"Rpl_semi_sync_master_status", (char*) &SHOW_FNAME(status), SHOW_FUNC},
{"Rpl_semi_sync_master_timefunc_failures",
(char*) &SHOW_FNAME(timefunc_fails), SHOW_FUNC},
{"Rpl_semi_sync_master_tx_avg_wait_time",
(char*) &SHOW_FNAME(trx_wait_time), SHOW_FUNC},
{"Rpl_semi_sync_master_tx_wait_time",
(char*) &SHOW_FNAME(trx_wait_total_time), SHOW_FUNC},
{"Rpl_semi_sync_master_tx_waits", (char*) &SHOW_FNAME(trx_wait_num), SHOW_FUNC},
{"Rpl_semi_sync_master_wait_pos_backtraverse",
(char*) &SHOW_FNAME(back_wait_pos), SHOW_FUNC},
{"Rpl_semi_sync_master_wait_sessions",
(char*) &SHOW_FNAME(wait_sessions), SHOW_FUNC},
{"Rpl_semi_sync_master_yes_tx", (char*) &SHOW_FNAME(yes_transactions), SHOW_FUNC},
{NULL, NULL, SHOW_LONG},
};
static int semi_sync_master_plugin_init(void *p)
{
if (repl_semisync.initObject())
return 1;
if (register_trans_observer(&trans_observer, p))
return 1;
if (register_binlog_storage_observer(&storage_observer, p))
return 1;
if (register_binlog_transmit_observer(&transmit_observer, p))
return 1;
return 0;
}
/*
  Plugin deinit: unregister the three observers in the order they were
  registered.  The first failure is logged and aborts the sequence.

  @param p  plugin handle passed on to the unregister functions

  @return 0 on success, 1 as soon as one unregistration fails
*/
static int semi_sync_master_plugin_deinit(void *p)
{
  if (unregister_trans_observer(&trans_observer, p) != 0)
  {
    sql_print_error("unregister_trans_observer failed");
    return 1;
  }

  if (unregister_binlog_storage_observer(&storage_observer, p) != 0)
  {
    sql_print_error("unregister_binlog_storage_observer failed");
    return 1;
  }

  if (unregister_binlog_transmit_observer(&transmit_observer, p) != 0)
  {
    sql_print_error("unregister_binlog_transmit_observer failed");
    return 1;
  }

  sql_print_information("unregister_replicator OK");
  return 0;
}
/* Replication plugin descriptor: carries only the interface version. */
struct Mysql_replication semi_sync_master_plugin= {
MYSQL_REPLICATION_INTERFACE_VERSION
};
/*
Plugin library descriptor: declares the rpl_semi_sync_master replication
plugin with its init/deinit functions and its status and system
variable tables.
*/
mysql_declare_plugin(semi_sync_master)
{
MYSQL_REPLICATION_PLUGIN,
&semi_sync_master_plugin,
"rpl_semi_sync_master",
"He Zhenxing",
"Semi-synchronous replication master",
PLUGIN_LICENSE_GPL,
semi_sync_master_plugin_init, /* Plugin Init */
semi_sync_master_plugin_deinit, /* Plugin Deinit */
0x0100 /* 1.0 */,
semi_sync_master_status_vars, /* status variables */
semi_sync_master_system_vars, /* system variables */
NULL /* config options */
}
mysql_declare_plugin_end;
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync_slave.h"
/* Slave-side settings read by ReplSemiSyncSlave::initObject(). */
char rpl_semi_sync_slave_enabled;
/* Set to 1 by slaveStart() when semi-sync is enabled, and back to 0 by
   slaveStop(). */
unsigned long rpl_semi_sync_slave_status= 0;
unsigned long rpl_semi_sync_slave_trace_level;
/*
  One-time initialization of the semi-sync slave object: picks up the
  enabled flag and the trace level from the global settings.

  @return 0 on success, 1 when called a second time
*/
int ReplSemiSyncSlave::initObject()
{
  const char *kWho = "ReplSemiSyncSlave::initObject";

  /* Refuse to initialize twice. */
  if (init_done_)
  {
    fprintf(stderr, "%s called twice\n", kWho);
    return 1;
  }
  init_done_ = true;

  /* References to the parameters work after set_options(). */
  setSlaveEnabled(rpl_semi_sync_slave_enabled);
  setTraceLevel(rpl_semi_sync_slave_trace_level);

  return 0;
}
/*
  Make sure the connection used for sending replies to the master exists,
  opening it with rpl_connect_master() when it does not.

  @return 0 when a reply connection is available, 1 when connecting failed
*/
int ReplSemiSyncSlave::slaveReplyConnect()
{
  /* Already connected: nothing to do. */
  if (mysql_reply)
    return 0;

  mysql_reply= rpl_connect_master(NULL);
  if (!mysql_reply)
  {
    sql_print_error("Semisync slave connect to master for reply failed");
    return 1;
  }

  return 0;
}
/*
  Parse the two-byte semi-sync header (magic number, flags) at the start
  of a packet received from the master.

  @param header       start of the received packet
  @param total_len    length of the received packet
  @param need_reply   out: whether the sync flag is set
  @param payload      out: start of the data following the header
  @param payload_len  out: length of the data following the header

  @return 0 on success; -1 when the packet is too short to hold the
          header or does not start with the magic number.  The output
          parameters are left untouched on error.
*/
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
                                           unsigned long total_len,
                                           bool *need_reply,
                                           const char **payload,
                                           unsigned long *payload_len)
{
  const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader";
  int read_res = 0;
  function_enter(kWho);

  /* Both header bytes must be present: otherwise header[1] would be read
     past the packet and total_len - 2 would wrap around. */
  if (total_len < 2)
  {
    sql_print_error("Semi-sync packet too short for the sync header, packet "
                    "len: %lu", total_len);
    return function_exit(kWho, -1);
  }

  if ((unsigned char)(header[0]) == kPacketMagicNum)
  {
    *need_reply = (header[1] & kPacketFlagSync);
    *payload_len = total_len - 2;
    *payload = header + 2;

    if (trace_level_ & kTraceDetail)
      sql_print_information("%s: reply - %d", kWho, *need_reply);
  }
  else
  {
    sql_print_error("Missing magic number for semi-sync packet, packet "
                    "len: %lu", total_len);
    read_res = -1;
  }

  return function_exit(kWho, read_res);
}
/*
  Called when the slave I/O thread starts.

  Logs which replication mode is used and from where, and marks the
  slave semi-sync status as active when semi-sync is enabled.

  Input:
    param - (IN) master connection and binlog position information

  Return:
    always 0
*/
int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param)
{
  bool semi_sync= getSlaveEnabled();

  /*
    The message is built from adjacent string literals instead of a
    backslash line continuation, so the whitespace between "to" and
    "master" no longer depends on how the source line is indented.
    The port is an unsigned int, hence %u.
  */
  sql_print_information("Slave I/O thread: Start %s replication to "
                        "master '%s@%s:%u' in log '%s' at position %lu",
                        semi_sync ? "semi-sync" : "asynchronous",
                        param->user, param->host, param->port,
                        param->master_log_name[0] ?
                          param->master_log_name : "FIRST",
                        (unsigned long)param->master_log_pos);

  if (semi_sync && !rpl_semi_sync_slave_status)
    rpl_semi_sync_slave_status= 1;
  return 0;
}
/*
  Called when the slave I/O thread stops.

  Clears the slave semi-sync status and closes the reply connection,
  if one is open.

  Return:
    always 0
*/
int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
{
  if (rpl_semi_sync_slave_status != 0)
    rpl_semi_sync_slave_status= 0;

  if (mysql_reply)
  {
    mysql_close(mysql_reply);
    mysql_reply= 0;
  }
  return 0;
}
int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos)
{
char query[FN_REFLEN + 100];
sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'",
(unsigned long long)log_pos, log_name);
if (mysql_real_query(mysql_reply, query, strlen(query)))
{
sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed");
mysql_free_result(mysql_store_result(mysql_reply));
mysql_close(mysql_reply);
mysql_reply= 0;
return 1;
}
mysql_free_result(mysql_store_result(mysql_reply));
return 0;
}
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef SEMISYNC_SLAVE_H
#define SEMISYNC_SLAVE_H
#include "semisync.h"
/**
The extension class for the slave of semi-synchronous replication
*/
class ReplSemiSyncSlave
:public ReplSemiSyncBase {
public:
ReplSemiSyncSlave()
:slave_enabled_(false)
{}
~ReplSemiSyncSlave() {}
void setTraceLevel(unsigned long trace_level) {
trace_level_ = trace_level;
}
/* Initialize this class after MySQL parameters are initialized. this
* function should be called once at bootstrap time.
*/
int initObject();
bool getSlaveEnabled() {
return slave_enabled_;
}
void setSlaveEnabled(bool enabled) {
slave_enabled_ = enabled;
}
/* A slave reads the semi-sync packet header and separate the metadata
* from the payload data.
*
* Input:
* header - (IN) packet header pointer
* total_len - (IN) total packet length: metadata + payload
* need_reply - (IN) whether the master is waiting for the reply
* payload - (IN) payload: the replication event
* payload_len - (IN) payload length
*
* Return:
* 0: success; -1 or otherwise: error
*/
int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
const char **payload, unsigned long *payload_len);
/* A slave replies to the master indicating its replication process. It
* indicates that the slave has received all events before the specified
* binlog position.
*
* Input:
* log_name - (IN) the reply point's binlog file name
* log_pos - (IN) the reply point's binlog file offset
*
* Return:
* 0: success; -1 or otherwise: error
*/
int slaveReply(const char *log_name, my_off_t log_pos);
/*
Connect to master for sending reply
*/
int slaveReplyConnect();
int slaveStart(Binlog_relay_IO_param *param);
int slaveStop(Binlog_relay_IO_param *param);
private:
/* True when initObject has been called */
bool init_done_;
bool slave_enabled_; /* semi-sycn is enabled on the slave */
MYSQL *mysql_reply; /* connection to send reply */
};
/* System and status variables for the slave component */
/* Non-zero when semi-sync is requested on this slave. */
extern char rpl_semi_sync_slave_enabled;
/* Trace level handed to ReplSemiSyncSlave::setTraceLevel(). */
extern unsigned long rpl_semi_sync_slave_trace_level;
/* Non-zero while the slave is in semi-sync mode. */
extern unsigned long rpl_semi_sync_slave_status;
#endif /* SEMISYNC_SLAVE_H */
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync_slave.h"
/* The single slave-side semi-sync object used by all observer callbacks
   in this file. */
ReplSemiSyncSlave repl_semisync;
/*
Indicates whether or not the slave should send a reply to the master.
This is set in repl_semi_slave_read_event (to true if the current
event read is the last event of a transaction), and the value is
checked in repl_semi_slave_queue_event.
*/
bool semi_sync_need_reply= false;
/*
  Observer callback for resetting the slave; currently a no-op that
  always reports success.
*/
int repl_semi_reset_slave(Binlog_relay_IO_param *param)
{
  /* TODO: reset semi-sync slave status here */
  const int rc= 0;
  return rc;
}
/*
  Observer callback run before the slave requests a binlog dump.

  When semi-sync is enabled on the slave, this opens the reply
  connection, checks that the master has semi-sync switched on and, if
  so, announces this slave as a semi-sync slave by setting the user
  variable @rpl_semi_sync_slave on the master connection.

  Input:
    param - (IN) observer parameter; param->mysql is the master connection
    flags - (IN) binlog dump flags (not used here)

  Return:
    0 on success (including the fallback to asynchronous replication),
    1 on failure.
*/
int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
                                 uint32 flags)
{
  MYSQL *mysql= param->mysql;
  MYSQL_RES *res= 0;
  MYSQL_ROW row;
  const char *query;
  bool master_enabled;

  if (!repl_semisync.getSlaveEnabled())
    return 0;

  /*
    Create the connection that is used to send slave ACK replies to
    master
  */
  if (repl_semisync.slaveReplyConnect())
    return 1;

  /* Check if master server has semi-sync plugin installed */
  query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
  if (mysql_real_query(mysql, query, strlen(query)) ||
      !(res= mysql_store_result(mysql)))
  {
    mysql_free_result(mysql_store_result(mysql));
    sql_print_error("Execution failed on master: %s", query);
    return 1;
  }

  /*
    Evaluate the answer first and release the result set right away, so
    it is not leaked on any of the return paths below.  A NULL value
    column is treated like "not ON".
  */
  row= mysql_fetch_row(res);
  master_enabled= (row && row[1] && !strcmp(row[1], "ON"));
  mysql_free_result(res);

  if (!master_enabled)
  {
    /* Master does not support or not configured semi-sync */
    sql_print_warning("Master server does not support or not configured semi-sync replication, fallback to asynchronous");
    rpl_semi_sync_slave_status= 0;
    return 0;
  }

  /*
    Tell master dump thread that we want to do semi-sync
    replication
  */
  query= "SET @rpl_semi_sync_slave= 1";
  if (mysql_real_query(mysql, query, strlen(query)))
  {
    sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
    mysql_free_result(mysql_store_result(mysql));
    return 1;
  }
  mysql_free_result(mysql_store_result(mysql));
  rpl_semi_sync_slave_status= 1;
  return 0;
}
/*
  Observer callback run after a packet has been read from the master.

  In semi-sync mode the semi-sync header is parsed and stripped;
  otherwise the packet is handed back unchanged.

  Return:
    0 on success, non-zero if the semi-sync header could not be parsed.
*/
int repl_semi_slave_read_event(Binlog_relay_IO_param *param,
                               const char *packet, unsigned long len,
                               const char **event_buf, unsigned long *event_len)
{
  if (!rpl_semi_sync_slave_status)
  {
    /* Plain asynchronous replication: the packet is the event. */
    *event_buf= packet;
    *event_len= len;
    return 0;
  }

  return repl_semisync.slaveReadSyncHeader(packet, len,
                                           &semi_sync_need_reply,
                                           event_buf, event_len);
}
/*
  Observer callback run after an event has been queued to the relay log.

  Sends a reply to the master when the slave is in semi-sync mode and
  the last packet read asked for one.

  Return:
    0 when no reply is needed or the reply succeeded, non-zero otherwise.
*/
int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
                                const char *event_buf,
                                unsigned long event_len,
                                uint32 flags)
{
  if (!rpl_semi_sync_slave_status || !semi_sync_need_reply)
    return 0;

  return repl_semisync.slaveReply(param->master_log_name,
                                  param->master_log_pos);
}
/* Observer callback: the slave I/O thread has started. */
int repl_semi_slave_io_start(Binlog_relay_IO_param *param)
{
  const int rc= repl_semisync.slaveStart(param);
  return rc;
}
/* Observer callback: the slave I/O thread is stopping. */
int repl_semi_slave_io_end(Binlog_relay_IO_param *param)
{
  const int rc= repl_semisync.slaveStop(param);
  return rc;
}
/*
  Update callback of the 'enabled' system variable: store the new value
  and propagate it to the semi-sync slave object.
*/
static void fix_rpl_semi_sync_slave_enabled(MYSQL_THD thd,
                                            SYS_VAR *var,
                                            void *ptr,
                                            const void *val)
{
  char *target= (char *)ptr;
  const char new_value= *(char *)val;

  *target= new_value;
  repl_semisync.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0);
}
/*
  Update callback of the 'trace_level' system variable: store the new
  value and propagate it to the semi-sync slave object.
*/
static void fix_rpl_semi_sync_trace_level(MYSQL_THD thd,
                                          SYS_VAR *var,
                                          void *ptr,
                                          const void *val)
{
  unsigned long *target= (unsigned long *)ptr;
  const unsigned long new_level= *(unsigned long *)val;

  *target= new_level;
  repl_semisync.setTraceLevel(rpl_semi_sync_slave_trace_level);
}
/* plugin system variables */
/* Boolean variable 'enabled', stored in rpl_semi_sync_slave_enabled. */
static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_slave_enabled,
PLUGIN_VAR_OPCMDARG,
"Enable semi-synchronous replication slave (disabled by default). ",
NULL, // check: no check callback
&fix_rpl_semi_sync_slave_enabled, // update: pushes the value into repl_semisync
0);
/*
Unsigned long variable 'trace_level', stored in
rpl_semi_sync_slave_trace_level.
NOTE(review): the trailing numbers are presumably default (32), minimum
(0), maximum (~0L) and block size (1) -- confirm against the
MYSQL_SYSVAR_ULONG definition in plugin.h.
*/
static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
PLUGIN_VAR_OPCMDARG,
"The tracing level for semi-sync replication.",
NULL, // check: no check callback
&fix_rpl_semi_sync_trace_level, // update: pushes the value into repl_semisync
32, 0, ~0L, 1);
/* NULL-terminated list of the system variables above; referenced by the
plugin descriptor. */
static SYS_VAR* semi_sync_slave_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(trace_level),
NULL,
};
/* plugin status variables */
/*
List of status variables exported by the plugin, terminated by an entry
with a NULL name; referenced by the plugin descriptor.
NOTE(review): rpl_semi_sync_slave_status is an unsigned long but is
exported as SHOW_BOOL; if SHOW_BOOL reads a bool-sized value this may
misreport on big-endian platforms -- confirm.
*/
static SHOW_VAR semi_sync_slave_status_vars[]= {
{"Rpl_semi_sync_slave_status",
(char*) &rpl_semi_sync_slave_status, SHOW_BOOL},
{NULL, NULL, SHOW_BOOL},
};
/*
Relay I/O observer of the semi-sync slave plugin.  The initializers are
positional; the trailing comments name the Binlog_relay_IO_observer
member each one fills.
*/
Binlog_relay_IO_observer relay_io_observer = {
sizeof(Binlog_relay_IO_observer), // len
repl_semi_slave_io_start, // thread_start
repl_semi_slave_io_end, // thread_stop
repl_semi_slave_request_dump, // before_request_transmit
repl_semi_slave_read_event, // after_read_event
repl_semi_slave_queue_event, // after_queue_event
repl_semi_reset_slave, // after_reset_slave
};
/*
  Plugin initialization: set up the semi-sync slave object, then
  register the relay I/O observer.  Registration is not attempted when
  the object initialization fails.

  Return:
    0 on success, 1 on failure.
*/
static int semi_sync_slave_plugin_init(void *p)
{
  const bool failed=
    repl_semisync.initObject() ||
    register_binlog_relay_io_observer(&relay_io_observer, p);

  return failed ? 1 : 0;
}
/*
  Plugin shutdown: unregister the relay I/O observer.

  Return:
    0 on success, 1 if unregistering failed.
*/
static int semi_sync_slave_plugin_deinit(void *p)
{
  return unregister_binlog_relay_io_observer(&relay_io_observer, p) ? 1 : 0;
}
/* Replication plugin descriptor; its only member is the interface
version this plugin was built against. */
struct Mysql_replication semi_sync_slave_plugin= {
MYSQL_REPLICATION_INTERFACE_VERSION
};
/*
Plugin library descriptor

Declares the replication plugin "rpl_semi_sync_slave" together with its
init/deinit functions and its status and system variables.
*/
mysql_declare_plugin(semi_sync_slave)
{
MYSQL_REPLICATION_PLUGIN,
&semi_sync_slave_plugin,
"rpl_semi_sync_slave",
"He Zhenxing",
"Semi-synchronous replication slave",
PLUGIN_LICENSE_GPL,
semi_sync_slave_plugin_init, /* Plugin Init */
semi_sync_slave_plugin_deinit, /* Plugin Deinit */
0x0100 /* 1.0 */,
semi_sync_slave_status_vars, /* status variables */
semi_sync_slave_system_vars, /* system variables */
NULL /* config options */
}
mysql_declare_plugin_end;
......@@ -75,6 +75,7 @@ SET (SQL_SOURCE
rpl_rli.cc rpl_mi.cc sql_servers.cc
sql_connect.cc scheduler.cc
sql_profile.cc event_parse_data.cc
rpl_handler.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
${PROJECT_SOURCE_DIR}/include/mysqld_error.h
......
......@@ -76,7 +76,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_plugin.h authors.h event_parse_data.h \
event_data_objects.h event_scheduler.h \
sql_partition.h partition_info.h partition_element.h \
contributors.h sql_servers.h
contributors.h sql_servers.h \
rpl_handler.h replication.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
......@@ -120,7 +121,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
event_queue.cc event_db_repository.cc events.cc \
sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc \
sql_servers.cc event_parse_data.cc
sql_servers.cc event_parse_data.cc \
rpl_handler.cc
nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c
......
......@@ -24,6 +24,7 @@
#endif
#include "mysql_priv.h"
#include "rpl_handler.h"
#include "rpl_filter.h"
#include <myisampack.h>
#include <errno.h>
......@@ -221,6 +222,8 @@ handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type,
return NULL;
}
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
switch (database_type) {
#ifndef NO_HASH
case DB_TYPE_HASH:
......@@ -1190,6 +1193,7 @@ int ha_commit_trans(THD *thd, bool all)
if (cookie)
tc_log->unlog(cookie, xid);
DBUG_EXECUTE_IF("crash_commit_after", abort(););
RUN_HOOK(transaction, after_commit, (thd, FALSE));
end:
if (rw_trans)
start_waiting_global_read_lock(thd);
......@@ -1337,6 +1341,7 @@ int ha_rollback_trans(THD *thd, bool all)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
DBUG_RETURN(error);
}
......@@ -1371,7 +1376,14 @@ int ha_autocommit_or_rollback(THD *thd, int error)
thd->variables.tx_isolation=thd->session_tx_isolation;
}
else
#endif
{
if (!error)
RUN_HOOK(transaction, after_commit, (thd, FALSE));
else
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
}
DBUG_RETURN(error);
}
......
......@@ -38,6 +38,7 @@
#endif
#include <mysql/plugin.h>
#include "rpl_handler.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
......@@ -3683,9 +3684,11 @@ err:
}
bool MYSQL_BIN_LOG::flush_and_sync()
bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
{
int err=0, fd=log_file.file;
if (synced)
*synced= 0;
safe_mutex_assert_owner(&LOCK_log);
if (flush_io_cache(&log_file))
return 1;
......@@ -3693,6 +3696,8 @@ bool MYSQL_BIN_LOG::flush_and_sync()
{
sync_binlog_counter= 0;
err=my_sync(fd, MYF(MY_WME));
if (synced)
*synced= 1;
}
return err;
}
......@@ -3983,7 +3988,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
if (file == &log_file)
{
error= flush_and_sync();
error= flush_and_sync(0);
if (!error)
{
signal_update();
......@@ -4169,8 +4174,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (file == &log_file) // we are writing to the real log (disk)
{
if (flush_and_sync())
bool synced= 0;
if (flush_and_sync(&synced))
goto err;
if (RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, file->pos_in_file, synced))) {
sql_print_error("Failed to run 'after_flush' hooks");
goto err;
}
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
......@@ -4425,7 +4438,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
DBUG_ASSERT(carry == 0);
if (sync_log)
flush_and_sync();
flush_and_sync(0);
return 0; // All OK
}
......@@ -4472,7 +4485,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
ev.write(&log_file);
if (lock)
{
if (!error && !(error= flush_and_sync()))
if (!error && !(error= flush_and_sync(0)))
{
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
......@@ -4560,7 +4573,8 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
if (incident && write_incident(thd, FALSE))
goto err;
if (flush_and_sync())
bool synced= 0;
if (flush_and_sync(&synced))
goto err;
DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
if (cache->error) // Error on read
......@@ -4569,6 +4583,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
write_error=1; // Don't give more errors
goto err;
}
if (RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, log_file.pos_in_file, synced)))
{
sql_print_error("Failed to run 'after_flush' hooks");
write_error=1;
goto err;
}
signal_update();
}
......
......@@ -378,7 +378,21 @@ public:
bool is_active(const char* log_file_name);
int update_log_index(LOG_INFO* linfo, bool need_update_threads);
void rotate_and_purge(uint flags);
bool flush_and_sync();
/**
Flush binlog cache and synchronize to disk.
This function flushes events in binlog cache to binary log file,
it will do synchronizing according to the setting of system
variable 'sync_binlog'. If file is synchronized, @c synced will
be set to 1, otherwise 0.
@param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
@retval 0 Success
@retval other Failure
*/
bool flush_and_sync(bool *synced);
int purge_logs(const char *to_log, bool included,
bool need_mutex, bool need_update_threads,
ulonglong *decrease_log_space);
......
......@@ -31,6 +31,8 @@
#include "rpl_injector.h"
#include "rpl_handler.h"
#ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif
......@@ -1284,6 +1286,7 @@ void clean_up(bool print_message)
ha_end();
if (tc_log)
tc_log->close();
delegates_destroy();
xid_cache_free();
delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
multi_keycache_free();
......@@ -3760,6 +3763,13 @@ static int init_server_components()
unireg_abort(1);
}
/* initialize delegates for extension observers */
if (delegates_init())
{
sql_print_error("Initialize extension delegates failed");
unireg_abort(1);
}
/* need to configure logging before initializing storage engines */
if (opt_update_log)
{
......
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef REPLICATION_H
#define REPLICATION_H
#ifdef __cplusplus
extern "C" {
#endif
/**
Transaction observer flags.

Bit values stored in Trans_param::flags.
*/
enum Trans_flags {
/** Transaction is a real transaction */
TRANS_IS_REAL_TRANS = 1
};
/**
Transaction observer parameter
*/
typedef struct Trans_param {
/* Server id, taken from the THD the hook runs for */
uint32 server_id;
/* Bit mask of Trans_flags values */
uint32 flags;
/*
The latest binary log file name and position written by the current
transaction. If the binary log is disabled, or no log event has been
written into the binary log file by the current transaction (events
written into the transaction log cache are not counted), these two
members will be zero.
*/
const char *log_file;
my_off_t log_pos;
} Trans_param;
/**
Observes and extends transaction execution
*/
typedef struct Trans_observer {
/* Size of this structure; observers in this patch set it to sizeof() of the struct */
uint32 len;
/**
This callback is called after transaction commit
This callback is called right after commit to storage engines for
transactional tables.
For non-transactional tables, this is called at the end of the
statement, before sending statement status, if the statement
succeeded.
@note The return value is currently ignored by the server.
@param param The parameter for transaction observers
@retval 0 Success
@retval 1 Failure
*/
int (*after_commit)(Trans_param *param);
/**
This callback is called after transaction rollback
This callback is called right after rollback to storage engines
for transactional tables.
For non-transactional tables, this is called at the end of the
statement, before sending statement status, if the statement
failed.
@note The return value is currently ignored by the server.
@param param The parameter for transaction observers
@retval 0 Success
@retval 1 Failure
*/
int (*after_rollback)(Trans_param *param);
} Trans_observer;
/**
Binlog storage flags

Bit values passed in the @a flags argument of
Binlog_storage_observer::after_flush.
*/
enum Binlog_storage_flags {
/** Binary log was synced to disk */
BINLOG_STORAGE_IS_SYNCED = 1
};
/**
Binlog storage observer parameters
*/
typedef struct Binlog_storage_param {
/* Server id, taken from the THD the hook runs for */
uint32 server_id;
} Binlog_storage_param;
/**
Observe binlog logging storage
*/
typedef struct Binlog_storage_observer {
/* Size of this structure */
uint32 len;
/**
This callback is called after binlog has been flushed
This callback is called after cached events have been flushed to
binary log file. Whether the binary log file is synchronized to
disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags.
@param param Observer common parameter
@param log_file Name of the binlog file that has been updated
@param log_pos Binlog position after update
@param flags flags for binlog storage
@retval 0 Success
@retval 1 Failure
*/
int (*after_flush)(Binlog_storage_param *param,
const char *log_file, my_off_t log_pos,
uint32 flags);
} Binlog_storage_observer;
/**
Replication binlog transmitter (binlog dump) observer parameter.
*/
typedef struct Binlog_transmit_param {
/* Server id, taken from the THD the hook runs for */
uint32 server_id;
/* NOTE(review): meaning of the flag bits is not defined in this header -- confirm against the delegate code */
uint32 flags;
} Binlog_transmit_param;
/**
Observes and extends the binlog dumping thread.
*/
typedef struct Binlog_transmit_observer {
/* Size of this structure */
uint32 len;
/**
This callback is called when binlog dumping starts
@param param Observer common parameter
@param log_file Binlog file name to transmit from
@param log_pos Binlog position to transmit from
@retval 0 Success
@retval 1 Failure
*/
int (*transmit_start)(Binlog_transmit_param *param,
const char *log_file, my_off_t log_pos);
/**
This callback is called when binlog dumping stops
@param param Observer common parameter
@retval 0 Success
@retval 1 Failure
*/
int (*transmit_stop)(Binlog_transmit_param *param);
/**
This callback is called to reserve bytes in packet header for event transmission
This callback is called when resetting transmit packet header to
reserve bytes for this observer in packet header.
The @a header buffer is allocated by the server code, and @a size
is the size of the header buffer. Each observer can only reserve
a maximum size of @a size in the header.
@param param Observer common parameter
@param header Pointer of the header buffer
@param size Size of the header buffer
@param len Header length reserved by this observer
@retval 0 Success
@retval 1 Failure
*/
int (*reserve_header)(Binlog_transmit_param *param,
unsigned char *header,
unsigned long size,
unsigned long *len);
/**
This callback is called before sending an event packet to slave
@param param Observer common parameter
@param packet Binlog event packet to send
@param len Length of the event packet
@param log_file Binlog file name of the event packet to send
@param log_pos Binlog position of the event packet to send
@retval 0 Success
@retval 1 Failure
*/
int (*before_send_event)(Binlog_transmit_param *param,
unsigned char *packet, unsigned long len,
const char *log_file, my_off_t log_pos );
/**
This callback is called after sending an event packet to slave
@param param Observer common parameter
@param event_buf Binlog event packet buffer sent
@param len length of the event packet buffer
@retval 0 Success
@retval 1 Failure
*/
int (*after_send_event)(Binlog_transmit_param *param,
const char *event_buf, unsigned long len);
/**
This callback is called after resetting master status
This is called when executing the command RESET MASTER, and is
used to reset status variables added by observers.
@param param Observer common parameter
@retval 0 Success
@retval 1 Failure
*/
int (*after_reset_master)(Binlog_transmit_param *param);
} Binlog_transmit_observer;
/**
Binlog relay IO flags

Bit values passed in the @a flags argument of
Binlog_relay_IO_observer::after_queue_event.
*/
enum Binlog_relay_IO_flags {
/** Binary relay log was synced to disk */
BINLOG_RELAY_IS_SYNCED = 1
};
/**
Replication binlog relay IO observer parameter
*/
typedef struct Binlog_relay_IO_param {
/* Server id, taken from the THD the hook runs for */
uint32 server_id;
/* Master host, user and port */
char *host;
char *user;
unsigned int port;
/* Master binlog file name and position; the semi-sync slave plugin
reports these back to the master in its reply */
char *master_log_name;
my_off_t master_log_pos;
MYSQL *mysql; /* the connection to master */
} Binlog_relay_IO_param;
/**
Observes and extends the service of slave IO thread.
*/
typedef struct Binlog_relay_IO_observer {
/* Size of this structure */
uint32 len;
/**
This callback is called when slave IO thread starts
@param param Observer common parameter
@retval 0 Success
@retval 1 Failure
*/
int (*thread_start)(Binlog_relay_IO_param *param);
/**
This callback is called when slave IO thread stops
@param param Observer common parameter
@retval 0 Success
@retval 1 Failure
*/
int (*thread_stop)(Binlog_relay_IO_param *param);
/**
This callback is called before the slave requests binlog transmission from master
This is called before the slave issues the BINLOG_DUMP command to master
to request binlog.
@param param Observer common parameter
@param flags binlog dump flags
@retval 0 Success
@retval 1 Failure
*/
int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
/**
This callback is called after reading an event packet from master
@param param Observer common parameter
@param packet The event packet read from master
@param len Length of the event packet read from master
@param event_buf The event packet returned after processing
@param event_len The length of the event packet returned after processing
@retval 0 Success
@retval 1 Failure
*/
int (*after_read_event)(Binlog_relay_IO_param *param,
const char *packet, unsigned long len,
const char **event_buf, unsigned long *event_len);
/**
This callback is called after writing an event packet to relay log
@param param Observer common parameter
@param event_buf Event packet written to relay log
@param event_len Length of the event packet written to relay log
@param flags flags for relay log
@retval 0 Success
@retval 1 Failure
*/
int (*after_queue_event)(Binlog_relay_IO_param *param,
const char *event_buf, unsigned long event_len,
uint32 flags);
/**
This callback is called after resetting slave relay log IO status
@param param Observer common parameter
@retval 0 Success
@retval 1 Failure
*/
int (*after_reset_slave)(Binlog_relay_IO_param *param);
} Binlog_relay_IO_observer;
/**
Register a transaction observer
@param observer The transaction observer to register
@param p pointer to the internal plugin structure
@retval 0 Success
@retval 1 Observer already exists
*/
int register_trans_observer(Trans_observer *observer, void *p);
/**
Unregister a transaction observer
@param observer The transaction observer to unregister
@param p pointer to the internal plugin structure
@retval 0 Success
@retval 1 Observer does not exist
*/
int unregister_trans_observer(Trans_observer *observer, void *p);
/**
Register a binlog storage observer
@param observer The binlog storage observer to register
@param p pointer to the internal plugin structure
@retval 0 Success
@retval 1 Observer already exists
*/
int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
/**
Unregister a binlog storage observer
@param observer The binlog storage observer to unregister
@param p pointer to the internal plugin structure
@retval 0 Success
@retval 1 Observer does not exist
*/
int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
/**
Register a binlog transmit observer
@param observer The binlog transmit observer to register
@param p pointer to the internal plugin structure
@retval 0 Success
@retval 1 Observer already exists
*/
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
/**
Unregister a binlog transmit observer
@param observer The binlog transmit observer to unregister
@param p pointer to the internal plugin structure
@retval 0 Success
@retval 1 Observer does not exist
*/
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
/**
Register a binlog relay IO (slave IO thread) observer
@param observer The binlog relay IO observer to register
@param p pointer to the internal plugin structure
@retval 0 Success
@retval 1 Observer already exists
*/
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
/**
Unregister a binlog relay IO (slave IO thread) observer
@param observer The binlog relay IO observer to unregister
@param p pointer to the internal plugin structure
@retval 0 Success
@retval 1 Observer does not exist
*/
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
/**
Connect to master
This function can only be used in the slave I/O thread context, and
will use the same master information to do the connection.
@code
MYSQL *mysql = mysql_init(NULL);
if (rpl_connect_master(mysql))
{
// do stuff with the connection
}
mysql_close(mysql); // close the connection
@endcode
@param mysql address of MYSQL structure to use; passing NULL will
create a new one
@return address of MYSQL structure on success, NULL on failure
*/
MYSQL *rpl_connect_master(MYSQL *mysql);
/**
Set thread entering a condition
This function should be called before putting a thread to wait for
a condition. @a mutex should be held before calling this
function. After being woken up, @c thd_exit_cond should be called.
@param thd The thread entering the condition, NULL means current thread
@param cond The condition the thread is going to wait for
@param mutex The mutex associated with the condition, this must be
held before calling this function
@param msg The new process message for the thread
@return NOTE(review): presumably the previous process message, to be
passed to @c thd_exit_cond -- confirm against the implementation
*/
const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
pthread_mutex_t *mutex, const char *msg);
/**
Set thread leaving a condition
This function should be called after a thread has been woken up for a
condition.
@param thd The thread leaving the condition, NULL means current thread
@param old_msg The process message, usually this should be the old process
message before calling @c thd_enter_cond
*/
void thd_exit_cond(MYSQL_THD thd, const char *old_msg);
#ifdef __cplusplus
}
#endif
#endif /* REPLICATION_H */
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "mysql_priv.h"
#include "rpl_mi.h"
#include "sql_repl.h"
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
#include "rpl_handler.h"
/*
Global delegate objects, one per observer kind. They are constructed by
delegates_init() and destructed by delegates_destroy().
*/
Trans_delegate *transaction_delegate;
Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
/* Only available when the server is built with replication support */
Binlog_transmit_delegate *binlog_transmit_delegate;
Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */
/*
structure to save transaction log filename and position
*/
typedef struct Trans_binlog_info {
my_off_t log_pos;
char log_file[FN_REFLEN];
} Trans_binlog_info;
/*
Thread-specific key holding the current thread's Trans_binlog_info
pointer; created in delegates_init() and read by the Trans_delegate
hooks.
*/
static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
int get_user_var_int(const char *name,
long long int *value, int *null_value)
{
my_bool null_val;
user_var_entry *entry=
(user_var_entry*) hash_search(&current_thd->user_vars,
(uchar*) name, strlen(name));
if (!entry)
return 1;
*value= entry->val_int(&null_val);
if (null_value)
*null_value= null_val;
return 0;
}
int get_user_var_real(const char *name,
double *value, int *null_value)
{
my_bool null_val;
user_var_entry *entry=
(user_var_entry*) hash_search(&current_thd->user_vars,
(uchar*) name, strlen(name));
if (!entry)
return 1;
*value= entry->val_real(&null_val);
if (null_value)
*null_value= null_val;
return 0;
}
/*
  Read a user variable of the current thread as a string.

  name        user variable name
  value       buffer receiving the string value
  len         size of the buffer in bytes
  precision   precision used when converting a numeric value to string
  null_value  if not NULL, receives the NULL indicator of the variable

  The result is always NUL-terminated when len > 0; a value that does not
  fit is truncated to len - 1 characters.  With len == 0 the buffer is
  not touched.

  Returns 0 on success, 1 if no such user variable exists.
*/
int get_user_var_str(const char *name, char *value,
                     size_t len, unsigned int precision, int *null_value)
{
  String str;
  my_bool null_val;
  user_var_entry *entry=
    (user_var_entry*) hash_search(&current_thd->user_vars,
                                  (uchar*) name, strlen(name));
  if (!entry)
    return 1;

  entry->val_str(&null_val, &str, precision);
  if (len > 0)
  {
    /* strncpy() does not terminate the buffer when the source is len
       bytes or longer, so terminate it explicitly. */
    strncpy(value, str.c_ptr(), len);
    value[len - 1]= '\0';
  }

  if (null_value)
    *null_value= null_val;
  return 0;
}
/*
  Construct the global delegate objects in static storage and create the
  thread-specific key used for per-transaction binlog information.

  Construction stops at the first delegate that fails to initialize.

  Returns 0 on success, 1 on failure.
*/
int delegates_init()
{
  static unsigned char trans_mem[sizeof(Trans_delegate)];
  static unsigned char storage_mem[sizeof(Binlog_storage_delegate)];
#ifdef HAVE_REPLICATION
  static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)];
  static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
#endif

  transaction_delegate= new (trans_mem) Trans_delegate;
  if (!transaction_delegate || !transaction_delegate->is_inited())
    return 1;

  binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate;
  if (!binlog_storage_delegate || !binlog_storage_delegate->is_inited())
    return 1;

#ifdef HAVE_REPLICATION
  binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate;
  if (!binlog_transmit_delegate || !binlog_transmit_delegate->is_inited())
    return 1;

  binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate;
  if (!binlog_relay_io_delegate || !binlog_relay_io_delegate->is_inited())
    return 1;
#endif /* HAVE_REPLICATION */

  return pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL) ? 1 : 0;
}
/*
  Destruct the global delegate objects that have been created.

  The objects live in static storage, so only their destructors are
  invoked; nothing is deallocated here.
*/
void delegates_destroy()
{
  if (transaction_delegate != 0)
  {
    transaction_delegate->~Trans_delegate();
  }
  if (binlog_storage_delegate != 0)
  {
    binlog_storage_delegate->~Binlog_storage_delegate();
  }
#ifdef HAVE_REPLICATION
  if (binlog_transmit_delegate != 0)
  {
    binlog_transmit_delegate->~Binlog_transmit_delegate();
  }
  if (binlog_relay_io_delegate != 0)
  {
    binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
  }
#endif /* HAVE_REPLICATION */
}
/*
  FOREACH_OBSERVER(r, f, thd, args)

  Used by almost all Delegate methods to run callback `f` of every
  observer registered with the delegate.

  r     int lvalue; set to 1 when a plugin cannot be locked or a callback
        returns non-zero (iteration stops at that point). It is not
        modified otherwise, so the caller must initialize it to 0.
  f     name of the callback member in the delegate's `Observer` type;
        observers whose `f` pointer is NULL are skipped.
  thd   THD used for my_plugin_lock()/plugin_unlock() and as the source of
        param.server_id.
  args  parenthesized argument list passed to the callback.

  The macro expands to several statements (not a single expression): it
  expects a local variable `param` and a type `Observer` to be in scope,
  and it declares the locals `iter` and `info`. It takes the delegate's
  read lock for the whole iteration and releases it at the end.

  Each observer's plugin is locked before its callback is run and
  unlocked right after.
  NOTE(review): the original comment stated that observer plugins are added
  to the thd->lex list and automatically unlocked after each statement;
  confirm against my_plugin_lock()/plugin_unlock().
*/
#define FOREACH_OBSERVER(r, f, thd, args) \
param.server_id= thd->server_id; \
read_lock(); \
Observer_info_iterator iter= observer_info_iter(); \
Observer_info *info= iter++; \
for (; info; info= iter++) \
{ \
plugin_ref plugin= \
my_plugin_lock(thd, &info->plugin); \
if (!plugin) \
{ \
r= 1; \
break; \
} \
if (((Observer *)info->observer)->f \
&& ((Observer *)info->observer)->f args) \
{ \
r= 1; \
plugin_unlock(thd, plugin); \
break; \
} \
plugin_unlock(thd, plugin); \
} \
unlock()
/**
  Run the 'after_commit' hook of every registered transaction observer.

  @param thd  current thread
  @param all  true when the whole transaction (not only the statement) is
              committed

  The binlog file name and position recorded for this thread under
  RPL_TRANS_BINLOG_INFO are passed to the observers (0 when nothing has
  been recorded).  At the end of a real transaction or autocommit
  statement the recorded info is freed and the thread-specific slot is
  reset.

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Trans_delegate::after_commit(THD *thd, bool all)
{
  Trans_param param;
  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);

  /*
    Assign the flags instead of OR-ing into them: 'param' is an automatic
    variable and nothing has initialized param.flags yet.
  */
  param.flags= is_real_trans ? TRANS_IS_REAL_TRANS : 0;

  Trans_binlog_info *log_info=
    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);

  param.log_file= log_info ? log_info->log_file : 0;
  param.log_pos= log_info ? log_info->log_pos : 0;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_commit, thd, (&param));

  /*
    This is the end of a real transaction or autocommit statement, we
    can free the memory allocated for binlog file and position.
  */
  if (is_real_trans && log_info)
  {
    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
    my_free(log_info, MYF(0));
  }
  return ret;
}
/**
  Run the 'after_rollback' hook of every registered transaction observer.

  @param thd  current thread
  @param all  true when the whole transaction (not only the statement) is
              rolled back

  The binlog file name and position recorded for this thread under
  RPL_TRANS_BINLOG_INFO are passed to the observers (0 when nothing has
  been recorded).  At the end of a real transaction or autocommit
  statement the recorded info is freed and the thread-specific slot is
  reset.

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Trans_delegate::after_rollback(THD *thd, bool all)
{
  Trans_param param;
  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);

  /*
    Assign the flags instead of OR-ing into them: 'param' is an automatic
    variable and nothing has initialized param.flags yet.
  */
  param.flags= is_real_trans ? TRANS_IS_REAL_TRANS : 0;

  Trans_binlog_info *log_info=
    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);

  param.log_file= log_info ? log_info->log_file : 0;
  param.log_pos= log_info ? log_info->log_pos : 0;

  int ret= 0;
  /* Dispatch the rollback callback, not the commit one. */
  FOREACH_OBSERVER(ret, after_rollback, thd, (&param));

  /*
    This is the end of a real transaction or autocommit statement, we
    can free the memory allocated for binlog file and position.
  */
  if (is_real_trans && log_info)
  {
    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
    my_free(log_info, MYF(0));
  }
  return ret;
}
/**
  Run the 'after_flush' hook of every registered binlog storage observer.

  The base name of log_file and the position are first stored in the
  thread-specific Trans_binlog_info (allocated on first use), where
  Trans_delegate::after_commit()/after_rollback() pick them up later.

  @param thd       current thread
  @param log_file  binlog file name, possibly with a directory part
  @param log_pos   position in the binlog file
  @param synced    true sets BINLOG_STORAGE_IS_SYNCED in the flags passed
                   to the observers

  @retval 0 Success
  @retval 1 Out of memory, a plugin could not be locked, or an observer
            reported failure
*/
int Binlog_storage_delegate::after_flush(THD *thd,
                                         const char *log_file,
                                         my_off_t log_pos,
                                         bool synced)
{
  Binlog_storage_param param;
  uint32 flags= synced ? BINLOG_STORAGE_IS_SYNCED : 0;

  Trans_binlog_info *log_info=
    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
  if (log_info == NULL)
  {
    log_info=
      (Trans_binlog_info *) my_malloc(sizeof(Trans_binlog_info), MYF(0));
    if (log_info == NULL)
      return 1;
    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
  }

  /* Only the file name without its directory part is recorded. */
  const char *base_name= log_file + dirname_length(log_file);
  /* assumes base_name fits in log_info->log_file -- TODO confirm */
  strcpy(log_info->log_file, base_name);
  log_info->log_pos= log_pos;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_flush, thd,
                   (&param, log_info->log_file, log_info->log_pos, flags));
  return ret;
}
#ifdef HAVE_REPLICATION
/**
  Run the 'transmit_start' hook of every registered binlog transmit
  observer.

  @param thd       current thread; its server_id is stored in the hook
                   parameter by FOREACH_OBSERVER
  @param flags     copied into the hook parameter
  @param log_file  passed unchanged to the observers
  @param log_pos   passed unchanged to the observers

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
const char *log_file,
my_off_t log_pos)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
return ret;
}
/**
  Run the 'transmit_stop' hook of every registered binlog transmit
  observer.

  @param thd    current thread
  @param flags  copied into the hook parameter

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
return ret;
}
/**
  Let every registered binlog transmit observer reserve extra header bytes
  in the packet that is about to carry an event.

  Each observer writes its header into a scratch buffer of
  RESERVE_HEADER_SIZE bytes and reports the length it used; non-empty
  headers are appended to packet in observer order.

  @param thd     current thread
  @param flags   copied into the hook parameter
  @param packet  packet buffer the headers are appended to

  @retval 0 Success
  @retval 1 A plugin could not be locked, an observer reported failure,
            an observer reported more than RESERVE_HEADER_SIZE bytes, or
            appending to the packet failed
*/
int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
                                             String *packet)
{
  /*
    Maximum extra header size for each observer. 32 bytes are expected to
    be enough for an observer's extra header; increase this if it later
    turns out to be too small. /HEZX
  */
#define RESERVE_HEADER_SIZE 32
  unsigned char header[RESERVE_HEADER_SIZE];
  ulong hlen;
  Binlog_transmit_param param;
  param.flags= flags;
  param.server_id= thd->server_id;

  int ret= 0;
  read_lock();
  Observer_info_iterator iter= observer_info_iter();
  Observer_info *info;
  while ((info= iter++))
  {
    plugin_ref plugin= my_plugin_lock(thd, &info->plugin);
    if (!plugin)
    {
      ret= 1;
      break;
    }

    Observer *observer= (Observer *) info->observer;
    hlen= 0;
    bool hook_failed= (observer->reserve_header &&
                       observer->reserve_header(&param, header,
                                                RESERVE_HEADER_SIZE, &hlen));
    /* The plugin is only needed while its callback runs. */
    plugin_unlock(thd, plugin);

    if (hook_failed)
    {
      ret= 1;
      break;
    }
    if (hlen == 0)
      continue;
    if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
    {
      ret= 1;
      break;
    }
  }
  unlock();
  return ret;
}
/**
  Run the 'before_send_event' hook of every registered binlog transmit
  observer.

  @param thd       current thread
  @param flags     copied into the hook parameter
  @param packet    packet about to be sent; observers receive its buffer
                   (packet->c_ptr()) and length
  @param log_file  binlog file name; only the part after the directory
                   name is passed to the observers
  @param log_pos   passed unchanged to the observers

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
String *packet,
const char *log_file,
my_off_t log_pos)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, before_send_event, thd,
(&param, (uchar *)packet->c_ptr(),
packet->length(),
log_file+dirname_length(log_file), log_pos));
return ret;
}
/**
  Run the 'after_send_event' hook of every registered binlog transmit
  observer.

  @param thd     current thread
  @param flags   copied into the hook parameter
  @param packet  packet that was sent; observers receive its buffer
                 (packet->c_ptr()) and length

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
String *packet)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, after_send_event, thd,
(&param, packet->c_ptr(), packet->length()));
return ret;
}
/**
  Run the 'after_reset_master' hook of every registered binlog transmit
  observer.

  @param thd    current thread
  @param flags  copied into the hook parameter

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
return ret;
}
/**
  Fill a relay I/O hook parameter from the master info of the I/O thread.

  Only pointers/values are copied; the parameter refers to the data owned
  by mi.

  @param param  hook parameter to fill in
  @param mi     master info to copy from
*/
void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
                                          Master_info *mi)
{
  /* Connection to the master */
  param->host=  mi->host;
  param->port=  mi->port;
  param->user=  mi->user;
  param->mysql= mi->mysql;

  /* Current read position in the master's binlog */
  param->master_log_name= mi->master_log_name;
  param->master_log_pos=  mi->master_log_pos;
}
/**
  Run the 'thread_start' hook of every registered relay I/O observer.

  @param thd  current thread
  @param mi   master info used to fill the hook parameter (see init_param())

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, thread_start, thd, (&param));
return ret;
}
/**
  Run the 'thread_stop' hook of every registered relay I/O observer.

  @param thd  current thread
  @param mi   master info used to fill the hook parameter (see init_param())

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
return ret;
}
/**
  Run the 'before_request_transmit' hook of every registered relay I/O
  observer.

  @param thd    current thread
  @param mi     master info used to fill the hook parameter
  @param flags  passed to the observers widened to uint32

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
Master_info *mi,
ushort flags)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
return ret;
}
/**
  Run the 'after_read_event' hook of every registered relay I/O observer.

  @param thd        current thread
  @param mi         master info used to fill the hook parameter
  @param packet     packet data, passed unchanged to the observers
  @param len        length of packet, passed unchanged to the observers
  @param event_buf  passed through to the observers
                    (presumably an output they may redirect -- confirm
                    against the observer interface)
  @param event_len  passed through to the observers (same caveat)

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
const char *packet, ulong len,
const char **event_buf,
ulong *event_len)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, after_read_event, thd,
(&param, packet, len, event_buf, event_len));
return ret;
}
/**
  Run the 'after_queue_event' hook of every registered relay I/O observer.

  @param thd        current thread
  @param mi         master info used to fill the hook parameter
  @param event_buf  event data, passed unchanged to the observers
  @param event_len  length of event_buf, passed unchanged to the observers
  @param synced     true sets BINLOG_STORAGE_IS_SYNCED in the flags passed
                    to the observers

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
const char *event_buf,
ulong event_len,
bool synced)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
uint32 flags=0;
/*
  NOTE(review): this reuses the binlog *storage* flag for a relay I/O
  hook; confirm whether a relay-specific flag is intended.
*/
if (synced)
flags |= BINLOG_STORAGE_IS_SYNCED;
int ret= 0;
FOREACH_OBSERVER(ret, after_queue_event, thd,
(&param, event_buf, event_len, flags));
return ret;
}
/**
  Run the 'after_reset_slave' hook of every registered relay I/O observer.

  @param thd  current thread
  @param mi   master info used to fill the hook parameter (see init_param())

  @retval 0 Success
  @retval 1 A plugin could not be locked or an observer reported failure
*/
int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
return ret;
}
#endif /* HAVE_REPLICATION */
/**
  Register a transaction observer with the global transaction delegate.

  @param observer  observer to add
  @param p         plugin handle; cast to st_plugin_int* and stored with
                   the observer

  @return the result of Delegate::add_observer()
*/
int register_trans_observer(Trans_observer *observer, void *p)
{
return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
}
/**
  Remove a transaction observer from the global transaction delegate.

  @param observer  observer to remove
  @param p         plugin handle; cast to st_plugin_int*

  @return the result of Delegate::remove_observer()
*/
int unregister_trans_observer(Trans_observer *observer, void *p)
{
return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
}
/**
  Register a binlog storage observer with the global binlog storage
  delegate.

  @param observer  observer to add
  @param p         plugin handle; cast to st_plugin_int* and stored with
                   the observer

  @return the result of Delegate::add_observer()
*/
int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
}
/**
  Remove a binlog storage observer from the global binlog storage
  delegate.

  @param observer  observer to remove
  @param p         plugin handle; cast to st_plugin_int*

  @return the result of Delegate::remove_observer()
*/
int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
}
#ifdef HAVE_REPLICATION
/**
  Register a binlog transmit observer with the global binlog transmit
  delegate.

  @param observer  observer to add
  @param p         plugin handle; cast to st_plugin_int* and stored with
                   the observer

  @return the result of Delegate::add_observer()
*/
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
}
/**
  Remove a binlog transmit observer from the global binlog transmit
  delegate.

  @param observer  observer to remove
  @param p         plugin handle; cast to st_plugin_int*

  @return the result of Delegate::remove_observer()
*/
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
}
/**
  Register a relay I/O observer with the global relay I/O delegate.

  @param observer  observer to add
  @param p         plugin handle; cast to st_plugin_int* and stored with
                   the observer

  @return the result of Delegate::add_observer()
*/
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
}
/**
  Remove a relay I/O observer from the global relay I/O delegate.

  @param observer  observer to remove
  @param p         plugin handle; cast to st_plugin_int*

  @return the result of Delegate::remove_observer()
*/
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
}
#endif /* HAVE_REPLICATION */
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef RPL_HANDLER_H
#define RPL_HANDLER_H
#include "mysql_priv.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
#include "sql_plugin.h"
#include "replication.h"
class Observer_info {
public:
void *observer;
st_plugin_int *plugin_int;
plugin_ref plugin;
Observer_info(void *ob, st_plugin_int *p)
:observer(ob), plugin_int(p)
{
plugin= plugin_int_to_ref(plugin_int);
}
};
class Delegate {
public:
typedef List<Observer_info> Observer_info_list;
typedef List_iterator<Observer_info> Observer_info_iterator;
int add_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (!info)
{
info= new Observer_info(observer, plugin);
if (!info || observer_info_list.push_back(info, &memroot))
ret= TRUE;
}
else
ret= TRUE;
unlock();
return ret;
}
int remove_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (info)
iter.remove();
else
ret= TRUE;
unlock();
return ret;
}
inline Observer_info_iterator observer_info_iter()
{
return Observer_info_iterator(observer_info_list);
}
inline bool is_empty()
{
return observer_info_list.is_empty();
}
inline int read_lock()
{
if (!inited)
return TRUE;
return rw_rdlock(&lock);
}
inline int write_lock()
{
if (!inited)
return TRUE;
return rw_wrlock(&lock);
}
inline int unlock()
{
if (!inited)
return TRUE;
return rw_unlock(&lock);
}
inline bool is_inited()
{
return inited;
}
Delegate()
{
inited= FALSE;
if (my_rwlock_init(&lock, NULL))
return;
init_sql_alloc(&memroot, 1024, 0);
inited= TRUE;
}
~Delegate()
{
inited= FALSE;
rwlock_destroy(&lock);
free_root(&memroot, MYF(0));
}
private:
Observer_info_list observer_info_list;
rw_lock_t lock;
MEM_ROOT memroot;
bool inited;
};
/**
  Delegate for transaction hooks; its observers are Trans_observer
  callback tables.  Each method runs the hook of the same name on the
  registered observers.
*/
class Trans_delegate
:public Delegate {
public:
/* Observer type used by FOREACH_OBSERVER to cast Observer_info::observer */
typedef Trans_observer Observer;
int before_commit(THD *thd, bool all);
int before_rollback(THD *thd, bool all);
int after_commit(THD *thd, bool all);
int after_rollback(THD *thd, bool all);
};
/**
  Delegate for binlog storage hooks; its observers are
  Binlog_storage_observer callback tables.
*/
class Binlog_storage_delegate
:public Delegate {
public:
/* Observer type used by FOREACH_OBSERVER to cast Observer_info::observer */
typedef Binlog_storage_observer Observer;
int after_flush(THD *thd, const char *log_file,
my_off_t log_pos, bool synced);
};
#ifdef HAVE_REPLICATION
/**
  Delegate for binlog transmit (dump thread) hooks; its observers are
  Binlog_transmit_observer callback tables.  Only available when
  HAVE_REPLICATION is defined.
*/
class Binlog_transmit_delegate
:public Delegate {
public:
/* Observer type used by FOREACH_OBSERVER to cast Observer_info::observer */
typedef Binlog_transmit_observer Observer;
int transmit_start(THD *thd, ushort flags,
const char *log_file, my_off_t log_pos);
int transmit_stop(THD *thd, ushort flags);
/* Appends the header bytes reserved by the observers to packet */
int reserve_header(THD *thd, ushort flags, String *packet);
int before_send_event(THD *thd, ushort flags,
String *packet, const
char *log_file, my_off_t log_pos );
int after_send_event(THD *thd, ushort flags,
String *packet);
int after_reset_master(THD *thd, ushort flags);
};
/**
  Delegate for relay log I/O (slave I/O thread) hooks; its observers are
  Binlog_relay_IO_observer callback tables.  Only available when
  HAVE_REPLICATION is defined.
*/
class Binlog_relay_IO_delegate
:public Delegate {
public:
/* Observer type used by FOREACH_OBSERVER to cast Observer_info::observer */
typedef Binlog_relay_IO_observer Observer;
int thread_start(THD *thd, Master_info *mi);
int thread_stop(THD *thd, Master_info *mi);
int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
int after_read_event(THD *thd, Master_info *mi,
const char *packet, ulong len,
const char **event_buf, ulong *event_len);
int after_queue_event(THD *thd, Master_info *mi,
const char *event_buf, ulong event_len,
bool synced);
int after_reset_slave(THD *thd, Master_info *mi);
private:
/* Fills the hook parameter from the fields of mi */
void init_param(Binlog_relay_IO_param *param, Master_info *mi);
};
#endif /* HAVE_REPLICATION */
/* Create the global delegates; returns 0 on success, 1 on failure. */
int delegates_init();
/* Run the destructors of the global delegates. */
void delegates_destroy();
/* Global delegate instances, set up by delegates_init(). */
extern Trans_delegate *transaction_delegate;
extern Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
extern Binlog_transmit_delegate *binlog_transmit_delegate;
extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */
/*
  RUN_HOOK(group, hook, args): call method `hook` of the global delegate
  named `group`_delegate with the parenthesized argument list `args`.

  If there are no observers in the delegate, the macro evaluates to 0
  immediately without calling the hook. The emptiness check is made
  without taking the delegate's lock.
*/
#define RUN_HOOK(group, hook, args) \
(group ##_delegate->is_empty() ? \
0 : group ##_delegate->hook args)
#endif /* RPL_HANDLER_H */
......@@ -40,6 +40,7 @@
#include <errmsg.h>
#include <mysqld_error.h>
#include <mysys_err.h>
#include "rpl_handler.h"
#ifdef HAVE_REPLICATION
......@@ -69,6 +70,8 @@ ulonglong relay_log_space_limit = 0;
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
int events_till_abort = -1;
static pthread_key(Master_info*, RPL_MASTER_INFO);
enum enum_slave_reconnect_actions
{
SLAVE_RECON_ACT_REG= 0,
......@@ -231,6 +234,10 @@ int init_slave()
TODO: re-write this to iterate through the list of files
for multi-master
*/
if (pthread_key_create(&RPL_MASTER_INFO, NULL))
goto err;
active_mi= new Master_info;
/*
......@@ -1868,17 +1875,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
}
static int request_dump(MYSQL* mysql, Master_info* mi,
bool *suppress_warnings)
static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
bool *suppress_warnings)
{
uchar buf[FN_REFLEN + 10];
int len;
int binlog_flags = 0; // for now
ushort binlog_flags = 0; // for now
char* logname = mi->master_log_name;
DBUG_ENTER("request_dump");
*suppress_warnings= FALSE;
if (RUN_HOOK(binlog_relay_io,
before_request_transmit,
(thd, mi, binlog_flags)))
DBUG_RETURN(1);
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
......@@ -2532,6 +2544,16 @@ pthread_handler_t handle_slave_io(void *arg)
mi->master_log_name,
llstr(mi->master_log_pos,llbuff)));
/* This must be done before running any binlog_relay_io hooks */
my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook");
goto err;
}
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
......@@ -2621,7 +2643,7 @@ connected:
while (!io_slave_killed(thd,mi))
{
thd_proc_info(thd, "Requesting binlog dump");
if (request_dump(mysql, mi, &suppress_warnings))
if (request_dump(thd, mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
......@@ -2641,6 +2663,7 @@ requesting master dump") ||
goto err;
goto connected;
});
const char *event_buf;
DBUG_ASSERT(mi->last_error().number == 0);
while (!io_slave_killed(thd,mi))
......@@ -2697,14 +2720,37 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter
thd_proc_info(thd, "Queueing master event to the relay log");
if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
event_len))
event_buf= (const char*)mysql->net.read_pos + 1;
if (RUN_HOOK(binlog_relay_io, after_read_event,
(thd, mi,(const char*)mysql->net.read_pos + 1,
event_len, &event_buf, &event_len)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER(ER_SLAVE_FATAL_ERROR),
"Failed to run 'after_read_event' hook");
goto err;
}
/* XXX: 'synced' should be updated by queue_event to indicate
whether event has been synced to disk */
bool synced= 0;
if (queue_event(mi, event_buf, event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"could not queue event from master");
goto err;
}
if (RUN_HOOK(binlog_relay_io, after_queue_event,
(thd, mi, event_buf, event_len, synced)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER(ER_SLAVE_FATAL_ERROR),
"Failed to run 'after_queue_event' hook");
goto err;
}
if (flush_master_info(mi, 1))
{
sql_print_error("Failed to flush master info file");
......@@ -2750,6 +2796,7 @@ err:
// print the current replication position
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
thd->set_query(NULL, 0);
thd->reset_db(NULL, 0);
if (mysql)
......@@ -3906,6 +3953,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
}
/**
  Open an additional connection to the master of the current slave I/O
  thread.

  The Master_info is read from the thread-specific key RPL_MASTER_INFO, so
  the function only works in a thread where that key has been set.

  @param mysql  connection handle to use, or NULL to have one allocated
                with mysql_init()

  @return the connected handle, or NULL on failure. A handle allocated
          here is closed on failure; a handle supplied by the caller is
          not.
*/
MYSQL *rpl_connect_master(MYSQL *mysql)
{
THD *thd= current_thd;
Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
if (!mi)
{
sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
return NULL;
}
/* Remembers whether the handle is ours to close on failure */
bool allocated= false;
if (!mysql)
{
if(!(mysql= mysql_init(NULL)))
{
sql_print_error("rpl_connect_master: failed in mysql_init()");
return NULL;
}
allocated= true;
}
/*
XXX: copied from connect_to_master, this function should not
change the slave status, so we cannot use connect_to_master
directly
TODO: make this part a separate function to eliminate duplication
*/
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
#ifdef HAVE_OPENSSL
if (mi->ssl)
{
/* Empty SSL settings are passed as 0 */
mysql_ssl_set(mysql,
mi->ssl_key[0]?mi->ssl_key:0,
mi->ssl_cert[0]?mi->ssl_cert:0,
mi->ssl_ca[0]?mi->ssl_ca:0,
mi->ssl_capath[0]?mi->ssl_capath:0,
mi->ssl_cipher[0]?mi->ssl_cipher:0);
mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
&mi->ssl_verify_server_cert);
}
#endif
mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
/* This one is not strictly needed but we have it here for completeness */
mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
/* Do not even try to connect when the I/O thread has been killed */
if (io_slave_killed(thd, mi)
|| !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
/* Only a real connection failure is logged, not a kill */
if (!io_slave_killed(thd, mi))
sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)",
mysql_error(mysql), mysql_errno(mysql));
if (allocated)
mysql_close(mysql); // this will free the object
return NULL;
}
return mysql;
}
/*
Store the file and position where the execute-slave thread are in the
relay log.
......
......@@ -277,6 +277,42 @@ const char *set_thd_proc_info(THD *thd, const char *info,
return old_info;
}
/**
  Record that a thread is about to wait on a condition variable.

  Stores the mutex/condition pair in the thread's mysys_var and replaces
  the thread's proc_info with msg.

  @param thd    thread to update; NULL means current_thd
  @param cond   condition the thread is going to wait on
  @param mutex  mutex associated with cond; must be owned by the caller
  @param msg    new proc_info text

  @return the previous proc_info, to be handed back to thd_exit_cond()
*/
extern "C"
const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
                           pthread_mutex_t *mutex, const char *msg)
{
  if (!thd)
    thd= current_thd;

  safe_mutex_assert_owner(mutex);

  const char *previous_msg= thd->proc_info;
  thd->mysys_var->current_mutex= mutex;
  thd->mysys_var->current_cond= cond;
  thd->proc_info= msg;
  return previous_msg;
}
/**
  Undo thd_enter_cond() after the wait on the condition has finished.

  Unlocks the mutex recorded by thd_enter_cond(), clears the recorded
  mutex/condition pair and restores proc_info.

  @param thd      thread to update; NULL means current_thd
  @param old_msg  proc_info value returned by thd_enter_cond()
*/
extern "C"
void thd_exit_cond(MYSQL_THD thd, const char *old_msg)
{
  if (!thd)
    thd= current_thd;

  /*
    The recorded mutex is released here, _before_ mysys_var->mutex is
    taken. Doing it the other way round would deadlock with a concurrent
    THD::awake() on this thread.
  */
  pthread_mutex_unlock(thd->mysys_var->current_mutex);

  pthread_mutex_lock(&thd->mysys_var->mutex);
  thd->mysys_var->current_mutex= 0;
  thd->mysys_var->current_cond= 0;
  thd->proc_info= old_msg;
  pthread_mutex_unlock(&thd->mysys_var->mutex);
}
extern "C"
void **thd_ha_data(const THD *thd, const struct handlerton *hton)
{
......
......@@ -22,6 +22,7 @@
#include "log.h"
#include "rpl_tblmap.h"
#include "replication.h"
/**
An interface that is used to take an action when
......@@ -1940,27 +1941,11 @@ public:
inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex,
const char* msg)
{
const char* old_msg = proc_info;
safe_mutex_assert_owner(mutex);
mysys_var->current_mutex = mutex;
mysys_var->current_cond = cond;
proc_info = msg;
return old_msg;
return thd_enter_cond(this, cond, mutex, msg);
}
inline void exit_cond(const char* old_msg)
{
/*
Putting the mutex unlock in exit_cond() ensures that
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
locked (if that would not be the case, you'll get a deadlock if someone
does a THD::awake() on you).
*/
pthread_mutex_unlock(mysys_var->current_mutex);
pthread_mutex_lock(&mysys_var->mutex);
mysys_var->current_mutex = 0;
mysys_var->current_cond = 0;
proc_info = old_msg;
pthread_mutex_unlock(&mysys_var->mutex);
thd_exit_cond(this, old_msg);
}
inline time_t query_start() { query_start_used=1; return start_time; }
inline void set_time()
......
......@@ -21,6 +21,7 @@
#include <m_ctype.h>
#include <myisam.h>
#include <my_dir.h>
#include "rpl_handler.h"
#include "sp_head.h"
#include "sp.h"
......
......@@ -19,14 +19,6 @@
#define REPORT_TO_LOG 1
#define REPORT_TO_USER 2
#ifdef DBUG_OFF
#define plugin_ref_to_int(A) A
#define plugin_int_to_ref(A) A
#else
#define plugin_ref_to_int(A) (A ? A[0] : NULL)
#define plugin_int_to_ref(A) &(A)
#endif
extern struct st_mysql_plugin *mysqld_builtins[];
/**
......@@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{ C_STRING_WITH_LEN("STORAGE ENGINE") },
{ C_STRING_WITH_LEN("FTPARSER") },
{ C_STRING_WITH_LEN("DAEMON") },
{ C_STRING_WITH_LEN("INFORMATION SCHEMA") }
{ C_STRING_WITH_LEN("INFORMATION SCHEMA") },
{ C_STRING_WITH_LEN("REPLICATION") },
};
extern int initialize_schema_table(st_plugin_int *plugin);
......@@ -93,7 +86,8 @@ static int min_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
MYSQL_REPLICATION_INTERFACE_VERSION,
};
static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{
......@@ -101,7 +95,8 @@ static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
MYSQL_REPLICATION_INTERFACE_VERSION,
};
static bool initialized= 0;
......
......@@ -18,6 +18,14 @@
class sys_var;
#ifdef DBUG_OFF
#define plugin_ref_to_int(A) A
#define plugin_int_to_ref(A) A
#else
#define plugin_ref_to_int(A) (A ? A[0] : NULL)
#define plugin_int_to_ref(A) &(A)
#endif
/*
the following flags are valid for plugin_init()
*/
......
......@@ -21,6 +21,7 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
#include "rpl_handler.h"
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
......@@ -80,6 +81,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
DBUG_RETURN(0);
}
/*
  Reset the thread's transmit packet buffer for sending an event.

  The packet is reset to the default one-byte header "\0" and the
  'reserve_header' hook is run so that observers can append their own
  header bytes. Call this before storing event data in the packet.

  thd        thread whose packet (thd->packet) is reset
  flags      flags passed to the 'reserve_header' hook
  ev_offset  receives the packet length after the headers, i.e. the offset
             at which the event data will start; set on failure as well
  errmsg     set to an error text when the hook fails

  Returns 0 on success, 1 when the hook failed (my_errno is then set to
  ER_UNKNOWN_ERROR).
*/
static int reset_transmit_packet(THD *thd, ushort flags,
                                 ulong *ev_offset, const char **errmsg)
{
  String *packet= &thd->packet;

  /* reserve and set default header */
  packet->length(0);
  packet->set("\0", 1, &my_charset_bin);

  const bool hook_failed=
    RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet));
  if (hook_failed)
  {
    *errmsg= "Failed to run hook 'reserve_header'";
    my_errno= ER_UNKNOWN_ERROR;
  }

  *ev_offset= packet->length();
  return hook_failed ? 1 : 0;
}
static int send_file(THD *thd)
{
NET* net = &thd->net;
......@@ -346,6 +373,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
LOG_INFO linfo;
char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN], *name;
ulong ev_offset;
IO_CACHE log;
File file = -1;
String* packet = &thd->packet;
......@@ -361,6 +391,14 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
thd->server_id, log_ident, (ulong)pos);
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
{
errmsg= "Failed to run hook 'transmit_start'";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
......@@ -416,11 +454,9 @@ impossible position";
goto err;
}
/*
We need to start a packet with something other than 255
to distinguish it from error
*/
packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
/* reset transmit packet for the fake rotate event below */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
/*
Tell the client about the log name with a fake Rotate event;
......@@ -460,7 +496,7 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
packet->set("\0", 1, &my_charset_bin);
/*
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
this larger than the corresponding packet (query) sent
......@@ -476,6 +512,11 @@ impossible position";
log_lock = mysql_bin_log.get_log_lock();
if (pos > BIN_LOG_HEADER_SIZE)
{
/* reset transmit packet for the event read from binary log
file */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
/*
Try to find a Format_description_log_event at the beginning of
the binlog
......@@ -483,29 +524,30 @@ impossible position";
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
/*
The packet has offsets equal to the normal offsets in a binlog
event +1 (the first character is \0).
The packet has offsets equal to the normal offsets in a
binlog event + ev_offset (the first ev_offset characters are
the header (default \0)).
*/
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
(*packet)[EVENT_TYPE_OFFSET+1]));
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
(*packet)[EVENT_TYPE_OFFSET+ev_offset]));
if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
should not increment master's binlog position
(rli->group_master_log_pos)
*/
int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
/*
if reconnect master sends FD event with `created' as 0
to avoid destroying temp tables.
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+1, (ulong) 0);
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
......@@ -531,8 +573,6 @@ impossible position";
Format_description_log_event will be found naturally if it is written.
*/
}
/* reset the packet as we wrote to it in any case */
packet->set("\0", 1, &my_charset_bin);
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
else
{
......@@ -544,6 +584,12 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
Log_event_type event_type= UNKNOWN_EVENT;
/* reset the transmit packet for the event read from binary log
file */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
#ifndef DBUG_OFF
......@@ -556,15 +602,25 @@ impossible position";
}
#endif
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
if (event_type == FORMAT_DESCRIPTION_EVENT)
{
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
else if (event_type == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
pos = my_b_tell(&log);
if (RUN_HOOK(binlog_transmit, before_send_event,
(thd, flags, packet, log_file_name, pos)))
{
my_errno= ER_UNKNOWN_ERROR;
errmsg= "run 'before_send_event' hook failed";
goto err;
}
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
......@@ -572,9 +628,8 @@ impossible position";
goto err;
}
DBUG_PRINT("info", ("log event code %d",
(*packet)[LOG_EVENT_OFFSET+1] ));
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
DBUG_PRINT("info", ("log event code %d", event_type));
if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
......@@ -583,7 +638,17 @@ impossible position";
goto err;
}
}
packet->set("\0", 1, &my_charset_bin);
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
{
errmsg= "Failed to run hook 'after_send_event'";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
/* reset transmit packet for next loop */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
}
/*
......@@ -634,6 +699,11 @@ impossible position";
}
#endif
/* reset the transmit packet for the event read from binary log
file */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
/*
No one will update the log while we are reading
now, but we'll be quick and just read one record
......@@ -650,6 +720,7 @@ impossible position";
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
read_packet = 1;
event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
break;
case LOG_READ_EOF:
......@@ -676,8 +747,17 @@ impossible position";
}
if (read_packet)
{
thd_proc_info(thd, "Sending binlog event to slave");
{
thd_proc_info(thd, "Sending binlog event to slave");
pos = my_b_tell(&log);
if (RUN_HOOK(binlog_transmit, before_send_event,
(thd, flags, packet, log_file_name, pos)))
{
my_errno= ER_UNKNOWN_ERROR;
errmsg= "run 'before_send_event' hook failed";
goto err;
}
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
......@@ -685,7 +765,7 @@ impossible position";
goto err;
}
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
......@@ -694,11 +774,13 @@ impossible position";
goto err;
}
}
packet->set("\0", 1, &my_charset_bin);
/*
No need to net_flush because we will get to flush later when
we hit EOF pretty quick
*/
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
{
my_errno= ER_UNKNOWN_ERROR;
errmsg= "Failed to run hook 'after_send_event'";
goto err;
}
}
if (fatal_error)
......@@ -734,6 +816,10 @@ impossible position";
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
/* reset transmit packet for the possible fake rotate event */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
/*
Call fake_rotate_event() in case the previous log (the one which
we have just finished reading) did not contain a Rotate event
......@@ -750,9 +836,6 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
packet->length(0);
packet->append('\0');
}
}
......@@ -760,6 +843,7 @@ end:
end_io_cache(&log);
(void)my_close(file, MYF(MY_WME));
RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
thd_proc_info(thd, "Waiting to finalize termination");
pthread_mutex_lock(&LOCK_thread_count);
......@@ -770,6 +854,7 @@ end:
err:
thd_proc_info(thd, "Waiting to finalize termination");
end_io_cache(&log);
RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
......@@ -1064,6 +1149,7 @@ int reset_slave(THD *thd, Master_info* mi)
goto err;
}
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
unlock_slave_threads(mi);
if (error)
......@@ -1363,7 +1449,11 @@ int reset_master(THD* thd)
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
return 1;
}
return mysql_bin_log.reset_logs(thd);
if (mysql_bin_log.reset_logs(thd))
return 1;
RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
return 0;
}
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
......@@ -1836,5 +1926,3 @@ int init_replication_sys_vars()
}
#endif /* HAVE_REPLICATION */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment