Commit 717a3215 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-26356 preparation: Refactor purge_state

purge_coordinator_timer_callback(): Remove. We will have
purge_coordinator_timer invoke purge_coordinator_callback()
directly.

srv_master_callback(): Invoke srv_wake_purge_thread_if_not_active()
instead of purge_coordinator_timer_callback(). That is, we will
trigger purge_coordinator_callback() once per second if there is
any work to be done.

purge_state::do_purge(): Replaces srv_do_purge(),
purge_coordinator_callback_low(), and
purge_coordinator_timer_callback(). The static variables
inside srv_do_purge() were moved to purge_state data members.
parent 03e4cb24
......@@ -528,7 +528,14 @@ struct purge_coordinator_state
/** Snapshot of the last history length before the purge call.*/
uint32 m_history_length;
Atomic_counter<int> m_running;
purge_coordinator_state() : m_history_length(), m_running(0) {}
private:
ulint count;
ulint n_use_threads;
ulint n_threads;
inline void lazy_init();
public:
inline void do_purge();
};
static purge_coordinator_state purge_state;
......@@ -1329,7 +1336,6 @@ bool srv_any_background_activity()
static void purge_worker_callback(void*);
static void purge_coordinator_callback(void*);
static void purge_coordinator_timer_callback(void*);
static tpool::task_group purge_task_group;
tpool::waitable_task purge_worker_task(purge_worker_callback, nullptr,
......@@ -1621,24 +1627,25 @@ void srv_shutdown(bool ibuf_merge)
/** The periodic master task controlling the server. */
void srv_master_callback(void*)
{
static ulint old_activity_count;
static ulint old_activity_count;
ut_a(srv_shutdown_state <= SRV_SHUTDOWN_INITIATED);
ut_a(srv_shutdown_state <= SRV_SHUTDOWN_INITIATED);
MONITOR_INC(MONITOR_MASTER_THREAD_SLEEP);
ut_d(srv_master_do_disabled_loop());
purge_coordinator_timer_callback(nullptr);
ulonglong counter_time = microsecond_interval_timer();
srv_sync_log_buffer_in_background();
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_LOG_FLUSH_MICROSECOND, counter_time);
MONITOR_INC(MONITOR_MASTER_THREAD_SLEEP);
ut_d(srv_master_do_disabled_loop());
if (!purge_state.m_running)
srv_wake_purge_thread_if_not_active();
ulonglong counter_time= microsecond_interval_timer();
srv_sync_log_buffer_in_background();
MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_LOG_FLUSH_MICROSECOND,
counter_time);
if (srv_check_activity(&old_activity_count)) {
srv_master_do_active_tasks(counter_time);
} else {
srv_master_do_idle_tasks(counter_time);
}
srv_main_thread_op_info = "sleeping";
if (srv_check_activity(&old_activity_count))
srv_master_do_active_tasks(counter_time);
else
srv_master_do_idle_tasks(counter_time);
srv_main_thread_op_info= "sleeping";
}
/** @return whether purge should exit due to shutdown */
......@@ -1711,86 +1718,88 @@ void srv_update_purge_thread_count(uint n)
Atomic_counter<int> srv_purge_thread_count_changed;
/** Do the actual purge operation.
@param[in,out] n_total_purged total number of purged pages
@return length of history list before the last purge batch. */
static uint32_t srv_do_purge(ulint* n_total_purged)
inline void purge_coordinator_state::do_purge()
{
ulint n_pages_purged;
ut_ad(!srv_read_only_mode);
lazy_init();
ut_ad(n_threads);
bool wakeup= false;
static ulint count = 0;
static ulint n_use_threads = 0;
static uint32_t rseg_history_len = 0;
ulint old_activity_count = srv_get_activity_count();
static ulint n_threads = srv_n_purge_threads;
purge_coordinator_timer->disarm();
ut_a(n_threads > 0);
ut_ad(!srv_read_only_mode);
while (purge_sys.enabled() && !purge_sys.paused())
{
loop:
wakeup= false;
const auto sigcount= m_running;
/* Purge until there are no more records to purge and there is
no change in configuration or server state. If the user has
configured more than one purge thread then we treat that as a
pool of threads and only use the extra threads if purge can't
keep up with updates. */
const auto old_activity_count= srv_sys.activity_count;
const auto history_size= trx_sys.history_size();
if (n_use_threads == 0) {
n_use_threads = n_threads;
}
if (UNIV_UNLIKELY(srv_purge_thread_count_changed))
{
/* Read the fresh value of srv_n_purge_threads, reset
the changed flag. Both are protected by purge_thread_count_mtx.
do {
if (UNIV_UNLIKELY(srv_purge_thread_count_changed)) {
/* Read the fresh value of srv_n_purge_threads, reset
the changed flag. Both variables are protected by
purge_thread_count_mtx.
This code does not run concurrently, it is executed
by a single purge_coordinator thread, and no races
involving srv_purge_thread_count_changed are possible.
*/
std::lock_guard<std::mutex> lk(purge_thread_count_mtx);
n_threads = n_use_threads = srv_n_purge_threads;
srv_purge_thread_count_changed = 0;
} else if (trx_sys.history_size_approx() > rseg_history_len
|| (srv_max_purge_lag > 0
&& rseg_history_len > srv_max_purge_lag)) {
/* History length is now longer than what it was
when we took the last snapshot. Use more threads. */
if (n_use_threads < n_threads) {
++n_use_threads;
}
This code does not run concurrently, it is executed
by a single purge_coordinator thread, and no races
involving srv_purge_thread_count_changed are possible. */
{
std::lock_guard<std::mutex> lk(purge_thread_count_mtx);
n_threads= n_use_threads= srv_n_purge_threads;
srv_purge_thread_count_changed= 0;
}
}
else if (history_size > m_history_length ||
(srv_max_purge_lag && m_history_length > srv_max_purge_lag))
{
/* dynamically adjust the purge thread based on redo log fill factor */
if (n_threads > n_use_threads)
++n_use_threads;
}
else if (n_use_threads > 1 && old_activity_count == srv_sys.activity_count)
--n_use_threads;
} else if (srv_check_activity(&old_activity_count)
&& n_use_threads > 1) {
ut_ad(n_use_threads);
ut_ad(n_use_threads <= n_threads);
/* History length same or smaller since last snapshot,
use fewer threads. */
m_history_length= history_size;
--n_use_threads;
}
if (history_size &&
trx_purge(n_use_threads,
!(++count % srv_purge_rseg_truncate_frequency) ||
purge_sys.truncate.current))
continue;
/* Ensure that the purge threads are less than what
was configured. */
if (m_running == sigcount)
{
/* Purge was not woken up by srv_wake_purge_thread_if_not_active() */
ut_a(n_use_threads > 0);
ut_a(n_use_threads <= n_threads);
/* The magic number 5000 is an approximation for the case where we have
cached undo log records which prevent truncate of rollback segments. */
wakeup= history_size &&
(history_size >= 5000 ||
history_size != trx_sys.history_size_approx());
break;
}
else if (!trx_sys.history_exists())
break;
/* Take a snapshot of the history list before purge. */
if (!(rseg_history_len = trx_sys.history_size())) {
break;
}
if (!srv_purge_should_exit())
goto loop;
}
n_pages_purged = trx_purge(
n_use_threads,
!(++count % srv_purge_rseg_truncate_frequency)
|| purge_sys.truncate.current);
if (wakeup)
purge_coordinator_timer->set_time(10, 0);
*n_total_purged += n_pages_purged;
} while (n_pages_purged > 0 && !purge_sys.paused()
&& !srv_purge_should_exit());
m_running= 0;
}
return(rseg_history_len);
inline void purge_coordinator_state::lazy_init()
{
if (n_threads)
return;
n_threads= n_use_threads= srv_n_purge_threads;
}
......@@ -1836,24 +1845,6 @@ static void release_thd(THD *thd, void *ctx)
set_current_thd(0);
}
/**
Called by timer when purge coordinator decides
to delay processing of purge records.
*/
static void purge_coordinator_timer_callback(void *)
{
if (!purge_sys.enabled() || purge_sys.paused() || purge_state.m_running)
return;
/* The magic number 5000 is an approximation for the case where we have
cached undo log records which prevent truncate of the rollback segments. */
if (const auto history_size= trx_sys.history_size())
if (purge_state.m_history_length >= 5000 ||
purge_state.m_history_length != history_size)
srv_wake_purge_thread_if_not_active();
}
static void purge_worker_callback(void*)
{
ut_ad(!current_thd);
......@@ -1866,57 +1857,19 @@ static void purge_worker_callback(void*)
release_thd(thd,ctx);
}
static void purge_coordinator_callback_low()
{
ulint n_total_purged= ULINT_UNDEFINED;
purge_state.m_history_length= 0;
if (!purge_sys.enabled() || purge_sys.paused())
return;
do
{
n_total_purged = 0;
int sigcount= purge_state.m_running;
purge_state.m_history_length= srv_do_purge(&n_total_purged);
/* Check if purge was woken by srv_wake_purge_thread_if_not_active() */
bool woken_during_purge= purge_state.m_running > sigcount;
/* If last purge batch processed less than 1 page and there is
still work to do, delay the next batch by 10ms. Unless
someone added work and woke us up. */
if (n_total_purged == 0)
{
if (trx_sys.history_size() == 0)
return;
if (!woken_during_purge)
{
/* Delay next purge round*/
purge_coordinator_timer->set_time(10, 0);
return;
}
}
}
while ((purge_sys.enabled() && !purge_sys.paused()) ||
!srv_purge_should_exit());
}
static void purge_coordinator_callback(void*)
{
void *ctx;
THD *thd= acquire_thd(&ctx);
purge_coordinator_callback_low();
release_thd(thd,ctx);
purge_state.m_running= 0;
purge_state.do_purge();
release_thd(thd, ctx);
}
void srv_init_purge_tasks()
{
purge_create_background_thds(srv_n_purge_threads);
purge_coordinator_timer= srv_thread_pool->create_timer
(purge_coordinator_timer_callback, nullptr);
(purge_coordinator_callback, nullptr);
}
static void srv_shutdown_purge_tasks()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment