Commit cd15e764 authored by Marko Mäkelä

MDEV-16159 Use atomic memory access for purge_sys

Thanks to Sergey Vojtovich for feedback and many ideas.

purge_state_t: Remove. The states are replaced with
purge_sys_t::enabled() and purge_sys_t::paused() as follows:
PURGE_STATE_INIT, PURGE_STATE_EXIT, PURGE_STATE_DISABLED: !enabled().
PURGE_STATE_RUN, PURGE_STATE_STOP: paused() distinguishes these.
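
For illustration, the old state names can be recovered with a hypothetical
helper (purge_state_name() is not part of this patch; it merely mirrors the
updated lock_print_info_summary() logic further below):

  /** Sketch only: derive the old state names from the new predicates. */
  const char* purge_state_name()
  {
    if (!purge_sys.enabled())
      return "disabled";    /* old INIT, EXIT or DISABLED */
    if (purge_sys.running())
      return "running";     /* coordinator currently active */
    return purge_sys.paused() ? "stopped" : "running but idle";
  }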

purge_sys_t::m_paused: Renamed from purge_sys_t::n_stop.
Protected by atomic memory access only, not purge_sys_t::latch.

purge_sys_t::m_enabled: An atomically updated Boolean that
replaces purge_sys_t::state.

purge_sys_t::running: Remove, because it duplicates
srv_sys.n_threads_active[SRV_PURGE].

purge_sys_t::running(): Accessor for srv_sys.n_threads_active[SRV_PURGE].

purge_sys_t::stop(): Renamed from trx_purge_stop().

purge_sys_t::resume(): Renamed from trx_purge_run().
Do not acquire latch; solely rely on atomics.
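
As a self-contained model of the pause counter (a simplified sketch using
std::atomic instead of the my_atomic_*() wrappers; the event handshake and
the coordinator wakeup are stubbed out as comments):

  #include <atomic>
  #include <cassert>

  /* Model of purge_sys_t pause counting; not the actual class. */
  struct purge_pause_model
  {
    std::atomic<int32_t> m_paused{0};   /* pending stop() calls */

    bool paused() const
    { return m_paused.load(std::memory_order_relaxed) != 0; }

    void stop()
    {
      /* The first stop() wakes the coordinator and waits on the event
      for it to acknowledge; later stop() calls bump the counter (the
      patch additionally polls running() for them). */
      if (!m_paused.fetch_add(1, std::memory_order_relaxed))
      { /* os_event_reset(); srv_purge_wakeup(); os_event_wait() */ }
    }

    void resume()
    {
      int32_t p = m_paused.fetch_sub(1, std::memory_order_relaxed);
      assert(p > 0);    /* every resume() must match a stop() */
      if (p == 1)
      { /* last pending stop() released: srv_purge_wakeup() */ }
    }
  };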

purge_sys_t::is_initialised(), purge_sys_t::m_initialised: Remove.

purge_sys_t::create(), purge_sys_t::close(): Instead of invoking
is_initialised(), check whether event is NULL.

purge_sys_t::event: Move before latch, so that fields that are
protected by latch can reside on the same cache line with latch.
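
The intended layout is roughly the following (illustrative sketch only;
alignas(64) stands in for MY_ALIGNED(CACHE_LINE_SIZE), and both the 64-byte
line size and the rw_lock_t stand-in size are assumptions, not values from
the patch):

  /* Layout sketch; the real definition is in trx0purge.h below. */
  struct layout_sketch
  {
    /* event on its own cache line, so that os_event_set() and
    os_event_reset() traffic does not keep invalidating the line
    that holds latch */
    alignas(64) void* event;
    /* latch starts the next line; m_enabled (also read under the
    latch in enabled_latched()) and m_paused follow on the same
    line, so a reader that takes the latch finds them hot */
    alignas(64) char  latch[48];
    int32_t           m_enabled;
    int32_t           m_paused;
  };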

srv_start_wait_for_purge_to_start(): Merge to the only caller srv_start().
parent 442a6e6b
......@@ -60,32 +60,6 @@ trx_purge(
ulint n_purge_threads, /*!< in: number of purge tasks to
submit to task queue. */
bool truncate); /*!< in: truncate history if true */
/*******************************************************************//**
Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */
void
trx_purge_stop(void);
/*================*/
/*******************************************************************//**
Resume purge, move to PURGE_STATE_RUN. */
void
trx_purge_run(void);
/*================*/
/** Purge states */
enum purge_state_t {
PURGE_STATE_INIT, /*!< Purge instance created */
PURGE_STATE_RUN, /*!< Purge should be running */
PURGE_STATE_STOP, /*!< Purge should be stopped */
PURGE_STATE_EXIT, /*!< Purge has been shutdown */
PURGE_STATE_DISABLED /*!< Purge was never started */
};
/*******************************************************************//**
Get the purge state.
@return purge state. */
purge_state_t
trx_purge_state(void);
/*=================*/
/** Rollback segments from a given transaction with trx-no
scheduled for purge. */
......@@ -396,29 +370,20 @@ namespace undo {
/** The control structure used in the purge operation */
class purge_sys_t
{
bool m_initialised;
public:
/** signal state changes; os_event_reset() and os_event_set()
are protected by rw_lock_x_lock(latch) */
MY_ALIGNED(CACHE_LINE_SIZE)
rw_lock_t latch; /*!< The latch protecting the purge
view. A purge operation must acquire an
x-latch here for the instant at which
it changes the purge view: an undo
log operation can prevent this by
obtaining an s-latch here. It also
protects state and running */
os_event_t event;
/** latch protecting view, m_enabled */
MY_ALIGNED(CACHE_LINE_SIZE)
os_event_t event; /*!< State signal event;
os_event_set() and os_event_reset()
are protected by purge_sys_t::latch
X-lock */
MY_ALIGNED(CACHE_LINE_SIZE)
ulint n_stop; /*!< Counter to track number stops */
volatile bool running; /*!< true, if purge is active,
we check this without the latch too */
volatile purge_state_t state; /*!< Purge coordinator thread states,
we check this in several places
without holding the latch. */
rw_lock_t latch;
private:
/** whether purge is enabled; protected by latch and my_atomic */
int32_t m_enabled;
/** number of pending stop() calls without resume() */
int32_t m_paused;
public:
que_t* query; /*!< The query graph which will do the
parallelized purge operation */
MY_ALIGNED(CACHE_LINE_SIZE)
......@@ -494,10 +459,7 @@ class purge_sys_t
uninitialised. Real initialisation happens in create().
*/
purge_sys_t() : m_initialised(false) {}
bool is_initialised() const { return m_initialised; }
purge_sys_t() : event(NULL), m_enabled(false) {}
/** Create the instance */
......@@ -505,6 +467,49 @@ class purge_sys_t
/** Close the purge system on shutdown */
void close();
/** @return whether purge is enabled */
bool enabled()
{
return my_atomic_load32_explicit(&m_enabled, MY_MEMORY_ORDER_RELAXED);
}
/** @return whether purge is enabled */
bool enabled_latched()
{
ut_ad(rw_lock_own_flagged(&latch, RW_LOCK_FLAG_X | RW_LOCK_FLAG_S));
return bool(m_enabled);
}
/** @return whether the purge coordinator is paused */
bool paused()
{ return my_atomic_load32_explicit(&m_paused, MY_MEMORY_ORDER_RELAXED); }
/** @return whether the purge coordinator is paused */
bool paused_latched()
{
ut_ad(rw_lock_own_flagged(&latch, RW_LOCK_FLAG_X | RW_LOCK_FLAG_S));
return m_paused != 0;
}
/** Enable purge at startup. Not protected by latch; the main thread
will wait for purge_sys.enabled() in srv_start() */
void coordinator_startup()
{
ut_ad(!enabled());
my_atomic_store32_explicit(&m_enabled, true, MY_MEMORY_ORDER_RELAXED);
}
/** Disable purge at shutdown */
void coordinator_shutdown()
{
ut_ad(enabled());
my_atomic_store32_explicit(&m_enabled, false, MY_MEMORY_ORDER_RELAXED);
}
/** @return whether the purge coordinator thread is active */
bool running();
/** Stop purge during FLUSH TABLES FOR EXPORT */
void stop();
/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */
void resume();
};
/** The global data structure coordinating a purge */
......
......@@ -4620,44 +4620,15 @@ lock_print_info_summary(
fprintf(file,
"Purge done for trx's n:o < " TRX_ID_FMT
" undo n:o < " TRX_ID_FMT " state: ",
" undo n:o < " TRX_ID_FMT " state: %s\n"
"History list length " ULINTPF "\n",
purge_sys.tail.trx_no(),
purge_sys.tail.undo_no);
/* Note: We are reading the state without the latch. One because it
will violate the latching order and two because we are merely querying
the state of the variable for display. */
switch (purge_sys.state){
case PURGE_STATE_INIT:
/* Should never be in this state while the system is running. */
ut_error;
case PURGE_STATE_EXIT:
fprintf(file, "exited");
break;
case PURGE_STATE_DISABLED:
fprintf(file, "disabled");
break;
case PURGE_STATE_RUN:
fprintf(file, "running");
/* Check if it is waiting for more data to arrive. */
if (!purge_sys.running) {
fprintf(file, " but idle");
}
break;
case PURGE_STATE_STOP:
fprintf(file, "stopped");
break;
}
fprintf(file, "\n");
fprintf(file,
"History list length " ULINTPF "\n", trx_sys.history_size());
purge_sys.tail.undo_no,
purge_sys.enabled()
? (purge_sys.running() ? "running"
: purge_sys.paused() ? "stopped" : "running but idle")
: "disabled",
trx_sys.history_size());
#ifdef PRINT_NUM_OF_LOCK_STRUCTS
fprintf(file,
......
......@@ -1959,15 +1959,6 @@ row_merge_read_clustered_index(
}
}
#ifdef DBUG_OFF
# define dbug_run_purge false
#else /* DBUG_OFF */
bool dbug_run_purge = false;
#endif /* DBUG_OFF */
DBUG_EXECUTE_IF(
"ib_purge_on_create_index_page_switch",
dbug_run_purge = true;);
/* Insert the cached spatial index rows. */
bool mtr_committed = false;
......@@ -1984,8 +1975,7 @@ row_merge_read_clustered_index(
goto scan_next;
}
if (dbug_run_purge
|| my_atomic_load32_explicit(&clust_index->lock.waiters,
if (my_atomic_load32_explicit(&clust_index->lock.waiters,
MY_MEMORY_ORDER_RELAXED)) {
/* There are waiters on the clustered
index tree lock, likely the purge
......@@ -2007,18 +1997,6 @@ row_merge_read_clustered_index(
btr_pcur_store_position(&pcur, &mtr);
mtr_commit(&mtr);
if (dbug_run_purge) {
/* This is for testing
purposes only (see
DBUG_EXECUTE_IF above). We
signal the purge thread and
hope that the purge batch will
complete before we execute
btr_pcur_restore_position(). */
trx_purge_run();
os_thread_sleep(1000000);
}
/* Give the waiters a chance to proceed. */
os_thread_yield();
scan_next:
......
......@@ -525,7 +525,7 @@ row_quiesce_table_start(
ib::info() << "Sync to disk of " << table->name << " started.";
if (srv_undo_sources) {
trx_purge_stop();
purge_sys.stop();
}
for (ulint count = 0;
......@@ -609,7 +609,7 @@ row_quiesce_table_complete(
}
if (srv_undo_sources) {
trx_purge_run();
purge_sys.resume();
}
dberr_t err = row_quiesce_set_state(table, QUIESCE_NONE, trx);
......
......@@ -613,6 +613,12 @@ struct srv_sys_t{
static srv_sys_t srv_sys;
/** @return whether the purge coordinator thread is active */
bool purge_sys_t::running()
{
return my_atomic_loadlint(&srv_sys.n_threads_active[SRV_PURGE]);
}
/** Event to signal srv_monitor_thread. Not protected by a mutex.
Set after setting srv_print_innodb_monitor. */
os_event_t srv_monitor_event;
......@@ -1896,19 +1902,8 @@ srv_get_active_thread_type(void)
srv_sys_mutex_exit();
if (ret == SRV_NONE && srv_shutdown_state != SRV_SHUTDOWN_NONE
&& purge_sys.is_initialised()) {
/* Check only on shutdown. */
switch (trx_purge_state()) {
case PURGE_STATE_RUN:
case PURGE_STATE_STOP:
if (ret == SRV_NONE && purge_sys.enabled()) {
ret = SRV_PURGE;
break;
case PURGE_STATE_INIT:
case PURGE_STATE_DISABLED:
case PURGE_STATE_EXIT:
break;
}
}
return(ret);
......@@ -1947,7 +1942,7 @@ srv_wake_purge_thread_if_not_active()
{
ut_ad(!srv_sys_mutex_own());
if (purge_sys.state == PURGE_STATE_RUN
if (purge_sys.enabled() && !purge_sys.paused()
&& !my_atomic_loadlint(&srv_sys.n_threads_active[SRV_PURGE])
&& trx_sys.history_size()) {
......@@ -2494,19 +2489,11 @@ DECLARE_THREAD(srv_worker_thread)(
srv_wake_purge_thread_if_not_active();
}
/* Note: we are checking the state without holding the
purge_sys.latch here. */
} while (purge_sys.state != PURGE_STATE_EXIT);
} while (purge_sys.enabled());
srv_free_slot(slot);
rw_lock_x_lock(&purge_sys.latch);
ut_a(!purge_sys.running);
ut_a(purge_sys.state == PURGE_STATE_EXIT);
rw_lock_x_unlock(&purge_sys.latch);
ut_ad(!purge_sys.enabled());
#ifdef UNIV_DEBUG_THREAD_CREATION
ib::info() << "Purge worker thread exiting, id "
......@@ -2604,8 +2591,7 @@ srv_do_purge(ulint* n_total_purged)
/* The previous round still did some work. */
continue;
}
} while (n_pages_purged > 0
&& purge_sys.state == PURGE_STATE_RUN
} while (n_pages_purged > 0 && !purge_sys.paused()
&& !srv_purge_should_exit());
return(rseg_history_len);
......@@ -2633,12 +2619,6 @@ srv_purge_coordinator_suspend(
int64_t sig_count = srv_suspend_thread(slot);
do {
rw_lock_x_lock(&purge_sys.latch);
purge_sys.running = false;
rw_lock_x_unlock(&purge_sys.latch);
/* We don't wait right away on the non-timed wait because
we want to signal the thread that wants to suspend purge. */
const bool wait = stop
......@@ -2651,13 +2631,10 @@ srv_purge_coordinator_suspend(
rw_lock_x_lock(&purge_sys.latch);
stop = (srv_shutdown_state == SRV_SHUTDOWN_NONE
&& purge_sys.state == PURGE_STATE_STOP);
stop = srv_shutdown_state == SRV_SHUTDOWN_NONE
&& purge_sys.paused_latched();
if (!stop) {
ut_a(purge_sys.n_stop == 0);
purge_sys.running = true;
if (timeout
&& rseg_history_len < 5000
&& rseg_history_len == trx_sys.history_size()) {
......@@ -2671,8 +2648,6 @@ srv_purge_coordinator_suspend(
stop = true;
}
} else {
ut_a(purge_sys.n_stop > 0);
/* Signal that we are suspended. */
os_event_set(purge_sys.event);
}
......@@ -2700,15 +2675,9 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
ut_ad(!srv_read_only_mode);
ut_a(srv_n_purge_threads >= 1);
ut_a(trx_purge_state() == PURGE_STATE_INIT);
ut_a(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
rw_lock_x_lock(&purge_sys.latch);
purge_sys.running = true;
purge_sys.state = PURGE_STATE_RUN;
rw_lock_x_unlock(&purge_sys.latch);
purge_sys.coordinator_startup();
#ifdef UNIV_PFS_THREAD
pfs_register_thread(srv_purge_thread_key);
......@@ -2729,8 +2698,7 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
if (srv_shutdown_state == SRV_SHUTDOWN_NONE
&& srv_undo_sources
&& (n_total_purged == 0
|| purge_sys.state == PURGE_STATE_STOP)) {
&& (n_total_purged == 0 || purge_sys.paused())) {
srv_purge_coordinator_suspend(slot, rseg_history_len);
}
......@@ -2754,16 +2722,13 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
/* Note that we are shutting down. */
rw_lock_x_lock(&purge_sys.latch);
purge_sys.state = PURGE_STATE_EXIT;
purge_sys.coordinator_shutdown();
/* If there are any pending undo-tablespace truncate then clear
it off as we plan to shutdown the purge thread. */
purge_sys.undo_trunc.clear();
purge_sys.running = false;
/* Ensure that the wait in trx_purge_stop() will terminate. */
/* Ensure that the wait in purge_sys_t::stop() will terminate. */
os_event_set(purge_sys.event);
rw_lock_x_unlock(&purge_sys.latch);
......
......@@ -1102,41 +1102,6 @@ srv_undo_tablespaces_init(bool create_new_db)
return(DB_SUCCESS);
}
/********************************************************************
Wait for the purge thread(s) to start up. */
static
void
srv_start_wait_for_purge_to_start()
/*===============================*/
{
/* Wait for the purge coordinator and master thread to startup. */
purge_state_t state = trx_purge_state();
ut_a(state != PURGE_STATE_DISABLED);
while (srv_shutdown_state == SRV_SHUTDOWN_NONE
&& srv_force_recovery < SRV_FORCE_NO_BACKGROUND
&& state == PURGE_STATE_INIT) {
switch (state = trx_purge_state()) {
case PURGE_STATE_RUN:
case PURGE_STATE_STOP:
break;
case PURGE_STATE_INIT:
ib::info() << "Waiting for purge to start";
os_thread_sleep(50000);
break;
case PURGE_STATE_EXIT:
case PURGE_STATE_DISABLED:
ut_error;
}
}
}
/** Create the temporary file tablespace.
@param[in] create_new_db whether we are creating a new database
@return DB_SUCCESS or error code. */
......@@ -2417,7 +2382,7 @@ dberr_t srv_start(bool create_new_db)
| SRV_START_STATE_MONITOR;
ut_ad(srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN
|| trx_purge_state() == PURGE_STATE_INIT);
|| !purge_sys.enabled());
if (srv_force_recovery < SRV_FORCE_NO_BACKGROUND) {
srv_undo_sources = true;
......@@ -2494,11 +2459,14 @@ dberr_t srv_start(bool create_new_db)
thread_started[5 + i + SRV_MAX_N_IO_THREADS] = true;
}
srv_start_wait_for_purge_to_start();
while (srv_shutdown_state == SRV_SHUTDOWN_NONE
&& srv_force_recovery < SRV_FORCE_NO_BACKGROUND
&& !purge_sys.enabled()) {
ib::info() << "Waiting for purge to start";
os_thread_sleep(50000);
}
srv_start_state_set(SRV_START_STATE_PURGE);
} else {
purge_sys.state = PURGE_STATE_DISABLED;
}
srv_is_being_started = false;
......
......@@ -161,11 +161,11 @@ purge_graph_build()
void purge_sys_t::create()
{
ut_ad(this == &purge_sys);
ut_ad(!is_initialised());
ut_ad(!enabled());
ut_ad(!event);
event= os_event_create(0);
n_stop= 0;
running= false;
state= PURGE_STATE_INIT;
ut_ad(event);
m_paused= 0;
query= purge_graph_build();
n_submitted= 0;
n_completed= 0;
......@@ -178,27 +178,26 @@ void purge_sys_t::create()
rw_lock_create(trx_purge_latch_key, &latch, SYNC_PURGE_LATCH);
mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex);
undo_trunc.create();
m_initialised = true;
}
/** Close the purge subsystem on shutdown. */
void purge_sys_t::close()
{
ut_ad(this == &purge_sys);
if (!is_initialised()) return;
if (!event) return;
m_initialised = false;
m_enabled= false;
trx_t* trx = query->trx;
que_graph_free(query);
ut_ad(!trx->id);
ut_ad(trx->state == TRX_STATE_ACTIVE);
trx->state = TRX_STATE_NOT_STARTED;
trx->state= TRX_STATE_NOT_STARTED;
trx_free(trx);
rw_lock_free(&latch);
/* rw_lock_free() already called latch.~rw_lock_t(); tame the
debug assertions when the destructor will be called once more. */
ut_ad(latch.magic_n == 0);
ut_d(latch.magic_n = RW_LOCK_MAGIC_N);
ut_d(latch.magic_n= RW_LOCK_MAGIC_N);
mutex_free(&pq_mutex);
os_event_destroy(event);
}
......@@ -274,11 +273,10 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
in THD::cleanup() invoked from unlink_thd(), and we may also
continue to execute user transactions. */
ut_ad(srv_undo_sources
|| ((srv_startup_is_before_trx_rollback_phase
|| trx_rollback_is_active)
&& purge_sys.state == PURGE_STATE_INIT)
|| (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND
&& purge_sys.state == PURGE_STATE_DISABLED)
|| (!purge_sys.enabled()
&& (srv_startup_is_before_trx_rollback_phase
|| trx_rollback_is_active
|| srv_force_recovery >= SRV_FORCE_NO_BACKGROUND))
|| ((trx->undo_no == 0 || trx->mysql_thd
|| trx->internal)
&& srv_fast_shutdown));
......@@ -1591,110 +1589,63 @@ trx_purge(
return(n_pages_handled);
}
/*******************************************************************//**
Get the purge state.
@return purge state. */
purge_state_t
trx_purge_state(void)
/*=================*/
{
purge_state_t state;
rw_lock_x_lock(&purge_sys.latch);
state = purge_sys.state;
rw_lock_x_unlock(&purge_sys.latch);
return(state);
}
/*******************************************************************//**
Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */
void
trx_purge_stop(void)
/*================*/
/** Stop purge during FLUSH TABLES FOR EXPORT */
void purge_sys_t::stop()
{
rw_lock_x_lock(&purge_sys.latch);
rw_lock_x_lock(&latch);
switch (purge_sys.state) {
case PURGE_STATE_INIT:
case PURGE_STATE_DISABLED:
ut_error;
case PURGE_STATE_EXIT:
/* Shutdown must have been initiated during
FLUSH TABLES FOR EXPORT. */
if (!enabled_latched())
{
/* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
ut_ad(!srv_undo_sources);
unlock:
rw_lock_x_unlock(&purge_sys.latch);
break;
case PURGE_STATE_STOP:
ut_ad(srv_n_purge_threads > 0);
++purge_sys.n_stop;
if (!purge_sys.running) {
goto unlock;
rw_lock_x_unlock(&latch);
return;
}
ib::info() << "Waiting for purge to stop";
do {
rw_lock_x_unlock(&purge_sys.latch);
os_thread_sleep(10000);
rw_lock_x_lock(&purge_sys.latch);
} while (purge_sys.running);
goto unlock;
case PURGE_STATE_RUN:
ut_ad(srv_n_purge_threads > 0);
++purge_sys.n_stop;
ib::info() << "Stopping purge";
/* We need to wakeup the purge thread in case it is suspended,
so that it can acknowledge the state change. */
ut_ad(srv_n_purge_threads > 0);
const int64_t sig_count = os_event_reset(purge_sys.event);
purge_sys.state = PURGE_STATE_STOP;
if (0 == my_atomic_add32_explicit(&m_paused, 1, MY_MEMORY_ORDER_RELAXED))
{
/* We need to wakeup the purge thread in case it is suspended, so
that it can acknowledge the state change. */
const int64_t sig_count = os_event_reset(event);
rw_lock_x_unlock(&latch);
ib::info() << "Stopping purge";
srv_purge_wakeup();
rw_lock_x_unlock(&purge_sys.latch);
/* Wait for purge coordinator to signal that it
is suspended. */
os_event_wait_low(purge_sys.event, sig_count);
/* Wait for purge coordinator to signal that it is suspended. */
os_event_wait_low(event, sig_count);
MONITOR_ATOMIC_INC(MONITOR_PURGE_STOP_COUNT);
return;
}
MONITOR_INC_VALUE(MONITOR_PURGE_STOP_COUNT, 1);
rw_lock_x_unlock(&latch);
if (running())
{
ib::info() << "Waiting for purge to stop";
while (running())
os_thread_sleep(10000);
}
}
/*******************************************************************//**
Resume purge, move to PURGE_STATE_RUN. */
void
trx_purge_run(void)
/*===============*/
/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */
void purge_sys_t::resume()
{
rw_lock_x_lock(&purge_sys.latch);
switch (purge_sys.state) {
case PURGE_STATE_EXIT:
/* Shutdown must have been initiated during
FLUSH TABLES FOR EXPORT. */
if (!enabled())
{
/* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
ut_ad(!srv_undo_sources);
break;
case PURGE_STATE_INIT:
case PURGE_STATE_DISABLED:
ut_error;
case PURGE_STATE_RUN:
ut_a(!purge_sys.n_stop);
break;
case PURGE_STATE_STOP:
ut_a(purge_sys.n_stop);
if (--purge_sys.n_stop == 0) {
ib::info() << "Resuming purge";
purge_sys.state = PURGE_STATE_RUN;
}
MONITOR_INC_VALUE(MONITOR_PURGE_RESUME_COUNT, 1);
return;
}
rw_lock_x_unlock(&purge_sys.latch);
int32_t paused= my_atomic_add32_explicit(&m_paused, -1,
MY_MEMORY_ORDER_RELAXED);
ut_a(paused);
if (paused == 1)
{
ib::info() << "Resuming purge";
srv_purge_wakeup();
MONITOR_ATOMIC_INC(MONITOR_PURGE_RESUME_COUNT);
}
}
......@@ -715,7 +715,6 @@ trx_lists_init_at_db_start()
{
ut_a(srv_is_being_started);
ut_ad(!srv_was_started);
ut_ad(!purge_sys.is_initialised());
if (srv_operation == SRV_OPERATION_RESTORE) {
/* mariabackup --prepare only deals with
......