Commit 8cb01c51 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-16264 fixup: Clean up asynchronous I/O

os_aio_userdata_t: Remove. It was basically duplicating IORequest.

buf_page_write_complete(): Take only IORequest as a parameter.

os_aio_func(), pfs_os_aio_func(): Replaced with os_aio() that has
no redundant parameters. There is only one caller, so there is no
point to pass __FILE__, __LINE__ as a parameter.
parent 118e258a
...@@ -302,16 +302,28 @@ buf_flush_relocate_on_flush_list( ...@@ -302,16 +302,28 @@ buf_flush_relocate_on_flush_list(
} }
/** Complete write of a file page from buf_pool. /** Complete write of a file page from buf_pool.
@param bpage written page @param request write request */
@param request write request void buf_page_write_complete(const IORequest &request)
@param dblwr whether the doublewrite buffer was used */
void buf_page_write_complete(buf_page_t *bpage, const IORequest &request,
bool dblwr)
{ {
ut_ad(request.is_write()); ut_ad(request.is_write());
ut_ad(!srv_read_only_mode/* ||
request.node->space->purpose == FIL_TYPE_TEMPORARY*/);
buf_page_t *bpage= request.bpage;
ut_ad(bpage);
ut_ad(bpage->in_file()); ut_ad(bpage->in_file());
ut_ad(bpage->io_fix() == BUF_IO_WRITE); ut_ad(bpage->io_fix() == BUF_IO_WRITE);
ut_ad(!buf_dblwr.is_inside(bpage->id())); ut_ad(!buf_dblwr.is_inside(bpage->id()));
bool dblwr;
if (bpage->status == buf_page_t::INIT_ON_FLUSH)
{
bpage->status= buf_page_t::NORMAL;
dblwr= false;
}
else
{
ut_ad(bpage->status == buf_page_t::NORMAL);
dblwr= request.node->space->use_doublewrite();
}
/* We do not need protect io_fix here by mutex to read it because /* We do not need protect io_fix here by mutex to read it because
this and buf_page_read_complete() are the only functions where we can this and buf_page_read_complete() are the only functions where we can
......
...@@ -3423,11 +3423,8 @@ fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len, ...@@ -3423,11 +3423,8 @@ fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len,
goto release_sync_write; goto release_sync_write;
} else { } else {
/* Queue the aio request */ /* Queue the aio request */
err = os_aio( err = os_aio(IORequest(bpage, node, type.type),
IORequest(type, node), buf, offset, len);
node->name, node->handle, buf, offset, len,
purpose != FIL_TYPE_TEMPORARY && srv_read_only_mode,
node, bpage);
} }
/* We can try to recover the page from the double write buffer if /* We can try to recover the page from the double write buffer if
...@@ -3452,62 +3449,44 @@ fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len, ...@@ -3452,62 +3449,44 @@ fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len,
#include <tpool.h> #include <tpool.h>
/** Callback for AIO completion */ /** Callback for AIO completion */
void fil_aio_callback(os_aio_userdata_t *data) void fil_aio_callback(const IORequest &request)
{ {
ut_ad(fil_validate_skip()); ut_ad(fil_validate_skip());
ut_ad(request.node);
fil_node_t *node= data->node; if (!request.bpage)
if (UNIV_UNLIKELY(!node))
{ {
ut_ad(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS);
return;
}
buf_page_t *bpage= static_cast<buf_page_t*>(data->message);
if (!bpage)
{
/* Asynchronous single page writes from the doublewrite buffer,
or calls from buf_flush_freed_page() don't have access to the page. */
ut_ad(data->type.is_write());
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
ut_ad(request.type == IORequest::WRITE_ASYNC);
write_completed: write_completed:
node->complete_write(); request.node->complete_write();
} }
else if (data->type.is_write()) else if (request.is_write())
{ {
ut_ad(!srv_read_only_mode || node->space->purpose == FIL_TYPE_TEMPORARY); buf_page_write_complete(request);
bool dblwr= node->space->use_doublewrite();
if (dblwr && bpage->status == buf_page_t::INIT_ON_FLUSH)
{
bpage->status= buf_page_t::NORMAL;
dblwr= false;
}
buf_page_write_complete(bpage, data->type, dblwr);
goto write_completed; goto write_completed;
} }
else else
{ {
ut_ad(data->type.is_read()); ut_ad(request.is_read());
/* IMPORTANT: since i/o handling for reads will read also the insert /* IMPORTANT: since i/o handling for reads will read also the insert
buffer in fil_system.sys_space, we have to be very careful not to buffer in fil_system.sys_space, we have to be very careful not to
introduce deadlocks. We never close the system tablespace (0) data introduce deadlocks. We never close fil_system.sys_space data
files via fil_system.LRU and we never issue asynchronous reads of files and never issue asynchronous reads of change buffer pages. */
change buffer pages. */ const page_id_t id(request.bpage->id());
const page_id_t id(bpage->id());
if (dberr_t err= buf_page_read_complete(bpage, *node)) if (dberr_t err= buf_page_read_complete(request.bpage, *request.node))
{ {
if (recv_recovery_is_on() && !srv_force_recovery) if (recv_recovery_is_on() && !srv_force_recovery)
recv_sys.found_corrupt_fs= true; recv_sys.found_corrupt_fs= true;
ib::error() << "Failed to read page " << id.page_no() ib::error() << "Failed to read page " << id.page_no()
<< " from file '" << node->name << "': " << err; << " from file '" << request.node->name << "': " << err;
} }
} }
node->space->release(); request.node->space->release();
} }
/** Flush to disk the writes in file spaces of the given type /** Flush to disk the writes in file spaces of the given type
......
...@@ -72,11 +72,8 @@ buf_flush_relocate_on_flush_list( ...@@ -72,11 +72,8 @@ buf_flush_relocate_on_flush_list(
buf_page_t* dpage); /*!< in/out: destination block */ buf_page_t* dpage); /*!< in/out: destination block */
/** Complete write of a file page from buf_pool. /** Complete write of a file page from buf_pool.
@param bpage written page @param request write request */
@param request write request void buf_page_write_complete(const IORequest &request);
@param dblwr whether the doublewrite buffer was used */
void buf_page_write_complete(buf_page_t *bpage, const IORequest &request,
bool dblwr);
/** Assign the full crc32 checksum for non-compressed page. /** Assign the full crc32 checksum for non-compressed page.
@param[in,out] page page to be updated */ @param[in,out] page page to be updated */
......
...@@ -208,12 +208,12 @@ class IORequest ...@@ -208,12 +208,12 @@ class IORequest
PUNCH_RANGE= WRITE_SYNC | 128, PUNCH_RANGE= WRITE_SYNC | 128,
}; };
constexpr IORequest(buf_page_t *bpage, fil_node_t *node, Type type) :
bpage(bpage), node(node), type(type) {}
constexpr IORequest(Type type= READ_SYNC, buf_page_t *bpage= nullptr) : constexpr IORequest(Type type= READ_SYNC, buf_page_t *bpage= nullptr) :
bpage(bpage), type(type) {} bpage(bpage), type(type) {}
constexpr IORequest(const IORequest &old, fil_node_t *node= nullptr) :
bpage(old.bpage), node(node), type(old.type) {}
bool is_read() const { return (type & READ_SYNC) != 0; } bool is_read() const { return (type & READ_SYNC) != 0; }
bool is_write() const { return (type & WRITE_SYNC) != 0; } bool is_write() const { return (type & WRITE_SYNC) != 0; }
bool is_LRU() const { return (type & (WRITE_LRU ^ WRITE_ASYNC)) != 0; } bool is_LRU() const { return (type & (WRITE_LRU ^ WRITE_ASYNC)) != 0; }
...@@ -243,7 +243,7 @@ class IORequest ...@@ -243,7 +243,7 @@ class IORequest
buf_page_t* const bpage= nullptr; buf_page_t* const bpage= nullptr;
/** File descriptor */ /** File descriptor */
const fil_node_t *const node= nullptr; fil_node_t *const node= nullptr;
/** Request type bit flags */ /** Request type bit flags */
const Type type; const Type type;
...@@ -608,12 +608,6 @@ The wrapper functions have the prefix of "innodb_". */ ...@@ -608,12 +608,6 @@ The wrapper functions have the prefix of "innodb_". */
# define os_file_close(file) \ # define os_file_close(file) \
pfs_os_file_close_func(file, __FILE__, __LINE__) pfs_os_file_close_func(file, __FILE__, __LINE__)
# define os_aio(type, name, file, buf, offset, \
n, read_only, message1, message2) \
pfs_os_aio_func(type, name, file, buf, offset, \
n, read_only, message1, message2, \
__FILE__, __LINE__)
# define os_file_read(type, file, buf, offset, n) \ # define os_file_read(type, file, buf, offset, n) \
pfs_os_file_read_func(type, file, buf, offset, n, __FILE__, __LINE__) pfs_os_file_read_func(type, file, buf, offset, n, __FILE__, __LINE__)
...@@ -793,42 +787,6 @@ pfs_os_file_read_no_error_handling_func( ...@@ -793,42 +787,6 @@ pfs_os_file_read_no_error_handling_func(
const char* src_file, const char* src_file,
uint src_line); uint src_line);
/** NOTE! Please use the corresponding macro os_aio(), not directly this
function!
Performance schema wrapper function of os_aio() which requests
an asynchronous I/O operation.
@param[in,out] type IO request context
@param[in] name Name of the file or path as NUL terminated
string
@param[in] file Open file handle
@param[out] buf buffer where to read
@param[in] offset file offset where to read
@param[in] n number of bytes to read
@param[in] read_only if true read only mode checks are enforced
@param[in,out] m1 Message for the AIO handler, (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in,out] m2 message for the AIO handler (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in] src_file file name where func invoked
@param[in] src_line line where the func invoked
@return DB_SUCCESS if request was queued successfully, FALSE if fail */
UNIV_INLINE
dberr_t
pfs_os_aio_func(
const IORequest&type,
const char* name,
pfs_os_file_t file,
void* buf,
os_offset_t offset,
ulint n,
bool read_only,
fil_node_t* m1,
void* m2,
const char* src_file,
uint src_line);
/** NOTE! Please use the corresponding macro os_file_write(), not directly /** NOTE! Please use the corresponding macro os_file_write(), not directly
this function! this function!
This is the performance schema instrumented wrapper function for This is the performance schema instrumented wrapper function for
...@@ -950,11 +908,6 @@ to original un-instrumented file I/O APIs */ ...@@ -950,11 +908,6 @@ to original un-instrumented file I/O APIs */
# define os_file_close(file) os_file_close_func(file) # define os_file_close(file) os_file_close_func(file)
# define os_aio(type, name, file, buf, offset, \
n, read_only, message1, message2) \
os_aio_func(type, name, file, buf, offset, \
n, read_only, message1, message2)
# define os_file_read(type, file, buf, offset, n) \ # define os_file_read(type, file, buf, offset, n) \
os_file_read_func(type, file, buf, offset, n) os_file_read_func(type, file, buf, offset, n)
...@@ -1202,48 +1155,14 @@ os_aio_init( ...@@ -1202,48 +1155,14 @@ os_aio_init(
Frees the asynchronous io system. */ Frees the asynchronous io system. */
void os_aio_free(); void os_aio_free();
struct os_aio_userdata_t /** Request a read or write.
{ @param type I/O request
fil_node_t* node; @param buf buffer
IORequest type; @param offset file offset
void* message; @param n number of bytes
@retval DB_SUCCESS if request was queued successfully
os_aio_userdata_t(fil_node_t*node, IORequest type, void*message) : @retval DB_IO_ERROR on I/O error */
node(node), type(type), message(message) {} dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n);
/** Construct from tpool::aiocb::m_userdata[] */
os_aio_userdata_t(const char *buf) { memcpy((void*)this, buf, sizeof*this); }
};
/**
NOTE! Use the corresponding macro os_aio(), not directly this function!
Requests an asynchronous i/o operation.
@param[in,out] type IO request context
@param[in] name Name of the file or path as NUL terminated
string
@param[in] file Open file handle
@param[out] buf buffer where to read
@param[in] offset file offset where to read
@param[in] n number of bytes to read
@param[in] read_only if true read only mode checks are enforced
@param[in,out] m1 Message for the AIO handler, (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in,out] m2 message for the AIO handler (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@return DB_SUCCESS or error code */
dberr_t
os_aio_func(
const IORequest&type,
const char* name,
pfs_os_file_t file,
void* buf,
os_offset_t offset,
ulint n,
bool read_only,
fil_node_t* m1,
void* m2);
/** Waits until there are no pending writes in os_aio_write_array. There can /** Waits until there are no pending writes in os_aio_write_array. There can
be other, synchronous, pending writes. */ be other, synchronous, pending writes. */
......
/***************************************************************************** /*****************************************************************************
Copyright (c) 2010, 2017, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2010, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2019, MariaDB Corporation. Copyright (c) 2013, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -201,59 +201,6 @@ pfs_os_file_close_func( ...@@ -201,59 +201,6 @@ pfs_os_file_close_func(
return(result); return(result);
} }
/** NOTE! Please use the corresponding macro os_aio(), not directly this
function!
Performance schema wrapper function of os_aio() which requests
an asynchronous i/o operation.
@param[in,out] type IO request context
@param[in] name Name of the file or path as NUL terminated
string
@param[in] file Open file handle
@param[out] buf buffer where to read
@param[in] offset file offset where to read
@param[in] n number of bytes to read
@param[in] read_only if true read only mode checks are enforced
@param[in,out] m1 Message for the AIO handler, (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in,out] m2 message for the AIO handler (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in] src_file file name where func invoked
@param[in] src_line line where the func invoked
@return DB_SUCCESS if request was queued successfully, FALSE if fail */
UNIV_INLINE
dberr_t
pfs_os_aio_func(
const IORequest&type,
const char* name,
pfs_os_file_t file,
void* buf,
os_offset_t offset,
ulint n,
bool read_only,
fil_node_t* m1,
void* m2,
const char* src_file,
uint src_line)
{
PSI_file_locker_state state;
struct PSI_file_locker* locker = NULL;
/* Register the read or write I/O depending on "type" */
register_pfs_file_io_begin(
&state, locker, file, n,
type.is_write() ? PSI_FILE_WRITE : PSI_FILE_READ,
src_file, src_line);
dberr_t result = os_aio_func(
type, name, file, buf, offset, n, read_only, m1, m2);
register_pfs_file_io_end(locker, n);
return(result);
}
/** NOTE! Please use the corresponding macro os_file_read(), not directly /** NOTE! Please use the corresponding macro os_file_read(), not directly
this function! this function!
This is the performance schema instrumented wrapper function for This is the performance schema instrumented wrapper function for
......
...@@ -3854,22 +3854,26 @@ os_file_get_status( ...@@ -3854,22 +3854,26 @@ os_file_get_status(
} }
extern void fil_aio_callback(os_aio_userdata_t *data); extern void fil_aio_callback(const IORequest &request);
static void io_callback(tpool::aiocb* cb) static void io_callback(tpool::aiocb* cb)
{ {
ut_a(cb->m_err == DB_SUCCESS); ut_a(cb->m_err == DB_SUCCESS);
os_aio_userdata_t data(cb->m_userdata); const IORequest request(*static_cast<const IORequest*>
/* Return cb back to cache*/ (static_cast<const void*>(cb->m_userdata)));
if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD) { /* Return cb back to cache*/
ut_ad(read_slots->contains(cb)); if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD)
read_slots->release(cb); {
} else { ut_ad(read_slots->contains(cb));
ut_ad(write_slots->contains(cb)); read_slots->release(cb);
write_slots->release(cb); }
} else
{
fil_aio_callback(&data); ut_ad(write_slots->contains(cb));
write_slots->release(cb);
}
fil_aio_callback(request);
} }
#ifdef LINUX_NATIVE_AIO #ifdef LINUX_NATIVE_AIO
...@@ -4052,91 +4056,82 @@ void os_aio_wait_until_no_pending_writes() ...@@ -4052,91 +4056,82 @@ void os_aio_wait_until_no_pending_writes()
tpool::tpool_wait_end(); tpool::tpool_wait_end();
} }
/** Request a read or write.
/** @param type I/O request
NOTE! Use the corresponding macro os_aio(), not directly this function! @param buf buffer
Requests an asynchronous i/o operation. @param offset file offset
@param[in,out] type IO request context @param n number of bytes
@param[in] name Name of the file or path as NUL terminated @retval DB_SUCCESS if request was queued successfully
string @retval DB_IO_ERROR on I/O error */
@param[in] file Open file handle dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n)
@param[out] buf buffer where to read
@param[in] offset file offset where to read
@param[in] n number of bytes to read
@param[in] read_only if true read only mode checks are enforced
@param[in,out] m1 Message for the AIO handler, (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in,out] m2 message for the AIO handler (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@return DB_SUCCESS or error code */
dberr_t
os_aio_func(
const IORequest&type,
const char* name,
pfs_os_file_t file,
void* buf,
os_offset_t offset,
ulint n,
bool read_only,
fil_node_t* m1,
void* m2)
{ {
ut_ad(n > 0); ut_ad(n > 0);
ut_ad((n % OS_FILE_LOG_BLOCK_SIZE) == 0); ut_ad((n % OS_FILE_LOG_BLOCK_SIZE) == 0);
ut_ad((offset % OS_FILE_LOG_BLOCK_SIZE) == 0); ut_ad((offset % OS_FILE_LOG_BLOCK_SIZE) == 0);
ut_ad(type.is_read() || type.is_write());
ut_ad(type.node);
ut_ad(type.node->is_open());
#ifdef WIN_ASYNC_IO #ifdef WIN_ASYNC_IO
ut_ad((n & 0xFFFFFFFFUL) == n); ut_ad((n & 0xFFFFFFFFUL) == n);
#endif /* WIN_ASYNC_IO */ #endif /* WIN_ASYNC_IO */
if (!type.is_async()) { #ifdef UNIV_PFS_IO
if (type.is_read()) { PSI_file_locker_state state;
return(os_file_read_func(type, file, buf, offset, n)); PSI_file_locker* locker= nullptr;
} register_pfs_file_io_begin(&state, locker, type.node->handle, n,
type.is_write()
ut_ad(type.is_write()); ? PSI_FILE_WRITE : PSI_FILE_READ,
__FILE__, __LINE__);
#endif /* UNIV_PFS_IO */
dberr_t err = DB_SUCCESS;
return(os_file_write_func(type, name, file, buf, offset, n)); if (!type.is_async()) {
err = type.is_read()
? os_file_read_func(type, type.node->handle,
buf, offset, n)
: os_file_write_func(type, type.node->name,
type.node->handle,
buf, offset, n);
func_exit:
#ifdef UNIV_PFS_IO
register_pfs_file_io_end(locker, n);
#endif /* UNIV_PFS_IO */
return err;
} }
if (type.is_read()) { if (type.is_read()) {
++os_n_file_reads; ++os_n_file_reads;
} else { } else {
ut_ad(type.is_write());
++os_n_file_writes; ++os_n_file_writes;
} }
compile_time_assert(sizeof(os_aio_userdata_t) <= tpool::MAX_AIO_USERDATA_LEN); compile_time_assert(sizeof(IORequest) <= tpool::MAX_AIO_USERDATA_LEN);
os_aio_userdata_t userdata{m1,type,m2};
io_slots* slots= type.is_read() ? read_slots : write_slots; io_slots* slots= type.is_read() ? read_slots : write_slots;
tpool::aiocb* cb = slots->acquire(); tpool::aiocb* cb = slots->acquire();
cb->m_buffer = buf; cb->m_buffer = buf;
cb->m_callback = (tpool::callback_func)io_callback; cb->m_callback = (tpool::callback_func)io_callback;
cb->m_group = slots->get_task_group(); cb->m_group = slots->get_task_group();
cb->m_fh = file.m_file; cb->m_fh = type.node->handle.m_file;
cb->m_len = (int)n; cb->m_len = (int)n;
cb->m_offset = offset; cb->m_offset = offset;
cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE; cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE;
memcpy(cb->m_userdata, &userdata, sizeof(userdata)); new (cb->m_userdata) IORequest{type};
ut_a(reinterpret_cast<size_t>(cb->m_buffer) % OS_FILE_LOG_BLOCK_SIZE ut_a(reinterpret_cast<size_t>(cb->m_buffer) % OS_FILE_LOG_BLOCK_SIZE
== 0); == 0);
ut_a(cb->m_len % OS_FILE_LOG_BLOCK_SIZE == 0); ut_a(cb->m_len % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_a(cb->m_offset % OS_FILE_LOG_BLOCK_SIZE == 0); ut_a(cb->m_offset % OS_FILE_LOG_BLOCK_SIZE == 0);
if (!srv_thread_pool->submit_io(cb)) if (srv_thread_pool->submit_io(cb)) {
return DB_SUCCESS; slots->release(cb);
os_file_handle_error(type.node->name, type.is_read()
slots->release(cb); ? "aio read" : "aio write");
err = DB_IO_ERROR;
os_file_handle_error(name, type.is_read() ? "aio read" : "aio write"); }
return(DB_IO_ERROR); goto func_exit;
} }
/** Prints info of the aio arrays. /** Prints info of the aio arrays.
......
...@@ -114,7 +114,7 @@ enum class aio_opcode ...@@ -114,7 +114,7 @@ enum class aio_opcode
AIO_PREAD, AIO_PREAD,
AIO_PWRITE AIO_PWRITE
}; };
const int MAX_AIO_USERDATA_LEN= 40; constexpr size_t MAX_AIO_USERDATA_LEN= 3 * sizeof(void*);
/** IO control block, includes parameters for the IO, and the callback*/ /** IO control block, includes parameters for the IO, and the callback*/
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment