Commit d998da03 authored by Eugene Kosov's avatar Eugene Kosov

SQL: replication fixes [fixes #234]

parent 88454b33
include/master-slave.inc
[connection master]
connection slave;
connection master;
CREATE TABLE t1 (x int) with system versioning;
insert into t1 values (1);
SELECT * FROM t1;
x
1
delete from t1;
select * from t1;
x
select * from t1 for system_time all;
x
1
connection slave;
select * from t1;
x
select * from t1 for system_time all;
x
1
connection master;
insert into t1 values (2);
connection slave;
select * from t1;
x
2
connection master;
update t1 set x = 3;
connection slave;
select * from t1;
x
3
select * from t1 for system_time all;
x
1
3
2
connection master;
create or replace table t1 (x int primary key);
connection slave;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
connection slave;
select * from t1;
x
1
select * from t1 for system_time all;
x
1
connection master;
update t1 set x= 2 where x = 1;
connection slave;
select * from t1;
x
2
select * from t1 for system_time all;
x
1
2
connection master;
delete from t1;
connection slave;
select * from t1;
x
select * from t1 for system_time all;
x
1
2
connection master;
create or replace table t1 (x int);
connection slave;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
connection slave;
select * from t1;
x
2
select * from t1 for system_time all;
x
2
1
connection master;
delete from t1;
connection slave;
select * from t1;
x
select * from t1 for system_time all;
x
2
1
connection master;
create or replace table t1 (x int) with system versioning;
create or replace table t2 (x int) with system versioning;
insert into t1 values (1);
insert into t2 values (2);
update t1, t2 set t1.x=11, t2.x=22;
connection slave;
select * from t1;
x
11
select * from t2;
x
22
select * from t1 for system_time all;
x
11
1
select * from t2 for system_time all;
x
22
2
connection master;
drop table t1, t2;
include/rpl_end.inc
......@@ -2,7 +2,7 @@ include/master-slave.inc
[connection master]
connection slave;
connection master;
CREATE TABLE t1 (x int) with system versioning ENGINE = innodb;
CREATE TABLE t1 (x int) with system versioning;
insert into t1 values (1);
SELECT * FROM t1;
x
......@@ -37,11 +37,19 @@ x
3
2
connection master;
create or replace table t1 (x int primary key) engine = innodb;
create or replace table t1 (x int primary key);
connection slave;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
connection slave;
select * from t1;
x
1
select * from t1 for system_time all;
x
1
connection master;
update t1 set x= 2 where x = 1;
connection slave;
select * from t1;
......@@ -61,7 +69,7 @@ x
1
2
connection master;
create or replace table t1 (x int) engine = innodb;
create or replace table t1 (x int);
connection slave;
alter table t1 with system versioning;
connection master;
......@@ -85,42 +93,26 @@ x
2
1
connection master;
create or replace table t1 (x int primary key) with system versioning engine = innodb;
connection slave;
alter table t1 without system versioning;
connection master;
create or replace table t1 (x int) with system versioning;
create or replace table t2 (x int) with system versioning;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
select * from t1 for system_time all;
x
1
2
insert into t2 values (2);
update t1, t2 set t1.x=11, t2.x=22;
connection slave;
select * from t1;
x
2
connection master;
delete from t1;
11
select * from t2;
x
22
select * from t1 for system_time all;
x
11
1
2
connection slave;
select * from t1;
select * from t2 for system_time all;
x
connection master;
create or replace table t1 (a int) with system versioning engine = innodb;
insert into t1 values (1);
update t1 set a=2;
select * from t1 for system_time all;
a
22
2
1
connection slave;
select * from t1 for system_time all;
a
2
1
connection master;
drop table t1;
drop table t1, t2;
include/rpl_end.inc
......@@ -2,7 +2,7 @@ include/master-slave.inc
[connection master]
connection slave;
connection master;
CREATE TABLE t1 (x int) with system versioning ENGINE = innodb;
CREATE TABLE t1 (x int) with system versioning;
insert into t1 values (1);
SELECT * FROM t1;
x
......@@ -37,11 +37,19 @@ x
3
2
connection master;
create or replace table t1 (x int primary key) engine = innodb;
create or replace table t1 (x int primary key);
connection slave;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
connection slave;
select * from t1;
x
1
select * from t1 for system_time all;
x
1
connection master;
update t1 set x= 2 where x = 1;
connection slave;
select * from t1;
......@@ -61,7 +69,7 @@ x
1
2
connection master;
create or replace table t1 (x int) engine = innodb;
create or replace table t1 (x int);
connection slave;
alter table t1 with system versioning;
connection master;
......@@ -85,42 +93,26 @@ x
2
1
connection master;
create or replace table t1 (x int primary key) with system versioning engine = innodb;
connection slave;
alter table t1 without system versioning;
connection master;
create or replace table t1 (x int) with system versioning;
create or replace table t2 (x int) with system versioning;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
select * from t1 for system_time all;
x
1
2
insert into t2 values (2);
update t1, t2 set t1.x=11, t2.x=22;
connection slave;
select * from t1;
x
2
connection master;
delete from t1;
11
select * from t2;
x
22
select * from t1 for system_time all;
x
11
1
2
connection slave;
select * from t1;
select * from t2 for system_time all;
x
connection master;
create or replace table t1 (a int) with system versioning engine = innodb;
insert into t1 values (1);
update t1 set a=2;
select * from t1 for system_time all;
a
22
2
1
connection slave;
select * from t1 for system_time all;
a
2
1
connection master;
drop table t1;
drop table t1, t2;
include/rpl_end.inc
include/master-slave.inc
[connection master]
create table t (a int) with system versioning engine=innodb;
truncate t for system_time all;
ERROR HY000: `TRUNCATE FOR SYSTEM_TIME with row-based replication` is not allowed for versioned table
drop table t;
include/rpl_end.inc
[myisam]
default-storage-engine=myisam
[innodb]
default-storage-engine=innodb
-- source include/have_binlog_format_row.inc
-- source include/have_binlog_format_mixed.inc
-- source include/master-slave.inc
-- source include/have_innodb.inc
create table t (a int) with system versioning engine=innodb;
--error ER_VERS_NOT_ALLOWED
truncate t for system_time all;
drop table t;
-- source rpl_test.inc
-- source include/rpl_end.inc
[myisam]
default-storage-engine=myisam
[innodb]
default-storage-engine=innodb
[innodb]
default-storage-engine=innodb
[myisam]
default-storage-engine=myisam
......@@ -8,7 +8,7 @@ let $slave_com_delete_before= query_get_value(SHOW GLOBAL STATUS LIKE 'com_delet
let $slave_com_update_before= query_get_value(SHOW GLOBAL STATUS LIKE 'com_update', Value, 1);
connection master;
CREATE TABLE t1 (x int) with system versioning ENGINE = innodb;
CREATE TABLE t1 (x int) with system versioning;
insert into t1 values (1);
SELECT * FROM t1;
delete from t1;
......@@ -31,12 +31,17 @@ select * from t1 for system_time all;
# check unversioned -> versioned replication
connection master;
create or replace table t1 (x int primary key) engine = innodb;
create or replace table t1 (x int primary key);
sync_slave_with_master;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
sync_slave_with_master;
select * from t1;
select * from t1 for system_time all;
connection master;
update t1 set x= 2 where x = 1;
sync_slave_with_master;
select * from t1;
......@@ -50,7 +55,7 @@ select * from t1 for system_time all;
# same thing (UPDATE, DELETE), but without PK
connection master;
create or replace table t1 (x int) engine = innodb;
create or replace table t1 (x int);
sync_slave_with_master;
alter table t1 with system versioning;
......@@ -67,35 +72,19 @@ sync_slave_with_master;
select * from t1;
select * from t1 for system_time all;
# same thing, but reverse: versioned -> unversioned
connection master;
create or replace table t1 (x int primary key) with system versioning engine = innodb;
sync_slave_with_master;
alter table t1 without system versioning;
# multi-update
connection master;
create or replace table t1 (x int) with system versioning;
create or replace table t2 (x int) with system versioning;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
select * from t1 for system_time all;
insert into t2 values (2);
update t1, t2 set t1.x=11, t2.x=22;
sync_slave_with_master;
select * from t1;
connection master;
delete from t1;
select * from t2;
select * from t1 for system_time all;
sync_slave_with_master;
select * from t1;
select * from t2 for system_time all;
# at this point in this particular test master and slave have different curr_trx_id
# and the same rows have different sys_trx_start
# slave should ignore sys_trx_start while searching for a record to update in a InnoDB table
connection master;
create or replace table t1 (a int) with system versioning engine = innodb;
insert into t1 values (1);
update t1 set a=2;
select * from t1 for system_time all;
sync_slave_with_master;
select * from t1 for system_time all;
connection master;
drop table t1;
drop table t1, t2;
......@@ -5697,6 +5697,8 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat)
bool handler::check_table_binlog_row_based(bool binlog_row)
{
if (table->versioned_by_engine())
return false;
if (unlikely((table->in_use->variables.sql_log_bin_off)))
return 0; /* Called by partitioning engine */
if (unlikely((!check_table_binlog_row_based_done)))
......
......@@ -44,6 +44,7 @@
#include <strfunc.h>
#include "compat56.h"
#include "wsrep_mysqld.h"
#include "sql_insert.h"
#endif /* MYSQL_CLIENT */
#include <my_bitmap.h>
......@@ -12509,6 +12510,22 @@ Rows_log_event::write_row(rpl_group_info *rgi,
DBUG_RETURN(HA_ERR_GENERIC); // in case if error is not set yet
}
// Handle INSERT.
// Set vers fields when replicating from not system-versioned table.
if (m_type == WRITE_ROWS_EVENT_V1 && table->versioned_by_sql())
{
bitmap_set_bit(table->read_set, table->vers_start_field()->field_index);
// Check whether a row came from unversioned table and fix vers fields.
if (table->vers_start_field()->get_timestamp() == 0)
{
bitmap_set_bit(table->write_set, table->vers_start_field()->field_index);
bitmap_set_bit(table->write_set, table->vers_end_field()->field_index);
thd->set_current_time();
table->vers_start_field()->set_time();
table->vers_end_field()->set_max();
}
}
/*
Try to write record. If a corresponding record already exists in the table,
we try to change it using ha_update_row() if possible. Otherwise we delete
......@@ -12799,7 +12816,7 @@ static bool record_compare(TABLE *table)
/* Compare fields */
for (Field **ptr=table->field ; *ptr ; ptr++)
{
if (table->versioned_by_engine() && *ptr == table->vers_start_field())
if (table->versioned() && (*ptr)->vers_sys_field())
{
continue;
}
......@@ -12997,19 +13014,19 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
prepare_record(table, m_width, FALSE);
error= unpack_current_row(rgi);
m_vers_from_plain= false;
if (table->versioned())
{
Field *sys_trx_end= table->vers_end_field();
DBUG_ASSERT(table->read_set);
bitmap_set_bit(table->read_set, sys_trx_end->field_index);
// master table is unversioned
// check whether master table is unversioned
if (sys_trx_end->val_int() == 0)
{
DBUG_ASSERT(table->write_set);
bitmap_set_bit(table->write_set, sys_trx_end->field_index);
sys_trx_end->set_max();
table->vers_start_field()->set_notnull();
bitmap_set_bit(table->write_set, sys_trx_end->field_index);
table->vers_end_field()->set_max();
m_vers_from_plain= true;
}
}
......@@ -13395,7 +13412,19 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
if (!error)
{
m_table->mark_columns_per_binlog_row_image();
error= m_table->file->ha_delete_row(m_table->record[0]);
if (m_vers_from_plain && m_table->versioned_by_sql())
{
Field *end= m_table->vers_end_field();
bitmap_set_bit(m_table->write_set, end->field_index);
store_record(m_table, record[1]);
end->set_time();
error= m_table->file->ha_update_row(m_table->record[1],
m_table->record[0]);
}
else
{
error= m_table->file->ha_delete_row(m_table->record[0]);
}
m_table->default_column_bitmaps();
}
if (invoke_triggers && !error &&
......@@ -13652,9 +13681,22 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
m_table->mark_columns_per_binlog_row_image();
if (m_vers_from_plain && m_table->versioned_by_sql())
{
bitmap_set_bit(m_table->write_set,
m_table->vers_start_field()->field_index);
thd->set_current_time();
m_table->vers_start_field()->set_time();
}
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
if (error == HA_ERR_RECORD_IS_THE_SAME)
error= 0;
if (m_vers_from_plain && m_table->versioned_by_sql())
{
store_record(m_table, record[2]);
error= vers_insert_history_row(m_table);
restore_record(m_table, record[2]);
}
m_table->default_column_bitmaps();
if (invoke_triggers && !error &&
......
......@@ -4588,6 +4588,8 @@ class Rows_log_event : public Log_event
uchar *m_extra_row_data; /* Pointer to extra row data if any */
/* If non null, first byte is length */
bool m_vers_from_plain;
/* helper functions */
......@@ -4737,6 +4739,7 @@ class Write_rows_log_event : public Rows_log_event
__attribute__((unused)),
const uchar *after_record)
{
DBUG_ASSERT(!table->versioned_by_engine());
return thd->binlog_write_row(table, is_transactional, after_record);
}
#endif
......@@ -4818,6 +4821,7 @@ class Update_rows_log_event : public Rows_log_event
const uchar *before_record,
const uchar *after_record)
{
DBUG_ASSERT(!table->versioned_by_engine());
return thd->binlog_update_row(table, is_transactional,
before_record, after_record);
}
......@@ -4907,6 +4911,7 @@ class Delete_rows_log_event : public Rows_log_event
const uchar *after_record
__attribute__((unused)))
{
DBUG_ASSERT(!table->versioned_by_engine());
return thd->binlog_delete_row(table, is_transactional,
before_record);
}
......
......@@ -6052,6 +6052,24 @@ class ErrConvDQName: public ErrConv
}
};
class ScopedStatementReplication
{
public:
ScopedStatementReplication(THD *thd) : thd(thd)
{
if (thd)
saved_binlog_format= thd->set_current_stmt_binlog_format_stmt();
}
~ScopedStatementReplication()
{
if (thd)
thd->restore_stmt_binlog_format(saved_binlog_format);
}
private:
enum_binlog_format saved_binlog_format;
THD *thd;
};
#endif /* MYSQL_SERVER */
#endif /* SQL_CLASS_INCLUDED */
......@@ -270,14 +270,6 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
TABLE *table= table_list->table;
DBUG_ASSERT(table);
if (table->versioned_by_engine() &&
table->file->check_table_binlog_row_based(1))
{
my_error(ER_VERS_NOT_ALLOWED, MYF(0),
"TRUNCATE FOR SYSTEM_TIME with row-based replication");
DBUG_RETURN(TRUE);
}
DBUG_ASSERT(!conds);
if (vers_setup_select(thd, table_list, &conds, select_lex))
DBUG_RETURN(TRUE);
......@@ -724,6 +716,8 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
else
errcode= query_error_code(thd, killed_status == NOT_KILLED);
ScopedStatementReplication scoped_stmt_rpl(
table->versioned_by_engine() ? thd : NULL);
/*
[binlog]: If 'handler::delete_all_rows()' was called and the
storage engine does not inject the rows itself, we replicate
......
......@@ -1137,8 +1137,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
}
else
errcode= query_error_code(thd, thd->killed == NOT_KILLED);
/* bug#22725:
ScopedStatementReplication scoped_stmt_rpl(
table->versioned_by_engine() ? thd : NULL);
/* bug#22725:
A query which per-row-loop can not be interrupted with
KILLED, like INSERT, and that does not invoke stored
......
......@@ -1036,6 +1036,9 @@ int mysql_update(THD *thd,
else
errcode= query_error_code(thd, killed_status == NOT_KILLED);
ScopedStatementReplication scoped_stmt_rpl(
table->versioned_by_engine() ? thd : NULL);
if (thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query(), thd->query_length(),
transactional_table, FALSE, FALSE, errcode))
......@@ -2674,9 +2677,21 @@ bool multi_update::send_eof()
thd->clear_error();
else
errcode= query_error_code(thd, killed_status == NOT_KILLED);
if (thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query(), thd->query_length(),
transactional_tables, FALSE, FALSE, errcode))
bool force_stmt= false;
for (TABLE *table= all_tables->table; table; table= table->next)
{
if (table->versioned_by_engine())
{
force_stmt= true;
break;
}
}
ScopedStatementReplication scoped_stmt_rpl(force_stmt ? thd : NULL);
if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
thd->query_length(), transactional_tables, FALSE,
FALSE, errcode))
{
local_error= 1; // Rollback update
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment