Commit 8a346f31 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-17073 INSERT…ON DUPLICATE KEY UPDATE became more deadlock-prone

thd_rpl_stmt_based(): A new predicate to check if statement-based
replication is active. (This can also hold when replication is not
in use, but binlog is.)

que_thr_stop(), row_ins_duplicate_error_in_clust(),
row_ins_sec_index_entry_low(), row_ins(): On a duplicate key error,
only lock all index records when statement-based replication is in use.
parent cfa04706
# See innodb_binlog.combinations
# --log-bin is ignored in the embedded server
--source include/not_embedded.inc
--- auto_increment_dup.result
+++ auto_increment_dup,skip-log-bin.reject
@@ -89,13 +89,14 @@
SET DEBUG_SYNC='execute_command_after_close_tables SIGNAL continue';
affected rows: 0
INSERT INTO t1(k) VALUES (2), (4), (5) ON DUPLICATE KEY UPDATE c='2';
-ERROR HY000: Lock wait timeout exceeded; try restarting transaction
+affected rows: 3
+info: Records: 3 Duplicates: 0 Warnings: 0
connection con1;
#
# 2 duplicates
#
-affected rows: 3
-info: Records: 3 Duplicates: 0 Warnings: 0
+affected rows: 4
+info: Records: 3 Duplicates: 1 Warnings: 0
connection default;
#
# 3 rows
@@ -103,19 +104,21 @@
SELECT * FROM t1 order by k;
id k c
1 1 NULL
-2 2 NULL
-3 3 NULL
-affected rows: 3
+4 2 1
+2 3 NULL
+5 4 NULL
+6 5 NULL
+affected rows: 5
INSERT INTO t1(k) VALUES (2), (4), (5) ON DUPLICATE KEY UPDATE c='2';
-affected rows: 4
-info: Records: 3 Duplicates: 1 Warnings: 0
+affected rows: 6
+info: Records: 3 Duplicates: 3 Warnings: 0
SELECT * FROM t1 order by k;
id k c
1 1 NULL
-2 2 2
-3 3 NULL
-7 4 NULL
-8 5 NULL
+4 2 2
+2 3 NULL
+5 4 2
+6 5 2
affected rows: 5
disconnect con1;
disconnect con2;
drop table if exists t1;
set global transaction isolation level repeatable read; set global transaction isolation level repeatable read;
CREATE TABLE t1( CREATE TABLE t1(
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
...@@ -79,20 +78,13 @@ affected rows: 0 ...@@ -79,20 +78,13 @@ affected rows: 0
# #
# Parallel execution # Parallel execution
# #
connect con1, localhost, root;
connect con2, localhost, root; connect con2, localhost, root;
SET DEBUG_SYNC='now WAIT_FOR write_row_done'; SET DEBUG_SYNC='now WAIT_FOR write_row_done';
connection con1; connect con1, localhost, root;
#
# Connection 1
#
SET DEBUG_SYNC='ha_write_row_end SIGNAL write_row_done WAIT_FOR continue'; SET DEBUG_SYNC='ha_write_row_end SIGNAL write_row_done WAIT_FOR continue';
affected rows: 0 affected rows: 0
INSERT INTO t1(k) VALUES (1), (2), (3) ON DUPLICATE KEY UPDATE c='1'; INSERT INTO t1(k) VALUES (1), (2), (3) ON DUPLICATE KEY UPDATE c='1';
connection con2; connection con2;
#
# Connection 2
#
affected rows: 0 affected rows: 0
SET DEBUG_SYNC='execute_command_after_close_tables SIGNAL continue'; SET DEBUG_SYNC='execute_command_after_close_tables SIGNAL continue';
affected rows: 0 affected rows: 0
...@@ -140,18 +132,10 @@ k INT, ...@@ -140,18 +132,10 @@ k INT,
c CHAR(1), c CHAR(1),
UNIQUE KEY(k)) ENGINE=InnoDB; UNIQUE KEY(k)) ENGINE=InnoDB;
connect con1, localhost, root; connect con1, localhost, root;
connect con2, localhost, root;
connection con1;
#
# Connection 1
#
SET DEBUG_SYNC='ha_write_row_end SIGNAL continue2 WAIT_FOR continue1'; SET DEBUG_SYNC='ha_write_row_end SIGNAL continue2 WAIT_FOR continue1';
affected rows: 0 affected rows: 0
INSERT INTO t1(k) VALUES (1), (2), (3) ON DUPLICATE KEY UPDATE c='1'; INSERT INTO t1(k) VALUES (1), (2), (3) ON DUPLICATE KEY UPDATE c='1';
connection con2; connect con2, localhost, root;
#
# Connection 2
#
SET DEBUG_SYNC='ha_write_row_start WAIT_FOR continue2'; SET DEBUG_SYNC='ha_write_row_start WAIT_FOR continue2';
affected rows: 0 affected rows: 0
SET DEBUG_SYNC='after_mysql_insert SIGNAL continue1'; SET DEBUG_SYNC='after_mysql_insert SIGNAL continue1';
...@@ -159,6 +143,7 @@ affected rows: 0 ...@@ -159,6 +143,7 @@ affected rows: 0
INSERT INTO t1(k) VALUES (2), (4), (5) ON DUPLICATE KEY UPDATE c='2'; INSERT INTO t1(k) VALUES (2), (4), (5) ON DUPLICATE KEY UPDATE c='2';
affected rows: 3 affected rows: 3
info: Records: 3 Duplicates: 0 Warnings: 0 info: Records: 3 Duplicates: 0 Warnings: 0
disconnect con2;
connection con1; connection con1;
affected rows: 4 affected rows: 4
info: Records: 3 Duplicates: 1 Warnings: 0 info: Records: 3 Duplicates: 1 Warnings: 0
...@@ -174,7 +159,6 @@ id k c ...@@ -174,7 +159,6 @@ id k c
5 4 NULL 5 4 NULL
6 5 NULL 6 5 NULL
disconnect con1; disconnect con1;
disconnect con2;
connection default; connection default;
DROP TABLE t1; DROP TABLE t1;
set global transaction isolation level repeatable read; set global transaction isolation level repeatable read;
...@@ -4,11 +4,11 @@ ...@@ -4,11 +4,11 @@
########################################################################## ##########################################################################
--source include/have_innodb.inc --source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc --source include/have_debug_sync.inc
--source include/innodb_binlog.inc
--disable_warnings let $stmt= `SELECT @@GLOBAL.log_bin`;
drop table if exists t1;
--enable_warnings
set global transaction isolation level repeatable read; set global transaction isolation level repeatable read;
...@@ -69,29 +69,28 @@ CREATE TABLE t1( ...@@ -69,29 +69,28 @@ CREATE TABLE t1(
k INT, k INT,
c CHAR(1), c CHAR(1),
UNIQUE KEY(k)) ENGINE=InnoDB; UNIQUE KEY(k)) ENGINE=InnoDB;
--echo # --echo #
--echo # Parallel execution --echo # Parallel execution
--echo # --echo #
--connect(con1, localhost, root)
--connect(con2, localhost, root) --connect(con2, localhost, root)
--send SET DEBUG_SYNC='now WAIT_FOR write_row_done' --send SET DEBUG_SYNC='now WAIT_FOR write_row_done'
--connection con1
--echo # --connect(con1, localhost, root)
--echo # Connection 1
--echo #
SET DEBUG_SYNC='ha_write_row_end SIGNAL write_row_done WAIT_FOR continue'; SET DEBUG_SYNC='ha_write_row_end SIGNAL write_row_done WAIT_FOR continue';
--send INSERT INTO t1(k) VALUES (1), (2), (3) ON DUPLICATE KEY UPDATE c='1' --send INSERT INTO t1(k) VALUES (1), (2), (3) ON DUPLICATE KEY UPDATE c='1'
--connection con2 --connection con2
--echo #
--echo # Connection 2
--echo #
--reap --reap
SET DEBUG_SYNC='execute_command_after_close_tables SIGNAL continue'; SET DEBUG_SYNC='execute_command_after_close_tables SIGNAL continue';
if ($stmt) {
--error ER_LOCK_WAIT_TIMEOUT --error ER_LOCK_WAIT_TIMEOUT
INSERT INTO t1(k) VALUES (2), (4), (5) ON DUPLICATE KEY UPDATE c='2'; INSERT INTO t1(k) VALUES (2), (4), (5) ON DUPLICATE KEY UPDATE c='2';
}
if (!$stmt) {
INSERT INTO t1(k) VALUES (2), (4), (5) ON DUPLICATE KEY UPDATE c='2';
}
--connection con1 --connection con1
--echo # --echo #
...@@ -138,23 +137,14 @@ CREATE TABLE t1( ...@@ -138,23 +137,14 @@ CREATE TABLE t1(
--enable_info --enable_info
--connect(con1, localhost, root) --connect(con1, localhost, root)
--connect(con2, localhost, root)
--connection con1
--echo #
--echo # Connection 1
--echo #
SET DEBUG_SYNC='ha_write_row_end SIGNAL continue2 WAIT_FOR continue1'; SET DEBUG_SYNC='ha_write_row_end SIGNAL continue2 WAIT_FOR continue1';
--send INSERT INTO t1(k) VALUES (1), (2), (3) ON DUPLICATE KEY UPDATE c='1' --send INSERT INTO t1(k) VALUES (1), (2), (3) ON DUPLICATE KEY UPDATE c='1'
--connection con2 --connect(con2, localhost, root)
--echo #
--echo # Connection 2
--echo #
SET DEBUG_SYNC='ha_write_row_start WAIT_FOR continue2'; SET DEBUG_SYNC='ha_write_row_start WAIT_FOR continue2';
SET DEBUG_SYNC='after_mysql_insert SIGNAL continue1'; SET DEBUG_SYNC='after_mysql_insert SIGNAL continue1';
INSERT INTO t1(k) VALUES (2), (4), (5) ON DUPLICATE KEY UPDATE c='2'; INSERT INTO t1(k) VALUES (2), (4), (5) ON DUPLICATE KEY UPDATE c='2';
--disconnect con2
--connection con1 --connection con1
--reap --reap
...@@ -167,11 +157,9 @@ SET DEBUG_SYNC='RESET'; ...@@ -167,11 +157,9 @@ SET DEBUG_SYNC='RESET';
SELECT * FROM t1 ORDER BY k; SELECT * FROM t1 ORDER BY k;
--disconnect con1 --disconnect con1
--disconnect con2
--connection default --connection default
DROP TABLE t1; DROP TABLE t1;
set global transaction isolation level repeatable read; set global transaction isolation level repeatable read;
...@@ -4523,6 +4523,11 @@ extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd) ...@@ -4523,6 +4523,11 @@ extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd)
return thd->rgi_slave && thd->rgi_slave->is_parallel_exec; return thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
} }
extern "C" int thd_rpl_stmt_based(const MYSQL_THD thd)
{
return !thd->is_current_stmt_binlog_format_row() &&
!thd->is_current_stmt_binlog_disabled();
}
/* Returns high resolution timestamp for the start /* Returns high resolution timestamp for the start
of the current query. */ of the current query. */
......
...@@ -122,6 +122,9 @@ thd_is_replication_slave_thread( ...@@ -122,6 +122,9 @@ thd_is_replication_slave_thread(
/*============================*/ /*============================*/
THD* thd); /*!< in: thread handle */ THD* thd); /*!< in: thread handle */
/** @return whether statement-based replication is active */
extern "C" int thd_rpl_stmt_based(const THD* thd);
/******************************************************************//** /******************************************************************//**
Returns true if the transaction this thread is processing has edited Returns true if the transaction this thread is processing has edited
non-transactional tables. Used by the deadlock detector when deciding non-transactional tables. Used by the deadlock detector when deciding
......
...@@ -689,7 +689,8 @@ que_thr_stop( ...@@ -689,7 +689,8 @@ que_thr_stop(
trx->lock.wait_thr = thr; trx->lock.wait_thr = thr;
thr->state = QUE_THR_LOCK_WAIT; thr->state = QUE_THR_LOCK_WAIT;
} else if (trx->duplicates && trx->error_state == DB_DUPLICATE_KEY) { } else if (trx->duplicates && trx->error_state == DB_DUPLICATE_KEY
&& thd_rpl_stmt_based(trx->mysql_thd)) {
return(FALSE); return(FALSE);
......
...@@ -2299,10 +2299,10 @@ row_ins_duplicate_error_in_clust( ...@@ -2299,10 +2299,10 @@ row_ins_duplicate_error_in_clust(
true, true,
ULINT_UNDEFINED, &heap); ULINT_UNDEFINED, &heap);
ulint lock_type; ulint lock_type =
lock_type =
trx->isolation_level <= TRX_ISO_READ_COMMITTED trx->isolation_level <= TRX_ISO_READ_COMMITTED
|| (trx->mysql_thd
&& !thd_rpl_stmt_based(trx->mysql_thd))
? LOCK_REC_NOT_GAP : LOCK_ORDINARY; ? LOCK_REC_NOT_GAP : LOCK_ORDINARY;
/* We set a lock on the possible duplicate: this /* We set a lock on the possible duplicate: this
...@@ -2342,10 +2342,7 @@ row_ins_duplicate_error_in_clust( ...@@ -2342,10 +2342,7 @@ row_ins_duplicate_error_in_clust(
if (row_ins_dupl_error_with_rec( if (row_ins_dupl_error_with_rec(
rec, entry, cursor->index, offsets)) { rec, entry, cursor->index, offsets)) {
duplicate: goto duplicate;
trx->error_info = cursor->index;
err = DB_DUPLICATE_KEY;
goto func_exit;
} }
} }
} }
...@@ -2388,7 +2385,10 @@ row_ins_duplicate_error_in_clust( ...@@ -2388,7 +2385,10 @@ row_ins_duplicate_error_in_clust(
if (row_ins_dupl_error_with_rec( if (row_ins_dupl_error_with_rec(
rec, entry, cursor->index, offsets)) { rec, entry, cursor->index, offsets)) {
goto duplicate; duplicate:
trx->error_info = cursor->index;
err = DB_DUPLICATE_KEY;
goto func_exit;
} }
} }
...@@ -3006,9 +3006,11 @@ row_ins_sec_index_entry_low( ...@@ -3006,9 +3006,11 @@ row_ins_sec_index_entry_low(
if (!(flags & BTR_NO_LOCKING_FLAG) if (!(flags & BTR_NO_LOCKING_FLAG)
&& dict_index_is_unique(index) && dict_index_is_unique(index)
&& thr_get_trx(thr)->duplicates && thr_get_trx(thr)->duplicates
&& thr_get_trx(thr)->isolation_level >= TRX_ISO_REPEATABLE_READ) { && thr_get_trx(thr)->isolation_level >= TRX_ISO_REPEATABLE_READ
&& thd_rpl_stmt_based(thr_get_trx(thr)->mysql_thd)) {
/* When using the REPLACE statement or ON DUPLICATE clause, a /* In statement-based replication, when replicating a
REPLACE statement or ON DUPLICATE KEY UPDATE clause, a
gap lock is taken on the position of the to-be-inserted record, gap lock is taken on the position of the to-be-inserted record,
to avoid other concurrent transactions from inserting the same to avoid other concurrent transactions from inserting the same
record. */ record. */
...@@ -3552,14 +3554,15 @@ row_ins( ...@@ -3552,14 +3554,15 @@ row_ins(
ins_node_t* node, /*!< in: row insert node */ ins_node_t* node, /*!< in: row insert node */
que_thr_t* thr) /*!< in: query thread */ que_thr_t* thr) /*!< in: query thread */
{ {
dberr_t err;
DBUG_ENTER("row_ins"); DBUG_ENTER("row_ins");
DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name)); DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));
trx_t* trx = thr_get_trx(thr);
if (node->duplicate) { if (node->duplicate) {
thr_get_trx(thr)->error_state = DB_DUPLICATE_KEY; ut_ad(thd_rpl_stmt_based(trx->mysql_thd));
trx->error_state = DB_DUPLICATE_KEY;
} }
if (node->state == INS_NODE_ALLOC_ROW_ID) { if (node->state == INS_NODE_ALLOC_ROW_ID) {
...@@ -3585,7 +3588,7 @@ row_ins( ...@@ -3585,7 +3588,7 @@ row_ins(
while (node->index != NULL) { while (node->index != NULL) {
if (node->index->type != DICT_FTS) { if (node->index->type != DICT_FTS) {
err = row_ins_index_entry_step(node, thr); dberr_t err = row_ins_index_entry_step(node, thr);
switch (err) { switch (err) {
case DB_SUCCESS: case DB_SUCCESS:
...@@ -3598,9 +3601,11 @@ row_ins( ...@@ -3598,9 +3601,11 @@ row_ins(
case DB_DUPLICATE_KEY: case DB_DUPLICATE_KEY:
ut_ad(dict_index_is_unique(node->index)); ut_ad(dict_index_is_unique(node->index));
if (thr_get_trx(thr)->isolation_level if (trx->isolation_level
>= TRX_ISO_REPEATABLE_READ >= TRX_ISO_REPEATABLE_READ
&& thr_get_trx(thr)->duplicates) { && trx->duplicates
&& !node->table->is_temporary()
&& thd_rpl_stmt_based(trx->mysql_thd)) {
/* When we are in REPLACE statement or /* When we are in REPLACE statement or
INSERT .. ON DUPLICATE UPDATE INSERT .. ON DUPLICATE UPDATE
...@@ -3663,7 +3668,7 @@ row_ins( ...@@ -3663,7 +3668,7 @@ row_ins(
/* Save 1st dup error. Ignore /* Save 1st dup error. Ignore
subsequent dup errors. */ subsequent dup errors. */
node->duplicate = node->index; node->duplicate = node->index;
thr_get_trx(thr)->error_state trx->error_state
= DB_DUPLICATE_KEY; = DB_DUPLICATE_KEY;
} }
break; break;
...@@ -3674,18 +3679,6 @@ row_ins( ...@@ -3674,18 +3679,6 @@ row_ins(
} }
} }
if (node->duplicate && dict_table_is_temporary(node->table)) {
ut_ad(thr_get_trx(thr)->error_state
== DB_DUPLICATE_KEY);
/* For TEMPORARY TABLE, we won't lock anything,
so we can simply break here instead of requiring
GAP locks for other unique secondary indexes,
pretending we have consumed all indexes. */
node->index = NULL;
node->entry = NULL;
break;
}
node->index = dict_table_get_next_index(node->index); node->index = dict_table_get_next_index(node->index);
node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry); node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
...@@ -3704,6 +3697,7 @@ row_ins( ...@@ -3704,6 +3697,7 @@ row_ins(
insertion will take place. These gap locks are needed insertion will take place. These gap locks are needed
only for unique indexes. So skipping non-unique indexes. */ only for unique indexes. So skipping non-unique indexes. */
if (node->duplicate) { if (node->duplicate) {
ut_ad(thd_rpl_stmt_based(trx->mysql_thd));
while (node->index while (node->index
&& !dict_index_is_unique(node->index)) { && !dict_index_is_unique(node->index)) {
...@@ -3712,13 +3706,13 @@ row_ins( ...@@ -3712,13 +3706,13 @@ row_ins(
node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry = UT_LIST_GET_NEXT(tuple_list,
node->entry); node->entry);
} }
thr_get_trx(thr)->error_state = DB_DUPLICATE_KEY; trx->error_state = DB_DUPLICATE_KEY;
} }
} }
ut_ad(node->entry == NULL); ut_ad(node->entry == NULL);
thr_get_trx(thr)->error_info = node->duplicate; trx->error_info = node->duplicate;
node->state = INS_NODE_ALLOC_ROW_ID; node->state = INS_NODE_ALLOC_ROW_ID;
DBUG_RETURN(node->duplicate ? DB_DUPLICATE_KEY : DB_SUCCESS); DBUG_RETURN(node->duplicate ? DB_DUPLICATE_KEY : DB_SUCCESS);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment