Commit ef3f71fa authored by Marko Mäkelä

MDEV-23399 fixup: Interleaved doublewrite batches

Author: Vladislav Vaintroub
parent 8cb01c51
...@@ -53,8 +53,8 @@ inline buf_block_t *buf_dblwr_trx_sys_get(mtr_t *mtr) ...@@ -53,8 +53,8 @@ inline buf_block_t *buf_dblwr_trx_sys_get(mtr_t *mtr)
@param header doublewrite page header in the TRX_SYS page */ @param header doublewrite page header in the TRX_SYS page */
inline void buf_dblwr_t::init(const byte *header) inline void buf_dblwr_t::init(const byte *header)
{ {
ut_ad(!first_free); ut_ad(!active_slot->first_free);
ut_ad(!reserved); ut_ad(!active_slot->reserved);
ut_ad(!batch_running); ut_ad(!batch_running);
mysql_mutex_init(buf_dblwr_mutex_key, &mutex, nullptr); mysql_mutex_init(buf_dblwr_mutex_key, &mutex, nullptr);
...@@ -63,10 +63,14 @@ inline void buf_dblwr_t::init(const byte *header) ...@@ -63,10 +63,14 @@ inline void buf_dblwr_t::init(const byte *header)
block2= page_id_t(0, mach_read_from_4(header + TRX_SYS_DOUBLEWRITE_BLOCK2)); block2= page_id_t(0, mach_read_from_4(header + TRX_SYS_DOUBLEWRITE_BLOCK2));
const uint32_t buf_size= 2 * block_size(); const uint32_t buf_size= 2 * block_size();
write_buf= static_cast<byte*>(aligned_malloc(buf_size << srv_page_size_shift, for (int i= 0; i < 2; i++)
srv_page_size)); {
buf_block_arr= static_cast<element*> slots[i].write_buf= static_cast<byte*>
(ut_zalloc_nokey(buf_size * sizeof(element))); (aligned_malloc(buf_size << srv_page_size_shift, srv_page_size));
slots[i].buf_block_arr= static_cast<element*>
(ut_zalloc_nokey(buf_size * sizeof(element)));
}
active_slot= &slots[0];
} }
/** Create or restore the doublewrite buffer in the TRX_SYS page. /** Create or restore the doublewrite buffer in the TRX_SYS page.
...@@ -272,6 +276,7 @@ dberr_t buf_dblwr_t::init_or_load_pages(pfs_os_file_t file, const char *path) ...@@ -272,6 +276,7 @@ dberr_t buf_dblwr_t::init_or_load_pages(pfs_os_file_t file, const char *path)
TRX_SYS_DOUBLEWRITE + read_buf) != TRX_SYS_DOUBLEWRITE + read_buf) !=
TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N; TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N;
auto write_buf= active_slot->write_buf;
/* Read the pages from the doublewrite buffer to memory */ /* Read the pages from the doublewrite buffer to memory */
err= os_file_read(IORequestRead, file, write_buf, err= os_file_read(IORequestRead, file, write_buf,
block1.page_no() << srv_page_size_shift, block1.page_no() << srv_page_size_shift,
...@@ -443,16 +448,20 @@ void buf_dblwr_t::close() ...@@ -443,16 +448,20 @@ void buf_dblwr_t::close()
return; return;
/* Free the double write data structures. */ /* Free the double write data structures. */
ut_ad(!reserved); ut_ad(!active_slot->reserved);
ut_ad(!first_free); ut_ad(!active_slot->first_free);
ut_ad(!batch_running); ut_ad(!batch_running);
mysql_cond_destroy(&cond); mysql_cond_destroy(&cond);
aligned_free(write_buf); for (int i= 0; i < 2; i++)
ut_free(buf_block_arr); {
aligned_free(slots[i].write_buf);
ut_free(slots[i].buf_block_arr);
}
mysql_mutex_destroy(&mutex); mysql_mutex_destroy(&mutex);
memset((void*) this, 0, sizeof *this); memset((void*) this, 0, sizeof *this);
active_slot= &slots[0];
} }
/** Update the doublewrite buffer on write completion. */ /** Update the doublewrite buffer on write completion. */
...@@ -466,10 +475,11 @@ void buf_dblwr_t::write_completed() ...@@ -466,10 +475,11 @@ void buf_dblwr_t::write_completed()
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
ut_ad(batch_running); ut_ad(batch_running);
ut_ad(reserved); slot *flush_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
ut_ad(reserved <= first_free); ut_ad(flush_slot->reserved);
ut_ad(flush_slot->reserved <= flush_slot->first_free);
if (!--reserved) if (!--flush_slot->reserved)
{ {
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
/* This will finish the batch. Sync data files to the disk. */ /* This will finish the batch. Sync data files to the disk. */
...@@ -477,7 +487,7 @@ void buf_dblwr_t::write_completed() ...@@ -477,7 +487,7 @@ void buf_dblwr_t::write_completed()
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
/* We can now reuse the doublewrite memory buffer: */ /* We can now reuse the doublewrite memory buffer: */
first_free= 0; flush_slot->first_free= 0;
batch_running= false; batch_running= false;
mysql_cond_broadcast(&cond); mysql_cond_broadcast(&cond);
} }
...@@ -552,25 +562,30 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size) ...@@ -552,25 +562,30 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
for (;;) for (;;)
{ {
if (!first_free) if (!active_slot->first_free)
return false; return false;
if (!batch_running) if (!batch_running)
break; break;
mysql_cond_wait(&cond, &mutex); mysql_cond_wait(&cond, &mutex);
} }
ut_ad(reserved == first_free); ut_ad(active_slot->reserved == active_slot->first_free);
/* Disallow anyone else to post to doublewrite buffer or to
start another batch of flushing. */ /* Disallow anyone else to start another batch of flushing. */
slot *flush_slot= active_slot;
/* Switch the active slot */
active_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
ut_a(active_slot->first_free == 0);
batch_running= true; batch_running= true;
const ulint old_first_free= first_free; const ulint old_first_free= flush_slot->first_free;
auto write_buf= flush_slot->write_buf;
/* Now safe to release the mutex. */ /* Now safe to release the mutex. */
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
for (ulint len2= 0, i= 0; i < old_first_free; len2 += srv_page_size, i++) for (ulint len2= 0, i= 0; i < old_first_free; len2 += srv_page_size, i++)
{ {
buf_page_t *bpage= buf_block_arr[i].request.bpage; buf_page_t *bpage= flush_slot->buf_block_arr[i].request.bpage;
if (bpage->zip.data) if (bpage->zip.data)
/* No simple validate for ROW_FORMAT=COMPRESSED pages exists. */ /* No simple validate for ROW_FORMAT=COMPRESSED pages exists. */
...@@ -602,7 +617,7 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size) ...@@ -602,7 +617,7 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
} }
/* increment the doublewrite flushed pages counter */ /* increment the doublewrite flushed pages counter */
srv_stats.dblwr_pages_written.add(first_free); srv_stats.dblwr_pages_written.add(flush_slot->first_free);
srv_stats.dblwr_writes.inc(); srv_stats.dblwr_writes.inc();
/* Now flush the doublewrite buffer data to disk */ /* Now flush the doublewrite buffer data to disk */
...@@ -612,20 +627,13 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size) ...@@ -612,20 +627,13 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
and in recovery we will find them in the doublewrite buffer and in recovery we will find them in the doublewrite buffer
blocks. Next do the writes to the intended positions. */ blocks. Next do the writes to the intended positions. */
/* Up to this point old_first_free == first_free because we have set
the batch_running flag disallowing any other thread to post any ut_ad(active_slot != flush_slot);
request but we can't safely access first_free in the loop below. ut_ad(flush_slot->first_free == old_first_free);
This is so because it is possible that after we are done with the
last iteration and before we terminate the loop, the batch gets
finished in the IO helper thread and another thread posts a new
batch setting first_free to a higher value. If this happens and we
are using first_free in the loop termination condition then we'll
end up dispatching the same block twice from two different
threads. */
ut_ad(old_first_free == first_free);
for (ulint i= 0; i < old_first_free; i++) for (ulint i= 0; i < old_first_free; i++)
{ {
auto e= buf_block_arr[i]; auto e= flush_slot->buf_block_arr[i];
buf_page_t* bpage= e.request.bpage; buf_page_t* bpage= e.request.bpage;
ut_ad(bpage->in_file()); ut_ad(bpage->in_file());
...@@ -696,18 +704,15 @@ void buf_dblwr_t::add_to_batch(fil_space_t *space, const IORequest &request, ...@@ -696,18 +704,15 @@ void buf_dblwr_t::add_to_batch(fil_space_t *space, const IORequest &request,
for (;;) for (;;)
{ {
while (batch_running) ut_ad(active_slot->first_free <= buf_size);
mysql_cond_wait(&cond, &mutex); if (active_slot->first_free != buf_size)
ut_ad(first_free <= buf_size);
if (first_free != buf_size)
break; break;
if (flush_buffered_writes(buf_size / 2)) if (flush_buffered_writes(buf_size / 2))
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
} }
byte *p= write_buf + srv_page_size * first_free; byte *p= active_slot->write_buf + srv_page_size * active_slot->first_free;
/* We request frame here to get correct buffer in case of /* We request frame here to get correct buffer in case of
encryption and/or page compression */ encryption and/or page compression */
...@@ -715,11 +720,13 @@ void buf_dblwr_t::add_to_batch(fil_space_t *space, const IORequest &request, ...@@ -715,11 +720,13 @@ void buf_dblwr_t::add_to_batch(fil_space_t *space, const IORequest &request,
memcpy_aligned<OS_FILE_LOG_BLOCK_SIZE>(p, frame, size); memcpy_aligned<OS_FILE_LOG_BLOCK_SIZE>(p, frame, size);
ut_ad(!request.bpage->zip_size() || request.bpage->zip_size() == size); ut_ad(!request.bpage->zip_size() || request.bpage->zip_size() == size);
ut_ad(reserved == first_free); ut_ad(active_slot->reserved == active_slot->first_free);
ut_ad(reserved < buf_size); ut_ad(active_slot->reserved < buf_size);
new (buf_block_arr + first_free++) element{space, request, size}; new (active_slot->buf_block_arr + active_slot->first_free++)
reserved= first_free; element{space, request, size};
active_slot->reserved= active_slot->first_free;
if (first_free != buf_size || !flush_buffered_writes(buf_size / 2))
if (active_slot->first_free != buf_size ||
!flush_buffered_writes(buf_size / 2))
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
} }
...@@ -32,6 +32,29 @@ Created 2011/12/19 Inaam Rana ...@@ -32,6 +32,29 @@ Created 2011/12/19 Inaam Rana
/** Doublewrite control struct */ /** Doublewrite control struct */
class buf_dblwr_t class buf_dblwr_t
{ {
/** One page write queued in a doublewrite batch.
add_to_batch() constructs one of these in slot::buf_block_arr for every
page it copies into slot::write_buf; flush_buffered_writes() later reads
the entries back (e.g. request.bpage) when it processes the batch. */
struct element
{
/** tablespace that the page belongs to, as passed to add_to_batch() */
fil_space_t *space;
/** asynchronous write request; request.bpage identifies the page */
IORequest request;
/** payload size in bytes, i.e. the number of bytes that add_to_batch()
copied into write_buf for this page */
size_t size;
};
/** State of one doublewrite batch.
buf_dblwr_t keeps two of these (slots[2]). active_slot points to the one
that add_to_batch() is currently appending to; flush_buffered_writes()
switches active_slot to the other slot and then writes out the one that
was active, so that new pages can be collected while a batch is running. */
struct slot
{
/** first free position in write_buf, measured in units of
srv_page_size; also the index of the next unused entry in
buf_block_arr. Reset to 0 by write_completed() when the batch is done. */
ulint first_free;
/** number of page entries reserved for the current write batch;
add_to_batch() sets this equal to first_free, and write_completed()
decrements it, finishing the batch when it reaches 0 */
ulint reserved;
/** the doublewrite buffer, aligned to srv_page_size; init() allocates
2 * block_size() pages for it */
byte* write_buf;
/** buffer blocks to be written via write_buf; init() allocates one
zero-initialized element per page of write_buf */
element* buf_block_arr;
};
/** the page number of the first doublewrite block (block_size() pages) */ /** the page number of the first doublewrite block (block_size() pages) */
page_id_t block1= page_id_t(0, 0); page_id_t block1= page_id_t(0, 0);
/** the page number of the second doublewrite block (block_size() pages) */ /** the page number of the second doublewrite block (block_size() pages) */
...@@ -43,25 +66,10 @@ class buf_dblwr_t ...@@ -43,25 +66,10 @@ class buf_dblwr_t
mysql_cond_t cond; mysql_cond_t cond;
/** whether a batch is being written from the doublewrite buffer */ /** whether a batch is being written from the doublewrite buffer */
bool batch_running; bool batch_running;
/** first free position in write_buf measured in units of srv_page_size */
ulint first_free;
/** number of slots reserved for the current write batch */
ulint reserved;
/** the doublewrite buffer, aligned to srv_page_size */
byte *write_buf;
struct element slot slots[2];
{ slot *active_slot=&slots[0];
/** tablespace */
fil_space_t *space;
/** asynchronous write request */
IORequest request;
/** payload size in bytes */
size_t size;
};
/** buffer blocks to be written via write_buf */
element *buf_block_arr;
/** Initialize the doublewrite buffer data structure. /** Initialize the doublewrite buffer data structure.
@param header doublewrite page header in the TRX_SYS page */ @param header doublewrite page header in the TRX_SYS page */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment