Commit aae3f921 authored by Marko Mäkelä

Cleanup recv_sys: Move things to members

recv_sys.recovery_on: Replaces recv_recovery_on.

recv_sys_t::apply(): Replaces recv_apply_hashed_log_recs().

recv_sys_var_init(): Remove.

recv_sys_t::recover_low(): Attempt to initialize a page based
on buffered redo log records.
parent a8b04c3e
......@@ -3986,7 +3986,6 @@ static bool xtrabackup_backup_func()
sync_check_init();
ut_d(sync_check_enable());
/* Reset the system variables in the recovery module. */
recv_sys_var_init();
trx_pool_init();
ut_crc32_init();
......@@ -5367,7 +5366,7 @@ static bool xtrabackup_prepare_func(char** argv)
ut_crc32_init();
recv_sys.create();
log_sys.create();
recv_recovery_on = true;
recv_sys.recovery_on = true;
#ifdef WITH_INNODB_DISALLOW_WRITES
srv_allow_writes_event = os_event_create(0);
......
......@@ -1536,8 +1536,7 @@ yet, to get valid size and flags.
@param[in,out] space tablespace */
inline void fil_space_open_if_needed(fil_space_t* space)
{
ut_d(extern volatile bool recv_recovery_on);
ut_ad(recv_recovery_on);
ut_ad(recv_recovery_is_on());
if (space->size == 0) {
/* Initially, size and flags will be set to 0,
......
......@@ -24,8 +24,7 @@ Recovery
Created 9/20/1997 Heikki Tuuri
*******************************************************/
#ifndef log0recv_h
#define log0recv_h
#pragma once
#include "ut0byte.h"
#include "buf0types.h"
......@@ -38,7 +37,7 @@ Created 9/20/1997 Heikki Tuuri
extern bool recv_writer_thread_active;
/** @return whether recovery is currently running. */
#define recv_recovery_is_on() UNIV_UNLIKELY(recv_recovery_on)
#define recv_recovery_is_on() UNIV_UNLIKELY(recv_sys.recovery_on)
/** Find the latest checkpoint in the log header.
@param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
......@@ -70,16 +69,6 @@ void
recv_recovery_rollback_active(void);
/*===============================*/
/********************************************************//**
Reset the state of the recovery system variables. */
void
recv_sys_var_init(void);
/*===================*/
/** Apply recv_sys.pages to persistent data pages.
@param[in] last_batch whether redo log writes are possible */
void recv_apply_hashed_log_recs(bool last_batch);
/** Whether to store redo log records in recv_sys.pages */
enum store_t {
/** Do not store redo log records. */
......@@ -220,6 +209,8 @@ struct recv_sys_t
{
/** mutex protecting apply_log_recs and page_recv_t::state */
ib_mutex_t mutex;
/** whether we are applying redo log records during crash recovery */
bool recovery_on;
/** whether recv_recover_page(), invoked from buf_page_io_complete(),
should apply log records*/
bool apply_log_recs;
......@@ -275,6 +266,7 @@ struct recv_sys_t
/** buffered records waiting to be applied to pages */
map pages;
private:
/** Process a record that indicates that a tablespace size is being shrunk.
@param page_id first page that is not in the file
@param lsn log sequence number of the shrink operation */
......@@ -290,6 +282,7 @@ struct recv_sys_t
unsigned pages;
} truncated_undo_spaces[127];
public:
/** The contents of the doublewrite buffer */
recv_dblwr_t dblwr;
......@@ -301,6 +294,13 @@ struct recv_sys_t
void close_files() { files.clear(); }
private:
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@param p iterator pointing to page_id
@param mtr mini-transaction
@return whether the page was successfully initialized */
inline buf_block_t *recover_low(const page_id_t page_id, map::iterator &p,
mtr_t &mtr);
/** All found log files (multiple ones are possible if we are upgrading
from before MariaDB Server 10.5.1) */
std::vector<log_file_t> files;
......@@ -316,6 +316,9 @@ struct recv_sys_t
@param[in,out] store whether to store page operations
@return whether the memory is exhausted */
inline bool is_memory_exhausted(store_t *store);
/** Apply buffered log to persistent data pages.
@param last_batch whether it is possible to write more redo log */
void apply(bool last_batch);
#ifdef UNIV_DEBUG
/** whether all redo log in the current batch has been applied */
......@@ -386,10 +389,6 @@ struct recv_sys_t
/** The recovery system */
extern recv_sys_t recv_sys;
/** TRUE when applying redo log records during crash recovery; FALSE
otherwise. Note that this is FALSE while a background thread is
rolling back incomplete transactions. */
extern volatile bool recv_recovery_on;
/** If the following is TRUE, the buffer pool file pages must be invalidated
after recovery and no ibuf operations are allowed; this will be set if
recv_sys.pages becomes too full, and log records must be merged
......@@ -420,5 +419,3 @@ times! */
/** Size of block reads when the log groups are scanned forward to do a
roll-forward */
#define RECV_SCAN_SIZE (4U << srv_page_size_shift)
#endif
......@@ -1207,7 +1207,7 @@ static bool log_preflush_pool_modified_pages(lsn_t new_oldest)
not know how up-to-date the disk version of the database is,
and we could not make a new checkpoint on the basis of the
info on the buffer pool only. */
recv_apply_hashed_log_recs(true);
recv_sys.apply(true);
}
if (new_oldest == LSN_MAX
......@@ -1334,7 +1334,7 @@ bool log_checkpoint()
os_thread_sleep(360000000););
if (recv_recovery_is_on()) {
recv_apply_hashed_log_recs(true);
recv_sys.apply(true);
}
switch (srv_file_flush_method) {
......
......@@ -60,11 +60,6 @@ Created 9/20/1997 Heikki Tuuri
/** The recovery system */
recv_sys_t recv_sys;
/** TRUE when applying redo log records during crash recovery; FALSE
otherwise. Note that this is FALSE while a background thread is
rolling back incomplete transactions. */
volatile bool recv_recovery_on;
/** TRUE when recv_init_crash_recovery() has been called. */
bool recv_needed_recovery;
#ifdef UNIV_DEBUG
......@@ -918,19 +913,6 @@ void recv_sys_t::close()
files.clear();
}
/************************************************************
Reset the state of the recovery system variables. */
void
recv_sys_var_init(void)
/*===================*/
{
recv_recovery_on = false;
recv_needed_recovery = false;
recv_lsn_checks_on = false;
recv_no_ibuf_operations = false;
recv_max_page_lsn = 0;
}
/******************************************************************//**
recv_writer thread tasked with flushing dirty pages from the buffer
pools.
......@@ -2501,65 +2483,112 @@ static void recv_read_in_area(page_id_t page_id)
}
}
/** Apply recv_sys.pages to persistent data pages.
@param[in] last_batch whether redo log writes are possible */
void recv_apply_hashed_log_recs(bool last_batch)
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@param p iterator pointing to page_id
@param mtr mini-transaction
@return whether the page was successfully initialized */
inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
map::iterator &p, mtr_t &mtr)
{
ut_ad(srv_operation == SRV_OPERATION_NORMAL
|| srv_operation == SRV_OPERATION_RESTORE
|| srv_operation == SRV_OPERATION_RESTORE_EXPORT);
ut_ad(mutex_own(&mutex));
ut_ad(p->first == page_id);
page_recv_t &recs= p->second;
ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ);
buf_block_t* block= nullptr;
mlog_init_t::init &i= mlog_init.last(page_id);
const lsn_t end_lsn = recs.log.last()->lsn;
if (end_lsn < i.lsn)
DBUG_LOG("ib_log", "skip log for page " << page_id
<< " LSN " << end_lsn << " < " << i.lsn);
else if (fil_space_t *space= fil_space_acquire_for_io(page_id.space()))
{
mtr.start();
mtr.set_log_mode(MTR_LOG_NONE);
block= buf_page_create(page_id, space->zip_size(), &mtr);
p= recv_sys.pages.find(page_id);
if (p == recv_sys.pages.end())
{
/* The page happened to exist in the buffer pool, or it was just
being read in. Before buf_page_get_with_no_latch() returned to
buf_page_create(), all changes must have been applied to the
page already. */
mtr.commit();
block= nullptr;
}
else
{
ut_ad(&recs == &p->second);
i.created= true;
buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
mtr.x_latch_at_savepoint(0, block);
recv_recover_page(block, mtr, p, space, &i);
ut_ad(mtr.has_committed());
recs.log.clear();
map::iterator r= p++;
recv_sys.pages.erase(r);
}
space->release_for_io();
}
mutex_enter(&recv_sys.mutex);
return block;
}
while (recv_sys.apply_batch_on) {
bool abort = recv_sys.found_corrupt_log;
mutex_exit(&recv_sys.mutex);
/** Apply buffered log to persistent data pages.
@param last_batch whether it is possible to write more redo log */
void recv_sys_t::apply(bool last_batch)
{
ut_ad(srv_operation == SRV_OPERATION_NORMAL ||
srv_operation == SRV_OPERATION_RESTORE ||
srv_operation == SRV_OPERATION_RESTORE_EXPORT);
mutex_enter(&mutex);
while (apply_batch_on)
{
bool abort= found_corrupt_log;
mutex_exit(&mutex);
if (abort) {
if (abort)
return;
}
os_thread_sleep(500000);
mutex_enter(&recv_sys.mutex);
mutex_enter(&mutex);
}
ut_ad(!last_batch == log_mutex_own());
recv_no_ibuf_operations = !last_batch
|| srv_operation == SRV_OPERATION_RESTORE
|| srv_operation == SRV_OPERATION_RESTORE_EXPORT;
recv_no_ibuf_operations = !last_batch ||
srv_operation == SRV_OPERATION_RESTORE ||
srv_operation == SRV_OPERATION_RESTORE_EXPORT;
ut_d(recv_no_log_write = recv_no_ibuf_operations);
mtr_t mtr;
if (recv_sys.pages.empty()) {
goto done;
} else {
const char* msg = last_batch
if (!pages.empty())
{
const char *msg= last_batch
? "Starting final batch to recover "
: "Starting a batch to recover ";
const ulint n = recv_sys.pages.size();
const ulint n= pages.size();
ib::info() << msg << n << " pages from redo log.";
sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log",
msg, n);
}
sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log", msg, n);
recv_sys.apply_log_recs = true;
recv_sys.apply_batch_on = true;
apply_log_recs= true;
apply_batch_on= true;
for (ulint id = srv_undo_tablespaces_open; id--;) {
const recv_sys_t::trunc& t= recv_sys.truncated_undo_spaces[id];
if (t.lsn) {
recv_sys.trim(page_id_t(id + srv_undo_space_id_start,
t.pages), t.lsn);
}
for (auto id= srv_undo_tablespaces_open; id--;)
{
const trunc& t= truncated_undo_spaces[id];
if (t.lsn)
trim(page_id_t(id + srv_undo_space_id_start, t.pages), t.lsn);
}
for (recv_sys_t::map::iterator p = recv_sys.pages.begin();
p != recv_sys.pages.end();) {
const page_id_t page_id = p->first;
page_recv_t& recs = p->second;
for (map::iterator p= pages.begin(); p != pages.end(); )
{
const page_id_t page_id= p->first;
page_recv_t &recs= p->second;
ut_ad(!recs.log.empty());
switch (recs.state) {
......@@ -2567,136 +2596,85 @@ void recv_apply_hashed_log_recs(bool last_batch)
case page_recv_t::RECV_BEING_PROCESSED:
p++;
continue;
case page_recv_t::RECV_WILL_NOT_READ:
recover_low(page_id, p, mtr);
continue;
case page_recv_t::RECV_NOT_PROCESSED:
mtr.start();
mtr.set_log_mode(MTR_LOG_NONE);
if (buf_block_t* block = buf_page_get_gen(
page_id, 0, RW_X_LATCH, NULL,
BUF_GET_IF_IN_POOL,
__FILE__, __LINE__, &mtr, NULL)) {
buf_block_dbg_add_level(
block, SYNC_NO_ORDER_CHECK);
if (buf_block_t *block= buf_page_get_gen(page_id, 0, RW_X_LATCH,
nullptr, BUF_GET_IF_IN_POOL,
__FILE__, __LINE__, &mtr))
{
buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
recv_recover_page(block, mtr, p);
ut_ad(mtr.has_committed());
} else {
}
else
{
mtr.commit();
recv_read_in_area(page_id);
break;
}
ignore:
{
recv_sys_t::map::iterator r = p++;
map::iterator r= p++;
r->second.log.clear();
recv_sys.pages.erase(r);
}
pages.erase(r);
continue;
case page_recv_t::RECV_WILL_NOT_READ:
mlog_init_t::init& i = mlog_init.last(page_id);
const lsn_t end_lsn = recs.log.last()->lsn;
if (end_lsn < i.lsn) {
DBUG_LOG("ib_log", "skip log for page "
<< page_id
<< " LSN " << end_lsn
<< " < " << i.lsn);
goto ignore;
}
fil_space_t* space = fil_space_acquire_for_io(
page_id.space());
if (!space) {
goto ignore;
}
mtr.start();
mtr.set_log_mode(MTR_LOG_NONE);
buf_block_t* block = buf_page_create(
page_id, space->zip_size(), &mtr);
p = recv_sys.pages.find(page_id);
if (p == recv_sys.pages.end()) {
/* The page happened to exist
in the buffer pool, or it was
just being read in. Before
buf_page_get_with_no_latch()
returned, all changes must have
been applied to the page already. */
mtr.commit();
} else {
ut_ad(&recs == &p->second);
i.created = true;
buf_block_dbg_add_level(
block, SYNC_NO_ORDER_CHECK);
mtr.x_latch_at_savepoint(0, block);
recv_recover_page(block, mtr, p, space, &i);
ut_ad(mtr.has_committed());
p->second.log.clear();
recv_sys.pages.erase(p);
}
space->release_for_io();
}
p = recv_sys.pages.lower_bound(page_id);
p= pages.lower_bound(page_id);
}
/* Wait until all the pages have been processed */
while (!pages.empty())
{
const bool abort= found_corrupt_log || found_corrupt_fs;
while (!recv_sys.pages.empty()) {
const bool abort = recv_sys.found_corrupt_log
|| recv_sys.found_corrupt_fs;
if (recv_sys.found_corrupt_fs && !srv_force_recovery) {
ib::info() << "Set innodb_force_recovery=1"
" to ignore corrupted pages.";
}
if (found_corrupt_fs && !srv_force_recovery)
ib::info() << "Set innodb_force_recovery=1 to ignore corrupted pages.";
mutex_exit(&(recv_sys.mutex));
mutex_exit(&mutex);
if (abort) {
if (abort)
return;
}
os_thread_sleep(500000);
mutex_enter(&(recv_sys.mutex));
mutex_enter(&mutex);
}
}
done:
if (!last_batch) {
/* Flush all the file pages to disk and invalidate them in
the buffer pool */
mutex_exit(&(recv_sys.mutex));
if (!last_batch)
{
/* Flush all the file pages to disk and invalidate them in buf_pool */
mutex_exit(&mutex);
log_mutex_exit();
/* Stop the recv_writer thread from issuing any LRU
flush batches. */
mutex_enter(&recv_sys.writer_mutex);
/* Stop the recv_writer thread from issuing any LRU flush batches. */
mutex_enter(&writer_mutex);
/* Wait for any currently run batch to end. */
buf_flush_wait_LRU_batch_end();
os_event_reset(recv_sys.flush_end);
recv_sys.flush_type = BUF_FLUSH_LIST;
os_event_set(recv_sys.flush_start);
os_event_wait(recv_sys.flush_end);
os_event_reset(flush_end);
flush_type = BUF_FLUSH_LIST;
os_event_set(flush_start);
os_event_wait(flush_end);
buf_pool_invalidate();
/* Allow batches from recv_writer thread. */
mutex_exit(&recv_sys.writer_mutex);
mutex_exit(&writer_mutex);
log_mutex_enter();
mutex_enter(&(recv_sys.mutex));
mutex_enter(&mutex);
mlog_init.reset();
} else {
}
else
/* We skipped this in buf_page_create(). */
mlog_init.mark_ibuf_exist(mtr);
}
ut_d(recv_sys.after_apply= true;);
recv_sys.clear();
mutex_exit(&recv_sys.mutex);
ut_d(after_apply= true);
clear();
mutex_exit(&mutex);
}
/** Check whether the number of read redo log blocks exceeds the maximum.
......@@ -3040,7 +3018,7 @@ recv_group_scan_log_recs(
do {
if (last_phase && store == STORE_NO) {
store = STORE_IF_EXISTS;
recv_apply_hashed_log_recs(false);
recv_sys.apply(false);
/* Rescan the redo logs from last stored lsn */
end_lsn = recv_sys.recovered_lsn;
}
......@@ -3271,7 +3249,7 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
return(DB_SUCCESS);
}
recv_recovery_on = true;
recv_sys.recovery_on = true;
log_mutex_enter();
......@@ -3561,7 +3539,7 @@ recv_recovery_from_checkpoint_finish(void)
mutex_enter(&recv_sys.writer_mutex);
/* Free the resources of the recovery system */
recv_recovery_on = false;
recv_sys.recovery_on = false;
/* By acquring the mutex we ensure that the recv_writer thread
won't trigger any more LRU batches. Now wait for currently
......
......@@ -807,7 +807,6 @@ srv_boot(void)
/*==========*/
{
sync_check_init();
recv_sys_var_init();
trx_pool_init();
row_mysql_init();
srv_init();
......
......@@ -1567,7 +1567,7 @@ dberr_t srv_start(bool create_new_db)
respective file pages, for the last batch of
recv_group_scan_log_recs(). */
recv_apply_hashed_log_recs(true);
recv_sys.apply(true);
if (recv_sys.found_corrupt_log
|| recv_sys.found_corrupt_fs) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment