Commit 9487e0b2 authored by Teemu Ollakka's avatar Teemu Ollakka Committed by Jan Lindström

MDEV-19826 10.4 seems to crash with "pool-of-threads" (#1370)

MariaDB 10.4 was crashing when thread-handling was set to
pool-of-threads and wsrep was enabled.

There were two apparent reasons for the crash:
- Connection handling in threadpool_common.cc was missing calls to
  control wsrep client state.
- Thread specific storage which contains thread variables (THR_KEY_mysys)
  was not handled appropriately by wsrep patch when pool-of-threads
  was configured.

This patch addresses the above issues in the following way:
- Wsrep client state open/close was moved in thd_prepare_connection() and
  end_connection() to have common handling for one-thread-per-connection
  and pool-of-threads.
- Thread local storage handling in wsrep patch was reworked by introducing
  set of wsrep_xxx_threadvars() calls which replace calls to
  THD store_globals()/reset_globals() and deal with thread handling
  specifics internally.

Wsrep-lib was updated to version which relaxes internal concurrency
related sanity checks.

Rollback code from wsrep_rollback_process() was extracted to separate calls
for better readability.

Post rollback thread was removed as it was completely unused.
parent d22f8c45
......@@ -7,7 +7,6 @@ WHERE name LIKE 'thread/sql/wsrep%'
ORDER BY name;
name thread/sql/wsrep_applier_thread
name thread/sql/wsrep_rollbacker_thread
name thread/sql/wsrep_rollbacker_thread
use test;
create table t1 (a int not null primary key) engine=innodb;
insert into t1 values (1),(2);
......
......@@ -146,11 +146,10 @@ extern "C" void wsrep_handle_SR_rollback(THD *bf_thd,
victim_thd->wsrep_trx_id(),
victim_thd->wsrep_sr().fragments_certified(),
wsrep_thd_transaction_state_str(victim_thd));
if (bf_thd && bf_thd != victim_thd)
{
victim_thd->store_globals();
}
else
/* Note: do not store/reset globals before wsrep_bf_abort() call
to avoid losing BF thd context. */
if (!(bf_thd && bf_thd != victim_thd))
{
DEBUG_SYNC(victim_thd, "wsrep_before_SR_rollback");
}
......@@ -162,21 +161,24 @@ extern "C" void wsrep_handle_SR_rollback(THD *bf_thd,
{
wsrep_thd_self_abort(victim_thd);
}
if (bf_thd && bf_thd != victim_thd)
if (bf_thd)
{
bf_thd->store_globals();
wsrep_store_threadvars(bf_thd);
}
}
extern "C" my_bool wsrep_thd_bf_abort(const THD *bf_thd, THD *victim_thd,
my_bool signal)
{
/* Note: do not store/reset globals before wsrep_bf_abort() call
to avoid losing BF thd context. */
if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active())
{
WSREP_DEBUG("BF abort for non active transaction");
wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id());
}
my_bool ret= wsrep_bf_abort(bf_thd, victim_thd);
wsrep_store_threadvars((THD*)bf_thd);
/*
Send awake signal if victim was BF aborted or does not
have wsrep on. Note that this should never interrupt RSU
......
......@@ -1188,6 +1188,16 @@ void end_connection(THD *thd)
{
NET *net= &thd->net;
#ifdef WITH_WSREP
if (thd->wsrep_cs().state() == wsrep::client_state::s_exec)
{
/* Error happened after the thread acquired ownership to wsrep
client state, but before command was processed. Clean up the
state before wsrep_close(). */
wsrep_after_command_ignore_result(thd);
}
wsrep_close(thd);
#endif /* WITH_WSREP */
if (thd->user_connect)
{
/*
......@@ -1321,6 +1331,7 @@ bool thd_prepare_connection(THD *thd)
prepare_new_connection_state(thd);
#ifdef WITH_WSREP
thd->wsrep_client_thread= true;
wsrep_open(thd);
#endif /* WITH_WSREP */
return FALSE;
}
......@@ -1393,9 +1404,6 @@ void do_handle_one_connection(CONNECT *connect)
create_user= FALSE;
goto end_thread;
}
#ifdef WITH_WSREP
wsrep_open(thd);
#endif /* WITH_WSREP */
while (thd_is_connection_alive(thd))
{
......@@ -1406,10 +1414,6 @@ void do_handle_one_connection(CONNECT *connect)
}
end_connection(thd);
#ifdef WITH_WSREP
wsrep_close(thd);
#endif /* WITH_WSREP */
end_thread:
close_connection(thd);
......
......@@ -23,7 +23,9 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
#endif /* WITH_WSREP */
/* Threadpool parameters */
......@@ -137,6 +139,11 @@ static inline void set_thd_idle(THD *thd)
*/
static void thread_attach(THD* thd)
{
#ifdef WITH_WSREP
/* Wait until possible background rollback has finished before
attaching the thd. */
wsrep_wait_rollback_complete_and_acquire_ownership(thd);
#endif /* WITH_WSREP */
pthread_setspecific(THR_KEY_mysys,thd->mysys_var);
thd->thread_stack=(char*)&thd;
thd->store_globals();
......
......@@ -30,9 +30,9 @@
#include "slave.h" /* opt_log_slave_updates */
#include "transaction.h" /* trans_commit()... */
#include "log.h" /* stmt_has_updated_trans_table() */
//#include "debug_sync.h"
#include "mysql/service_debug_sync.h"
#include "mysql/psi/mysql_thread.h" /* mysql_mutex_assert_owner() */
namespace
{
......@@ -57,16 +57,12 @@ Wsrep_client_service::Wsrep_client_service(THD* thd,
void Wsrep_client_service::store_globals()
{
DBUG_ENTER("Wsrep_client_service::store_globals");
m_thd->store_globals();
DBUG_VOID_RETURN;
wsrep_store_threadvars(m_thd);
}
void Wsrep_client_service::reset_globals()
{
DBUG_ENTER("Wsrep_client_service::reset_globals");
m_thd->reset_globals();
DBUG_VOID_RETURN;
wsrep_reset_threadvars(m_thd);
}
bool Wsrep_client_service::interrupted(
......
......@@ -379,20 +379,13 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
void Wsrep_high_priority_service::store_globals()
{
DBUG_ENTER("Wsrep_high_priority_service::store_globals");
/* In addition to calling THD::store_globals(), call
wsrep::client_state::store_globals() to gain ownership of
the client state */
m_thd->store_globals();
m_thd->wsrep_cs().store_globals();
DBUG_VOID_RETURN;
wsrep_store_threadvars(m_thd);
m_thd->wsrep_cs().acquire_ownership();
}
void Wsrep_high_priority_service::reset_globals()
{
DBUG_ENTER("Wsrep_high_priority_service::reset_globals");
m_thd->reset_globals();
DBUG_VOID_RETURN;
wsrep_reset_threadvars(m_thd);
}
void Wsrep_high_priority_service::switch_execution_context(wsrep::high_priority_service& orig_high_priority_service)
......@@ -572,11 +565,14 @@ Wsrep_replayer_service::Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd)
thd_proc_info(orig_thd, "wsrep replaying trx");
/*
Swith execution context to replayer_thd and prepare it for
Switch execution context to replayer_thd and prepare it for
replay execution.
*/
orig_thd->reset_globals();
replayer_thd->store_globals();
/* Copy thd vars from orig_thd before reset, otherwise reset
for orig thd clears thread local storage before copy. */
wsrep_assign_from_threadvars(replayer_thd);
wsrep_reset_threadvars(orig_thd);
wsrep_store_threadvars(replayer_thd);
wsrep_open(replayer_thd);
wsrep_before_command(replayer_thd);
replayer_thd->wsrep_cs().clone_transaction_for_replay(orig_thd->wsrep_trx());
......@@ -593,8 +589,8 @@ Wsrep_replayer_service::~Wsrep_replayer_service()
wsrep_after_apply(replayer_thd);
wsrep_after_command_ignore_result(replayer_thd);
wsrep_close(replayer_thd);
replayer_thd->reset_globals();
orig_thd->store_globals();
wsrep_reset_threadvars(replayer_thd);
wsrep_store_threadvars(orig_thd);
DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent());
DBUG_ASSERT(!orig_thd->get_stmt_da()->is_set());
......
......@@ -2243,6 +2243,7 @@ static void wsrep_close_thread(THD *thd)
{
thd->set_killed(KILL_CONNECTION);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
mysql_mutex_lock(&thd->LOCK_thd_kill);
if (thd->mysys_var)
{
thd->mysys_var->abort=1;
......@@ -2255,6 +2256,7 @@ static void wsrep_close_thread(THD *thd)
}
mysql_mutex_unlock(&thd->mysys_var->mutex);
}
mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
static my_bool have_committing_connections(THD *thd, void *)
......@@ -2658,7 +2660,8 @@ void* start_wsrep_THD(void *arg)
/* now that we've called my_thread_init(), it is safe to call DBUG_* */
thd->thread_stack= (char*) &thd;
if (thd->store_globals())
wsrep_assign_from_threadvars(thd);
if (wsrep_store_threadvars(thd))
{
close_connection(thd, ER_OUT_OF_RESOURCES);
statistic_increment(aborted_connects,&LOCK_status);
......@@ -2703,7 +2706,7 @@ void* start_wsrep_THD(void *arg)
/* Wsrep may reset globals during thread context switches, store globals
before cleanup. */
thd->store_globals();
wsrep_store_threadvars(thd);
close_connection(thd, 0);
......
......@@ -29,6 +29,7 @@
#include "wsrep_binlog.h"
#include "wsrep_high_priority_service.h"
#include "wsrep_storage_service.h"
#include "wsrep_thd.h"
#include <string>
#include <sstream>
......@@ -145,13 +146,13 @@ class thd_context_switch
: m_orig_thd(orig_thd)
, m_cur_thd(cur_thd)
{
m_orig_thd->reset_globals();
m_cur_thd->store_globals();
wsrep_reset_threadvars(m_orig_thd);
wsrep_store_threadvars(m_cur_thd);
}
~thd_context_switch()
{
m_cur_thd->reset_globals();
m_orig_thd->store_globals();
wsrep_reset_threadvars(m_cur_thd);
wsrep_store_threadvars(m_orig_thd);
}
private:
THD *m_orig_thd;
......@@ -595,7 +596,8 @@ static void wsrep_init_thd_for_schema(THD *thd)
thd->variables.option_bits |= OPTION_LOG_OFF;
/* Read committed isolation to avoid gap locking */
thd->variables.tx_isolation= ISO_READ_COMMITTED;
thd->store_globals();
wsrep_assign_from_threadvars(thd);
wsrep_store_threadvars(thd);
}
int Wsrep_schema::init()
......@@ -1123,6 +1125,7 @@ int Wsrep_schema::replay_transaction(THD* orig_thd,
THD thd(next_thread_id(), true);
thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &thd);
wsrep_assign_from_threadvars(&thd);
Wsrep_schema_impl::wsrep_off wsrep_off(&thd);
Wsrep_schema_impl::binlog_off binlog_off(&thd);
......@@ -1228,6 +1231,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
THD storage_thd(next_thread_id(), true);
storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &storage_thd);
wsrep_assign_from_threadvars(&storage_thd);
TABLE* frag_table= 0;
TABLE* cluster_table= 0;
Wsrep_storage_service storage_service(&storage_thd);
......@@ -1333,12 +1337,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
transaction_id)))
{
DBUG_ASSERT(wsrep::starts_transaction(flags));
THD* thd= new THD(next_thread_id(), true);
thd->thread_stack= (char*)&storage_thd;
thd->real_id= pthread_self();
applier= new Wsrep_applier_service(thd);
applier = wsrep_create_streaming_applier(&storage_thd, "recovery");
server_state.start_streaming_applier(server_id, transaction_id,
applier);
applier->start_transaction(wsrep::ws_handle(transaction_id, 0),
......@@ -1364,6 +1363,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
Wsrep_schema_impl::end_scan(frag_table);
Wsrep_schema_impl::finish_stmt(&storage_thd);
trans_commit(&storage_thd);
storage_thd.set_mysys_var(0);
out:
DBUG_RETURN(ret);
}
......@@ -26,6 +26,7 @@
#include "wsrep_mysqld.h"
#include "wsrep_schema.h"
#include "wsrep_utils.h"
#include "wsrep_thd.h"
#include "log.h" /* sql_print_xxx() */
#include "sql_class.h" /* system variables */
......@@ -50,6 +51,10 @@ wsrep::storage_service* Wsrep_server_service::storage_service(
init_service_thd(thd, cs.m_thd->thread_stack);
WSREP_DEBUG("Created storage service with thread id %llu",
thd->thread_id);
/* Use variables from the current thd attached to client_service.
This is because we need to be able to BF abort storage access
operations. */
wsrep_assign_from_threadvars(thd);
return new Wsrep_storage_service(thd);
}
......@@ -62,6 +67,7 @@ wsrep::storage_service* Wsrep_server_service::storage_service(
init_service_thd(thd, hps.m_thd->thread_stack);
WSREP_DEBUG("Created high priority storage service with thread id %llu",
thd->thread_id);
wsrep_assign_from_threadvars(thd);
return new Wsrep_storage_service(thd);
}
......@@ -71,21 +77,48 @@ void Wsrep_server_service::release_storage_service(
Wsrep_storage_service* ss=
static_cast<Wsrep_storage_service*>(storage_service);
THD* thd= ss->m_thd;
wsrep_reset_threadvars(thd);
delete ss;
delete thd;
}
Wsrep_applier_service*
wsrep_create_streaming_applier(THD *orig_thd, const char *ctx)
{
/* Reset variables to allow creating new variables in thread local
storage for new THD if needed. Note that reset must be done for
current_thd, as orig_thd may not be in effect. This may be the case when
streaming transaction is BF aborted and streaming applier
is created from BF aborter context. */
Wsrep_threadvars saved_threadvars(wsrep_save_threadvars());
wsrep_reset_threadvars(saved_threadvars.cur_thd);
THD *thd= 0;
Wsrep_applier_service *ret= 0;
if (!wsrep_create_threadvars() &&
(thd= new THD(next_thread_id(), true)))
{
init_service_thd(thd, orig_thd->thread_stack);
wsrep_assign_from_threadvars(thd);
WSREP_DEBUG("Created streaming applier service in %s context with "
"thread id %llu", ctx, thd->thread_id);
if (!(ret= new (std::nothrow) Wsrep_applier_service(thd)))
{
delete thd;
}
}
/* Restore original thread local storage state before returning. */
wsrep_restore_threadvars(saved_threadvars);
wsrep_store_threadvars(saved_threadvars.cur_thd);
return ret;
}
wsrep::high_priority_service*
Wsrep_server_service::streaming_applier_service(
wsrep::client_service& orig_client_service)
{
Wsrep_client_service& orig_cs=
static_cast<Wsrep_client_service&>(orig_client_service);
THD* thd= new THD(next_thread_id(), true);
init_service_thd(thd, orig_cs.m_thd->thread_stack);
WSREP_DEBUG("Created streaming applier service in local context with "
"thread id %llu", thd->thread_id);
return new Wsrep_applier_service(thd);
return wsrep_create_streaming_applier(orig_cs.m_thd, "local");
}
wsrep::high_priority_service*
......@@ -94,11 +127,7 @@ Wsrep_server_service::streaming_applier_service(
{
Wsrep_high_priority_service&
orig_hps(static_cast<Wsrep_high_priority_service&>(orig_high_priority_service));
THD* thd= new THD(next_thread_id(), true);
init_service_thd(thd, orig_hps.m_thd->thread_stack);
WSREP_DEBUG("Created streaming applier service in high priority "
"context with thread id %llu", thd->thread_id);
return new Wsrep_applier_service(thd);
return wsrep_create_streaming_applier(orig_hps.m_thd, "high priority");
}
void Wsrep_server_service::release_high_priority_service(wsrep::high_priority_service* high_priority_service)
......@@ -107,7 +136,9 @@ void Wsrep_server_service::release_high_priority_service(wsrep::high_priority_se
static_cast<Wsrep_high_priority_service*>(high_priority_service);
THD* thd= hps->m_thd;
delete hps;
wsrep_store_threadvars(thd);
delete thd;
wsrep_delete_threadvars();
}
void Wsrep_server_service::background_rollback(wsrep::client_state& client_state)
......
......@@ -77,5 +77,14 @@ class Wsrep_server_service : public wsrep::server_service
Wsrep_server_state& m_server_state;
};
/**
Helper method to create new streaming applier.
@param orig_thd Original thd context to copy operation context from.
@param ctx Context string for debug logging.
*/
class Wsrep_applier_service;
Wsrep_applier_service*
wsrep_create_streaming_applier(THD *orig_thd, const char *ctx);
#endif /* WSREP_SERVER_SERVICE */
......@@ -27,6 +27,8 @@
#include "wsrep_priv.h"
#include "wsrep_utils.h"
#include "wsrep_xid.h"
#include "wsrep_thd.h"
#include <cstdio>
#include <cstdlib>
......@@ -237,7 +239,7 @@ void wsrep_sst_received (THD* thd,
wsrep thread pool. Restore original thd context before returning.
*/
if (thd) {
thd->store_globals();
wsrep_store_threadvars(thd);
}
else {
my_pthread_setspecific_ptr(THR_THD, NULL);
......@@ -509,7 +511,8 @@ static void* sst_joiner_thread (void* a)
thd->system_thread= SYSTEM_THREAD_GENERIC;
thd->real_id= pthread_self();
thd->store_globals();
wsrep_assign_from_threadvars(thd);
wsrep_store_threadvars(thd);
/* */
thd->variables.wsrep_on = 0;
......
......@@ -196,18 +196,10 @@ int Wsrep_storage_service::rollback(const wsrep::ws_handle& ws_handle,
void Wsrep_storage_service::store_globals()
{
DBUG_ENTER("Wsrep_storage_service::store_globals");
DBUG_PRINT("info", ("Wsrep_storage_service::store_globals(%llu, %p)",
m_thd->thread_id, m_thd));
m_thd->store_globals();
DBUG_VOID_RETURN;
wsrep_store_threadvars(m_thd);
}
void Wsrep_storage_service::reset_globals()
{
DBUG_ENTER("Wsrep_storage_service::reset_globals");
DBUG_PRINT("info", ("Wsrep_storage_service::reset_globals(%llu, %p)",
m_thd->thread_id, m_thd));
m_thd->reset_globals();
DBUG_VOID_RETURN;
wsrep_reset_threadvars(m_thd);
}
......@@ -31,8 +31,9 @@
#include "rpl_rli.h"
#include "rpl_mi.h"
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
static Wsrep_thd_queue* wsrep_rollback_queue= 0;
static Wsrep_thd_queue* wsrep_post_rollback_queue= 0;
static Atomic_counter<uint64_t> wsrep_bf_aborts_counter;
......@@ -149,43 +150,16 @@ void wsrep_create_appliers(long threads)
}
}
static void wsrep_rollback_process(THD *rollbacker,
void *arg __attribute__((unused)))
static void wsrep_rollback_streaming_aborted_by_toi(THD *thd)
{
DBUG_ENTER("wsrep_rollback_process");
THD* thd= NULL;
DBUG_ASSERT(!wsrep_rollback_queue);
wsrep_rollback_queue= new Wsrep_thd_queue(rollbacker);
WSREP_INFO("Starting rollbacker thread %llu", rollbacker->thread_id);
thd_proc_info(rollbacker, "wsrep aborter idle");
while ((thd= wsrep_rollback_queue->pop_front()) != NULL)
{
mysql_mutex_lock(&thd->LOCK_thd_data);
wsrep::client_state& cs(thd->wsrep_cs());
const wsrep::transaction& tx(cs.transaction());
if (tx.state() == wsrep::transaction::s_aborted)
{
WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d",
(long long)thd->real_id,
tx.state());
mysql_mutex_unlock(&thd->LOCK_thd_data);
continue;
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
thd_proc_info(rollbacker, "wsrep aborter active");
wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
if (thd->wsrep_trx().is_streaming() &&
thd->wsrep_trx().bf_aborted_in_total_order())
{
thd->store_globals();
thd->wsrep_cs().store_globals();
WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi");
/* Set thd->event_scheduler.data temporarily to NULL to avoid
callbacks to threadpool wait_begin() during rollback. */
auto saved_esd= thd->event_scheduler.data;
thd->event_scheduler.data= 0;
if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority)
{
DBUG_ASSERT(!saved_esd);
DBUG_ASSERT(thd->wsrep_applier_service);
thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
wsrep::ws_meta());
......@@ -205,13 +179,17 @@ static void wsrep_rollback_process(THD *rollbacker,
the victim only when all the resources have been
released */
thd->wsrep_cs().client_service().bf_rollback();
thd->reset_globals();
wsrep_reset_threadvars(thd);
/* Assign saved event_scheduler.data back before letting
client to continue. */
thd->event_scheduler.data= saved_esd;
thd->wsrep_cs().sync_rollback_complete();
}
}
else if (wsrep_thd_is_applying(thd))
{
WSREP_DEBUG("rollbacker aborting SR thd: (%lld %llu)",
}
static void wsrep_rollback_high_priority(THD *thd)
{
WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)",
thd->thread_id, (long long)thd->real_id);
DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
/* Must be streaming and must have been removed from the
......@@ -226,6 +204,7 @@ static void wsrep_rollback_process(THD *rollbacker,
the transaction non-observable in SR table after the rollback
completes. For correctness the order does not matter here,
but currently it is mandated by checks in some MTR tests. */
wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
Wsrep_storage_service* storage_service=
static_cast<Wsrep_storage_service*>(
Wsrep_server_state::instance().server_service().storage_service(
......@@ -236,20 +215,21 @@ static void wsrep_rollback_process(THD *rollbacker,
storage_service->commit(wsrep::ws_handle(transaction_id, 0),
wsrep::ws_meta());
Wsrep_server_state::instance().server_service().release_storage_service(storage_service);
thd->store_globals();
thd->wsrep_cs().store_globals();
wsrep_store_threadvars(thd);
thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
wsrep::ws_meta());
thd->wsrep_applier_service->after_apply();
/* Will free THD */
Wsrep_server_state::instance().server_service()
.release_high_priority_service(thd->wsrep_applier_service);
}
}
else
{
static void wsrep_rollback_local(THD *thd)
{
WSREP_INFO("Wsrep_rollback_local");
if (thd->wsrep_trx().is_streaming())
{
wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
Wsrep_storage_service* storage_service=
static_cast<Wsrep_storage_service*>(
Wsrep_server_state::instance().server_service().
......@@ -262,9 +242,12 @@ static void wsrep_rollback_process(THD *rollbacker,
wsrep::ws_meta());
Wsrep_server_state::instance().server_service().
release_storage_service(storage_service);
wsrep_store_threadvars(thd);
}
thd->store_globals();
thd->wsrep_cs().store_globals();
/* Set thd->event_scheduler.data temporarily to NULL to avoid
callbacks to threadpool wait_begin() during rollback. */
auto saved_esd= thd->event_scheduler.data;
thd->event_scheduler.data= 0;
mysql_mutex_lock(&thd->LOCK_thd_data);
/* prepare THD for rollback processing */
thd->reset_for_next_command();
......@@ -274,55 +257,73 @@ static void wsrep_rollback_process(THD *rollbacker,
the victim only when all the resources have been
released */
thd->wsrep_cs().client_service().bf_rollback();
thd->reset_globals();
wsrep_reset_threadvars(thd);
/* Assign saved event_scheduler.data back before letting
client to continue. */
thd->event_scheduler.data= saved_esd;
thd->wsrep_cs().sync_rollback_complete();
WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)",
thd->thread_id, (long long)thd->real_id);
}
thd_proc_info(rollbacker, "wsrep aborter idle");
}
delete wsrep_rollback_queue;
wsrep_rollback_queue= NULL;
WSREP_INFO("rollbacker thread exiting %llu", rollbacker->thread_id);
DBUG_ASSERT(rollbacker->killed != NOT_KILLED);
DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
DBUG_VOID_RETURN;
}
static void wsrep_post_rollback_process(THD *post_rollbacker,
static void wsrep_rollback_process(THD *rollbacker,
void *arg __attribute__((unused)))
{
DBUG_ENTER("wsrep_post_rollback_process");
THD* thd= NULL;
DBUG_ENTER("wsrep_rollback_process");
WSREP_INFO("Starting post rollbacker thread %llu", post_rollbacker->thread_id);
DBUG_ASSERT(!wsrep_post_rollback_queue);
wsrep_post_rollback_queue= new Wsrep_thd_queue(post_rollbacker);
THD* thd= NULL;
DBUG_ASSERT(!wsrep_rollback_queue);
wsrep_rollback_queue= new Wsrep_thd_queue(rollbacker);
WSREP_INFO("Starting rollbacker thread %llu", rollbacker->thread_id);
while ((thd= wsrep_post_rollback_queue->pop_front()) != NULL)
thd_proc_info(rollbacker, "wsrep aborter idle");
while ((thd= wsrep_rollback_queue->pop_front()) != NULL)
{
thd->store_globals();
wsrep::client_state& cs(thd->wsrep_cs());
mysql_mutex_lock(&thd->LOCK_thd_data);
DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
WSREP_DEBUG("post rollbacker calling post rollback for thd %llu, conf %s",
thd->thread_id, wsrep_thd_transaction_state_str(thd));
cs.after_rollback();
DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborted);
wsrep::client_state& cs(thd->wsrep_cs());
const wsrep::transaction& tx(cs.transaction());
if (tx.state() == wsrep::transaction::s_aborted)
{
WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d",
(long long)thd->real_id,
tx.state());
mysql_mutex_unlock(&thd->LOCK_thd_data);
continue;
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
delete wsrep_post_rollback_queue;
wsrep_post_rollback_queue= NULL;
wsrep_reset_threadvars(rollbacker);
wsrep_store_threadvars(thd);
thd->wsrep_cs().acquire_ownership();
DBUG_ASSERT(post_rollbacker->killed != NOT_KILLED);
DBUG_PRINT("wsrep",("wsrep post rollbacker thread exiting"));
WSREP_INFO("post rollbacker thread exiting %llu", post_rollbacker->thread_id);
thd_proc_info(rollbacker, "wsrep aborter active");
/* Rollback methods below may free thd pointer. Do not try
to access it after method returns. */
if (thd->wsrep_trx().is_streaming() &&
thd->wsrep_trx().bf_aborted_in_total_order())
{
wsrep_rollback_streaming_aborted_by_toi(thd);
}
else if (wsrep_thd_is_applying(thd))
{
wsrep_rollback_high_priority(thd);
}
else
{
wsrep_rollback_local(thd);
}
wsrep_store_threadvars(rollbacker);
thd_proc_info(rollbacker, "wsrep aborter idle");
}
delete wsrep_rollback_queue;
wsrep_rollback_queue= NULL;
WSREP_INFO("rollbacker thread exiting %llu", rollbacker->thread_id);
DBUG_ASSERT(rollbacker->killed != NOT_KILLED);
DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
DBUG_VOID_RETURN;
}
......@@ -337,14 +338,6 @@ void wsrep_create_rollbacker()
/* create rollbacker */
if (create_wsrep_THD(args))
WSREP_WARN("Can't create thread to manage wsrep rollback");
/* create post_rollbacker */
args= new Wsrep_thd_args(wsrep_post_rollback_process,
WSREP_ROLLBACKER_THREAD,
pthread_self());
if (create_wsrep_THD(args))
WSREP_WARN("Can't create thread to manage wsrep post rollback");
}
}
......@@ -438,3 +431,84 @@ void wsrep_thd_auto_increment_variables(THD* thd,
*offset= thd->variables.auto_increment_offset;
*increment= thd->variables.auto_increment_increment;
}
int wsrep_create_threadvars()
{
int ret= 0;
if (thread_handling == SCHEDULER_TYPES_COUNT)
{
/* Caller should have called wsrep_reset_threadvars() before this
method. */
DBUG_ASSERT(!pthread_getspecific(THR_KEY_mysys));
pthread_setspecific(THR_KEY_mysys, 0);
ret= my_thread_init();
}
return ret;
}
void wsrep_delete_threadvars()
{
if (thread_handling == SCHEDULER_TYPES_COUNT)
{
/* The caller should have called wsrep_store_threadvars() before
this method. */
DBUG_ASSERT(pthread_getspecific(THR_KEY_mysys));
/* Reset psi state to avoid deallocating applier thread
psi_thread. */
PSI_thread *psi_thread= PSI_CALL_get_thread();
#ifdef HAVE_PSI_INTERFACE
if (PSI_server)
{
PSI_server->set_thread(0);
}
#endif /* HAVE_PSI_INTERFACE */
my_thread_end();
PSI_CALL_set_thread(psi_thread);
pthread_setspecific(THR_KEY_mysys, 0);
}
}
void wsrep_assign_from_threadvars(THD *thd)
{
if (thread_handling == SCHEDULER_TYPES_COUNT)
{
st_my_thread_var *mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
DBUG_ASSERT(mysys_var);
thd->set_mysys_var(mysys_var);
}
}
Wsrep_threadvars wsrep_save_threadvars()
{
return Wsrep_threadvars{
current_thd,
(st_my_thread_var*) pthread_getspecific(THR_KEY_mysys)
};
}
void wsrep_restore_threadvars(const Wsrep_threadvars& globals)
{
set_current_thd(globals.cur_thd);
pthread_setspecific(THR_KEY_mysys, globals.mysys_var);
}
int wsrep_store_threadvars(THD *thd)
{
if (thread_handling == SCHEDULER_TYPES_COUNT)
{
pthread_setspecific(THR_KEY_mysys, thd->mysys_var);
}
return thd->store_globals();
}
void wsrep_reset_threadvars(THD *thd)
{
if (thread_handling == SCHEDULER_TYPES_COUNT)
{
pthread_setspecific(THR_KEY_mysys, 0);
}
else
{
thd->reset_globals();
}
}
......@@ -82,13 +82,8 @@ class Wsrep_thd_queue
mysql_cond_t COND_wsrep_thd_queue;
};
void wsrep_prepare_bf_thd(THD*, struct wsrep_thd_shadow*);
void wsrep_return_from_bf_mode(THD*, struct wsrep_thd_shadow*);
int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
enum enum_var_type scope);
void wsrep_client_rollback(THD *thd, bool rollbacker = false);
void wsrep_replay_transaction(THD *thd);
void wsrep_create_appliers(long threads);
void wsrep_create_rollbacker();
......@@ -96,8 +91,83 @@ bool wsrep_bf_abort(const THD*, THD*);
int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
my_bool signal);
extern void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe);
THD* wsrep_start_SR_THD(char *thread_stack);
void wsrep_end_SR_THD(THD* thd);
/*
Helper methods to deal with thread local storage.
The purpose of these methods is to hide the details of thread
local storage handling when operating with wsrep storage access
and streaming applier THDs
With one-thread-per-connection thread handling thread specific
variables are allocated when the thread is started and deallocated
before thread exits (my_thread_init(), my_thread_end()). However,
with pool-of-threads thread handling new thread specific variables
are allocated for each THD separately (see threadpool_add_connection()),
and the variables in thread local storage are assigned from
currently active thread (see thread_attach()). This must be taken into
account when storing/resetting thread local storage and when creating
streaming applier THDs.
*/
/**
Create new variables for thread local storage. With
one-thread-per-connection thread handling this is a no op,
with pool-of-threads new variables are created via my_thread_init().
It is assumed that the caller has called wsrep_reset_threadvars() to clear
the thread local storage before this call.
@return Zero in case of success, non-zero otherwise.
*/
int wsrep_create_threadvars();
/**
Delete variables which were created by wsrep_create_threadvars().
The caller must store variables into thread local storage before
this call via wsrep_store_threadvars().
*/
void wsrep_delete_threadvars();
/**
Assign variables from current thread local storage into THD.
This should be called for THDs whose lifetime is limited to single
thread execution or which may share the operation context with some
parent THD (e.g. storage access) and thus don't require separately
allocated globals.
With one-thread-per-connection thread handling this is a no-op,
with pool-of-threads the variables which are currently stored into
thread local storage are assigned to THD.
*/
void wsrep_assign_from_threadvars(THD *);
/**
Helper struct to save variables from thread local storage.
*/
struct Wsrep_threadvars
{
THD* cur_thd;
st_my_thread_var* mysys_var;
};
/**
Save variables from thread local storage into Wsrep_threadvars struct.
*/
Wsrep_threadvars wsrep_save_threadvars();
/**
Restore variables into thread local storage from Wsrep_threadvars struct.
*/
void wsrep_restore_threadvars(const Wsrep_threadvars&);
/**
Store variables into thread local storage.
*/
int wsrep_store_threadvars(THD *);
/**
Reset thread local storage.
*/
void wsrep_reset_threadvars(THD *);
/**
Helper functions to override error status
......
......@@ -422,6 +422,17 @@ static inline void wsrep_close(THD* thd)
DBUG_VOID_RETURN;
}
static inline void
wsrep_wait_rollback_complete_and_acquire_ownership(THD *thd)
{
DBUG_ENTER("wsrep_wait_rollback_complete_and_acquire_ownership");
if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
{
thd->wsrep_cs().wait_rollback_complete_and_acquire_ownership();
}
DBUG_VOID_RETURN;
}
static inline int wsrep_before_command(THD* thd)
{
return (thd->wsrep_cs().state() != wsrep::client_state::s_none ?
......
......@@ -25,6 +25,7 @@
#include "wsrep_api.h"
#include "wsrep_utils.h"
#include "wsrep_mysqld.h"
#include "wsrep_thd.h"
#include <sql_class.h>
......@@ -421,7 +422,8 @@ thd::thd (my_bool won) : init(), ptr(new THD(0))
if (ptr)
{
ptr->thread_stack= (char*) &ptr;
ptr->store_globals();
wsrep_assign_from_threadvars(ptr);
wsrep_store_threadvars(ptr);
ptr->variables.option_bits&= ~OPTION_BIN_LOG; // disable binlog
ptr->variables.wsrep_on= won;
ptr->security_ctx->master_access= ~(ulong)0;
......
Subproject commit 0f676bd89378c7c823cff7ae7cdaef3cafcca231
Subproject commit 58aa3e821f575532870c5f76f6f1cf833458eed4
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment