Commit 1d30b7b1 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-12699 preparation: Clean up recv_sys

The recv_sys data structures are accessed not only from the thread
that executes InnoDB plugin initialization, but also from the
InnoDB I/O threads, which can invoke recv_recover_page().

Assert that sufficient concurrency control is in place.
Some code was accessing recv_sys data structures without
holding recv_sys->mutex.

recv_recover_page(bpage): Refactor the call from buf_page_io_complete()
into a separate function that performs necessary steps. The
main thread was unnecessarily releasing and reacquiring recv_sys->mutex.

recv_recover_page(block,mtr,recv_addr): Pass more parameters from
the caller. Avoid redundant lookups and computations. Eliminate some
redundant variables.

recv_get_fil_addr_struct(): Assert that recv_sys->mutex is being held.
That was not always the case!

recv_scan_log_recs(): Acquire recv_sys->mutex for the whole duration
of the function. (While we are scanning and buffering redo log records,
no pages can be read in.)

recv_read_in_area(): Properly protect access with recv_sys->mutex.

recv_apply_hashed_log_recs(): Check recv_addr->state only once,
and continuously hold recv_sys->mutex. The mutex will be released
and reacquired inside recv_recover_page() and recv_read_in_area(),
allowing concurrent processing by buf_page_io_complete() in I/O threads.
parent aa3f7a10
......@@ -2752,8 +2752,13 @@ static bool xtrabackup_copy_logfile(bool last = false)
my_sleep(1000);
}
start_lsn = (lsn == start_lsn)
? 0 : xtrabackup_copy_log(start_lsn, lsn, last);
if (lsn == start_lsn) {
start_lsn = 0;
} else {
mutex_enter(&recv_sys->mutex);
start_lsn = xtrabackup_copy_log(start_lsn, lsn, last);
mutex_exit(&recv_sys->mutex);
}
log_mutex_exit();
......
......@@ -6057,9 +6057,7 @@ buf_page_io_complete(buf_page_t* bpage, bool dblwr, bool evict)
page_not_corrupt: bpage = bpage; );
if (recv_recovery_is_on()) {
/* Pages must be uncompressed for crash recovery. */
ut_a(uncompressed);
recv_recover_page(TRUE, (buf_block_t*) bpage);
recv_recover_page(bpage);
}
/* If space is being truncated then avoid ibuf operation.
......@@ -6079,7 +6077,7 @@ buf_page_io_complete(buf_page_t* bpage, bool dblwr, bool evict)
<< " encrypted. However key "
"management plugin or used "
<< "key_version " << key_version
<< "is not found or"
<< " is not found or"
" used encryption algorithm or method does not match."
" Can't continue opening the table.";
} else {
......
......@@ -49,12 +49,9 @@ dberr_t
recv_find_max_checkpoint(ulint* max_field)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/** Apply the hashed log records to the page, if the page lsn is less than the
lsn of a log record.
@param just_read_in whether the page recently arrived to the I/O handler
@param block the page in the buffer pool */
void
recv_recover_page(bool just_read_in, buf_block_t* block);
/** Apply any buffered redo log to a page that was just read from a data file.
@param[in,out] bpage buffer pool page */
ATTRIBUTE_COLD void recv_recover_page(buf_page_t* bpage);
/** Start recovering from a redo log checkpoint.
@see recv_recovery_from_checkpoint_finish
......
......@@ -1766,6 +1766,8 @@ recv_get_fil_addr_struct(
ulint space, /*!< in: space id */
ulint page_no)/*!< in: page number */
{
ut_ad(mutex_own(&recv_sys->mutex));
recv_addr_t* recv_addr;
for (recv_addr = static_cast<recv_addr_t*>(
......@@ -1839,10 +1841,6 @@ recv_add_to_hash_table(
HASH_INSERT(recv_addr_t, addr_hash, recv_sys->addr_hash,
recv_fold(space, page_no), recv_addr);
recv_sys->n_addrs++;
#if 0
fprintf(stderr, "Inserting log rec for space %lu, page %lu\n",
space, page_no);
#endif
}
UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
......@@ -1911,151 +1909,71 @@ recv_data_copy_to_buf(
/** Apply the hashed log records to the page, if the page lsn is less than the
lsn of a log record.
@param just_read_in whether the page recently arrived to the I/O handler
@param block the page in the buffer pool */
void
recv_recover_page(bool just_read_in, buf_block_t* block)
@param[in,out] block buffer pool page
@param[in,out] mtr mini-transaction
@param[in,out] recv_addr recovery address */
static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
recv_addr_t* recv_addr)
{
page_t* page;
page_zip_des_t* page_zip;
recv_addr_t* recv_addr;
recv_t* recv;
byte* buf;
lsn_t start_lsn;
lsn_t end_lsn;
lsn_t page_lsn;
lsn_t page_newest_lsn;
ibool modification_to_page;
mtr_t mtr;
mutex_enter(&(recv_sys->mutex));
if (recv_sys->apply_log_recs == FALSE) {
/* Log records should not be applied now */
mutex_exit(&(recv_sys->mutex));
return;
}
recv_addr = recv_get_fil_addr_struct(block->page.id.space(),
block->page.id.page_no());
if ((recv_addr == NULL)
|| (recv_addr->state == RECV_BEING_PROCESSED)
|| (recv_addr->state == RECV_PROCESSED)) {
ut_ad(recv_addr == NULL || recv_needed_recovery);
mutex_exit(&(recv_sys->mutex));
return;
}
ut_ad(mutex_own(&recv_sys->mutex));
ut_ad(recv_sys->apply_log_recs);
ut_ad(recv_needed_recovery);
ut_ad(recv_addr->state != RECV_BEING_PROCESSED);
ut_ad(recv_addr->state != RECV_PROCESSED);
if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
fprintf(stderr, "Applying log to page %u:%u\n",
recv_addr->space, recv_addr->page_no);
}
DBUG_PRINT("ib_log",
("Applying log to page %u:%u",
recv_addr->space, recv_addr->page_no));
DBUG_LOG("ib_log", "Applying log to page " << block->page.id);
recv_addr->state = RECV_BEING_PROCESSED;
mutex_exit(&(recv_sys->mutex));
mtr_start(&mtr);
mtr_set_log_mode(&mtr, MTR_LOG_NONE);
mutex_exit(&recv_sys->mutex);
page = block->frame;
page_zip = buf_block_get_page_zip(block);
if (just_read_in) {
/* Move the ownership of the x-latch on the page to
this OS thread, so that we can acquire a second
x-latch on it. This is needed for the operations to
the page to pass the debug checks. */
rw_lock_x_lock_move_ownership(&block->lock);
/* The page may have been modified in the buffer pool.
FIL_PAGE_LSN would only be updated right before flushing. */
lsn_t page_lsn = buf_page_get_newest_modification(&block->page);
if (!page_lsn) {
page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);
}
ibool success = buf_page_get_known_nowait(
RW_X_LATCH, block, BUF_KEEP_OLD,
__FILE__, __LINE__, &mtr);
ut_a(success);
lsn_t start_lsn = 0, end_lsn = 0;
buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
/* Read the newest modification lsn from the page */
page_lsn = mach_read_from_8(page + FIL_PAGE_LSN);
/* It may be that the page has been modified in the buffer
pool: read the newest modification lsn there */
page_newest_lsn = buf_page_get_newest_modification(&block->page);
if (page_newest_lsn) {
page_lsn = page_newest_lsn;
if (srv_is_tablespace_truncated(recv_addr->space)) {
/* The table will be truncated after applying
normal redo log records. */
goto skip_log;
}
modification_to_page = FALSE;
start_lsn = end_lsn = 0;
recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
while (recv) {
for (recv_t* recv = UT_LIST_GET_FIRST(recv_addr->rec_list);
recv; recv = UT_LIST_GET_NEXT(rec_list, recv)) {
ut_ad(recv->start_lsn);
end_lsn = recv->end_lsn;
ut_ad(end_lsn <= log_sys->log.scanned_lsn);
if (recv->len > RECV_DATA_BLOCK_SIZE) {
/* We have to copy the record body to a separate
buffer */
buf = static_cast<byte*>(ut_malloc_nokey(recv->len));
recv_data_copy_to_buf(buf, recv);
if (recv->start_lsn < page_lsn) {
/* Ignore this record, because there are later changes
for this page. */
} else if (srv_was_tablespace_truncated(
fil_space_get(recv_addr->space))
&& recv->start_lsn
< truncate_t::get_truncated_tablespace_init_lsn(
recv_addr->space)) {
/* If per-table tablespace was truncated and
there exist REDO records before truncate that
are to be applied as part of recovery
(checkpoint didn't happen since truncate was
done) skip such records using lsn check as
they may not stand valid post truncate. */
} else {
buf = ((byte*)(recv->data)) + sizeof(recv_data_t);
}
/* If per-table tablespace was truncated and there exist REDO
records before truncate that are to be applied as part of
recovery (checkpoint didn't happen since truncate was done)
skip such records using lsn check as they may not stand valid
post truncate.
LSN at start of truncate is recorded and any redo record
with LSN less than recorded LSN is skipped.
Note: We can't skip complete recv_addr as same page may have
valid REDO records post truncate those needs to be applied. */
bool skip_recv = false;
if (srv_was_tablespace_truncated(fil_space_get(recv_addr->space))) {
lsn_t init_lsn =
truncate_t::get_truncated_tablespace_init_lsn(
recv_addr->space);
skip_recv = (recv->start_lsn < init_lsn);
}
/* Ignore applying the redo logs for tablespace that is
truncated. Post recovery there is fixup action that will
restore the tablespace back to normal state.
Applying redo at this stage can result in error given that
redo will have action recorded on page before tablespace
was re-inited and that would lead to an error while applying
such action. */
if (recv->start_lsn >= page_lsn
&& !srv_is_tablespace_truncated(recv_addr->space)
&& !skip_recv) {
lsn_t end_lsn;
if (!modification_to_page) {
modification_to_page = TRUE;
if (!start_lsn) {
start_lsn = recv->start_lsn;
}
......@@ -2066,50 +1984,54 @@ recv_recover_page(bool just_read_in, buf_block_t* block)
recv_addr->space, recv_addr->page_no);
}
DBUG_PRINT("ib_log",
("apply " LSN_PF ":"
" %s len " ULINTPF " page %u:%u",
recv->start_lsn,
get_mlog_string(recv->type), recv->len,
recv_addr->space,
recv_addr->page_no));
DBUG_LOG("ib_log", "apply " << recv->start_lsn << ": "
<< get_mlog_string(recv->type)
<< " len " << recv->len
<< " page " << block->page.id);
byte* buf;
if (recv->len > RECV_DATA_BLOCK_SIZE) {
/* We have to copy the record body to
a separate buffer */
buf = static_cast<byte*>
(ut_malloc_nokey(recv->len));
recv_data_copy_to_buf(buf, recv);
} else {
buf = reinterpret_cast<byte*>(recv->data)
+ sizeof *recv->data;
}
recv_parse_or_apply_log_rec_body(
recv->type, buf, buf + recv->len,
recv_addr->space, recv_addr->page_no,
true, block, &mtr);
block->page.id.space(),
block->page.id.page_no(), true, block, &mtr);
end_lsn = recv->start_lsn + recv->len;
mach_write_to_8(FIL_PAGE_LSN + page, end_lsn);
mach_write_to_8(UNIV_PAGE_SIZE
mach_write_to_8(srv_page_size
- FIL_PAGE_END_LSN_OLD_CHKSUM
+ page, end_lsn);
if (page_zip) {
mach_write_to_8(FIL_PAGE_LSN
+ page_zip->data, end_lsn);
mach_write_to_8(FIL_PAGE_LSN + page_zip->data,
end_lsn);
}
}
if (recv->len > RECV_DATA_BLOCK_SIZE) {
ut_free(buf);
if (recv->len > RECV_DATA_BLOCK_SIZE) {
ut_free(buf);
}
}
recv = UT_LIST_GET_NEXT(rec_list, recv);
}
skip_log:
#ifdef UNIV_ZIP_DEBUG
if (fil_page_index_page_check(page)) {
page_zip_des_t* page_zip = buf_block_get_page_zip(block);
ut_a(!page_zip
|| page_zip_validate_low(page_zip, page, NULL, FALSE));
}
ut_ad(!fil_page_index_page_check(page)
|| !page_zip
|| page_zip_validate_low(page_zip, page, NULL, FALSE));
#endif /* UNIV_ZIP_DEBUG */
if (modification_to_page) {
ut_a(block);
if (start_lsn) {
log_flush_order_mutex_enter();
buf_flush_recv_note_modification(block, start_lsn, end_lsn);
log_flush_order_mutex_exit();
......@@ -2119,8 +2041,7 @@ recv_recover_page(bool just_read_in, buf_block_t* block)
lsn values of page */
mtr.discard_modifications();
mtr_commit(&mtr);
mtr.commit();
ib_time_t time = ut_time();
......@@ -2130,6 +2051,7 @@ recv_recover_page(bool just_read_in, buf_block_t* block)
recv_max_page_lsn = page_lsn;
}
ut_ad(recv_addr->state == RECV_BEING_PROCESSED);
recv_addr->state = RECV_PROCESSED;
ut_a(recv_sys->n_addrs > 0);
......@@ -2140,52 +2062,76 @@ recv_recover_page(bool just_read_in, buf_block_t* block)
INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
}
}
}
/** Apply any buffered redo log to a page that was just read from a data file.
Called from buf_page_io_complete() in the I/O threads once the read completes.
@param[in,out]	bpage	buffer pool page */
void recv_recover_page(buf_page_t* bpage)
{
mtr_t mtr;
mtr.start();
/* Do not generate any new redo for the changes we replay here;
the log records being applied are already durable. */
mtr.set_log_mode(MTR_LOG_NONE);
ut_ad(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
buf_block_t* block = reinterpret_cast<buf_block_t*>(bpage);
/* Move the ownership of the x-latch on the page to
this OS thread, so that we can acquire a second
x-latch on it. This is needed for the operations to
the page to pass the debug checks. */
rw_lock_x_lock_move_ownership(&block->lock);
buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
ibool success = buf_page_get_known_nowait(
RW_X_LATCH, block, BUF_KEEP_OLD,
__FILE__, __LINE__, &mtr);
/* The page was just read in and is still buffer-fixed and
x-latched, so acquiring a second latch must succeed. */
ut_a(success);
mutex_enter(&recv_sys->mutex);
if (!recv_sys->apply_log_recs) {
/* Log records should not be applied now; fall through and
commit the mini-transaction without changing the page. */
} else if (recv_addr_t* recv_addr = recv_get_fil_addr_struct(
bpage->id.space(), bpage->id.page_no())) {
switch (recv_addr->state) {
case RECV_BEING_PROCESSED:
case RECV_PROCESSED:
/* The buffered records for this page were already
applied (or are being applied); nothing to do. */
break;
default:
/* NOTE(review): the static recv_recover_page(block,mtr,
recv_addr) overload is expected to commit mtr and to
release and reacquire recv_sys->mutex internally —
hence the goto past the mtr.commit() below. */
recv_recover_page(block, mtr, recv_addr);
goto func_exit;
}
}
mtr.commit();
func_exit:
mutex_exit(&recv_sys->mutex);
/* In both paths the mini-transaction must be committed,
releasing the page latch acquired above. */
ut_ad(mtr.has_committed());
}
/** Reads in pages which have hashed log records, from an area around a given
page number.
@param[in] page_id page id
@return number of pages found */
static ulint recv_read_in_area(const page_id_t page_id)
@param[in] page_id page id */
static void recv_read_in_area(const page_id_t page_id)
{
recv_addr_t* recv_addr;
ulint page_nos[RECV_READ_AHEAD_AREA];
ulint low_limit;
ulint n;
low_limit = page_id.page_no()
ulint page_no = page_id.page_no()
- (page_id.page_no() % RECV_READ_AHEAD_AREA);
ulint* p = page_nos;
n = 0;
for (ulint page_no = low_limit;
page_no < low_limit + RECV_READ_AHEAD_AREA;
page_no++) {
recv_addr = recv_get_fil_addr_struct(page_id.space(), page_no);
const page_id_t cur_page_id(page_id.space(), page_no);
if (recv_addr && !buf_page_peek(cur_page_id)) {
mutex_enter(&(recv_sys->mutex));
if (recv_addr->state == RECV_NOT_PROCESSED) {
recv_addr->state = RECV_BEING_READ;
page_nos[n] = page_no;
n++;
}
mutex_exit(&(recv_sys->mutex));
for (const ulint up_limit = page_no + RECV_READ_AHEAD_AREA;
page_no < up_limit; page_no++) {
recv_addr_t* recv_addr = recv_get_fil_addr_struct(
page_id.space(), page_no);
if (recv_addr
&& recv_addr->state == RECV_NOT_PROCESSED
&& !buf_page_peek(page_id_t(page_id.space(), page_no))) {
recv_addr->state = RECV_BEING_READ;
*p++ = page_no;
}
}
buf_read_recv_pages(FALSE, page_id.space(), page_nos, n);
return(n);
mutex_exit(&recv_sys->mutex);
buf_read_recv_pages(FALSE, page_id.space(), page_nos,
ulint(p - page_nos));
mutex_enter(&recv_sys->mutex);
}
/** Apply the hash table of stored log records to persistent data pages.
......@@ -2238,49 +2184,55 @@ void recv_apply_hashed_log_recs(bool last_batch)
}
}
mtr_t mtr;
for (ulint i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) {
for (recv_addr_t* recv_addr = static_cast<recv_addr_t*>(
HASH_GET_FIRST(recv_sys->addr_hash, i));
recv_addr;
recv_addr = static_cast<recv_addr_t*>(
HASH_GET_NEXT(addr_hash, recv_addr))) {
if (srv_is_tablespace_truncated(recv_addr->space)) {
/* Avoid applying REDO log for the tablespace
that is schedule for TRUNCATE. */
if (!UT_LIST_GET_LEN(recv_addr->rec_list)) {
ignore:
ut_a(recv_sys->n_addrs);
recv_addr->state = RECV_DISCARDED;
recv_sys->n_addrs--;
continue;
}
if (recv_addr->state == RECV_DISCARDED
|| !UT_LIST_GET_LEN(recv_addr->rec_list)) {
ut_a(recv_sys->n_addrs);
recv_sys->n_addrs--;
switch (recv_addr->state) {
case RECV_BEING_READ:
case RECV_BEING_PROCESSED:
case RECV_PROCESSED:
continue;
case RECV_DISCARDED:
goto ignore;
case RECV_NOT_PROCESSED:
break;
}
if (srv_is_tablespace_truncated(recv_addr->space)) {
/* Avoid applying REDO log for the tablespace
that is schedule for TRUNCATE. */
recv_addr->state = RECV_DISCARDED;
goto ignore;
}
const page_id_t page_id(recv_addr->space,
recv_addr->page_no);
if (recv_addr->state == RECV_NOT_PROCESSED) {
mutex_exit(&recv_sys->mutex);
mtr_t mtr;
mtr.start();
if (buf_block_t* block = buf_page_get_gen(
page_id, univ_page_size,
RW_X_LATCH, NULL,
BUF_GET_IF_IN_POOL,
__FILE__, __LINE__, &mtr, NULL)) {
buf_block_dbg_add_level(
block, SYNC_NO_ORDER_CHECK);
recv_recover_page(FALSE, block);
} else {
recv_read_in_area(page_id);
}
mtr.start();
mtr.set_log_mode(MTR_LOG_NONE);
if (buf_block_t* block = buf_page_get_gen(
page_id, univ_page_size, RW_X_LATCH,
NULL, BUF_GET_IF_IN_POOL,
__FILE__, __LINE__, &mtr, NULL)) {
buf_block_dbg_add_level(
block, SYNC_NO_ORDER_CHECK);
recv_recover_page(block, mtr, recv_addr);
ut_ad(mtr.has_committed());
} else {
mtr.commit();
mutex_enter(&recv_sys->mutex);
recv_read_in_area(page_id);
}
}
}
......@@ -2557,6 +2509,7 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply)
byte* body;
ut_ad(log_mutex_own());
ut_ad(mutex_own(&recv_sys->mutex));
ut_ad(recv_sys->parse_start_lsn != 0);
loop:
ptr = recv_sys->buf + recv_sys->recovered_offset;
......@@ -3125,6 +3078,8 @@ recv_scan_log_recs(
*group_scanned_lsn = scanned_lsn;
mutex_enter(&recv_sys->mutex);
if (more_data && !recv_sys->found_corrupt_log) {
/* Try to parse more log records */
......@@ -3134,7 +3089,8 @@ recv_scan_log_recs(
|| recv_sys->found_corrupt_fs
|| recv_sys->mlog_checkpoint_lsn
== recv_sys->recovered_lsn);
return(true);
finished = true;
goto func_exit;
}
if (*store_to_hash != STORE_NO
......@@ -3155,6 +3111,8 @@ recv_scan_log_recs(
}
}
func_exit:
mutex_exit(&recv_sys->mutex);
return(finished);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment