Commit 177a571e authored by Marko Mäkelä

MDEV-19586 Replace recv_sys_t::addr_hash with a std::map

InnoDB crash recovery buffers redo log records in a hash table.
The function recv_read_in_area() would pick a random hash bucket
and then try to submit read requests for a few nearby pages.
Let us replace the recv_sys.addr_hash with a std::map, which will
automatically be iterated in sorted order.

recv_sys_t::pages: Replaces recv_sys_t::addr_hash, recv_sys_t::n_addrs.

recv_sys_t::recs: Replaces most of recv_addr_t.

recv_t: Encapsulate a raw singly-linked list of records. This reduces
overhead compared to std::forward_list in two ways: storage and cache
overhead, because the next-element pointer also points to the data
payload; and processing overhead, because recv_sys_t::recs_t::last will
point to the last record, so that recv_sys_t::add() can append straight
to the end of the list.

RECV_PROCESSED, RECV_DISCARDED: Remove. When a page is fully processed,
it will be deleted from recv_sys.pages.

recv_sys_t::trim(): Replaces recv_addr_trim().

recv_sys_t::add(): Use page_id_t for identifying pages.

recv_fold(), recv_hash(), recv_get_fil_addr_struct(): Remove.

recv_read_in_area(): Simplify the iteration.
parent 992d2494
......@@ -31,6 +31,7 @@ Created 10/25/1995 Heikki Tuuri
#ifndef UNIV_INNOCHECKSUM
#include "hash0hash.h"
#include "log0recv.h"
#include "dict0types.h"
#ifdef UNIV_LINUX
......
......@@ -29,7 +29,6 @@ Created 9/20/1997 Heikki Tuuri
#include "ut0byte.h"
#include "buf0types.h"
#include "hash0hash.h"
#include "log0log.h"
#include "mtr0types.h"
......@@ -48,10 +47,10 @@ dberr_t
recv_find_max_checkpoint(ulint* max_field)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/** Reduces recv_sys.n_addrs for the corrupted page.
/** Remove records for a corrupted page.
This function should be called when srv_force_recovery > 0.
@param[in] bpage buffer pool page */
void recv_recover_corrupt_page(buf_page_t* bpage);
ATTRIBUTE_COLD void recv_recover_corrupt_page(buf_page_t* bpage);
/** Apply any buffered redo log to a page that was just read from a data file.
@param[in,out] bpage buffer pool page */
......@@ -80,13 +79,13 @@ void
recv_sys_var_init(void);
/*===================*/
/** Apply the hash table of stored log records to persistent data pages.
/** Apply recv_sys.pages to persistent data pages.
@param[in] last_batch whether the change buffer merge will be
performed as part of the operation */
void
recv_apply_hashed_log_recs(bool last_batch);
/** Whether to store redo log records to the hash table */
/** Whether to store redo log records in recv_sys.pages */
enum store_t {
/** Do not store redo log records. */
STORE_NO,
......@@ -105,8 +104,8 @@ recv_sys.parse_start_lsn is non-zero.
@return true if more data added */
bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn);
/** Parse log records from a buffer and optionally store them to a
hash table to wait merging to file pages.
/** Parse log records from a buffer and optionally store them in recv_sys.pages
to wait merging to file pages.
@param[in] checkpoint_lsn the LSN of the latest checkpoint
@param[in] store whether to store page operations
@param[in] apply whether to apply the records
......@@ -144,8 +143,12 @@ struct recv_data_t{
/** Stored log record struct */
struct recv_t{
mlog_id_t type; /*!< log record type */
ulint len; /*!< log record body length in bytes */
/** next record */
recv_t* next;
/** log record body length in bytes */
uint32_t len;
/** log record type */
mlog_id_t type;
recv_data_t* data; /*!< chain of blocks containing the log record
body */
lsn_t start_lsn;/*!< start lsn of the log segment written by
......@@ -156,8 +159,6 @@ struct recv_t{
the mtr which generated this log record: NOTE
that this is not necessarily the end lsn of
this log record */
UT_LIST_NODE_T(recv_t)
rec_list;/*!< list of log records for this page */
};
struct recv_dblwr_t {
......@@ -205,7 +206,7 @@ struct recv_sys_t{
lsn_t parse_start_lsn;
/*!< this is the lsn from which we were able to
start parsing log records and adding them to
the hash table; zero if a suitable
pages; zero if a suitable
start point not found yet */
lsn_t scanned_lsn;
/*!< the log data has been scanned up to this
......@@ -234,9 +235,38 @@ struct recv_sys_t{
ib_time_t progress_time;
mem_heap_t* heap; /*!< memory heap of log records and file
addresses*/
hash_table_t* addr_hash;/*!< hash table of file addresses of pages */
ulint n_addrs;/*!< number of not processed hashed file
addresses in the hash table */
/** buffered records waiting to be applied to a page */
struct recs_t
{
/** Recovery state */
enum {
/** not yet processed */
RECV_NOT_PROCESSED,
/** not processed; the page will be reinitialized */
RECV_WILL_NOT_READ,
/** page is being read */
RECV_BEING_READ,
/** log records are being applied on the page */
RECV_BEING_PROCESSED
} state;
/** First log record */
recv_t* log;
/** Last log record */
recv_t* last;
};
using map = std::map<const page_id_t, recs_t,
std::less<const page_id_t>,
ut_allocator<std::pair<const page_id_t,recs_t>>>;
/** buffered records waiting to be applied to pages */
map pages;
/** Process a record that indicates that a tablespace is
being shrunk in size.
@param page_id first page identifier that is not in the file
@param lsn log sequence number of the shrink operation */
inline void trim(const page_id_t page_id, lsn_t lsn);
/** Undo tablespaces for which truncate has been logged
(indexed by id - srv_undo_space_id_start) */
......@@ -249,7 +279,7 @@ struct recv_sys_t{
recv_dblwr_t dblwr;
/** Lastly added LSN to the hash table of log records. */
/** Last added LSN to pages. */
lsn_t last_stored_lsn;
/** Initialize the redo log recovery subsystem. */
......@@ -265,13 +295,12 @@ struct recv_sys_t{
/** Store a redo log record for applying.
@param type record type
@param space tablespace identifier
@param page_no page number
@param page_id page identifier
@param body record body
@param rec_end end of record
@param lsn start LSN of the mini-transaction
@param end_lsn end LSN of the mini-transaction */
inline void add(mlog_id_t type, ulint space, ulint page_no,
inline void add(mlog_id_t type, const page_id_t page_id,
byte* body, byte* rec_end, lsn_t lsn,
lsn_t end_lsn);
......@@ -301,8 +330,8 @@ otherwise. Note that this is FALSE while a background thread is
rolling back incomplete transactions. */
extern volatile bool recv_recovery_on;
/** If the following is TRUE, the buffer pool file pages must be invalidated
after recovery and no ibuf operations are allowed; this becomes TRUE if
the log record hash table becomes too full, and log records must be merged
after recovery and no ibuf operations are allowed; this will be set if
recv_sys.pages becomes too full, and log records must be merged
to file pages already before the recovery is finished: in this case no
ibuf operations are allowed, as they could modify the pages read in the
buffer pool before the pages have been recovered to the up-to-date state.
......
......@@ -61,7 +61,7 @@ this must be less than srv_page_size as it is stored in the buffer pool */
#define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t))
/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA 32
#define RECV_READ_AHEAD_AREA 32U
/** The recovery system */
recv_sys_t recv_sys;
......@@ -178,37 +178,6 @@ typedef std::map<
static recv_spaces_t recv_spaces;
/** States of recv_addr_t */
enum recv_addr_state {
/** not yet processed */
RECV_NOT_PROCESSED,
/** not processed; the page will be reinitialized */
RECV_WILL_NOT_READ,
/** page is being read */
RECV_BEING_READ,
/** log records are being applied on the page */
RECV_BEING_PROCESSED,
/** log records have been applied on the page */
RECV_PROCESSED,
/** log records have been discarded because the tablespace
does not exist */
RECV_DISCARDED
};
/** Hashed page file address struct */
struct recv_addr_t{
/** recovery state of the page */
recv_addr_state state;
/** tablespace identifier */
unsigned space:32;
/** page number */
unsigned page_no:32;
/** list of log records for this page */
UT_LIST_BASE_NODE_T(recv_t) rec_list;
/** hash node in the hash bucket chain */
hash_node_t addr_hash;
};
/** Report optimized DDL operation (without redo log),
corresponding to MLOG_INDEX_LOAD.
@param[in] space_id tablespace identifier
......@@ -250,19 +219,18 @@ class mlog_init_t
ut_allocator<std::pair<const page_id_t, init> > >
map;
/** Map of page initialization operations.
FIXME: Merge this to recv_sys.addr_hash! */
FIXME: Merge this to recv_sys.pages! */
map inits;
public:
/** Record that a page will be initialized by the redo log.
@param[in] space tablespace identifier
@param[in] page_no page number
@param[in] page_id page identifier
@param[in] lsn log sequence number */
void add(ulint space, ulint page_no, lsn_t lsn)
void add(const page_id_t page_id, lsn_t lsn)
{
ut_ad(mutex_own(&recv_sys.mutex));
const init init = { lsn, false };
std::pair<map::iterator, bool> p = inits.insert(
map::value_type(page_id_t(space, page_no), init));
map::value_type(page_id, init));
ut_ad(!p.first->second.created);
if (!p.second && p.first->second.lsn < init.lsn) {
p.first->second = init;
......@@ -337,54 +305,43 @@ class mlog_init_t
static mlog_init_t mlog_init;
/** Process a MLOG_CREATE2 record that indicates that a tablespace
is being shrunk in size.
@param[in] space_id tablespace identifier
@param[in] pages trimmed size of the file, in pages
@param[in] lsn log sequence number of the operation */
static void recv_addr_trim(ulint space_id, unsigned pages, lsn_t lsn)
/** Process a record that indicates that a tablespace is
being shrunk in size.
@param page_id first page identifier that is not in the file
@param lsn log sequence number of the shrink operation */
inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn)
{
DBUG_ENTER("recv_addr_trim");
DBUG_ENTER("recv_sys_t::trim");
DBUG_LOG("ib_log",
"discarding log beyond end of tablespace "
<< page_id_t(space_id, pages) << " before LSN " << lsn);
ut_ad(mutex_own(&recv_sys.mutex));
for (ulint i = recv_sys.addr_hash->n_cells; i--; ) {
hash_cell_t* const cell = hash_get_nth_cell(
recv_sys.addr_hash, i);
for (recv_addr_t* addr = static_cast<recv_addr_t*>(cell->node),
*next;
addr; addr = next) {
next = static_cast<recv_addr_t*>(addr->addr_hash);
if (addr->space != space_id || addr->page_no < pages) {
continue;
}
for (recv_t* recv = UT_LIST_GET_FIRST(addr->rec_list);
recv; ) {
recv_t* n = UT_LIST_GET_NEXT(rec_list, recv);
if (recv->start_lsn < lsn) {
DBUG_PRINT("ib_log",
("Discarding %s for"
" page %u:%u at " LSN_PF,
get_mlog_string(
recv->type),
addr->space, addr->page_no,
recv->start_lsn));
UT_LIST_REMOVE(addr->rec_list, recv);
<< page_id << " before LSN " << lsn);
ut_ad(mutex_own(&mutex));
for (recv_sys_t::map::iterator p = pages.lower_bound(page_id);
p != pages.end() && p->first.space() == page_id.space();) {
for (recv_t** prev = &p->second.log;;) {
if (!*prev || (*prev)->start_lsn >= lsn) {
break;
}
recv = n;
DBUG_LOG("ib_log", "Discarding "
<< get_mlog_string((*prev)->type)
<< " for " << p->first << " at "
<< (*prev)->start_lsn);
*prev = (*prev)->next;
}
recv_sys_t::map::iterator r = p++;
if (!r->second.log) {
pages.erase(r);
}
}
if (fil_space_t* space = fil_space_get(space_id)) {
if (fil_space_t* space = fil_space_get(page_id.space())) {
ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
fil_node_t* file = UT_LIST_GET_FIRST(space->chain);
ut_ad(file->is_open());
os_file_truncate(file->name, file->handle,
os_offset_t(pages) << srv_page_size_shift,
true);
os_offset_t{page_id.page_no()}
<< srv_page_size_shift, true);
}
DBUG_VOID_RETURN;
}
......@@ -711,11 +668,7 @@ void recv_sys_t::close()
if (is_initialised()) {
dblwr.pages.clear();
if (addr_hash) {
hash_table_free(addr_hash);
addr_hash = NULL;
}
pages.clear();
if (heap) {
mem_heap_free(heap);
......@@ -859,8 +812,6 @@ void recv_sys_t::create()
found_corrupt_fs = false;
mlog_checkpoint_lsn = 0;
addr_hash = hash_create(size / 512);
n_addrs = 0;
progress_time = ut_time();
recv_max_page_lsn = 0;
......@@ -872,12 +823,8 @@ void recv_sys_t::create()
inline void recv_sys_t::empty()
{
ut_ad(mutex_own(&mutex));
ut_a(n_addrs == 0);
hash_table_free(addr_hash);
pages.clear();
mem_heap_empty(heap);
addr_hash = hash_create(buf_pool_get_curr_size() / 512);
}
/** Free most recovery data structures. */
......@@ -887,13 +834,12 @@ void recv_sys_t::debug_free()
ut_ad(is_initialised());
mutex_enter(&mutex);
hash_table_free(addr_hash);
pages.clear();
mem_heap_free(heap);
ut_free_dodump(buf, buf_size);
buf = NULL;
heap = NULL;
addr_hash = NULL;
/* wake page cleaner up to progress */
if (!srv_read_only_mode) {
......@@ -1752,74 +1698,14 @@ recv_parse_or_apply_log_rec_body(
return(ptr);
}
/*********************************************************************//**
Calculates the fold value of a page file address: used in inserting or
searching for a log record in the hash table.
@return folded value */
UNIV_INLINE
ulint
recv_fold(
/*======*/
ulint space, /*!< in: space */
ulint page_no)/*!< in: page number */
{
return(ut_fold_ulint_pair(space, page_no));
}
/*********************************************************************//**
Calculates the hash value of a page file address: used in inserting or
searching for a log record in the hash table.
@return folded value */
UNIV_INLINE
ulint
recv_hash(
/*======*/
ulint space, /*!< in: space */
ulint page_no)/*!< in: page number */
{
return(hash_calc_hash(recv_fold(space, page_no), recv_sys.addr_hash));
}
/*********************************************************************//**
Gets the hashed file address struct for a page.
@return file address struct, NULL if not found from the hash table */
static
recv_addr_t*
recv_get_fil_addr_struct(
/*=====================*/
ulint space, /*!< in: space id */
ulint page_no)/*!< in: page number */
{
ut_ad(mutex_own(&recv_sys.mutex));
recv_addr_t* recv_addr;
for (recv_addr = static_cast<recv_addr_t*>(
HASH_GET_FIRST(recv_sys.addr_hash,
recv_hash(space, page_no)));
recv_addr != 0;
recv_addr = static_cast<recv_addr_t*>(
HASH_GET_NEXT(addr_hash, recv_addr))) {
if (recv_addr->space == space
&& recv_addr->page_no == page_no) {
return(recv_addr);
}
}
return(NULL);
}
/** Store a redo log record for applying.
@param type record type
@param space tablespace identifier
@param page_no page number
@param page_id page identifier
@param body record body
@param rec_end end of record
@param lsn start LSN of the mini-transaction
@param end_lsn end LSN of the mini-transaction */
inline void recv_sys_t::add(mlog_id_t type, ulint space, ulint page_no,
inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
byte* body, byte* rec_end, lsn_t lsn,
lsn_t end_lsn)
{
......@@ -1832,46 +1718,33 @@ inline void recv_sys_t::add(mlog_id_t type, ulint space, ulint page_no,
ut_ad(type != MLOG_INDEX_LOAD);
ut_ad(type != MLOG_TRUNCATE);
recv_t* recv= static_cast<recv_t*>(mem_heap_alloc(heap, sizeof *recv));
recv->type = type;
recv->len = ulint(rec_end - body);
recv->start_lsn = lsn;
recv->end_lsn = end_lsn;
recv_addr_t* recv_addr = recv_get_fil_addr_struct(space, page_no);
if (recv_addr == NULL) {
recv_addr = static_cast<recv_addr_t*>(
mem_heap_alloc(heap, sizeof(recv_addr_t)));
recv_addr->space = space;
recv_addr->page_no = page_no;
recv_addr->state = RECV_NOT_PROCESSED;
UT_LIST_INIT(recv_addr->rec_list, &recv_t::rec_list);
HASH_INSERT(recv_addr_t, addr_hash, addr_hash,
recv_fold(space, page_no), recv_addr);
n_addrs++;
}
std::pair<map::iterator, bool> p = pages.insert(
map::value_type(page_id, recs_t{recs_t::RECV_NOT_PROCESSED,
NULL, NULL}));
recv_sys_t::recs_t& recs = p.first->second;
ut_ad(p.second == !recs.log);
ut_ad(p.second == !recs.last);
recv_data_t** prev_field;
switch (type) {
case MLOG_INIT_FILE_PAGE2:
case MLOG_ZIP_PAGE_COMPRESS:
case MLOG_INIT_FREE_PAGE:
/* Ignore any earlier redo log records for this page. */
ut_ad(recv_addr->state == RECV_NOT_PROCESSED
|| recv_addr->state == RECV_WILL_NOT_READ);
recv_addr->state = RECV_WILL_NOT_READ;
mlog_init.add(space, page_no, lsn);
ut_ad(recs.state == recs_t::RECV_NOT_PROCESSED
|| recs.state == recs_t::RECV_WILL_NOT_READ);
recs.state = recs_t::RECV_WILL_NOT_READ;
mlog_init.add(page_id, lsn);
recs.last = NULL;
/* fall through */
default:
break;
}
UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
recv_t** prev = recs.last ? &recs.last->next : &recs.log;
recv_data_t** prev_field = &recv->data;
*prev = recs.last = new (mem_heap_alloc(heap, sizeof(recv_t)))
recv_t{NULL, uint32_t(rec_end - body), type, NULL,
lsn, end_lsn};
prev_field = &(*prev)->data;
}
/* Store the log record body in chunks of less than srv_page_size:
heap grows into the buffer pool, and bigger chunks could not
......@@ -1906,39 +1779,31 @@ void
recv_data_copy_to_buf(
/*==================*/
byte* buf, /*!< in: buffer of length at least recv->len */
recv_t* recv) /*!< in: log record */
const recv_t& recv) /*!< in: log record */
{
recv_data_t* recv_data;
ulint part_len;
ulint len;
len = recv->len;
recv_data = recv->data;
const recv_data_t* recv_data = recv.data;
ulint len = recv.len;
while (len > 0) {
if (len > RECV_DATA_BLOCK_SIZE) {
part_len = RECV_DATA_BLOCK_SIZE;
} else {
part_len = len;
}
ut_memcpy(buf, ((byte*) recv_data) + sizeof(recv_data_t),
do {
const ulint part_len = std::min<ulint>(len,
RECV_DATA_BLOCK_SIZE);
memcpy(buf, &reinterpret_cast<const byte*>(recv_data)[
sizeof(recv_data_t)],
part_len);
recv_data = recv_data->next;
buf += part_len;
len -= part_len;
recv_data = recv_data->next;
}
} while (len);
}
/** Apply the hashed log records to the page, if the page lsn is less than the
lsn of a log record.
@param[in,out] block buffer pool page
@param[in,out] mtr mini-transaction
@param[in,out] recv_addr recovery address
@param[in,out] p recovery address
@param[in,out] init page initialization operation, or NULL */
static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
recv_addr_t* recv_addr,
const recv_sys_t::map::iterator& p,
mlog_init_t::init* init = NULL)
{
page_t* page;
......@@ -1947,19 +1812,18 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
ut_ad(mutex_own(&recv_sys.mutex));
ut_ad(recv_sys.apply_log_recs);
ut_ad(recv_needed_recovery);
ut_ad(recv_addr->state != RECV_BEING_PROCESSED);
ut_ad(recv_addr->state != RECV_PROCESSED);
ut_ad(!init || init->created);
ut_ad(!init || init->lsn);
ut_ad(block->page.id == p->first);
ut_ad(p->second.state != recv_sys_t::recs_t::RECV_BEING_PROCESSED);
if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
fprintf(stderr, "Applying log to page %u:%u\n",
recv_addr->space, recv_addr->page_no);
ib::info() << "Applying log to page " << block->page.id;
}
DBUG_LOG("ib_log", "Applying log to page " << block->page.id);
recv_addr->state = RECV_BEING_PROCESSED;
p->second.state = recv_sys_t::recs_t::RECV_BEING_PROCESSED;
mutex_exit(&recv_sys.mutex);
page = block->frame;
......@@ -1974,11 +1838,15 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
bool free_page = false;
lsn_t start_lsn = 0, end_lsn = 0;
ut_d(lsn_t recv_start_lsn = 0);
const lsn_t init_lsn = init ? init->lsn : 0;
for (recv_t* recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
recv; recv = UT_LIST_GET_NEXT(rec_list, recv)) {
for (const recv_t* recv = p->second.log; recv; recv = recv->next) {
ut_ad(recv->start_lsn);
ut_ad(recv->end_lsn);
ut_ad(recv_start_lsn < recv->start_lsn);
ut_d(recv_start_lsn = recv->start_lsn);
ut_ad(end_lsn <= recv->end_lsn);
end_lsn = recv->end_lsn;
ut_ad(end_lsn <= log_sys.log.scanned_lsn);
......@@ -2003,10 +1871,10 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
}
if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
fprintf(stderr, "apply " LSN_PF ":"
" %d len " ULINTPF " page %u:%u\n",
recv->start_lsn, recv->type, recv->len,
recv_addr->space, recv_addr->page_no);
ib::info() << "apply " << recv->start_lsn
<< ":" << recv->type
<< " len " << recv->len
<< " page " << block->page.id;
}
DBUG_LOG("ib_log", "apply " << recv->start_lsn << ": "
......@@ -2021,7 +1889,7 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
a separate buffer */
buf = static_cast<byte*>
(ut_malloc_nokey(recv->len));
recv_data_copy_to_buf(buf, recv);
recv_data_copy_to_buf(buf, *recv);
} else {
buf = reinterpret_cast<byte*>(recv->data)
+ sizeof *recv->data;
......@@ -2082,42 +1950,25 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
recv_max_page_lsn = page_lsn;
}
ut_ad(recv_addr->state == RECV_BEING_PROCESSED);
recv_addr->state = RECV_PROCESSED;
ut_ad(p->second.state == recv_sys_t::recs_t::RECV_BEING_PROCESSED);
ut_ad(!recv_sys.pages.empty());
recv_sys.pages.erase(p);
ut_a(recv_sys.n_addrs > 0);
if (ulint n = --recv_sys.n_addrs) {
if (recv_sys.report(time)) {
const ulint n = recv_sys.pages.size();
ib::info() << "To recover: " << n << " pages from log";
service_manager_extend_timeout(
INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
}
}
}
/** Reduces recv_sys.n_addrs for the corrupted page.
/** Remove records for a corrupted page.
This function should be called when srv_force_recovery > 0.
@param[in] bpage buffer pool page */
void recv_recover_corrupt_page(buf_page_t* bpage)
{
ut_ad(srv_force_recovery);
mutex_enter(&recv_sys.mutex);
if (!recv_sys.apply_log_recs) {
mutex_exit(&recv_sys.mutex);
return;
}
recv_addr_t* recv_addr = recv_get_fil_addr_struct(
bpage->id.space(), bpage->id.page_no());
ut_ad(recv_addr->state != RECV_WILL_NOT_READ);
if (recv_addr->state != RECV_BEING_PROCESSED
&& recv_addr->state != RECV_PROCESSED) {
recv_sys.n_addrs--;
}
recv_sys.pages.erase(bpage->id);
mutex_exit(&recv_sys.mutex);
}
......@@ -2144,15 +1995,12 @@ void recv_recover_page(buf_page_t* bpage)
ut_a(success);
mutex_enter(&recv_sys.mutex);
if (!recv_sys.apply_log_recs) {
} else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
bpage->id.space(), bpage->id.page_no())) {
switch (recv_addr->state) {
case RECV_BEING_PROCESSED:
case RECV_PROCESSED:
break;
default:
recv_recover_page(block, mtr, recv_addr);
if (recv_sys.apply_log_recs) {
recv_sys_t::map::iterator p = recv_sys.pages.find(bpage->id);
if (p != recv_sys.pages.end()
&& p->second.state
!= recv_sys_t::recs_t::RECV_BEING_PROCESSED) {
recv_recover_page(block, mtr, p);
goto func_exit;
}
}
......@@ -2166,32 +2014,35 @@ void recv_recover_page(buf_page_t* bpage)
/** Reads in pages which have hashed log records, from an area around a given
page number.
@param[in] page_id page id */
static void recv_read_in_area(const page_id_t page_id)
static void recv_read_in_area(page_id_t page_id)
{
ulint page_nos[RECV_READ_AHEAD_AREA];
ulint page_no = page_id.page_no()
- (page_id.page_no() % RECV_READ_AHEAD_AREA);
compile_time_assert(ut_is_2pow(RECV_READ_AHEAD_AREA));
page_id.set_page_no(ut_2pow_round(page_id.page_no(),
RECV_READ_AHEAD_AREA));
const ulint up_limit = page_id.page_no() + RECV_READ_AHEAD_AREA;
ulint* p = page_nos;
for (const ulint up_limit = page_no + RECV_READ_AHEAD_AREA;
page_no < up_limit; page_no++) {
recv_addr_t* recv_addr = recv_get_fil_addr_struct(
page_id.space(), page_no);
if (recv_addr
&& recv_addr->state == RECV_NOT_PROCESSED
&& !buf_page_peek(page_id_t(page_id.space(), page_no))) {
recv_addr->state = RECV_BEING_READ;
*p++ = page_no;
for (recv_sys_t::map::iterator i= recv_sys.pages.lower_bound(page_id);
i != recv_sys.pages.end()
&& i->first.space() == page_id.space()
&& i->first.page_no() < up_limit; i++) {
if (i->second.state == recv_sys_t::recs_t::RECV_NOT_PROCESSED
&& !buf_page_peek(i->first)) {
i->second.state = recv_sys_t::recs_t::RECV_BEING_READ;
*p++ = i->first.page_no();
}
}
if (p != page_nos) {
mutex_exit(&recv_sys.mutex);
buf_read_recv_pages(FALSE, page_id.space(), page_nos,
ulint(p - page_nos));
mutex_enter(&recv_sys.mutex);
}
}
/** Apply the hash table of stored log records to persistent data pages.
/** Apply recv_sys.pages to persistent data pages.
@param[in] last_batch whether the change buffer merge will be
performed as part of the operation */
void recv_apply_hashed_log_recs(bool last_batch)
......@@ -2222,7 +2073,12 @@ void recv_apply_hashed_log_recs(bool last_batch)
ut_d(recv_no_log_write = recv_no_ibuf_operations);
if (ulint n = recv_sys.n_addrs) {
mtr_t mtr;
if (recv_sys.pages.empty()) {
goto done;
}
if (!log_sys.log.subformat && !srv_force_recovery
&& srv_undo_tablespaces_open) {
ib::error() << "Recovery of separately logged"
......@@ -2232,58 +2088,40 @@ void recv_apply_hashed_log_recs(bool last_batch)
recv_sys.found_corrupt_log = true;
mutex_exit(&recv_sys.mutex);
return;
}
} else {
const char* msg = last_batch
? "Starting final batch to recover "
: "Starting a batch to recover ";
const ulint n = recv_sys.pages.size();
ib::info() << msg << n << " pages from redo log.";
sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log",
msg, n);
}
recv_sys.apply_log_recs = true;
recv_sys.apply_batch_on = true;
for (ulint id = srv_undo_tablespaces_open; id--; ) {
recv_sys_t::trunc& t = recv_sys.truncated_undo_spaces[id];
for (ulint id = srv_undo_tablespaces_open; id--;) {
const recv_sys_t::trunc& t= recv_sys.truncated_undo_spaces[id];
if (t.lsn) {
recv_addr_trim(id + srv_undo_space_id_start, t.pages,
t.lsn);
recv_sys.trim(page_id_t(id + srv_undo_space_id_start,
t.pages), t.lsn);
}
}
mtr_t mtr;
for (ulint i = 0; i < hash_get_n_cells(recv_sys.addr_hash); i++) {
for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
HASH_GET_FIRST(recv_sys.addr_hash, i));
recv_addr;
recv_addr = static_cast<recv_addr_t*>(
HASH_GET_NEXT(addr_hash, recv_addr))) {
if (!UT_LIST_GET_LEN(recv_addr->rec_list)) {
ignore:
ut_a(recv_sys.n_addrs);
recv_sys.n_addrs--;
continue;
}
for (recv_sys_t::map::iterator p = recv_sys.pages.begin();
p != recv_sys.pages.end();) {
const page_id_t page_id = p->first;
recv_sys_t::recs_t& recs = p->second;
ut_ad(recs.log);
switch (recv_addr->state) {
case RECV_BEING_READ:
case RECV_BEING_PROCESSED:
case RECV_PROCESSED:
switch (recs.state) {
case recv_sys_t::recs_t::RECV_BEING_READ:
case recv_sys_t::recs_t::RECV_BEING_PROCESSED:
p++;
continue;
case RECV_DISCARDED:
goto ignore;
case RECV_NOT_PROCESSED:
case RECV_WILL_NOT_READ:
break;
}
const page_id_t page_id(recv_addr->space,
recv_addr->page_no);
if (recv_addr->state == RECV_NOT_PROCESSED) {
apply:
case recv_sys_t::recs_t::RECV_NOT_PROCESSED:
apply:
mtr.start();
mtr.set_log_mode(MTR_LOG_NONE);
if (buf_block_t* block = buf_page_get_gen(
......@@ -2292,38 +2130,43 @@ void recv_apply_hashed_log_recs(bool last_batch)
__FILE__, __LINE__, &mtr, NULL)) {
buf_block_dbg_add_level(
block, SYNC_NO_ORDER_CHECK);
recv_recover_page(block, mtr,
recv_addr);
recv_recover_page(block, mtr, p);
ut_ad(mtr.has_committed());
} else {
mtr.commit();
recv_read_in_area(page_id);
}
} else {
break;
case recv_sys_t::recs_t::RECV_WILL_NOT_READ:
mlog_init_t::init& i = mlog_init.last(page_id);
const lsn_t end_lsn = UT_LIST_GET_LAST(
recv_addr->rec_list)->end_lsn;
lsn_t end_lsn = 0;
for (const recv_t* r = recs.log; r; r = r->next) {
ut_ad(r->end_lsn);
ut_ad(r->end_lsn >= end_lsn);
end_lsn = r->end_lsn;
}
if (end_lsn < i.lsn) {
DBUG_LOG("ib_log", "skip log for page "
<< page_id
<< " LSN " << end_lsn
<< " < " << i.lsn);
skip:
recv_addr->state = RECV_PROCESSED;
goto ignore;
ignore:
recv_sys_t::map::iterator r = p++;
recv_sys.pages.erase(r);
continue;
}
fil_space_t* space = fil_space_acquire_for_io(
recv_addr->space);
page_id.space());
if (!space) {
goto skip;
goto ignore;
}
if (space->enable_lsn) {
do_read:
space->release_for_io();
recv_addr->state = RECV_NOT_PROCESSED;
recs.state = recv_sys_t::recs_t::
RECV_NOT_PROCESSED;
goto apply;
}
......@@ -2353,7 +2196,8 @@ void recv_apply_hashed_log_recs(bool last_batch)
mtr.set_log_mode(MTR_LOG_NONE);
buf_block_t* block = buf_page_create(
page_id, space->zip_size(), &mtr);
if (recv_addr->state == RECV_PROCESSED) {
p = recv_sys.pages.find(page_id);
if (p == recv_sys.pages.end()) {
/* The page happened to exist
in the buffer pool, or it was
just being read in. Before
......@@ -2362,23 +2206,24 @@ void recv_apply_hashed_log_recs(bool last_batch)
been applied to the page already. */
mtr.commit();
} else {
ut_ad(&recs == &p->second);
i.created = true;
buf_block_dbg_add_level(
block, SYNC_NO_ORDER_CHECK);
mtr.x_latch_at_savepoint(0, block);
recv_recover_page(block, mtr,
recv_addr, &i);
recv_recover_page(block, mtr, p, &i);
ut_ad(mtr.has_committed());
}
space->release_for_io();
}
}
p = recv_sys.pages.lower_bound(page_id);
}
/* Wait until all the pages have been processed */
while (recv_sys.n_addrs != 0) {
while (!recv_sys.pages.empty()) {
const bool abort = recv_sys.found_corrupt_log
|| recv_sys.found_corrupt_fs;
......@@ -2398,6 +2243,7 @@ void recv_apply_hashed_log_recs(bool last_batch)
mutex_enter(&(recv_sys.mutex));
}
done:
if (!last_batch) {
/* Flush all the file pages to disk and invalidate them in
the buffer pool */
......@@ -2784,7 +2630,7 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply)
/* fall through */
case STORE_YES:
recv_sys.add(
type, space, page_no, body,
type, page_id_t(space, page_no), body,
ptr + len, old_lsn,
recv_sys.recovered_lsn);
}
......@@ -2968,7 +2814,8 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply)
/* fall through */
case STORE_YES:
recv_sys.add(
type, space, page_no,
type,
page_id_t(space, page_no),
body, ptr + len,
old_lsn,
new_recovered_lsn);
......@@ -3278,7 +3125,6 @@ recv_group_scan_log_recs(
mutex_enter(&recv_sys.mutex);
recv_sys.len = 0;
recv_sys.recovered_offset = 0;
recv_sys.n_addrs = 0;
recv_sys.empty();
srv_start_lsn = *contiguous_lsn;
recv_sys.parse_start_lsn = *contiguous_lsn;
......@@ -3386,16 +3232,13 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
{
dberr_t err = DB_SUCCESS;
for (ulint h = 0; h < hash_get_n_cells(recv_sys.addr_hash); h++) {
for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
HASH_GET_FIRST(recv_sys.addr_hash, h));
recv_addr != 0;
recv_addr = static_cast<recv_addr_t*>(
HASH_GET_NEXT(addr_hash, recv_addr))) {
const ulint space = recv_addr->space;
for (recv_sys_t::map::iterator p = recv_sys.pages.begin();
p != recv_sys.pages.end();) {
ut_ad(p->second.log);
const ulint space = p->first.space();
if (is_predefined_tablespace(space)) {
next:
p++;
continue;
}
......@@ -3403,19 +3246,19 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
ut_ad(i != recv_spaces.end());
switch (i->second.status) {
case file_name_t::NORMAL:
goto next;
case file_name_t::MISSING:
err = recv_init_missing_space(err, i);
i->second.status = file_name_t::DELETED;
/* fall through */
case file_name_t::DELETED:
recv_addr->state = RECV_DISCARDED;
/* fall through */
case file_name_t::NORMAL:
recv_sys_t::map::iterator r = p++;
recv_sys.pages.erase(r);
continue;
}
ut_ad(0);
}
}
if (err != DB_SUCCESS) {
return(err);
......@@ -3567,7 +3410,7 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
const lsn_t end_lsn = mach_read_from_8(
buf + LOG_CHECKPOINT_END_LSN);
ut_ad(recv_sys.n_addrs == 0);
ut_ad(recv_sys.pages.empty());
contiguous_lsn = checkpoint_lsn;
switch (log_sys.log.format) {
case 0:
......@@ -3590,7 +3433,7 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
/* Look for MLOG_CHECKPOINT. */
recv_group_scan_log_recs(checkpoint_lsn, &contiguous_lsn, false);
/* The first scan should not have stored or applied any records. */
ut_ad(recv_sys.n_addrs == 0);
ut_ad(recv_sys.pages.empty());
ut_ad(!recv_sys.found_corrupt_fs);
if (srv_read_only_mode && recv_needed_recovery) {
......@@ -3740,7 +3583,7 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
}
}
} else {
ut_ad(!rescan || recv_sys.n_addrs == 0);
ut_ad(!rescan || recv_sys.pages.empty());
}
if (log_sys.log.scanned_lsn < checkpoint_lsn
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment