Commit 8b97eba3 authored by Marko Mäkelä

MDEV-21674 purge_sys.stop() fails to wait for purge workers to complete

Since commit 5e62b6a5 (MDEV-16264),
purge_sys_t::stop() no longer waited for all purge activity to stop.

This caused problems on FLUSH TABLES...FOR EXPORT because of
purge running concurrently with the buffer pool flush.
The assertion at the end of buf_flush_dirty_pages() could fail.

The fix, implemented by Vladislav Vaintroub, aims to eliminate race
conditions when stopping or resuming purge:

waitable_task::disable(): Wait for the task to complete, then replace
the task callback function with noop.

waitable_task::enable(): Restore the original task callback function
after disable().

purge_sys_t::stop(): Invoke purge_coordinator_task.disable().

purge_sys_t::resume(): Invoke purge_coordinator_task.enable().

purge_sys_t::running(): Add const qualifier, and clarify the comment.
The purge coordinator task will remain active as long as any purge
worker task is active.

purge_worker_callback(): Assert purge_sys.running().

srv_purge_wakeup(): Merge with the only caller purge_sys_t::resume().

purge_coordinator_task: Use static linkage.
parent cd3bdc09
......@@ -818,9 +818,6 @@ void srv_error_monitor_task(void*);
ulint srv_get_task_queue_length();
#endif
/** Wakeup the purge threads. */
void srv_purge_wakeup();
/** Shut down the purge threads. */
void srv_purge_shutdown();
......
/*****************************************************************************
Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2019, MariaDB Corporation.
Copyright (c) 2017, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -250,8 +250,8 @@ class purge_sys_t
m_enabled.store(false, std::memory_order_relaxed);
}
/** @return whether the purge coordinator thread is active */
bool running();
/** @return whether the purge tasks are active */
bool running() const;
/** Stop purge during FLUSH TABLES FOR EXPORT */
void stop();
/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */
......
......@@ -3,7 +3,7 @@
Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, 2009 Google Inc.
Copyright (c) 2009, Percona Inc.
Copyright (c) 2013, 2019, MariaDB Corporation.
Copyright (c) 2013, 2020, MariaDB Corporation.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
......@@ -589,10 +589,6 @@ struct purge_coordinator_state
};
static purge_coordinator_state purge_state;
extern tpool::waitable_task purge_coordinator_task;
/** @return whether the purge coordinator thread is active */
bool purge_sys_t::running() { return purge_coordinator_task.is_running(); }
/** threadpool timer for srv_error_monitor_task(). */
std::unique_ptr<tpool::timer> srv_error_monitor_timer;
......@@ -1590,9 +1586,8 @@ static tpool::task_group purge_task_group;
tpool::waitable_task purge_worker_task(purge_worker_callback, nullptr,
&purge_task_group);
static tpool::task_group purge_coordinator_task_group(1);
tpool::waitable_task purge_coordinator_task(purge_coordinator_callback,
nullptr,
&purge_coordinator_task_group);
static tpool::waitable_task purge_coordinator_task
(purge_coordinator_callback, nullptr, &purge_coordinator_task_group);
static tpool::timer *purge_coordinator_timer;
......@@ -1611,6 +1606,66 @@ srv_wake_purge_thread_if_not_active()
}
}
/** Check whether purge is currently in progress.
@return whether the purge coordinator task is executing */
bool purge_sys_t::running() const
{
  const bool coordinator_active= purge_coordinator_task.is_running();
  return coordinator_active;
}
/** Stop purge during FLUSH TABLES FOR EXPORT */
void purge_sys_t::stop()
{
rw_lock_x_lock(&latch);
if (!enabled())
{
/* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
ut_ad(!srv_undo_sources);
rw_lock_x_unlock(&latch);
return;
}
ut_ad(srv_n_purge_threads > 0);
const auto paused= m_paused++;
rw_lock_x_unlock(&latch);
if (!paused)
{
ib::info() << "Stopping purge";
MONITOR_ATOMIC_INC(MONITOR_PURGE_STOP_COUNT);
purge_coordinator_task.disable();
}
}
/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */
void purge_sys_t::resume()
{
if (!enabled())
{
/* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
ut_ad(!srv_undo_sources);
return;
}
ut_ad(!srv_read_only_mode);
ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
ut_ad(!sync_check_iterate(sync_check()));
purge_coordinator_task.enable();
rw_lock_x_lock(&latch);
int32_t paused= m_paused--;
ut_a(paused);
if (paused == 1)
{
ib::info() << "Resuming purge";
purge_state.m_running = 0;
srv_wake_purge_thread_if_not_active();
MONITOR_ATOMIC_INC(MONITOR_PURGE_RESUME_COUNT);
}
rw_lock_x_unlock(&latch);
}
/** Wake up the master thread if it is suspended or being suspended. */
void
srv_wake_master_thread()
......@@ -2182,7 +2237,8 @@ static void purge_worker_callback(void*)
ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
void *ctx;
THD *thd= acquire_thd(&ctx);
while (srv_task_execute()) {}
while (srv_task_execute())
ut_ad(purge_sys.running());
release_thd(thd,ctx);
}
......@@ -2287,19 +2343,6 @@ ulint srv_get_task_queue_length()
}
#endif
/** Wake up the purge coordinator.
Does nothing when srv_force_recovery is SRV_FORCE_NO_BACKGROUND or
higher; otherwise purge must be enabled and not paused. */
void
srv_purge_wakeup()
{
	ut_ad(!srv_read_only_mode);

	if (srv_force_recovery < SRV_FORCE_NO_BACKGROUND) {
		ut_a(purge_sys.enabled() && !purge_sys.paused());
		/* Reset the running counter before requesting a wakeup. */
		purge_state.m_running = 0;
		srv_wake_purge_thread_if_not_active();
	}
}
/** Shut down the purge threads. */
void srv_purge_shutdown()
{
......
/*****************************************************************************
Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2019, MariaDB Corporation.
Copyright (c) 2017, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -1308,60 +1308,3 @@ ulint trx_purge(ulint n_tasks, bool truncate)
return(n_pages_handled);
}
extern tpool::waitable_task purge_coordinator_task;
/** Stop purge during FLUSH TABLES FOR EXPORT.
Pause requests nest via m_paused. Every caller, including the first
one, waits for a running purge coordinator task to finish before
returning, so that no purge activity runs concurrently with the
caller's subsequent work. */
void purge_sys_t::stop()
{
  rw_lock_x_lock(&latch);

  if (!enabled())
  {
    /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
    ut_ad(!srv_undo_sources);
    rw_lock_x_unlock(&latch);
    return;
  }

  ut_ad(srv_n_purge_threads > 0);

  /* Update the counter under the latch, but release the latch before
  doing anything that may block. */
  const bool first_pause= m_paused++ == 0;
  rw_lock_x_unlock(&latch);

  if (first_pause)
  {
    ib::info() << "Stopping purge";
    MONITOR_ATOMIC_INC(MONITOR_PURGE_STOP_COUNT);
  }

  /* Previously the first caller returned right here without waiting,
  leaving in-flight purge work running behind its back. */
  if (running())
  {
    ib::info() << "Waiting for purge to stop";
    purge_coordinator_task.wait();
  }
}
/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */
void purge_sys_t::resume()
{
if (!enabled())
{
/* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
ut_ad(!srv_undo_sources);
return;
}
ut_ad(!sync_check_iterate(sync_check()));
rw_lock_x_lock(&latch);
int32_t paused= m_paused--;
ut_a(paused);
if (paused == 1)
{
ib::info() << "Resuming purge";
srv_purge_wakeup();
MONITOR_ATOMIC_INC(MONITOR_PURGE_RESUME_COUNT);
}
rw_lock_x_unlock(&latch);
}
/* Copyright(C) 2019 MariaDB Corporation.
/* Copyright (C) 2019, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -57,7 +57,7 @@ void execute_after_task_callback()
/* Task that provide wait() operation. */
waitable_task::waitable_task(callback_func func, void* arg, task_group* group) :
task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(){}
task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(),m_original_func(){}
void waitable_task::add_ref()
{
......@@ -72,13 +72,37 @@ void execute_after_task_callback()
if (!m_ref_count && m_waiter_count)
m_cv.notify_all();
}
void waitable_task::wait()
void waitable_task::wait(std::unique_lock<std::mutex>& lk)
{
std::unique_lock<std::mutex> lk(m_mtx);
m_waiter_count++;
while (m_ref_count)
m_cv.wait(lk);
m_waiter_count--;
}
/* Block the caller until no execution of this task is in progress.
Acquires m_mtx and delegates to the overload that waits on an
already held lock. */
void waitable_task::wait()
{
  std::unique_lock<std::mutex> guard(m_mtx);
  wait(guard);
}
}
\ No newline at end of file
/* Callback that does nothing; installed in place of the real task
callback while a waitable_task is disabled. */
static void noop(void*) {}
/* Disable the task: wait for any execution in progress to complete,
then replace the task callback with noop so that later executions do
nothing. The original callback is remembered for enable().
Calling disable() on an already disabled task is a no-op. */
void waitable_task::disable()
{
  std::unique_lock<std::mutex> lk(m_mtx);
  if (m_func == noop)
    return;
  wait(lk);
  /* wait() releases m_mtx while it is blocked, so a concurrent
  disable() may have completed in the meantime. Re-check, or we would
  save noop as the "original" callback and enable() could never
  restore the real one. */
  if (m_func == noop)
    return;
  m_original_func = m_func;
  m_func = noop;
}
/* Re-enable a task that was disabled with disable(): wait for any
execution in progress to complete, then restore the saved callback.
Does nothing if the task is not currently disabled. */
void waitable_task::enable()
{
  std::unique_lock<std::mutex> guard(m_mtx);
  if (m_func == noop)
  {
    wait(guard);
    m_func = m_original_func;
  }
}
}
/* Copyright(C) 2019 MariaDB
/* Copyright (C) 2019, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -96,6 +96,8 @@ class waitable_task :public task
std::condition_variable m_cv;
int m_ref_count;
int m_waiter_count;
callback_func m_original_func;
void wait(std::unique_lock<std::mutex>&lk);
public:
waitable_task(callback_func func, void* arg, task_group* group = nullptr);
void add_ref() override;
......@@ -103,6 +105,8 @@ class waitable_task :public task
TPOOL_SUPPRESS_TSAN bool is_running() { return get_ref_count() > 0; }
TPOOL_SUPPRESS_TSAN int get_ref_count() {return m_ref_count;}
void wait();
void disable();
void enable();
virtual ~waitable_task() {};
};
enum class aio_opcode
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment