Commit 7a9405e3 authored by Marko Mäkelä

MDEV-24270 Misuse of io_getevents() causes wake-ups at least twice per second

In the asynchronous I/O interface, InnoDB is invoking io_getevents()
with a timeout value of half a second, and requesting exactly 1 event
at a time.

The reason to have such a short timeout is to facilitate shutdown.
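
For illustration, the pattern being replaced looks roughly like this (a simplified sketch of the old completion-reaper loop, condensed from the diff below; not the exact code):

  io_event event;
  for (;;)
  {
    timespec ts{0, 500000000};                      /* half a second */
    int ret= io_getevents(ctx, 1, 1, &event, &ts);  /* at most 1 event */
    if (in_shutdown)                                /* polled flag */
      break;
    /* ... dispatch the completed request; retry on EINTR/EAGAIN ... */
  }

Even on a completely idle server, the timeout expires and the thread wakes up twice per second just to check the flag.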

We can do better: use an infinite timeout and wait for a larger maximum
number of events. On shutdown, we will invoke io_destroy(), which
should lead to the io_getevents system call reporting EINVAL.
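
A rough standalone sketch of this shutdown scheme (an illustration under stated assumptions, not the actual server code: Linux with libaio, with the raw system call standing in for the my_getevents() wrapper introduced below):

  #include <libaio.h>
  #include <sys/syscall.h>
  #include <unistd.h>
  #include <cerrno>
  #include <thread>

  int main()
  {
    io_context_t ctx= nullptr;
    if (io_setup(64, &ctx))              /* up to 64 in-flight requests */
      return 1;
    std::thread reaper([&ctx] {
      io_event events[64];
      for (;;)
      {
        /* NULL timeout: block until at least one completion arrives. */
        long ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
                          1, 64, events, nullptr);
        if (ret < 0 && errno == EINVAL)
          return;                        /* context destroyed: shut down */
        if (ret < 0 && errno == EINTR)
          continue;
        /* ... dispatch the ret completed events to worker threads ... */
      }
    });
    io_destroy(ctx);  /* should wake the blocked io_getevents() with EINVAL */
    reaper.join();
  }

No timer fires while the queue is idle, and shutdown needs no polling: destroying the context is itself the wake-up.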

my_getevents(): Reimplement the libaio io_getevents() by only invoking
the system call. The library implementation would try to elide the
system call and return 0 immediately if aio_ring_is_empty() holds.
Here, we do want a blocking system call, not 100% CPU usage. Neither
do we want aio_ring_is_empty() to trigger SIGSEGV because it
dereferences memory that was freed by io_destroy().
parent 1b12e251
tpool/aio_linux.cc
@@ -1,4 +1,4 @@
-/* Copyright(C) 2019 MariaDB Corporation.
+/* Copyright (C) 2019, 2020, MariaDB Corporation.
 
 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -14,133 +14,133 @@ along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
 
 #include "tpool_structs.h"
 #include <stdlib.h>
 #include <signal.h>
 #include <assert.h>
 #include "tpool.h"
-#include <thread>
 #ifdef LINUX_NATIVE_AIO
-#include <libaio.h>
+# include <thread>
+# include <atomic>
+# include <libaio.h>
+# include <sys/syscall.h>
+
+/** A simpler alternative to io_getevents(), without
+aio_ring_is_empty() that may trigger SIGSEGV */
+static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
+{
+  int saved_errno= errno;
+  int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
+                   min_nr, nr, ev, 0);
+  if (ret < 0)
+  {
+    ret= -errno;
+    errno= saved_errno;
+  }
+  return ret;
+}
 #endif
 
 /*
   Linux AIO implementation, based on native AIO.
   Needs libaio.h and -laio at the compile time.
 
-  submit_io() is used to submit async IO.
+  io_submit() is used to submit async IO.
 
-  There is a single thread, that collects the completion notification
-  with io_getevent(), and forwards io completion callback
+  A single thread will collect the completion notification
+  with io_getevents() and forward io completion callback to
   the worker threadpool.
 */
 namespace tpool
 {
 #ifdef LINUX_NATIVE_AIO
 
-class aio_linux : public aio
+class aio_linux final : public aio
 {
-  thread_pool* m_pool;
+  thread_pool *m_pool;
   io_context_t m_io_ctx;
-  bool m_in_shutdown;
   std::thread m_getevent_thread;
+  static std::atomic<bool> shutdown_in_progress;
 
-  static void getevent_thread_routine(aio_linux* aio)
+  static void getevent_thread_routine(aio_linux *aio)
   {
+    io_event events[1];
     for (;;)
     {
-      io_event event;
-      struct timespec ts{0, 500000000};
-      int ret = io_getevents(aio->m_io_ctx, 1, 1, &event, &ts);
-
-      if (aio->m_in_shutdown)
-        break;
-
-      if (ret > 0)
-      {
-        aiocb* iocb = (aiocb*)event.obj;
-        long long res = event.res;
-        if (res < 0)
-        {
-          iocb->m_err = static_cast<int>(-res);
-          iocb->m_ret_len = 0;
-        }
-        else
-        {
-          iocb->m_ret_len = ret;
-          iocb->m_err = 0;
-        }
-        iocb->m_internal_task.m_func = iocb->m_callback;
-        iocb->m_internal_task.m_arg = iocb;
-        iocb->m_internal_task.m_group = iocb->m_group;
-        aio->m_pool->submit_task(&iocb->m_internal_task);
-        continue;
-      }
-      switch (ret)
-      {
-      case -EAGAIN:
-        usleep(1000);
-        continue;
-      case -EINTR:
-      case 0:
-        continue;
-      default:
-        fprintf(stderr, "io_getevents returned %d\n", ret);
-        abort();
+      switch (int ret= my_getevents(aio->m_io_ctx, 1, 1, events)) {
+      case -EINTR:
+      case 0:
+        continue;
+      case -EINVAL:
+        if (shutdown_in_progress)
+          return;
+        /* fall through */
+      default:
+        if (ret != 1)
+        {
+          fprintf(stderr, "io_getevents returned %d\n", ret);
+          abort();
+          return;
+        }
+        else
+        {
+          const io_event &event= events[0];
+          aiocb *iocb= static_cast<aiocb*>(event.obj);
+          if (static_cast<int>(event.res) < 0)
+          {
+            iocb->m_err= -event.res;
+            iocb->m_ret_len= 0;
+          }
+          else
+          {
+            iocb->m_ret_len= event.res;
+            iocb->m_err= 0;
+          }
+          iocb->m_internal_task.m_func= iocb->m_callback;
+          iocb->m_internal_task.m_arg= iocb;
+          iocb->m_internal_task.m_group= iocb->m_group;
+          aio->m_pool->submit_task(&iocb->m_internal_task);
+        }
       }
     }
   }
 
 public:
-  aio_linux(io_context_t ctx, thread_pool* pool)
+  aio_linux(io_context_t ctx, thread_pool *pool)
     : m_pool(pool), m_io_ctx(ctx),
-    m_in_shutdown(), m_getevent_thread(getevent_thread_routine, this)
+    m_getevent_thread(getevent_thread_routine, this)
   {
   }
 
   ~aio_linux()
   {
-    m_in_shutdown = true;
-    m_getevent_thread.join();
-    io_destroy(m_io_ctx);
+    shutdown_in_progress= true;
+    io_destroy(m_io_ctx);
+    m_getevent_thread.join();
+    shutdown_in_progress= false;
   }
 
-  // Inherited via aio
-  virtual int submit_io(aiocb* cb) override
+  int submit_io(aiocb *cb) override
   {
-    if (cb->m_opcode == aio_opcode::AIO_PREAD)
-      io_prep_pread((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
-        cb->m_offset);
-    else
-      io_prep_pwrite((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
-        cb->m_offset);
-    int ret;
-    ret = io_submit(m_io_ctx, 1, (iocb **) &cb);
+    io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len,
+                  cb->m_offset);
+    if (cb->m_opcode != aio_opcode::AIO_PREAD)
+      cb->aio_lio_opcode= IO_CMD_PWRITE;
+    iocb *icb= static_cast<iocb*>(cb);
+    int ret= io_submit(m_io_ctx, 1, &icb);
     if (ret == 1)
       return 0;
-    errno = -ret;
+    errno= -ret;
     return -1;
   }
 
-  // Inherited via aio
-  virtual int bind(native_file_handle& fd) override
-  {
-    return 0;
-  }
-  virtual int unbind(const native_file_handle& fd) override
-  {
-    return 0;
-  }
+  int bind(native_file_handle&) override { return 0; }
+  int unbind(const native_file_handle&) override { return 0; }
 };
 
-aio* create_linux_aio(thread_pool* pool, int max_io)
+std::atomic<bool> aio_linux::shutdown_in_progress;
+
+aio *create_linux_aio(thread_pool *pool, int max_io)
 {
   io_context_t ctx;
-  memset(&ctx, 0, sizeof(ctx));
-  int ret = io_setup(max_io, &ctx);
-  if (ret)
+  memset(&ctx, 0, sizeof ctx);
+  if (int ret= io_setup(max_io, &ctx))
   {
     fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
     return nullptr;
@@ -148,9 +148,6 @@ aio* create_linux_aio(thread_pool* pool, int max_io)
   return new aio_linux(ctx, pool);
 }
 #else
-aio* create_linux_aio(thread_pool* pool, int max_aio)
-{
-  return nullptr;
-}
+aio *create_linux_aio(thread_pool*, int) { return nullptr; }
 #endif
 }