Commit 3db92ee4 authored by Daniele Sciascia's avatar Daniele Sciascia Committed by Nirbhay Choubey

MW-265 Add support for wsrep_max_ws_rows

Variable wsrep_max_ws_rows limits the number of rows that a transaction
can insert/update/delete.
parent 10880d67
CREATE TABLE ten (f1 INTEGER) ENGINE=InnoDB;
INSERT INTO ten VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
CREATE TABLE t1 (f1 INTEGER AUTO_INCREMENT PRIMARY KEY, f2 INTEGER) ENGINE=InnoDB;
SET GLOBAL wsrep_max_ws_rows = 4;
START TRANSACTION;
INSERT INTO t1 (f2) VALUES (1);
INSERT INTO t1 (f2) VALUES (2);
INSERT INTO t1 (f2) VALUES (3);
INSERT INTO t1 (f2) VALUES (4);
INSERT INTO t1 (f2) VALUES (5);
ERROR HY000: wsrep_max_ws_rows exceeded
COMMIT;
SELECT COUNT(*) = 0 FROM t1;
COUNT(*) = 0
1
START TRANSACTION;
INSERT INTO t1 (f2) VALUES (1);
INSERT INTO t1 (f2) VALUES (2);
INSERT INTO t1 (f2) VALUES (3);
INSERT INTO t1 (f2) VALUES (4);
UPDATE t1 SET f2 = 10 WHERE f2 = 4;
ERROR HY000: wsrep_max_ws_rows exceeded
COMMIT;
SELECT COUNT(*) = 0 FROM t1;
COUNT(*) = 0
1
START TRANSACTION;
INSERT INTO t1 (f2) VALUES (1);
INSERT INTO t1 (f2) VALUES (2);
INSERT INTO t1 (f2) VALUES (3);
INSERT INTO t1 (f2) VALUES (4);
DELETE FROM t1 WHERE f2 = 1;
ERROR HY000: wsrep_max_ws_rows exceeded
COMMIT;
SELECT COUNT(*) = 0 FROM t1;
COUNT(*) = 0
1
SET GLOBAL wsrep_max_ws_rows = 5;
INSERT INTO t1 (f2) VALUES (1),(2),(3),(4),(5);
SET GLOBAL wsrep_max_ws_rows = 4;
UPDATE t1 SET f2 = f2 + 10;
ERROR HY000: wsrep_max_ws_rows exceeded
SELECT COUNT(*) = 5 FROM t1;
COUNT(*) = 5
1
DELETE FROM t1 WHERE f2 < 10;
ERROR HY000: wsrep_max_ws_rows exceeded
SELECT COUNT(*) = 5 FROM t1;
COUNT(*) = 5
1
INSERT INTO t1 (f2) SELECT * FROM ten;
ERROR HY000: wsrep_max_ws_rows exceeded
SELECT COUNT(*) = 5 FROM t1;
COUNT(*) = 5
1
INSERT INTO t1 (f2) VALUES (10),(20),(30),(40),(50);
ERROR HY000: wsrep_max_ws_rows exceeded
SELECT COUNT(*) = 5 FROM t1;
COUNT(*) = 5
1
SET GLOBAL wsrep_max_ws_rows = 10;
DELETE FROM t1 WHERE f2 < 10;
SELECT COUNT(*) = 0 FROM t1;
COUNT(*) = 0
1
SET GLOBAL wsrep_max_ws_rows = 100;
SELECT COUNT(*) = 100 FROM t1;
COUNT(*) = 100
1
DELETE FROM t1 WHERE f2 < 101;
SELECT COUNT(*) = 0 FROM t1;
COUNT(*) = 0
1
SET GLOBAL wsrep_max_ws_rows = 9999;
INSERT INTO t1 (f2) SELECT 1 FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4;
ERROR HY000: wsrep_max_ws_rows exceeded
SET GLOBAL wsrep_max_ws_rows = 10000;
INSERT INTO t1 (f2) SELECT 1 FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4;
SET GLOBAL wsrep_max_ws_rows = 9999;
UPDATE t1 SET f2 = 2 WHERE f2 = 1;
ERROR HY000: wsrep_max_ws_rows exceeded
SET GLOBAL wsrep_max_ws_rows = 10000;
UPDATE t1 SET f2 = 2 WHERE f2 = 1;
SET GLOBAL wsrep_max_ws_rows = 9999;
DELETE FROM t1 WHERE f2 = 2;
ERROR HY000: wsrep_max_ws_rows exceeded
SET GLOBAL wsrep_max_ws_rows = 10000;
DELETE FROM t1 WHERE f2 = 2;
SELECT COUNT(*) = 0 FROM t1;
COUNT(*) = 0
1
DROP TABLE t1;
DROP TABLE ten;
--source include/galera_cluster.inc
--source include/have_innodb.inc
CREATE TABLE ten (f1 INTEGER) ENGINE=InnoDB;
INSERT INTO ten VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
CREATE TABLE t1 (f1 INTEGER AUTO_INCREMENT PRIMARY KEY, f2 INTEGER) ENGINE=InnoDB;
--let $wsrep_max_ws_rows_orig = `SELECT @@wsrep_max_ws_rows`
SET GLOBAL wsrep_max_ws_rows = 4;
# Test that wsrep_max_ws_rows is enforced with multi statement transactions
START TRANSACTION;
INSERT INTO t1 (f2) VALUES (1);
INSERT INTO t1 (f2) VALUES (2);
INSERT INTO t1 (f2) VALUES (3);
INSERT INTO t1 (f2) VALUES (4);
--error ER_ERROR_DURING_COMMIT
INSERT INTO t1 (f2) VALUES (5);
COMMIT;
SELECT COUNT(*) = 0 FROM t1;
START TRANSACTION;
INSERT INTO t1 (f2) VALUES (1);
INSERT INTO t1 (f2) VALUES (2);
INSERT INTO t1 (f2) VALUES (3);
INSERT INTO t1 (f2) VALUES (4);
--error ER_ERROR_DURING_COMMIT
UPDATE t1 SET f2 = 10 WHERE f2 = 4;
COMMIT;
SELECT COUNT(*) = 0 FROM t1;
START TRANSACTION;
INSERT INTO t1 (f2) VALUES (1);
INSERT INTO t1 (f2) VALUES (2);
INSERT INTO t1 (f2) VALUES (3);
INSERT INTO t1 (f2) VALUES (4);
--error ER_ERROR_DURING_COMMIT
DELETE FROM t1 WHERE f2 = 1;
COMMIT;
SELECT COUNT(*) = 0 FROM t1;
# Test that wsrep_max_ws_rows is enforced on sigle statements
SET GLOBAL wsrep_max_ws_rows = 5;
INSERT INTO t1 (f2) VALUES (1),(2),(3),(4),(5);
SET GLOBAL wsrep_max_ws_rows = 4;
--error ER_ERROR_DURING_COMMIT
UPDATE t1 SET f2 = f2 + 10;
SELECT COUNT(*) = 5 FROM t1;
--error ER_ERROR_DURING_COMMIT
DELETE FROM t1 WHERE f2 < 10;
SELECT COUNT(*) = 5 FROM t1;
--error ER_ERROR_DURING_COMMIT
INSERT INTO t1 (f2) SELECT * FROM ten;
SELECT COUNT(*) = 5 FROM t1;
--error ER_ERROR_DURING_COMMIT
INSERT INTO t1 (f2) VALUES (10),(20),(30),(40),(50);
SELECT COUNT(*) = 5 FROM t1;
# Fewer than wsrep_max_ws_rows is OK
SET GLOBAL wsrep_max_ws_rows = 10;
DELETE FROM t1 WHERE f2 < 10;
SELECT COUNT(*) = 0 FROM t1;
# Test a series of transactions
--disable_query_log
SET GLOBAL wsrep_max_ws_rows = 5;
let $i= 100;
while ($i)
{
START TRANSACTION;
--eval INSERT INTO t1 (f2) VALUES ($i);
COMMIT;
dec $i;
}
--enable_query_log
SET GLOBAL wsrep_max_ws_rows = 100;
SELECT COUNT(*) = 100 FROM t1;
DELETE FROM t1 WHERE f2 < 101;
SELECT COUNT(*) = 0 FROM t1;
# Test large statements
SET GLOBAL wsrep_max_ws_rows = 9999;
--error ER_ERROR_DURING_COMMIT
INSERT INTO t1 (f2) SELECT 1 FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4;
SET GLOBAL wsrep_max_ws_rows = 10000;
INSERT INTO t1 (f2) SELECT 1 FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4;
SET GLOBAL wsrep_max_ws_rows = 9999;
--error ER_ERROR_DURING_COMMIT
UPDATE t1 SET f2 = 2 WHERE f2 = 1;
SET GLOBAL wsrep_max_ws_rows = 10000;
UPDATE t1 SET f2 = 2 WHERE f2 = 1;
SET GLOBAL wsrep_max_ws_rows = 9999;
--error ER_ERROR_DURING_COMMIT
DELETE FROM t1 WHERE f2 = 2;
SET GLOBAL wsrep_max_ws_rows = 10000;
DELETE FROM t1 WHERE f2 = 2;
SELECT COUNT(*) = 0 FROM t1;
--disable_query_log
--eval SET GLOBAL wsrep_max_ws_rows = $wsrep_max_ws_rows_orig
--enable_query_log
DROP TABLE t1;
DROP TABLE ten;
...@@ -6098,6 +6098,17 @@ int handler::ha_write_row(uchar *buf) ...@@ -6098,6 +6098,17 @@ int handler::ha_write_row(uchar *buf)
rows_changed++; rows_changed++;
if (unlikely(error= binlog_log_row(table, 0, buf, log_func))) if (unlikely(error= binlog_log_row(table, 0, buf, log_func)))
DBUG_RETURN(error); /* purecov: inspected */ DBUG_RETURN(error); /* purecov: inspected */
#ifdef WITH_WSREP
current_thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
current_thd->wsrep_exec_mode != REPL_RECV &&
current_thd->wsrep_affected_rows > wsrep_max_ws_rows)
{
current_thd->transaction_rollback_request= TRUE;
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
DBUG_RETURN(ER_ERROR_DURING_COMMIT);
}
#endif /* WITH_WSREP */
DEBUG_SYNC_C("ha_write_row_end"); DEBUG_SYNC_C("ha_write_row_end");
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -6131,6 +6142,17 @@ int handler::ha_update_row(const uchar *old_data, uchar *new_data) ...@@ -6131,6 +6142,17 @@ int handler::ha_update_row(const uchar *old_data, uchar *new_data)
rows_changed++; rows_changed++;
if (unlikely(error= binlog_log_row(table, old_data, new_data, log_func))) if (unlikely(error= binlog_log_row(table, old_data, new_data, log_func)))
return error; return error;
#ifdef WITH_WSREP
current_thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
current_thd->wsrep_exec_mode != REPL_RECV &&
current_thd->wsrep_affected_rows > wsrep_max_ws_rows)
{
current_thd->transaction_rollback_request= TRUE;
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
return ER_ERROR_DURING_COMMIT;
}
#endif /* WITH_WSREP */
return 0; return 0;
} }
...@@ -6158,6 +6180,17 @@ int handler::ha_delete_row(const uchar *buf) ...@@ -6158,6 +6180,17 @@ int handler::ha_delete_row(const uchar *buf)
rows_changed++; rows_changed++;
if (unlikely(error= binlog_log_row(table, buf, 0, log_func))) if (unlikely(error= binlog_log_row(table, buf, 0, log_func)))
return error; return error;
#ifdef WITH_WSREP
current_thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
current_thd->wsrep_exec_mode != REPL_RECV &&
current_thd->wsrep_affected_rows > wsrep_max_ws_rows)
{
current_thd->transaction_rollback_request= TRUE;
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
return ER_ERROR_DURING_COMMIT;
}
#endif /* WITH_WSREP */
return 0; return 0;
} }
......
...@@ -1213,7 +1213,8 @@ THD::THD() ...@@ -1213,7 +1213,8 @@ THD::THD()
wsrep_mysql_replicated = 0; wsrep_mysql_replicated = 0;
wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query = NULL;
wsrep_TOI_pre_query_len = 0; wsrep_TOI_pre_query_len = 0;
wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED; wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED;
wsrep_affected_rows = 0;
#endif #endif
/* Call to init() below requires fully initialized Open_tables_state. */ /* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this); reset_open_tables_state(this);
...@@ -1629,7 +1630,8 @@ void THD::init(void) ...@@ -1629,7 +1630,8 @@ void THD::init(void)
wsrep_mysql_replicated = 0; wsrep_mysql_replicated = 0;
wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query = NULL;
wsrep_TOI_pre_query_len = 0; wsrep_TOI_pre_query_len = 0;
wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED; wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED;
wsrep_affected_rows = 0;
/* /*
@@wsrep_causal_reads is now being handled via wsrep_sync_wait, update it @@wsrep_causal_reads is now being handled via wsrep_sync_wait, update it
...@@ -2383,6 +2385,8 @@ void THD::cleanup_after_query() ...@@ -2383,6 +2385,8 @@ void THD::cleanup_after_query()
#ifdef WITH_WSREP #ifdef WITH_WSREP
wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED; wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED;
if (!in_active_multi_stmt_transaction())
wsrep_affected_rows= 0;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
......
...@@ -3878,6 +3878,7 @@ class THD :public Statement, ...@@ -3878,6 +3878,7 @@ class THD :public Statement,
bool wsrep_apply_toi; /* applier processing in TOI */ bool wsrep_apply_toi; /* applier processing in TOI */
bool wsrep_skip_append_keys; bool wsrep_skip_append_keys;
wsrep_gtid_t wsrep_sync_wait_gtid; wsrep_gtid_t wsrep_sync_wait_gtid;
ulong wsrep_affected_rows;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
}; };
......
...@@ -4694,7 +4694,7 @@ static Sys_var_ulong Sys_wsrep_max_ws_size ( ...@@ -4694,7 +4694,7 @@ static Sys_var_ulong Sys_wsrep_max_ws_size (
static Sys_var_ulong Sys_wsrep_max_ws_rows ( static Sys_var_ulong Sys_wsrep_max_ws_rows (
"wsrep_max_ws_rows", "Max number of rows in write set", "wsrep_max_ws_rows", "Max number of rows in write set",
GLOBAL_VAR(wsrep_max_ws_rows), CMD_LINE(REQUIRED_ARG), GLOBAL_VAR(wsrep_max_ws_rows), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, 1048576), DEFAULT(131072), BLOCK_SIZE(1)); VALID_RANGE(0, 1048576), DEFAULT(0), BLOCK_SIZE(1));
static Sys_var_charptr Sys_wsrep_notify_cmd( static Sys_var_charptr Sys_wsrep_notify_cmd(
"wsrep_notify_cmd", "", "wsrep_notify_cmd", "",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment