Commit 12f6db96 authored by sjaakola's avatar sjaakola

MDEV-22763 backporting MDEV-20225 fix into 10.1

Backported the support for aborting and replaying stored procedure and fix for trigger
key assigments from 10.4 version.
Backported also two mtr tests: wsrep_sp_bf_abort and MDEV-20225
parent dbe447a7
CREATE TABLE t1 (f1 INT NOT NULL PRIMARY KEY AUTO_INCREMENT) ENGINE=InnoDB;
CREATE TABLE t2 (f1 INT NOT NULL PRIMARY KEY AUTO_INCREMENT, f2 INT) ENGINE=InnoDB;
CREATE TRIGGER tr1 BEFORE INSERT ON t1 FOR EACH ROW INSERT INTO t2 VALUES (NULL, NEW.f1);
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_slave_threads = 2;
SET GLOBAL debug_dbug = 'd,sync.mdev_20225';
DROP TRIGGER tr1;
INSERT INTO t1 VALUES (NULL);
SET GLOBAL debug_dbug = 'RESET';
SET DEBUG_SYNC = 'now SIGNAL signal.mdev_20225_continue';
SET DEBUG_SYNC = 'RESET';
SET GLOBAL wsrep_slave_threads = 1;
SHOW TRIGGERS;
Trigger Event Table Statement Timing Created sql_mode Definer character_set_client collation_connection Database Collation
DROP TABLE t1;
DROP TABLE t2;
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(1));
SET SESSION wsrep_sync_wait = 0;
CREATE PROCEDURE proc_update_insert()
BEGIN
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
INSERT INTO t1 VALUES (2, 'c');
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
CALL proc_update_insert;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
CREATE PROCEDURE proc_update_insert_with_exit_handler()
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
INSERT INTO t1 VALUES (2, 'c');
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
CALL proc_update_insert_with_exit_handler;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
CREATE PROCEDURE proc_update_insert_with_continue_handler()
BEGIN
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
INSERT INTO t1 VALUES (2, 'c');
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
CALL proc_update_insert_with_continue_handler;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
CREATE PROCEDURE proc_update_insert_transaction()
BEGIN
START TRANSACTION;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
COMMIT;
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
INSERT INTO t1 VALUES (2, 'c');
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
CALL proc_update_insert_transaction;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
Warnings:
Error 1317 Query execution was interrupted
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
CREATE PROCEDURE proc_update_insert_transaction_with_continue_handler()
BEGIN
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
START TRANSACTION;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
COMMIT;
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
INSERT INTO t1 VALUES (2, 'c');
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
CALL proc_update_insert_transaction_with_continue_handler;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
Warnings:
Error 1317 Query execution was interrupted
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
CREATE PROCEDURE proc_update_insert_transaction_with_exit_handler()
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
START TRANSACTION;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
COMMIT;
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
INSERT INTO t1 VALUES (2, 'c');
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
CALL proc_update_insert_transaction_with_exit_handler;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
Warnings:
Error 1317 Query execution was interrupted
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
CREATE PROCEDURE proc_insert_insert_conflict()
BEGIN
INSERT INTO t1 VALUES (2, 'd');
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
INSERT INTO t1 VALUES (2, 'c');
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
CALL proc_insert_insert_conflict;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
Got one of the listed errors
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 a
2 c
3 a
wsrep_local_replays
1
DELETE FROM t1;
CREATE PROCEDURE proc_insert_insert_conflict_with_exit_handler()
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION SELECT "Conflict exit handler";
INSERT INTO t1 VALUES (2, 'd');
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
INSERT INTO t1 VALUES (2, 'c');
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
CALL proc_insert_insert_conflict_with_exit_handler;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
Conflict exit handler
Conflict exit handler
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 a
2 c
3 a
wsrep_local_replays
1
DELETE FROM t1;
CREATE PROCEDURE proc_insert_insert_conflict_with_continue_handler()
BEGIN
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SELECT "Conflict continue handler";
INSERT INTO t1 VALUES (2, 'd');
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
INSERT INTO t1 VALUES (2, 'c');
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
CALL proc_insert_insert_conflict_with_continue_handler;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
Conflict continue handler
Conflict continue handler
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 a
2 c
3 a
4 d
wsrep_local_replays
1
DELETE FROM t1;
DROP PROCEDURE proc_update_insert;
DROP PROCEDURE proc_update_insert_with_continue_handler;
DROP PROCEDURE proc_update_insert_with_exit_handler;
DROP PROCEDURE proc_update_insert_transaction;
DROP PROCEDURE proc_update_insert_transaction_with_continue_handler;
DROP PROCEDURE proc_update_insert_transaction_with_exit_handler;
DROP PROCEDURE proc_insert_insert_conflict;
DROP PROCEDURE proc_insert_insert_conflict_with_exit_handler;
DROP PROCEDURE proc_insert_insert_conflict_with_continue_handler;
DROP TABLE t1;
#
# MDEV-20225 - Verify that DROP TRIGGER gets keys assigned corresponding
# to all affected tables.
#
--source include/galera_cluster.inc
--source include/have_innodb.inc
CREATE TABLE t1 (f1 INT NOT NULL PRIMARY KEY AUTO_INCREMENT) ENGINE=InnoDB;
CREATE TABLE t2 (f1 INT NOT NULL PRIMARY KEY AUTO_INCREMENT, f2 INT) ENGINE=InnoDB;
CREATE TRIGGER tr1 BEFORE INSERT ON t1 FOR EACH ROW INSERT INTO t2 VALUES (NULL, NEW.f1);
--connection node_2
SET SESSION wsrep_sync_wait = 0;
SET GLOBAL wsrep_slave_threads = 2;
SET GLOBAL debug_dbug = 'd,sync.mdev_20225';
--let $galera_connection_name = node_1a
--let $galera_server_number = 1
--source include/galera_connect.inc
DROP TRIGGER tr1;
--connection node_2
--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE = 'debug sync point: now'
--source include/wait_condition.inc
--connection node_1
INSERT INTO t1 VALUES (NULL);
# We must rely on sleep here. If the bug is fixed, the second applier
# is not allowed to go past apply monitor which would trigger the bug,
# so there is no sync point or condition to wait.
--sleep 1
--connection node_2
SET GLOBAL debug_dbug = 'RESET';
SET DEBUG_SYNC = 'now SIGNAL signal.mdev_20225_continue';
SET DEBUG_SYNC = 'RESET';
SET GLOBAL wsrep_slave_threads = 1;
--let $wait_condition = SELECT COUNT(*) = 1 FROM test.t1;
--source include/wait_condition.inc
# Trigger should now be dropped on node_2.
SHOW TRIGGERS;
DROP TABLE t1;
DROP TABLE t2;
#
# Issue an INSERT for gap between 1 and 3 to node_2 and wait until it hits
# apply monitor sync point on node_1
#
--connection node_1a
--let $galera_sync_point = apply_monitor_slave_enter_sync
--source include/galera_set_sync_point.inc
--connection node_2
--eval $galera_sp_bf_abort_conflict
--connection node_1a
--source include/galera_wait_sync_point.inc
--source include/galera_clear_sync_point.inc
# Send a procedure to node_1 which should take a gap lock between
# rows 1 and 3. It does not conflict with INSERT from node_2 in
# certification. Park the UPDATE after replicate and let INSERT to
# continue applying, generating a BF abort.
--let $galera_sync_point = after_replicate_sync
--source include/galera_set_sync_point.inc
--connection node_1
--send_eval CALL $galera_sp_bf_abort_proc
--connection node_1a
--let $galera_sync_point = after_replicate_sync apply_monitor_slave_enter_sync
--source include/galera_wait_sync_point.inc
--source include/galera_clear_sync_point.inc
--let $galera_sync_point = apply_monitor_slave_enter_sync
--source include/galera_signal_sync_point.inc
--let $galera_sync_point = after_replicate_sync
--source include/galera_signal_sync_point.inc
This diff is collapsed.
...@@ -44,6 +44,9 @@ ...@@ -44,6 +44,9 @@
#include "transaction.h" // trans_commit_stmt #include "transaction.h" // trans_commit_stmt
#include "sql_audit.h" #include "sql_audit.h"
#include "debug_sync.h" #include "debug_sync.h"
#ifdef WITH_WSREP
#include "wsrep_thd.h"
#endif /* WITH_WSREP */
/* /*
Sufficient max length of printed destinations and frame offsets (all uints). Sufficient max length of printed destinations and frame offsets (all uints).
...@@ -1307,7 +1310,93 @@ sp_head::execute(THD *thd, bool merge_da_on_success) ...@@ -1307,7 +1310,93 @@ sp_head::execute(THD *thd, bool merge_da_on_success)
thd->m_digest= NULL; thd->m_digest= NULL;
err_status= i->execute(thd, &ip); err_status= i->execute(thd, &ip);
#ifdef WITH_WSREP
if (m_type == TYPE_ENUM_PROCEDURE)
{
mysql_mutex_lock(&thd->LOCK_thd_data);
if (thd->wsrep_conflict_state == MUST_REPLAY)
{
wsrep_replay_sp_transaction(thd);
err_status= thd->get_stmt_da()->is_set();
thd->wsrep_conflict_state= NO_CONFLICT;
}
else if (thd->wsrep_conflict_state == ABORTED ||
thd->wsrep_conflict_state == CERT_FAILURE)
{
/*
If the statement execution was BF aborted or was aborted
due to certification failure, clean up transaction here
and reset conflict state to NO_CONFLICT and thd->killed
to THD::NOT_KILLED. Error handling is done based on err_status
below. Error must have been raised by wsrep hton code before
entering here.
*/
DBUG_ASSERT(err_status);
DBUG_ASSERT(thd->get_stmt_da()->is_error());
thd->wsrep_conflict_state= NO_CONFLICT;
thd->killed= NOT_KILLED;
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
#endif /* WITH_WSREP */
#ifdef WITH_WSREP_NO
if (WSREP(thd))
{
if (((thd->wsrep_trx().state() == wsrep::transaction::s_executing) &&
(thd->is_fatal_error || thd->killed)))
{
WSREP_DEBUG("SP abort err status %d in sub %d trx state %d",
err_status, thd->in_sub_stmt, thd->wsrep_trx().state());
err_status= 1;
thd->is_fatal_error= 1;
/*
SP was killed, and it is not due to a wsrep conflict.
We skip after_command hook at this point because
otherwise it clears the error, and cleans up the
whole transaction. For now we just return and finish
our handling once we are back to mysql_parse.
*/
WSREP_DEBUG("Skipping after_command hook for killed SP");
}
else
{
const bool must_replay= wsrep_must_replay(thd);
if (must_replay)
{
WSREP_DEBUG("MUST_REPLAY set after SP, err_status %d trx state: %d",
err_status, thd->wsrep_trx().state());
}
(void) wsrep_after_statement(thd);
/*
Reset the return code to zero if the transaction was
replayed succesfully.
*/
if (must_replay && !wsrep_current_error(thd))
{
err_status= 0;
thd->get_stmt_da()->reset_diagnostics_area();
}
/*
Final wsrep error status for statement is known only after
wsrep_after_statement() call. If the error is set, override
error in thd diagnostics area and reset wsrep client_state error
so that the error does not get propagated via client-server protocol.
*/
if (wsrep_current_error(thd))
{
wsrep_override_error(thd, wsrep_current_error(thd),
wsrep_current_error_status(thd));
thd->wsrep_cs().reset_error();
/* Reset also thd->killed if it has been set during BF abort. */
if (thd->killed == KILL_QUERY)
thd->killed= NOT_KILLED;
/* if failed transaction was not replayed, must return with error from here */
if (!must_replay) err_status = 1;
}
}
}
#endif /* WITH_WSREP */
thd->m_digest= parent_digest; thd->m_digest= parent_digest;
if (i->free_list) if (i->free_list)
......
...@@ -34,7 +34,9 @@ ...@@ -34,7 +34,9 @@
#include "sql_handler.h" // mysql_ha_rm_tables #include "sql_handler.h" // mysql_ha_rm_tables
#include "sp_cache.h" // sp_invalidate_cache #include "sp_cache.h" // sp_invalidate_cache
#include <mysys_err.h> #include <mysys_err.h>
#ifdef WITH_WSREP
#include "debug_sync.h"
#endif /* WITH_WSREP */
/*************************************************************************/ /*************************************************************************/
template <class T> template <class T>
...@@ -506,7 +508,7 @@ bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create) ...@@ -506,7 +508,7 @@ bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create)
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (thd->wsrep_exec_mode == LOCAL_STATE) if (thd->wsrep_exec_mode == LOCAL_STATE)
WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL); WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, tables);
#endif #endif
/* We should have only one table in table list. */ /* We should have only one table in table list. */
...@@ -568,6 +570,16 @@ bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create) ...@@ -568,6 +570,16 @@ bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create)
goto end; goto end;
} }
#ifdef WITH_WSREP
DBUG_EXECUTE_IF("sync.mdev_20225",
{
const char act[]=
"now "
"wait_for signal.mdev_20225_continue";
DBUG_ASSERT(!debug_sync_set_action(thd,
STRING_WITH_LEN(act)));
};);
#endif /* WITH_WSREP */
result= (create ? result= (create ?
table->triggers->create_trigger(thd, tables, &stmt_query): table->triggers->create_trigger(thd, tables, &stmt_query):
table->triggers->drop_trigger(thd, tables, &stmt_query)); table->triggers->drop_trigger(thd, tables, &stmt_query));
......
...@@ -37,6 +37,8 @@ extern "C" int thd_binlog_format(const MYSQL_THD thd); ...@@ -37,6 +37,8 @@ extern "C" int thd_binlog_format(const MYSQL_THD thd);
void wsrep_cleanup_transaction(THD *thd) void wsrep_cleanup_transaction(THD *thd)
{ {
if (!WSREP(thd)) return; if (!WSREP(thd)) return;
DBUG_ASSERT(thd->wsrep_conflict_state != MUST_REPLAY &&
thd->wsrep_conflict_state != REPLAYING);
if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd); if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd);
thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID; thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
...@@ -136,7 +138,11 @@ void wsrep_post_commit(THD* thd, bool all) ...@@ -136,7 +138,11 @@ void wsrep_post_commit(THD* thd, bool all)
/* non-InnoDB statements may have populated events in stmt cache /* non-InnoDB statements may have populated events in stmt cache
=> cleanup => cleanup
*/ */
WSREP_DEBUG("cleanup transaction for LOCAL_STATE"); if (thd->wsrep_conflict_state != MUST_REPLAY)
{
WSREP_DEBUG("cleanup transaction for LOCAL_STATE: %s",
WSREP_QUERY(thd));
}
/* /*
Run post-rollback hook to clean up in the case if Run post-rollback hook to clean up in the case if
some keys were populated for the transaction in provider some keys were populated for the transaction in provider
...@@ -145,13 +151,18 @@ void wsrep_post_commit(THD* thd, bool all) ...@@ -145,13 +151,18 @@ void wsrep_post_commit(THD* thd, bool all)
rolls back to savepoint after first operation. rolls back to savepoint after first operation.
*/ */
if (all && thd->wsrep_conflict_state != MUST_REPLAY && if (all && thd->wsrep_conflict_state != MUST_REPLAY &&
wsrep && wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) thd->wsrep_conflict_state != REPLAYING &&
wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
{ {
WSREP_WARN("post_rollback fail: %llu %d", WSREP_WARN("post_rollback fail: %llu %d",
(long long)thd->thread_id, thd->get_stmt_da()->status()); (long long)thd->thread_id, thd->get_stmt_da()->status());
} }
wsrep_cleanup_transaction(thd); if (thd->wsrep_conflict_state != MUST_REPLAY &&
break; thd->wsrep_conflict_state != REPLAYING)
{
wsrep_cleanup_transaction(thd);
}
break;
} }
default: break; default: break;
} }
...@@ -575,7 +586,8 @@ wsrep_run_wsrep_commit(THD *thd, bool all) ...@@ -575,7 +586,8 @@ wsrep_run_wsrep_commit(THD *thd, bool all)
DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED);
/* fall through */ /* fall through */
case WSREP_TRX_FAIL: case WSREP_TRX_FAIL:
WSREP_DEBUG("commit failed for reason: %d", rcode); WSREP_DEBUG("commit failed for reason: %d conf %d",
rcode, thd->wsrep_conflict_state);
DBUG_PRINT("wsrep", ("replicating commit fail")); DBUG_PRINT("wsrep", ("replicating commit fail"));
thd->wsrep_query_state= QUERY_EXEC; thd->wsrep_query_state= QUERY_EXEC;
......
...@@ -1570,7 +1570,6 @@ static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table, ...@@ -1570,7 +1570,6 @@ static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
case SQLCOM_CREATE_TRIGGER: case SQLCOM_CREATE_TRIGGER:
DBUG_ASSERT(!table_list);
DBUG_ASSERT(first_table); DBUG_ASSERT(first_table);
if (find_temporary_table(thd, first_table)) if (find_temporary_table(thd, first_table))
...@@ -1579,6 +1578,14 @@ static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table, ...@@ -1579,6 +1578,14 @@ static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
} }
return true; return true;
case SQLCOM_DROP_TRIGGER:
DBUG_ASSERT(table_list);
if (find_temporary_table(thd, table_list))
{
return false;
}
return true;
default: default:
if (table && !find_temporary_table(thd, db, table)) if (table && !find_temporary_table(thd, db, table))
{ {
......
...@@ -90,10 +90,10 @@ void wsrep_client_rollback(THD *thd) ...@@ -90,10 +90,10 @@ void wsrep_client_rollback(THD *thd)
#define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1 #define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1
#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2 #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
static rpl_group_info* wsrep_relay_group_init(const char* log_fname) static rpl_group_info* wsrep_relay_group_init(THD *thd, const char* log_fname)
{ {
Relay_log_info* rli= new Relay_log_info(false); Relay_log_info* rli= new Relay_log_info(false);
WSREP_DEBUG("wsrep_relay_group_init %s", log_fname);
rli->no_storage= true; rli->no_storage= true;
if (!rli->relay_log.description_event_for_exec) if (!rli->relay_log.description_event_for_exec)
{ {
...@@ -123,7 +123,7 @@ static rpl_group_info* wsrep_relay_group_init(const char* log_fname) ...@@ -123,7 +123,7 @@ static rpl_group_info* wsrep_relay_group_init(const char* log_fname)
rli->mi = new Master_info(&connection_name, false); rli->mi = new Master_info(&connection_name, false);
struct rpl_group_info *rgi= new rpl_group_info(rli); struct rpl_group_info *rgi= new rpl_group_info(rli);
rgi->thd= rli->sql_driver_thd= current_thd; rgi->thd= rli->sql_driver_thd= thd;
if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
{ {
...@@ -148,7 +148,7 @@ static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) ...@@ -148,7 +148,7 @@ static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
else else
thd->variables.option_bits&= ~(OPTION_BIN_LOG); thd->variables.option_bits&= ~(OPTION_BIN_LOG);
if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init("wsrep_relay"); if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init(thd, "wsrep_relay");
/* thd->system_thread_info.rpl_sql_info isn't initialized. */ /* thd->system_thread_info.rpl_sql_info isn't initialized. */
thd->system_thread_info.rpl_sql_info= thd->system_thread_info.rpl_sql_info=
...@@ -189,6 +189,109 @@ static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) ...@@ -189,6 +189,109 @@ static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
thd->set_row_count_func(shadow->row_count_func); thd->set_row_count_func(shadow->row_count_func);
} }
void wsrep_replay_sp_transaction(THD* thd)
{
DBUG_ENTER("wsrep_replay_sp_transaction");
mysql_mutex_assert_owner(&thd->LOCK_thd_data);
DBUG_ASSERT(thd->wsrep_conflict_state == MUST_REPLAY);
DBUG_ASSERT(wsrep_thd_trx_seqno(thd) > 0);
WSREP_DEBUG("replaying SP transaction %llu", thd->thread_id);
close_thread_tables(thd);
if (thd->locked_tables_mode && thd->lock)
{
WSREP_DEBUG("releasing table lock for replaying (%u)",
thd->thread_id);
thd->locked_tables_list.unlock_locked_tables(thd);
thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
}
thd->mdl_context.release_transactional_locks();
mysql_mutex_unlock(&thd->LOCK_thd_data);
THD *replay_thd= new THD(true);
replay_thd->thread_stack= thd->thread_stack;
struct wsrep_thd_shadow shadow;
wsrep_prepare_bf_thd(replay_thd, &shadow);
WSREP_DEBUG("replaying set for %p rgi %p", replay_thd, replay_thd->wsrep_rgi); replay_thd->wsrep_trx_meta= thd->wsrep_trx_meta;
replay_thd->wsrep_ws_handle= thd->wsrep_ws_handle;
replay_thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
replay_thd->wsrep_conflict_state= REPLAYING;
replay_thd->variables.option_bits|= OPTION_BEGIN;
replay_thd->server_status|= SERVER_STATUS_IN_TRANS;
thd->reset_globals();
replay_thd->store_globals();
wsrep_status_t rcode= wsrep->replay_trx(wsrep,
&replay_thd->wsrep_ws_handle,
(void*) replay_thd);
wsrep_return_from_bf_mode(replay_thd, &shadow);
replay_thd->reset_globals();
delete replay_thd;
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->store_globals();
switch (rcode)
{
case WSREP_OK:
{
thd->wsrep_conflict_state= NO_CONFLICT;
thd->killed= NOT_KILLED;
wsrep_status_t rcode= wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
if (rcode != WSREP_OK)
{
WSREP_WARN("Post commit failed for SP replay: thd: %u error: %d",
thd->thread_id, rcode);
}
/* As replaying the transaction was successful, an error must not
be returned to client, so we need to reset the error state of
the diagnostics area */
thd->get_stmt_da()->reset_diagnostics_area();
break;
}
case WSREP_TRX_FAIL:
{
thd->wsrep_conflict_state= ABORTED;
wsrep_status_t rcode= wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
if (rcode != WSREP_OK)
{
WSREP_WARN("Post rollback failed for SP replay: thd: %u error: %d",
thd->thread_id, rcode);
}
if (thd->get_stmt_da()->is_set())
{
thd->get_stmt_da()->reset_diagnostics_area();
}
my_error(ER_LOCK_DEADLOCK, MYF(0));
break;
}
default:
WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
rcode,
(thd->db ? thd->db : "(null)"),
WSREP_QUERY(thd));
/* we're now in inconsistent state, must abort */
mysql_mutex_unlock(&thd->LOCK_thd_data);
unireg_abort(1);
break;
}
wsrep_cleanup_transaction(thd);
mysql_mutex_lock(&LOCK_wsrep_replaying);
wsrep_replaying--;
WSREP_DEBUG("replaying decreased: %d, thd: %u",
wsrep_replaying, thd->thread_id);
mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
DBUG_VOID_RETURN;
}
void wsrep_replay_transaction(THD *thd) void wsrep_replay_transaction(THD *thd)
{ {
DBUG_ENTER("wsrep_replay_transaction"); DBUG_ENTER("wsrep_replay_transaction");
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff, int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
enum enum_var_type scope); enum enum_var_type scope);
void wsrep_client_rollback(THD *thd); void wsrep_client_rollback(THD *thd);
void wsrep_replay_sp_transaction(THD* thd);
void wsrep_replay_transaction(THD *thd); void wsrep_replay_transaction(THD *thd);
void wsrep_create_appliers(long threads); void wsrep_create_appliers(long threads);
void wsrep_create_rollbacker(); void wsrep_create_rollbacker();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment