Commit 1738c0f1 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-22169 Recovery fails after failing to insert into mlog_init

In a multi-batch recovery, we must ensure that INIT_PAGE and
especially the MDEV-15528 FREE_PAGE records will be taken
properly into account.

Writing a FREE_PAGE record gives the server permission to omit
a page write. If recovery insists on applying log to a page
whose page flush has been omitted, then the consistency checks
in the application of high-level redo log records (appending
an undo log record, inserting or deleting an index record)
will likely fail.

mlog_init_t::add(): Return whether the state was changed.

mlog_init_t::will_avoid_read(): Determine whether a page read
will be avoided and whether older log records can be safely
skipped.

recv_sys_t::parse(): Even if store==STORE_NO, process the records
INIT_PAGE and FREE_PAGE. While processing them, we can delete older
redo log records for the page. If store!=STORE_NO, we can directly
skip redo log recods of other types if mlog_init indicates that the
page will be freed or initialized by at a later LSN.

This fix was developed in cooperation with
Thirunarayanan Balathandayuthapani.
parent d848fcad
......@@ -107,11 +107,11 @@ struct log_phys_t : public log_rec_t
/** @return start of the log records */
byte *begin() { return reinterpret_cast<byte*>(&len + 1); }
/** @return start of the log records */
const byte *begin() const { return const_cast<log_phys_t*>(this)->begin(); }
/** @return end of the log records */
byte *end() { byte *e= begin() + len; ut_ad(!*e); return e; }
public:
/** @return start of the log records */
const byte *begin() const { return const_cast<log_phys_t*>(this)->begin(); }
/** @return end of the log records */
const byte *end() const { return const_cast<log_phys_t*>(this)->end(); }
......@@ -598,17 +598,19 @@ class mlog_init_t
public:
/** Record that a page will be initialized by the redo log.
@param[in] page_id page identifier
@param[in] lsn log sequence number */
void add(const page_id_t page_id, lsn_t lsn)
@param[in] lsn log sequence number
@return whether the state was changed */
bool add(const page_id_t page_id, lsn_t lsn)
{
ut_ad(mutex_own(&recv_sys.mutex));
const init init = { lsn, false };
std::pair<map::iterator, bool> p = inits.insert(
map::value_type(page_id, init));
ut_ad(!p.first->second.created);
if (!p.second && p.first->second.lsn < init.lsn) {
p.first->second = init;
}
if (p.second) return true;
if (p.first->second.lsn >= init.lsn) return false;
p.first->second = init;
return true;
}
/** Get the last stored lsn of the page id and its respective
......@@ -623,6 +625,17 @@ class mlog_init_t
return inits.find(page_id)->second;
}
/** Determine if a page will be initialized or freed after a time.
@param page_id page identifier
@param lsn log sequence number
@return whether page_id will be freed or initialized after lsn */
bool will_avoid_read(page_id_t page_id, lsn_t lsn) const
{
ut_ad(mutex_own(&recv_sys.mutex));
auto i= inits.find(page_id);
return i != inits.end() && i->second.lsn > lsn;
}
/** At the end of each recovery batch, reset the 'created' flags. */
void reset()
{
......@@ -1938,7 +1951,7 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t store, bool apply)
if (got_page_op)
{
ut_d(const page_id_t id(space_id, page_no));
const page_id_t id(space_id, page_no);
ut_d(if ((b & 0x70) == INIT_PAGE) freed.erase(id));
ut_ad(freed.find(id) == freed.end());
switch (b & 0x70) {
......@@ -2059,16 +2072,31 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t store, bool apply)
ut_ad(modified.emplace(id).second || (b & 0x70) != INIT_PAGE);
}
#endif
const bool is_init= (b & 0x70) <= INIT_PAGE;
switch (store) {
case STORE_NO:
continue;
case STORE_IF_EXISTS:
if (!fil_space_get_size(space_id))
continue;
/* fall through */
case STORE_YES:
add(page_id_t(space_id, page_no), start_lsn, end_lsn, recs,
static_cast<size_t>(l + rlen - recs));
if (is_init || !mlog_init.will_avoid_read(id, start_lsn))
add(id, start_lsn, end_lsn, recs,
static_cast<size_t>(l + rlen - recs));
continue;
case STORE_NO:
if (!is_init)
continue;
map::iterator i= pages.find(id);
if (i == pages.end())
continue;
if ((*static_cast<const log_phys_t*>(*i->second.log.begin())->begin() &
0x70) <= INIT_PAGE)
{
ut_ad(i->second.state == page_recv_t::RECV_WILL_NOT_READ);
continue;
}
pages.erase(i);
mlog_init.add(id, start_lsn);
}
}
#if 1 /* MDEV-14425 FIXME: this must be in the checkpoint file only! */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment