Commit 1542fffb authored by unknown's avatar unknown

First commit for fixing BUG#1100

"LOAD DATA INFILE is badly filtered by binlog-*-db rules".
There will probably be a second final one to merge Dmitri's changes
to rpl_log.result and mine.
2 new tests:
rpl_loaddata_rule_m : test of logging of LOAD DATA INFILE when the master has binlog-*-db rules,
rpl_loaddata_rule_s : test of logging of LOAD DATA INFILE when the slave has binlog-*-db rules and --log-slave-updates.


mysql-test/r/rpl_loaddata.result:
  Test that logging of LOAD DATA INFILE is done on the slave
mysql-test/t/rpl_loaddata.test:
  Test that logging of LOAD DATA is done on the slave
sql/log.cc:
  debug info
sql/log_event.cc:
  * Append_block, Exec_load and Delete_file now have a member 'db' like Create_file.
  This member is filled by mysql_load(). It is used for filtering by binlog-*-db rules,
  that's all. It's not written to the binlog, and so can't be read from the binlog.
  In other words, that's temporary info which is stored in the event and lost when
  it is written and deleted.
  * Better error messages in Append_block et al. events.
  * The slave now logs (log-slave-updates) the Create_file et al. events in mysql_load()
  (they are not directly copied from the events in the relay log, because this
  prevented filtering by binlog-*-db rules). Before, mysql_load() in the slave
  did no logging, now it does the logging, as in any regular thread.
sql/log_event.h:
  New member 'db' for Append_block et al. events.
sql/slave.cc:
  Removed useless code. Why was it useless:
  - CREATE_FILE_EVENT is not defined in 3.23. It appeared in 4.0.
  - in queue_old_event(), which is called only if the master is 3.23, we had a
  case CREATE_FILE_EVENT:
  so this case can be removed.
  - this case was the only caller of process_io_create_file() so this function
  can be removed.
sql/sql_load.cc:
  Pass the db to events, so that they can be well filtered.
sql/sql_repl.cc:
  Pass the db to events so that they can be well filtered.
parent d43a347d
...@@ -4,6 +4,7 @@ reset master; ...@@ -4,6 +4,7 @@ reset master;
reset slave; reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start; slave start;
reset master;
create table t1(a int not null auto_increment, b int, primary key(a) ); create table t1(a int not null auto_increment, b int, primary key(a) );
load data infile '../../std_data/rpl_loaddata.dat' into table t1; load data infile '../../std_data/rpl_loaddata.dat' into table t1;
create temporary table t2 (day date,id int(9),category enum('a','b','c'),name varchar(60)); create temporary table t2 (day date,id int(9),category enum('a','b','c'),name varchar(60));
...@@ -19,6 +20,9 @@ day id category name ...@@ -19,6 +20,9 @@ day id category name
2003-02-22 2461 b a a a @ %  ' " a 2003-02-22 2461 b a a a @ %  ' " a
2003-03-22 2161 c asdf 2003-03-22 2161 c asdf
2003-04-22 2416 a bbbbb 2003-04-22 2416 a bbbbb
show binlog events from 898;
Log_name Pos Event_type Server_id Orig_log_pos Info
slave-bin.001 898 Query 1 895 use test; insert into t3 select * from t2
drop table t1; drop table t1;
drop table t2; drop table t2;
drop table t3; drop table t3;
......
slave stop;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start;
reset master;
create database test2;
create table t1(a int, b int, unique(b));
use test2;
load data infile '../../std_data/rpl_loaddata.dat' into table test.t1;
show binlog events from 79;
Log_name Pos Event_type Server_id Orig_log_pos Info
drop database test2;
slave stop;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start;
reset master;
create table t1(a int, b int, unique(b));
load data infile '../../std_data/rpl_loaddata.dat' into table test.t1;
select count(*) from t1;
count(*)
2
show binlog events from 79;
Log_name Pos Event_type Server_id Orig_log_pos Info
...@@ -12,6 +12,10 @@ ...@@ -12,6 +12,10 @@
source include/master-slave.inc; source include/master-slave.inc;
connection slave;
reset master;
connection master;
create table t1(a int not null auto_increment, b int, primary key(a) ); create table t1(a int not null auto_increment, b int, primary key(a) );
load data infile '../../std_data/rpl_loaddata.dat' into table t1; load data infile '../../std_data/rpl_loaddata.dat' into table t1;
...@@ -27,6 +31,16 @@ sync_with_master; ...@@ -27,6 +31,16 @@ sync_with_master;
select * from t1; select * from t1;
select * from t3; select * from t3;
# We want to be sure that LOAD DATA is in the slave's binlog.
# But we can't simply read this binlog, because the file_id is uncertain (would
# cause test failures). So instead, we test if the binlog looks long enough to
# contain LOAD DATA. That is, I (Guilhem) have done SHOW BINLOG EVENTS on my
# machine, saw that the last event is 'create table t3' and is at position 898
# when things go fine. If LOAD DATA was not logged, the binlog would be shorter
# than 898 bytes and there would be an error in SHOW BINLOG EVENTS. Of course,
# if someone changes the content of '../../std_data/rpl_loaddata2.dat', 898 will
# have to be changed too.
show binlog events from 898;
connection master; connection master;
...@@ -38,6 +52,9 @@ create table t1(a int, b int, unique(b)); ...@@ -38,6 +52,9 @@ create table t1(a int, b int, unique(b));
save_master_pos; save_master_pos;
connection slave; connection slave;
sync_with_master; sync_with_master;
# See if slave stops when there's a duplicate entry for key error in LOAD DATA
insert into t1 values(1,10); insert into t1 values(1,10);
connection master; connection master;
......
# See if the master logs LOAD DATA INFILE correctly when binlog_*_db rules
# exist.
# This is for BUG#1100 (LOAD DATA INFILE was half-logged).
source include/master-slave.inc;
connection slave;
reset master;
# Test logging on master
connection master;
# 'test' is the current database
create database test2;
create table t1(a int, b int, unique(b));
use test2;
load data infile '../../std_data/rpl_loaddata.dat' into table test.t1;
show binlog events from 79; # should be nothing
drop database test2;
# See if the slave logs (in its own binlog, with --log-slave-updates) a
# replicated LOAD DATA INFILE correctly when it has binlog_*_db rules.
# This is for BUG#1100 (LOAD DATA INFILE was half-logged).
source include/master-slave.inc;
connection slave;
reset master;
connection master;
# 'test' is the current database
create table t1(a int, b int, unique(b));
load data infile '../../std_data/rpl_loaddata.dat' into table test.t1;
# Test logging on slave;
save_master_pos;
connection slave;
sync_with_master;
select count(*) from t1; # check that LOAD was replicated
show binlog events from 79; # should be nothing
...@@ -1067,6 +1067,7 @@ bool MYSQL_LOG::write(Log_event* event_info) ...@@ -1067,6 +1067,7 @@ bool MYSQL_LOG::write(Log_event* event_info)
#else #else
IO_CACHE *file = &log_file; IO_CACHE *file = &log_file;
#endif #endif
DBUG_PRINT("info",("event type=%d",event_info->get_type_code()));
/* /*
In the future we need to add to the following if tests like In the future we need to add to the following if tests like
"do the involved tables match (to be implemented) "do the involved tables match (to be implemented)
......
...@@ -1606,11 +1606,12 @@ void Create_file_log_event::pack_info(String* packet) ...@@ -1606,11 +1606,12 @@ void Create_file_log_event::pack_info(String* packet)
#endif #endif
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg, Append_block_log_event::Append_block_log_event(THD* thd_arg, const char* db_arg,
char* block_arg,
uint block_len_arg, uint block_len_arg,
bool using_trans) bool using_trans)
:Log_event(thd_arg,0, using_trans), block(block_arg), :Log_event(thd_arg,0, using_trans), block(block_arg),
block_len(block_len_arg), file_id(thd_arg->file_id) block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
{ {
} }
#endif #endif
...@@ -1654,8 +1655,9 @@ void Append_block_log_event::pack_info(String* packet) ...@@ -1654,8 +1655,9 @@ void Append_block_log_event::pack_info(String* packet)
net_store_data(packet, buf1); net_store_data(packet, buf1);
} }
Delete_file_log_event::Delete_file_log_event(THD* thd_arg, bool using_trans) Delete_file_log_event::Delete_file_log_event(THD* thd_arg, const char* db_arg,
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id) bool using_trans)
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
{ {
} }
#endif #endif
...@@ -1700,8 +1702,9 @@ void Delete_file_log_event::pack_info(String* packet) ...@@ -1700,8 +1702,9 @@ void Delete_file_log_event::pack_info(String* packet)
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Execute_load_log_event::Execute_load_log_event(THD* thd_arg, bool using_trans) Execute_load_log_event::Execute_load_log_event(THD* thd_arg, const char* db_arg,
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id) bool using_trans)
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
{ {
} }
#endif #endif
...@@ -1905,7 +1908,19 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, ...@@ -1905,7 +1908,19 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
DBUG_ASSERT(thd->query == 0); DBUG_ASSERT(thd->query == 0);
thd->query = 0; // Should not be needed thd->query = 0; // Should not be needed
thd->query_error = 0; thd->query_error = 0;
/*
We test replicate_*_db rules. Note that we have already prepared the file to
load, even if we are going to ignore and delete it now. So it is possible
that we did a lot of disk writes for nothing. In other words, a big LOAD
DATA INFILE on the master will still consume a lot of space on the slave
(space in the relay log + space of temp files: twice the space of the file
to load...) even if it will finally be ignored.
TODO: fix this; this can be done by testing rules in
Create_file_log_event::exec_event() and then discarding Append_block and
al. Another way is do the filtering in the I/O thread (more efficient: no
disk writes at all).
*/
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{ {
thd->set_time((time_t)when); thd->set_time((time_t)when);
...@@ -2210,7 +2225,7 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2210,7 +2225,7 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP))) MYF(MY_WME|MY_NABP)))
{ {
slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
goto err; goto err;
} }
...@@ -2221,7 +2236,7 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2221,7 +2236,7 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
if (write_base(&file)) if (write_base(&file))
{ {
strmov(p, ".info"); // to have it right in the error message strmov(p, ".info"); // to have it right in the error message
slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf); slave_print_error(rli,my_errno, "Error in Create_file event: could not write to file '%s'", fname_buf);
goto err; goto err;
} }
end_io_cache(&file); end_io_cache(&file);
...@@ -2231,16 +2246,14 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2231,16 +2246,14 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
MYF(MY_WME))) < 0) MYF(MY_WME))) < 0)
{ {
slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
goto err; goto err;
} }
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{ {
slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf); slave_print_error(rli,my_errno, "Error in Create_file event: write to '%s' failed", fname_buf);
goto err; goto err;
} }
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
error=0; // Everything is ok error=0; // Everything is ok
err: err:
...@@ -2259,8 +2272,6 @@ int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2259,8 +2272,6 @@ int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
(void) my_delete(fname, MYF(MY_WME)); (void) my_delete(fname, MYF(MY_WME));
memcpy(p, ".info", 6); memcpy(p, ".info", 6);
(void) my_delete(fname, MYF(MY_WME)); (void) my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
return Log_event::exec_event(rli); return Log_event::exec_event(rli);
} }
...@@ -2274,16 +2285,14 @@ int Append_block_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2274,16 +2285,14 @@ int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
memcpy(p, ".data", 6); memcpy(p, ".data", 6);
if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0) if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
{ {
slave_print_error(rli,my_errno, "Could not open file '%s'", fname); slave_print_error(rli,my_errno, "Error in Append_block event: could not open file '%s'", fname);
goto err; goto err;
} }
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{ {
slave_print_error(rli,my_errno, "Write to '%s' failed", fname); slave_print_error(rli,my_errno, "Error in Append_block event: write to '%s' failed", fname);
goto err; goto err;
} }
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
error=0; error=0;
err: err:
...@@ -2298,7 +2307,6 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2298,7 +2307,6 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
char *p= slave_load_file_stem(fname, file_id, server_id); char *p= slave_load_file_stem(fname, file_id, server_id);
int fd; int fd;
int error = 1; int error = 1;
ulong save_options;
IO_CACHE file; IO_CACHE file;
Load_log_event* lev = 0; Load_log_event* lev = 0;
...@@ -2307,7 +2315,7 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2307,7 +2315,7 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP))) MYF(MY_WME|MY_NABP)))
{ {
slave_print_error(rli,my_errno, "Could not open file '%s'", fname); slave_print_error(rli,my_errno, "Error in Exec_load event: could not open file '%s'", fname);
goto err; goto err;
} }
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
...@@ -2315,21 +2323,16 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2315,21 +2323,16 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
(bool)0)) || (bool)0)) ||
lev->get_type_code() != NEW_LOAD_EVENT) lev->get_type_code() != NEW_LOAD_EVENT)
{ {
slave_print_error(rli,0, "File '%s' appears corrupted", fname); slave_print_error(rli,0, "Error in Exec_load event: file '%s' appears corrupted", fname);
goto err; goto err;
} }
/*
We want to disable binary logging in slave thread because we need the file
events to appear in the same order as they do on the master relative to
other events, so that we can preserve ascending order of log sequence
numbers - needed to handle failover .
*/
save_options = thd->options;
thd->options &= ~ (ulong) (OPTION_BIN_LOG);
lev->thd = thd; lev->thd = thd;
/* /*
lev->exec_event should use rli only for errors lev->exec_event should use rli only for errors
i.e. should not advance rli's position i.e. should not advance rli's position.
lev->exec_event is the place where the table is loaded (it calls
mysql_load()).
*/ */
if (lev->exec_event(0,rli,1)) if (lev->exec_event(0,rli,1))
{ {
...@@ -2350,15 +2353,11 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2350,15 +2353,11 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
tmp, fname); tmp, fname);
my_free(tmp,MYF(0)); my_free(tmp,MYF(0));
} }
thd->options= save_options;
goto err; goto err;
} }
thd->options = save_options;
(void) my_delete(fname, MYF(MY_WME)); (void) my_delete(fname, MYF(MY_WME));
memcpy(p, ".data", 6); memcpy(p, ".data", 6);
(void) my_delete(fname, MYF(MY_WME)); (void) my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
error = 0; error = 0;
err: err:
......
...@@ -687,9 +687,20 @@ public: ...@@ -687,9 +687,20 @@ public:
char* block; char* block;
uint block_len; uint block_len;
uint file_id; uint file_id;
/*
'db' is filled when the event is created in mysql_load() (the event needs to
have a 'db' member to be well filtered by binlog-*-db rules). 'db' is not
written to the binlog (it's not used by Append_block_log_event::write()), so
it can't be read in the Append_block_log_event(const char* buf, int
event_len) constructor.
In other words, 'db' is used only for filtering by binlog-*-db rules.
Create_file_log_event is different: its 'db' (which is inherited from
Load_log_event) is written to the binlog and can be re-read.
*/
const char* db;
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Append_block_log_event(THD* thd, char* block_arg, Append_block_log_event(THD* thd, const char* db_arg, char* block_arg,
uint block_len_arg, bool using_trans); uint block_len_arg, bool using_trans);
int exec_event(struct st_relay_log_info* rli); int exec_event(struct st_relay_log_info* rli);
void pack_info(String* packet); void pack_info(String* packet);
...@@ -703,6 +714,7 @@ public: ...@@ -703,6 +714,7 @@ public:
int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;} int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;}
bool is_valid() { return block != 0; } bool is_valid() { return block != 0; }
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
const char* get_db() { return db; }
}; };
...@@ -710,9 +722,10 @@ class Delete_file_log_event: public Log_event ...@@ -710,9 +722,10 @@ class Delete_file_log_event: public Log_event
{ {
public: public:
uint file_id; uint file_id;
const char* db; /* see comment in Append_block_log_event */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Delete_file_log_event(THD* thd, bool using_trans); Delete_file_log_event(THD* thd, const char* db_arg, bool using_trans);
void pack_info(String* packet); void pack_info(String* packet);
int exec_event(struct st_relay_log_info* rli); int exec_event(struct st_relay_log_info* rli);
#else #else
...@@ -725,15 +738,17 @@ public: ...@@ -725,15 +738,17 @@ public:
int get_data_size() { return DELETE_FILE_HEADER_LEN ;} int get_data_size() { return DELETE_FILE_HEADER_LEN ;}
bool is_valid() { return file_id != 0; } bool is_valid() { return file_id != 0; }
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
const char* get_db() { return db; }
}; };
class Execute_load_log_event: public Log_event class Execute_load_log_event: public Log_event
{ {
public: public:
uint file_id; uint file_id;
const char* db; /* see comment in Append_block_log_event */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Execute_load_log_event(THD* thd, bool using_trans); Execute_load_log_event(THD* thd, const char* db_arg, bool using_trans);
void pack_info(String* packet); void pack_info(String* packet);
int exec_event(struct st_relay_log_info* rli); int exec_event(struct st_relay_log_info* rli);
#else #else
...@@ -746,6 +761,7 @@ public: ...@@ -746,6 +761,7 @@ public:
int get_data_size() { return EXEC_LOAD_HEADER_LEN ;} int get_data_size() { return EXEC_LOAD_HEADER_LEN ;}
bool is_valid() { return file_id != 0; } bool is_valid() { return file_id != 0; }
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
const char* get_db() { return db; }
}; };
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
......
...@@ -57,7 +57,6 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; ...@@ -57,7 +57,6 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net); void skip_load_data_infile(NET* net);
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev); static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli); static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi); static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli); static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
...@@ -2730,102 +2729,6 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ ...@@ -2730,102 +2729,6 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
} }
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
int error = 1;
ulong num_bytes;
bool cev_not_written;
THD* thd;
NET* net = &mi->mysql->net;
DBUG_ENTER("process_io_create_file");
if (unlikely(!cev->is_valid()))
DBUG_RETURN(1);
/*
TODO: fix to honor table rules, not only db rules
*/
if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
{
skip_load_data_infile(net);
DBUG_RETURN(0);
}
DBUG_ASSERT(cev->inited_from_old);
thd = mi->io_thd;
thd->file_id = cev->file_id = mi->file_id++;
thd->server_id = cev->server_id;
cev_not_written = 1;
if (unlikely(net_request_file(net,cev->fname)))
{
sql_print_error("Slave I/O: failed requesting download of '%s'",
cev->fname);
goto err;
}
/*
This dummy block is so we could instantiate Append_block_log_event
once and then modify it slightly instead of doing it multiple times
in the loop
*/
{
Append_block_log_event aev(thd,0,0,0);
for (;;)
{
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
{
sql_print_error("Network read error downloading '%s' from master",
cev->fname);
goto err;
}
if (unlikely(!num_bytes)) /* eof */
{
send_ok(net); /* 3.23 master wants it */
Execute_load_log_event xev(thd,0);
xev.log_pos = mi->master_log_pos;
if (unlikely(mi->rli.relay_log.append(&xev)))
{
sql_print_error("Slave I/O: error writing Exec_load event to \
relay log");
goto err;
}
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
break;
}
if (unlikely(cev_not_written))
{
cev->block = (char*)net->read_pos;
cev->block_len = num_bytes;
cev->log_pos = mi->master_log_pos;
if (unlikely(mi->rli.relay_log.append(cev)))
{
sql_print_error("Slave I/O: error writing Create_file event to \
relay log");
goto err;
}
cev_not_written=0;
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
}
else
{
aev.block = (char*)net->read_pos;
aev.block_len = num_bytes;
aev.log_pos = mi->master_log_pos;
if (unlikely(mi->rli.relay_log.append(&aev)))
{
sql_print_error("Slave I/O: error writing Append_block event to \
relay log");
goto err;
}
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
}
}
}
error=0;
err:
DBUG_RETURN(error);
}
/* /*
Start using a new binary log on the master Start using a new binary log on the master
...@@ -2929,18 +2832,6 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, ...@@ -2929,18 +2832,6 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
mi->ignore_stop_event=1; mi->ignore_stop_event=1;
inc_pos= 0; inc_pos= 0;
break; break;
case CREATE_FILE_EVENT:
{
/* We come here when and only when tmp_buf != 0 */
DBUG_ASSERT(tmp_buf);
int error = process_io_create_file(mi,(Create_file_log_event*)ev);
delete ev;
mi->master_log_pos += event_len;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
pthread_mutex_unlock(&mi->data_lock);
my_free((char*)tmp_buf, MYF(0));
DBUG_RETURN(error);
}
default: default:
mi->ignore_stop_event=0; mi->ignore_stop_event=0;
inc_pos= event_len; inc_pos= event_len;
......
...@@ -299,7 +299,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -299,7 +299,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
which is nonsense. which is nonsense.
*/ */
read_info.end_io_cache(); read_info.end_io_cache();
Delete_file_log_event d(thd, log_delayed); Delete_file_log_event d(thd, db, log_delayed);
mysql_bin_log.write(&d); mysql_bin_log.write(&d);
} }
} }
...@@ -331,7 +331,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -331,7 +331,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
read_info.end_io_cache(); // make sure last block gets logged read_info.end_io_cache(); // make sure last block gets logged
if (lf_info.wrote_create_file) if (lf_info.wrote_create_file)
{ {
Execute_load_log_event e(thd, log_delayed); Execute_load_log_event e(thd, db, log_delayed);
mysql_bin_log.write(&e); mysql_bin_log.write(&e);
} }
} }
......
...@@ -1216,7 +1216,7 @@ int log_loaded_block(IO_CACHE* file) ...@@ -1216,7 +1216,7 @@ int log_loaded_block(IO_CACHE* file)
lf_info->last_pos_in_file = file->pos_in_file; lf_info->last_pos_in_file = file->pos_in_file;
if (lf_info->wrote_create_file) if (lf_info->wrote_create_file)
{ {
Append_block_log_event a(lf_info->thd, buffer, block_len, Append_block_log_event a(lf_info->thd, lf_info->db, buffer, block_len,
lf_info->log_delayed); lf_info->log_delayed);
mysql_bin_log.write(&a); mysql_bin_log.write(&a);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment