rpl000016.test sync

rpl000001.result	BitKeeper file /home/sasha/src/bk/mysql/mysql-test/r/rpl000001.result
ignore  	Added BitKeeper/tmp/bkr3sAHD to the ignore list
slave.h 	MASTER_POS_WAIT
lex.h   	MASTER_POS_WAIT
slave.cc	MASTER_POS_WAIT, do automagic restart on debugging abort, skip rotate events in 
slave.cc	debug abort count
sql_repl.cc	announce the log name at the start of the log with a fake rotate event
item_create.h	MASTER_POS_WAIT
item_func.cc	MASTER_POS_WAIT
item_func.h	MASTER_POS_WAIT
sql_class.h	enter_cond(), exit_cond() helper inliners
item_create.cc	added MASTER_POS_WAIT
mysql-test-run.sh	speed improvement fixes
rpl000007.test	sync
rpl000003.test	sleep -> sync
rpl000004.test	sleep -> sync, fixed clean up bug
rpl000014.test	sync
rpl000009.test	sync
rpl000013.test	sync
rpl000001.test	sleep -> sync
rpl000008.test	sync
rpl000006.test	sync on cleanup
rpl000011.test	sync
rpl000012.test	sync
rpl000005.test	sleep -> sync
rpl000010.test	sync
rpl000015.test	sync
rpl000002.test	sleep -> sync
rpl000014.result	we now know the master log name as soon as we connect
mysql.cc	added optional agrument to --wait
mysqltest.c	added save_master_pos and sync_with_master commands
parent 1d25808f
...@@ -188,3 +188,4 @@ Docs/my_sys.doc ...@@ -188,3 +188,4 @@ Docs/my_sys.doc
tmp/* tmp/*
extra/resolve_stack_dump extra/resolve_stack_dump
sql/share/*.sys sql/share/*.sys
BitKeeper/tmp/bkr3sAHD
...@@ -125,6 +125,7 @@ static char *current_host,*current_db,*current_user=0,*opt_password=0, ...@@ -125,6 +125,7 @@ static char *current_host,*current_db,*current_user=0,*opt_password=0,
*default_charset; *default_charset;
static char *histfile; static char *histfile;
static String glob_buffer,old_buffer; static String glob_buffer,old_buffer;
static int wait_time = 5;
static STATUS status; static STATUS status;
static ulong select_limit,max_join_size,opt_connect_timeout=0; static ulong select_limit,max_join_size,opt_connect_timeout=0;
static char default_pager[FN_REFLEN]; static char default_pager[FN_REFLEN];
...@@ -427,7 +428,7 @@ static struct option long_options[] = ...@@ -427,7 +428,7 @@ static struct option long_options[] =
{"verbose", no_argument, 0, 'v'}, {"verbose", no_argument, 0, 'v'},
{"version", no_argument, 0, 'V'}, {"version", no_argument, 0, 'V'},
{"vertical", no_argument, 0, 'E'}, {"vertical", no_argument, 0, 'E'},
{"wait", no_argument, 0, 'w'}, {"wait", optional_argument, 0, 'w'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
...@@ -560,7 +561,7 @@ static int get_options(int argc, char **argv) ...@@ -560,7 +561,7 @@ static int get_options(int argc, char **argv)
set_all_changeable_vars(changeable_vars); set_all_changeable_vars(changeable_vars);
while ((c=getopt_long(argc,argv, while ((c=getopt_long(argc,argv,
"?ABCD:LfgGHinNoqrstTU::vVwWEe:h:O:P:S:u:#::p::", "?ABCD:LfgGHinNoqrstTU::vVw::WEe:h:O:P:S:u:#::p::",
long_options, &option_index)) != EOF) long_options, &option_index)) != EOF)
{ {
switch(c) { switch(c) {
...@@ -664,7 +665,10 @@ static int get_options(int argc, char **argv) ...@@ -664,7 +665,10 @@ static int get_options(int argc, char **argv)
case 'n': unbuffered=1; break; case 'n': unbuffered=1; break;
case 'v': verbose++; break; case 'v': verbose++; break;
case 'E': vertical=1; break; case 'E': vertical=1; break;
case 'w': wait_flag=1; break; case 'w':
wait_flag=1;
if(optarg) wait_time = atoi(optarg) ;
break;
case 'A': no_rehash=1; break; case 'A': no_rehash=1; break;
case 'G': no_named_cmds=0; break; case 'G': no_named_cmds=0; break;
case 'g': no_named_cmds=1; break; case 'g': no_named_cmds=1; break;
...@@ -2114,7 +2118,7 @@ sql_connect(char *host,char *database,char *user,char *password,uint silent) ...@@ -2114,7 +2118,7 @@ sql_connect(char *host,char *database,char *user,char *password,uint silent)
message=1; message=1;
tee_fputs("Waiting",stderr); (void) fflush(stderr); tee_fputs("Waiting",stderr); (void) fflush(stderr);
} }
(void) sleep(5); (void) sleep(wait_time);
if (!silent) if (!silent)
{ {
putc('.',stderr); (void) fflush(stderr); putc('.',stderr); (void) fflush(stderr);
......
...@@ -91,6 +91,12 @@ int *cur_block, *block_stack_end; ...@@ -91,6 +91,12 @@ int *cur_block, *block_stack_end;
DYNAMIC_ARRAY q_lines; DYNAMIC_ARRAY q_lines;
typedef struct
{
char file[FN_REFLEN];
ulong pos;
} MASTER_POS ;
struct connection struct connection
{ {
MYSQL mysql; MYSQL mysql;
...@@ -104,6 +110,7 @@ typedef ...@@ -104,6 +110,7 @@ typedef
} PARSER; } PARSER;
PARSER parser; PARSER parser;
MASTER_POS master_pos;
int block_ok = 1; /* set to 0 if the current block should not be executed */ int block_ok = 1; /* set to 0 if the current block should not be executed */
int false_block_depth = 0; int false_block_depth = 0;
const char* result_file = 0; /* if set, all results are concated and const char* result_file = 0; /* if set, all results are concated and
...@@ -137,13 +144,15 @@ struct st_query ...@@ -137,13 +144,15 @@ struct st_query
enum { Q_CONNECTION=1, Q_QUERY, Q_CONNECT, enum { Q_CONNECTION=1, Q_QUERY, Q_CONNECT,
Q_SLEEP, Q_INC, Q_DEC,Q_SOURCE, Q_SLEEP, Q_INC, Q_DEC,Q_SOURCE,
Q_DISCONNECT,Q_LET, Q_ECHO, Q_WHILE, Q_END_BLOCK, Q_DISCONNECT,Q_LET, Q_ECHO, Q_WHILE, Q_END_BLOCK,
Q_SYSTEM, Q_RESULT, Q_REQUIRE, Q_SYSTEM, Q_RESULT, Q_REQUIRE, Q_SAVE_MASTER_POS,
Q_SYNC_WITH_MASTER,
Q_UNKNOWN, Q_COMMENT, Q_COMMENT_WITH_COMMAND} type; Q_UNKNOWN, Q_COMMENT, Q_COMMENT_WITH_COMMAND} type;
}; };
const char *command_names[] = { const char *command_names[] = {
"connection", "query","connect","sleep","inc","dec","source","disconnect", "connection", "query","connect","sleep","inc","dec","source","disconnect",
"let","echo","while","end","system","result", "require",0 "let","echo","while","end","system","result", "require", "save_master_pos",
"sync_with_master", 0
}; };
TYPELIB command_typelib= {array_elements(command_names),"", TYPELIB command_typelib= {array_elements(command_names),"",
...@@ -471,6 +480,50 @@ int do_echo(struct st_query* q) ...@@ -471,6 +480,50 @@ int do_echo(struct st_query* q)
return 0; return 0;
} }
int do_sync_with_master()
{
MYSQL_RES* res;
MYSQL_ROW row;
MYSQL* mysql = &cur_con->mysql;
char query_buf[FN_REFLEN+128];
sprintf(query_buf, "select master_pos_wait('%s', %ld)", master_pos.file,
master_pos.pos);
if(mysql_query(mysql, query_buf))
die("At line %u: failed in %s: %d: %s", start_lineno, query_buf,
mysql_errno(mysql), mysql_error(mysql));
if(!(res = mysql_store_result(mysql)))
die("line %u: mysql_store_result() retuned NULL", start_lineno);
if(!(row = mysql_fetch_row(res)))
die("line %u: empty result in %s", start_lineno, query_buf);
if(!row[0])
die("Error on slave while syncing with master");
mysql_free_result(res);
return 0;
}
int do_save_master_pos()
{
MYSQL_RES* res;
MYSQL_ROW row;
MYSQL* mysql = &cur_con->mysql;
if(mysql_query(mysql, "show master status"))
die("At line %u: failed in show master status: %d: %s", start_lineno,
mysql_errno(mysql), mysql_error(mysql));
if(!(res = mysql_store_result(mysql)))
die("line %u: mysql_store_result() retuned NULL", start_lineno);
if(!(row = mysql_fetch_row(res)))
die("line %u: empty result in show master status", start_lineno);
strncpy(master_pos.file, row[0], sizeof(master_pos.file));
master_pos.pos = strtoul(row[1], (char**) 0, 10);
mysql_free_result(res);
return 0;
}
int do_let(struct st_query* q) int do_let(struct st_query* q)
{ {
char* p=q->first_argument; char* p=q->first_argument;
...@@ -1299,6 +1352,7 @@ int main(int argc, char** argv) ...@@ -1299,6 +1352,7 @@ int main(int argc, char** argv)
cur_con = cons; cur_con = cons;
memset(file_stack, 0, sizeof(file_stack)); memset(file_stack, 0, sizeof(file_stack));
memset(&master_pos, 0, sizeof(master_pos));
file_stack_end = file_stack + MAX_INCLUDE_DEPTH; file_stack_end = file_stack + MAX_INCLUDE_DEPTH;
cur_file = file_stack; cur_file = file_stack;
lineno = lineno_stack; lineno = lineno_stack;
...@@ -1361,6 +1415,8 @@ int main(int argc, char** argv) ...@@ -1361,6 +1415,8 @@ int main(int argc, char** argv)
get_file_name(save_file,q); get_file_name(save_file,q);
require_file=1; require_file=1;
break; break;
case Q_SAVE_MASTER_POS: do_save_master_pos(q); break;
case Q_SYNC_WITH_MASTER: do_sync_with_master(q); break;
case Q_COMMENT: /* Ignore row */ case Q_COMMENT: /* Ignore row */
case Q_COMMENT_WITH_COMMAND: case Q_COMMENT_WITH_COMMAND:
default: processed = 0; break; default: processed = 0; break;
......
...@@ -192,11 +192,13 @@ if [ x$SOURCE_DIST = x1 ] ; then ...@@ -192,11 +192,13 @@ if [ x$SOURCE_DIST = x1 ] ; then
MYSQLD="$BASEDIR/sql/mysqld" MYSQLD="$BASEDIR/sql/mysqld"
MYSQL_TEST="$BASEDIR/client/mysqltest" MYSQL_TEST="$BASEDIR/client/mysqltest"
MYSQLADMIN="$BASEDIR/client/mysqladmin" MYSQLADMIN="$BASEDIR/client/mysqladmin"
MYSQL="$BASEDIR/client/mysql"
INSTALL_DB="./install_test_db" INSTALL_DB="./install_test_db"
else else
MYSQLD="$BASEDIR/bin/mysqld" MYSQLD="$BASEDIR/bin/mysqld"
MYSQL_TEST="$BASEDIR/bin/mysqltest" MYSQL_TEST="$BASEDIR/bin/mysqltest"
MYSQLADMIN="$BASEDIR/bin/mysqladmin" MYSQLADMIN="$BASEDIR/bin/mysqladmin"
MYSQL="$BASEDIR/bin/mysql"
INSTALL_DB="./install_test_db -bin" INSTALL_DB="./install_test_db -bin"
fi fi
...@@ -230,6 +232,11 @@ SLAVE_MYSQLD=$MYSQLD #this can be changed later if we are doing gcov ...@@ -230,6 +232,11 @@ SLAVE_MYSQLD=$MYSQLD #this can be changed later if we are doing gcov
#++ #++
# Function Definitions # Function Definitions
#-- #--
wait_for_server_start ()
{
$MYSQL -e "select 1" --silent -w1 --host=127.0.0.1 --port=$1 \
>/dev/null
}
prompt_user () prompt_user ()
{ {
...@@ -320,6 +327,7 @@ gcov_collect () { ...@@ -320,6 +327,7 @@ gcov_collect () {
$ECHO "gcov info in $GCOV_MSG, errors in $GCOV_ERR" $ECHO "gcov info in $GCOV_MSG, errors in $GCOV_ERR"
} }
start_master() start_master()
{ {
[ x$MASTER_RUNNING = 1 ] && return [ x$MASTER_RUNNING = 1 ] && return
...@@ -354,6 +362,7 @@ start_master() ...@@ -354,6 +362,7 @@ start_master()
else else
$MYSQLD $master_args >> $MASTER_MYERR 2>&1 & $MYSQLD $master_args >> $MASTER_MYERR 2>&1 &
fi fi
wait_for_server_start $MASTER_MYPORT
MASTER_RUNNING=1 MASTER_RUNNING=1
} }
...@@ -399,6 +408,7 @@ start_slave() ...@@ -399,6 +408,7 @@ start_slave()
else else
$SLAVE_MYSQLD $slave_args >> $SLAVE_MYERR 2>&1 & $SLAVE_MYSQLD $slave_args >> $SLAVE_MYERR 2>&1 &
fi fi
wait_for_server_start $SLAVE_MYPORT
SLAVE_RUNNING=1 SLAVE_RUNNING=1
} }
...@@ -407,7 +417,6 @@ mysql_start () { ...@@ -407,7 +417,6 @@ mysql_start () {
start_master start_master
start_slave start_slave
cd $MYSQL_TEST_DIR cd $MYSQL_TEST_DIR
sleep $SLEEP_TIME # Give mysqld time to start properly
return 1 return 1
} }
...@@ -430,7 +439,6 @@ stop_slave () ...@@ -430,7 +439,6 @@ stop_slave ()
fi fi
fi fi
SLAVE_RUNNING=0 SLAVE_RUNNING=0
sleep $SLEEP_TIME # Give mysqld time to go down properly
fi fi
} }
...@@ -453,7 +461,6 @@ stop_master () ...@@ -453,7 +461,6 @@ stop_master ()
fi fi
fi fi
MASTER_RUNNING=0 MASTER_RUNNING=0
sleep $SLEEP_TIME # Give mysqld time to go down properly
fi fi
} }
...@@ -463,7 +470,10 @@ mysql_stop () ...@@ -463,7 +470,10 @@ mysql_stop ()
$ECHO "Shutting-down MySQL daemon" $ECHO "Shutting-down MySQL daemon"
$ECHO "" $ECHO ""
stop_master stop_master
$ECHO "Master shutdown finished"
stop_slave stop_slave
$ECHO "Slave shutdown finished"
return 1 return 1
} }
......
n
1
2
sum(length(word))
71
File Position Binlog_do_db Binlog_ignore_db File Position Binlog_do_db Binlog_ignore_db
master-bin.001 73 master-bin.001 73
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db
127.0.0.1 root 9306 1 73 Yes 127.0.0.1 root 9306 1 master-bin.001 73 Yes
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db
127.0.0.1 root 9306 1 73 No 127.0.0.1 root 9306 1 master-bin.001 73 No
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db
127.0.0.1 root 9306 1 73 Yes 127.0.0.1 root 9306 1 master-bin.001 73 Yes
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db
127.0.0.1 root 9306 1 173 Yes 127.0.0.1 root 9306 1 master-bin.001 173 Yes
File Position Binlog_do_db Binlog_ignore_db File Position Binlog_do_db Binlog_ignore_db
master-bin.001 73 master-bin.001 73
n n
......
...@@ -7,12 +7,15 @@ load data infile '../../std_data/words.dat' into table t1; ...@@ -7,12 +7,15 @@ load data infile '../../std_data/words.dat' into table t1;
drop table if exists foo; drop table if exists foo;
create table foo(n int); create table foo(n int);
insert into foo values(1),(2); insert into foo values(1),(2);
save_master_pos;
connection slave; connection slave;
sleep 2; sync_with_master;
use test; use test;
@r/rpl000001.a.result select * from foo; select * from foo;
@r/rpl000001.b.result select sum(length(word)) from t1; select sum(length(word)) from t1;
connection master; connection master;
drop table t1; drop table t1;
save_master_pos;
connection slave;
sync_with_master;
...@@ -5,8 +5,13 @@ drop table if exists t1; ...@@ -5,8 +5,13 @@ drop table if exists t1;
create table t1 (n int auto_increment primary key); create table t1 (n int auto_increment primary key);
set insert_id = 2000; set insert_id = 2000;
insert into t1 values (NULL),(NULL),(NULL); insert into t1 values (NULL),(NULL),(NULL);
save_master_pos;
connection slave; connection slave;
use test; use test;
sleep 2; sync_with_master;
@r/rpl000002.result select * from t1; @r/rpl000002.result select * from t1;
connection master;
drop table t1; drop table t1;
save_master_pos;
connection slave;
sync_with_master;
...@@ -4,7 +4,12 @@ drop table if exists t1; ...@@ -4,7 +4,12 @@ drop table if exists t1;
create table t1(n int primary key); create table t1(n int primary key);
!insert into t1 values (1),(2),(2); !insert into t1 values (1),(2),(2);
insert into t1 values (3); insert into t1 values (3);
save_master_pos;
connection slave; connection slave;
sleep 2; sync_with_master;
@r/rpl000003.result select * from t1; @r/rpl000003.result select * from t1;
connection master;
drop table t1; drop table t1;
save_master_pos;
connection slave;
sync_with_master;
...@@ -16,4 +16,11 @@ drop table if exists t2; ...@@ -16,4 +16,11 @@ drop table if exists t2;
load table t2 from master; load table t2 from master;
@r/rpl000004.a.result check table t1; @r/rpl000004.a.result check table t1;
@r/rpl000004.b.result select count(*) from t2; @r/rpl000004.b.result select count(*) from t2;
connection master;
set SQL_LOG_BIN=1;
drop table if exists t1,t2; drop table if exists t1,t2;
save_master_pos;
connection slave;
sync_with_master;
create table t1(n int);
drop table t1;
...@@ -7,8 +7,12 @@ INSERT t1 SET name='Jacob', age=2; ...@@ -7,8 +7,12 @@ INSERT t1 SET name='Jacob', age=2;
INSERT into t1 SET name='Caleb', age=1; INSERT into t1 SET name='Caleb', age=1;
ALTER TABLE t1 ADD id int(8) ZEROFILL AUTO_INCREMENT PRIMARY KEY; ALTER TABLE t1 ADD id int(8) ZEROFILL AUTO_INCREMENT PRIMARY KEY;
@r/rpl000005.result select * from t1; @r/rpl000005.result select * from t1;
save_master_pos;
connection slave; connection slave;
sleep 2; sync_with_master;
@r/rpl000005.result select * from t1; @r/rpl000005.result select * from t1;
connection master; connection master;
drop table t1; drop table t1;
save_master_pos;
connection slave;
sync_with_master;
...@@ -12,3 +12,6 @@ load table foo from master; ...@@ -12,3 +12,6 @@ load table foo from master;
@r/rpl000006.result select unix_timestamp(t) from foo; @r/rpl000006.result select unix_timestamp(t) from foo;
connection master; connection master;
drop table foo; drop table foo;
save_master_pos;
connection slave;
sync_with_master;
...@@ -15,8 +15,12 @@ insert into foo values('five'); ...@@ -15,8 +15,12 @@ insert into foo values('five');
drop table if exists bar; drop table if exists bar;
create table bar (m int); create table bar (m int);
insert into bar values(15); insert into bar values(15);
save_master_pos;
connection slave; connection slave;
sleep 2; sync_with_master;
@r/rpl000007.result select foo.n,bar.m from foo,bar; @r/rpl000007.result select foo.n,bar.m from foo,bar;
connection master; connection master;
drop table if exists bar,foo; drop table if exists bar,foo;
save_master_pos;
connection slave;
sync_with_master;
...@@ -17,8 +17,13 @@ insert into bar values(15); ...@@ -17,8 +17,13 @@ insert into bar values(15);
drop table if exists choo; drop table if exists choo;
create table choo (k int); create table choo (k int);
insert into choo values(55); insert into choo values(55);
save_master_pos;
connection slave; connection slave;
sleep 3; sync_with_master;
@r/rpl000008.result select foo.n,bar.m,choo.k from foo,bar,choo; @r/rpl000008.result select foo.n,bar.m,choo.k from foo,bar,choo;
connection master; connection master;
drop table if exists foo,bar,choo; drop table if exists foo,bar,choo;
save_master_pos;
connection slave;
sync_with_master;
drop table if exists foo,bar,choo;
...@@ -6,8 +6,9 @@ drop database if exists foo; ...@@ -6,8 +6,9 @@ drop database if exists foo;
create database foo; create database foo;
drop database if exists bar; drop database if exists bar;
create database bar; create database bar;
save_master_pos;
connection slave; connection slave;
sleep 2; sync_with_master;
drop table if exists foo.foo; drop table if exists foo.foo;
create table foo.foo (n int); create table foo.foo (n int);
insert into foo.foo values(4); insert into foo.foo values(4);
...@@ -18,9 +19,15 @@ insert into foo.foo values(5); ...@@ -18,9 +19,15 @@ insert into foo.foo values(5);
drop table if exists bar.bar; drop table if exists bar.bar;
create table bar.bar (m int); create table bar.bar (m int);
insert into bar.bar values(15); insert into bar.bar values(15);
save_master_pos;
connection slave; connection slave;
sleep 2; sync_with_master;
@r/rpl000009.result select foo.foo.n,bar.bar.m from foo.foo,bar.bar; @r/rpl000009.result select foo.foo.n,bar.bar.m from foo.foo,bar.bar;
connection master; connection master;
drop database if exists bar; drop database if exists bar;
drop database if exists foo; drop database if exists foo;
save_master_pos;
connection slave;
sync_with_master;
drop database if exists bar;
drop database if exists foo;
...@@ -8,8 +8,12 @@ drop table if exists t1; ...@@ -8,8 +8,12 @@ drop table if exists t1;
create table t1 (n int not null auto_increment primary key); create table t1 (n int not null auto_increment primary key);
insert into t1 values(NULL); insert into t1 values(NULL);
insert into t1 values(2); insert into t1 values(2);
save_master_pos;
connection slave; connection slave;
sleep 5; sync_with_master;
@r/rpl000010.result select n from t1; @r/rpl000010.result select n from t1;
connection master; connection master;
drop table t1; drop table t1;
save_master_pos;
connection slave;
sync_with_master;
...@@ -4,16 +4,21 @@ use test; ...@@ -4,16 +4,21 @@ use test;
drop table if exists t1; drop table if exists t1;
create table t1 (n int); create table t1 (n int);
insert into t1 values(1); insert into t1 values(1);
save_master_pos;
connection slave; connection slave;
#give slave some breathing room to get started #give slave some breathing room to get started
sleep 2; sync_with_master;
slave stop; slave stop;
slave start; slave start;
connection master; connection master;
insert into t1 values(2); insert into t1 values(2);
save_master_pos;
connection slave; connection slave;
#let slave catch up #let slave catch up
sleep 2; sync_with_master;
@r/rpl000011.result select * from t1; @r/rpl000011.result select * from t1;
connection master; connection master;
drop table t1; drop table t1;
save_master_pos;
connection slave;
sync_with_master;
...@@ -14,13 +14,18 @@ disconnect master; ...@@ -14,13 +14,18 @@ disconnect master;
connection master1; connection master1;
insert into t2 values(6); insert into t2 values(6);
disconnect master1; disconnect master1;
connect (master2,localhost,root,,test,0,mysql-master.sock);
connection master2;
save_master_pos;
connection slave; connection slave;
sleep 1; sync_with_master;
@r/rpl000012.result select * from t2; @r/rpl000012.result select * from t2;
@r/rpl000012.status.result show status like 'Slave_open_temp_tables'; @r/rpl000012.status.result show status like 'Slave_open_temp_tables';
# #
# Clean up # Clean up
# #
connect (master2,localhost,root,,test,0,mysql-master.sock);
connection master2; connection master2;
drop table if exists t1,t2; drop table if exists t1,t2;
save_master_pos;
connection slave;
sync_with_master;
source include/master-slave.inc; source include/master-slave.inc;
connection master; connection master;
save_master_pos;
connection slave;
sync_with_master;
connection master;
drop table if exists t2; drop table if exists t2;
create table t2(n int); create table t2(n int);
create temporary table t1 (n int); create temporary table t1 (n int);
...@@ -12,21 +16,19 @@ insert into t2 select * from t1; ...@@ -12,21 +16,19 @@ insert into t2 select * from t1;
disconnect master; disconnect master;
connection master1; connection master1;
insert into t2 values(6); insert into t2 values(6);
sleep 2;
disconnect master1; disconnect master1;
connect (master2,localhost,root,,test,0,mysql-master.sock);
connection master2;
save_master_pos;
connection slave; connection slave;
let $1=12; sync_with_master;
while ($1)
{
!slave start;
sleep 0.2;
dec $1;
}
@r/rpl000013.result select * from t2; @r/rpl000013.result select * from t2;
@r/rpl000013.status.result show status like 'Slave_open_temp_tables'; @r/rpl000013.status.result show status like 'Slave_open_temp_tables';
# #
# Clean up # Clean up
# #
connect (master2,localhost,root,,test,0,mysql-master.sock);
connection master2; connection master2;
drop table if exists t1,t2; drop table if exists t1,t2;
save_master_pos;
connection slave;
sync_with_master;
...@@ -2,11 +2,11 @@ source include/master-slave.inc; ...@@ -2,11 +2,11 @@ source include/master-slave.inc;
source include/have_default_master.inc; source include/have_default_master.inc;
connection master; connection master;
show master status; show master status;
save_master_pos;
connection slave; connection slave;
sleep 0.2; sync_with_master;
show slave status; show slave status;
change master to master_log_pos=73; change master to master_log_pos=73;
sleep 0.2;
slave stop; slave stop;
change master to master_log_pos=73; change master to master_log_pos=73;
show slave status; show slave status;
...@@ -20,9 +20,13 @@ create table if not exists foo(n int); ...@@ -20,9 +20,13 @@ create table if not exists foo(n int);
drop table if exists foo; drop table if exists foo;
create table foo (n int); create table foo (n int);
insert into foo values (1),(2),(3); insert into foo values (1),(2),(3);
save_master_pos;
connection slave; connection slave;
change master to master_log_pos=73; change master to master_log_pos=73;
sleep 2; sync_with_master;
select * from foo; select * from foo;
connection master; connection master;
drop table foo; drop table foo;
save_master_pos;
connection slave;
sync_with_master;
...@@ -18,8 +18,13 @@ connection master; ...@@ -18,8 +18,13 @@ connection master;
drop table if exists foo; drop table if exists foo;
create table foo (n int); create table foo (n int);
insert into foo values (10),(45),(90); insert into foo values (10),(45),(90);
save_master_pos;
connection slave; connection slave;
sleep 2; sync_with_master;
select * from foo; select * from foo;
connection master; connection master;
drop table foo; drop table foo;
save_master_pos;
connection slave;
sync_with_master;
...@@ -15,8 +15,9 @@ connection master; ...@@ -15,8 +15,9 @@ connection master;
drop table if exists t1; drop table if exists t1;
create table t1 (s text); create table t1 (s text);
insert into t1 values('Could not break slave'),('Tried hard'); insert into t1 values('Could not break slave'),('Tried hard');
save_master_pos;
connection slave; connection slave;
sleep 2; sync_with_master;
select * from t1; select * from t1;
connection master; connection master;
flush logs; flush logs;
...@@ -24,12 +25,14 @@ drop table if exists t2; ...@@ -24,12 +25,14 @@ drop table if exists t2;
create table t2(m int); create table t2(m int);
insert into t2 values (34),(67),(123); insert into t2 values (34),(67),(123);
flush logs; flush logs;
sleep 0.3;
show master logs; show master logs;
purge master logs to 'master-bin.003'; purge master logs to 'master-bin.003';
show master logs; show master logs;
insert into t2 values (65); insert into t2 values (65);
save_master_pos;
connection slave; connection slave;
sleep 2; sync_with_master;
select * from t2; select * from t2;
drop table if exists t1,t2; drop table if exists t1,t2;
connection master;
drop table if exists t1,t2;
...@@ -376,3 +376,8 @@ Item *create_load_file(Item* a) ...@@ -376,3 +376,8 @@ Item *create_load_file(Item* a)
{ {
return new Item_load_file(a); return new Item_load_file(a);
} }
Item *create_wait_for_master_pos(Item* a, Item* b)
{
return new Item_master_pos_wait(a, b);
}
...@@ -85,3 +85,4 @@ Item *create_func_ucase(Item* a); ...@@ -85,3 +85,4 @@ Item *create_func_ucase(Item* a);
Item *create_func_version(void); Item *create_func_version(void);
Item *create_func_weekday(Item* a); Item *create_func_weekday(Item* a);
Item *create_load_file(Item* a); Item *create_load_file(Item* a);
Item *create_wait_for_master_pos(Item* a, Item* b);
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include <hash.h> #include <hash.h>
#include <time.h> #include <time.h>
#include <ft_global.h> #include <ft_global.h>
#include "slave.h" // for wait_for_master_pos
/* return TRUE if item is a constant */ /* return TRUE if item is a constant */
...@@ -1387,6 +1388,28 @@ void item_user_lock_release(ULL *ull) ...@@ -1387,6 +1388,28 @@ void item_user_lock_release(ULL *ull)
delete ull; delete ull;
} }
/*
Wait until we are at or past the given position in the master binlog
on the slave
*/
longlong Item_master_pos_wait::val_int()
{
THD* thd = current_thd;
String *log_name = args[0]->val_str(&value);
int event_count;
if(thd->slave_thread || !log_name || !log_name->length())
{
null_value = 1;
return 0;
}
ulong pos = (ulong)args[1]->val_int();
if((event_count = glob_mi.wait_for_pos(thd, log_name, pos)) == -1)
null_value = 1;;
return event_count;
}
/* /*
Get a user level lock. If the thread has an old lock this is first released. Get a user level lock. If the thread has an old lock this is first released.
Returns 1: Got lock Returns 1: Got lock
......
...@@ -778,6 +778,18 @@ class Item_func_release_lock :public Item_int_func ...@@ -778,6 +778,18 @@ class Item_func_release_lock :public Item_int_func
void fix_length_and_dec() { decimals=0; max_length=1; maybe_null=1;} void fix_length_and_dec() { decimals=0; max_length=1; maybe_null=1;}
}; };
/* replication functions */
class Item_master_pos_wait :public Item_int_func
{
String value;
public:
Item_master_pos_wait(Item *a,Item *b) :Item_int_func(a,b) {}
longlong val_int();
const char *func_name() const { return "master_pos_wait"; }
void fix_length_and_dec() { decimals=0; max_length=1; maybe_null=1;}
};
/* Handling of user definiable variables */ /* Handling of user definiable variables */
......
...@@ -397,6 +397,8 @@ static SYMBOL sql_functions[] = { ...@@ -397,6 +397,8 @@ static SYMBOL sql_functions[] = {
{ "LOWER", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_lcase)}, { "LOWER", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_lcase)},
{ "LPAD", SYM(FUNC_ARG3),0,CREATE_FUNC(create_func_lpad)}, { "LPAD", SYM(FUNC_ARG3),0,CREATE_FUNC(create_func_lpad)},
{ "LTRIM", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_ltrim)}, { "LTRIM", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_ltrim)},
{ "MASTER_POS_WAIT", SYM(FUNC_ARG2),0,
CREATE_FUNC(create_wait_for_master_pos)},
{ "MAKE_SET", SYM(MAKE_SET_SYM),0,0}, { "MAKE_SET", SYM(MAKE_SET_SYM),0,0},
{ "MAX", SYM(MAX_SYM),0,0}, { "MAX", SYM(MAX_SYM),0,0},
{ "MD5", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_md5)}, { "MD5", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_md5)},
......
...@@ -616,6 +616,46 @@ int flush_master_info(MASTER_INFO* mi) ...@@ -616,6 +616,46 @@ int flush_master_info(MASTER_INFO* mi)
return 0; return 0;
} }
int st_master_info::wait_for_pos(THD* thd, String* log_name, ulong log_pos)
{
if(!inited) return -1;
bool pos_reached = 0;
int event_count = 0;
for(;!pos_reached && !thd->killed;)
{
int cmp_result;
char* basename;
pthread_mutex_lock(&lock);
if(*log_file_name)
{
basename = strrchr(log_file_name, FN_LIBCHAR);
if(basename)
++basename;
else
basename = log_file_name;
cmp_result = strncmp(basename, log_name->ptr(),
log_name->length());
}
else
cmp_result = 0;
pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0);
if(!pos_reached && !thd->killed)
{
const char* msg = thd->enter_cond(&cond, &lock,
"Waiting for master update");
pthread_cond_wait(&cond, &lock);
thd->exit_cond(msg);
event_count++;
}
pthread_mutex_unlock(&lock);
if(thd->killed)
return -1;
}
return event_count;
}
static int init_slave_thread(THD* thd) static int init_slave_thread(THD* thd)
{ {
...@@ -1003,10 +1043,17 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -1003,10 +1043,17 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{ {
Rotate_log_event* rev = (Rotate_log_event*)ev; Rotate_log_event* rev = (Rotate_log_event*)ev;
int ident_len = rev->ident_len; int ident_len = rev->ident_len;
pthread_mutex_lock(&mi->lock);
memcpy(mi->log_file_name, rev->new_log_ident,ident_len ); memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
mi->log_file_name[ident_len] = 0; mi->log_file_name[ident_len] = 0;
mi->pos = 4; // skip magic number mi->pos = 4; // skip magic number
pthread_cond_broadcast(&mi->cond);
pthread_mutex_unlock(&mi->lock);
flush_master_info(mi); flush_master_info(mi);
#ifndef DBUG_OFF
if(abort_slave_event_count)
++events_till_abort;
#endif
delete ev; delete ev;
break; break;
} }
...@@ -1045,6 +1092,9 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -1045,6 +1092,9 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
pthread_handler_decl(handle_slave,arg __attribute__((unused))) pthread_handler_decl(handle_slave,arg __attribute__((unused)))
{ {
#ifndef DBUG_OFF
slave_begin:
#endif
THD *thd; // needs to be first for thread_stack THD *thd; // needs to be first for thread_stack
MYSQL *mysql = NULL ; MYSQL *mysql = NULL ;
...@@ -1241,6 +1291,10 @@ position %ld", ...@@ -1241,6 +1291,10 @@ position %ld",
net_end(&thd->net); // destructor will not free it, because we are weird net_end(&thd->net); // destructor will not free it, because we are weird
delete thd; delete thd;
my_thread_end(); my_thread_end();
#ifndef DBUG_OFF
if(abort_slave_event_count && !events_till_abort)
goto slave_begin;
#endif
pthread_exit(0); pthread_exit(0);
DBUG_RETURN(0); // Can't return anything here DBUG_RETURN(0); // Can't return anything here
} }
......
...@@ -14,17 +14,20 @@ typedef struct st_master_info ...@@ -14,17 +14,20 @@ typedef struct st_master_info
uint port; uint port;
uint connect_retry; uint connect_retry;
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t cond;
bool inited; bool inited;
st_master_info():pending(0),fd(-1),inited(0) st_master_info():pending(0),fd(-1),inited(0)
{ {
host[0] = 0; user[0] = 0; password[0] = 0; host[0] = 0; user[0] = 0; password[0] = 0;
pthread_mutex_init(&lock, NULL); pthread_mutex_init(&lock, NULL);
pthread_cond_init(&cond, NULL);
} }
~st_master_info() ~st_master_info()
{ {
pthread_mutex_destroy(&lock); pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
} }
inline void inc_pending(ulonglong val) inline void inc_pending(ulonglong val)
{ {
...@@ -35,6 +38,7 @@ typedef struct st_master_info ...@@ -35,6 +38,7 @@ typedef struct st_master_info
pthread_mutex_lock(&lock); pthread_mutex_lock(&lock);
pos += val + pending; pos += val + pending;
pending = 0; pending = 0;
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock); pthread_mutex_unlock(&lock);
} }
// thread safe read of position - not needed if we are in the slave thread, // thread safe read of position - not needed if we are in the slave thread,
...@@ -45,6 +49,8 @@ typedef struct st_master_info ...@@ -45,6 +49,8 @@ typedef struct st_master_info
var = pos; var = pos;
pthread_mutex_unlock(&lock); pthread_mutex_unlock(&lock);
} }
int wait_for_pos(THD* thd, String* log_name, ulong log_pos);
} MASTER_INFO; } MASTER_INFO;
typedef struct st_table_rule_ent typedef struct st_table_rule_ent
......
...@@ -281,6 +281,25 @@ class THD :public ilink { ...@@ -281,6 +281,25 @@ class THD :public ilink {
THD(); THD();
~THD(); ~THD();
bool store_globals(); bool store_globals();
inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex,
const char* msg)
{
const char* old_msg = proc_info;
pthread_mutex_lock(&mysys_var->mutex);
mysys_var->current_mutex = mutex;
mysys_var->current_cond = cond;
proc_info = msg;
pthread_mutex_unlock(&mysys_var->mutex);
return old_msg;
}
inline void exit_cond(const char* old_msg)
{
pthread_mutex_lock(&mysys_var->mutex);
mysys_var->current_mutex = 0;
mysys_var->current_cond = 0;
proc_info = old_msg;
pthread_mutex_unlock(&mysys_var->mutex);
}
inline time_t query_start() { query_start_used=1; return start_time; } inline time_t query_start() { query_start_used=1; return start_time; }
inline void set_time() { if (user_time) start_time=time_after_lock=user_time; else time_after_lock=time(&start_time); } inline void set_time() { if (user_time) start_time=time_after_lock=user_time; else time_after_lock=time(&start_time); }
inline void end_time() { time(&start_time); } inline void end_time() { time(&start_time); }
......
...@@ -26,6 +26,36 @@ ...@@ -26,6 +26,36 @@
extern const char* any_db; extern const char* any_db;
extern pthread_handler_decl(handle_slave,arg); extern pthread_handler_decl(handle_slave,arg);
static int fake_rotate_event(NET* net, String* packet,
const char* log_file_name);
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
const char**errmsg)
{
char header[LOG_EVENT_HEADER_LEN];
memset(header, 0, 4); // when does not matter
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
char* p = strrchr(log_file_name, FN_LIBCHAR);
// find the last slash
if(p)
p++;
else
p = log_file_name;
uint ident_len = (uint) strlen(p);
ulong event_len = ident_len + sizeof(header);
int4store(header + EVENT_TYPE_OFFSET + 1, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
packet->append(header, sizeof(header));
packet->append(p,ident_len);
if(my_net_write(net, (char*)packet->ptr(), packet->length()))
{
*errmsg = "failed on my_net_write()";
return -1;
}
return 0;
}
static int send_file(THD *thd) static int send_file(THD *thd)
{ {
...@@ -281,6 +311,15 @@ sweepstakes if you report the bug"; ...@@ -281,6 +311,15 @@ sweepstakes if you report the bug";
// we need to start a packet with something other than 255 // we need to start a packet with something other than 255
// to distiquish it from error // to distiquish it from error
if(pos == 4) // tell the client log name with a fake rotate_event
// if we are at the start of the log
{
if(fake_rotate_event(net, packet, log_file_name, &errmsg))
goto err;
packet->length(0);
packet->append("\0", 1);
}
while(!net->error && net->vio != 0 && !thd->killed) while(!net->error && net->vio != 0 && !thd->killed)
{ {
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock(); pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
...@@ -437,38 +476,17 @@ sweepstakes if you report the bug"; ...@@ -437,38 +476,17 @@ sweepstakes if you report the bug";
end_io_cache(&log); end_io_cache(&log);
(void) my_close(file, MYF(MY_WME)); (void) my_close(file, MYF(MY_WME));
if ((file=open_log(&log, log_file_name, &errmsg)) < 0)
goto err;
// fake Rotate_log event just in case it did not make it to the log // fake Rotate_log event just in case it did not make it to the log
// otherwise the slave make get confused about the offset // otherwise the slave make get confused about the offset
{ if ((file=open_log(&log, log_file_name, &errmsg)) < 0 ||
char header[LOG_EVENT_HEADER_LEN]; fake_rotate_event(net, packet, log_file_name, &errmsg))
memset(header, 0, 4); // when does not matter
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
char* p = strrchr(log_file_name, FN_LIBCHAR);
// find the last slash
if(p)
p++;
else
p = log_file_name;
uint ident_len = (uint) strlen(p);
ulong event_len = ident_len + sizeof(header);
int4store(header + EVENT_TYPE_OFFSET + 1, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
packet->append(header, sizeof(header));
packet->append(p,ident_len);
if(my_net_write(net, (char*)packet->ptr(), packet->length()))
{
errmsg = "failed on my_net_write()";
goto err; goto err;
}
packet->length(0); packet->length(0);
packet->append("\0",1); packet->append("\0",1);
} }
} }
}
end_io_cache(&log); end_io_cache(&log);
(void)my_close(file, MYF(MY_WME)); (void)my_close(file, MYF(MY_WME));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment