Commit 4179f93d authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-18976 Implement OPT_PAGE_CHECKSUM log record for improved validation

We will introduce an optional log record OPT_PAGE_CHECKSUM for recording
page checksums, so that more inconsistencies on crash recovery may be
caught.

mtr_t::page_checksum(const buf_page_t&): Write OPT_PAGE_CHECKSUM
(currently not for ROW_FORMAT=COMPRESSED pages).

mtr_t::do_write(): Write OPT_PAGE_CHECKSUM records for all pages
(currently, in debug builds only).

mtr_t::is_logged(): Return whether log should be written.

mtr_t::set_log_mode_sub(const mtr_t&): Set the logging mode of
a sub-minitransaction when another mini-transaction is holding
latches on some modified pages. When creating or freeing BLOB pages,
we may only write OPT_PAGE_CHECKSUM records in the main mini-transaction,
after all changes have been written to the log.

MTR_LOG_SUB: Log mode for a sub-mini-transaction.

mtr_t::free(): Define non-inline, and invoke MarkFreed.

MarkFreed: For any matching page in the mini-transaction log,
change the first entry to say MTR_MEMO_PAGE_X_MODIFY and any subsequent
entries to MTR_MEMO_PAGE_X_FIX.

FindModified: Simplify a condition. MTR_MEMO_MODIFY can only be set
if MTR_MEMO_PAGE_X_FIX or MTR_MEMO_PAGE_SX_FIX are set.

FindBlockX: Consider also MTR_MEMO_PAGE_X_MODIFY.

recv_sys_t::parse(): Store OPT_PAGE_CHECKSUM records.

log_phys_t::apply(): Validate OPT_PAGE_CHECKSUM records.

log_phys_t::page_checksum(): Validate an OPT_PAGE_CHECKSUM record.

Tested by: Matthias Leich
parent cc4eabc7
......@@ -6943,7 +6943,7 @@ btr_store_big_rec_extern_fields(
mtr.start();
index->set_modified(mtr);
mtr.set_log_mode(btr_mtr->get_log_mode());
mtr.set_log_mode_sub(*btr_mtr);
mtr.memo_push(rec_block, MTR_MEMO_PAGE_X_FIX);
rec_block->page.fix();
......@@ -7287,7 +7287,7 @@ btr_free_externally_stored_field(
mtr.start();
mtr.set_spaces(*local_mtr);
mtr.set_log_mode(local_mtr->get_log_mode());
mtr.set_log_mode_sub(*local_mtr);
ut_ad(!index->table->is_temporary()
|| local_mtr->get_log_mode() == MTR_LOG_NO_REDO);
......
......@@ -1483,7 +1483,7 @@ inline void mtr_t::log_file_op(mfile_type_t type, ulint space_id,
ut_ad(!strcmp(&path[strlen(path) - strlen(DOT_IBD)], DOT_IBD));
flag_modified();
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
m_last= nullptr;
......
......@@ -475,26 +475,20 @@ updating an allocation bitmap page.
@param[in] mtr mini-transaction */
void fil_space_t::modify_check(const mtr_t& mtr) const
{
switch (mtr.get_log_mode()) {
case MTR_LOG_NONE:
/* These modes are only allowed within a non-bitmap page
when there is a higher-level redo log record written. */
ut_ad(purpose == FIL_TYPE_TABLESPACE
|| purpose == FIL_TYPE_TEMPORARY);
break;
case MTR_LOG_NO_REDO:
ut_ad(purpose == FIL_TYPE_TEMPORARY
|| purpose == FIL_TYPE_IMPORT);
return;
case MTR_LOG_ALL:
/* We may only write redo log for a persistent
tablespace. */
ut_ad(purpose == FIL_TYPE_TABLESPACE);
ut_ad(mtr.is_named_space(id));
return;
}
ut_ad("invalid log mode" == 0);
switch (mtr.get_log_mode()) {
case MTR_LOG_NONE:
/* These modes are only allowed within a non-bitmap page
when there is a higher-level redo log record written. */
ut_ad(purpose == FIL_TYPE_TABLESPACE || purpose == FIL_TYPE_TEMPORARY);
break;
case MTR_LOG_NO_REDO:
ut_ad(purpose == FIL_TYPE_TEMPORARY || purpose == FIL_TYPE_IMPORT);
break;
default:
/* We may only write redo log for a persistent tablespace. */
ut_ad(purpose == FIL_TYPE_TABLESPACE);
ut_ad(mtr.is_named_space(id));
}
}
#endif
......
......@@ -24,8 +24,7 @@ The database buffer pool high-level routines
Created 11/5/1995 Heikki Tuuri
*******************************************************/
#ifndef buf0buf_h
#define buf0buf_h
#pragma once
/** Magic value to use instead of checksums when they are disabled */
#define BUF_NO_CHECKSUM_MAGIC 0xDEADBEEFUL
......@@ -2201,5 +2200,3 @@ struct CheckUnzipLRUAndLRUList {
#include "buf0buf.inl"
#endif /* !UNIV_INNOCHECKSUM */
#endif
......@@ -1278,8 +1278,9 @@ struct fil_addr_t {
/** For the first page in a system tablespace data file(ibdata*, not *.ibd):
the file has been flushed to disk at least up to this lsn
For other pages: 32-bit key version used to encrypt the page + 32-bit checksum
or 64 bites of zero if no encryption */
For other pages of tablespaces not in innodb_checksum_algorithm=full_crc32
format: 32-bit key version used to encrypt the page + 32-bit checksum
or 64 bits of zero if no encryption */
#define FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION 26U
/** This overloads FIL_PAGE_FILE_FLUSH_LSN for RTREE Split Sequence Number */
......
......@@ -196,7 +196,7 @@ inline bool mtr_t::write(const buf_block_t &block, void *ptr, V val)
}
byte *p= static_cast<byte*>(ptr);
const byte *const end= p + l;
if (w != FORCED && m_log_mode == MTR_LOG_ALL)
if (w != FORCED && is_logged())
{
const byte *b= buf;
while (*p++ == *b++)
......@@ -224,7 +224,7 @@ inline void mtr_t::memset(const buf_block_t &b, ulint ofs, ulint len, byte val)
{
ut_ad(len);
set_modified(b);
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
static_assert(MIN_4BYTE > UNIV_PAGE_SIZE_MAX, "consistency");
......@@ -261,7 +261,7 @@ inline void mtr_t::memset(const buf_block_t &b, ulint ofs, size_t len,
ut_ad(size);
ut_ad(len > size); /* use mtr_t::memcpy() for shorter writes */
set_modified(b);
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
static_assert(MIN_4BYTE > UNIV_PAGE_SIZE_MAX, "consistency");
......@@ -319,7 +319,7 @@ inline void mtr_t::memcpy_low(const buf_block_t &block, uint16_t offset,
{
ut_ad(len);
set_modified(block);
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
if (len < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5))
{
......@@ -354,7 +354,7 @@ inline void mtr_t::memmove(const buf_block_t &b, ulint d, ulint s, ulint len)
ut_ad(d + len <= ulint(srv_page_size));
set_modified(b);
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
static_assert(MIN_4BYTE > UNIV_PAGE_SIZE_MAX, "consistency");
size_t lenlen= (len < MIN_2BYTE ? 1 : len < MIN_3BYTE ? 2 : 3);
......@@ -387,7 +387,7 @@ template<byte type>
inline byte *mtr_t::log_write(const page_id_t id, const buf_page_t *bpage,
size_t len, bool alloc, size_t offset)
{
static_assert(!(type & 15) && type != RESERVED && type != OPTION &&
static_assert(!(type & 15) && type != RESERVED &&
type <= FILE_CHECKPOINT, "invalid type");
ut_ad(type >= FILE_CREATE || is_named_space(id.space()));
ut_ad(!bpage || bpage->id() == id);
......@@ -491,7 +491,7 @@ inline void mtr_t::memcpy(const buf_block_t &b, void *dest, const void *str,
ut_ad(ut_align_down(dest, srv_page_size) == b.page.frame);
char *d= static_cast<char*>(dest);
const char *s= static_cast<const char*>(str);
if (w != FORCED && m_log_mode == MTR_LOG_ALL)
if (w != FORCED && is_logged())
{
ut_ad(len);
const char *const end= d + len;
......@@ -531,35 +531,20 @@ inline void mtr_t::init(buf_block_t *b)
b->page.set_reinit(b->page.state() & buf_page_t::LRU_MASK);
if (m_log_mode != MTR_LOG_ALL)
{
ut_ad(m_log_mode == MTR_LOG_NONE || m_log_mode == MTR_LOG_NO_REDO);
if (!is_logged())
return;
}
m_log.close(log_write<INIT_PAGE>(b->page.id(), &b->page));
m_last_offset= FIL_PAGE_TYPE;
}
/** Free a page.
@param[in] space tablespace contains page to be freed
@param[in] offset page offset to be freed */
inline void mtr_t::free(fil_space_t &space, uint32_t offset)
{
ut_ad(is_named_space(&space));
ut_ad(!m_freed_space || m_freed_space == &space);
if (m_log_mode == MTR_LOG_ALL)
m_log.close(log_write<FREE_PAGE>({space.id, offset}, nullptr));
}
/** Write an EXTENDED log record.
@param block buffer pool page
@param type extended record subtype; @see mrec_ext_t */
inline void mtr_t::log_write_extended(const buf_block_t &block, byte type)
{
set_modified(block);
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
byte *l= log_write<EXTENDED>(block.page.id(), &block.page, 1, true);
*l++= type;
......@@ -586,7 +571,7 @@ inline void mtr_t::page_delete(const buf_block_t &block, ulint prev_rec)
ut_ad(!block.zip_size());
ut_ad(prev_rec < block.physical_size());
set_modified(block);
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
size_t len= (prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4);
byte *l= log_write<EXTENDED>(block.page.id(), &block.page, len, true);
......@@ -613,7 +598,7 @@ inline void mtr_t::page_delete(const buf_block_t &block, ulint prev_rec,
ut_ad(hdr_size < MIN_3BYTE);
ut_ad(prev_rec < block.physical_size());
ut_ad(data_size < block.physical_size());
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4;
len+= hdr_size < MIN_2BYTE ? 1 : 2;
......@@ -645,7 +630,7 @@ inline void mtr_t::undo_append(const buf_block_t &block,
{
ut_ad(len > 2);
set_modified(block);
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
const bool small= len + 1 < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5);
byte *end= log_write<EXTENDED>(block.page.id(), &block.page, len + 1, small);
......@@ -668,7 +653,7 @@ inline void mtr_t::undo_append(const buf_block_t &block,
@param id first page identifier that will not be in the file */
inline void mtr_t::trim_pages(const page_id_t id)
{
if (m_log_mode != MTR_LOG_ALL)
if (!is_logged())
return;
byte *l= log_write<EXTENDED>(id, nullptr, 1, true);
*l++= TRIM_PAGES;
......
......@@ -136,10 +136,18 @@ struct mtr_t {
mtr_log_t get_log_mode() const
{
static_assert(MTR_LOG_ALL == 0, "efficiency");
ut_ad(m_log_mode <= MTR_LOG_NO_REDO);
return static_cast<mtr_log_t>(m_log_mode);
}
/** @return whether log is to be written for changes */
bool is_logged() const
{
static_assert(MTR_LOG_ALL == 0, "efficiency");
static_assert(MTR_LOG_NONE & MTR_LOG_NO_REDO, "efficiency");
static_assert(!(MTR_LOG_NONE & MTR_LOG_SUB), "efficiency");
return !(m_log_mode & MTR_LOG_NONE);
}
/** Change the logging mode.
@param mode logging mode
@return old mode */
......@@ -150,6 +158,15 @@ struct mtr_t {
return old_mode;
}
/** Set the log mode of a sub-minitransaction
@param mtr parent mini-transaction */
void set_log_mode_sub(const mtr_t &mtr)
{
ut_ad(mtr.m_log_mode == MTR_LOG_ALL || mtr.m_log_mode == MTR_LOG_NO_REDO);
m_log_mode= mtr.m_log_mode | MTR_LOG_SUB;
static_assert((MTR_LOG_SUB | MTR_LOG_NO_REDO) == MTR_LOG_NO_REDO, "");
}
/** Check if we are holding a block latch in exclusive mode
@param block buffer pool block to search for */
bool have_x_latch(const buf_block_t &block) const;
......@@ -372,6 +389,9 @@ struct mtr_t {
/** @return whether the log and memo are empty */
bool is_empty() const { return m_memo.size() == 0 && m_log.size() == 0; }
/** Write an OPT_PAGE_CHECKSUM record. */
inline void page_checksum(const buf_page_t &bpage);
/** Write request types */
enum write_type
{
......@@ -470,9 +490,9 @@ struct mtr_t {
@param[in,out] b buffer page */
void init(buf_block_t *b);
/** Free a page.
@param[in] space tablespace contains page to be freed
@param[in] offset page offset to be freed */
inline void free(fil_space_t &space, uint32_t offset);
@param space tablespace
@param offset offset of the page to be freed */
void free(const fil_space_t &space, uint32_t offset);
/** Write log for partly initializing a B-tree or R-tree page.
@param block B-tree or R-tree page
@param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */
......
......@@ -41,6 +41,11 @@ enum mtr_log_t {
Set for attempting modification of a ROW_FORMAT=COMPRESSED page. */
MTR_LOG_NONE,
/** Log all operations, but do not write any OPT_PAGE_CHECKSUM
records because some of the modified pages were also modified
by another mini-transaction that did not write its log yet. */
MTR_LOG_SUB,
/** Don't generate REDO log but add dirty pages to flush list */
MTR_LOG_NO_REDO
};
......@@ -77,12 +82,8 @@ type. The following record types refer to data pages:
RESERVED (6): reserved for future use; a subtype code
(encoded immediately after the length) would be written
to reserve code space for further extensions
OPTION (7): optional record that may be ignored; a subtype code
(encoded immediately after the length) would distinguish actual
usage, such as:
* MDEV-18976 page checksum record
* binlog record
* SQL statement (at the start of statement)
OPTION (7): optional record that may be ignored; a subtype @see mrec_opt
(encoded immediately after the length) would distinguish actual usage
Bits 3..0 indicate the redo log record length, excluding the first
byte, but including additional length bytes and any other bytes,
......@@ -229,9 +230,7 @@ enum mrec_type_t
/** Reserved for future use. */
RESERVED= 0x60,
/** Optional record that may be ignored in crash recovery.
A subtype code will be encoded immediately after the length.
Possible subtypes would include a MDEV-18976 page checksum record,
a binlog record, or an SQL statement. */
A subtype (@see mrec_opt) will be encoded after the page identifier. */
OPTION= 0x70
};
......@@ -283,6 +282,15 @@ enum mrec_ext_t
};
/** Recognized OPTION record subtypes. */
enum mrec_opt
{
/** page checksum at the end of the mini-transaction */
OPT_PAGE_CHECKSUM= 0
/* Other possible subtypes: a binlog record, or an SQL statement. */
};
/** Redo log record types for file-level operations. These bit
patterns will be written to redo log files, so the existing codes or
their interpretation on crash recovery must not be changed. */
......
......@@ -54,6 +54,7 @@ Created 9/20/1997 Heikki Tuuri
#include "srv0srv.h"
#include "srv0start.h"
#include "fil0pagecompress.h"
#include "log.h"
/** The recovery system */
recv_sys_t recv_sys;
......@@ -86,7 +87,7 @@ is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt. */
static lsn_t recv_max_page_lsn;
/** Stored physical log record with logical LSN (@see log_t::FORMAT_10_5) */
/** Stored physical log record */
struct log_phys_t : public log_rec_t
{
/** start LSN of the mini-transaction (not necessarily of this record) */
......@@ -178,6 +179,35 @@ struct log_phys_t : public log_rec_t
return false;
}
/** Check an OPT_PAGE_CHECKSUM record.
@see mtr_t::page_checksum()
@param block buffer page
@param l pointer to checksum
@return whether an unrecoverable mismatch was found */
static bool page_checksum(const buf_block_t &block, const byte *l)
{
size_t size;
const byte *page= block.page.zip.data;
if (UNIV_LIKELY_NULL(page))
size= (UNIV_ZIP_SIZE_MIN >> 1) << block.page.zip.ssize;
else
{
page= block.page.frame;
size= srv_page_size;
}
if (UNIV_LIKELY(my_crc32c(my_crc32c(my_crc32c(0, page + FIL_PAGE_OFFSET,
FIL_PAGE_LSN -
FIL_PAGE_OFFSET),
page + FIL_PAGE_TYPE, 2),
page + FIL_PAGE_SPACE_ID,
size - (FIL_PAGE_SPACE_ID + 8)) ==
mach_read_from_4(l)))
return false;
ib::error() << "OPT_PAGE_CHECKSUM mismatch on " << block.page.id();
return !srv_force_recovery;
}
/** The status of apply() */
enum apply_status {
/** The page was not affected */
......@@ -262,9 +292,21 @@ struct log_phys_t : public log_rec_t
next_not_same_page:
last_offset= 1; /* the next record must not be same_page */
}
next:
l+= rlen;
continue;
case OPTION:
ut_ad(rlen == 5);
ut_ad(*l == OPT_PAGE_CHECKSUM);
if (page_checksum(block, l + 1))
{
applied= APPLIED_YES;
page_corrupted:
sql_print_error("InnoDB: Set innodb_force_recovery=1"
" to ignore corruption.");
recv_sys.set_corrupt_log();
return applied;
}
goto next_after_applying;
}
ut_ad(mach_read_from_4(frame + FIL_PAGE_OFFSET) ==
......@@ -275,8 +317,6 @@ struct log_phys_t : public log_rec_t
ut_ad(last_offset <= size);
switch (b & 0x70) {
case OPTION:
goto next;
case EXTENDED:
if (UNIV_UNLIKELY(block.page.id().page_no() < 3 ||
block.page.zip.ssize))
......@@ -305,12 +345,7 @@ struct log_phys_t : public log_rec_t
if (UNIV_UNLIKELY(rlen <= 3))
goto record_corrupted;
if (undo_append(block, ++l, --rlen) && !srv_force_recovery)
{
page_corrupted:
ib::error() << "Set innodb_force_recovery=1 to ignore corruption.";
recv_sys.set_corrupt_log();
return applied;
}
goto page_corrupted;
break;
case INSERT_HEAP_REDUNDANT:
case INSERT_REUSE_REDUNDANT:
......@@ -2334,7 +2369,8 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
if (got_page_op)
{
const page_id_t id(space_id, page_no);
ut_d(if ((b & 0x70) == INIT_PAGE) freed.erase(id));
ut_d(if ((b & 0x70) == INIT_PAGE || (b & 0x70) == OPTION)
freed.erase(id));
ut_ad(freed.find(id) == freed.end());
switch (b & 0x70) {
case FREE_PAGE:
......@@ -2370,8 +2406,11 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
}
last_offset= FIL_PAGE_TYPE;
break;
case RESERVED:
case OPTION:
if (rlen == 5 && *l == OPT_PAGE_CHECKSUM)
break;
/* fall through */
case RESERVED:
continue;
case WRITE:
case MEMMOVE:
......@@ -2463,9 +2502,9 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
#if 0 && defined UNIV_DEBUG
switch (b & 0x70) {
case RESERVED:
case OPTION:
ut_ad(0); /* we did "continue" earlier */
break;
case OPTION:
case FREE_PAGE:
break;
default:
......
......@@ -375,8 +375,8 @@ struct ReleaseBlocks
return true;
}
buf_flush_note_modification(static_cast<buf_block_t*>(slot->object),
start, end);
buf_block_t *block= static_cast<buf_block_t*>(slot->object);
buf_flush_note_modification(block, start, end);
return true;
}
};
......@@ -436,7 +436,7 @@ void mtr_t::commit()
std::pair<lsn_t,page_flush_ahead> lsns;
if (UNIV_LIKELY(m_log_mode == MTR_LOG_ALL))
if (UNIV_LIKELY(is_logged()))
{
lsns= do_write();
......@@ -577,6 +577,7 @@ void mtr_t::commit_shrink(fil_space_t &space)
log_write_and_flush_prepare();
const lsn_t start_lsn= do_write().first;
ut_d(m_log.erase());
mysql_mutex_lock(&log_sys.flush_order_mutex);
/* Durably write the reduced FSP_SIZE before truncating the data file. */
......@@ -673,19 +674,9 @@ void mtr_t::commit_files(lsn_t checkpoint_lsn)
bool
mtr_t::is_named_space(ulint space) const
{
ut_ad(!m_user_space || m_user_space->id != TRX_SYS_SPACE);
switch (m_log_mode) {
case MTR_LOG_NONE:
case MTR_LOG_NO_REDO:
return(true);
case MTR_LOG_ALL:
return(m_user_space_id == space
|| is_predefined_tablespace(space));
}
ut_error;
return(false);
ut_ad(!m_user_space || m_user_space->id != TRX_SYS_SPACE);
return !is_logged() || m_user_space_id == space ||
is_predefined_tablespace(space);
}
/** Check if a tablespace is associated with the mini-transaction
(needed for generating a FILE_MODIFY record)
......@@ -695,16 +686,8 @@ bool mtr_t::is_named_space(const fil_space_t* space) const
{
ut_ad(!m_user_space || m_user_space->id != TRX_SYS_SPACE);
switch (m_log_mode) {
case MTR_LOG_NONE:
case MTR_LOG_NO_REDO:
return true;
case MTR_LOG_ALL:
return m_user_space == space || is_predefined_tablespace(space->id);
}
ut_error;
return false;
return !is_logged() || m_user_space == space ||
is_predefined_tablespace(space->id);
}
#endif /* UNIV_DEBUG */
......@@ -978,6 +961,68 @@ static mtr_t::page_flush_ahead log_close(lsn_t lsn)
return mtr_t::PAGE_FLUSH_SYNC;
}
inline void mtr_t::page_checksum(const buf_page_t &bpage)
{
const byte *page= bpage.frame;
size_t size= srv_page_size;
if (UNIV_LIKELY_NULL(bpage.zip.data))
{
size= (UNIV_ZIP_SIZE_MIN >> 1) << bpage.zip.ssize;
switch (fil_page_get_type(bpage.zip.data)) {
case FIL_PAGE_TYPE_ALLOCATED:
case FIL_PAGE_INODE:
case FIL_PAGE_IBUF_BITMAP:
case FIL_PAGE_TYPE_FSP_HDR:
case FIL_PAGE_TYPE_XDES:
/* These are essentially uncompressed pages. */
break;
default:
page= bpage.zip.data;
}
}
/* We have to exclude from the checksum the normal
page checksum that is written by buf_flush_init_for_writing()
and FIL_PAGE_LSN which would be updated once we have actually
allocated the LSN.
Unfortunately, we cannot access fil_space_t easily here. In order to
be compatible with encrypted tablespaces in the pre-full_crc32
format we will unconditionally exclude the 8 bytes at
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
a.k.a. FIL_RTREE_SPLIT_SEQ_NUM. */
const uint32_t checksum=
my_crc32c(my_crc32c(my_crc32c(0, page + FIL_PAGE_OFFSET,
FIL_PAGE_LSN - FIL_PAGE_OFFSET),
page + FIL_PAGE_TYPE, 2),
page + FIL_PAGE_SPACE_ID, size - (FIL_PAGE_SPACE_ID + 8));
byte *l= log_write<OPTION>(bpage.id(), nullptr, 5, true, 0);
*l++= OPT_PAGE_CHECKSUM;
mach_write_to_4(l, checksum);
m_log.close(l + 4);
}
/** Write OPT_PAGE_CHECKSUM records for modified pages */
struct WriteOPT_PAGE_CHECKSUM
{
mtr_t &mtr;
WriteOPT_PAGE_CHECKSUM(mtr_t &mtr) : mtr(mtr) {}
/** @return true always */
bool operator()(const mtr_memo_slot_t *slot) const
{
if (slot->type & MTR_MEMO_MODIFY)
{
const buf_page_t &b= static_cast<const buf_block_t*>(slot->object)->page;
if (!b.is_freed())
mtr.page_checksum(b);
}
return true;
}
};
/** Write the block contents to the REDO log */
struct mtr_write_log
{
......@@ -993,11 +1038,18 @@ struct mtr_write_log
std::pair<lsn_t,mtr_t::page_flush_ahead> mtr_t::do_write()
{
ut_ad(!recv_no_log_write);
ut_ad(m_log_mode == MTR_LOG_ALL);
ut_ad(is_logged());
ulint len = m_log.size();
ut_ad(len > 0);
#ifdef UNIV_DEBUG
if (m_log_mode == MTR_LOG_ALL) {
m_memo.for_each_block(CIterate<WriteOPT_PAGE_CHECKSUM>(*this));
len = m_log.size();
}
#endif
if (len > srv_log_buffer_size / 2) {
log_buffer_extend(ulong((len + 1) * 2));
}
......@@ -1033,7 +1085,7 @@ std::pair<lsn_t,mtr_t::page_flush_ahead> mtr_t::do_write()
@return {start_lsn,flush_ahead} */
inline std::pair<lsn_t,mtr_t::page_flush_ahead> mtr_t::finish_write(ulint len)
{
ut_ad(m_log_mode == MTR_LOG_ALL);
ut_ad(is_logged());
mysql_mutex_assert_owner(&log_sys.mutex);
ut_ad(m_log.size() == len);
ut_ad(len > 0);
......@@ -1074,7 +1126,7 @@ struct FindBlockX
/** @return whether the block was not found x-latched */
bool operator()(const mtr_memo_slot_t *slot) const
{
return slot->object != &block || slot->type != MTR_MEMO_PAGE_X_FIX;
return slot->object != &block || !(slot->type & MTR_MEMO_PAGE_X_FIX);
}
};
......@@ -1381,7 +1433,7 @@ mtr_t::memo_contains_page_flagged(
#endif /* UNIV_DEBUG */
/** Find a block, preferrably in MTR_MEMO_MODIFY state */
/** Find a potentially modified block. */
struct FindModified
{
mtr_memo_slot_t *found= nullptr;
......@@ -1393,8 +1445,7 @@ struct FindModified
if (slot->object != &block)
return true;
found= slot;
return !(slot->type & (MTR_MEMO_MODIFY |
MTR_MEMO_PAGE_X_FIX | MTR_MEMO_PAGE_SX_FIX));
return !(slot->type & (MTR_MEMO_PAGE_X_FIX | MTR_MEMO_PAGE_SX_FIX));
}
};
......@@ -1420,3 +1471,63 @@ void mtr_t::modify(const buf_block_t &block)
if (is_block_dirtied(&block))
m_made_dirty= true;
}
/** Handle an exclusively latched block that was later marked as freed. */
struct MarkFreed
{
const page_id_t id;
mutable buf_block_t *freed= nullptr;
MarkFreed(page_id_t id) : id(id) {}
bool operator()(mtr_memo_slot_t *slot) const
{
buf_block_t *block= static_cast<buf_block_t*>(slot->object);
if (!block);
else if (block == freed)
{
if (slot->type & (MTR_MEMO_PAGE_SX_FIX | MTR_MEMO_PAGE_X_FIX))
slot->type= MTR_MEMO_PAGE_X_FIX;
else
{
ut_ad(slot->type == MTR_MEMO_BUF_FIX);
block->page.unfix();
slot->object= nullptr;
}
}
else if (slot->type & (MTR_MEMO_PAGE_X_FIX | MTR_MEMO_PAGE_SX_FIX) &&
block->page.id() == id)
{
ut_ad(!block->page.is_freed());
ut_ad(!freed);
freed= block;
if (!(slot->type & MTR_MEMO_PAGE_X_FIX))
{
ut_d(bool upgraded=) block->page.lock.x_lock_upgraded();
ut_ad(upgraded);
}
slot->type= MTR_MEMO_PAGE_X_MODIFY;
#ifdef BTR_CUR_HASH_ADAPT
if (block->index)
btr_search_drop_page_hash_index(block);
#endif /* BTR_CUR_HASH_ADAPT */
block->page.set_freed(block->page.state());
}
return true;
}
};
/** Free a page.
@param space tablespace
@param offset offset of the page to be freed */
void mtr_t::free(const fil_space_t &space, uint32_t offset)
{
ut_ad(is_named_space(&space));
ut_ad(!m_freed_space || m_freed_space == &space);
if (is_logged())
{
m_memo.for_each_block_in_reverse
(CIterate<MarkFreed>((MarkFreed{{space.id, offset}})));
m_log.close(log_write<FREE_PAGE>({space.id, offset}, nullptr));
}
}
......@@ -1309,7 +1309,7 @@ page_cur_insert_rec_low(
ut_ad(!page_rec_is_supremum(cur->rec));
/* We should not write log for ROW_FORMAT=COMPRESSED pages here. */
ut_ad(mtr->get_log_mode() != MTR_LOG_ALL ||
ut_ad(!mtr->is_logged() ||
!(index->table->flags & DICT_TF_MASK_ZIP_SSIZE));
/* 1. Get the size of the physical record in the page */
......@@ -1509,7 +1509,7 @@ page_cur_insert_rec_low(
}
rec_set_bit_field_1(next_rec, n_owned + 1, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
if (mtr->get_log_mode() != MTR_LOG_ALL)
if (!mtr->is_logged())
{
mtr->set_modified(*block);
goto copied;
......@@ -1551,7 +1551,7 @@ page_cur_insert_rec_low(
}
rec_set_bit_field_1(next_rec, n_owned + 1, REC_OLD_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
if (mtr->get_log_mode() != MTR_LOG_ALL)
if (!mtr->is_logged())
{
mtr->set_modified(*block);
goto copied;
......@@ -1572,7 +1572,7 @@ page_cur_insert_rec_low(
}
/* Insert the record, possibly copying from the preceding record. */
ut_ad(mtr->get_log_mode() == MTR_LOG_ALL);
ut_ad(mtr->is_logged());
{
const byte *r= rec;
......
......@@ -411,12 +411,8 @@ static void page_zip_compress_write_log(buf_block_t *block,
{
ut_ad(!index->is_ibuf());
if (mtr->get_log_mode() != MTR_LOG_ALL)
{
ut_ad(mtr->get_log_mode() == MTR_LOG_NONE ||
mtr->get_log_mode() == MTR_LOG_NO_REDO);
if (!mtr->is_logged())
return;
}
const page_t *page= block->page.frame;
const page_zip_des_t *page_zip= &block->page.zip;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment