Commit 6441bc61 authored by Marko Mäkelä

MDEV-25113: Introduce a page cleaner mode before 'furious flush'

MDEV-23855 changed the way the page cleaner is signaled by
user threads. If a threshold is exceeded, a mini-transaction commit
would invoke buf_flush_ahead() in order to initiate page flushing
before all writers would eventually grind to a halt in
log_free_check(), waiting for the checkpoint age to reduce.

However, buf_flush_ahead() would always initiate 'furious flushing',
making the buf_flush_page_cleaner thread write innodb_io_capacity_max
pages per batch, without sleeping between batches, until the
limit LSN is reached. Because this could saturate the I/O subsystem,
system throughput could drop significantly during these
'furious flushing' spikes.

With this change, we introduce a gentler version of flush-ahead,
which would write innodb_io_capacity_max pages per second until
the 'soft limit' is reached.

buf_flush_ahead(): Add a parameter to specify whether furious flushing
is requested.

buf_flush_async_lsn: Similar to buf_flush_sync_lsn, a limit for
the less intrusive flushing.

buf_flush_page_cleaner(): Keep working until buf_flush_async_lsn
has been reached.

log_close(): Suppress a warning message in the event that a new log
is being created during startup, when old logs did not exist.
Return what type of page cleaning will be needed.

mtr_t::finish_write(): Also when m_log.is_small(), invoke log_close().
Return what type of page cleaning will be needed.

mtr_t::commit(): Invoke buf_flush_ahead() based on the return value of
mtr_t::finish_write().
parent 22b62eda
...@@ -63,8 +63,11 @@ static constexpr ulint buf_flush_lsn_scan_factor = 3; ...@@ -63,8 +63,11 @@ static constexpr ulint buf_flush_lsn_scan_factor = 3;
/** Average redo generation rate */ /** Average redo generation rate */
static lsn_t lsn_avg_rate = 0; static lsn_t lsn_avg_rate = 0;
/** Target oldest_modification for the page cleaner; writes are protected by /** Target oldest_modification for the page cleaner background flushing;
buf_pool.flush_list_mutex */ writes are protected by buf_pool.flush_list_mutex */
static Atomic_relaxed<lsn_t> buf_flush_async_lsn;
/** Target oldest_modification for the page cleaner furious flushing;
writes are protected by buf_pool.flush_list_mutex */
static Atomic_relaxed<lsn_t> buf_flush_sync_lsn; static Atomic_relaxed<lsn_t> buf_flush_sync_lsn;
#ifdef UNIV_PFS_THREAD #ifdef UNIV_PFS_THREAD
...@@ -1905,9 +1908,10 @@ ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn) ...@@ -1905,9 +1908,10 @@ ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn)
} }
} }
/** If innodb_flush_sync=ON, initiate a furious flush. /** Initiate more eager page flushing if the log checkpoint age is too old.
@param lsn buf_pool.get_oldest_modification(LSN_MAX) target */ @param lsn buf_pool.get_oldest_modification(LSN_MAX) target
void buf_flush_ahead(lsn_t lsn) @param furious true=furious flushing, false=limit to innodb_io_capacity */
ATTRIBUTE_COLD void buf_flush_ahead(lsn_t lsn, bool furious)
{ {
mysql_mutex_assert_not_owner(&log_sys.mutex); mysql_mutex_assert_not_owner(&log_sys.mutex);
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
...@@ -1915,14 +1919,15 @@ void buf_flush_ahead(lsn_t lsn) ...@@ -1915,14 +1919,15 @@ void buf_flush_ahead(lsn_t lsn)
if (recv_recovery_is_on()) if (recv_recovery_is_on())
recv_sys.apply(true); recv_sys.apply(true);
if (buf_flush_sync_lsn < lsn) Atomic_relaxed<lsn_t> &limit= furious
? buf_flush_sync_lsn : buf_flush_async_lsn;
if (limit < lsn)
{ {
mysql_mutex_lock(&buf_pool.flush_list_mutex); mysql_mutex_lock(&buf_pool.flush_list_mutex);
if (buf_flush_sync_lsn < lsn) if (limit < lsn)
{ limit= lsn;
buf_flush_sync_lsn= lsn; pthread_cond_signal(&buf_pool.do_flush_list);
pthread_cond_signal(&buf_pool.do_flush_list);
}
mysql_mutex_unlock(&buf_pool.flush_list_mutex); mysql_mutex_unlock(&buf_pool.flush_list_mutex);
} }
} }
...@@ -1997,6 +2002,8 @@ ATTRIBUTE_COLD static void buf_flush_sync_for_checkpoint(lsn_t lsn) ...@@ -1997,6 +2002,8 @@ ATTRIBUTE_COLD static void buf_flush_sync_for_checkpoint(lsn_t lsn)
if (measure >= target) if (measure >= target)
buf_flush_sync_lsn= 0; buf_flush_sync_lsn= 0;
else if (measure >= buf_flush_async_lsn)
buf_flush_async_lsn= 0;
/* wake up buf_flush_wait_flushed() */ /* wake up buf_flush_wait_flushed() */
pthread_cond_broadcast(&buf_pool.done_flush_list); pthread_cond_broadcast(&buf_pool.done_flush_list);
...@@ -2016,7 +2023,7 @@ static bool af_needed_for_redo(lsn_t oldest_lsn) ...@@ -2016,7 +2023,7 @@ static bool af_needed_for_redo(lsn_t oldest_lsn)
{ {
lsn_t age= (log_sys.get_lsn() - oldest_lsn); lsn_t age= (log_sys.get_lsn() - oldest_lsn);
lsn_t af_lwm= static_cast<lsn_t>(srv_adaptive_flushing_lwm * lsn_t af_lwm= static_cast<lsn_t>(srv_adaptive_flushing_lwm *
static_cast<double>(log_sys.log_capacity) / 100); static_cast<double>(log_sys.log_capacity) / 100);
/* if age > af_lwm adaptive flushing is recommended */ /* if age > af_lwm adaptive flushing is recommended */
return (age > af_lwm); return (age > af_lwm);
...@@ -2240,6 +2247,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*) ...@@ -2240,6 +2247,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
set_timespec(abstime, 1); set_timespec(abstime, 1);
lsn_t soft_lsn_limit= buf_flush_async_lsn;
lsn_limit= buf_flush_sync_lsn; lsn_limit= buf_flush_sync_lsn;
if (UNIV_UNLIKELY(lsn_limit != 0)) if (UNIV_UNLIKELY(lsn_limit != 0))
...@@ -2261,6 +2269,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*) ...@@ -2261,6 +2269,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
pthread_cond_broadcast(&buf_pool.done_flush_list); pthread_cond_broadcast(&buf_pool.done_flush_list);
} }
unemployed: unemployed:
buf_flush_async_lsn= 0;
buf_pool.page_cleaner_set_idle(true); buf_pool.page_cleaner_set_idle(true);
continue; continue;
} }
...@@ -2275,7 +2284,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*) ...@@ -2275,7 +2284,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
bool idle_flush= false; bool idle_flush= false;
if (lsn_limit); if (lsn_limit || soft_lsn_limit);
else if (af_needed_for_redo(oldest_lsn)); else if (af_needed_for_redo(oldest_lsn));
else if (srv_max_dirty_pages_pct_lwm != 0.0) else if (srv_max_dirty_pages_pct_lwm != 0.0)
{ {
...@@ -2300,11 +2309,16 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*) ...@@ -2300,11 +2309,16 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
goto unemployed; goto unemployed;
if (UNIV_UNLIKELY(lsn_limit != 0) && oldest_lsn >= lsn_limit) if (UNIV_UNLIKELY(lsn_limit != 0) && oldest_lsn >= lsn_limit)
buf_flush_sync_lsn= 0; lsn_limit= buf_flush_sync_lsn= 0;
if (UNIV_UNLIKELY(soft_lsn_limit != 0) && oldest_lsn >= soft_lsn_limit)
soft_lsn_limit= buf_flush_async_lsn= 0;
buf_pool.page_cleaner_set_idle(false); buf_pool.page_cleaner_set_idle(false);
mysql_mutex_unlock(&buf_pool.flush_list_mutex); mysql_mutex_unlock(&buf_pool.flush_list_mutex);
if (!lsn_limit)
lsn_limit= soft_lsn_limit;
ulint n_flushed; ulint n_flushed;
if (UNIV_UNLIKELY(lsn_limit != 0)) if (UNIV_UNLIKELY(lsn_limit != 0))
...@@ -2355,7 +2369,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*) ...@@ -2355,7 +2369,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
goto do_checkpoint; goto do_checkpoint;
} }
} }
else else if (buf_flush_async_lsn <= oldest_lsn)
{ {
mysql_mutex_lock(&buf_pool.flush_list_mutex); mysql_mutex_lock(&buf_pool.flush_list_mutex);
goto unemployed; goto unemployed;
...@@ -2410,6 +2424,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init() ...@@ -2410,6 +2424,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init()
ut_ad(srv_operation == SRV_OPERATION_NORMAL || ut_ad(srv_operation == SRV_OPERATION_NORMAL ||
srv_operation == SRV_OPERATION_RESTORE || srv_operation == SRV_OPERATION_RESTORE ||
srv_operation == SRV_OPERATION_RESTORE_EXPORT); srv_operation == SRV_OPERATION_RESTORE_EXPORT);
buf_flush_async_lsn= 0;
buf_flush_sync_lsn= 0; buf_flush_sync_lsn= 0;
buf_page_cleaner_is_active= true; buf_page_cleaner_is_active= true;
os_thread_create(buf_flush_page_cleaner); os_thread_create(buf_flush_page_cleaner);
......
...@@ -111,9 +111,10 @@ void buf_flush_wait_batch_end(bool lru); ...@@ -111,9 +111,10 @@ void buf_flush_wait_batch_end(bool lru);
/** Wait until all persistent pages are flushed up to a limit. /** Wait until all persistent pages are flushed up to a limit.
@param sync_lsn buf_pool.get_oldest_modification(LSN_MAX) to wait for */ @param sync_lsn buf_pool.get_oldest_modification(LSN_MAX) to wait for */
ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn); ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn);
/** If innodb_flush_sync=ON, initiate a furious flush. /** Initiate more eager page flushing if the log checkpoint age is too old.
@param lsn buf_pool.get_oldest_modification(LSN_MAX) target */ @param lsn buf_pool.get_oldest_modification(LSN_MAX) target
void buf_flush_ahead(lsn_t lsn); @param furious true=furious flushing, false=limit to innodb_io_capacity */
ATTRIBUTE_COLD void buf_flush_ahead(lsn_t lsn, bool furious);
/********************************************************************//** /********************************************************************//**
This function should be called at a mini-transaction commit, if a page was This function should be called at a mini-transaction commit, if a page was
......
...@@ -588,6 +588,17 @@ struct mtr_t { ...@@ -588,6 +588,17 @@ struct mtr_t {
@return number of buffer count added by this mtr */ @return number of buffer count added by this mtr */
uint32_t get_fix_count(const buf_block_t *block) const; uint32_t get_fix_count(const buf_block_t *block) const;
/** The kind of page flushing that commit() needs to request.
The value decides whether commit() invokes buf_flush_ahead(),
and whether it asks for furious flushing. */
enum page_flush_ahead
{
  /** the page cleaner does not need to be triggered */
  PAGE_FLUSH_NO= 0,
  /** asynchronous (non-furious) flushing should be requested */
  PAGE_FLUSH_ASYNC= 1,
  /** furious flushing should be requested */
  PAGE_FLUSH_SYNC= 2
};
private: private:
/** Log a write of a byte string to a page. /** Log a write of a byte string to a page.
@param block buffer page @param block buffer page
...@@ -621,7 +632,7 @@ struct mtr_t { ...@@ -621,7 +632,7 @@ struct mtr_t {
/** Append the redo log records to the redo log buffer. /** Append the redo log records to the redo log buffer.
@param len number of bytes to write @param len number of bytes to write
@return {start_lsn,flush_ahead} */ @return {start_lsn,flush_ahead} */
inline std::pair<lsn_t,bool> finish_write(ulint len); inline std::pair<lsn_t,page_flush_ahead> finish_write(ulint len);
/** Release the resources */ /** Release the resources */
inline void release_resources(); inline void release_resources();
......
...@@ -402,12 +402,12 @@ void mtr_t::commit() ...@@ -402,12 +402,12 @@ void mtr_t::commit()
{ {
ut_ad(!srv_read_only_mode || m_log_mode == MTR_LOG_NO_REDO); ut_ad(!srv_read_only_mode || m_log_mode == MTR_LOG_NO_REDO);
std::pair<lsn_t,bool> lsns; std::pair<lsn_t,page_flush_ahead> lsns;
if (const ulint len= prepare_write()) if (const ulint len= prepare_write())
lsns= finish_write(len); lsns= finish_write(len);
else else
lsns= { m_commit_lsn, false }; lsns= { m_commit_lsn, PAGE_FLUSH_NO };
if (m_made_dirty) if (m_made_dirty)
mysql_mutex_lock(&log_sys.flush_order_mutex); mysql_mutex_lock(&log_sys.flush_order_mutex);
...@@ -447,8 +447,8 @@ void mtr_t::commit() ...@@ -447,8 +447,8 @@ void mtr_t::commit()
m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>()); m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>());
if (lsns.second) if (UNIV_UNLIKELY(lsns.second != PAGE_FLUSH_NO))
buf_flush_ahead(m_commit_lsn); buf_flush_ahead(m_commit_lsn, lsns.second == PAGE_FLUSH_SYNC);
if (m_made_dirty) if (m_made_dirty)
srv_stats.log_write_requests.inc(); srv_stats.log_write_requests.inc();
...@@ -754,7 +754,7 @@ static void log_write_low(const void *str, size_t size) ...@@ -754,7 +754,7 @@ static void log_write_low(const void *str, size_t size)
/** Close the log at mini-transaction commit. /** Close the log at mini-transaction commit.
@return whether buffer pool flushing is needed */ @return whether buffer pool flushing is needed */
static bool log_close(lsn_t lsn) static mtr_t::page_flush_ahead log_close(lsn_t lsn)
{ {
mysql_mutex_assert_owner(&log_sys.mutex); mysql_mutex_assert_owner(&log_sys.mutex);
ut_ad(lsn == log_sys.get_lsn()); ut_ad(lsn == log_sys.get_lsn());
...@@ -777,7 +777,9 @@ static bool log_close(lsn_t lsn) ...@@ -777,7 +777,9 @@ static bool log_close(lsn_t lsn)
const lsn_t checkpoint_age= lsn - log_sys.last_checkpoint_lsn; const lsn_t checkpoint_age= lsn - log_sys.last_checkpoint_lsn;
if (UNIV_UNLIKELY(checkpoint_age >= log_sys.log_capacity)) if (UNIV_UNLIKELY(checkpoint_age >= log_sys.log_capacity) &&
/* silence message on create_log_file() after the log had been deleted */
checkpoint_age != lsn)
{ {
time_t t= time(nullptr); time_t t= time(nullptr);
if (!log_close_warned || difftime(t, log_close_warn_time) > 15) if (!log_close_warned || difftime(t, log_close_warn_time) > 15)
...@@ -786,15 +788,17 @@ static bool log_close(lsn_t lsn) ...@@ -786,15 +788,17 @@ static bool log_close(lsn_t lsn)
log_close_warn_time= t; log_close_warn_time= t;
ib::error() << "The age of the last checkpoint is " << checkpoint_age ib::error() << "The age of the last checkpoint is " << checkpoint_age
<< ", which exceeds the log capacity " << ", which exceeds the log capacity "
<< log_sys.log_capacity << "."; << log_sys.log_capacity << ".";
} }
} }
else if (UNIV_LIKELY(checkpoint_age <= log_sys.max_modified_age_async))
return mtr_t::PAGE_FLUSH_NO;
else if (UNIV_LIKELY(checkpoint_age <= log_sys.max_checkpoint_age)) else if (UNIV_LIKELY(checkpoint_age <= log_sys.max_checkpoint_age))
return false; return mtr_t::PAGE_FLUSH_ASYNC;
log_sys.set_check_flush_or_checkpoint(); log_sys.set_check_flush_or_checkpoint();
return true; return mtr_t::PAGE_FLUSH_SYNC;
} }
/** Write the block contents to the REDO log */ /** Write the block contents to the REDO log */
...@@ -858,8 +862,8 @@ inline ulint mtr_t::prepare_write() ...@@ -858,8 +862,8 @@ inline ulint mtr_t::prepare_write()
/** Append the redo log records to the redo log buffer. /** Append the redo log records to the redo log buffer.
@param len number of bytes to write @param len number of bytes to write
@return {start_lsn,flush_ahead_lsn} */ @return {start_lsn,flush_ahead} */
inline std::pair<lsn_t,bool> mtr_t::finish_write(ulint len) inline std::pair<lsn_t,mtr_t::page_flush_ahead> mtr_t::finish_write(ulint len)
{ {
ut_ad(m_log_mode == MTR_LOG_ALL); ut_ad(m_log_mode == MTR_LOG_ALL);
mysql_mutex_assert_owner(&log_sys.mutex); mysql_mutex_assert_owner(&log_sys.mutex);
...@@ -875,19 +879,19 @@ inline std::pair<lsn_t,bool> mtr_t::finish_write(ulint len) ...@@ -875,19 +879,19 @@ inline std::pair<lsn_t,bool> mtr_t::finish_write(ulint len)
m_commit_lsn = log_reserve_and_write_fast(front->begin(), len, m_commit_lsn = log_reserve_and_write_fast(front->begin(), len,
&start_lsn); &start_lsn);
if (m_commit_lsn) { if (!m_commit_lsn) {
return std::make_pair(start_lsn, false); goto piecewise;
} }
} else {
piecewise:
/* Open the database log for log_write_low */
start_lsn = log_reserve_and_open(len);
mtr_write_log write_log;
m_log.for_each_block(write_log);
m_commit_lsn = log_sys.get_lsn();
} }
page_flush_ahead flush= log_close(m_commit_lsn);
/* Open the database log for log_write_low */ DBUG_EXECUTE_IF("ib_log_flush_ahead", flush = PAGE_FLUSH_SYNC;);
start_lsn = log_reserve_and_open(len);
mtr_write_log write_log;
m_log.for_each_block(write_log);
m_commit_lsn = log_sys.get_lsn();
bool flush = log_close(m_commit_lsn);
DBUG_EXECUTE_IF("ib_log_flush_ahead", flush=true;);
return std::make_pair(start_lsn, flush); return std::make_pair(start_lsn, flush);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment