Commit 1bc82aaf authored by Jan Lindström's avatar Jan Lindström

MDEV-26352 : Add new thread states for certain WSREP scenarios

This adds following new thread states:
* waiting to execute in isolation - DDL is waiting to execute in TOI mode.
* waiting for TOI DDL - some other statement is waiting for DDL to complete.
* waiting for flow control - some statement is paused while flow control is in effect.
* waiting for certification - the transaction is being certified.
parent 21ce6912
......@@ -10,31 +10,31 @@ INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (1);
connection node_2a;
SET SESSION wsrep_sync_wait=0;
SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%committing%';
COUNT(*) = 1
SELECT COUNT(*) AS EXPECT_1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE (STATE LIKE '%committing%' or STATE = 'Waiting for certification');
EXPECT_1
1
SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Waiting for table metadata lock%';
COUNT(*) = 1
1
SELECT COUNT(*) = 0 FROM t1;
COUNT(*) = 0
1
SELECT COUNT(*) = 0 FROM t2;
COUNT(*) = 0
SELECT COUNT(*) AS EXPECT_1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Waiting for table metadata lock%';
EXPECT_1
1
SELECT COUNT(*) AS EXPECT_0 FROM t1;
EXPECT_0
0
SELECT COUNT(*) AS EXPECT_0 FROM t2;
EXPECT_0
0
connection node_2;
UNLOCK TABLES;
connection node_2a;
SET SESSION wsrep_sync_wait = 15;;
SELECT COUNT(*) = 1 FROM t1;
COUNT(*) = 1
1
SELECT COUNT(*) = 1 FROM t2;
COUNT(*) = 1
SELECT COUNT(*) AS EXPECT_1 FROM t1;
EXPECT_1
1
SELECT COUNT(*) = 2 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%committed%';
COUNT(*) = 2
SELECT COUNT(*) AS EXPECT_1 FROM t2;
EXPECT_1
1
SELECT COUNT(*) AS EXPECT_2 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE (STATE LIKE '%committed%' or STATE = 'Waiting for certification');
EXPECT_2
2
SET GLOBAL wsrep_slave_threads = 1;;
DROP TABLE t1;
DROP TABLE t2;
......@@ -56,7 +56,7 @@ ALTER TABLE t1 ADD COLUMN (f4 int(10));
# reach commit stage. In the unlikely case the interleaving is different, the
# result of the test should not change.
--connection node_1c
--let $wait_condition = SELECT COUNT(*)=1 FROM information_schema.processlist WHERE State='Commit' AND ID=$insert_id
--let $wait_condition = SELECT COUNT(*)=1 FROM information_schema.processlist WHERE (State='Commit' OR State='Waiting for certification') AND ID=$insert_id
--source include/wait_condition.inc
--let $wait_condition = SELECT COUNT(*)=1 FROM information_schema.metadata_lock_info WHERE TABLE_NAME='t1' AND THREAD_ID=$insert_id
--source include/wait_condition.inc
......@@ -83,11 +83,11 @@ ALTER TABLE t1 ADD COLUMN (f4 int(10));
# wait for insert to get blocked
--connection node_1c
--let $wait_condition = SELECT COUNT(*)=1 FROM information_schema.processlist WHERE State='Commit' AND ID=$insert_id
--let $wait_condition = SELECT COUNT(*)=1 FROM information_schema.processlist WHERE (State='Commit' OR State='Waiting for certification') AND ID=$insert_id
--source include/wait_condition.inc
--let $wait_condition = SELECT COUNT(*)=1 FROM information_schema.metadata_lock_info WHERE TABLE_NAME='t1' AND THREAD_ID=$insert_id
--source include/wait_condition.inc
--let $wait_condition = SELECT COUNT(*)=2 FROM information_schema.processlist WHERE Info like 'INSERT INTO t1 (f1) values("node1%")' AND State = 'Commit'
--let $wait_condition = SELECT COUNT(*)=2 FROM information_schema.processlist WHERE Info like 'INSERT INTO t1 (f1) values("node1%")' AND (State = 'Commit' OR State='Waiting for certification')
--source include/wait_condition.inc
# nothing after BLOCK_DDL is applied
......
......@@ -32,19 +32,19 @@ INSERT INTO t2 VALUES (1);
--connection node_2a
--sleep 1
SET SESSION wsrep_sync_wait=0;
SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%committing%';
SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Waiting for table metadata lock%';
SELECT COUNT(*) = 0 FROM t1;
SELECT COUNT(*) = 0 FROM t2;
SELECT COUNT(*) AS EXPECT_1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE (STATE LIKE '%committing%' or STATE = 'Waiting for certification');
SELECT COUNT(*) AS EXPECT_1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Waiting for table metadata lock%';
SELECT COUNT(*) AS EXPECT_0 FROM t1;
SELECT COUNT(*) AS EXPECT_0 FROM t2;
--connection node_2
UNLOCK TABLES;
--connection node_2a
--eval SET SESSION wsrep_sync_wait = $wsrep_sync_wait_orig;
SELECT COUNT(*) = 1 FROM t1;
SELECT COUNT(*) = 1 FROM t2;
SELECT COUNT(*) = 2 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%committed%';
SELECT COUNT(*) AS EXPECT_1 FROM t1;
SELECT COUNT(*) AS EXPECT_1 FROM t2;
SELECT COUNT(*) AS EXPECT_2 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE (STATE LIKE '%committed%' or STATE = 'Waiting for certification');
--eval SET GLOBAL wsrep_slave_threads = $wsrep_slave_threads_orig;
DROP TABLE t1;
......
connection node_2;
connection node_1;
connect node_3, 127.0.0.1, root, , test, $NODE_MYPORT_3;
connection node_1;
connection node_2;
connection node_3;
connection node_1;
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(1)) ENGINE=InnoDB;
connection node_1;
SET GLOBAL wsrep_on=OFF;
......@@ -9,7 +14,6 @@ connection node_2;
SET GLOBAL wsrep_on=OFF;
INSERT INTO t1 VALUES (1, 'a');
SET GLOBAL wsrep_on=ON;
connect node_3, 127.0.0.1, root, , test, $NODE_MYPORT_3;
connection node_3;
INSERT INTO t1 VALUES (1, 'b');
SET SESSION wsrep_sync_wait = 0;
......
......@@ -5,6 +5,15 @@
--source include/galera_cluster.inc
--source include/have_innodb.inc
--connect node_3, 127.0.0.1, root, , test, $NODE_MYPORT_3
# Save original auto_increment_offset values.
--let $node_1=node_1
--let $node_2=node_2
--let $node_3=node_3
--source ../galera/include/auto_increment_offset_save.inc
--connection node_1
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(1)) ENGINE=InnoDB;
--connection node_1
......@@ -20,7 +29,6 @@ SET GLOBAL wsrep_on=OFF;
INSERT INTO t1 VALUES (1, 'a');
SET GLOBAL wsrep_on=ON;
--connect node_3, 127.0.0.1, root, , test, $NODE_MYPORT_3
--connection node_3
INSERT INTO t1 VALUES (1, 'b');
SET SESSION wsrep_sync_wait = 0;
......@@ -63,4 +71,4 @@ CALL mtr.add_suppression("WSREP: Vote 0 \\\(success\\\) on (.*) is inconsistent
CALL mtr.add_suppression("WSREP: Inconsistency detected: Inconsistent by consensus on ");
CALL mtr.add_suppression("Plugin 'InnoDB' will be forced to shutdown");
--source ../galera/include/auto_increment_offset_restore.inc
/* Copyright (c) 2018, 2020, MariaDB Corporation.
/* Copyright (c) 2018, 2021, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
......@@ -381,6 +381,9 @@ bool backup_end(THD *thd)
if (WSREP_NNULL(thd) && thd->wsrep_desynced_backup_stage)
{
Wsrep_server_state &server_state= Wsrep_server_state::instance();
THD_STAGE_INFO(thd, stage_waiting_flow);
WSREP_DEBUG("backup_end: waiting for flow control for %s",
wsrep_thd_query(thd));
server_state.resume_and_resync();
thd->wsrep_desynced_backup_stage= false;
}
......
/*
Copyright (c) 2000, 2011, Oracle and/or its affiliates.
Copyright (c) 2020, MariaDB
Copyright (c) 2020, 2021, MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -1138,6 +1138,9 @@ void Global_read_lock::unlock_global_read_lock(THD *thd)
else if (WSREP_NNULL(thd) &&
server_state.state() == Wsrep_server_state::s_synced)
{
THD_STAGE_INFO(thd, stage_waiting_flow);
WSREP_DEBUG("unlock_global_read_lock: waiting for flow control for %s",
wsrep_thd_query(thd));
server_state.resume_and_resync();
wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
}
......
/* Copyright (c) 2007, 2012, Oracle and/or its affiliates.
Copyright (c) 2020, MariaDB
Copyright (c) 2020, 2021, MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -2304,6 +2304,20 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout)
DBUG_RETURN(TRUE);
}
#ifdef WITH_WSREP
if (WSREP(get_thd()))
{
THD* requester= get_thd();
bool requester_toi= wsrep_thd_is_toi(requester) || wsrep_thd_is_applying(requester);
WSREP_DEBUG("::acquire_lock is TOI %d for %s", requester_toi,
wsrep_thd_query(requester));
if (requester_toi)
THD_STAGE_INFO(requester, stage_waiting_ddl);
else
THD_STAGE_INFO(requester, stage_waiting_isolation);
}
#endif /* WITH_WSREP */
lock->m_waiting.add_ticket(ticket);
/*
......
......@@ -9183,6 +9183,14 @@ PSI_stage_info stage_starting= { 0, "starting", 0};
PSI_stage_info stage_waiting_for_flush= { 0, "Waiting for non trans tables to be flushed", 0};
PSI_stage_info stage_waiting_for_ddl= { 0, "Waiting for DDLs", 0};
#ifdef WITH_WSREP
// Aditional Galera thread states
PSI_stage_info stage_waiting_isolation= { 0, "Waiting to execute in isolation", 0};
PSI_stage_info stage_waiting_certification= {0, "Waiting for certification", 0};
PSI_stage_info stage_waiting_ddl= {0, "Waiting for TOI DDL", 0};
PSI_stage_info stage_waiting_flow= {0, "Waiting for flow control", 0};
#endif /* WITH_WSREP */
PSI_memory_key key_memory_DATE_TIME_FORMAT;
PSI_memory_key key_memory_DDL_LOG_MEMORY_ENTRY;
PSI_memory_key key_memory_Event_queue_element_for_exec_names;
......@@ -9402,6 +9410,13 @@ PSI_stage_info *all_server_stages[]=
& stage_reading_semi_sync_ack,
& stage_waiting_for_deadlock_kill,
& stage_starting
#ifdef WITH_WSREP
,
& stage_waiting_isolation,
& stage_waiting_certification,
& stage_waiting_ddl,
& stage_waiting_flow
#endif /* WITH_WSREP */
};
PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;
......
/* Copyright (c) 2006, 2016, Oracle and/or its affiliates.
Copyright (c) 2010, 2020, MariaDB Corporation.
Copyright (c) 2010, 2021, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -679,6 +679,13 @@ extern PSI_stage_info stage_slave_background_process_request;
extern PSI_stage_info stage_slave_background_wait_request;
extern PSI_stage_info stage_waiting_for_deadlock_kill;
extern PSI_stage_info stage_starting;
#ifdef WITH_WSREP
// Aditional Galera thread states
extern PSI_stage_info stage_waiting_isolation;
extern PSI_stage_info stage_waiting_certification;
extern PSI_stage_info stage_waiting_ddl;
extern PSI_stage_info stage_waiting_flow;
#endif /* WITH_WSREP */
#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
......
......@@ -5059,7 +5059,8 @@ bool select_create::send_eof()
{
WSREP_DEBUG("select_create commit failed, thd: %llu err: %s %s",
thd->thread_id,
wsrep_thd_transaction_state_str(thd), wsrep_thd_query(thd));
wsrep_thd_transaction_state_str(thd),
wsrep_thd_query(thd));
mysql_mutex_unlock(&thd->LOCK_thd_data);
abort_result_set();
DBUG_RETURN(true);
......
......@@ -6126,7 +6126,12 @@ mysql_execute_command(THD *thd, bool is_called_from_prepared_stmt)
thd->wsrep_consistency_check= NO_CONSISTENCY_CHECK;
if (wsrep_thd_is_toi(thd) || wsrep_thd_is_in_rsu(thd))
{
WSREP_DEBUG("mysql_execute_command for %s", wsrep_thd_query(thd));
THD_STAGE_INFO(thd, stage_waiting_isolation);
wsrep_to_isolation_end(thd);
}
/*
Force release of transactional locks if not in active MST and wsrep is on.
*/
......@@ -7890,7 +7895,8 @@ static bool wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
});
WSREP_DEBUG("wsrep retrying AC query: %lu %s",
thd->wsrep_retry_counter, wsrep_thd_query(thd));
thd->wsrep_retry_counter,
wsrep_thd_query(thd));
wsrep_prepare_for_autocommit_retry(thd, rawbuf, length, parser_state);
if (thd->lex->explain)
delete_explain_query(thd->lex);
......
......@@ -2552,6 +2552,8 @@ static int wsrep_TOI_begin(THD *thd, const char *db, const char *table,
}
thd_proc_info(thd, "acquiring total order isolation");
WSREP_DEBUG("wsrep_TOI_begin for %s", wsrep_thd_query(thd));
THD_STAGE_INFO(thd, stage_waiting_isolation);
wsrep::client_state& cs(thd->wsrep_cs());
......@@ -2894,31 +2896,38 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
const MDL_ticket *ticket,
const MDL_key *key)
{
/* Fallback to the non-wsrep behaviour */
if (!WSREP_ON) return;
THD *request_thd= requestor_ctx->get_thd();
THD *granted_thd= ticket->get_ctx()->get_thd();
/* Fallback to the non-wsrep behaviour */
if (!WSREP(request_thd)) return;
const char* schema= key->db_name();
int schema_len= key->db_name_length();
mysql_mutex_lock(&request_thd->LOCK_thd_data);
if (wsrep_thd_is_toi(request_thd) ||
wsrep_thd_is_applying(request_thd)) {
if (wsrep_thd_is_toi(request_thd) ||
wsrep_thd_is_applying(request_thd))
{
WSREP_DEBUG("wsrep_handle_mdl_conflict request TOI/APPLY for %s",
wsrep_thd_query(request_thd));
THD_STAGE_INFO(request_thd, stage_waiting_isolation);
mysql_mutex_unlock(&request_thd->LOCK_thd_data);
WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len,
request_thd, granted_thd);
ticket->wsrep_report(wsrep_debug);
mysql_mutex_lock(&granted_thd->LOCK_thd_data);
if (wsrep_thd_is_toi(granted_thd) ||
wsrep_thd_is_applying(granted_thd))
{
if (wsrep_thd_is_aborting(granted_thd))
{
WSREP_DEBUG("BF thread waiting for SR in aborting state");
WSREP_DEBUG("BF thread waiting for SR in aborting state for %s",
wsrep_thd_query(request_thd));
THD_STAGE_INFO(request_thd, stage_waiting_isolation);
ticket->wsrep_report(wsrep_debug);
mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
}
......@@ -2927,6 +2936,9 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
WSREP_MDL_LOG(INFO, "MDL conflict, DDL vs SR",
schema, schema_len, request_thd, granted_thd);
mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
WSREP_DEBUG("wsrep_handle_mdl_conflict DDL vs SR for %s",
wsrep_thd_query(request_thd));
THD_STAGE_INFO(request_thd, stage_waiting_isolation);
wsrep_abort_thd(request_thd, granted_thd, 1);
}
else
......@@ -2941,14 +2953,18 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
granted_thd->mdl_context.has_explicit_locks())
{
WSREP_DEBUG("BF thread waiting for FLUSH");
WSREP_DEBUG("BF thread waiting for FLUSH for %s",
wsrep_thd_query(request_thd));
THD_STAGE_INFO(request_thd, stage_waiting_ddl);
ticket->wsrep_report(wsrep_debug);
mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
}
else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE)
{
WSREP_DEBUG("DROP caused BF abort, conf %s",
wsrep_thd_transaction_state_str(granted_thd));
WSREP_DEBUG("DROP caused BF abort, conf %s for %s",
wsrep_thd_transaction_state_str(granted_thd),
wsrep_thd_query(request_thd));
THD_STAGE_INFO(request_thd, stage_waiting_isolation);
ticket->wsrep_report(wsrep_debug);
mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
wsrep_abort_thd(request_thd, granted_thd, 1);
......@@ -2957,7 +2973,11 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
{
WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
request_thd, granted_thd);
WSREP_DEBUG("wsrep_handle_mdl_conflict -> BF abort for %s",
wsrep_thd_query(request_thd));
THD_STAGE_INFO(request_thd, stage_waiting_isolation);
ticket->wsrep_report(wsrep_debug);
if (granted_thd->wsrep_trx().active())
{
mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
......@@ -2970,13 +2990,15 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
thd is BF, BF abort and wait.
*/
mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
if (wsrep_thd_is_BF(request_thd, FALSE))
{
ha_abort_transaction(request_thd, granted_thd, TRUE);
}
else
{
WSREP_MDL_LOG(INFO, "MDL unknown BF-BF conflict", schema, schema_len,
WSREP_MDL_LOG(INFO, "MDL unknown BF-BF conflict",
schema, schema_len,
request_thd, granted_thd);
ticket->wsrep_report(true);
unireg_abort(1);
......
/* Copyright 2016-2019 Codership Oy <http://www.codership.com>
/* Copyright 2016-2021 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -272,8 +272,10 @@ static inline int wsrep_before_commit(THD* thd, bool all)
WSREP_DEBUG("wsrep_before_commit: %d, %lld",
wsrep_is_real(thd, all),
(long long)wsrep_thd_trx_seqno(thd));
THD_STAGE_INFO(thd, stage_waiting_certification);
int ret= 0;
DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
if ((ret= thd->wsrep_cs().before_commit()) == 0)
{
DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
......
......@@ -836,10 +836,11 @@ bool wsrep_desync_check (sys_var *self, THD* thd, set_var* var)
return true;
}
} else {
THD_STAGE_INFO(thd, stage_waiting_flow);
ret= Wsrep_server_state::instance().provider().resync();
if (ret != WSREP_OK) {
WSREP_WARN ("SET resync failed %d for schema: %s, query: %s", ret,
thd->get_db(), thd->query());
thd->get_db(), wsrep_thd_query(thd));
my_error (ER_CANNOT_USER, MYF(0), "'resync'", thd->query());
return true;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment