Commit fa7a94ed authored by monty@mashka.mysql.fi's avatar monty@mashka.mysql.fi

Added timeout for wait_for_master_pos

Fixed comparision of log-binary name to handle comparison when file name extension wraps from .999 to .1000
Don't replicate CREATE/DROP DATABASE if wild_xxx_table=database.% is used.
parent 9ec97f2c
......@@ -8,6 +8,7 @@ drop database if exists foo;
create database foo;
drop database if exists bar;
create database bar;
create database foo;
drop table if exists foo.foo;
create table foo.foo (n int);
insert into foo.foo values(4);
......@@ -20,10 +21,11 @@ insert into bar.bar values(15);
select foo.foo.n,bar.bar.m from foo.foo,bar.bar;
n m
4 15
drop database if exists bar;
drop database if exists foo;
drop database if exists bar;
drop database bar;
drop database if exists foo;
drop database bar;
Can't drop database 'bar'. Database doesn't exist
drop database foo;
set sql_log_bin = 0;
create database foo;
create database bar;
......
slave stop;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start;
select master_pos_wait('master-bin.999999',0,10);
master_pos_wait('master-bin.999999',0,10)
-1
......@@ -9,6 +9,7 @@ create database bar;
save_master_pos;
connection slave;
sync_with_master;
create database foo;
drop table if exists foo.foo;
create table foo.foo (n int);
insert into foo.foo values(4);
......@@ -24,13 +25,14 @@ connection slave;
sync_with_master;
select foo.foo.n,bar.bar.m from foo.foo,bar.bar;
connection master;
drop database if exists bar;
drop database bar;
drop database if exists foo;
save_master_pos;
connection slave;
sync_with_master;
drop database if exists bar;
drop database if exists foo;
--error 1008
drop database bar;
drop database foo;
# Now let's test load data from master
......
# See if master_pos_wait(,,timeout)
# Terminates with "timeout expired" (-1)
source include/master-slave.inc;
save_master_pos;
connection slave;
sync_with_master;
# Ask for a master log that has certainly not been reached yet
# timeout= 10 seconds
select master_pos_wait('master-bin.999999',0,10);
......@@ -424,12 +424,6 @@ Item *create_load_file(Item* a)
return new Item_load_file(a);
}
Item *create_wait_for_master_pos(Item* a, Item* b)
{
current_thd->safe_to_cache_query=0;
return new Item_master_pos_wait(a, b);
}
Item *create_func_cast(Item *a, Item_cast cast_type)
{
Item *res;
......
......@@ -92,6 +92,5 @@ Item *create_func_ucase(Item* a);
Item *create_func_version(void);
Item *create_func_weekday(Item* a);
Item *create_load_file(Item* a);
Item *create_wait_for_master_pos(Item* a, Item* b);
Item *create_func_is_free_lock(Item* a);
Item *create_func_quote(Item* a);
......@@ -1549,9 +1549,10 @@ longlong Item_master_pos_wait::val_int()
null_value = 1;
return 0;
}
ulong pos = (ulong)args[1]->val_int();
longlong pos = args[1]->val_int();
longlong timeout = (arg_count==3) ? args[2]->val_int() : 0 ;
LOCK_ACTIVE_MI;
if ((event_count = active_mi->rli.wait_for_pos(thd, log_name, pos)) == -1)
if ((event_count = active_mi->rli.wait_for_pos(thd, log_name, pos, timeout)) == -2)
{
null_value = 1;
event_count=0;
......
......@@ -865,9 +865,10 @@ class Item_master_pos_wait :public Item_int_func
String value;
public:
Item_master_pos_wait(Item *a,Item *b) :Item_int_func(a,b) {}
Item_master_pos_wait(Item *a,Item *b,Item *c) :Item_int_func(a,b,c) {}
longlong val_int();
const char *func_name() const { return "master_pos_wait"; }
void fix_length_and_dec() { max_length=1; maybe_null=1;}
void fix_length_and_dec() { max_length=21; maybe_null=1;}
unsigned int size_of() { return sizeof(*this);}
};
......
......@@ -462,9 +462,8 @@ static SYMBOL sql_functions[] = {
{ "LOWER", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_lcase)},
{ "LPAD", SYM(FUNC_ARG3),0,CREATE_FUNC(create_func_lpad)},
{ "LTRIM", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_ltrim)},
{ "MASTER_POS_WAIT", SYM(FUNC_ARG2),0,
CREATE_FUNC(create_wait_for_master_pos)},
{ "MAKE_SET", SYM(MAKE_SET_SYM),0,0},
{ "MASTER_POS_WAIT", SYM(MASTER_POS_WAIT),0,0},
{ "MAX", SYM(MAX_SYM),0,0},
{ "MD5", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_md5)},
{ "MID", SYM(SUBSTRING),0,0}, /* unireg function */
......
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -586,40 +586,118 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
return 0;
}
/*
Checks whether tables match some (wild_)do_table and (wild_)ignore_table
rules (for replication)
SYNOPSIS
tables_ok()
thd thread (SQL slave thread normally)
tables list of tables to check
NOTES
Note that changing the order of the tables in the list can lead to
different results. Note also the order of precedence of the do/ignore
rules (see code below). For that reason, users should not set conflicting
rules because they may get unpredicted results.
RETURN VALUES
0 should not be logged/replicated
1 should be logged/replicated
*/
int tables_ok(THD* thd, TABLE_LIST* tables)
{
DBUG_ENTER("tables_ok");
for (; tables; tables = tables->next)
{
char hash_key[2*NAME_LEN+2];
char *end;
uint len;
if (!tables->updating)
continue;
char hash_key[2*NAME_LEN+2];
char* p;
p = strmov(hash_key, tables->db ? tables->db : thd->db);
*p++ = '.';
uint len = strmov(p, tables->real_name) - hash_key ;
end= strmov(hash_key, tables->db ? tables->db : thd->db);
*end++= '.';
len= (uint) (strmov(end, tables->real_name) - hash_key);
if (do_table_inited) // if there are any do's
{
if (hash_search(&replicate_do_table, (byte*) hash_key, len))
return 1;
DBUG_RETURN(1);
}
if (ignore_table_inited) // if there are any ignores
{
if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
return 0;
DBUG_RETURN(0);
}
if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
hash_key, len))
return 1;
DBUG_RETURN(1);
if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
hash_key, len))
return 0;
DBUG_RETURN(0);
}
/*
If no explicit rule found and there was a do list, do not replicate.
If there was no do list, go ahead
*/
return !do_table_inited && !wild_do_table_inited;
DBUG_RETURN(!do_table_inited && !wild_do_table_inited);
}
/*
Checks whether a db matches wild_do_table and wild_ignore_table
rules (for replication)
SYNOPSIS
db_ok_with_wild_table()
db name of the db to check.
Is tested with check_db_name() before calling this function.
NOTES
Here is the reason for this function.
We advise users who want to exclude a database 'db1' safely to do it
with replicate_wild_ignore_table='db1.%' instead of binlog_ignore_db or
replicate_ignore_db because the two lasts only check for the selected db,
which won't work in that case:
USE db2;
UPDATE db1.t SET ... #this will be replicated and should not
whereas replicate_wild_ignore_table will work in all cases.
With replicate_wild_ignore_table, we only check tables. When
one does 'DROP DATABASE db1', tables are not involved and the
statement will be replicated, while users could expect it would not (as it
rougly means 'DROP db1.first_table, DROP db1.second_table...').
In other words, we want to interpret 'db1.%' as "everything touching db1".
That is why we want to match 'db1' against 'db1.%' wild table rules.
RETURN VALUES
0 should not be logged/replicated
1 should be logged/replicated
*/
int db_ok_with_wild_table(const char *db)
{
char hash_key[NAME_LEN+2];
char *end;
int len;
end= strmov(hash_key, db);
*end++= '.';
len= end - hash_key ;
if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
hash_key, len))
return 1;
if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
hash_key, len))
return 0;
/*
If no explicit rule found and there was a do list, do not replicate.
If there was no do list, go ahead
*/
return !wild_do_table_inited;
}
......@@ -750,6 +828,21 @@ char* rewrite_db(char* db)
}
/*
Checks whether a db matches some do_db and ignore_db rules
(for logging or replication)
SYNOPSIS
db_ok()
db name of the db to check
do_list either binlog_do_db or replicate_do_db
ignore_list either binlog_ignore_db or replicate_ignore_db
RETURN VALUES
0 should not be logged/replicated
1 should be logged/replicated
*/
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list )
{
......@@ -1470,62 +1563,171 @@ bool flush_master_info(MASTER_INFO* mi)
DBUG_RETURN(0);
}
/*
Waits until the SQL thread reaches (has executed up to) the
log/position or timed out.
SYNOPSIS
wait_for_pos()
thd client thread that sent SELECT MASTER_POS_WAIT
log_name log name to wait for
log_pos position to wait for
timeout timeout in seconds before giving up waiting
NOTES
timeout is longlong whereas it should be ulong ; but this is
to catch if the user submitted a negative timeout.
RETURN VALUES
-2 improper arguments (log_pos<0)
or slave not running, or master info changed
during the function's execution,
or client thread killed. -2 is translated to NULL by caller
-1 timed out
>=0 number of log events the function had to wait
before reaching the desired log/position
*/
int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
ulonglong log_pos)
longlong log_pos,
longlong timeout)
{
if (!inited)
return -1;
int event_count = 0;
ulong init_abort_pos_wait;
int error=0;
struct timespec abstime; // for timeout checking
set_timespec(abstime,timeout);
DBUG_ENTER("wait_for_pos");
DBUG_PRINT("enter",("master_log_name: '%s' pos: %ld",
master_log_name, (ulong) master_log_pos));
DBUG_PRINT("enter",("master_log_name: '%s' pos: %lu timeout: %ld",
master_log_name, (ulong) master_log_pos,
(long) timeout));
pthread_mutex_lock(&data_lock);
// abort only if master info changes during wait
/*
This function will abort when it notices that
some CHANGE MASTER or RESET MASTER has changed
the master info. To catch this, these commands
modify abort_pos_wait ; we just monitor abort_pos_wait
and see if it has changed.
*/
init_abort_pos_wait= abort_pos_wait;
/*
We'll need to
handle all possible log names comparisons (e.g. 999 vs 1000).
We use ulong for string->number conversion ; this is no
stronger limitation than in find_uniq_filename in sql/log.cc
*/
ulong log_name_extension;
char log_name_tmp[FN_REFLEN]; //make a char[] from String
char *end= strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
char *p= fn_ext(log_name_tmp);
char *p_end;
if (!*p || log_pos<0)
{
error= -2; //means improper arguments
goto err;
}
//p points to '.'
log_name_extension= strtoul(++p, &p_end, 10);
/*
p_end points to the first invalid character.
If it equals to p, no digits were found, error.
If it contains '\0' it means conversion went ok.
*/
if (p_end==p || *p_end)
{
error= -2;
goto err;
}
//"compare and wait" main loop
while (!thd->killed &&
init_abort_pos_wait == abort_pos_wait &&
mi->slave_running)
init_abort_pos_wait == abort_pos_wait &&
mi->slave_running)
{
bool pos_reached;
int cmp_result= 0;
DBUG_ASSERT(*master_log_name || master_log_pos == 0);
if (*master_log_name)
{
char *basename= master_log_name + dirname_length(master_log_name);
/*
TODO:
Replace strncmp() with a comparison function that
can handle comparison of the following files:
mysqlbin.999
mysqlbin.1000
First compare the parts before the extension.
Find the dot in the master's log basename,
and protect against user's input error :
if the names do not match up to '.' included, return error
*/
char *basename= master_log_name + dirname_length(master_log_name);
cmp_result = strncmp(basename, log_name->ptr(),
log_name->length());
char *q= (char*)(fn_ext(basename)+1);
if (strncmp(basename, log_name_tmp, (int)(q-basename)))
{
error= -2;
break;
}
// Now compare extensions.
char *q_end;
ulong master_log_name_extension= strtoul(q, &q_end, 10);
if (master_log_name_extension < log_name_extension)
cmp_result = -1 ;
else
cmp_result= (master_log_name_extension > log_name_extension) ? 1 : 0 ;
}
pos_reached = ((!cmp_result && master_log_pos >= log_pos) ||
cmp_result > 0);
pos_reached = ((!cmp_result && master_log_pos >= (ulonglong)log_pos) ||
cmp_result > 0);
if (pos_reached || thd->killed)
break;
//wait for master update, with optional timeout.
DBUG_PRINT("info",("Waiting for master update"));
const char* msg = thd->enter_cond(&data_cond, &data_lock,
"Waiting for master update");
pthread_cond_wait(&data_cond, &data_lock);
"Waiting for master update");
if (timeout > 0)
{
/*
Note that pthread_cond_timedwait checks for the timeout
before for the condition ; i.e. it returns ETIMEDOUT
if the system time equals or exceeds the time specified by abstime
before the condition variable is signaled or broadcast, _or_ if
the absolute time specified by abstime has already passed at the time
of the call.
For that reason, pthread_cond_timedwait will do the "timeoutting" job
even if its condition is always immediately signaled (case of a loaded
master).
*/
error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
}
else
pthread_cond_wait(&data_cond, &data_lock);
thd->exit_cond(msg);
if (error == ETIMEDOUT || error == ETIME)
{
error= -1;
break;
}
else
error=0;
event_count++;
}
err:
pthread_mutex_unlock(&data_lock);
DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d",
(int) thd->killed,
(int) (init_abort_pos_wait != abort_pos_wait),
(int) mi->slave_running));
DBUG_RETURN((thd->killed || init_abort_pos_wait != abort_pos_wait ||
!mi->slave_running) ?
-1 : event_count);
DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
improper_arguments: %d timed_out: %d",
(int) thd->killed,
(int) (init_abort_pos_wait != abort_pos_wait),
(int) mi->slave_running,
(int) (error == -2),
(int) (error == -1)));
if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
!mi->slave_running)
{
error= -2;
}
DBUG_RETURN( error ? error : event_count );
}
......
......@@ -226,7 +226,8 @@ typedef struct st_relay_log_info
pthread_mutex_unlock(&data_lock);
}
int wait_for_pos(THD* thd, String* log_name, ulonglong log_pos);
int wait_for_pos(THD* thd, String* log_name, longlong log_pos,
longlong timeout);
} RELAY_LOG_INFO;
......@@ -390,6 +391,7 @@ int tables_ok(THD* thd, TABLE_LIST* tables);
*/
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list );
int db_ok_with_wild_table(const char *db);
int add_table_rule(HASH* h, const char* table_spec);
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec);
......
......@@ -2253,6 +2253,18 @@ mysql_execute_command(void)
}
if (lower_case_table_names)
casedn_str(lex->name);
/*
If in a slave thread :
CREATE DATABASE DB was certainly not preceded by USE DB.
For that reason, db_ok() in sql/slave.cc did not check the
do_db/ignore_db. And as this query involves no tables, tables_ok()
above was not called. So we have to check rules again here.
*/
if (thd->slave_thread &&
(!db_ok(lex->name, replicate_do_db, replicate_ignore_db) ||
!db_ok_with_wild_table(lex->name)))
break;
if (check_access(thd,CREATE_ACL,lex->name,0,1))
break;
res=mysql_create_db(thd,lex->name,lex->create_info.options,0);
......@@ -2267,6 +2279,17 @@ mysql_execute_command(void)
}
if (lower_case_table_names)
casedn_str(lex->name);
/*
If in a slave thread :
DROP DATABASE DB may not be preceded by USE DB.
For that reason, maybe db_ok() in sql/slave.cc did not check the
do_db/ignore_db. And as this query involves no tables, tables_ok()
above was not called. So we have to check rules again here.
*/
if (thd->slave_thread &&
(!db_ok(lex->name, replicate_do_db, replicate_ignore_db) ||
!db_ok_with_wild_table(lex->name)))
break;
if (check_access(thd,DROP_ACL,lex->name,0,1))
break;
if (thd->locked_tables || thd->active_transaction())
......
......@@ -925,18 +925,17 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
const char* log_file_name2, ulonglong log_pos2)
{
int res;
/*
TODO: Change compare function to work with file name of type
'.999 and .1000'
*/
uint log_file_name1_len= strlen(log_file_name1);
uint log_file_name2_len= strlen(log_file_name2);
if ((res = strcmp(log_file_name1, log_file_name2)))
return res;
if (log_pos1 > log_pos2)
return 1;
else if (log_pos1 == log_pos2)
return 0;
return -1;
// We assume that both log names match up to '.'
if (log_file_name1_len == log_file_name2_len)
{
if ((res= strcmp(log_file_name1, log_file_name2)))
return res;
return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
}
return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
}
......
......@@ -426,6 +426,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token LEFT
%token LOCATE
%token MAKE_SET_SYM
%token MASTER_POS_WAIT
%token MINUTE_SECOND_SYM
%token MINUTE_SYM
%token MODE_SYM
......@@ -1855,6 +1856,16 @@ simple_expr:
{ $$= new Item_func_log($3); }
| LOG_SYM '(' expr ',' expr ')'
{ $$= new Item_func_log($3, $5); }
| MASTER_POS_WAIT '(' expr ',' expr ')'
{
$$= new Item_master_pos_wait($3, $5);
current_thd->safe_to_cache_query=0;
}
| MASTER_POS_WAIT '(' expr ',' expr ',' expr ')'
{
$$= new Item_master_pos_wait($3, $5, $7);
current_thd->safe_to_cache_query=0;
}
| MINUTE_SYM '(' expr ')'
{ $$= new Item_func_minute($3); }
| MONTH_SYM '(' expr ')'
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment