added SHOW BINLOG EVENTS

fixed log sequence bugs
fixed bugs in handling Slave event
added test case with SHOW BINLOG EVENTS
have not fixed all the bugs - found some that are also in 3.23,
will fix them there first, then do pull and cleanup

will not push this changeset
parent 172913f5
...@@ -210,4 +210,5 @@ ...@@ -210,4 +210,5 @@
#define ER_READ_ONLY_TRANSACTION 1207 #define ER_READ_ONLY_TRANSACTION 1207
#define ER_CONNECT_TO_MASTER 1208 #define ER_CONNECT_TO_MASTER 1208
#define ER_QUERY_ON_MASTER 1209 #define ER_QUERY_ON_MASTER 1209
#define ER_ERROR_MESSAGES 210 #define ER_SHOW_BINLOG_EVENTS 1210
#define ER_ERROR_MESSAGES 211
...@@ -659,7 +659,7 @@ run_testcase () ...@@ -659,7 +659,7 @@ run_testcase ()
slave_master_info_file=$TESTDIR/$tname-slave-master-info.opt slave_master_info_file=$TESTDIR/$tname-slave-master-info.opt
SKIP_SLAVE=`$EXPR \( $tname : rpl \) = 0` SKIP_SLAVE=`$EXPR \( $tname : rpl \) = 0`
if [ -n $SKIP_TEST ] ; then if [ -n $SKIP_TEST ] ; then
SKIP_THIS_TEST=`$EXPR \( $tname : $SKIP_TEST \) != 0` SKIP_THIS_TEST=`$EXPR \( $tname : '$SKIP_TEST' \) != 0`
if [ x$SKIP_THIS_TEST = x1 ] ; if [ x$SKIP_THIS_TEST = x1 ] ;
then then
return; return;
......
Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver: 4.0.0-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
master-bin.001 263 Query 1 5 use test; drop table t1
master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
master-bin.001 386 Load 1 7 use test; LOAD DATA INFILE '../../std_data/words.dat' INTO TABLE t1 FIELDS TERMINATED BY '\t' ESCAPED BY '\\' LINES TERMINATED BY '\n' (word)
master-bin.001 468 Query 1 8 use test; drop table t1
Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1
Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver: 4.0.0-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
master-bin.001 263 Query 1 5 use test; drop table t1
master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
master-bin.001 386 Load 1 7 use test; LOAD DATA INFILE '../../std_data/words.dat' INTO TABLE t1 FIELDS TERMINATED BY '\t' ESCAPED BY '\\' LINES TERMINATED BY '\n' (word)
master-bin.001 468 Query 1 8 use test; drop table t1
master-bin.001 516 Rotate 1 9 master-bin.002
master-bin.001 549 Stop 1 10
Log_name Pos Event_type Server_id Log_seq Info
master-bin.002 4 Start 1 1 Server ver: 4.0.0-debug-log, Binlog ver: 2
Log_name
slave-bin.001
slave-bin.002
slave-bin.003
slave-bin.004
Log_name Pos Event_type Server_id Log_seq Info
slave-bin.001 4 Start 2 1 Server ver: 4.0.0-debug-log, Binlog ver: 2
slave-bin.001 79 Slave 2 2
slave-bin.001 118 Rotate 2 3 slave-bin.002
slave-bin.001 150 Stop 2 4
Log_name Pos Event_type Server_id Log_seq Info
slave-bin.002 4 Start 2 1 Server ver: 4.0.0-debug-log, Binlog ver: 2
slave-bin.002 79 Slave 1 2
slave-bin.002 132 Slave 1 3
slave-bin.002 185 Query 1 4 use test; create table t1(n int not null auto_increment primary key)
slave-bin.002 278 Intvar 2 5 INSERT_ID=1
slave-bin.002 306 Query 1 6 use test; insert into t1 values (NULL)
slave-bin.002 369 Query 1 7 use test; drop table t1
slave-bin.002 417 Query 1 8 use test; create table t1 (word char(20) not null)
slave-bin.002 492 Query 1 9 use test; drop table t1
slave-bin.002 540 Rotate 2 10 slave-bin.003
slave-bin.002 572 Stop 2 11
source include/master-slave.inc;
#clean up slave binlogs
connection slave;
reset master;
connection master;
drop table if exists t1;
create table t1(n int not null auto_increment primary key);
insert into t1 values (NULL);
drop table t1;
create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1;
drop table t1;
show binlog events;
show binlog events from 79 limit 1;
show binlog events from 79 limit 2;
show binlog events from 79 limit 2,1;
flush logs;
show binlog events;
show binlog events in 'master-bin.002';
save_master_pos;
connection slave;
sync_with_master;
show master logs;
show binlog events in 'slave-bin.001' from 4;
show binlog events in 'slave-bin.002' from 4;
...@@ -70,6 +70,7 @@ static SYMBOL symbols[] = { ...@@ -70,6 +70,7 @@ static SYMBOL symbols[] = {
{ "BIGINT", SYM(BIGINT),0,0}, { "BIGINT", SYM(BIGINT),0,0},
{ "BIT", SYM(BIT_SYM),0,0}, { "BIT", SYM(BIT_SYM),0,0},
{ "BINARY", SYM(BINARY),0,0}, { "BINARY", SYM(BINARY),0,0},
{ "BINLOG", SYM(BINLOG_SYM),0,0},
{ "BLOB", SYM(BLOB_SYM),0,0}, { "BLOB", SYM(BLOB_SYM),0,0},
{ "BOOL", SYM(BOOL_SYM),0,0}, { "BOOL", SYM(BOOL_SYM),0,0},
{ "BOTH", SYM(BOTH),0,0}, { "BOTH", SYM(BOTH),0,0},
...@@ -128,6 +129,7 @@ static SYMBOL symbols[] = { ...@@ -128,6 +129,7 @@ static SYMBOL symbols[] = {
{ "ENABLE", SYM(ENABLE_SYM),0,0}, { "ENABLE", SYM(ENABLE_SYM),0,0},
{ "ENCLOSED", SYM(ENCLOSED),0,0}, { "ENCLOSED", SYM(ENCLOSED),0,0},
{ "ENUM", SYM(ENUM),0,0}, { "ENUM", SYM(ENUM),0,0},
{ "EVENTS", SYM(EVENTS_SYM),0,0},
{ "EXPLAIN", SYM(DESCRIBE),0,0}, { "EXPLAIN", SYM(DESCRIBE),0,0},
{ "EXISTS", SYM(EXISTS),0,0}, { "EXISTS", SYM(EXISTS),0,0},
{ "EXTENDED", SYM(EXTENDED_SYM),0,0}, { "EXTENDED", SYM(EXTENDED_SYM),0,0},
......
...@@ -230,6 +230,8 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -230,6 +230,8 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
if ((do_magic && my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4)) || if ((do_magic && my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4)) ||
open_index(O_APPEND | O_RDWR | O_CREAT)) open_index(O_APPEND | O_RDWR | O_CREAT))
goto err; goto err;
log_seq = 1;
Start_log_event s; Start_log_event s;
bool error; bool error;
s.set_log_seq(0, this); s.set_log_seq(0, this);
...@@ -240,10 +242,12 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -240,10 +242,12 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
{ {
THD* thd = current_thd; THD* thd = current_thd;
Slave_log_event s(thd, &glob_mi); Slave_log_event s(thd, &glob_mi);
s.set_log_seq(thd, this);
if(s.master_host) if(s.master_host)
{
s.set_log_seq(thd, this);
s.write(&log_file); s.write(&log_file);
}
} }
flush_io_cache(&log_file); flush_io_cache(&log_file);
...@@ -553,7 +557,6 @@ void MYSQL_LOG::new_file() ...@@ -553,7 +557,6 @@ void MYSQL_LOG::new_file()
open(old_name, log_type, new_name); open(old_name, log_type, new_name);
my_free(old_name,MYF(0)); my_free(old_name,MYF(0));
last_time=query_start=0; last_time=query_start=0;
log_seq = 1;
write_error=0; write_error=0;
VOID(pthread_mutex_unlock(&LOCK_log)); VOID(pthread_mutex_unlock(&LOCK_log));
} }
...@@ -666,8 +669,12 @@ bool MYSQL_LOG::write(Query_log_event* event_info) ...@@ -666,8 +669,12 @@ bool MYSQL_LOG::write(Query_log_event* event_info)
if (is_open()) if (is_open())
{ {
THD *thd=event_info->thd; THD *thd=event_info->thd;
#ifdef USING_TRANSACTIONS
IO_CACHE *file = (event_info->cache_stmt ? &thd->transaction.trans_log : IO_CACHE *file = (event_info->cache_stmt ? &thd->transaction.trans_log :
&log_file); &log_file);
#else
IO_CACHE *file = &log_file;
#endif
if ((!(thd->options & OPTION_BIN_LOG) && if ((!(thd->options & OPTION_BIN_LOG) &&
(thd->master_access & PROCESS_ACL)) || (thd->master_access & PROCESS_ACL)) ||
!db_ok(event_info->db, binlog_do_db, binlog_ignore_db)) !db_ok(event_info->db, binlog_do_db, binlog_ignore_db))
...@@ -680,12 +687,14 @@ bool MYSQL_LOG::write(Query_log_event* event_info) ...@@ -680,12 +687,14 @@ bool MYSQL_LOG::write(Query_log_event* event_info)
if (thd->last_insert_id_used) if (thd->last_insert_id_used)
{ {
Intvar_log_event e((uchar)LAST_INSERT_ID_EVENT, thd->last_insert_id); Intvar_log_event e((uchar)LAST_INSERT_ID_EVENT, thd->last_insert_id);
e.set_log_seq(thd, this);
if (e.write(file)) if (e.write(file))
goto err; goto err;
} }
if (thd->insert_id_used) if (thd->insert_id_used)
{ {
Intvar_log_event e((uchar)INSERT_ID_EVENT, thd->last_insert_id); Intvar_log_event e((uchar)INSERT_ID_EVENT, thd->last_insert_id);
e.set_log_seq(thd, this);
if (e.write(file)) if (e.write(file))
goto err; goto err;
} }
...@@ -698,10 +707,12 @@ bool MYSQL_LOG::write(Query_log_event* event_info) ...@@ -698,10 +707,12 @@ bool MYSQL_LOG::write(Query_log_event* event_info)
// just in case somebody wants it later // just in case somebody wants it later
thd->query_length = (uint)(p - buf); thd->query_length = (uint)(p - buf);
Query_log_event e(thd, buf); Query_log_event e(thd, buf);
e.set_log_seq(thd, this);
if (e.write(file)) if (e.write(file))
goto err; goto err;
thd->query_length = save_query_length; // clean up thd->query_length = save_query_length; // clean up
} }
event_info->set_log_seq(thd, this);
if (event_info->write(file) || if (event_info->write(file) ||
file == &log_file && flush_io_cache(file)) file == &log_file && flush_io_cache(file))
goto err; goto err;
...@@ -796,6 +807,7 @@ bool MYSQL_LOG::write(Load_log_event* event_info) ...@@ -796,6 +807,7 @@ bool MYSQL_LOG::write(Load_log_event* event_info)
if ((thd->options & OPTION_BIN_LOG) || if ((thd->options & OPTION_BIN_LOG) ||
!(thd->master_access & PROCESS_ACL)) !(thd->master_access & PROCESS_ACL))
{ {
event_info->set_log_seq(thd, this);
if (event_info->write(&log_file) || flush_io_cache(&log_file)) if (event_info->write(&log_file) || flush_io_cache(&log_file))
{ {
if (!write_error) if (!write_error)
...@@ -947,6 +959,7 @@ void MYSQL_LOG::close(bool exiting) ...@@ -947,6 +959,7 @@ void MYSQL_LOG::close(bool exiting)
if (log_type == LOG_BIN) if (log_type == LOG_BIN)
{ {
Stop_log_event s; Stop_log_event s;
s.set_log_seq(0, this);
s.write(&log_file); s.write(&log_file);
VOID(pthread_cond_broadcast(&COND_binlog_update)); VOID(pthread_cond_broadcast(&COND_binlog_update));
} }
......
...@@ -32,6 +32,7 @@ static void pretty_print_char(FILE* file, int c) ...@@ -32,6 +32,7 @@ static void pretty_print_char(FILE* file, int c)
case '\r': fprintf(file, "\\r"); break; case '\r': fprintf(file, "\\r"); break;
case '\\': fprintf(file, "\\\\"); break; case '\\': fprintf(file, "\\\\"); break;
case '\b': fprintf(file, "\\b"); break; case '\b': fprintf(file, "\\b"); break;
case '\t': fprintf(file, "\\t"); break;
case '\'': fprintf(file, "\\'"); break; case '\'': fprintf(file, "\\'"); break;
case 0 : fprintf(file, "\\0"); break; case 0 : fprintf(file, "\\0"); break;
default: default:
...@@ -41,6 +42,203 @@ static void pretty_print_char(FILE* file, int c) ...@@ -41,6 +42,203 @@ static void pretty_print_char(FILE* file, int c)
fputc('\'', file); fputc('\'', file);
} }
#ifndef MYSQL_CLIENT
static void pretty_print_char(String* packet, int c)
{
packet->append('\'');
switch(c) {
case '\n': packet->append( "\\n"); break;
case '\r': packet->append( "\\r"); break;
case '\\': packet->append( "\\\\"); break;
case '\b': packet->append( "\\b"); break;
case '\t': packet->append( "\\t"); break;
case '\'': packet->append( "\\'"); break;
case 0 : packet->append( "\\0"); break;
default:
packet->append((char)c);
break;
}
packet->append('\'');
}
#endif
const char* Log_event::get_type_str()
{
switch(get_type_code())
{
case START_EVENT: return "Start";
case STOP_EVENT: return "Stop";
case QUERY_EVENT: return "Query";
case ROTATE_EVENT: return "Rotate";
case INTVAR_EVENT: return "Intvar";
case LOAD_EVENT: return "Load";
case SLAVE_EVENT: return "Slave";
default: /* impossible */ return "Unknown";
}
}
#ifndef MYSQL_CLIENT
void Log_event::pack_info(String* packet)
{
net_store_data(packet, "", 0);
}
void Query_log_event::pack_info(String* packet)
{
String tmp;
if(db && db_len)
{
tmp.append("use ");
tmp.append(db, db_len);
tmp.append("; ", 2);
}
if(query && q_len)
tmp.append(query, q_len);
net_store_data(packet, (char*)tmp.ptr(), tmp.length());
}
void Start_log_event::pack_info(String* packet)
{
String tmp;
char buf[22];
tmp.append("Server ver: ");
tmp.append(server_version);
tmp.append(", Binlog ver: ");
tmp.append(llstr(binlog_version, buf));
net_store_data(packet, tmp.ptr(), tmp.length());
}
void Load_log_event::pack_info(String* packet)
{
String tmp;
if(db && db_len)
{
tmp.append("use ");
tmp.append(db, db_len);
tmp.append("; ", 2);
}
tmp.append("LOAD DATA INFILE '");
tmp.append(fname);
tmp.append("' ", 2);
if(sql_ex.opt_flags && REPLACE_FLAG )
tmp.append(" REPLACE ");
else if(sql_ex.opt_flags && IGNORE_FLAG )
tmp.append(" IGNORE ");
tmp.append("INTO TABLE ");
tmp.append(table_name);
if (!(sql_ex.empty_flags & FIELD_TERM_EMPTY))
{
tmp.append(" FIELDS TERMINATED BY ");
pretty_print_char(&tmp, sql_ex.field_term);
}
if (!(sql_ex.empty_flags & ENCLOSED_EMPTY))
{
if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG )
tmp.append(" OPTIONALLY ");
tmp.append( " ENCLOSED BY ");
pretty_print_char(&tmp, sql_ex.enclosed);
}
if (!(sql_ex.empty_flags & ESCAPED_EMPTY))
{
tmp.append( " ESCAPED BY ");
pretty_print_char(&tmp, sql_ex.escaped);
}
if (!(sql_ex.empty_flags & LINE_TERM_EMPTY))
{
tmp.append(" LINES TERMINATED BY ");
pretty_print_char(&tmp, sql_ex.line_term);
}
if (!(sql_ex.empty_flags & LINE_START_EMPTY))
{
tmp.append(" LINES STARTING BY ");
pretty_print_char(&tmp, sql_ex.line_start);
}
if ((int)skip_lines > 0)
tmp.append( " IGNORE %ld LINES ", (long) skip_lines);
if (num_fields)
{
uint i;
const char* field = fields;
tmp.append(" (");
for(i = 0; i < num_fields; i++)
{
if(i)
tmp.append(" ,");
tmp.append( field);
field += field_lens[i] + 1;
}
tmp.append(')');
}
net_store_data(packet, tmp.ptr(), tmp.length());
}
void Rotate_log_event::pack_info(String* packet)
{
net_store_data(packet, new_log_ident, ident_len);
}
void Intvar_log_event::pack_info(String* packet)
{
String tmp;
char buf[22];
tmp.append(get_var_type_name());
tmp.append('=');
tmp.append(llstr(val, buf));
net_store_data(packet, tmp.ptr(), tmp.length());
}
void Slave_log_event::pack_info(String* packet)
{
net_store_data(packet, "", 0);
}
void Log_event::init_show_field_list(List<Item>* field_list)
{
field_list->push_back(new Item_empty_string("Log_name", 20));
field_list->push_back(new Item_empty_string("Pos", 20));
field_list->push_back(new Item_empty_string("Event_type", 20));
field_list->push_back(new Item_empty_string("Server_id", 20));
field_list->push_back(new Item_empty_string("Log_seq", 20));
field_list->push_back(new Item_empty_string("Info", 20));
}
int Log_event::net_send(THD* thd, const char* log_name, ulong pos)
{
String* packet = &thd->packet;
const char* p = strrchr(log_name, FN_LIBCHAR);
const char* event_type;
if (p)
log_name = p + 1;
packet->length(0);
net_store_data(packet, log_name, strlen(log_name));
net_store_data(packet, (longlong)pos);
event_type = get_type_str();
net_store_data(packet, event_type, strlen(event_type));
net_store_data(packet, server_id);
net_store_data(packet, log_seq);
pack_info(packet);
return my_net_write(&thd->net, (char*)packet->ptr(), packet->length());
}
#endif
int Query_log_event::write(IO_CACHE* file) int Query_log_event::write(IO_CACHE* file)
{ {
return query ? Log_event::write(file) : -1; return query ? Log_event::write(file) : -1;
...@@ -209,6 +407,17 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len) ...@@ -209,6 +407,17 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len)
return r; return r;
} }
case SLAVE_EVENT:
{
Slave_log_event* s = new Slave_log_event(buf, event_len);
if (!s->master_host)
{
delete s;
return NULL;
}
return s;
}
case START_EVENT: return new Start_log_event(buf); case START_EVENT: return new Start_log_event(buf);
case STOP_EVENT: return new Stop_log_event(buf); case STOP_EVENT: return new Stop_log_event(buf);
case INTVAR_EVENT: return new Intvar_log_event(buf); case INTVAR_EVENT: return new Intvar_log_event(buf);
...@@ -394,6 +603,16 @@ Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf) ...@@ -394,6 +603,16 @@ Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf)
val = uint8korr(buf+I_VAL_OFFSET); val = uint8korr(buf+I_VAL_OFFSET);
} }
const char* Intvar_log_event::get_var_type_name()
{
switch(type)
{
case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
case INSERT_ID_EVENT: return "INSERT_ID";
default: /* impossible */ return "UNKNOWN";
}
}
int Intvar_log_event::write_data(IO_CACHE* file) int Intvar_log_event::write_data(IO_CACHE* file)
{ {
char buf[9]; char buf[9];
...@@ -616,8 +835,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg,MASTER_INFO* mi): ...@@ -616,8 +835,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg,MASTER_INFO* mi):
if((mem_pool = (char*)my_malloc(get_data_size() + 1, if((mem_pool = (char*)my_malloc(get_data_size() + 1,
MYF(MY_WME)))) MYF(MY_WME))))
{ {
master_host = mem_pool + sizeof(uint32) + master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
sizeof(ulonglong) + sizeof(uint16);
memcpy(master_host, mi->host, master_host_len + 1); memcpy(master_host, mi->host, master_host_len + 1);
master_log = master_host + master_host_len + 1; master_log = master_host + master_host_len + 1;
memcpy(master_log, mi->log_file_name, master_log_len + 1); memcpy(master_log, mi->log_file_name, master_log_len + 1);
...@@ -653,19 +871,15 @@ void Slave_log_event::print(FILE* file, bool short_form = 0, ...@@ -653,19 +871,15 @@ void Slave_log_event::print(FILE* file, bool short_form = 0,
int Slave_log_event::get_data_size() int Slave_log_event::get_data_size()
{ {
return master_host_len + master_log_len + 1 + 4 /* data_size*/ + return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
8 /* master_pos */ +
2 /* master_port */;
} }
int Slave_log_event::write_data(IO_CACHE* file) int Slave_log_event::write_data(IO_CACHE* file)
{ {
int data_size = get_data_size();
int4store(mem_pool, data_size);
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos); int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port); int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
// log and host are already there // log and host are already there
return my_b_write(file, (byte*)mem_pool, data_size); return my_b_write(file, (byte*)mem_pool, get_data_size());
} }
void Slave_log_event::init_from_mem_pool(int data_size) void Slave_log_event::init_from_mem_pool(int data_size)
...@@ -694,8 +908,3 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len): ...@@ -694,8 +908,3 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len):
mem_pool[event_len] = 0; mem_pool[event_len] = 0;
init_from_mem_pool(event_len); init_from_mem_pool(event_len);
} }
...@@ -71,9 +71,9 @@ ...@@ -71,9 +71,9 @@
/* slave event post-header */ /* slave event post-header */
#define SL_MASTER_PORT_OFFSET 12 #define SL_MASTER_PORT_OFFSET 8
#define SL_MASTER_POS_OFFSET 4 #define SL_MASTER_POS_OFFSET 0
#define SL_MASTER_HOST_OFFSET 14 #define SL_MASTER_HOST_OFFSET 10
/* query event post-header */ /* query event post-header */
...@@ -177,11 +177,15 @@ class Log_event ...@@ -177,11 +177,15 @@ class Log_event
// if mutex is 0, the read will proceed without mutex // if mutex is 0, the read will proceed without mutex
static Log_event* read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock); static Log_event* read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock);
static Log_event* read_log_event(const char* buf, int event_len); static Log_event* read_log_event(const char* buf, int event_len);
const char* get_type_str();
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
static int read_log_event(IO_CACHE* file, String* packet, static int read_log_event(IO_CACHE* file, String* packet,
pthread_mutex_t* log_lock); pthread_mutex_t* log_lock);
void set_log_seq(THD* thd, MYSQL_LOG* log); void set_log_seq(THD* thd, MYSQL_LOG* log);
virtual void pack_info(String* packet);
int net_send(THD* thd, const char* log_name, ulong pos);
static void init_show_field_list(List<Item>* field_list);
#endif #endif
}; };
...@@ -217,6 +221,8 @@ class Query_log_event: public Log_event ...@@ -217,6 +221,8 @@ class Query_log_event: public Log_event
exec_time = (ulong) (end_time - thd->start_time); exec_time = (ulong) (end_time - thd->start_time);
db_len = (db) ? (uint32) strlen(db) : 0; db_len = (db) ? (uint32) strlen(db) : 0;
} }
void pack_info(String* packet);
#endif #endif
Query_log_event(const char* buf, int event_len); Query_log_event(const char* buf, int event_len);
...@@ -257,6 +263,7 @@ class Slave_log_event: public Log_event ...@@ -257,6 +263,7 @@ class Slave_log_event: public Log_event
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Slave_log_event(THD* thd_arg, struct st_master_info* mi); Slave_log_event(THD* thd_arg, struct st_master_info* mi);
void pack_info(String* packet);
#endif #endif
Slave_log_event(const char* buf, int event_len); Slave_log_event(const char* buf, int event_len);
...@@ -383,6 +390,7 @@ class Load_log_event: public Log_event ...@@ -383,6 +390,7 @@ class Load_log_event: public Log_event
fields = fields_buf.ptr(); fields = fields_buf.ptr();
} }
void set_fields(List<Item> &fields_arg); void set_fields(List<Item> &fields_arg);
void pack_info(String* packet);
#endif #endif
Load_log_event(const char* buf, int event_len); Load_log_event(const char* buf, int event_len);
...@@ -432,6 +440,9 @@ class Start_log_event: public Log_event ...@@ -432,6 +440,9 @@ class Start_log_event: public Log_event
{ {
return START_HEADER_LEN; return START_HEADER_LEN;
} }
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
#endif
void print(FILE* file, bool short_form = 0, char* last_db = 0); void print(FILE* file, bool short_form = 0, char* last_db = 0);
}; };
...@@ -446,8 +457,12 @@ class Intvar_log_event: public Log_event ...@@ -446,8 +457,12 @@ class Intvar_log_event: public Log_event
Intvar_log_event(const char* buf); Intvar_log_event(const char* buf);
~Intvar_log_event() {} ~Intvar_log_event() {}
Log_event_type get_type_code() { return INTVAR_EVENT;} Log_event_type get_type_code() { return INTVAR_EVENT;}
const char* get_var_type_name();
int get_data_size() { return sizeof(type) + sizeof(val);} int get_data_size() { return sizeof(type) + sizeof(val);}
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
#endif
void print(FILE* file, bool short_form = 0, char* last_db = 0); void print(FILE* file, bool short_form = 0, char* last_db = 0);
...@@ -491,6 +506,12 @@ class Rotate_log_event: public Log_event ...@@ -491,6 +506,12 @@ class Rotate_log_event: public Log_event
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
void print(FILE* file, bool short_form = 0, char* last_db = 0); void print(FILE* file, bool short_form = 0, char* last_db = 0);
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
#endif
}; };
#endif #endif
...@@ -211,3 +211,5 @@ ...@@ -211,3 +211,5 @@
"Update locks cannot be acquired during a READ UNCOMMITTED transaction", "Update locks cannot be acquired during a READ UNCOMMITTED transaction",
"Error connecting to master: %-.128s", "Error connecting to master: %-.128s",
"Error running query on master: %-.128s", "Error running query on master: %-.128s",
"Error in SHOW BINLOG EVENTS: %-.128s",
...@@ -55,7 +55,8 @@ enum enum_sql_command { ...@@ -55,7 +55,8 @@ enum enum_sql_command {
SQLCOM_RESET, SQLCOM_PURGE, SQLCOM_SHOW_BINLOGS, SQLCOM_RESET, SQLCOM_PURGE, SQLCOM_SHOW_BINLOGS,
SQLCOM_SHOW_OPEN_TABLES, SQLCOM_LOAD_MASTER_DATA, SQLCOM_SHOW_OPEN_TABLES, SQLCOM_LOAD_MASTER_DATA,
SQLCOM_HA_OPEN, SQLCOM_HA_CLOSE, SQLCOM_HA_READ, SQLCOM_HA_OPEN, SQLCOM_HA_CLOSE, SQLCOM_HA_READ,
SQLCOM_SHOW_SLAVE_HOSTS, SQLCOM_MULTI_DELETE, SQLCOM_UNION_SELECT SQLCOM_SHOW_SLAVE_HOSTS, SQLCOM_MULTI_DELETE, SQLCOM_UNION_SELECT,
SQLCOM_SHOW_BINLOG_EVENTS
}; };
enum lex_states { STATE_START, STATE_CHAR, STATE_IDENT, enum lex_states { STATE_START, STATE_CHAR, STATE_IDENT,
......
...@@ -1179,6 +1179,13 @@ mysql_execute_command(void) ...@@ -1179,6 +1179,13 @@ mysql_execute_command(void)
res = show_slave_hosts(thd); res = show_slave_hosts(thd);
break; break;
} }
case SQLCOM_SHOW_BINLOG_EVENTS:
{
if(check_access(thd, FILE_ACL, any_db))
goto error;
res = show_binlog_events(thd);
break;
}
case SQLCOM_BACKUP_TABLE: case SQLCOM_BACKUP_TABLE:
{ {
if (check_db_used(thd,tables) || if (check_db_used(thd,tables) ||
......
...@@ -381,7 +381,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -381,7 +381,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
if (pos < 4) if (pos < 4)
{ {
errmsg = "Client requested master to start repliction from impossible position.\n"; errmsg = "Client requested master to start repliction from \
impossible position";
goto err; goto err;
} }
...@@ -826,6 +827,105 @@ void reset_master() ...@@ -826,6 +827,105 @@ void reset_master()
} }
int show_binlog_events(THD* thd)
{
DBUG_ENTER("show_binlog_events");
List<Item> field_list;
const char* errmsg = 0;
IO_CACHE log;
File file = -1;
Log_event::init_show_field_list(&field_list);
if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
if (mysql_bin_log.is_open())
{
LOG_INFO linfo;
char search_file_name[FN_REFLEN];
LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
uint event_count, limit_start, limit_end;
const char* log_file_name = lex_mi->log_file_name;
Log_event* ev;
ulong pos = (ulong) lex_mi->pos;
limit_start = thd->lex.select->offset_limit;
limit_end = thd->lex.select->select_limit + limit_start;
if (log_file_name)
mysql_bin_log.make_log_name(search_file_name, log_file_name);
else
search_file_name[0] = 0;
linfo.index_file_offset = 0;
thd->current_linfo = &linfo;
if (mysql_bin_log.find_first_log(&linfo, search_file_name))
{
errmsg = "Could not find target log";
goto err;
}
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
goto err;
if (pos < 4)
{
errmsg = "Invalid log position";
goto err;
}
pthread_mutex_lock(mysql_bin_log.get_log_lock());
my_b_seek(&log, pos);
for (event_count = 0;
(ev = Log_event::read_log_event(&log, 0));)
{
if (event_count >= limit_start &&
ev->net_send(thd, linfo.log_file_name, pos))
{
errmsg = "Net error";
delete ev;
pthread_mutex_unlock(mysql_bin_log.get_log_lock());
goto err;
}
pos = my_b_tell(&log);
delete ev;
if (++event_count >= limit_end)
break;
}
if (event_count < limit_end && log.error)
{
errmsg = "Wrong offset or I/O error";
goto err;
}
pthread_mutex_unlock(mysql_bin_log.get_log_lock());
}
err:
if (file >= 0)
{
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
}
if (errmsg)
{
net_printf(&thd->net, ER_SHOW_BINLOG_EVENTS, errmsg);
DBUG_RETURN(1);
}
send_eof(&thd->net);
DBUG_RETURN(0);
}
int show_slave_hosts(THD* thd) int show_slave_hosts(THD* thd)
{ {
DBUG_ENTER("show_slave_hosts"); DBUG_ENTER("show_slave_hosts");
......
...@@ -29,6 +29,7 @@ int load_master_data(THD* thd); ...@@ -29,6 +29,7 @@ int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi); int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int change_master(THD* thd); int change_master(THD* thd);
int show_slave_hosts(THD* thd); int show_slave_hosts(THD* thd);
int show_binlog_events(THD* thd);
void reset_slave(); void reset_slave();
void reset_master(); void reset_master();
void init_slave_list(); void init_slave_list();
......
...@@ -130,6 +130,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); ...@@ -130,6 +130,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token LOAD %token LOAD
%token LOCK_SYM %token LOCK_SYM
%token UNLOCK_SYM %token UNLOCK_SYM
%token BINLOG_SYM
%token EVENTS_SYM
%token ACTION %token ACTION
%token AGGREGATE_SYM %token AGGREGATE_SYM
...@@ -466,12 +468,12 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); ...@@ -466,12 +468,12 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
opt_escape opt_escape
%type <string> %type <string>
text_string text_string
%type <num> %type <num>
type int_type real_type order_dir opt_field_spec set_option lock_option type int_type real_type order_dir opt_field_spec set_option lock_option
udf_type if_exists opt_local opt_table_options table_options udf_type if_exists opt_local opt_table_options table_options
table_option opt_if_not_exists table_option opt_if_not_exists
%type <ulong_num> %type <ulong_num>
ULONG_NUM raid_types ULONG_NUM raid_types
...@@ -2406,6 +2408,10 @@ show_param: ...@@ -2406,6 +2408,10 @@ show_param:
{ {
Lex->sql_command = SQLCOM_SHOW_SLAVE_HOSTS; Lex->sql_command = SQLCOM_SHOW_SLAVE_HOSTS;
} }
| BINLOG_SYM EVENTS_SYM binlog_in binlog_from limit_clause
{
Lex->sql_command = SQLCOM_SHOW_BINLOG_EVENTS;
}
| keys_or_index FROM table_ident opt_db | keys_or_index FROM table_ident opt_db
{ {
Lex->sql_command= SQLCOM_SHOW_KEYS; Lex->sql_command= SQLCOM_SHOW_KEYS;
...@@ -2456,6 +2462,15 @@ opt_full: ...@@ -2456,6 +2462,15 @@ opt_full:
/* empty */ { Lex->verbose=0; } /* empty */ { Lex->verbose=0; }
| FULL { Lex->verbose=1; } | FULL { Lex->verbose=1; }
binlog_in:
/* empty */ { Lex->mi.log_file_name = 0; }
| IN_SYM TEXT_STRING { Lex->mi.log_file_name = $2.str; }
binlog_from:
/* empty */ { Lex->mi.pos = 4; /* skip magic number */ }
| FROM ULONGLONG_NUM { Lex->mi.pos = $2; }
/* A Oracle compatible synonym for show */ /* A Oracle compatible synonym for show */
describe: describe:
describe_command table_ident describe_command table_ident
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment