fixes for slave backward compat

fixed bug in LOAD DATA FROM MASTER
fixed rpl000001.result
Slave now replicates 3.23 master, with the exception of LOAD DATA INFILE, 
which is still buggy. Will push this one after the pull/merge
parent b9637dae
...@@ -7,7 +7,7 @@ use test; ...@@ -7,7 +7,7 @@ use test;
drop table if exists t1,t3; drop table if exists t1,t3;
create table t1 (word char(20) not null); create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1; load data infile '../../std_data/words.dat' into table t1;
load data local infile '/home/sasha/bk/mysql-4.0/mysql-test/std_data/words.dat' into table t1; load data local infile '$MYSQL_TEST_DIR/std_data/words.dat' into table t1;
select * from t1; select * from t1;
word word
Aarhus Aarhus
......
...@@ -556,6 +556,8 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, ...@@ -556,6 +556,8 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Query_log_event(buf, event_len, old_format); ev = new Query_log_event(buf, event_len, old_format);
break; break;
case LOAD_EVENT: case LOAD_EVENT:
ev = new Create_file_log_event(buf, event_len, old_format);
break;
case NEW_LOAD_EVENT: case NEW_LOAD_EVENT:
ev = new Load_log_event(buf, event_len, old_format); ev = new Load_log_event(buf, event_len, old_format);
break; break;
...@@ -566,7 +568,7 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, ...@@ -566,7 +568,7 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Slave_log_event(buf, event_len); ev = new Slave_log_event(buf, event_len);
break; break;
case CREATE_FILE_EVENT: case CREATE_FILE_EVENT:
ev = new Create_file_log_event(buf, event_len); ev = new Create_file_log_event(buf, event_len, old_format);
break; break;
case APPEND_BLOCK_EVENT: case APPEND_BLOCK_EVENT:
ev = new Append_block_log_event(buf, event_len); ev = new Append_block_log_event(buf, event_len);
...@@ -959,6 +961,12 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) ...@@ -959,6 +961,12 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
if (use_new_format) if (use_new_format)
{ {
empty_flags=0; empty_flags=0;
/* the code below assumes that buf will not disappear from
under our feet during the lifetime of the event. This assumption
holds true in the slave thread if the log is in new format, but is not
the case when we have old format because we will be reusing net buffer
to read the actual file before we write out the Create_file event
*/
if (read_str(buf, buf_end, field_term, field_term_len) || if (read_str(buf, buf_end, field_term, field_term_len) ||
read_str(buf, buf_end, enclosed, enclosed_len) || read_str(buf, buf_end, enclosed, enclosed_len) ||
read_str(buf, buf_end, line_term, line_term_len) || read_str(buf, buf_end, line_term, line_term_len) ||
...@@ -970,11 +978,11 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) ...@@ -970,11 +978,11 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
else else
{ {
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1; field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
*field_term=*buf++; field_term = buf++;
*enclosed= *buf++; enclosed= buf++;
*line_term= *buf++; line_term= buf++;
*line_start=*buf++; line_start= buf++;
*escaped= *buf++; escaped= buf++;
opt_flags = *buf++; opt_flags = *buf++;
empty_flags=*buf++; empty_flags=*buf++;
if (empty_flags & FIELD_TERM_EMPTY) if (empty_flags & FIELD_TERM_EMPTY)
...@@ -1095,7 +1103,9 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, ...@@ -1095,7 +1103,9 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
db_len = (uint)data_head[L_DB_LEN_OFFSET]; db_len = (uint)data_head[L_DB_LEN_OFFSET];
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET); num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
int body_offset = get_data_body_offset(); int body_offset = (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
LOAD_HEADER_LEN + OLD_HEADER_LEN : get_data_body_offset();
if ((int) event_len < body_offset) if ((int) event_len < body_offset)
return 1; return 1;
//sql_ex.init() on success returns the pointer to the first byte after //sql_ex.init() on success returns the pointer to the first byte after
...@@ -1117,7 +1127,6 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, ...@@ -1117,7 +1127,6 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
table_name = fields + field_block_len; table_name = fields + field_block_len;
db = table_name + table_name_len + 1; db = table_name + table_name_len + 1;
fname = db + db_len + 1; fname = db + db_len + 1;
int type_code = get_type_code();
fname_len = strlen(fname); fname_len = strlen(fname);
// null termination is accomplished by the caller doing buf[event_len]=0 // null termination is accomplished by the caller doing buf[event_len]=0
return 0; return 0;
...@@ -1367,20 +1376,29 @@ int Create_file_log_event::write_base(IO_CACHE* file) ...@@ -1367,20 +1376,29 @@ int Create_file_log_event::write_base(IO_CACHE* file)
return res; return res;
} }
Create_file_log_event::Create_file_log_event(const char* buf, int len): Create_file_log_event::Create_file_log_event(const char* buf, int len,
Load_log_event(buf,0,0),fake_base(0),block(0) bool old_format):
Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0)
{ {
int block_offset; int block_offset;
if (copy_log_event(buf,len,0)) if (copy_log_event(buf,len,old_format))
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
+ LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
if (len < block_offset)
return; return;
block = (char*)buf + block_offset; if (!old_format)
block_len = len - block_offset; {
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
+ LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
if (len < block_offset)
return;
block = (char*)buf + block_offset;
block_len = len - block_offset;
}
else
{
sql_ex.force_new_format();
inited_from_old = 1;
}
} }
...@@ -1568,6 +1586,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -1568,6 +1586,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
int expected_error,actual_error = 0; int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0); init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)db); thd->db = rewrite_db((char*)db);
DBUG_ASSERT(q_len == strlen(query));
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{ {
thd->query = (char*)query; thd->query = (char*)query;
...@@ -1739,11 +1758,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) ...@@ -1739,11 +1758,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
int Start_log_event::exec_event(struct st_relay_log_info* rli) int Start_log_event::exec_event(struct st_relay_log_info* rli)
{ {
close_temporary_tables(thd);
// if we have old format, load_tmpdir is cleaned up by the I/O thread
// TODO: cleanup_load_tmpdir() needs to remove only the files associated
// with the server id that has just started
if (!rli->mi->old_format) if (!rli->mi->old_format)
{
close_temporary_tables(thd);
cleanup_load_tmpdir(); cleanup_load_tmpdir();
}
return Log_event::exec_event(rli); return Log_event::exec_event(rli);
} }
......
...@@ -64,6 +64,8 @@ struct old_sql_ex ...@@ -64,6 +64,8 @@ struct old_sql_ex
char empty_flags; char empty_flags;
}; };
#define NUM_LOAD_DELIM_STRS 5
struct sql_ex_info struct sql_ex_info
{ {
...@@ -153,8 +155,8 @@ struct sql_ex_info ...@@ -153,8 +155,8 @@ struct sql_ex_info
#define L_THREAD_ID_OFFSET 0 #define L_THREAD_ID_OFFSET 0
#define L_EXEC_TIME_OFFSET 4 #define L_EXEC_TIME_OFFSET 4
#define L_SKIP_LINES_OFFSET 8 #define L_SKIP_LINES_OFFSET 8
#define L_DB_LEN_OFFSET 12 #define L_TBL_LEN_OFFSET 12
#define L_TBL_LEN_OFFSET 13 #define L_DB_LEN_OFFSET 13
#define L_NUM_FIELDS_OFFSET 14 #define L_NUM_FIELDS_OFFSET 14
#define L_SQL_EX_OFFSET 18 #define L_SQL_EX_OFFSET 18
#define L_DATA_OFFSET LOAD_HEADER_LEN #define L_DATA_OFFSET LOAD_HEADER_LEN
...@@ -570,6 +572,7 @@ public: ...@@ -570,6 +572,7 @@ public:
char* block; char* block;
uint block_len; uint block_len;
uint file_id; uint file_id;
bool inited_from_old;
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Create_file_log_event(THD* thd, sql_exchange* ex, const char* db_arg, Create_file_log_event(THD* thd, sql_exchange* ex, const char* db_arg,
const char* table_name_arg, const char* table_name_arg,
...@@ -578,7 +581,7 @@ public: ...@@ -578,7 +581,7 @@ public:
char* block_arg, uint block_len_arg); char* block_arg, uint block_len_arg);
#endif #endif
Create_file_log_event(const char* buf, int event_len); Create_file_log_event(const char* buf, int event_len, bool old_format);
~Create_file_log_event() ~Create_file_log_event()
{ {
} }
...@@ -591,7 +594,7 @@ public: ...@@ -591,7 +594,7 @@ public:
4 + 1 + block_len;} 4 + 1 + block_len;}
int get_data_body_offset() { return fake_base ? LOAD_EVENT_OVERHEAD: int get_data_body_offset() { return fake_base ? LOAD_EVENT_OVERHEAD:
LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN; } LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN; }
bool is_valid() { return block != 0; } bool is_valid() { return inited_from_old || block != 0; }
int write_data_header(IO_CACHE* file); int write_data_header(IO_CACHE* file);
int write_data_body(IO_CACHE* file); int write_data_body(IO_CACHE* file);
int write_base(IO_CACHE* file); // cut out Create_file extentions and int write_base(IO_CACHE* file); // cut out Create_file extentions and
......
...@@ -48,6 +48,7 @@ char *sql_strmake(const char *str,uint len); ...@@ -48,6 +48,7 @@ char *sql_strmake(const char *str,uint len);
gptr sql_memdup(const void * ptr,unsigned size); gptr sql_memdup(const void * ptr,unsigned size);
void sql_element_free(void *ptr); void sql_element_free(void *ptr);
void kill_one_thread(THD *thd, ulong id); void kill_one_thread(THD *thd, ulong id);
int net_request_file(NET* net, const char* fname);
char* query_table_status(THD *thd,const char *db,const char *table_name); char* query_table_status(THD *thd,const char *db,const char *table_name);
#define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); } #define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); }
......
...@@ -814,3 +814,14 @@ my_net_read(NET *net) ...@@ -814,3 +814,14 @@ my_net_read(NET *net)
#endif /* HAVE_COMPRESS */ #endif /* HAVE_COMPRESS */
return len; return len;
} }
int net_request_file(NET* net, const char* fname)
{
char tmp [FN_REFLEN+1],*end;
DBUG_ENTER("net_request_file");
tmp[0] = (char) 251; /* NULL_LENGTH */
end=strnmov(tmp+1,fname,sizeof(tmp)-2);
DBUG_RETURN(my_net_write(net,tmp,(uint) (end-tmp)) ||
net_flush(net));
}
...@@ -828,6 +828,7 @@ int load_master_data(THD* thd) ...@@ -828,6 +828,7 @@ int load_master_data(THD* thd)
active_mi->rli.master_log_pos = active_mi->master_log_pos; active_mi->rli.master_log_pos = active_mi->master_log_pos;
strnmov(active_mi->rli.master_log_name,active_mi->master_log_name, strnmov(active_mi->rli.master_log_name,active_mi->master_log_name,
sizeof(active_mi->rli.master_log_name)); sizeof(active_mi->rli.master_log_name));
flush_relay_log_info(&active_mi->rli);
pthread_cond_broadcast(&active_mi->rli.data_cond); pthread_cond_broadcast(&active_mi->rli.data_cond);
pthread_mutex_unlock(&active_mi->rli.data_lock); pthread_mutex_unlock(&active_mi->rli.data_lock);
thd->proc_info = "starting slave"; thd->proc_info = "starting slave";
......
...@@ -55,6 +55,7 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; ...@@ -55,6 +55,7 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net); void skip_load_data_infile(NET* net);
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev); static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static int queue_old_event(MASTER_INFO* mi, const char* buf, static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len); uint event_len);
static inline bool slave_killed(THD* thd,MASTER_INFO* mi); static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
...@@ -654,7 +655,7 @@ char* rewrite_db(char* db) ...@@ -654,7 +655,7 @@ char* rewrite_db(char* db)
int db_ok(const char* db, I_List<i_string> &do_list, int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list ) I_List<i_string> &ignore_list )
{ {
if(do_list.is_empty() && ignore_list.is_empty()) if (do_list.is_empty() && ignore_list.is_empty())
return 1; // ok to replicate if the user puts no constraints return 1; // ok to replicate if the user puts no constraints
// if the user has specified restrictions on which databases to replicate // if the user has specified restrictions on which databases to replicate
...@@ -1058,6 +1059,8 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, ...@@ -1058,6 +1059,8 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
if (init_relay_log_info(&mi->rli, slave_info_fname)) if (init_relay_log_info(&mi->rli, slave_info_fname))
return 1; return 1;
mi->rli.mi = mi; mi->rli.mi = mi;
mi->mysql=0;
mi->file_id=1;
mi->ignore_stop_event=0; mi->ignore_stop_event=0;
int fd,error; int fd,error;
MY_STAT stat_area; MY_STAT stat_area;
...@@ -1621,7 +1624,7 @@ slave_begin: ...@@ -1621,7 +1624,7 @@ slave_begin:
DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
mi->master_log_name, llstr(mi->master_log_pos,llbuff))); mi->master_log_name, llstr(mi->master_log_pos,llbuff)));
if (!(mysql = mc_mysql_init(NULL))) if (!(mi->mysql = mysql = mc_mysql_init(NULL)))
{ {
sql_print_error("Slave I/O thread: error in mc_mysql_init()"); sql_print_error("Slave I/O thread: error in mc_mysql_init()");
goto err; goto err;
...@@ -1780,8 +1783,11 @@ err: ...@@ -1780,8 +1783,11 @@ err:
sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s", sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
thd->query = thd->db = 0; // extra safety thd->query = thd->db = 0; // extra safety
if(mysql) if (mysql)
{
mc_mysql_close(mysql); mc_mysql_close(mysql);
mi->mysql=0;
}
thd->proc_info = "Waiting for slave mutex on exit"; thd->proc_info = "Waiting for slave mutex on exit";
pthread_mutex_lock(&mi->run_lock); pthread_mutex_lock(&mi->run_lock);
mi->slave_running = 0; mi->slave_running = 0;
...@@ -1790,7 +1796,7 @@ err: ...@@ -1790,7 +1796,7 @@ err:
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
mi->abort_slave = 0; // TODO: check if this is needed mi->abort_slave = 0; // TODO: check if this is needed
DBUG_ASSERT(thd->net.buff != 0); DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because we are weird net_end(&thd->net); // destructor will not free it, because net.vio is 0
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
delete thd; delete thd;
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
...@@ -1926,11 +1932,97 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ ...@@ -1926,11 +1932,97 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
DBUG_RETURN(0); // Can't return anything here DBUG_RETURN(0); // Can't return anything here
} }
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
int error = 1;
ulong num_bytes;
bool cev_not_written;
THD* thd;
NET* net = &mi->mysql->net;
if (unlikely(!cev->is_valid()))
return 1;
/*
TODO: fix to honor table rules, not only db rules
*/
if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
{
skip_load_data_infile(net);
return 0;
}
DBUG_ASSERT(cev->inited_from_old);
thd = mi->io_thd;
thd->file_id = cev->file_id = mi->file_id++;
cev_not_written = 1;
if (unlikely(net_request_file(net,cev->fname)))
{
sql_print_error("Slave I/O: failed requesting download of '%s'",
cev->fname);
goto err;
}
/* this dummy block is so we could insantiate Append_block_log_event
once and then modify it slightly instead of doing it multiple times
in the loop
*/
{
Append_block_log_event aev(thd,0,0);
for (;;)
{
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
{
sql_print_error("Network read error downloading '%s' from master",
cev->fname);
goto err;
}
if (unlikely(!num_bytes)) /* eof */
{
send_ok(net); /* 3.23 master wants it */
Execute_load_log_event xev(mi->io_thd);
if (unlikely(mi->rli.relay_log.append(&xev)))
{
sql_print_error("Slave I/O: error writing Exec_load event to \
relay log");
goto err;
}
break;
}
if (unlikely(cev_not_written))
{
cev->block = (char*)net->read_pos;
cev->block_len = num_bytes;
if (unlikely(mi->rli.relay_log.append(cev)))
{
sql_print_error("Slave I/O: error writing Create_file event to \
relay log");
goto err;
}
cev_not_written=0;
}
else
{
aev.block = (char*)net->read_pos;
aev.block_len = num_bytes;
if (unlikely(mi->rli.relay_log.append(&aev)))
{
sql_print_error("Slave I/O: error writing Append_block event to \
relay log");
goto err;
}
}
}
}
error=0;
err:
return error;
}
// We assume we already locked mi->data_lock // We assume we already locked mi->data_lock
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev) static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev)
{ {
if (!rev->is_valid()) if (unlikely(!rev->is_valid()))
return 1; return 1;
DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name)); DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name));
memcpy(mi->master_log_name,rev->new_log_ident, memcpy(mi->master_log_name,rev->new_log_ident,
...@@ -1961,6 +2053,21 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, ...@@ -1961,6 +2053,21 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
const char *errmsg = 0; const char *errmsg = 0;
bool inc_pos = 1; bool inc_pos = 1;
bool processed_stop_event = 0; bool processed_stop_event = 0;
char* tmp_buf = 0;
/* if we get Load event, we need to pass a non-reusable buffer
to read_log_event, so we do a trick
*/
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
{
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
{
sql_print_error("Slave I/O: out of memory for Load event");
return 1;
}
memcpy(tmp_buf,buf,event_len);
tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer
buf = (const char*)tmp_buf;
}
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg, Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
1 /*old format*/ ); 1 /*old format*/ );
if (unlikely(!ev)) if (unlikely(!ev))
...@@ -1968,6 +2075,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, ...@@ -1968,6 +2075,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
sql_print_error("Read invalid event from master: '%s',\ sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug", master could be corrupt but a more likely cause of this is a bug",
errmsg); errmsg);
my_free((char*)tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
return 1; return 1;
} }
pthread_mutex_lock(&mi->data_lock); pthread_mutex_lock(&mi->data_lock);
...@@ -1978,6 +2086,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, ...@@ -1978,6 +2086,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
{ {
delete ev; delete ev;
pthread_mutex_unlock(&mi->data_lock); pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(!tmp_buf);
return 1; return 1;
} }
mi->ignore_stop_event=1; mi->ignore_stop_event=1;
...@@ -1986,12 +2095,16 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, ...@@ -1986,12 +2095,16 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
case STOP_EVENT: case STOP_EVENT:
processed_stop_event=1; processed_stop_event=1;
break; break;
case LOAD_EVENT: case CREATE_FILE_EVENT:
// TODO: actually process it {
mi->master_log_pos += event_len; int error = process_io_create_file(mi,(Create_file_log_event*)ev);
delete ev; delete ev;
mi->master_log_pos += event_len;
pthread_mutex_unlock(&mi->data_lock); pthread_mutex_unlock(&mi->data_lock);
return 0; DBUG_ASSERT(tmp_buf);
my_free((char*)tmp_buf, MYF(0));
return error;
}
default: default:
mi->ignore_stop_event=0; mi->ignore_stop_event=0;
break; break;
...@@ -2002,6 +2115,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, ...@@ -2002,6 +2115,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
{ {
delete ev; delete ev;
pthread_mutex_unlock(&mi->data_lock); pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(!tmp_buf);
return 1; return 1;
} }
} }
...@@ -2011,6 +2125,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, ...@@ -2011,6 +2125,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
if (unlikely(processed_stop_event)) if (unlikely(processed_stop_event))
mi->ignore_stop_event=1; mi->ignore_stop_event=1;
pthread_mutex_unlock(&mi->data_lock); pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(!tmp_buf);
return 0; return 0;
} }
...@@ -2173,7 +2288,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) ...@@ -2173,7 +2288,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
int flush_relay_log_info(RELAY_LOG_INFO* rli) int flush_relay_log_info(RELAY_LOG_INFO* rli)
{ {
IO_CACHE* file = &rli->info_file; register IO_CACHE* file = &rli->info_file;
char lbuf[22],lbuf1[22]; char lbuf[22],lbuf1[22];
my_b_seek(file, 0L); my_b_seek(file, 0L);
...@@ -2251,7 +2366,10 @@ Log_event* next_event(RELAY_LOG_INFO* rli) ...@@ -2251,7 +2366,10 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
} }
DBUG_ASSERT(my_b_tell(cur_log) >= 4); DBUG_ASSERT(my_b_tell(cur_log) >= 4);
DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format))) /* relay log is always in new format - if the master is 3.23, the
I/O thread will convert the format for us
*/
if ((ev=Log_event::read_log_event(cur_log,0,(bool)0/*new format*/)))
{ {
DBUG_ASSERT(thd==rli->sql_thd); DBUG_ASSERT(thd==rli->sql_thd);
if (hot_log) if (hot_log)
......
...@@ -254,6 +254,8 @@ typedef struct st_master_info ...@@ -254,6 +254,8 @@ typedef struct st_master_info
pthread_mutex_t data_lock,run_lock; pthread_mutex_t data_lock,run_lock;
pthread_cond_t data_cond,start_cond,stop_cond; pthread_cond_t data_cond,start_cond,stop_cond;
THD *io_thd; THD *io_thd;
MYSQL* mysql;
uint32 file_id; // for 3.23 load data infile
RELAY_LOG_INFO rli; RELAY_LOG_INFO rli;
uint port; uint port;
uint connect_retry; uint connect_retry;
......
...@@ -147,12 +147,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -147,12 +147,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (read_file_from_client && (thd->client_capabilities & CLIENT_LOCAL_FILES)) if (read_file_from_client && (thd->client_capabilities & CLIENT_LOCAL_FILES))
{ {
char tmp [FN_REFLEN+1],*end; (void)net_request_file(&thd->net,ex->file_name);
DBUG_PRINT("info",("reading local file"));
tmp[0] = (char) 251; /* NULL_LENGTH */
end=strnmov(tmp+1,ex->file_name,sizeof(tmp)-2);
(void) my_net_write(&thd->net,tmp,(uint) (end-tmp));
(void) net_flush(&thd->net);
file = -1; file = -1;
} }
else else
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment