Commit 1dd53ed0 authored by guilhem@mysql.com's avatar guilhem@mysql.com

First commit for fixing BUG#1100

"LOAD DATA INFILE is badly filtered by binlog-*-db rules".
There will probably be a second final one to merge Dmitri's changes
to rpl_log.result and mine.
2 new tests:
rpl_loaddata_rule_m : test of logging of LOAD DATA INFILE when the master has binlog-*-db rules,
rpl_loaddata_rule_s : test of logging of LOAD DATA INFILE when the slave has binlog-*-db rules and --log-slave-updates.
parent 6e32e190
......@@ -4,6 +4,7 @@ reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start;
reset master;
create table t1(a int not null auto_increment, b int, primary key(a) );
load data infile '../../std_data/rpl_loaddata.dat' into table t1;
create temporary table t2 (day date,id int(9),category enum('a','b','c'),name varchar(60));
......@@ -19,6 +20,9 @@ day id category name
2003-02-22 2461 b a a a @ %  ' " a
2003-03-22 2161 c asdf
2003-04-22 2416 a bbbbb
show binlog events from 898;
Log_name Pos Event_type Server_id Orig_log_pos Info
slave-bin.001 898 Query 1 895 use test; insert into t3 select * from t2
drop table t1;
drop table t2;
drop table t3;
......
slave stop;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start;
reset master;
create database test2;
create table t1(a int, b int, unique(b));
use test2;
load data infile '../../std_data/rpl_loaddata.dat' into table test.t1;
show binlog events from 79;
Log_name Pos Event_type Server_id Orig_log_pos Info
drop database test2;
slave stop;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start;
reset master;
create table t1(a int, b int, unique(b));
load data infile '../../std_data/rpl_loaddata.dat' into table test.t1;
select count(*) from t1;
count(*)
2
show binlog events from 79;
Log_name Pos Event_type Server_id Orig_log_pos Info
......@@ -12,6 +12,10 @@
source include/master-slave.inc;
connection slave;
reset master;
connection master;
create table t1(a int not null auto_increment, b int, primary key(a) );
load data infile '../../std_data/rpl_loaddata.dat' into table t1;
......@@ -27,6 +31,16 @@ sync_with_master;
select * from t1;
select * from t3;
# We want to be sure that LOAD DATA is in the slave's binlog.
# But we can't simply read this binlog, because the file_id is uncertain (would
# cause test failures). So instead, we test if the binlog looks long enough to
# contain LOAD DATA. That is, I (Guilhem) have done SHOW BINLOG EVENTS on my
# machine, saw that the last event is 'create table t3' and is at position 898
# when things go fine. If LOAD DATA was not logged, the binlog would be shorter
# than 898 bytes and there would be an error in SHOW BINLOG EVENTS. Of course,
# if someone changes the content of '../../std_data/rpl_loaddata2.dat', 898 will
# have to be changed too.
show binlog events from 898;
connection master;
......@@ -38,6 +52,9 @@ create table t1(a int, b int, unique(b));
save_master_pos;
connection slave;
sync_with_master;
# See if slave stops when there's a duplicate entry for key error in LOAD DATA
insert into t1 values(1,10);
connection master;
......
# See if the master logs LOAD DATA INFILE correctly when binlog_*_db rules
# exist.
# This is for BUG#1100 (LOAD DATA INFILE was half-logged).
source include/master-slave.inc;
connection slave;
reset master;
# Test logging on master
connection master;
# 'test' is the current database
create database test2;
create table t1(a int, b int, unique(b));
use test2;
load data infile '../../std_data/rpl_loaddata.dat' into table test.t1;
show binlog events from 79; # should be nothing
drop database test2;
# See if the slave logs (in its own binlog, with --log-slave-updates) a
# replicated LOAD DATA INFILE correctly when it has binlog_*_db rules.
# This is for BUG#1100 (LOAD DATA INFILE was half-logged).
source include/master-slave.inc;
connection slave;
reset master;
connection master;
# 'test' is the current database
create table t1(a int, b int, unique(b));
load data infile '../../std_data/rpl_loaddata.dat' into table test.t1;
# Test logging on slave;
save_master_pos;
connection slave;
sync_with_master;
select count(*) from t1; # check that LOAD was replicated
show binlog events from 79; # should be nothing
......@@ -1067,6 +1067,7 @@ bool MYSQL_LOG::write(Log_event* event_info)
#else
IO_CACHE *file = &log_file;
#endif
DBUG_PRINT("info",("event type=%d",event_info->get_type_code()));
/*
In the future we need to add to the following if tests like
"do the involved tables match (to be implemented)
......
......@@ -1606,11 +1606,12 @@ void Create_file_log_event::pack_info(String* packet)
#endif
#ifndef MYSQL_CLIENT
Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
Append_block_log_event::Append_block_log_event(THD* thd_arg, const char* db_arg,
char* block_arg,
uint block_len_arg,
bool using_trans)
:Log_event(thd_arg,0, using_trans), block(block_arg),
block_len(block_len_arg), file_id(thd_arg->file_id)
block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
{
}
#endif
......@@ -1654,8 +1655,9 @@ void Append_block_log_event::pack_info(String* packet)
net_store_data(packet, buf1);
}
Delete_file_log_event::Delete_file_log_event(THD* thd_arg, bool using_trans)
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id)
Delete_file_log_event::Delete_file_log_event(THD* thd_arg, const char* db_arg,
bool using_trans)
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
{
}
#endif
......@@ -1700,8 +1702,9 @@ void Delete_file_log_event::pack_info(String* packet)
#ifndef MYSQL_CLIENT
Execute_load_log_event::Execute_load_log_event(THD* thd_arg, bool using_trans)
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id)
Execute_load_log_event::Execute_load_log_event(THD* thd_arg, const char* db_arg,
bool using_trans)
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
{
}
#endif
......@@ -1906,6 +1909,18 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
thd->query = 0; // Should not be needed
thd->query_error = 0;
/*
We test replicate_*_db rules. Note that we have already prepared the file to
load, even if we are going to ignore and delete it now. So it is possible
that we did a lot of disk writes for nothing. In other words, a big LOAD
DATA INFILE on the master will still consume a lot of space on the slave
(space in the relay log + space of temp files: twice the space of the file
to load...) even if it will finally be ignored.
TODO: fix this; this can be done by testing rules in
Create_file_log_event::exec_event() and then discarding Append_block and
al. Another way is do the filtering in the I/O thread (more efficient: no
disk writes at all).
*/
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->set_time((time_t)when);
......@@ -2210,7 +2225,7 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
goto err;
}
......@@ -2221,7 +2236,7 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
if (write_base(&file))
{
strmov(p, ".info"); // to have it right in the error message
slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf);
slave_print_error(rli,my_errno, "Error in Create_file event: could not write to file '%s'", fname_buf);
goto err;
}
end_io_cache(&file);
......@@ -2231,16 +2246,14 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
MYF(MY_WME))) < 0)
{
slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf);
slave_print_error(rli,my_errno, "Error in Create_file event: write to '%s' failed", fname_buf);
goto err;
}
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
error=0; // Everything is ok
err:
......@@ -2259,8 +2272,6 @@ int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
(void) my_delete(fname, MYF(MY_WME));
memcpy(p, ".info", 6);
(void) my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
return Log_event::exec_event(rli);
}
......@@ -2274,16 +2285,14 @@ int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
memcpy(p, ".data", 6);
if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
{
slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
slave_print_error(rli,my_errno, "Error in Append_block event: could not open file '%s'", fname);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
slave_print_error(rli,my_errno, "Write to '%s' failed", fname);
slave_print_error(rli,my_errno, "Error in Append_block event: write to '%s' failed", fname);
goto err;
}
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
error=0;
err:
......@@ -2298,7 +2307,6 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
char *p= slave_load_file_stem(fname, file_id, server_id);
int fd;
int error = 1;
ulong save_options;
IO_CACHE file;
Load_log_event* lev = 0;
......@@ -2307,7 +2315,7 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
slave_print_error(rli,my_errno, "Error in Exec_load event: could not open file '%s'", fname);
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
......@@ -2315,21 +2323,16 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
(bool)0)) ||
lev->get_type_code() != NEW_LOAD_EVENT)
{
slave_print_error(rli,0, "File '%s' appears corrupted", fname);
slave_print_error(rli,0, "Error in Exec_load event: file '%s' appears corrupted", fname);
goto err;
}
/*
We want to disable binary logging in slave thread because we need the file
events to appear in the same order as they do on the master relative to
other events, so that we can preserve ascending order of log sequence
numbers - needed to handle failover .
*/
save_options = thd->options;
thd->options &= ~ (ulong) (OPTION_BIN_LOG);
lev->thd = thd;
/*
lev->exec_event should use rli only for errors
i.e. should not advance rli's position
i.e. should not advance rli's position.
lev->exec_event is the place where the table is loaded (it calls
mysql_load()).
*/
if (lev->exec_event(0,rli,1))
{
......@@ -2350,15 +2353,11 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
tmp, fname);
my_free(tmp,MYF(0));
}
thd->options= save_options;
goto err;
}
thd->options = save_options;
(void) my_delete(fname, MYF(MY_WME));
memcpy(p, ".data", 6);
(void) my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
error = 0;
err:
......
......@@ -687,9 +687,20 @@ public:
char* block;
uint block_len;
uint file_id;
/*
'db' is filled when the event is created in mysql_load() (the event needs to
have a 'db' member to be well filtered by binlog-*-db rules). 'db' is not
written to the binlog (it's not used by Append_block_log_event::write()), so
it can't be read in the Append_block_log_event(const char* buf, int
event_len) constructor.
In other words, 'db' is used only for filtering by binlog-*-db rules.
Create_file_log_event is different: its 'db' (which is inherited from
Load_log_event) is written to the binlog and can be re-read.
*/
const char* db;
#ifndef MYSQL_CLIENT
Append_block_log_event(THD* thd, char* block_arg,
Append_block_log_event(THD* thd, const char* db_arg, char* block_arg,
uint block_len_arg, bool using_trans);
int exec_event(struct st_relay_log_info* rli);
void pack_info(String* packet);
......@@ -703,6 +714,7 @@ public:
int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;}
bool is_valid() { return block != 0; }
int write_data(IO_CACHE* file);
const char* get_db() { return db; }
};
......@@ -710,9 +722,10 @@ class Delete_file_log_event: public Log_event
{
public:
uint file_id;
const char* db; /* see comment in Append_block_log_event */
#ifndef MYSQL_CLIENT
Delete_file_log_event(THD* thd, bool using_trans);
Delete_file_log_event(THD* thd, const char* db_arg, bool using_trans);
void pack_info(String* packet);
int exec_event(struct st_relay_log_info* rli);
#else
......@@ -725,15 +738,17 @@ public:
int get_data_size() { return DELETE_FILE_HEADER_LEN ;}
bool is_valid() { return file_id != 0; }
int write_data(IO_CACHE* file);
const char* get_db() { return db; }
};
class Execute_load_log_event: public Log_event
{
public:
uint file_id;
const char* db; /* see comment in Append_block_log_event */
#ifndef MYSQL_CLIENT
Execute_load_log_event(THD* thd, bool using_trans);
Execute_load_log_event(THD* thd, const char* db_arg, bool using_trans);
void pack_info(String* packet);
int exec_event(struct st_relay_log_info* rli);
#else
......@@ -746,6 +761,7 @@ public:
int get_data_size() { return EXEC_LOAD_HEADER_LEN ;}
bool is_valid() { return file_id != 0; }
int write_data(IO_CACHE* file);
const char* get_db() { return db; }
};
#ifdef MYSQL_CLIENT
......
......@@ -57,7 +57,6 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net);
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
......@@ -2730,102 +2729,6 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
}
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
int error = 1;
ulong num_bytes;
bool cev_not_written;
THD* thd;
NET* net = &mi->mysql->net;
DBUG_ENTER("process_io_create_file");
if (unlikely(!cev->is_valid()))
DBUG_RETURN(1);
/*
TODO: fix to honor table rules, not only db rules
*/
if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
{
skip_load_data_infile(net);
DBUG_RETURN(0);
}
DBUG_ASSERT(cev->inited_from_old);
thd = mi->io_thd;
thd->file_id = cev->file_id = mi->file_id++;
thd->server_id = cev->server_id;
cev_not_written = 1;
if (unlikely(net_request_file(net,cev->fname)))
{
sql_print_error("Slave I/O: failed requesting download of '%s'",
cev->fname);
goto err;
}
/*
This dummy block is so we could instantiate Append_block_log_event
once and then modify it slightly instead of doing it multiple times
in the loop
*/
{
Append_block_log_event aev(thd,0,0,0);
for (;;)
{
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
{
sql_print_error("Network read error downloading '%s' from master",
cev->fname);
goto err;
}
if (unlikely(!num_bytes)) /* eof */
{
send_ok(net); /* 3.23 master wants it */
Execute_load_log_event xev(thd,0);
xev.log_pos = mi->master_log_pos;
if (unlikely(mi->rli.relay_log.append(&xev)))
{
sql_print_error("Slave I/O: error writing Exec_load event to \
relay log");
goto err;
}
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
break;
}
if (unlikely(cev_not_written))
{
cev->block = (char*)net->read_pos;
cev->block_len = num_bytes;
cev->log_pos = mi->master_log_pos;
if (unlikely(mi->rli.relay_log.append(cev)))
{
sql_print_error("Slave I/O: error writing Create_file event to \
relay log");
goto err;
}
cev_not_written=0;
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
}
else
{
aev.block = (char*)net->read_pos;
aev.block_len = num_bytes;
aev.log_pos = mi->master_log_pos;
if (unlikely(mi->rli.relay_log.append(&aev)))
{
sql_print_error("Slave I/O: error writing Append_block event to \
relay log");
goto err;
}
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
}
}
}
error=0;
err:
DBUG_RETURN(error);
}
/*
Start using a new binary log on the master
......@@ -2929,18 +2832,6 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
mi->ignore_stop_event=1;
inc_pos= 0;
break;
case CREATE_FILE_EVENT:
{
/* We come here when and only when tmp_buf != 0 */
DBUG_ASSERT(tmp_buf);
int error = process_io_create_file(mi,(Create_file_log_event*)ev);
delete ev;
mi->master_log_pos += event_len;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
pthread_mutex_unlock(&mi->data_lock);
my_free((char*)tmp_buf, MYF(0));
DBUG_RETURN(error);
}
default:
mi->ignore_stop_event=0;
inc_pos= event_len;
......
......@@ -299,7 +299,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
which is nonsense.
*/
read_info.end_io_cache();
Delete_file_log_event d(thd, log_delayed);
Delete_file_log_event d(thd, db, log_delayed);
mysql_bin_log.write(&d);
}
}
......@@ -331,7 +331,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
read_info.end_io_cache(); // make sure last block gets logged
if (lf_info.wrote_create_file)
{
Execute_load_log_event e(thd, log_delayed);
Execute_load_log_event e(thd, db, log_delayed);
mysql_bin_log.write(&e);
}
}
......
......@@ -1216,7 +1216,7 @@ int log_loaded_block(IO_CACHE* file)
lf_info->last_pos_in_file = file->pos_in_file;
if (lf_info->wrote_create_file)
{
Append_block_log_event a(lf_info->thd, buffer, block_len,
Append_block_log_event a(lf_info->thd, lf_info->db, buffer, block_len,
lf_info->log_delayed);
mysql_bin_log.write(&a);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment