Refactoring: Moved master info functionality into rpl_mi.cc to

make code easier to maintain.
parent c9b963cf
...@@ -54,7 +54,7 @@ ADD_EXECUTABLE(mysqld ../sql-common/client.c derror.cc des_key_file.cc ...@@ -54,7 +54,7 @@ ADD_EXECUTABLE(mysqld ../sql-common/client.c derror.cc des_key_file.cc
event_queue.cc event_db_repository.cc event_queue.cc event_db_repository.cc
sql_tablespace.cc events.cc ../sql-common/my_user.c sql_tablespace.cc events.cc ../sql-common/my_user.c
partition_info.cc rpl_utility.cc rpl_injector.cc sql_locale.cc partition_info.cc rpl_utility.cc rpl_injector.cc sql_locale.cc
rpl_rli.cc rpl_rli.cc rpl_mi.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.h ${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
${PROJECT_SOURCE_DIR}/include/mysqld_error.h ${PROJECT_SOURCE_DIR}/include/mysqld_error.h
......
...@@ -53,7 +53,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ ...@@ -53,7 +53,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
ha_ndbcluster.h ha_ndbcluster_binlog.h \ ha_ndbcluster.h ha_ndbcluster_binlog.h \
ha_ndbcluster_tables.h \ ha_ndbcluster_tables.h \
opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \ opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
log.h sql_show.h rpl_rli.h \ log.h sql_show.h rpl_rli.h rpl_mi.h \
sql_select.h structs.h table.h sql_udf.h hash_filo.h \ sql_select.h structs.h table.h sql_udf.h hash_filo.h \
lex.h lex_symbol.h sql_acl.h sql_crypt.h \ lex.h lex_symbol.h sql_acl.h sql_crypt.h \
log_event.h sql_repl.h slave.h rpl_filter.h \ log_event.h sql_repl.h slave.h rpl_filter.h \
...@@ -93,7 +93,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ ...@@ -93,7 +93,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \ sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \
sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \ sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \
slave.cc sql_repl.cc rpl_filter.cc rpl_tblmap.cc \ slave.cc sql_repl.cc rpl_filter.cc rpl_tblmap.cc \
rpl_utility.cc rpl_injector.cc rpl_rli.cc \ rpl_utility.cc rpl_injector.cc rpl_rli.cc rpl_mi.cc \
sql_union.cc sql_derived.cc \ sql_union.cc sql_derived.cc \
client.c sql_client.cc mini_client_errors.c pack.c\ client.c sql_client.cc mini_client_errors.c pack.c\
stacktrace.c repl_failsafe.h repl_failsafe.cc \ stacktrace.c repl_failsafe.h repl_failsafe.cc \
......
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h> // For HAVE_REPLICATION
#include "mysql_priv.h"
#include <my_dir.h>
#include "rpl_mi.h"
#ifdef HAVE_REPLICATION
// Defined in slave.cc
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
const char *default_val);
MASTER_INFO::MASTER_INFO()
:ssl(0), fd(-1), io_thd(0), inited(0),
abort_slave(0),slave_running(0), slave_run_id(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
bzero((char*) &file, sizeof(file));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
pthread_cond_init(&data_cond, NULL);
pthread_cond_init(&start_cond, NULL);
pthread_cond_init(&stop_cond, NULL);
}
MASTER_INFO::~MASTER_INFO()
{
pthread_mutex_destroy(&run_lock);
pthread_mutex_destroy(&data_lock);
pthread_cond_destroy(&data_cond);
pthread_cond_destroy(&start_cond);
pthread_cond_destroy(&stop_cond);
}
void init_master_info_with_options(MASTER_INFO* mi)
{
DBUG_ENTER("init_master_info_with_options");
mi->master_log_name[0] = 0;
mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number
if (master_host)
strmake(mi->host, master_host, sizeof(mi->host) - 1);
if (master_user)
strmake(mi->user, master_user, sizeof(mi->user) - 1);
if (master_password)
strmake(mi->password, master_password, MAX_PASSWORD_LENGTH);
mi->port = master_port;
mi->connect_retry = master_connect_retry;
mi->ssl= master_ssl;
if (master_ssl_ca)
strmake(mi->ssl_ca, master_ssl_ca, sizeof(mi->ssl_ca)-1);
if (master_ssl_capath)
strmake(mi->ssl_capath, master_ssl_capath, sizeof(mi->ssl_capath)-1);
if (master_ssl_cert)
strmake(mi->ssl_cert, master_ssl_cert, sizeof(mi->ssl_cert)-1);
if (master_ssl_cipher)
strmake(mi->ssl_cipher, master_ssl_cipher, sizeof(mi->ssl_cipher)-1);
if (master_ssl_key)
strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1);
DBUG_VOID_RETURN;
}
#define LINES_IN_MASTER_INFO_WITH_SSL 14
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname,
bool abort_if_no_master_info_file,
int thread_mask)
{
int fd,error;
char fname[FN_REFLEN+128];
DBUG_ENTER("init_master_info");
if (mi->inited)
{
/*
We have to reset read position of relay-log-bin as we may have
already been reading from 'hotlog' when the slave was stopped
last time. If this case pos_in_file would be set and we would
get a crash when trying to read the signature for the binary
relay log.
We only rewind the read position if we are starting the SQL
thread. The handle_slave_sql thread assumes that the read
position is at the beginning of the file, and will read the
"signature" and then fast-forward to the last position read.
*/
if (thread_mask & SLAVE_SQL)
{
my_b_seek(mi->rli.cur_log, (my_off_t) 0);
}
DBUG_RETURN(0);
}
mi->mysql=0;
mi->file_id=1;
fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
/*
We need a mutex while we are changing master info parameters to
keep other threads from reading bogus info
*/
pthread_mutex_lock(&mi->data_lock);
fd = mi->fd;
/* does master.info exist ? */
if (access(fname,F_OK))
{
if (abort_if_no_master_info_file)
{
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(0);
}
/*
if someone removed the file from underneath our feet, just close
the old descriptor and re-create the old file
*/
if (fd >= 0)
my_close(fd, MYF(MY_WME));
if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
{
sql_print_error("Failed to create a new master info file (\
file '%s', errno %d)", fname, my_errno);
goto err;
}
if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
MYF(MY_WME)))
{
sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
goto err;
}
mi->fd = fd;
init_master_info_with_options(mi);
}
else // file exists
{
if (fd >= 0)
reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
else
{
if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
{
sql_print_error("Failed to open the existing master info file (\
file '%s', errno %d)", fname, my_errno);
goto err;
}
if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
0, MYF(MY_WME)))
{
sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
goto err;
}
}
mi->fd = fd;
int port, connect_retry, master_log_pos, ssl= 0, lines;
char *first_non_digit;
/*
Starting from 4.1.x master.info has new format. Now its
first line contains number of lines in file. By reading this
number we will be always distinguish to which version our
master.info corresponds to. We can't simply count lines in
file since versions before 4.1.x could generate files with more
lines than needed.
If first line doesn't contain a number or contain number less than
14 then such file is treated like file from pre 4.1.1 version.
There is no ambiguity when reading an old master.info, as before
4.1.1, the first line contained the binlog's name, which is either
empty or has an extension (contains a '.'), so can't be confused
with an integer.
So we're just reading first line and trying to figure which version
is this.
*/
/*
The first row is temporarily stored in mi->master_log_name,
if it is line count and not binlog name (new format) it will be
overwritten by the second row later.
*/
if (init_strvar_from_file(mi->master_log_name,
sizeof(mi->master_log_name), &mi->file,
""))
goto errwithmsg;
lines= strtoul(mi->master_log_name, &first_non_digit, 10);
if (mi->master_log_name[0]!='\0' &&
*first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
{ // Seems to be new format
if (init_strvar_from_file(mi->master_log_name,
sizeof(mi->master_log_name), &mi->file, ""))
goto errwithmsg;
}
else
lines= 7;
if (init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
master_host) ||
init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
master_user) ||
init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1,
&mi->file, master_password) ||
init_intvar_from_file(&port, &mi->file, master_port) ||
init_intvar_from_file(&connect_retry, &mi->file,
master_connect_retry))
goto errwithmsg;
/*
If file has ssl part use it even if we have server without
SSL support. But these option will be ignored later when
slave will try connect to master, so in this case warning
is printed.
*/
if (lines >= LINES_IN_MASTER_INFO_WITH_SSL &&
(init_intvar_from_file(&ssl, &mi->file, master_ssl) ||
init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca),
&mi->file, master_ssl_ca) ||
init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath),
&mi->file, master_ssl_capath) ||
init_strvar_from_file(mi->ssl_cert, sizeof(mi->ssl_cert),
&mi->file, master_ssl_cert) ||
init_strvar_from_file(mi->ssl_cipher, sizeof(mi->ssl_cipher),
&mi->file, master_ssl_cipher) ||
init_strvar_from_file(mi->ssl_key, sizeof(mi->ssl_key),
&mi->file, master_ssl_key)))
goto errwithmsg;
#ifndef HAVE_OPENSSL
if (ssl)
sql_print_warning("SSL information in the master info file "
"('%s') are ignored because this MySQL slave was compiled "
"without SSL support.", fname);
#endif /* HAVE_OPENSSL */
/*
This has to be handled here as init_intvar_from_file can't handle
my_off_t types
*/
mi->master_log_pos= (my_off_t) master_log_pos;
mi->port= (uint) port;
mi->connect_retry= (uint) connect_retry;
mi->ssl= (my_bool) ssl;
}
DBUG_PRINT("master_info",("log_file_name: %s position: %ld",
mi->master_log_name,
(ulong) mi->master_log_pos));
mi->rli.mi = mi;
if (init_relay_log_info(&mi->rli, slave_info_fname))
goto err;
mi->inited = 1;
// now change cache READ -> WRITE - must do this before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
if ((error=test(flush_master_info(mi, 1))))
sql_print_error("Failed to flush master info file");
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(error);
errwithmsg:
sql_print_error("Error reading master configuration");
err:
if (fd >= 0)
{
my_close(fd, MYF(0));
end_io_cache(&mi->file);
}
mi->fd= -1;
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(1);
}
/*
RETURN
2 - flush relay log failed
1 - flush master info failed
0 - all ok
*/
int flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache)
{
IO_CACHE* file = &mi->file;
char lbuf[22];
DBUG_ENTER("flush_master_info");
DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));
/*
Flush the relay log to disk. If we don't do it, then the relay log while
have some part (its last kilobytes) in memory only, so if the slave server
dies now, with, say, from master's position 100 to 150 in memory only (not
on disk), and with position 150 in master.info, then when the slave
restarts, the I/O thread will fetch binlogs from 150, so in the relay log
we will have "[0, 100] U [150, infinity[" and nobody will notice it, so the
SQL thread will jump from 100 to 150, and replication will silently break.
When we come to this place in code, relay log may or not be initialized;
the caller is responsible for setting 'flush_relay_log_cache' accordingly.
*/
if (flush_relay_log_cache &&
flush_io_cache(mi->rli.relay_log.get_log_file()))
DBUG_RETURN(2);
/*
We flushed the relay log BEFORE the master.info file, because if we crash
now, we will get a duplicate event in the relay log at restart. If we
flushed in the other order, we would get a hole in the relay log.
And duplicate is better than hole (with a duplicate, in later versions we
can add detection and scrap one event; with a hole there's nothing we can
do).
*/
/*
In certain cases this code may create master.info files that seems
corrupted, because of extra lines filled with garbage in the end
file (this happens if new contents take less space than previous
contents of file). But because of number of lines in the first line
of file we don't care about this garbage.
*/
my_b_seek(file, 0L);
my_b_printf(file, "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n",
LINES_IN_MASTER_INFO_WITH_SSL,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
mi->password, mi->port, mi->connect_retry,
(int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
mi->ssl_cipher, mi->ssl_key);
DBUG_RETURN(-flush_io_cache(file));
}
void end_master_info(MASTER_INFO* mi)
{
DBUG_ENTER("end_master_info");
if (!mi->inited)
DBUG_VOID_RETURN;
end_relay_log_info(&mi->rli);
if (mi->fd >= 0)
{
end_io_cache(&mi->file);
(void)my_close(mi->fd, MYF(MY_WME));
mi->fd = -1;
}
mi->inited = 0;
DBUG_VOID_RETURN;
}
#endif /* HAVE_REPLICATION */
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef RPL_MI_H
#define RPL_MI_H
#ifdef HAVE_REPLICATION
/*****************************************************************************
Replication IO Thread
MASTER_INFO contains:
- information about how to connect to a master
- current master log name
- current master log offset
- misc control variables
MASTER_INFO is initialized once from the master.info file if such
exists. Otherwise, data members corresponding to master.info fields
are initialized with defaults specified by master-* options. The
initialization is done through init_master_info() call.
The format of master.info file:
log_name
log_pos
master_host
master_user
master_pass
master_port
master_connect_retry
To write out the contents of master.info file to disk ( needed every
time we read and queue data from the master ), a call to
flush_master_info() is required.
To clean up, call end_master_info()
*****************************************************************************/
class MASTER_INFO
{
public:
MASTER_INFO();
~MASTER_INFO();
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN];
char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1];
char password[MAX_PASSWORD_LENGTH+1];
my_bool ssl; // enables use of SSL connection if true
char ssl_ca[FN_REFLEN], ssl_capath[FN_REFLEN], ssl_cert[FN_REFLEN];
char ssl_cipher[FN_REFLEN], ssl_key[FN_REFLEN];
my_off_t master_log_pos;
File fd; // we keep the file open, so we need to remember the file pointer
IO_CACHE file;
pthread_mutex_t data_lock,run_lock;
pthread_cond_t data_cond,start_cond,stop_cond;
THD *io_thd;
MYSQL* mysql;
uint32 file_id; /* for 3.23 load data infile */
RELAY_LOG_INFO rli;
uint port;
uint connect_retry;
#ifndef DBUG_OFF
int events_till_disconnect;
#endif
bool inited;
volatile bool abort_slave;
volatile uint slave_running;
volatile ulong slave_run_id;
/*
The difference in seconds between the clock of the master and the clock of
the slave (second - first). It must be signed as it may be <0 or >0.
clock_diff_with_master is computed when the I/O thread starts; for this the
I/O thread does a SELECT UNIX_TIMESTAMP() on the master.
"how late the slave is compared to the master" is computed like this:
clock_of_slave - last_timestamp_executed_by_SQL_thread - clock_diff_with_master
*/
long clock_diff_with_master;
};
void init_master_info_with_options(MASTER_INFO* mi);
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname,
bool abort_if_no_master_info_file,
int thread_mask);
void end_master_info(MASTER_INFO* mi);
int flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache);
#endif /* HAVE_REPLICATION */
#endif /* RPL_MI_H */
...@@ -99,8 +99,8 @@ typedef struct st_relay_log_info ...@@ -99,8 +99,8 @@ typedef struct st_relay_log_info
*/ */
pthread_cond_t start_cond, stop_cond, data_cond; pthread_cond_t start_cond, stop_cond, data_cond;
/* parent master info structure */ /* parent MASTER_INFO structure */
struct st_master_info *mi; class MASTER_INFO *mi;
/* /*
Needed to deal properly with cur_log getting closed and re-opened with Needed to deal properly with cur_log getting closed and re-opened with
......
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
#include "rpl_tblmap.h" #include "rpl_tblmap.h"
int queue_event(MASTER_INFO* mi,const char* buf,ulong event_len);
#define MAX_SLAVE_RETRY_PAUSE 5 #define MAX_SLAVE_RETRY_PAUSE 5
bool use_slave_mask = 0; bool use_slave_mask = 0;
...@@ -38,7 +40,6 @@ MY_BITMAP slave_error_mask; ...@@ -38,7 +40,6 @@ MY_BITMAP slave_error_mask;
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*); typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
volatile bool slave_sql_running = 0, slave_io_running = 0;
char* slave_load_tmpdir = 0; char* slave_load_tmpdir = 0;
MASTER_INFO *active_mi= 0; MASTER_INFO *active_mi= 0;
my_bool replicate_same_server_id; my_bool replicate_same_server_id;
...@@ -1071,25 +1072,6 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name, ...@@ -1071,25 +1072,6 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
} }
void end_master_info(MASTER_INFO* mi)
{
DBUG_ENTER("end_master_info");
if (!mi->inited)
DBUG_VOID_RETURN;
end_relay_log_info(&mi->rli);
if (mi->fd >= 0)
{
end_io_cache(&mi->file);
(void)my_close(mi->fd, MYF(MY_WME));
mi->fd = -1;
}
mi->inited = 0;
DBUG_VOID_RETURN;
}
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli) static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{ {
bool slave_killed=0; bool slave_killed=0;
...@@ -1165,258 +1147,6 @@ static void write_ignored_events_info_to_relay_log(THD *thd, MASTER_INFO *mi) ...@@ -1165,258 +1147,6 @@ static void write_ignored_events_info_to_relay_log(THD *thd, MASTER_INFO *mi)
} }
void init_master_info_with_options(MASTER_INFO* mi)
{
DBUG_ENTER("init_master_info_with_options");
mi->master_log_name[0] = 0;
mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number
if (master_host)
strmake(mi->host, master_host, sizeof(mi->host) - 1);
if (master_user)
strmake(mi->user, master_user, sizeof(mi->user) - 1);
if (master_password)
strmake(mi->password, master_password, MAX_PASSWORD_LENGTH);
mi->port = master_port;
mi->connect_retry = master_connect_retry;
mi->ssl= master_ssl;
if (master_ssl_ca)
strmake(mi->ssl_ca, master_ssl_ca, sizeof(mi->ssl_ca)-1);
if (master_ssl_capath)
strmake(mi->ssl_capath, master_ssl_capath, sizeof(mi->ssl_capath)-1);
if (master_ssl_cert)
strmake(mi->ssl_cert, master_ssl_cert, sizeof(mi->ssl_cert)-1);
if (master_ssl_cipher)
strmake(mi->ssl_cipher, master_ssl_cipher, sizeof(mi->ssl_cipher)-1);
if (master_ssl_key)
strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1);
DBUG_VOID_RETURN;
}
#define LINES_IN_MASTER_INFO_WITH_SSL 14
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname,
bool abort_if_no_master_info_file,
int thread_mask)
{
int fd,error;
char fname[FN_REFLEN+128];
DBUG_ENTER("init_master_info");
if (mi->inited)
{
/*
We have to reset read position of relay-log-bin as we may have
already been reading from 'hotlog' when the slave was stopped
last time. If this case pos_in_file would be set and we would
get a crash when trying to read the signature for the binary
relay log.
We only rewind the read position if we are starting the SQL
thread. The handle_slave_sql thread assumes that the read
position is at the beginning of the file, and will read the
"signature" and then fast-forward to the last position read.
*/
if (thread_mask & SLAVE_SQL)
{
my_b_seek(mi->rli.cur_log, (my_off_t) 0);
}
DBUG_RETURN(0);
}
mi->mysql=0;
mi->file_id=1;
fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
/*
We need a mutex while we are changing master info parameters to
keep other threads from reading bogus info
*/
pthread_mutex_lock(&mi->data_lock);
fd = mi->fd;
/* does master.info exist ? */
if (access(fname,F_OK))
{
if (abort_if_no_master_info_file)
{
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(0);
}
/*
if someone removed the file from underneath our feet, just close
the old descriptor and re-create the old file
*/
if (fd >= 0)
my_close(fd, MYF(MY_WME));
if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
{
sql_print_error("Failed to create a new master info file (\
file '%s', errno %d)", fname, my_errno);
goto err;
}
if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
MYF(MY_WME)))
{
sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
goto err;
}
mi->fd = fd;
init_master_info_with_options(mi);
}
else // file exists
{
if (fd >= 0)
reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
else
{
if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
{
sql_print_error("Failed to open the existing master info file (\
file '%s', errno %d)", fname, my_errno);
goto err;
}
if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
0, MYF(MY_WME)))
{
sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
goto err;
}
}
mi->fd = fd;
int port, connect_retry, master_log_pos, ssl= 0, lines;
char *first_non_digit;
/*
Starting from 4.1.x master.info has new format. Now its
first line contains number of lines in file. By reading this
number we will be always distinguish to which version our
master.info corresponds to. We can't simply count lines in
file since versions before 4.1.x could generate files with more
lines than needed.
If first line doesn't contain a number or contain number less than
14 then such file is treated like file from pre 4.1.1 version.
There is no ambiguity when reading an old master.info, as before
4.1.1, the first line contained the binlog's name, which is either
empty or has an extension (contains a '.'), so can't be confused
with an integer.
So we're just reading first line and trying to figure which version
is this.
*/
/*
The first row is temporarily stored in mi->master_log_name,
if it is line count and not binlog name (new format) it will be
overwritten by the second row later.
*/
if (init_strvar_from_file(mi->master_log_name,
sizeof(mi->master_log_name), &mi->file,
""))
goto errwithmsg;
lines= strtoul(mi->master_log_name, &first_non_digit, 10);
if (mi->master_log_name[0]!='\0' &&
*first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
{ // Seems to be new format
if (init_strvar_from_file(mi->master_log_name,
sizeof(mi->master_log_name), &mi->file, ""))
goto errwithmsg;
}
else
lines= 7;
if (init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
master_host) ||
init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
master_user) ||
init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1,
&mi->file, master_password) ||
init_intvar_from_file(&port, &mi->file, master_port) ||
init_intvar_from_file(&connect_retry, &mi->file,
master_connect_retry))
goto errwithmsg;
/*
If file has ssl part use it even if we have server without
SSL support. But these option will be ignored later when
slave will try connect to master, so in this case warning
is printed.
*/
if (lines >= LINES_IN_MASTER_INFO_WITH_SSL &&
(init_intvar_from_file(&ssl, &mi->file, master_ssl) ||
init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca),
&mi->file, master_ssl_ca) ||
init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath),
&mi->file, master_ssl_capath) ||
init_strvar_from_file(mi->ssl_cert, sizeof(mi->ssl_cert),
&mi->file, master_ssl_cert) ||
init_strvar_from_file(mi->ssl_cipher, sizeof(mi->ssl_cipher),
&mi->file, master_ssl_cipher) ||
init_strvar_from_file(mi->ssl_key, sizeof(mi->ssl_key),
&mi->file, master_ssl_key)))
goto errwithmsg;
#ifndef HAVE_OPENSSL
if (ssl)
sql_print_warning("SSL information in the master info file "
"('%s') are ignored because this MySQL slave was compiled "
"without SSL support.", fname);
#endif /* HAVE_OPENSSL */
/*
This has to be handled here as init_intvar_from_file can't handle
my_off_t types
*/
mi->master_log_pos= (my_off_t) master_log_pos;
mi->port= (uint) port;
mi->connect_retry= (uint) connect_retry;
mi->ssl= (my_bool) ssl;
}
DBUG_PRINT("master_info",("log_file_name: %s position: %ld",
mi->master_log_name,
(ulong) mi->master_log_pos));
mi->rli.mi = mi;
if (init_relay_log_info(&mi->rli, slave_info_fname))
goto err;
mi->inited = 1;
// now change cache READ -> WRITE - must do this before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
if ((error=test(flush_master_info(mi, 1))))
sql_print_error("Failed to flush master info file");
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(error);
errwithmsg:
sql_print_error("Error reading master configuration");
err:
if (fd >= 0)
{
my_close(fd, MYF(0));
end_io_cache(&mi->file);
}
mi->fd= -1;
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(1);
}
int register_slave_on_master(MYSQL* mysql) int register_slave_on_master(MYSQL* mysql)
{ {
char buf[1024], *pos= buf; char buf[1024], *pos= buf;
...@@ -1635,63 +1365,6 @@ bool show_master_info(THD* thd, MASTER_INFO* mi) ...@@ -1635,63 +1365,6 @@ bool show_master_info(THD* thd, MASTER_INFO* mi)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
/*
RETURN
2 - flush relay log failed
1 - flush master info failed
0 - all ok
*/
int flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache)
{
IO_CACHE* file = &mi->file;
char lbuf[22];
DBUG_ENTER("flush_master_info");
DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));
/*
Flush the relay log to disk. If we don't do it, then the relay log while
have some part (its last kilobytes) in memory only, so if the slave server
dies now, with, say, from master's position 100 to 150 in memory only (not
on disk), and with position 150 in master.info, then when the slave
restarts, the I/O thread will fetch binlogs from 150, so in the relay log
we will have "[0, 100] U [150, infinity[" and nobody will notice it, so the
SQL thread will jump from 100 to 150, and replication will silently break.
When we come to this place in code, relay log may or not be initialized;
the caller is responsible for setting 'flush_relay_log_cache' accordingly.
*/
if (flush_relay_log_cache &&
flush_io_cache(mi->rli.relay_log.get_log_file()))
DBUG_RETURN(2);
/*
We flushed the relay log BEFORE the master.info file, because if we crash
now, we will get a duplicate event in the relay log at restart. If we
flushed in the other order, we would get a hole in the relay log.
And duplicate is better than hole (with a duplicate, in later versions we
can add detection and scrap one event; with a hole there's nothing we can
do).
*/
/*
In certain cases this code may create master.info files that seems
corrupted, because of extra lines filled with garbage in the end
file (this happens if new contents take less space than previous
contents of file). But because of number of lines in the first line
of file we don't care about this garbage.
*/
my_b_seek(file, 0L);
my_b_printf(file, "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n",
LINES_IN_MASTER_INFO_WITH_SSL,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
mi->password, mi->port, mi->connect_retry,
(int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
mi->ssl_cipher, mi->ssl_key);
DBUG_RETURN(-flush_io_cache(file));
}
void set_slave_thread_options(THD* thd) void set_slave_thread_options(THD* thd)
{ {
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "rpl_filter.h" #include "rpl_filter.h"
#include "rpl_tblmap.h" #include "rpl_tblmap.h"
#include "rpl_rli.h" #include "rpl_rli.h"
#include "rpl_mi.h"
#define SLAVE_NET_TIMEOUT 3600 #define SLAVE_NET_TIMEOUT 3600
...@@ -38,11 +39,11 @@ ...@@ -38,11 +39,11 @@
I/O Thread - One of these threads is started for each master server. I/O Thread - One of these threads is started for each master server.
They maintain a connection to their master server, read log They maintain a connection to their master server, read log
events from the master as they arrive, and queues them into events from the master as they arrive, and queues them into
a single, shared relay log file. A MASTER_INFO struct a single, shared relay log file. A MASTER_INFO
represents each of these threads. represents each of these threads.
SQL Thread - One of these threads is started and reads from the relay log SQL Thread - One of these threads is started and reads from the relay log
file, executing each event. A RELAY_LOG_INFO struct file, executing each event. A RELAY_LOG_INFO
represents this thread. represents this thread.
Buffering in the relay log file makes it unnecessary to reread events from Buffering in the relay log file makes it unnecessary to reread events from
...@@ -95,7 +96,6 @@ extern my_string opt_relay_logname, opt_relaylog_index_name; ...@@ -95,7 +96,6 @@ extern my_string opt_relay_logname, opt_relaylog_index_name;
extern my_bool opt_skip_slave_start, opt_reckless_slave; extern my_bool opt_skip_slave_start, opt_reckless_slave;
extern my_bool opt_log_slave_updates; extern my_bool opt_log_slave_updates;
extern ulonglong relay_log_space_limit; extern ulonglong relay_log_space_limit;
struct st_master_info;
/* /*
3 possible values for MASTER_INFO::slave_running and 3 possible values for MASTER_INFO::slave_running and
...@@ -114,110 +114,6 @@ struct st_master_info; ...@@ -114,110 +114,6 @@ struct st_master_info;
static Log_event* next_event(RELAY_LOG_INFO* rli); static Log_event* next_event(RELAY_LOG_INFO* rli);
/*****************************************************************************
Replication IO Thread
st_master_info contains:
- information about how to connect to a master
- current master log name
- current master log offset
- misc control variables
st_master_info is initialized once from the master.info file if such
exists. Otherwise, data members corresponding to master.info fields
are initialized with defaults specified by master-* options. The
initialization is done through init_master_info() call.
The format of master.info file:
log_name
log_pos
master_host
master_user
master_pass
master_port
master_connect_retry
To write out the contents of master.info file to disk ( needed every
time we read and queue data from the master ), a call to
flush_master_info() is required.
To clean up, call end_master_info()
*****************************************************************************/
typedef struct st_master_info
{
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN];
char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1];
char password[MAX_PASSWORD_LENGTH+1];
my_bool ssl; // enables use of SSL connection if true
char ssl_ca[FN_REFLEN], ssl_capath[FN_REFLEN], ssl_cert[FN_REFLEN];
char ssl_cipher[FN_REFLEN], ssl_key[FN_REFLEN];
my_off_t master_log_pos;
File fd; // we keep the file open, so we need to remember the file pointer
IO_CACHE file;
pthread_mutex_t data_lock,run_lock;
pthread_cond_t data_cond,start_cond,stop_cond;
THD *io_thd;
MYSQL* mysql;
uint32 file_id; /* for 3.23 load data infile */
RELAY_LOG_INFO rli;
uint port;
uint connect_retry;
#ifndef DBUG_OFF
int events_till_disconnect;
#endif
bool inited;
volatile bool abort_slave;
volatile uint slave_running;
volatile ulong slave_run_id;
/*
The difference in seconds between the clock of the master and the clock of
the slave (second - first). It must be signed as it may be <0 or >0.
clock_diff_with_master is computed when the I/O thread starts; for this the
I/O thread does a SELECT UNIX_TIMESTAMP() on the master.
"how late the slave is compared to the master" is computed like this:
clock_of_slave - last_timestamp_executed_by_SQL_thread - clock_diff_with_master
*/
long clock_diff_with_master;
st_master_info()
:ssl(0), fd(-1), io_thd(0), inited(0),
abort_slave(0),slave_running(0), slave_run_id(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
bzero((char*) &file, sizeof(file));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
pthread_cond_init(&data_cond, NULL);
pthread_cond_init(&start_cond, NULL);
pthread_cond_init(&stop_cond, NULL);
}
~st_master_info()
{
pthread_mutex_destroy(&run_lock);
pthread_mutex_destroy(&data_lock);
pthread_cond_destroy(&data_cond);
pthread_cond_destroy(&start_cond);
pthread_cond_destroy(&stop_cond);
}
} MASTER_INFO;
int queue_event(MASTER_INFO* mi,const char* buf,ulong event_len);
#define RPL_LOG_NAME (rli->group_master_log_name[0] ? rli->group_master_log_name :\ #define RPL_LOG_NAME (rli->group_master_log_name[0] ? rli->group_master_log_name :\
"FIRST") "FIRST")
#define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\ #define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\
...@@ -231,7 +127,6 @@ int queue_event(MASTER_INFO* mi,const char* buf,ulong event_len); ...@@ -231,7 +127,6 @@ int queue_event(MASTER_INFO* mi,const char* buf,ulong event_len);
int init_slave(); int init_slave();
void init_slave_skip_errors(const char* arg); void init_slave_skip_errors(const char* arg);
int flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache);
bool flush_relay_log_info(RELAY_LOG_INFO* rli); bool flush_relay_log_info(RELAY_LOG_INFO* rli);
int register_slave_on_master(MYSQL* mysql); int register_slave_on_master(MYSQL* mysql);
int terminate_slave_threads(MASTER_INFO* mi, int thread_mask, int terminate_slave_threads(MASTER_INFO* mi, int thread_mask,
...@@ -276,14 +171,8 @@ void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli, ...@@ -276,14 +171,8 @@ void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli,
ATTRIBUTE_FORMAT(printf, 4, 5); ATTRIBUTE_FORMAT(printf, 4, 5);
void end_slave(); /* clean up */ void end_slave(); /* clean up */
void init_master_info_with_options(MASTER_INFO* mi);
void clear_until_condition(RELAY_LOG_INFO* rli); void clear_until_condition(RELAY_LOG_INFO* rli);
void clear_slave_error(RELAY_LOG_INFO* rli); void clear_slave_error(RELAY_LOG_INFO* rli);
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname,
bool abort_if_no_master_info_file,
int thread_mask);
void end_master_info(MASTER_INFO* mi);
void end_relay_log_info(RELAY_LOG_INFO* rli); void end_relay_log_info(RELAY_LOG_INFO* rli);
void lock_slave_threads(MASTER_INFO* mi); void lock_slave_threads(MASTER_INFO* mi);
void unlock_slave_threads(MASTER_INFO* mi); void unlock_slave_threads(MASTER_INFO* mi);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment