Commit 72a5a4f1 authored by Daniele Sciascia's avatar Daniele Sciascia Committed by Jan Lindström

MDEV-20780 Fixes for failures on galera_sr_ddl_master (#1425)

Test galera_sr_ddl_master would sometimes fail due to leftover
streaming replication fragments. Rollbacker thread would attempt to
open streaming_log table to remove the fragments, but would fail in
check_stack_overrun(). Ultimately the check_stack_overrun() failure
was caused by rollbacker missing to switch the victim's THD thread
stack to rollbacker's thread stack.

Also in this patch:
- Remove duplicate functionality in rollbacker helper functions,
  and extract rollbacker fragment removal into function
  wsrep_remove_streaming_fragments()
- Reuse open_for_write() in wsrep_schema::remove_fragments
- Partially revert changes to galera_sr_ddl_master test from
  commit 44a11a7c. Removed unnecessary
  wait condition and isolation level setting
parent 7c2c420b
galera_sr_table_contents : missing file
GCF-437 : test relies on InnoDB redo log size limitation
galera_sr_ddl_master : MDEV-20780 Galera test failure on galera_sr.galera_sr_ddl_master
GCF-1043A : MDEV-21170 Galera test failure on galera_sr.GCF-1043A
......@@ -48,8 +48,6 @@ SELECT COUNT(*) as expect_0 FROM mysql.wsrep_streaming_log;
expect_0
0
connection node_2;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
set global wsrep_sync_wait=15;
SELECT COUNT(*) as expect_6 FROM t1;
expect_6
6
......
......@@ -59,15 +59,8 @@ SELECT * FROM t1;
SELECT COUNT(*) as expect_0 FROM mysql.wsrep_streaming_log;
--connection node_2
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
set global wsrep_sync_wait=15;
--let $wait_condition = SELECT COUNT(*) = 6 FROM t1;
--source include/wait_condition.inc
SELECT COUNT(*) as expect_6 FROM t1;
SELECT * FROM t1;
--let $wait_condition = SELECT COUNT(*) = 0 FROM mysql.wsrep_streaming_log;
--source include/wait_condition.inc
SELECT COUNT(*) as expect_0 FROM mysql.wsrep_streaming_log;
DROP TABLE t1;
......@@ -1049,37 +1049,23 @@ int Wsrep_schema::remove_fragments(THD* thd,
Wsrep_schema_impl::wsrep_off wsrep_off(thd);
Wsrep_schema_impl::binlog_off binlog_off(thd);
/*
Open SR table for write.
Adopted from Rpl_info_table_access::open_table()
*/
uint flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
MYSQL_OPEN_IGNORE_FLUSH |
MYSQL_LOCK_IGNORE_TIMEOUT);
Query_tables_list query_tables_list_backup;
Open_tables_backup open_tables_backup;
thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
thd->reset_n_backup_open_tables_state(&open_tables_backup);
TABLE_LIST tables;
LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
LEX_CSTRING table_str= { sr_table_str.c_str(), sr_table_str.length() };
tables.init_one_table(&schema_str,
&table_str, 0, TL_WRITE);
if (!open_n_lock_single_table(thd, &tables, tables.lock_type, flags))
TABLE* frag_table= 0;
if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
{
WSREP_DEBUG("Failed to open SR table for access");
ret= 1;
}
else
{
tables.table->use_all_columns();
for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
i != fragments.end(); ++i)
{
if (remove_fragment(thd,
tables.table,
frag_table,
server_id,
transaction_id, *i))
{
......
......@@ -42,15 +42,13 @@ static void init_service_thd(THD* thd, char* thread_stack)
thd->reset_for_next_command(true);
}
wsrep::storage_service* Wsrep_server_service::storage_service(
wsrep::client_service& client_service)
Wsrep_storage_service*
wsrep_create_storage_service(THD* orig_THD, const char* ctx)
{
Wsrep_client_service& cs=
static_cast<Wsrep_client_service&>(client_service);
THD* thd= new THD(next_thread_id(), true);
init_service_thd(thd, cs.m_thd->thread_stack);
WSREP_DEBUG("Created storage service with thread id %llu",
thd->thread_id);
THD* thd= new THD(true, true);
init_service_thd(thd, orig_THD->thread_stack);
WSREP_DEBUG("Created storage service in %s context with thread id %llu",
ctx, thd->thread_id);
/* Use variables from the current thd attached to client_service.
This is because we need to be able to BF abort storage access
operations. */
......@@ -58,17 +56,20 @@ wsrep::storage_service* Wsrep_server_service::storage_service(
return new Wsrep_storage_service(thd);
}
wsrep::storage_service* Wsrep_server_service::storage_service(
wsrep::client_service& client_service)
{
Wsrep_client_service& cs=
static_cast<Wsrep_client_service&>(client_service);
return wsrep_create_storage_service(cs.m_thd, "local");
}
wsrep::storage_service* Wsrep_server_service::storage_service(
wsrep::high_priority_service& high_priority_service)
{
Wsrep_high_priority_service& hps=
static_cast<Wsrep_high_priority_service&>(high_priority_service);
THD* thd= new THD(next_thread_id(), true);
init_service_thd(thd, hps.m_thd->thread_stack);
WSREP_DEBUG("Created high priority storage service with thread id %llu",
thd->thread_id);
wsrep_assign_from_threadvars(thd);
return new Wsrep_storage_service(thd);
return wsrep_create_storage_service(hps.m_thd, "high priority");
}
void Wsrep_server_service::release_storage_service(
......
......@@ -87,4 +87,14 @@ class Wsrep_applier_service;
Wsrep_applier_service*
wsrep_create_streaming_applier(THD *orig_thd, const char *ctx);
/**
Helper method to create new storage service.
@param orig_thd Original thd context to copy operation context from.
@param ctx Context string for debug logging.
*/
class Wsrep_storage_service;
Wsrep_storage_service*
wsrep_create_storage_service(THD *orig_thd, const char *ctx);
#endif /* WSREP_SERVER_SERVICE */
......@@ -136,47 +136,26 @@ void wsrep_create_appliers(long threads)
}
}
static void wsrep_rollback_streaming_aborted_by_toi(THD *thd)
static void wsrep_remove_streaming_fragments(THD* thd, const char* ctx)
{
WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi");
/* Set thd->event_scheduler.data temporarily to NULL to avoid
callbacks to threadpool wait_begin() during rollback. */
auto saved_esd= thd->event_scheduler.data;
thd->event_scheduler.data= 0;
if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority)
{
DBUG_ASSERT(!saved_esd);
DBUG_ASSERT(thd->wsrep_applier_service);
thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
Wsrep_storage_service* storage_service= wsrep_create_storage_service(thd, ctx);
storage_service->store_globals();
storage_service->adopt_transaction(thd->wsrep_trx());
storage_service->remove_fragments();
storage_service->commit(wsrep::ws_handle(transaction_id, 0),
wsrep::ws_meta());
thd->wsrep_applier_service->after_apply();
/* Will free THD */
Wsrep_server_state::instance().server_service().
release_high_priority_service(thd->wsrep_applier_service);
}
else
{
mysql_mutex_lock(&thd->LOCK_thd_data);
/* prepare THD for rollback processing */
thd->reset_for_next_command(true);
thd->lex->sql_command= SQLCOM_ROLLBACK;
mysql_mutex_unlock(&thd->LOCK_thd_data);
/* Perform a client rollback, restore globals and signal
the victim only when all the resources have been
released */
thd->wsrep_cs().client_service().bf_rollback();
wsrep_reset_threadvars(thd);
/* Assign saved event_scheduler.data back before letting
client to continue. */
thd->event_scheduler.data= saved_esd;
thd->wsrep_cs().sync_rollback_complete();
}
Wsrep_server_state::instance().server_service()
.release_storage_service(storage_service);
wsrep_store_threadvars(thd);
}
static void wsrep_rollback_high_priority(THD *thd)
static void wsrep_rollback_high_priority(THD *thd, THD *rollbacker)
{
WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)",
thd->thread_id, (long long)thd->real_id);
WSREP_DEBUG("Rollbacker aborting SR applier thd (%llu %lu)",
thd->thread_id, thd->real_id);
char* orig_thread_stack= thd->thread_stack;
thd->thread_stack= rollbacker->thread_stack;
DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
/* Must be streaming and must have been removed from the
server state streaming appliers map. */
......@@ -190,45 +169,27 @@ static void wsrep_rollback_high_priority(THD *thd)
the transaction non-observable in SR table after the rollback
completes. For correctness the order does not matter here,
but currently it is mandated by checks in some MTR tests. */
wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
Wsrep_storage_service* storage_service=
static_cast<Wsrep_storage_service*>(
Wsrep_server_state::instance().server_service().storage_service(
*thd->wsrep_applier_service));
storage_service->store_globals();
storage_service->adopt_transaction(thd->wsrep_trx());
storage_service->remove_fragments();
storage_service->commit(wsrep::ws_handle(transaction_id, 0),
wsrep::ws_meta());
Wsrep_server_state::instance().server_service().release_storage_service(storage_service);
wsrep_store_threadvars(thd);
wsrep_remove_streaming_fragments(thd, "high priority");
thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
wsrep::ws_meta());
thd->wsrep_applier_service->after_apply();
thd->thread_stack= orig_thread_stack;
WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
thd->thread_id, thd->real_id);
/* Will free THD */
Wsrep_server_state::instance().server_service()
.release_high_priority_service(thd->wsrep_applier_service);
}
static void wsrep_rollback_local(THD *thd)
static void wsrep_rollback_local(THD *thd, THD *rollbacker)
{
WSREP_INFO("Wsrep_rollback_local");
WSREP_DEBUG("Rollbacker aborting local thd (%llu %lu)",
thd->thread_id, thd->real_id);
char* orig_thread_stack= thd->thread_stack;
thd->thread_stack= rollbacker->thread_stack;
if (thd->wsrep_trx().is_streaming())
{
wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
Wsrep_storage_service* storage_service=
static_cast<Wsrep_storage_service*>(
Wsrep_server_state::instance().server_service().
storage_service(thd->wsrep_cs().client_service()));
storage_service->store_globals();
storage_service->adopt_transaction(thd->wsrep_trx());
storage_service->remove_fragments();
storage_service->commit(wsrep::ws_handle(transaction_id, 0),
wsrep::ws_meta());
Wsrep_server_state::instance().server_service().
release_storage_service(storage_service);
wsrep_store_threadvars(thd);
wsrep_remove_streaming_fragments(thd, "local");
}
/* Set thd->event_scheduler.data temporarily to NULL to avoid
callbacks to threadpool wait_begin() during rollback. */
......@@ -247,9 +208,10 @@ static void wsrep_rollback_local(THD *thd)
/* Assign saved event_scheduler.data back before letting
client to continue. */
thd->event_scheduler.data= saved_esd;
thd->thread_stack= orig_thread_stack;
thd->wsrep_cs().sync_rollback_complete();
WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)",
thd->thread_id, (long long)thd->real_id);
WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
thd->thread_id, thd->real_id);
}
static void wsrep_rollback_process(THD *rollbacker,
......@@ -286,18 +248,13 @@ static void wsrep_rollback_process(THD *rollbacker,
/* Rollback methods below may free thd pointer. Do not try
to access it after method returns. */
if (thd->wsrep_trx().is_streaming() &&
thd->wsrep_trx().bf_aborted_in_total_order())
{
wsrep_rollback_streaming_aborted_by_toi(thd);
}
else if (wsrep_thd_is_applying(thd))
if (wsrep_thd_is_applying(thd))
{
wsrep_rollback_high_priority(thd);
wsrep_rollback_high_priority(thd, rollbacker);
}
else
{
wsrep_rollback_local(thd);
wsrep_rollback_local(thd, rollbacker);
}
wsrep_store_threadvars(rollbacker);
thd_proc_info(rollbacker, "wsrep aborter idle");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment