Commit bee4b044 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-21751 innodb_fast_shutdown=0 can be unnecessarily slow

max out parallel purge worker tasks, on slow shutdown, to speedup
parent 839ad5e1
...@@ -757,6 +757,16 @@ srv_que_task_enqueue_low( ...@@ -757,6 +757,16 @@ srv_que_task_enqueue_low(
/*=====================*/ /*=====================*/
que_thr_t* thr); /*!< in: query thread */ que_thr_t* thr); /*!< in: query thread */
/**
Flag which is set, whenever innodb_purge_threads changes.
It is read and reset in srv_do_purge().
Thus it is Atomic_counter<int>, not bool, since unprotected
reads are used. We just need an atomic with relaxed memory
order, to please Thread Sanitizer.
*/
extern Atomic_counter<int> srv_purge_thread_count_changed;
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/** @return whether purge or master task is active */ /** @return whether purge or master task is active */
bool srv_any_background_activity(); bool srv_any_background_activity();
...@@ -796,7 +806,7 @@ ulint srv_get_task_queue_length(); ...@@ -796,7 +806,7 @@ ulint srv_get_task_queue_length();
void srv_purge_shutdown(); void srv_purge_shutdown();
/** Init purge tasks*/ /** Init purge tasks*/
void srv_init_purge_tasks(uint n_max); void srv_init_purge_tasks();
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/** Disables master thread. It's used by: /** Disables master thread. It's used by:
......
...@@ -48,6 +48,8 @@ static const ulint TRX_SYS_SPACE = 0; ...@@ -48,6 +48,8 @@ static const ulint TRX_SYS_SPACE = 0;
/** Random value to check for corruption of trx_t */ /** Random value to check for corruption of trx_t */
static const ulint TRX_MAGIC_N = 91118598; static const ulint TRX_MAGIC_N = 91118598;
constexpr uint innodb_purge_threads_MAX= 32;
/** Transaction execution states when trx->state == TRX_STATE_ACTIVE */ /** Transaction execution states when trx->state == TRX_STATE_ACTIVE */
enum trx_que_t { enum trx_que_t {
TRX_QUE_RUNNING, /*!< transaction is running */ TRX_QUE_RUNNING, /*!< transaction is running */
......
...@@ -72,6 +72,7 @@ Created 10/8/1995 Heikki Tuuri ...@@ -72,6 +72,7 @@ Created 10/8/1995 Heikki Tuuri
#include "fil0fil.h" #include "fil0fil.h"
#include "fil0crypt.h" #include "fil0crypt.h"
#include "fil0pagecompress.h" #include "fil0pagecompress.h"
#include "trx0types.h"
#include <my_service_manager.h> #include <my_service_manager.h>
...@@ -2060,6 +2061,15 @@ static bool srv_task_execute() ...@@ -2060,6 +2061,15 @@ static bool srv_task_execute()
return false; return false;
} }
std::mutex purge_thread_count_mtx;
void srv_update_purge_thread_count(uint n)
{
std::lock_guard<std::mutex> lk(purge_thread_count_mtx);
srv_n_purge_threads = n;
srv_purge_thread_count_changed = 1;
}
Atomic_counter<int> srv_purge_thread_count_changed;
/** Do the actual purge operation. /** Do the actual purge operation.
@param[in,out] n_total_purged total number of purged pages @param[in,out] n_total_purged total number of purged pages
...@@ -2072,7 +2082,7 @@ static uint32_t srv_do_purge(ulint* n_total_purged) ...@@ -2072,7 +2082,7 @@ static uint32_t srv_do_purge(ulint* n_total_purged)
static ulint n_use_threads = 0; static ulint n_use_threads = 0;
static uint32_t rseg_history_len = 0; static uint32_t rseg_history_len = 0;
ulint old_activity_count = srv_get_activity_count(); ulint old_activity_count = srv_get_activity_count();
const ulint n_threads = srv_n_purge_threads; static ulint n_threads = srv_n_purge_threads;
ut_a(n_threads > 0); ut_a(n_threads > 0);
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
...@@ -2088,7 +2098,20 @@ static uint32_t srv_do_purge(ulint* n_total_purged) ...@@ -2088,7 +2098,20 @@ static uint32_t srv_do_purge(ulint* n_total_purged)
} }
do { do {
if (trx_sys.rseg_history_len > rseg_history_len if (UNIV_UNLIKELY(srv_purge_thread_count_changed)) {
/* Read the fresh value of srv_n_purge_threads, reset
the changed flag. Both variables are protected by
purge_thread_count_mtx.
This code does not run concurrently, it is executed
by a single purge_coordinator thread, and no races
involving srv_purge_thread_count_changed are possible.
*/
std::lock_guard<std::mutex> lk(purge_thread_count_mtx);
n_threads = n_use_threads = srv_n_purge_threads;
srv_purge_thread_count_changed = 0;
} else if (trx_sys.rseg_history_len > rseg_history_len
|| (srv_max_purge_lag > 0 || (srv_max_purge_lag > 0
&& rseg_history_len > srv_max_purge_lag)) { && rseg_history_len > srv_max_purge_lag)) {
...@@ -2136,23 +2159,17 @@ static uint32_t srv_do_purge(ulint* n_total_purged) ...@@ -2136,23 +2159,17 @@ static uint32_t srv_do_purge(ulint* n_total_purged)
static std::queue<THD*> purge_thds; static std::queue<THD*> purge_thds;
static std::mutex purge_thd_mutex; static std::mutex purge_thd_mutex;
static void purge_create_background_thds(int n)
{
THD *thd= current_thd;
std::unique_lock<std::mutex> lk(purge_thd_mutex);
while (n--)
purge_thds.push(innobase_create_background_thd("InnoDB purge worker"));
set_current_thd(thd);
}
extern void* thd_attach_thd(THD*); extern void* thd_attach_thd(THD*);
extern void thd_detach_thd(void *); extern void thd_detach_thd(void *);
THD* acquire_thd(void **ctx) THD* acquire_thd(void **ctx)
{ {
std::unique_lock<std::mutex> lk(purge_thd_mutex); std::unique_lock<std::mutex> lk(purge_thd_mutex);
ut_a(!purge_thds.empty()); if (purge_thds.empty()) {
THD* thd = current_thd;
purge_thds.push(innobase_create_background_thd("InnoDB purge worker"));
set_current_thd(thd);
}
THD* thd = purge_thds.front(); THD* thd = purge_thds.front();
purge_thds.pop(); purge_thds.pop();
lk.unlock(); lk.unlock();
...@@ -2251,10 +2268,8 @@ static void purge_coordinator_callback(void*) ...@@ -2251,10 +2268,8 @@ static void purge_coordinator_callback(void*)
purge_state.m_running= 0; purge_state.m_running= 0;
} }
void srv_init_purge_tasks(uint n_tasks) void srv_init_purge_tasks()
{ {
purge_task_group.set_max_tasks(n_tasks - 1);
purge_create_background_thds(n_tasks);
purge_coordinator_timer= srv_thread_pool->create_timer purge_coordinator_timer= srv_thread_pool->create_timer
(purge_coordinator_timer_callback, nullptr); (purge_coordinator_timer_callback, nullptr);
} }
...@@ -2310,6 +2325,7 @@ ulint srv_get_task_queue_length() ...@@ -2310,6 +2325,7 @@ ulint srv_get_task_queue_length()
void srv_purge_shutdown() void srv_purge_shutdown()
{ {
if (purge_sys.enabled()) { if (purge_sys.enabled()) {
srv_update_purge_thread_count(innodb_purge_threads_MAX);
while(!srv_purge_should_exit()) { while(!srv_purge_should_exit()) {
ut_a(!purge_sys.paused()); ut_a(!purge_sys.paused());
srv_wake_purge_thread_if_not_active(); srv_wake_purge_thread_if_not_active();
......
...@@ -1962,8 +1962,7 @@ dberr_t srv_start(bool create_new_db) ...@@ -1962,8 +1962,7 @@ dberr_t srv_start(bool create_new_db)
if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL
&& srv_force_recovery < SRV_FORCE_NO_BACKGROUND) { && srv_force_recovery < SRV_FORCE_NO_BACKGROUND) {
srv_init_purge_tasks();
srv_init_purge_tasks(srv_n_purge_threads);
purge_sys.coordinator_startup(); purge_sys.coordinator_startup();
srv_wake_purge_thread_if_not_active(); srv_wake_purge_thread_if_not_active();
srv_start_state_set(SRV_START_STATE_PURGE); srv_start_state_set(SRV_START_STATE_PURGE);
......
...@@ -149,7 +149,7 @@ purge_graph_build() ...@@ -149,7 +149,7 @@ purge_graph_build()
NULL, NULL, QUE_FORK_PURGE, heap); NULL, NULL, QUE_FORK_PURGE, heap);
fork->trx = trx; fork->trx = trx;
for (ulint i = 0; i < srv_n_purge_threads; ++i) { for (auto i = innodb_purge_threads_MAX; i; i--) {
que_thr_t* thr = que_thr_create(fork, heap, NULL); que_thr_t* thr = que_thr_create(fork, heap, NULL);
thr->child = new(mem_heap_alloc(heap, sizeof(purge_node_t))) thr->child = new(mem_heap_alloc(heap, sizeof(purge_node_t)))
purge_node_t(thr); purge_node_t(thr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment