Commit 6d214415 authored by Marko Mäkelä

MDEV-21351: Free processed recv_sys_t::blocks

Release memory as soon as redo log records are processed.

Because the memory allocation and deallocation of parsed redo log
records must be protected by recv_sys.mutex, it is better to avoid
using a std::atomic field for bookkeeping.

buf_page_t::access_time: Keep track of the recv_sys.pages record
allocations. The most significant 16 bits count the allocations
that are still in use within the block (which were previously counted
by buf_page_t::buf_fix_count in the debug version), and the least
significant 16 bits indicate the number of allocated bytes in the
block (which was previously managed in buf_block_t::modify_clock).
The number of allocated bytes must be positive and at most
innodb_page_size. Because 65536 does not fit in 16 bits, the byte
offset 65536 is represented as the value 0.

recv_recover_page(): Let the caller erase the log.

recv_validate_tablespace(): Acquire recv_sys_t::mutex.
parent d0c8316b
/***************************************************************************** /*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2019, MariaDB Corporation. Copyright (c) 2013, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -1568,7 +1568,17 @@ class buf_page_t { ...@@ -1568,7 +1568,17 @@ class buf_page_t {
unsigned access_time; /*!< time of first access, or unsigned access_time; /*!< time of first access, or
0 if the block was never accessed 0 if the block was never accessed
in the buffer pool. Protected by in the buffer pool. Protected by
block mutex */ block mutex for buf_page_in_file()
blocks.
For state==BUF_BLOCK_MEMORY
blocks, this field can be repurposed
for something else.
When this field counts log records
and bytes allocated for recv_sys.pages,
the field is protected by
recv_sys_t::mutex. */
# ifdef UNIV_DEBUG # ifdef UNIV_DEBUG
ibool file_page_was_freed; ibool file_page_was_freed;
/*!< this is set to TRUE when /*!< this is set to TRUE when
......
...@@ -196,9 +196,9 @@ struct page_recv_t ...@@ -196,9 +196,9 @@ struct page_recv_t
tail= recs; tail= recs;
} }
/** Trim old log records for a page /** Trim old log records for a page.
@param start_lsn oldest log sequence number to preserve @param start_lsn oldest log sequence number to preserve
@return whether the entire log was trimmed */ @return whether all the log for the page was trimmed */
inline bool trim(lsn_t start_lsn); inline bool trim(lsn_t start_lsn);
/** @return the last log snippet */ /** @return the last log snippet */
const log_rec_t* last() const { return tail; } const log_rec_t* last() const { return tail; }
...@@ -215,11 +215,8 @@ struct page_recv_t ...@@ -215,11 +215,8 @@ struct page_recv_t
iterator begin() { return head; } iterator begin() { return head; }
iterator end() { return NULL; } iterator end() { return NULL; }
bool empty() const { ut_ad(!head == !tail); return !head; } bool empty() const { ut_ad(!head == !tail); return !head; }
/** Clear and free the records; @see recv_sys_t::alloc() */
inline void clear(); inline void clear();
#ifdef UNIV_DEBUG
/** Declare the records as freed; @see recv_sys_t::alloc() */
inline void free() const;
#endif
} log; } log;
/** Ignore any earlier redo log records for this page. */ /** Ignore any earlier redo log records for this page. */
...@@ -284,26 +281,26 @@ struct recv_sys_t{ ...@@ -284,26 +281,26 @@ struct recv_sys_t{
using map = std::map<const page_id_t, page_recv_t, using map = std::map<const page_id_t, page_recv_t,
std::less<const page_id_t>, std::less<const page_id_t>,
ut_allocator ut_allocator<std::pair<const page_id_t, page_recv_t>>>;
<std::pair<const page_id_t, page_recv_t>>>;
/** buffered records waiting to be applied to pages */ /** buffered records waiting to be applied to pages */
map pages; map pages;
/** Process a record that indicates that a tablespace is /** Process a record that indicates that a tablespace size is being shrunk.
being shrunk in size. @param page_id first page that is not in the file
@param page_id first page identifier that is not in the file
@param lsn log sequence number of the shrink operation */ @param lsn log sequence number of the shrink operation */
inline void trim(const page_id_t page_id, lsn_t lsn); inline void trim(const page_id_t page_id, lsn_t lsn);
/** Undo tablespaces for which truncate has been logged /** Undo tablespaces for which truncate has been logged
(indexed by id - srv_undo_space_id_start) */ (indexed by page_id_t::space() - srv_undo_space_id_start) */
struct trunc { struct trunc
{
/** log sequence number of MLOG_FILE_CREATE2, or 0 if none */ /** log sequence number of MLOG_FILE_CREATE2, or 0 if none */
lsn_t lsn; lsn_t lsn;
/** truncated size of the tablespace, or 0 if not truncated */ /** truncated size of the tablespace, or 0 if not truncated */
unsigned pages; unsigned pages;
} truncated_undo_spaces[127]; } truncated_undo_spaces[127];
/** The contents of the doublewrite buffer */
recv_dblwr_t dblwr; recv_dblwr_t dblwr;
/** Last added LSN to pages. */ /** Last added LSN to pages. */
...@@ -374,17 +371,9 @@ struct recv_sys_t{ ...@@ -374,17 +371,9 @@ struct recv_sys_t{
@return pointer to len bytes of memory (never NULL) */ @return pointer to len bytes of memory (never NULL) */
inline byte *alloc(size_t len, bool store_recv= false); inline byte *alloc(size_t len, bool store_recv= false);
#ifdef UNIV_DEBUG /** Free a redo log snippet.
private: @param data buffer returned by alloc() */
/** Find the buffer pool block that is storing a redo log record. inline void free(const void *data);
@param[in] data pointer to buffer returned by alloc()
@return redo list element */
inline buf_block_t *find_block(const void *data) const;
public:
/** Declare a redo log record freed from a buffer pool block.
@param[in] data pointer to buffer returned by alloc() */
inline void free(const void *data) const;
#endif
/** @return the free length of the latest alloc() block, in bytes */ /** @return the free length of the latest alloc() block, in bytes */
inline size_t get_free_len() const; inline size_t get_free_len() const;
......
...@@ -152,17 +152,21 @@ struct recv_t : public log_rec_t ...@@ -152,17 +152,21 @@ struct recv_t : public log_rec_t
@param d log snippet @param d log snippet
*/ */
void append(data_t *d) { ut_ad(!next); ut_ad(!d->next); next= d; } void append(data_t *d) { ut_ad(!next); ut_ad(!d->next); next= d; }
#ifdef UNIV_DEBUG }* data;
/** Declare the record freed in the buffer pool */
void free() /** Free the log snippet */
void free() const
{ {
data_t *recv_data= this; data_t *d= data;
do do
recv_sys.free(recv_data); {
while ((recv_data= recv_data->next)); data_t *next= d->next;
recv_sys.free(d);
d= next;
}
while (d);
recv_sys.free(this);
} }
#endif
}* data;
}; };
...@@ -684,7 +688,9 @@ void recv_sys_t::close() ...@@ -684,7 +688,9 @@ void recv_sys_t::close()
if (is_initialised()) { if (is_initialised()) {
dblwr.pages.clear(); dblwr.pages.clear();
pages.clear(); ut_d(mutex_enter(&mutex));
clear();
ut_d(mutex_exit(&mutex));
if (flush_start) { if (flush_start) {
os_event_destroy(flush_start); os_event_destroy(flush_start);
...@@ -828,16 +834,14 @@ inline void recv_sys_t::clear() ...@@ -828,16 +834,14 @@ inline void recv_sys_t::clear()
ut_ad(mutex_own(&mutex)); ut_ad(mutex_own(&mutex));
apply_log_recs= false; apply_log_recs= false;
apply_batch_on= false; apply_batch_on= false;
ut_ad(!after_apply || !UT_LIST_GET_LAST(blocks));
pages.clear(); pages.clear();
for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; ) for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; )
{ {
buf_block_t *prev_block= UT_LIST_GET_PREV(unzip_LRU, block); buf_block_t *prev_block= UT_LIST_GET_PREV(unzip_LRU, block);
ut_ad(buf_block_get_state(block) == BUF_BLOCK_MEMORY); ut_ad(buf_block_get_state(block) == BUF_BLOCK_MEMORY);
/* Check buf_fix_count after applying all buffered redo log records */
ut_ad(!after_apply || !block->page.buf_fix_count);
UT_LIST_REMOVE(blocks, block); UT_LIST_REMOVE(blocks, block);
ut_d(block->page.buf_fix_count= 0);
buf_block_free(block); buf_block_free(block);
block= prev_block; block= prev_block;
} }
...@@ -868,11 +872,13 @@ void recv_sys_t::debug_free() ...@@ -868,11 +872,13 @@ void recv_sys_t::debug_free()
inline size_t recv_sys_t::get_free_len() const inline size_t recv_sys_t::get_free_len() const
{ {
if (UT_LIST_GET_LEN(blocks) == 0) if (const buf_block_t* block= UT_LIST_GET_FIRST(blocks))
{
if (const size_t used= static_cast<uint16_t>(block->page.access_time))
return srv_page_size - used;
ut_ad(srv_page_size == 65536);
}
return 0; return 0;
return srv_page_size -
static_cast<size_t>(UT_LIST_GET_FIRST(blocks)->modify_clock);
} }
inline byte* recv_sys_t::alloc(size_t len, bool store_recv) inline byte* recv_sys_t::alloc(size_t len, bool store_recv)
...@@ -886,41 +892,59 @@ inline byte* recv_sys_t::alloc(size_t len, bool store_recv) ...@@ -886,41 +892,59 @@ inline byte* recv_sys_t::alloc(size_t len, bool store_recv)
{ {
create_block: create_block:
block= buf_block_alloc(nullptr); block= buf_block_alloc(nullptr);
block->modify_clock= len; block->page.access_time= 1U << 16 | static_cast<uint16_t>(len);
UT_LIST_ADD_FIRST(blocks, block); UT_LIST_ADD_FIRST(blocks, block);
UNIV_MEM_INVALID(block->frame, len);
UNIV_MEM_FREE(block->frame + len, srv_page_size - len);
return block->frame; return block->frame;
} }
size_t free_offset= static_cast<size_t>(block->modify_clock); size_t free_offset= static_cast<uint16_t>(block->page.access_time);
if (UNIV_UNLIKELY(!free_offset))
{
ut_ad(srv_page_size == 65536);
goto create_block;
}
ut_ad(free_offset <= srv_page_size); ut_ad(free_offset <= srv_page_size);
free_offset+= len;
if (store_recv && if (store_recv && free_offset + sizeof(recv_t::data) + 1 > srv_page_size)
free_offset + len + sizeof(recv_t::data) + 1 > srv_page_size)
goto create_block; goto create_block;
if (free_offset + len > srv_page_size) if (free_offset > srv_page_size)
goto create_block; goto create_block;
block->modify_clock= free_offset + len;
return block->frame + free_offset; block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 |
static_cast<uint16_t>(free_offset);
UNIV_MEM_ALLOC(block->frame + free_offset - len, len);
return block->frame + free_offset - len;
} }
#ifdef UNIV_DEBUG
inline buf_block_t *recv_sys_t::find_block(const void* data) const /** Free a redo log snippet.
@param data buffer returned by alloc() */
inline void recv_sys_t::free(const void *data)
{ {
data= page_align(data); data= page_align(data);
ut_ad(mutex_own(&mutex));
for (buf_block_t *block= UT_LIST_GET_LAST(blocks); for (buf_block_t *block= UT_LIST_GET_LAST(blocks);
block; block = UT_LIST_GET_PREV(unzip_LRU, block)) block; block = UT_LIST_GET_PREV(unzip_LRU, block))
{
ut_ad(buf_block_get_state(block) == BUF_BLOCK_MEMORY);
ut_ad(block->page.access_time >= 1U << 16);
if (block->frame == data) if (block->frame == data)
return block; {
if (!((block->page.access_time -= 1U << 16) >> 16))
{
UT_LIST_REMOVE(blocks, block);
buf_block_free(block);
}
return;
}
}
ut_ad(0); ut_ad(0);
return nullptr;
} }
inline void recv_sys_t::free(const void *data) const
{
find_block(data)->unfix();
}
#endif
/** Read a log segment to log_sys.buf. /** Read a log segment to log_sys.buf.
@param[in,out] start_lsn in: read area start, @param[in,out] start_lsn in: read area start,
...@@ -1826,7 +1850,6 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id, ...@@ -1826,7 +1850,6 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
const size_t l= std::min(len, get_free_len() - sizeof(recv_t::data)); const size_t l= std::min(len, get_free_len() - sizeof(recv_t::data));
recv_t::data_t *d= new (alloc(sizeof(recv_t::data) + l)) recv_t::data_t *d= new (alloc(sizeof(recv_t::data) + l))
recv_t::data_t(body, l); recv_t::data_t(body, l);
ut_d(find_block(d)->fix());
if (prev) if (prev)
prev->append(d); prev->append(d);
else else
...@@ -1840,30 +1863,32 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id, ...@@ -1840,30 +1863,32 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
} }
} }
/** Trim old log records for a page /** Trim old log records for a page.
@param start_lsn oldest log sequence number to preserve @param start_lsn oldest log sequence number to preserve
@return whether the entire log was trimmed */ @return whether all the log for the page was trimmed */
inline bool page_recv_t::recs_t::trim(lsn_t start_lsn) inline bool page_recv_t::recs_t::trim(lsn_t start_lsn)
{ {
for (log_rec_t** prev= &head; *prev; *prev= (*prev)->next) while (head)
{ {
if ((*prev)->lsn >= start_lsn) return false; if (head->lsn >= start_lsn) return false;
ut_d(static_cast<const recv_t*>(*prev)->data->free()); log_rec_t *next= head->next;
static_cast<const recv_t*>(head)->free();
head= next;
} }
tail= nullptr;
return true; return true;
} }
#ifdef UNIV_DEBUG
inline void page_recv_t::recs_t::free() const
{
for (const log_rec_t *l= head; l; l= l->next)
static_cast<const recv_t*>(l)->data->free();
}
#endif
inline void page_recv_t::recs_t::clear() inline void page_recv_t::recs_t::clear()
{ {
ut_d(free()); ut_ad(mutex_own(&recv_sys.mutex));
for (const log_rec_t *l= head; l; )
{
const log_rec_t *next= l->next;
static_cast<const recv_t*>(l)->free();
l= next;
}
head= tail= nullptr; head= tail= nullptr;
} }
...@@ -2016,8 +2041,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr, ...@@ -2016,8 +2041,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
end_lsn); end_lsn);
} }
} }
ut_d(recv->data->free(););
} }
#ifdef UNIV_ZIP_DEBUG #ifdef UNIV_ZIP_DEBUG
...@@ -2055,7 +2078,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr, ...@@ -2055,7 +2078,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
ut_ad(p->second.is_being_processed()); ut_ad(p->second.is_being_processed());
ut_ad(!recv_sys.pages.empty()); ut_ad(!recv_sys.pages.empty());
recv_sys.pages.erase(p);
if (recv_sys.report(now)) { if (recv_sys.report(now)) {
const ulint n = recv_sys.pages.size(); const ulint n = recv_sys.pages.size();
...@@ -2071,12 +2093,12 @@ This function should only be called when innodb_force_recovery is set. ...@@ -2071,12 +2093,12 @@ This function should only be called when innodb_force_recovery is set.
ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id) ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id)
{ {
mutex_enter(&mutex); mutex_enter(&mutex);
#ifdef UNIV_DEBUG map::iterator p= pages.find(page_id);
map::const_iterator p= pages.find(page_id);
if (p != pages.end()) if (p != pages.end())
p->second.log.free(); {
#endif p->second.log.clear();
pages.erase(page_id); pages.erase(p);
}
mutex_exit(&mutex); mutex_exit(&mutex);
} }
...@@ -2106,6 +2128,8 @@ void recv_recover_page(buf_page_t* bpage) ...@@ -2106,6 +2128,8 @@ void recv_recover_page(buf_page_t* bpage)
if (p != recv_sys.pages.end() if (p != recv_sys.pages.end()
&& !p->second.is_being_processed()) { && !p->second.is_being_processed()) {
recv_recover_page(block, mtr, p); recv_recover_page(block, mtr, p);
p->second.log.clear();
recv_sys.pages.erase(p);
goto func_exit; goto func_exit;
} }
} }
...@@ -2239,8 +2263,15 @@ void recv_apply_hashed_log_recs(bool last_batch) ...@@ -2239,8 +2263,15 @@ void recv_apply_hashed_log_recs(bool last_batch)
} else { } else {
mtr.commit(); mtr.commit();
recv_read_in_area(page_id); recv_read_in_area(page_id);
}
break; break;
}
ignore:
{
recv_sys_t::map::iterator r = p++;
r->second.log.clear();
recv_sys.pages.erase(r);
}
continue;
case page_recv_t::RECV_WILL_NOT_READ: case page_recv_t::RECV_WILL_NOT_READ:
mlog_init_t::init& i = mlog_init.last(page_id); mlog_init_t::init& i = mlog_init.last(page_id);
const lsn_t end_lsn = recs.log.last()->lsn; const lsn_t end_lsn = recs.log.last()->lsn;
...@@ -2249,11 +2280,7 @@ void recv_apply_hashed_log_recs(bool last_batch) ...@@ -2249,11 +2280,7 @@ void recv_apply_hashed_log_recs(bool last_batch)
<< page_id << page_id
<< " LSN " << end_lsn << " LSN " << end_lsn
<< " < " << i.lsn); << " < " << i.lsn);
ignore: goto ignore;
recv_sys_t::map::iterator r = p++;
ut_d(r->second.log.free());
recv_sys.pages.erase(r);
continue;
} }
fil_space_t* space = fil_space_acquire_for_io( fil_space_t* space = fil_space_acquire_for_io(
...@@ -2311,6 +2338,8 @@ void recv_apply_hashed_log_recs(bool last_batch) ...@@ -2311,6 +2338,8 @@ void recv_apply_hashed_log_recs(bool last_batch)
mtr.x_latch_at_savepoint(0, block); mtr.x_latch_at_savepoint(0, block);
recv_recover_page(block, mtr, p, &i); recv_recover_page(block, mtr, p, &i);
ut_ad(mtr.has_committed()); ut_ad(mtr.has_committed());
p->second.log.clear();
recv_sys.pages.erase(p);
} }
space->release_for_io(); space->release_for_io();
...@@ -3311,6 +3340,8 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace) ...@@ -3311,6 +3340,8 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
{ {
dberr_t err = DB_SUCCESS; dberr_t err = DB_SUCCESS;
mutex_enter(&recv_sys.mutex);
for (recv_sys_t::map::iterator p = recv_sys.pages.begin(); for (recv_sys_t::map::iterator p = recv_sys.pages.begin();
p != recv_sys.pages.end();) { p != recv_sys.pages.end();) {
ut_ad(!p->second.log.empty()); ut_ad(!p->second.log.empty());
...@@ -3333,7 +3364,7 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace) ...@@ -3333,7 +3364,7 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
/* fall through */ /* fall through */
case file_name_t::DELETED: case file_name_t::DELETED:
recv_sys_t::map::iterator r = p++; recv_sys_t::map::iterator r = p++;
ut_d(r->second.log.free();); r->second.log.clear();
recv_sys.pages.erase(r); recv_sys.pages.erase(r);
continue; continue;
} }
...@@ -3341,6 +3372,8 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace) ...@@ -3341,6 +3372,8 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
} }
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
func_exit:
mutex_exit(&recv_sys.mutex);
return(err); return(err);
} }
...@@ -3375,7 +3408,8 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace) ...@@ -3375,7 +3408,8 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
missing_tablespace = false; missing_tablespace = false;
} }
return DB_SUCCESS; err = DB_SUCCESS;
goto func_exit;
} }
/** Check if all tablespaces were found for crash recovery. /** Check if all tablespaces were found for crash recovery.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment