Commit a5a2ef07 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-23855: Implement asynchronous doublewrite

Synchronous writes and calls to fdatasync(), fsync() or
FlushFileBuffers() would ruin performance. So, let us
submit asynchronous writes for the doublewrite buffer.
We submit a single request for the likely case that the
two doublewrite buffers are contiquous in the system tablespace.

buf_dblwr_t::flush_buffered_writes_completed(): The completion callback
of buf_dblwr_t::flush_buffered_writes().

os_aio_wait_until_no_pending_writes(): Also wait for doublewrite batches.

buf_dblwr_t::element::space: Remove. We can simply use
element::request.node->space instead.

Reviewed by: Vladislav Vaintroub
parent ef3f71fa
......@@ -570,6 +570,7 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
}
ut_ad(active_slot->reserved == active_slot->first_free);
ut_ad(!flushing_buffered_writes);
/* Disallow anyone else to start another batch of flushing. */
slot *flush_slot= active_slot;
......@@ -579,7 +580,9 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
batch_running= true;
const ulint old_first_free= flush_slot->first_free;
auto write_buf= flush_slot->write_buf;
const bool multi_batch= block1 + static_cast<uint32_t>(size) != block2 &&
old_first_free > size;
flushing_buffered_writes= 1 + multi_batch;
/* Now safe to release the mutex. */
mysql_mutex_unlock(&mutex);
#ifdef UNIV_DEBUG
......@@ -597,25 +600,48 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
ut_d(buf_dblwr_check_page_lsn(*bpage, write_buf + len2));
}
#endif /* UNIV_DEBUG */
/* Write out the first block of the doublewrite buffer */
const IORequest request(nullptr, fil_system.sys_space->chain.start,
IORequest::DBLWR_BATCH);
ut_a(fil_system.sys_space->acquire());
fil_system.sys_space->io(IORequestWrite,
os_offset_t{block1.page_no()} <<
srv_page_size_shift,
std::min(size, old_first_free) <<
srv_page_size_shift, write_buf);
if (old_first_free > size)
if (multi_batch)
{
/* Write out the second block of the doublewrite buffer. */
ut_a(fil_system.sys_space->acquire());
fil_system.sys_space->io(IORequestWrite,
os_offset_t{block2.page_no()} <<
srv_page_size_shift,
(old_first_free - size) << srv_page_size_shift,
write_buf + (size << srv_page_size_shift));
fil_system.sys_space->reacquire();
os_aio(request, write_buf,
os_offset_t{block1.page_no()} << srv_page_size_shift,
size << srv_page_size_shift);
os_aio(request, write_buf + (size << srv_page_size_shift),
os_offset_t{block2.page_no()} << srv_page_size_shift,
(old_first_free - size) << srv_page_size_shift);
}
else
os_aio(request, write_buf,
os_offset_t{block1.page_no()} << srv_page_size_shift,
old_first_free << srv_page_size_shift);
srv_stats.data_written.add(old_first_free);
return true;
}
void buf_dblwr_t::flush_buffered_writes_completed(const IORequest &request)
{
ut_ad(this == &buf_dblwr);
ut_ad(srv_use_doublewrite_buf);
ut_ad(is_initialised());
ut_ad(!srv_read_only_mode);
ut_ad(!request.bpage);
ut_ad(request.node == fil_system.sys_space->chain.start);
ut_ad(request.type == IORequest::DBLWR_BATCH);
mysql_mutex_lock(&mutex);
ut_ad(batch_running);
ut_ad(flushing_buffered_writes);
ut_ad(flushing_buffered_writes <= 2);
const bool completed= !--flushing_buffered_writes;
mysql_mutex_unlock(&mutex);
if (!completed)
return;
slot *const flush_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
ut_ad(flush_slot->reserved == flush_slot->first_free);
/* increment the doublewrite flushed pages counter */
srv_stats.dblwr_pages_written.add(flush_slot->first_free);
srv_stats.dblwr_writes.inc();
......@@ -623,15 +649,9 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
/* Now flush the doublewrite buffer data to disk */
fil_system.sys_space->flush();
/* We know that the writes have been flushed to disk now
and in recovery we will find them in the doublewrite buffer
blocks. Next do the writes to the intended positions. */
ut_ad(active_slot != flush_slot);
ut_ad(flush_slot->first_free == old_first_free);
for (ulint i= 0; i < old_first_free; i++)
/* The writes have been flushed to disk now and in recovery we will
find them in the doublewrite buffer blocks. Next, write the data pages. */
for (ulint i= 0, first_free= flush_slot->first_free; i < first_free; i++)
{
auto e= flush_slot->buf_block_arr[i];
buf_page_t* bpage= e.request.bpage;
......@@ -655,10 +675,9 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
ut_d(buf_dblwr_check_page_lsn(*bpage, static_cast<const byte*>(frame)));
}
e.space->io(e.request, bpage->physical_offset(), e_size, frame, bpage);
e.request.node->space->io(e.request, bpage->physical_offset(), e_size,
frame, bpage);
}
return true;
}
/** Flush possible buffered writes to persistent storage.
......@@ -684,18 +703,17 @@ void buf_dblwr_t::flush_buffered_writes()
/** Schedule a page write. If the doublewrite memory buffer is full,
flush_buffered_writes() will be invoked to make space.
@param space tablespace
@param request asynchronous write request
@param size payload size in bytes */
void buf_dblwr_t::add_to_batch(fil_space_t *space, const IORequest &request,
size_t size)
void buf_dblwr_t::add_to_batch(const IORequest &request, size_t size)
{
ut_ad(request.is_async());
ut_ad(request.is_write());
ut_ad(request.bpage);
ut_ad(request.bpage->in_file());
ut_ad(space->id == request.bpage->id().space());
ut_ad(space->referenced());
ut_ad(request.node);
ut_ad(request.node->space->id == request.bpage->id().space());
ut_ad(request.node->space->referenced());
ut_ad(!srv_read_only_mode);
const ulint buf_size= 2 * block_size();
......@@ -723,7 +741,7 @@ void buf_dblwr_t::add_to_batch(fil_space_t *space, const IORequest &request,
ut_ad(active_slot->reserved == active_slot->first_free);
ut_ad(active_slot->reserved < buf_size);
new (active_slot->buf_block_arr + active_slot->first_free++)
element{space, request, size};
element{request, size};
active_slot->reserved= active_slot->first_free;
if (active_slot->first_free != buf_size ||
......
......@@ -921,7 +921,7 @@ static bool buf_flush_page(buf_page_t *bpage, bool lru, fil_space_t *space)
space->io(IORequest(type, bpage),
bpage->physical_offset(), size, frame, bpage);
else
buf_dblwr.add_to_batch(space, IORequest(type, bpage), size);
buf_dblwr.add_to_batch(IORequest(bpage, space->chain.start, type), size);
}
/* Increment the I/O operation count used for selecting LRU policy. */
......
......@@ -3355,11 +3355,12 @@ fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len,
ut_ad(offset % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_ad((len % OS_FILE_LOG_BLOCK_SIZE) == 0);
ut_ad(fil_validate_skip());
ut_ad(type.is_read() || type.is_write());
ut_ad(type.type != IORequest::DBLWR_BATCH);
if (type.is_read()) {
srv_stats.data_read.add(len);
} else {
ut_ad(type.is_write() || type.type == IORequest::PUNCH_RANGE);
ut_ad(!srv_read_only_mode || this == fil_system.temp_space);
srv_stats.data_written.add(len);
}
......@@ -3457,7 +3458,10 @@ void fil_aio_callback(const IORequest &request)
if (!request.bpage)
{
ut_ad(!srv_read_only_mode);
ut_ad(request.type == IORequest::WRITE_ASYNC);
if (request.type == IORequest::DBLWR_BATCH)
buf_dblwr.flush_buffered_writes_completed(request);
else
ut_ad(request.type == IORequest::WRITE_ASYNC);
write_completed:
request.node->complete_write();
}
......
......@@ -34,8 +34,6 @@ class buf_dblwr_t
{
struct element
{
/** tablespace */
fil_space_t *space;
/** asynchronous write request */
IORequest request;
/** payload size in bytes */
......@@ -66,10 +64,11 @@ class buf_dblwr_t
mysql_cond_t cond;
/** whether a batch is being written from the doublewrite buffer */
bool batch_running;
/** number of expected flush_buffered_writes_completed() calls */
unsigned flushing_buffered_writes;
slot slots[2];
slot *active_slot=&slots[0];
slot *active_slot= &slots[0];
/** Initialize the doublewrite buffer data structure.
@param header doublewrite page header in the TRX_SYS page */
......@@ -98,24 +97,25 @@ class buf_dblwr_t
/** Process and remove the double write buffer pages for all tablespaces. */
void recover();
/** Update the doublewrite buffer on write completion. */
/** Update the doublewrite buffer on data page write completion. */
void write_completed();
/** Flush possible buffered writes to persistent storage.
It is very important to call this function after a batch of writes has been
posted, and also when we may have to wait for a page latch!
Otherwise a deadlock of threads can occur. */
void flush_buffered_writes();
/** Update the doublewrite buffer on write batch completion
@param request the completed batch write request */
void flush_buffered_writes_completed(const IORequest &request);
/** Size of the doublewrite block in pages */
uint32_t block_size() const { return FSP_EXTENT_SIZE; }
/** Schedule a page write. If the doublewrite memory buffer is full,
flush_buffered_writes() will be invoked to make space.
@param space tablespace
@param request asynchronous write request
@param size payload size in bytes */
void add_to_batch(fil_space_t *space, const IORequest &request,
size_t size) MY_ATTRIBUTE((nonnull));
void add_to_batch(const IORequest &request, size_t size);
/** Determine whether the doublewrite buffer is initialized */
bool is_initialised() const
......@@ -132,6 +132,18 @@ class buf_dblwr_t
const uint32_t size= block_size();
return id < block1 + size || (id >= block2 && id < block2 + size);
}
/** Wait for flush_buffered_writes() to be fully completed */
void wait_flush_buffered_writes()
{
if (is_initialised())
{
mysql_mutex_lock(&mutex);
while (batch_running)
mysql_cond_wait(&cond, &mutex);
mysql_mutex_unlock(&mutex);
}
}
};
/** The doublewrite buffer */
......
......@@ -198,6 +198,8 @@ class IORequest
WRITE_SYNC= 16,
/** Asynchronous write */
WRITE_ASYNC= WRITE_SYNC | 1,
/** A doublewrite batch */
DBLWR_BATCH= WRITE_ASYNC | 8,
/** Write data; evict the block on write completion */
WRITE_LRU= WRITE_ASYNC | 32,
/** Write data and punch hole for the rest */
......
......@@ -78,6 +78,8 @@ Created 10/21/1995 Heikki Tuuri
#include <my_sys.h>
#endif
#include "buf0dblwr.h"
#include <thread>
#include <chrono>
......@@ -4041,9 +4043,8 @@ void os_aio_free()
write_slots= nullptr;
}
/** Waits until there are no pending writes. There can
be other, synchronous, pending writes. */
void os_aio_wait_until_no_pending_writes()
/** Wait until there are no pending asynchronous writes. */
static void os_aio_wait_until_no_pending_writes_low()
{
bool notify_wait = write_slots->pending_io_count() > 0;
......@@ -4056,6 +4057,14 @@ void os_aio_wait_until_no_pending_writes()
tpool::tpool_wait_end();
}
/** Waits until there are no pending writes. There can
be other, synchronous, pending writes. */
void os_aio_wait_until_no_pending_writes()
{
os_aio_wait_until_no_pending_writes_low();
buf_dblwr.wait_flush_buffered_writes();
}
/** Request a read or write.
@param type I/O request
@param buf buffer
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment