Commit b535a790 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-23399: Remove recv_writer_thread

Recovery works just fine without a separate thread whose only
task is to tell the page cleaner thread to do its job.

recv_sys_t::apply(): Flush the buffer pool at the end of each batch.

Reviewed by: Vladislav Vaintroub
parent fa70c146
......@@ -2535,41 +2535,8 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
" page cleaner thread priority can be changed."
" See the man page of setpriority().";
}
/* Signal that setpriority() has been attempted. */
os_event_set(recv_sys.flush_end);
#endif /* UNIV_LINUX */
do {
/* treat flushing requests during recovery. */
ulint n_flushed_lru = 0;
ulint n_flushed_list = 0;
os_event_wait(recv_sys.flush_start);
if (!recv_writer_thread_active) {
break;
}
if (recv_sys.flush_lru) {
/* Flush pages from end of LRU if required */
pc_request(0, LSN_MAX);
while (pc_flush_slot() > 0) {}
pc_wait_finished(&n_flushed_lru, &n_flushed_list);
} else {
/* Flush all pages */
do {
pc_request(ULINT_MAX, LSN_MAX);
while (pc_flush_slot() > 0) {}
} while (!pc_wait_finished(&n_flushed_lru,
&n_flushed_list));
}
os_event_reset(recv_sys.flush_start);
os_event_set(recv_sys.flush_end);
} while (recv_writer_thread_active);
os_event_wait(buf_flush_event);
ulint ret_sleep = 0;
ulint n_evicted = 0;
ulint n_flushed_last = 0;
......
......@@ -525,7 +525,6 @@ static PSI_mutex_info all_innodb_mutexes[] = {
PSI_KEY(page_zip_stat_per_index_mutex),
PSI_KEY(purge_sys_pq_mutex),
PSI_KEY(recv_sys_mutex),
PSI_KEY(recv_writer_mutex),
PSI_KEY(redo_rseg_mutex),
PSI_KEY(noredo_rseg_mutex),
# ifdef UNIV_DEBUG
......@@ -579,7 +578,6 @@ performance schema instrumented if "UNIV_PFS_THREAD"
is defined */
static PSI_thread_info all_innodb_threads[] = {
PSI_KEY(page_cleaner_thread),
PSI_KEY(recv_writer_thread),
PSI_KEY(trx_rollback_clean_thread),
PSI_KEY(thread_pool_thread)
};
......
......@@ -33,9 +33,6 @@ Created 9/20/1997 Heikki Tuuri
#include <deque>
/** Is recv_writer_thread active? */
extern bool recv_writer_thread_active;
/** @return whether recovery is currently running. */
#define recv_recovery_is_on() UNIV_UNLIKELY(recv_sys.recovery_on)
......@@ -226,16 +223,6 @@ struct recv_sys_t
/** whether recv_recover_page(), invoked from buf_page_read_complete(),
should apply log records*/
bool apply_log_recs;
ib_mutex_t writer_mutex;/*!< mutex coordinating
flushing between recv_writer_thread and
the recovery thread. */
os_event_t flush_start;/*!< event to activate
page cleaner threads */
os_event_t flush_end;/*!< event to signal that the page
cleaner has finished the request */
/** whether to flush from buf_pool.LRU instead of buf_pool.flush_list */
bool flush_lru;
/** whether recv_apply_hashed_log_recs() is running */
bool apply_batch_on;
byte* buf; /*!< buffer for parsing log records */
......
......@@ -519,7 +519,6 @@ extern ulong srv_buf_dump_status_frequency;
# ifdef UNIV_PFS_THREAD
extern mysql_pfs_key_t page_cleaner_thread_key;
extern mysql_pfs_key_t recv_writer_thread_key;
extern mysql_pfs_key_t trx_rollback_clean_thread_key;
extern mysql_pfs_key_t thread_pool_thread_key;
......
......@@ -66,7 +66,6 @@ extern mysql_pfs_key_t recalc_pool_mutex_key;
extern mysql_pfs_key_t page_cleaner_mutex_key;
extern mysql_pfs_key_t purge_sys_pq_mutex_key;
extern mysql_pfs_key_t recv_sys_mutex_key;
extern mysql_pfs_key_t recv_writer_mutex_key;
extern mysql_pfs_key_t rtr_active_mutex_key;
extern mysql_pfs_key_t rtr_match_mutex_key;
extern mysql_pfs_key_t rtr_path_mutex_key;
......
......@@ -254,8 +254,6 @@ enum latch_level_t {
SYNC_TRX_I_S_RWLOCK,
SYNC_RECV_WRITER,
/** Level is varying. Only used with buffer pool page locks, which
do not have a fixed level, but instead have their level set after
the page is locked; see e.g. ibuf_bitmap_get_map_page(). */
......@@ -291,7 +289,6 @@ enum latch_id_t {
LATCH_ID_PURGE_SYS_PQ,
LATCH_ID_RECALC_POOL,
LATCH_ID_RECV_SYS,
LATCH_ID_RECV_WRITER,
LATCH_ID_REDO_RSEG,
LATCH_ID_NOREDO_RSEG,
LATCH_ID_RW_LOCK_DEBUG,
......@@ -985,9 +982,6 @@ struct sync_checker : public sync_check_functor_t
{
if (some_allowed) {
switch (level) {
case SYNC_RECV_WRITER:
/* This only happens in
recv_apply_hashed_log_recs. */
case SYNC_DICT:
case SYNC_DICT_OPERATION:
case SYNC_FTS_CACHE:
......
......@@ -1555,14 +1555,6 @@ void logs_empty_and_mark_files_at_shutdown()
ut_ad(log_sys.is_initialised() || !srv_was_started);
ut_ad(fil_system.is_initialised() || !srv_was_started);
if (!srv_read_only_mode) {
if (recv_sys.flush_start) {
/* This is in case recv_writer_thread was never
started, or buf_flush_page_cleaner
failed to notice its termination. */
os_event_set(recv_sys.flush_start);
}
}
#define COUNT_INTERVAL 600U
#define CHECK_INTERVAL 100000U
os_thread_sleep(CHECK_INTERVAL);
......
......@@ -90,13 +90,6 @@ is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt. */
static lsn_t recv_max_page_lsn;
#ifdef UNIV_PFS_THREAD
mysql_pfs_key_t recv_writer_thread_key;
#endif /* UNIV_PFS_THREAD */
/** Is recv_writer_thread active? */
bool recv_writer_thread_active;
/** Stored physical log record with logical LSN (@see log_t::FORMAT_10_5) */
struct log_phys_t : public log_rec_t
{
......@@ -915,7 +908,6 @@ fil_name_process(char* name, ulint len, ulint space_id, bool deleted)
void recv_sys_t::close()
{
ut_ad(this == &recv_sys);
ut_ad(!recv_writer_thread_active);
if (is_initialised())
{
......@@ -924,9 +916,6 @@ void recv_sys_t::close()
clear();
ut_d(mutex_exit(&mutex));
os_event_destroy(flush_start);
os_event_destroy(flush_end);
if (buf)
{
ut_free_dodump(buf, RECV_PARSING_BUF_SIZE);
......@@ -934,7 +923,6 @@ void recv_sys_t::close()
}
last_stored_lsn= 0;
mutex_free(&writer_mutex);
mutex_free(&mutex);
}
......@@ -944,80 +932,13 @@ void recv_sys_t::close()
close_files();
}
/******************************************************************//**
recv_writer thread tasked with flushing dirty pages from the buffer
pools.
@return a dummy parameter */
extern "C"
os_thread_ret_t
DECLARE_THREAD(recv_writer_thread)(
/*===============================*/
void* arg MY_ATTRIBUTE((unused)))
/*!< in: a dummy parameter required by
os_thread_create */
{
my_thread_init();
ut_ad(!srv_read_only_mode);
#ifdef UNIV_PFS_THREAD
pfs_register_thread(recv_writer_thread_key);
#endif /* UNIV_PFS_THREAD */
#ifdef UNIV_DEBUG_THREAD_CREATION
ib::info() << "recv_writer thread running, id "
<< os_thread_pf(os_thread_get_curr_id());
#endif /* UNIV_DEBUG_THREAD_CREATION */
while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
/* Wait till we get a signal to clean the LRU list.
Bounded by max wait time of 100ms. */
int64_t sig_count = os_event_reset(buf_flush_event);
os_event_wait_time_low(buf_flush_event, 100000, sig_count);
mutex_enter(&recv_sys.writer_mutex);
if (!recv_recovery_is_on()) {
mutex_exit(&recv_sys.writer_mutex);
break;
}
/* Flush pages from end of LRU if required */
os_event_reset(recv_sys.flush_end);
recv_sys.flush_lru = true;
os_event_set(recv_sys.flush_start);
os_event_wait(recv_sys.flush_end);
mutex_exit(&recv_sys.writer_mutex);
}
recv_writer_thread_active = false;
my_thread_end();
/* We count the number of threads in os_thread_exit().
A created thread should always use that to exit and not
use return() to exit. */
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
}
/** Initialize the redo log recovery subsystem. */
void recv_sys_t::create()
{
ut_ad(this == &recv_sys);
ut_ad(!is_initialised());
ut_ad(!flush_start);
ut_ad(!flush_end);
mutex_create(LATCH_ID_RECV_SYS, &mutex);
mutex_create(LATCH_ID_RECV_WRITER, &writer_mutex);
if (!srv_read_only_mode) {
flush_start = os_event_create(0);
flush_end = os_event_create(0);
}
flush_lru = true;
apply_log_recs = false;
apply_batch_on = false;
......@@ -1066,6 +987,7 @@ void recv_sys_t::debug_free()
{
ut_ad(this == &recv_sys);
ut_ad(is_initialised());
ut_ad(!recv_recovery_is_on());
mutex_enter(&mutex);
pages.clear();
......@@ -1073,15 +995,6 @@ void recv_sys_t::debug_free()
buf= nullptr;
/* wake page cleaner up to progress */
if (!srv_read_only_mode)
{
ut_ad(!recv_recovery_is_on());
ut_ad(!recv_writer_thread_active);
os_event_reset(buf_flush_event);
os_event_set(flush_start);
}
mutex_exit(&mutex);
}
......@@ -2759,35 +2672,28 @@ void recv_sys_t::apply(bool last_batch)
}
}
if (!last_batch)
if (last_batch)
/* We skipped this in buf_page_create(). */
mlog_init.mark_ibuf_exist(mtr);
else
{
/* Flush all the file pages to disk and invalidate them in buf_pool */
mutex_exit(&mutex);
mlog_init.reset();
log_mutex_exit();
}
/* Stop the recv_writer thread from issuing any LRU flush batches. */
mutex_enter(&writer_mutex);
ut_ad(!log_mutex_own());
mutex_exit(&mutex);
/* Wait for any currently run batch to end. */
buf_flush_wait_LRU_batch_end();
buf_flush_sync();
os_event_reset(flush_end);
flush_lru= false;
os_event_set(flush_start);
os_event_wait(flush_end);
if (!last_batch)
{
buf_pool_invalidate();
/* Allow batches from recv_writer thread. */
mutex_exit(&writer_mutex);
log_mutex_enter();
mutex_enter(&mutex);
mlog_init.reset();
}
else
/* We skipped this in buf_page_create(). */
mlog_init.mark_ibuf_exist(mtr);
mutex_enter(&mutex);
ut_d(after_apply= true);
clear();
......@@ -3120,7 +3026,6 @@ recv_group_scan_log_recs(
recv_sys.recovered_lsn = *contiguous_lsn;
recv_sys.scanned_checkpoint_no = 0;
ut_ad(recv_max_page_lsn == 0);
ut_ad(last_phase || !recv_writer_thread_active);
mutex_exit(&recv_sys.mutex);
lsn_t start_lsn;
......@@ -3574,11 +3479,6 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);
/* Spawn the background thread to flush dirty pages
from the buffer pools. */
recv_writer_thread_active = true;
os_thread_create(recv_writer_thread);
if (rescan) {
contiguous_lsn = checkpoint_lsn;
......@@ -3659,35 +3559,11 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
}
/** Complete recovery from a checkpoint. */
void
recv_recovery_from_checkpoint_finish(void)
void recv_recovery_from_checkpoint_finish()
{
/* Make sure that the recv_writer thread is done. This is
required because it grabs various mutexes and we want to
ensure that when we enable sync_order_checks there is no
mutex currently held by any thread. */
mutex_enter(&recv_sys.writer_mutex);
/* Free the resources of the recovery system */
recv_sys.recovery_on = false;
/* By acquring the mutex we ensure that the recv_writer thread
won't trigger any more LRU batches. Now wait for currently
in progress batches to finish. */
buf_flush_wait_LRU_batch_end();
mutex_exit(&recv_sys.writer_mutex);
ulint count = 0;
while (recv_writer_thread_active) {
++count;
os_thread_sleep(100000);
if (srv_print_verbose_log && count > 600) {
ib::info() << "Waiting for recv_writer to"
" finish flushing of buffer pool";
count = 0;
}
}
recv_sys.recovery_on= false;
recv_sys.debug_free();
......
......@@ -903,13 +903,6 @@ srv_shutdown_all_bg_threads()
ut_ad(!srv_read_only_mode);
/* e. Exit the i/o threads */
if (recv_sys.flush_start != NULL) {
os_event_set(recv_sys.flush_start);
}
if (recv_sys.flush_end != NULL) {
os_event_set(recv_sys.flush_end);
}
os_event_set(buf_flush_event);
}
......@@ -1208,7 +1201,6 @@ dberr_t srv_start(bool create_new_db)
+ 1 /* buf_dump_thread */
+ 1 /* dict_stats_thread */
+ 1 /* fts_optimize_thread */
+ 1 /* recv_writer_thread */
+ 1 /* trx_rollback_all_recovered */
+ 128 /* added as margin, for use of
InnoDB Memcached etc. */
......@@ -1336,11 +1328,6 @@ dberr_t srv_start(bool create_new_db)
if (!srv_read_only_mode) {
buf_flush_page_cleaner_init();
#ifdef UNIV_LINUX
/* Wait for the setpriority() call to finish. */
os_event_wait(recv_sys.flush_end);
#endif /* UNIV_LINUX */
srv_start_state_set(SRV_START_STATE_IO);
}
......@@ -1679,7 +1666,6 @@ dberr_t srv_start(bool create_new_db)
InnoDB files is needed. */
ut_ad(!srv_force_recovery);
ut_ad(recv_no_log_write);
buf_flush_sync();
err = fil_write_flushed_lsn(log_get_lsn());
DBUG_ASSERT(!buf_pool.any_io_pending());
log_sys.log.close_file();
......@@ -1976,11 +1962,6 @@ dberr_t srv_start(bool create_new_db)
srv_is_being_started = false;
if (!srv_read_only_mode) {
/* wake main loop of page cleaner up */
os_event_set(buf_flush_event);
}
if (srv_print_verbose_log) {
ib::info() << INNODB_VERSION_STR
<< " started; log sequence number "
......
......@@ -505,7 +505,6 @@ LatchDebug::LatchDebug()
LEVEL_MAP_INSERT(SYNC_FTS_CACHE);
LEVEL_MAP_INSERT(SYNC_DICT_OPERATION);
LEVEL_MAP_INSERT(SYNC_TRX_I_S_RWLOCK);
LEVEL_MAP_INSERT(SYNC_RECV_WRITER);
LEVEL_MAP_INSERT(SYNC_LEVEL_VARYING);
LEVEL_MAP_INSERT(SYNC_NO_ORDER_CHECK);
......@@ -765,7 +764,6 @@ LatchDebug::check_order(
case SYNC_STATS_AUTO_RECALC:
case SYNC_POOL:
case SYNC_POOL_MANAGER:
case SYNC_RECV_WRITER:
basic_check(latches, level, level);
break;
......@@ -1290,8 +1288,6 @@ sync_latch_meta_init()
LATCH_ADD_MUTEX(RECV_SYS, SYNC_RECV, recv_sys_mutex_key);
LATCH_ADD_MUTEX(RECV_WRITER, SYNC_RECV_WRITER, recv_writer_mutex_key);
LATCH_ADD_MUTEX(REDO_RSEG, SYNC_REDO_RSEG, redo_rseg_mutex_key);
LATCH_ADD_MUTEX(NOREDO_RSEG, SYNC_NOREDO_RSEG, noredo_rseg_mutex_key);
......
......@@ -55,7 +55,6 @@ mysql_pfs_key_t recalc_pool_mutex_key;
mysql_pfs_key_t page_cleaner_mutex_key;
mysql_pfs_key_t purge_sys_pq_mutex_key;
mysql_pfs_key_t recv_sys_mutex_key;
mysql_pfs_key_t recv_writer_mutex_key;
mysql_pfs_key_t redo_rseg_mutex_key;
mysql_pfs_key_t noredo_rseg_mutex_key;
mysql_pfs_key_t page_zip_stat_per_index_mutex_key;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment