Commit 62d218ed authored by He Zhenxing's avatar He Zhenxing

Manual merge semi-sync to 5.1-rep+2

parents e9ba0600 d8b4e637
...@@ -16,6 +16,11 @@ ...@@ -16,6 +16,11 @@
#ifndef _my_plugin_h #ifndef _my_plugin_h
#define _my_plugin_h #define _my_plugin_h
/* size_t */
#include <stdlib.h>
typedef struct st_mysql MYSQL;
/* /*
On Windows, exports from DLL need to be declared On Windows, exports from DLL need to be declared
...@@ -75,7 +80,8 @@ typedef struct st_mysql_xid MYSQL_XID; ...@@ -75,7 +80,8 @@ typedef struct st_mysql_xid MYSQL_XID;
#define MYSQL_FTPARSER_PLUGIN 2 /* Full-text parser plugin */ #define MYSQL_FTPARSER_PLUGIN 2 /* Full-text parser plugin */
#define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */ #define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */
#define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */ #define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */
#define MYSQL_MAX_PLUGIN_TYPE_NUM 5 /* The number of plugin types */ #define MYSQL_REPLICATION_PLUGIN 5 /* The replication plugin type */
#define MYSQL_MAX_PLUGIN_TYPE_NUM 6 /* The number of plugin types */
/* We use the following strings to define licenses for plugins */ /* We use the following strings to define licenses for plugins */
#define PLUGIN_LICENSE_PROPRIETARY 0 #define PLUGIN_LICENSE_PROPRIETARY 0
...@@ -650,6 +656,17 @@ struct st_mysql_information_schema ...@@ -650,6 +656,17 @@ struct st_mysql_information_schema
int interface_version; int interface_version;
}; };
/*
API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
*/
#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
/**
Replication plugin descriptor
*/
struct Mysql_replication {
int interface_version;
};
/* /*
st_mysql_value struct for reading values from mysqld. st_mysql_value struct for reading values from mysqld.
...@@ -801,6 +818,64 @@ void mysql_query_cache_invalidate4(MYSQL_THD thd, ...@@ -801,6 +818,64 @@ void mysql_query_cache_invalidate4(MYSQL_THD thd,
const char *key, unsigned int key_length, const char *key, unsigned int key_length,
int using_trx); int using_trx);
/**
Get the value of user variable as an integer.
This function will return the value of variable @a name as an
integer. If the original value of the variable is not an integer,
the value will be converted into an integer.
@param name user variable name
@param value pointer to return the value
@param null_value if not NULL, the function will set it to true if
the value of variable is null, set to false if not
@retval 0 Success
@retval 1 Variable not found
*/
int get_user_var_int(const char *name,
long long int *value, int *null_value);
/**
Get the value of user variable as a double precision float number.
This function will return the value of variable @a name as real
number. If the original value of the variable is not a real number,
the value will be converted into a real number.
@param name user variable name
@param value pointer to return the value
@param null_value if not NULL, the function will set it to true if
the value of variable is null, set to false if not
@retval 0 Success
@retval 1 Variable not found
*/
int get_user_var_real(const char *name,
double *value, int *null_value);
/**
Get the value of user variable as a string.
This function will return the value of variable @a name as
string. If the original value of the variable is not a string,
the value will be converted into a string.
@param name user variable name
@param value pointer to the value buffer
@param len length of the value buffer
@param precision precision of the value if it is a float number
@param null_value if not NULL, the function will set it to true if
the value of variable is null, set to false if not
@retval 0 Success
@retval 1 Variable not found
*/
int get_user_var_str(const char *name,
char *value, unsigned long len,
unsigned int precision, int *null_value);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
#include <stdlib.h>
typedef struct st_mysql MYSQL;
struct st_mysql_lex_string struct st_mysql_lex_string
{ {
char *str; char *str;
...@@ -105,6 +107,9 @@ struct st_mysql_information_schema ...@@ -105,6 +107,9 @@ struct st_mysql_information_schema
{ {
int interface_version; int interface_version;
}; };
struct Mysql_replication {
int interface_version;
};
struct st_mysql_value struct st_mysql_value
{ {
int (*value_type)(struct st_mysql_value *); int (*value_type)(struct st_mysql_value *);
...@@ -137,3 +142,10 @@ void thd_get_xid(const void* thd, MYSQL_XID *xid); ...@@ -137,3 +142,10 @@ void thd_get_xid(const void* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(void* thd, void mysql_query_cache_invalidate4(void* thd,
const char *key, unsigned int key_length, const char *key, unsigned int key_length,
int using_trx); int using_trx);
int get_user_var_int(const char *name,
long long int *value, int *null_value);
int get_user_var_real(const char *name,
double *value, int *null_value);
int get_user_var_str(const char *name,
char *value, unsigned long len,
unsigned int precision, int *null_value);
...@@ -139,6 +139,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ...@@ -139,6 +139,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/time.cc ../sql/tztime.cc ../sql/uniques.cc ../sql/unireg.cc ../sql/time.cc ../sql/tztime.cc ../sql/uniques.cc ../sql/unireg.cc
../sql/partition_info.cc ../sql/sql_connect.cc ../sql/partition_info.cc ../sql/sql_connect.cc
../sql/scheduler.cc ../sql/event_parse_data.cc ../sql/scheduler.cc ../sql/event_parse_data.cc
../sql/rpl_handler.cc
${GEN_SOURCES} ${GEN_SOURCES}
${LIB_SOURCES}) ${LIB_SOURCES})
......
...@@ -76,7 +76,8 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \ ...@@ -76,7 +76,8 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \
rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \ rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \
sql_tablespace.cc \ sql_tablespace.cc \
rpl_injector.cc my_user.c partition_info.cc \ rpl_injector.cc my_user.c partition_info.cc \
sql_servers.cc event_parse_data.cc sql_servers.cc event_parse_data.cc \
rpl_handler.cc
libmysqld_int_a_SOURCES= $(libmysqld_sources) libmysqld_int_a_SOURCES= $(libmysqld_sources)
nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources) nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)
......
#
# Check if dynamic loading is supported
#
--require r/have_dynamic_loading.require
disable_query_log;
show variables like 'have_dynamic_loading';
enable_query_log;
#
# Check if the variable SEMISYNC_MASTER_PLUGIN is set
#
if (`select LENGTH('$SEMISYNC_MASTER_PLUGIN') = 0`)
{
skip Need semisync plugins;
}
...@@ -1815,6 +1815,30 @@ sub environment_setup { ...@@ -1815,6 +1815,30 @@ sub environment_setup {
$ENV{'EXAMPLE_PLUGIN_LOAD'}="--plugin_load=;EXAMPLE=".$plugin_filename.";"; $ENV{'EXAMPLE_PLUGIN_LOAD'}="--plugin_load=;EXAMPLE=".$plugin_filename.";";
} }
# --------------------------------------------------------------------------
# Add the path where mysqld will find semisync plugins
# --------------------------------------------------------------------------
my $lib_semisync_master_plugin=
mtr_file_exists(vs_config_dirs('plugin/semisync',"libsemisync_master.so"),
"$basedir/plugin/semisync/.libs/libsemisync_master.so",
"$basedir/lib/mysql/plugin/libsemisync_master.so");
my $lib_semisync_slave_plugin=
mtr_file_exists(vs_config_dirs('plugin/semisync',"libsemisync_slave.so"),
"$basedir/plugin/semisync/.libs/libsemisync_slave.so",
"$basedir/lib/mysql/plugin/libsemisync_slave.so");
if ($lib_semisync_master_plugin && $lib_semisync_slave_plugin)
{
$ENV{'SEMISYNC_MASTER_PLUGIN'}= basename($lib_semisync_master_plugin);
$ENV{'SEMISYNC_SLAVE_PLUGIN'}= basename($lib_semisync_slave_plugin);
$ENV{'SEMISYNC_PLUGIN_OPT'}= "--plugin-dir=".dirname($lib_semisync_master_plugin);
}
else
{
$ENV{'SEMISYNC_MASTER_PLUGIN'}= "";
$ENV{'SEMISYNC_SLAVE_PLUGIN'}= "";
$ENV{'SEMISYNC_PLUGIN_OPT'}="--plugin-dir=";
}
# ---------------------------------------------------- # ----------------------------------------------------
# Add the path where mysqld will find mypluglib.so # Add the path where mysqld will find mypluglib.so
# ---------------------------------------------------- # ----------------------------------------------------
......
This diff is collapsed.
...@@ -6,6 +6,7 @@ grant replication slave on *.* to replicate@localhost identified by 'aaaaaaaaaaa ...@@ -6,6 +6,7 @@ grant replication slave on *.* to replicate@localhost identified by 'aaaaaaaaaaa
grant replication slave on *.* to replicate@127.0.0.1 identified by 'aaaaaaaaaaaaaaab'; grant replication slave on *.* to replicate@127.0.0.1 identified by 'aaaaaaaaaaaaaaab';
connection slave; connection slave;
start slave; start slave;
source include/wait_for_slave_to_start.inc;
connection master; connection master;
--disable_warnings --disable_warnings
drop table if exists t1; drop table if exists t1;
......
This diff is collapsed.
# Copyright (C) 2006 MySQL AB
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
## Makefile.am for semi-synchronous replication
pkgplugindir = $(pkglibdir)/plugin
INCLUDES = -I$(top_srcdir)/include \
-I$(top_srcdir)/sql \
-I$(srcdir)
noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h
pkgplugin_LTLIBRARIES = libsemisync_master.la libsemisync_slave.la
libsemisync_master_la_LDFLAGS = -module
libsemisync_master_la_CXXFLAGS= $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_master_la_CFLAGS = $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_master_la_SOURCES = semisync.cc semisync_master.cc semisync_master_plugin.cc
libsemisync_slave_la_LDFLAGS = -module
libsemisync_slave_la_CXXFLAGS= $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_slave_la_CFLAGS = $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_slave_la_SOURCES = semisync.cc semisync_slave.cc semisync_slave_plugin.cc
# configure.in for semi-synchronous replication
AC_INIT(mysql-semi-sync-plugin, 0.2)
AM_INIT_AUTOMAKE
AC_DISABLE_STATIC
AC_PROG_LIBTOOL
AC_CONFIG_FILES([Makefile])
AC_OUTPUT
MYSQL_PLUGIN(semisync,[Semi-synchronous Replication Plugin],
[Semi-synchronous replication plugin.])
MYSQL_PLUGIN_DYNAMIC(semisync, [libsemisync_master.la libsemisync_slave.la])
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync.h"
const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
const unsigned long Trace::kTraceGeneral = 0x0001;
const unsigned long Trace::kTraceDetail = 0x0010;
const unsigned long Trace::kTraceNetWait = 0x0020;
const unsigned long Trace::kTraceFunction = 0x0040;
const char ReplSemiSyncBase::kSyncHeader[2] =
{ReplSemiSyncBase::kPacketMagicNum, 0};
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef SEMISYNC_H
#define SEMISYNC_H
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <sys/time.h>
#include <time.h>
#include <stdio.h>
#include <pthread.h>
#include <mysql.h>
typedef uint32_t uint32;
typedef unsigned long long my_off_t;
#define FN_REFLEN 512 /* Max length of full path-name */
void sql_print_error(const char *format, ...);
void sql_print_warning(const char *format, ...);
void sql_print_information(const char *format, ...);
extern unsigned long max_connections;
#define MYSQL_SERVER
#define HAVE_REPLICATION
#include <my_global.h>
#include <my_pthread.h>
#include <mysql/plugin.h>
#include <replication.h>
typedef struct st_mysql_show_var SHOW_VAR;
typedef struct st_mysql_sys_var SYS_VAR;
/**
This class is used to trace function calls and other process
information
*/
class Trace {
public:
static const unsigned long kTraceFunction;
static const unsigned long kTraceGeneral;
static const unsigned long kTraceDetail;
static const unsigned long kTraceNetWait;
unsigned long trace_level_; /* the level for tracing */
inline void function_enter(const char *func_name)
{
if (trace_level_ & kTraceFunction)
sql_print_information("---> %s enter", func_name);
}
inline int function_exit(const char *func_name, int exit_code)
{
if (trace_level_ & kTraceFunction)
sql_print_information("<--- %s exit (%d)", func_name, exit_code);
return exit_code;
}
Trace()
:trace_level_(0L)
{}
Trace(unsigned long trace_level)
:trace_level_(trace_level)
{}
};
/**
Base class for semi-sync master and slave classes
*/
class ReplSemiSyncBase
:public Trace {
public:
static const char kSyncHeader[2]; /* three byte packet header */
/* Constants in network packet header. */
static const unsigned char kPacketMagicNum;
static const unsigned char kPacketFlagSync;
};
#endif /* SEMISYNC_H */
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync_slave.h"
char rpl_semi_sync_slave_enabled;
char rpl_semi_sync_slave_status= 0;
unsigned long rpl_semi_sync_slave_trace_level;
int ReplSemiSyncSlave::initObject()
{
int result= 0;
const char *kWho = "ReplSemiSyncSlave::initObject";
if (init_done_)
{
fprintf(stderr, "%s called twice\n", kWho);
return 1;
}
init_done_ = true;
/* References to the parameter works after set_options(). */
setSlaveEnabled(rpl_semi_sync_slave_enabled);
setTraceLevel(rpl_semi_sync_slave_trace_level);
return result;
}
int ReplSemiSyncSlave::slaveReplyConnect()
{
if (!mysql_reply && !(mysql_reply= rpl_connect_master(NULL)))
{
sql_print_error("Semisync slave connect to master for reply failed");
return 1;
}
return 0;
}
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
unsigned long total_len,
bool *need_reply,
const char **payload,
unsigned long *payload_len)
{
const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader";
int read_res = 0;
function_enter(kWho);
if ((unsigned char)(header[0]) == kPacketMagicNum)
{
*need_reply = (header[1] & kPacketFlagSync);
*payload_len = total_len - 2;
*payload = header + 2;
if (trace_level_ & kTraceDetail)
sql_print_information("%s: reply - %d", kWho, *need_reply);
}
else
{
sql_print_error("Missing magic number for semi-sync packet, packet "
"len: %lu", total_len);
read_res = -1;
}
return function_exit(kWho, read_res);
}
int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param)
{
bool semi_sync= getSlaveEnabled();
sql_print_information("Slave I/O thread: Start %s replication to\
master '%s@%s:%d' in log '%s' at position %lu",
semi_sync ? "semi-sync" : "asynchronous",
param->user, param->host, param->port,
param->master_log_name[0] ? param->master_log_name : "FIRST",
(unsigned long)param->master_log_pos);
if (semi_sync && !rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 1;
return 0;
}
int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
{
if (rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 0;
if (mysql_reply)
mysql_close(mysql_reply);
mysql_reply= 0;
return 0;
}
int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos)
{
char query[FN_REFLEN + 100];
sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'",
(unsigned long long)log_pos, log_name);
if (mysql_real_query(mysql_reply, query, strlen(query)))
{
sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed");
mysql_free_result(mysql_store_result(mysql_reply));
mysql_close(mysql_reply);
mysql_reply= 0;
return 1;
}
mysql_free_result(mysql_store_result(mysql_reply));
return 0;
}
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef SEMISYNC_SLAVE_H
#define SEMISYNC_SLAVE_H
#include "semisync.h"
/**
The extension class for the slave of semi-synchronous replication
*/
class ReplSemiSyncSlave
:public ReplSemiSyncBase {
public:
ReplSemiSyncSlave()
:slave_enabled_(false)
{}
~ReplSemiSyncSlave() {}
void setTraceLevel(unsigned long trace_level) {
trace_level_ = trace_level;
}
/* Initialize this class after MySQL parameters are initialized. this
* function should be called once at bootstrap time.
*/
int initObject();
bool getSlaveEnabled() {
return slave_enabled_;
}
void setSlaveEnabled(bool enabled) {
slave_enabled_ = enabled;
}
/* A slave reads the semi-sync packet header and separate the metadata
* from the payload data.
*
* Input:
* header - (IN) packet header pointer
* total_len - (IN) total packet length: metadata + payload
* need_reply - (IN) whether the master is waiting for the reply
* payload - (IN) payload: the replication event
* payload_len - (IN) payload length
*
* Return:
* 0: success; -1 or otherwise: error
*/
int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
const char **payload, unsigned long *payload_len);
/* A slave replies to the master indicating its replication process. It
* indicates that the slave has received all events before the specified
* binlog position.
*
* Input:
* log_name - (IN) the reply point's binlog file name
* log_pos - (IN) the reply point's binlog file offset
*
* Return:
* 0: success; -1 or otherwise: error
*/
int slaveReply(const char *log_name, my_off_t log_pos);
/*
Connect to master for sending reply
*/
int slaveReplyConnect();
int slaveStart(Binlog_relay_IO_param *param);
int slaveStop(Binlog_relay_IO_param *param);
private:
/* True when initObject has been called */
bool init_done_;
bool slave_enabled_; /* semi-sycn is enabled on the slave */
MYSQL *mysql_reply; /* connection to send reply */
};
/* System and status variables for the slave component */
extern char rpl_semi_sync_slave_enabled;
extern unsigned long rpl_semi_sync_slave_trace_level;
extern char rpl_semi_sync_slave_status;
#endif /* SEMISYNC_SLAVE_H */
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync_slave.h"
ReplSemiSyncSlave repl_semisync;
/*
indicate whether or not the slave should send a reply to the master.
This is set to true in repl_semi_slave_read_event if the current
event read is the last event of a transaction. And the value is
checked in repl_semi_slave_queue_event.
*/
bool semi_sync_need_reply= false;
int repl_semi_reset_slave(Binlog_relay_IO_param *param)
{
// TODO: reset semi-sync slave status here
return 0;
}
int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
uint32 flags)
{
MYSQL *mysql= param->mysql;
MYSQL_RES *res= 0;
MYSQL_ROW row;
const char *query;
if (!repl_semisync.getSlaveEnabled())
return 0;
/*
Create the connection that is used to send slave ACK replies to
master
*/
if (repl_semisync.slaveReplyConnect())
return 1;
/* Check if master server has semi-sync plugin installed */
query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
if (mysql_real_query(mysql, query, strlen(query)) ||
!(res= mysql_store_result(mysql)))
{
mysql_free_result(mysql_store_result(mysql));
sql_print_error("Execution failed on master: %s", query);
return 1;
}
row= mysql_fetch_row(res);
if (!row || strcmp(row[1], "ON"))
{
/* Master does not support or not configured semi-sync */
sql_print_warning("Master server does not support or not configured semi-sync replication, fallback to asynchronous");
rpl_semi_sync_slave_status= 0;
return 0;
}
/*
Tell master dump thread that we want to do semi-sync
replication
*/
query= "SET @rpl_semi_sync_slave= 1";
if (mysql_real_query(mysql, query, strlen(query)))
{
sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
mysql_free_result(mysql_store_result(mysql));
return 1;
}
mysql_free_result(mysql_store_result(mysql));
rpl_semi_sync_slave_status= 1;
return 0;
}
int repl_semi_slave_read_event(Binlog_relay_IO_param *param,
const char *packet, unsigned long len,
const char **event_buf, unsigned long *event_len)
{
if (rpl_semi_sync_slave_status)
return repl_semisync.slaveReadSyncHeader(packet, len,
&semi_sync_need_reply,
event_buf, event_len);
*event_buf= packet;
*event_len= len;
return 0;
}
int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
const char *event_buf,
unsigned long event_len,
uint32 flags)
{
if (rpl_semi_sync_slave_status && semi_sync_need_reply)
return repl_semisync.slaveReply(param->master_log_name,
param->master_log_pos);
return 0;
}
int repl_semi_slave_io_start(Binlog_relay_IO_param *param)
{
return repl_semisync.slaveStart(param);
}
int repl_semi_slave_io_end(Binlog_relay_IO_param *param)
{
return repl_semisync.slaveStop(param);
}
static void fix_rpl_semi_sync_slave_enabled(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val)
{
*(char *)ptr= *(char *)val;
repl_semisync.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0);
return;
}
static void fix_rpl_semi_sync_trace_level(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val)
{
*(unsigned long *)ptr= *(unsigned long *)val;
repl_semisync.setTraceLevel(rpl_semi_sync_slave_trace_level);
return;
}
/* plugin system variables */
static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_slave_enabled,
PLUGIN_VAR_OPCMDARG,
"Enable semi-synchronous replication slave (disabled by default). ",
NULL, // check
&fix_rpl_semi_sync_slave_enabled, // update
0);
static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
PLUGIN_VAR_OPCMDARG,
"The tracing level for semi-sync replication.",
NULL, // check
&fix_rpl_semi_sync_trace_level, // update
32, 0, ~0L, 1);
static SYS_VAR* semi_sync_slave_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(trace_level),
NULL,
};
/* plugin status variables */
static SHOW_VAR semi_sync_slave_status_vars[]= {
{"Rpl_semi_sync_slave_status",
(char*) &rpl_semi_sync_slave_status, SHOW_BOOL},
{NULL, NULL, SHOW_BOOL},
};
Binlog_relay_IO_observer relay_io_observer = {
sizeof(Binlog_relay_IO_observer), // len
repl_semi_slave_io_start, // start
repl_semi_slave_io_end, // stop
repl_semi_slave_request_dump, // request_transmit
repl_semi_slave_read_event, // after_read_event
repl_semi_slave_queue_event, // after_queue_event
repl_semi_reset_slave, // reset
};
static int semi_sync_slave_plugin_init(void *p)
{
if (repl_semisync.initObject())
return 1;
if (register_binlog_relay_io_observer(&relay_io_observer, p))
return 1;
return 0;
}
static int semi_sync_slave_plugin_deinit(void *p)
{
if (unregister_binlog_relay_io_observer(&relay_io_observer, p))
return 1;
return 0;
}
struct Mysql_replication semi_sync_slave_plugin= {
MYSQL_REPLICATION_INTERFACE_VERSION
};
/*
Plugin library descriptor
*/
mysql_declare_plugin(semi_sync_slave)
{
MYSQL_REPLICATION_PLUGIN,
&semi_sync_slave_plugin,
"rpl_semi_sync_slave",
"He Zhenxing",
"Semi-synchronous replication slave",
PLUGIN_LICENSE_GPL,
semi_sync_slave_plugin_init, /* Plugin Init */
semi_sync_slave_plugin_deinit, /* Plugin Deinit */
0x0100 /* 1.0 */,
semi_sync_slave_status_vars, /* status variables */
semi_sync_slave_system_vars, /* system variables */
NULL /* config options */
}
mysql_declare_plugin_end;
...@@ -280,6 +280,7 @@ cp include/mysql/plugin.h $DESTDIR/include/mysql/ ...@@ -280,6 +280,7 @@ cp include/mysql/plugin.h $DESTDIR/include/mysql/
mkdir -p $DESTDIR/lib/opt mkdir -p $DESTDIR/lib/opt
mkdir -p $DESTDIR/lib/plugin mkdir -p $DESTDIR/lib/plugin
cp sql/$TARGET/mysqld.lib $DESTDIR/lib/
cp libmysql/$TARGET/libmysql.dll \ cp libmysql/$TARGET/libmysql.dll \
libmysql/$TARGET/libmysql.lib \ libmysql/$TARGET/libmysql.lib \
libmysql/$TARGET/mysqlclient.lib \ libmysql/$TARGET/mysqlclient.lib \
......
...@@ -75,6 +75,7 @@ SET (SQL_SOURCE ...@@ -75,6 +75,7 @@ SET (SQL_SOURCE
rpl_rli.cc rpl_mi.cc sql_servers.cc rpl_rli.cc rpl_mi.cc sql_servers.cc
sql_connect.cc scheduler.cc sql_connect.cc scheduler.cc
sql_profile.cc event_parse_data.cc sql_profile.cc event_parse_data.cc
rpl_handler.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.h ${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
${PROJECT_SOURCE_DIR}/include/mysqld_error.h ${PROJECT_SOURCE_DIR}/include/mysqld_error.h
......
...@@ -76,7 +76,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ ...@@ -76,7 +76,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_plugin.h authors.h event_parse_data.h \ sql_plugin.h authors.h event_parse_data.h \
event_data_objects.h event_scheduler.h \ event_data_objects.h event_scheduler.h \
sql_partition.h partition_info.h partition_element.h \ sql_partition.h partition_info.h partition_element.h \
contributors.h sql_servers.h contributors.h sql_servers.h \
rpl_handler.h replication.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \
...@@ -120,7 +121,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ ...@@ -120,7 +121,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
event_queue.cc event_db_repository.cc events.cc \ event_queue.cc event_db_repository.cc events.cc \
sql_plugin.cc sql_binlog.cc \ sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc \ sql_builtin.cc sql_tablespace.cc partition_info.cc \
sql_servers.cc event_parse_data.cc sql_servers.cc event_parse_data.cc \
rpl_handler.cc
nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#endif #endif
#include "mysql_priv.h" #include "mysql_priv.h"
#include "rpl_handler.h"
#include "rpl_filter.h" #include "rpl_filter.h"
#include <myisampack.h> #include <myisampack.h>
#include <errno.h> #include <errno.h>
...@@ -221,6 +222,8 @@ handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type, ...@@ -221,6 +222,8 @@ handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type,
return NULL; return NULL;
} }
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
switch (database_type) { switch (database_type) {
#ifndef NO_HASH #ifndef NO_HASH
case DB_TYPE_HASH: case DB_TYPE_HASH:
...@@ -1206,6 +1209,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1206,6 +1209,7 @@ int ha_commit_trans(THD *thd, bool all)
if (cookie) if (cookie)
tc_log->unlog(cookie, xid); tc_log->unlog(cookie, xid);
DBUG_EXECUTE_IF("crash_commit_after", abort();); DBUG_EXECUTE_IF("crash_commit_after", abort(););
RUN_HOOK(transaction, after_commit, (thd, FALSE));
end: end:
if (rw_trans) if (rw_trans)
start_waiting_global_read_lock(thd); start_waiting_global_read_lock(thd);
...@@ -1353,6 +1357,7 @@ int ha_rollback_trans(THD *thd, bool all) ...@@ -1353,6 +1357,7 @@ int ha_rollback_trans(THD *thd, bool all)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARNING_NOT_COMPLETE_ROLLBACK, ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK)); ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -1387,7 +1392,14 @@ int ha_autocommit_or_rollback(THD *thd, int error) ...@@ -1387,7 +1392,14 @@ int ha_autocommit_or_rollback(THD *thd, int error)
thd->variables.tx_isolation=thd->session_tx_isolation; thd->variables.tx_isolation=thd->session_tx_isolation;
} }
else
#endif #endif
{
if (!error)
RUN_HOOK(transaction, after_commit, (thd, FALSE));
else
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
}
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#endif #endif
#include <mysql/plugin.h> #include <mysql/plugin.h>
#include "rpl_handler.h"
/* max size of the log message */ /* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024 #define MAX_LOG_BUFFER_SIZE 1024
...@@ -4225,9 +4226,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4225,9 +4226,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (file == &log_file) // we are writing to the real log (disk) if (file == &log_file) // we are writing to the real log (disk)
{ {
bool synced; bool synced= 0;
if (flush_and_sync(&synced)) if (flush_and_sync(&synced))
goto err; goto err;
if (RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, file->pos_in_file, synced))) {
sql_print_error("Failed to run 'after_flush' hooks");
goto err;
}
signal_update(); signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
} }
...@@ -4529,8 +4537,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) ...@@ -4529,8 +4537,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
ev.write(&log_file); ev.write(&log_file);
if (lock) if (lock)
{ {
bool synced; if (!error && !(error= flush_and_sync(0)))
if (!error && !(error= flush_and_sync(&synced)))
{ {
signal_update(); signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
...@@ -4618,7 +4625,7 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, ...@@ -4618,7 +4625,7 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
if (incident && write_incident(thd, FALSE)) if (incident && write_incident(thd, FALSE))
goto err; goto err;
bool synced; bool synced= 0;
if (flush_and_sync(&synced)) if (flush_and_sync(&synced))
goto err; goto err;
DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
...@@ -4628,6 +4635,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, ...@@ -4628,6 +4635,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
write_error=1; // Don't give more errors write_error=1; // Don't give more errors
goto err; goto err;
} }
if (RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, log_file.pos_in_file, synced)))
{
sql_print_error("Failed to run 'after_flush' hooks");
write_error=1;
goto err;
}
signal_update(); signal_update();
} }
......
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
#include "rpl_injector.h" #include "rpl_injector.h"
#include "rpl_handler.h"
#ifdef HAVE_SYS_PRCTL_H #ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h> #include <sys/prctl.h>
#endif #endif
...@@ -1287,6 +1289,7 @@ void clean_up(bool print_message) ...@@ -1287,6 +1289,7 @@ void clean_up(bool print_message)
ha_end(); ha_end();
if (tc_log) if (tc_log)
tc_log->close(); tc_log->close();
delegates_destroy();
xid_cache_free(); xid_cache_free();
delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache); delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
multi_keycache_free(); multi_keycache_free();
...@@ -3764,6 +3767,13 @@ static int init_server_components() ...@@ -3764,6 +3767,13 @@ static int init_server_components()
unireg_abort(1); unireg_abort(1);
} }
/* initialize delegates for extension observers */
if (delegates_init())
{
sql_print_error("Initialize extension delegates failed");
unireg_abort(1);
}
/* need to configure logging before initializing storage engines */ /* need to configure logging before initializing storage engines */
if (opt_update_log) if (opt_update_log)
{ {
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -277,6 +277,42 @@ const char *set_thd_proc_info(THD *thd, const char *info, ...@@ -277,6 +277,42 @@ const char *set_thd_proc_info(THD *thd, const char *info,
return old_info; return old_info;
} }
extern "C"
const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
pthread_mutex_t *mutex, const char *msg)
{
if (!thd)
thd= current_thd;
const char* old_msg = thd->proc_info;
safe_mutex_assert_owner(mutex);
thd->mysys_var->current_mutex = mutex;
thd->mysys_var->current_cond = cond;
thd->proc_info = msg;
return old_msg;
}
extern "C"
void thd_exit_cond(MYSQL_THD thd, const char *old_msg)
{
if (!thd)
thd= current_thd;
/*
Putting the mutex unlock in thd_exit_cond() ensures that
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
locked (if that would not be the case, you'll get a deadlock if someone
does a THD::awake() on you).
*/
pthread_mutex_unlock(thd->mysys_var->current_mutex);
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex = 0;
thd->mysys_var->current_cond = 0;
thd->proc_info = old_msg;
pthread_mutex_unlock(&thd->mysys_var->mutex);
return;
}
extern "C" extern "C"
void **thd_ha_data(const THD *thd, const struct handlerton *hton) void **thd_ha_data(const THD *thd, const struct handlerton *hton)
{ {
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "log.h" #include "log.h"
#include "rpl_tblmap.h" #include "rpl_tblmap.h"
#include "replication.h"
/** /**
An interface that is used to take an action when An interface that is used to take an action when
...@@ -1940,27 +1941,11 @@ public: ...@@ -1940,27 +1941,11 @@ public:
inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex, inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex,
const char* msg) const char* msg)
{ {
const char* old_msg = proc_info; return thd_enter_cond(this, cond, mutex, msg);
safe_mutex_assert_owner(mutex);
mysys_var->current_mutex = mutex;
mysys_var->current_cond = cond;
proc_info = msg;
return old_msg;
} }
inline void exit_cond(const char* old_msg) inline void exit_cond(const char* old_msg)
{ {
/* thd_exit_cond(this, old_msg);
Putting the mutex unlock in exit_cond() ensures that
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
locked (if that would not be the case, you'll get a deadlock if someone
does a THD::awake() on you).
*/
pthread_mutex_unlock(mysys_var->current_mutex);
pthread_mutex_lock(&mysys_var->mutex);
mysys_var->current_mutex = 0;
mysys_var->current_cond = 0;
proc_info = old_msg;
pthread_mutex_unlock(&mysys_var->mutex);
} }
inline time_t query_start() { query_start_used=1; return start_time; } inline time_t query_start() { query_start_used=1; return start_time; }
inline void set_time() inline void set_time()
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <m_ctype.h> #include <m_ctype.h>
#include <myisam.h> #include <myisam.h>
#include <my_dir.h> #include <my_dir.h>
#include "rpl_handler.h"
#include "sp_head.h" #include "sp_head.h"
#include "sp.h" #include "sp.h"
......
...@@ -19,14 +19,6 @@ ...@@ -19,14 +19,6 @@
#define REPORT_TO_LOG 1 #define REPORT_TO_LOG 1
#define REPORT_TO_USER 2 #define REPORT_TO_USER 2
#ifdef DBUG_OFF
#define plugin_ref_to_int(A) A
#define plugin_int_to_ref(A) A
#else
#define plugin_ref_to_int(A) (A ? A[0] : NULL)
#define plugin_int_to_ref(A) &(A)
#endif
extern struct st_mysql_plugin *mysqld_builtins[]; extern struct st_mysql_plugin *mysqld_builtins[];
/** /**
...@@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]= ...@@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{ C_STRING_WITH_LEN("STORAGE ENGINE") }, { C_STRING_WITH_LEN("STORAGE ENGINE") },
{ C_STRING_WITH_LEN("FTPARSER") }, { C_STRING_WITH_LEN("FTPARSER") },
{ C_STRING_WITH_LEN("DAEMON") }, { C_STRING_WITH_LEN("DAEMON") },
{ C_STRING_WITH_LEN("INFORMATION SCHEMA") } { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
{ C_STRING_WITH_LEN("REPLICATION") },
}; };
extern int initialize_schema_table(st_plugin_int *plugin); extern int initialize_schema_table(st_plugin_int *plugin);
...@@ -93,7 +86,8 @@ static int min_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= ...@@ -93,7 +86,8 @@ static int min_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION, MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
MYSQL_REPLICATION_INTERFACE_VERSION,
}; };
static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{ {
...@@ -101,7 +95,8 @@ static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= ...@@ -101,7 +95,8 @@ static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION, MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
MYSQL_REPLICATION_INTERFACE_VERSION,
}; };
static bool initialized= 0; static bool initialized= 0;
......
...@@ -18,6 +18,14 @@ ...@@ -18,6 +18,14 @@
class sys_var; class sys_var;
#ifdef DBUG_OFF
#define plugin_ref_to_int(A) A
#define plugin_int_to_ref(A) A
#else
#define plugin_ref_to_int(A) (A ? A[0] : NULL)
#define plugin_int_to_ref(A) &(A)
#endif
/* /*
the following flags are valid for plugin_init() the following flags are valid for plugin_init()
*/ */
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment