Commit 00ee8d85 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-16264: Add threadpool library

The library is capable of
- asynchronous execution of tasks (and optionally waiting for them)
- asynchronous file IO
  This is implemented using libaio on Linux and completion ports on
  Windows. Elsewhere, async io is "simulated", which means worker threads
  are performing synchronous IO.
- timers, scheduling work asynchronously in some point of the future.
  Also periodic timers are implemented.
parent 7e08dd85
......@@ -420,6 +420,7 @@ ADD_SUBDIRECTORY(client)
ADD_SUBDIRECTORY(extra)
ADD_SUBDIRECTORY(libservices)
ADD_SUBDIRECTORY(sql/share)
ADD_SUBDIRECTORY(tpool)
IF(NOT WITHOUT_SERVER)
ADD_SUBDIRECTORY(tests)
......
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
IF(WIN32)
SET(EXTRA_SOURCES tpool_win.cc aio_win.cc)
ELSE()
SET(EXTRA_SOURCES aio_linux.cc)
ENDIF()
IF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
CHECK_INCLUDE_FILES (libaio.h HAVE_LIBAIO_H)
CHECK_LIBRARY_EXISTS(aio io_queue_init "" HAVE_LIBAIO)
IF(HAVE_LIBAIO_H AND HAVE_LIBAIO)
ADD_DEFINITIONS(-DLINUX_NATIVE_AIO=1)
LINK_LIBRARIES(aio)
ENDIF()
ENDIF()
ADD_LIBRARY(tpool STATIC
aio_simulated.cc
tpool_structs.h
CMakeLists.txt
tpool.h
tpool_generic.cc
task_group.cc
task.cc
${EXTRA_SOURCES}
)
INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/include)
\ No newline at end of file
/* Copyright(C) 2019 MariaDB Corporation.
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include "tpool_structs.h"
#include <stdlib.h>
#include <signal.h>
#include <assert.h>
#include "tpool.h"
#include <thread>
#ifdef LINUX_NATIVE_AIO
#include <libaio.h>
#endif
/*
Linux AIO implementation, based on native AIO.
Needs libaio.h and -laio at the compile time.
submit_io() is used to submit async IO.
There is a single thread, that collects the completion notification
with io_getevent(), and forwards io completion callback
the worker threadpool.
*/
namespace tpool
{
#ifdef LINUX_NATIVE_AIO
class aio_linux : public aio
{
int m_max_io_count;
thread_pool* m_pool;
io_context_t m_io_ctx;
bool m_in_shutdown;
std::thread m_getevent_thread;
static void getevent_thread_routine(aio_linux* aio)
{
for (;;)
{
io_event event;
struct timespec ts{0, 500000000};
int ret = io_getevents(aio->m_io_ctx, 1, 1, &event, &ts);
if (aio->m_in_shutdown)
break;
if (ret > 0)
{
aiocb* iocb = (aiocb*)event.obj;
long long res = event.res;
if (res < 0)
{
iocb->m_err = -res;
iocb->m_ret_len = 0;
}
else
{
iocb->m_ret_len = ret;
iocb->m_err = 0;
}
iocb->m_internal_task.m_func = iocb->m_callback;
iocb->m_internal_task.m_arg = iocb;
iocb->m_internal_task.m_group = iocb->m_group;
aio->m_pool->submit_task(&iocb->m_internal_task);
continue;
}
switch (ret)
{
case -EAGAIN:
usleep(1000);
continue;
case -EINTR:
case 0:
continue;
default:
fprintf(stderr, "io_getevents returned %d\n", ret);
abort();
}
}
}
public:
aio_linux(io_context_t ctx, thread_pool* pool, size_t max_count)
: m_max_io_count(max_count), m_pool(pool), m_io_ctx(ctx),
m_in_shutdown(), m_getevent_thread(getevent_thread_routine, this)
{
}
~aio_linux()
{
m_in_shutdown = true;
m_getevent_thread.join();
io_destroy(m_io_ctx);
}
// Inherited via aio
virtual int submit_io(aiocb* cb) override
{
if (cb->m_opcode == aio_opcode::AIO_PREAD)
io_prep_pread((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
cb->m_offset);
else
io_prep_pwrite((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
cb->m_offset);
int ret;
ret = io_submit(m_io_ctx, 1, (iocb * *)& cb);
if (ret == 1)
return 0;
errno = -ret;
return -1;
}
// Inherited via aio
virtual int bind(native_file_handle& fd) override
{
return 0;
}
virtual int unbind(const native_file_handle& fd) override
{
return 0;
}
};
aio* create_linux_aio(thread_pool* pool, int max_io)
{
io_context_t ctx;
memset(&ctx, 0, sizeof(ctx));
int ret = io_setup(max_io, &ctx);
if (ret)
{
fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
return nullptr;
}
return new aio_linux(ctx, pool, max_io);
}
#else
aio* create_linux_aio(thread_pool* pool, int max_aio)
{
return nullptr;
}
#endif
}
/* Copyright(C) 2019 MariaDB Corporation.
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#ifndef _WIN32
#include <unistd.h> /* pread(), pwrite() */
#endif
#include "tpool.h"
#include "tpool_structs.h"
#include <stdlib.h>
#include <string.h>
namespace tpool
{
#ifdef _WIN32
/*
In order to be able to execute synchronous IO even on file opened
with FILE_FLAG_OVERLAPPED, and to bypass to completion port,
we use valid event handle for the hEvent member of the OVERLAPPED structure,
with its low-order bit set.
See MSDN docs for GetQueuedCompletionStatus() for description of this trick.
*/
static DWORD fls_sync_io= FLS_OUT_OF_INDEXES;
HANDLE win_get_syncio_event()
{
HANDLE h;
h= (HANDLE) FlsGetValue(fls_sync_io);
if (h)
{
return h;
}
h= CreateEventA(NULL, FALSE, FALSE, NULL);
/* Set low-order bit to keeps I/O completion from being queued */
h= (HANDLE)((uintptr_t) h | 1);
FlsSetValue(fls_sync_io, h);
return h;
}
#include <WinIoCtl.h>
static void __stdcall win_free_syncio_event(void *data)
{
if (data)
{
CloseHandle((HANDLE) data);
}
}
struct WinIoInit
{
WinIoInit()
{
fls_sync_io= FlsAlloc(win_free_syncio_event);
if(fls_sync_io == FLS_OUT_OF_INDEXES)
abort();
}
~WinIoInit() { FlsFree(fls_sync_io); }
};
static WinIoInit win_io_init;
int pread(const native_file_handle &h, void *buf, size_t count,
unsigned long long offset)
{
OVERLAPPED ov{};
ULARGE_INTEGER uli;
uli.QuadPart= offset;
ov.Offset= uli.LowPart;
ov.OffsetHigh= uli.HighPart;
ov.hEvent= win_get_syncio_event();
if (ReadFile(h, buf, (DWORD) count, 0, &ov) ||
(GetLastError() == ERROR_IO_PENDING))
{
DWORD n_bytes;
if (GetOverlappedResult(h, &ov, &n_bytes, TRUE))
return n_bytes;
}
return -1;
}
int pwrite(const native_file_handle &h, void *buf, size_t count,
unsigned long long offset)
{
OVERLAPPED ov{};
ULARGE_INTEGER uli;
uli.QuadPart= offset;
ov.Offset= uli.LowPart;
ov.OffsetHigh= uli.HighPart;
ov.hEvent= win_get_syncio_event();
if (WriteFile(h, buf, (DWORD) count, 0, &ov) ||
(GetLastError() == ERROR_IO_PENDING))
{
DWORD n_bytes;
if (GetOverlappedResult(h, &ov, &n_bytes, TRUE))
return n_bytes;
}
return -1;
}
#endif
/**
Simulated AIO.
Executes IO synchronously in worker pool
and then calls the completion routine.
*/
class simulated_aio : public aio
{
thread_pool *m_pool;
public:
simulated_aio(thread_pool *tp)
: m_pool(tp)
{
}
static void simulated_aio_callback(void *param)
{
aiocb *cb= (aiocb *) param;
int ret_len;
int err= 0;
switch (cb->m_opcode)
{
case aio_opcode::AIO_PREAD:
ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
break;
case aio_opcode::AIO_PWRITE:
ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
break;
default:
abort();
}
if (ret_len < 0)
{
#ifdef _WIN32
err= GetLastError();
#else
err= errno;
#endif
}
cb->m_ret_len = ret_len;
cb->m_err = err;
cb->m_callback(cb);
}
virtual int submit_io(aiocb *aiocb) override
{
aiocb->m_internal_task.m_func = simulated_aio_callback;
aiocb->m_internal_task.m_arg = aiocb;
aiocb->m_internal_task.m_group = aiocb->m_group;
m_pool->submit_task(&aiocb->m_internal_task);
return 0;
}
virtual int bind(native_file_handle &fd) override { return 0; }
virtual int unbind(const native_file_handle &fd) override { return 0; }
};
aio *create_simulated_aio(thread_pool *tp)
{
return new simulated_aio(tp);
}
} // namespace tpool
\ No newline at end of file
/* Copyright(C) 2019 MariaDB Corporation.
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include "tpool_structs.h"
#include <algorithm>
#include <assert.h>
#include <condition_variable>
#include <iostream>
#include <limits.h>
#include <mutex>
#include <queue>
#include <stack>
#include <thread>
#include <vector>
#include <tpool.h>
namespace tpool
{
/*
Windows AIO implementation, completion port based.
A single thread collects the completion notification with
GetQueuedCompletionStatus(), and forwards io completion callback
the worker threadpool
*/
class tpool_generic_win_aio : public aio
{
/* Thread that does collects completion status from the completion port. */
std::thread m_thread;
/* IOCP Completion port.*/
HANDLE m_completion_port;
/* The worker pool where completion routine is executed, as task. */
thread_pool* m_pool;
public:
tpool_generic_win_aio(thread_pool* pool, int max_io) : m_pool(pool)
{
m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
m_thread = std::thread(aio_completion_thread_proc, this);
}
/**
Task to be executed in the work pool.
*/
static void io_completion_task(void* data)
{
auto cb = (aiocb*)data;
cb->execute_callback();
}
void completion_thread_work()
{
for (;;)
{
DWORD n_bytes;
aiocb* aiocb;
ULONG_PTR key;
if (!GetQueuedCompletionStatus(m_completion_port, &n_bytes, &key,
(LPOVERLAPPED*)& aiocb, INFINITE))
break;
aiocb->m_err = 0;
aiocb->m_ret_len = n_bytes;
if (n_bytes != aiocb->m_len)
{
if (GetOverlappedResult(aiocb->m_fh, aiocb,
(LPDWORD)& aiocb->m_ret_len, FALSE))
{
aiocb->m_err = GetLastError();
}
}
aiocb->m_internal_task.m_func = aiocb->m_callback;
aiocb->m_internal_task.m_arg = aiocb;
aiocb->m_internal_task.m_group = aiocb->m_group;
m_pool->submit_task(&aiocb->m_internal_task);
}
}
static void aio_completion_thread_proc(tpool_generic_win_aio* aio)
{
aio->completion_thread_work();
}
~tpool_generic_win_aio()
{
if (m_completion_port)
CloseHandle(m_completion_port);
m_thread.join();
}
virtual int submit_io(aiocb* cb) override
{
memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
cb->m_internal = this;
ULARGE_INTEGER uli;
uli.QuadPart = cb->m_offset;
cb->Offset = uli.LowPart;
cb->OffsetHigh = uli.HighPart;
BOOL ok;
if (cb->m_opcode == aio_opcode::AIO_PREAD)
ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
else
ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
if (ok || (GetLastError() == ERROR_IO_PENDING))
return 0;
return -1;
}
// Inherited via aio
virtual int bind(native_file_handle& fd) override
{
return CreateIoCompletionPort(fd, m_completion_port, 0, 0) ? 0
: GetLastError();
}
virtual int unbind(const native_file_handle& fd) override { return 0; }
};
aio* create_win_aio(thread_pool* pool, int max_io)
{
return new tpool_generic_win_aio(pool, max_io);
}
} // namespace tpool
/* Copyright(C) 2019 MariaDB Corporation.
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include <tpool.h>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <tpool_structs.h>
namespace tpool
{
task::task(callback_func func, void* arg, task_group* group) :
m_func(func), m_arg(arg), m_group(group) {}
void task::execute()
{
if (m_group)
{
/* Executing in a group (limiting concurrency).*/
m_group->execute(this);
}
else
{
/* Execute directly. */
m_func(m_arg);
release();
}
}
/* Task that provide wait() operation. */
waitable_task::waitable_task(callback_func func, void* arg, task_group* group) :
task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(){}
void waitable_task::add_ref()
{
std::unique_lock<std::mutex> lk(m_mtx);
m_ref_count++;
}
void waitable_task::release()
{
std::unique_lock<std::mutex> lk(m_mtx);
m_ref_count--;
if (!m_ref_count && m_waiter_count)
m_cv.notify_all();
}
void waitable_task::wait()
{
std::unique_lock<std::mutex> lk(m_mtx);
m_waiter_count++;
while (m_ref_count)
m_cv.wait(lk);
m_waiter_count--;
}
}
\ No newline at end of file
/* Copyright(C) 2019 MariaDB Corporation.
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include <tpool.h>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <tpool_structs.h>
#include <thread>
namespace tpool
{
task_group::task_group(unsigned int max_concurrency) :
m_queue(8),
m_mtx(),
m_tasks_running(),
m_max_concurrent_tasks(max_concurrency)
{};
void task_group::set_max_tasks(unsigned int max_concurrency)
{
std::unique_lock<std::mutex> lk(m_mtx);
m_max_concurrent_tasks = max_concurrency;
}
void task_group::execute(task* t)
{
std::unique_lock<std::mutex> lk(m_mtx);
if (m_tasks_running == m_max_concurrent_tasks)
{
/* Queue for later execution by another thread.*/
m_queue.push(t);
return;
}
m_tasks_running++;
for (;;)
{
lk.unlock();
if (t)
{
t->m_func(t->m_arg);
t->release();
}
lk.lock();
if (m_queue.empty())
break;
t = m_queue.front();
m_queue.pop();
}
m_tasks_running--;
}
void task_group::cancel_pending(task* t)
{
std::unique_lock<std::mutex> lk(m_mtx);
if (!t)
m_queue.clear();
for (auto it = m_queue.begin(); it != m_queue.end(); it++)
{
if (*it == t)
{
(*it)->release();
(*it) = nullptr;
}
}
}
task_group::~task_group()
{
std::unique_lock<std::mutex> lk(m_mtx);
m_queue.clear();
while (m_tasks_running)
{
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
lk.lock();
}
}
}
\ No newline at end of file
/* Copyright(C) 2019 MariaDB
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#pragma once
#include <memory> /* unique_ptr */
#include <condition_variable>
#include <mutex>
#include <atomic>
#include <tpool_structs.h>
#ifdef LINUX_NATIVE_AIO
#include <libaio.h>
#endif
#ifdef _WIN32
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <windows.h>
/**
Windows-specific native file handle struct.
Apart from the actual handle, contains PTP_IO
used by the Windows threadpool.
*/
struct native_file_handle
{
HANDLE m_handle;
PTP_IO m_ptp_io;
native_file_handle(){};
native_file_handle(HANDLE h) : m_handle(h), m_ptp_io() {}
operator HANDLE() const { return m_handle; }
};
#else
#include <unistd.h>
typedef int native_file_handle;
#endif
namespace tpool
{
/**
Task callback function
*/
typedef void (*callback_func)(void *);
class task;
/* A class that can be used e.g for
restricting concurrency for specific class of tasks. */
class task_group
{
private:
circular_queue<task*> m_queue;
std::mutex m_mtx;
std::condition_variable m_cv;
unsigned int m_tasks_running;
unsigned int m_max_concurrent_tasks;
public:
task_group(unsigned int max_concurrency= 100000);
void set_max_tasks(unsigned int max_concurrent_tasks);
void execute(task* t);
void cancel_pending(task *t);
~task_group();
};
class task
{
public:
callback_func m_func;
void *m_arg;
task_group* m_group;
virtual void add_ref() {};
virtual void release() {};
task() {};
task(callback_func func, void* arg, task_group* group = nullptr);
void* get_arg() { return m_arg; }
callback_func get_func() { return m_func; }
virtual void execute();
virtual ~task() {}
};
class waitable_task :public task
{
std::mutex m_mtx;
std::condition_variable m_cv;
int m_ref_count;
int m_waiter_count;
public:
waitable_task(callback_func func, void* arg, task_group* group = nullptr);
void add_ref() override;
void release() override;
bool is_running() { return m_ref_count > 0; }
bool get_ref_count() {return m_ref_count;}
void wait();
virtual ~waitable_task() {};
};
enum class aio_opcode
{
AIO_PREAD,
AIO_PWRITE
};
const int MAX_AIO_USERDATA_LEN= 40;
struct aiocb;
/** IO control block, includes parameters for the IO, and the callback*/
struct aiocb
#ifdef _WIN32
:OVERLAPPED
#elif defined LINUX_NATIVE_AIO
:iocb
#endif
{
native_file_handle m_fh;
aio_opcode m_opcode;
unsigned long long m_offset;
void *m_buffer;
unsigned int m_len;
callback_func m_callback;
task_group* m_group;
/* Returned length and error code*/
int m_ret_len;
int m_err;
void *m_internal;
task m_internal_task;
char m_userdata[MAX_AIO_USERDATA_LEN];
aiocb() : m_internal_task(nullptr, nullptr)
{}
void execute_callback()
{
task t(m_callback, this,m_group);
t.execute();
}
};
/**
AIO interface
*/
class aio
{
public:
/**
Submit asyncronous IO.
On completion, cb->m_callback is executed.
*/
virtual int submit_io(aiocb *cb)= 0;
/** "Bind" file to AIO handler (used on Windows only) */
virtual int bind(native_file_handle &fd)= 0;
/** "Unind" file to AIO handler (used on Windows only) */
virtual int unbind(const native_file_handle &fd)= 0;
virtual ~aio(){};
};
class timer
{
public:
virtual void set_time(int initial_delay_ms, int period_ms) = 0;
virtual void disarm() = 0;
virtual ~timer(){}
};
class thread_pool;
extern aio *create_simulated_aio(thread_pool *tp);
class thread_pool
{
protected:
/* AIO handler */
std::unique_ptr<aio> m_aio;
virtual aio *create_native_aio(int max_io)= 0;
/**
Functions to be called at worker thread start/end
can be used for example to set some TLS variables
*/
void (*m_worker_init_callback)(void);
void (*m_worker_destroy_callback)(void);
public:
thread_pool() : m_aio(), m_worker_init_callback(), m_worker_destroy_callback()
{
}
virtual void submit_task(task *t)= 0;
virtual timer* create_timer(callback_func func, void *data=nullptr) = 0;
void set_thread_callbacks(void (*init)(), void (*destroy)())
{
m_worker_init_callback= init;
m_worker_destroy_callback= destroy;
}
int configure_aio(bool use_native_aio, int max_io)
{
if (use_native_aio)
m_aio.reset(create_native_aio(max_io));
if (!m_aio)
m_aio.reset(create_simulated_aio(this));
return !m_aio ? -1 : 0;
}
void disable_aio()
{
m_aio.reset();
}
int bind(native_file_handle &fd) { return m_aio->bind(fd); }
void unbind(const native_file_handle &fd) { m_aio->unbind(fd); }
int submit_io(aiocb *cb) { return m_aio->submit_io(cb); }
virtual ~thread_pool() {}
};
const int DEFAULT_MIN_POOL_THREADS= 1;
const int DEFAULT_MAX_POOL_THREADS= 500;
extern thread_pool *
create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS,
int max_threads= DEFAULT_MAX_POOL_THREADS);
#ifdef _WIN32
extern thread_pool *
create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS,
int max_threads= DEFAULT_MAX_POOL_THREADS);
/*
Helper functions, to execute pread/pwrite even if file is
opened with FILE_FLAG_OVERLAPPED, and bound to completion
port.
*/
int pwrite(const native_file_handle &h, void *buf, size_t count,
unsigned long long offset);
int pread(const native_file_handle &h, void *buf, size_t count,
unsigned long long offset);
HANDLE win_get_syncio_event();
#endif
} // namespace tpool
/* Copyright(C) 2019 MariaDB Corporation.
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include "tpool_structs.h"
#include <limits.h>
#include <algorithm>
#include <assert.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <limits.h>
#include <mutex>
#include <queue>
#include <stack>
#include <thread>
#include <vector>
#include "tpool.h"
#include <assert.h>
#include <my_global.h>
#include <my_dbug.h>
#include <thr_timer.h>
#include <stdlib.h>
namespace tpool
{
#ifdef __linux__
extern aio* create_linux_aio(thread_pool* tp, int max_io);
#endif
#ifdef _WIN32
extern aio* create_win_aio(thread_pool* tp, int max_io);
#endif
static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
static const int OVERSUBSCRIBE_FACTOR = 2;
/**
Implementation of generic threadpool.
This threadpool consists of the following components
- The task queue. This queue is populated by submit()
- Worker that execute the work items.
- Timer thread that takes care of pool health
The task queue is populated by submit() method.
on submit(), a worker thread can be woken, or created
to execute tasks.
The timer thread watches if work items are being dequeued, and if not,
this can indicate potential deadlock.
Thus the timer thread can also wake or create a thread, to ensure some progress.
Optimizations:
- worker threads that are idle for long time will shutdown.
- worker threads are woken in LIFO order, which minimizes context switching
and also ensures that idle timeout works well. LIFO wakeup order ensures
that active threads stay active, and idle ones stay idle.
- to minimize spurious wakeups, some items are not put into the queue. Instead
submit() will pass the data directly to the thread it woke up.
*/
/**
Worker wakeup flags.
*/
enum worker_wake_reason
{
WAKE_REASON_NONE,
WAKE_REASON_TASK,
WAKE_REASON_SHUTDOWN
};
/* A per-worker thread structure.*/
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
{
/** Condition variable to wakeup this worker.*/
std::condition_variable m_cv;
/** Reason why worker was woken. */
worker_wake_reason m_wake_reason;
/**
If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute.
*/
task* m_task;
/** Struct is member of intrusive doubly linked list */
worker_data* m_prev;
worker_data* m_next;
/* Current state of the worker.*/
enum state
{
NONE = 0,
EXECUTING_TASK = 1,
LONG_TASK = 2
};
int m_state;
bool is_executing_task()
{
return m_state & EXECUTING_TASK;
}
bool is_long_task()
{
return m_state & LONG_TASK;
}
std::chrono::system_clock::time_point m_task_start_time;
worker_data() :
m_cv(),
m_wake_reason(WAKE_REASON_NONE),
m_task(),
m_prev(),
m_next(),
m_state(NONE),
m_task_start_time()
{}
/*Define custom new/delete because of overaligned structure. */
void* operator new(size_t size)
{
#ifdef _WIN32
return _aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
#else
void* ptr;
int ret = posix_memalign(&ptr, CPU_LEVEL1_DCACHE_LINESIZE, size);
return ret ? 0 : ptr;
#endif
}
void operator delete(void* p)
{
#ifdef _WIN32
_aligned_free(p);
#else
free(p);
#endif
}
};
class thread_pool_generic : public thread_pool
{
/** Cache for per-worker structures */
cache<worker_data> m_thread_data_cache;
/** The task queue */
circular_queue<task*> m_task_queue;
/* List of standby (idle) workers.*/
doubly_linked_list<worker_data> m_standby_threads;
/** List of threads that are executing tasks. */
doubly_linked_list<worker_data> m_active_threads;
/* Mutex that protects the whole struct, most importantly
the standby threads list, and task queue. */
std::mutex m_mtx;
/** Timeout after which idle worker shuts down.*/
std::chrono::milliseconds m_thread_timeout;
/** How often should timer wakeup.*/
std::chrono::milliseconds m_timer_interval;
/** Another condition variable, used in pool shutdown-*/
std::condition_variable m_cv_no_threads;
/** Condition variable for the timer thread. Signaled on shutdown.*/
std::condition_variable m_cv_timer;
/** Overall number of enqueues*/
unsigned long long m_tasks_enqueued;
/** Overall number of dequeued tasks. */
unsigned long long m_tasks_dequeued;
/**Statistic related, number of worker thread wakeups.*/
int m_wakeups;
/**
Statistic related, number of spurious thread wakeups
(i.e thread woke up, and the task queue is empty)
*/
int m_spurious_wakeups;
/** The desired concurrency. This number of workers should be actively executing.*/
unsigned int m_concurrency;
/** True, if threadpool is being shutdown, false otherwise */
bool m_in_shutdown;
/** time point when timer last ran, used as a coarse clock. */
std::chrono::system_clock::time_point m_timestamp;
/** Number of long running tasks. The long running tasks are excluded when
adjusting concurrency */
int m_long_tasks_count;
/** Last time thread was created*/
std::chrono::system_clock::time_point m_last_thread_creation;
/** Minimumum number of threads in this pool.*/
unsigned int m_min_threads;
/** Maximimum number of threads in this pool. */
unsigned int m_max_threads;
/* Maintainence related statistics (see maintainence()) */
size_t m_last_thread_count;
unsigned long long m_last_activity;
std::unique_ptr<timer> m_maintaince_timer_task;
void worker_main(worker_data *thread_data);
void worker_end(worker_data* thread_data);
/* Checks threadpool responsiveness, adjusts thread_counts */
void maintainence();
static void maintainence_func(void* arg)
{
((thread_pool_generic *)arg)->maintainence();
}
bool add_thread();
bool wake(worker_wake_reason reason, task *t = nullptr);
void wake_or_create_thread();
bool get_task(worker_data *thread_var, task **t);
bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
worker_data *thread_var);
void cancel_pending(task* t);
size_t thread_count()
{
return m_active_threads.size() + m_standby_threads.size();
}
public:
thread_pool_generic(int min_threads, int max_threads);
~thread_pool_generic();
void submit_task(task *task) override;
virtual aio *create_native_aio(int max_io) override
{
#ifdef _WIN32
return create_win_aio(this, max_io);
#elif defined(__linux__)
return create_linux_aio(this,max_io);
#else
return nullptr;
#endif
}
class timer_generic : public thr_timer_t, public timer
{
thread_pool_generic* m_pool;
waitable_task m_task;
callback_func m_callback;
void* m_data;
int m_period;
std::mutex m_mtx;
bool m_on;
std::atomic<bool> m_running;
void run()
{
/*
In rare cases, multiple callbacks can be scheduled,
e.g with set_time(0,0) in a loop.
We do not allow parallel execution, as user is not prepared.
*/
bool expected = false;
if (!m_running.compare_exchange_strong(expected, true))
return;
m_callback(m_data);
m_running = false;
if (m_pool && m_period)
{
std::unique_lock<std::mutex> lk(m_mtx);
if (m_on)
{
thr_timer_end(this);
thr_timer_settime(this, 1000ULL * m_period);
}
}
}
static void execute(void* arg)
{
auto timer = (timer_generic*)arg;
timer->run();
}
static void submit_task(void* arg)
{
timer_generic* timer = (timer_generic*)arg;
timer->m_pool->submit_task(&timer->m_task);
}
public:
timer_generic(callback_func func, void* data, thread_pool_generic * pool):
m_pool(pool),
m_task(timer_generic::execute,this),
m_callback(func),m_data(data),m_period(0),m_mtx(),
m_on(true),m_running()
{
if (pool)
{
/* EXecute callback in threadpool*/
thr_timer_init(this, submit_task, this);
}
else
{
/* run in "timer" thread */
thr_timer_init(this, m_task.get_func(), m_task.get_arg());
}
}
void set_time(int initial_delay_ms, int period_ms) override
{
std::unique_lock<std::mutex> lk(m_mtx);
if (!m_on)
return;
thr_timer_end(this);
if (!m_pool)
thr_timer_set_period(this, 1000ULL * period_ms);
else
m_period = period_ms;
thr_timer_settime(this, 1000ULL * initial_delay_ms);
}
void disarm() override
{
std::unique_lock<std::mutex> lk(m_mtx);
m_on = false;
thr_timer_end(this);
lk.unlock();
if (m_task.m_group)
{
m_task.m_group->cancel_pending(&m_task);
}
if (m_pool)
{
m_pool->cancel_pending(&m_task);
}
m_task.wait();
}
virtual ~timer_generic()
{
disarm();
}
};
virtual timer* create_timer(callback_func func, void *data) override
{
return new timer_generic(func, data, this);
}
};
void thread_pool_generic::cancel_pending(task* t)
{
std::unique_lock <std::mutex> lk(m_mtx);
for (auto it = m_task_queue.begin(); it != m_task_queue.end(); it++)
{
if (*it == t)
{
t->release();
*it = nullptr;
}
}
}
/**
Register worker in standby list, and wait to be woken.
@return
true - thread was woken
false - idle wait timeout exceeded (the current thread need to shutdown)
*/
bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk,
worker_data *thread_data)
{
assert(m_task_queue.empty());
assert(!m_in_shutdown);
thread_data->m_wake_reason= WAKE_REASON_NONE;
m_active_threads.erase(thread_data);
m_standby_threads.push_back(thread_data);
for (;;)
{
thread_data->m_cv.wait_for(lk, m_thread_timeout);
if (thread_data->m_wake_reason != WAKE_REASON_NONE)
{
/* Woke up not due to timeout.*/
return true;
}
if (thread_count() <= m_min_threads)
{
/* Do not shutdown thread, maintain required minimum of worker
threads.*/
continue;
}
/*
Woke up due to timeout, remove this thread's from the standby list. In
all other cases where it is signaled it is removed by the signaling
thread.
*/
m_standby_threads.erase(thread_data);
m_active_threads.push_back(thread_data);
return false;
}
}
/**
Workers "get next task" routine.
A task can be handed over to the current thread directly during submit().
if thread_var->m_wake_reason == WAKE_REASON_TASK.
Or a task can be taken from the task queue.
In case task queue is empty, the worker thread will park (wait for wakeup).
*/
bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
{
std::unique_lock<std::mutex> lk(m_mtx);
if (thread_var->is_long_task() && m_long_tasks_count)
m_long_tasks_count--;
thread_var->m_state = worker_data::NONE;
if (m_task_queue.empty())
{
if (m_in_shutdown)
return false;
if (!wait_for_tasks(lk, thread_var))
return false;
/* Task was handed over directly by signaling thread.*/
if (thread_var->m_wake_reason == WAKE_REASON_TASK)
{
*t= thread_var->m_task;
goto end;
}
if (m_task_queue.empty())
return false;
}
/* Dequeue from the task queue.*/
*t= m_task_queue.front();
m_task_queue.pop();
m_tasks_dequeued++;
end:
thread_var->m_state |= worker_data::EXECUTING_TASK;
thread_var->m_task_start_time = m_timestamp;
return true;
}
/** Worker thread shutdown routine. */
void thread_pool_generic::worker_end(worker_data* thread_data)
{
std::lock_guard<std::mutex> lk(m_mtx);
m_active_threads.erase(thread_data);
m_thread_data_cache.put(thread_data);
if (!thread_count() && m_in_shutdown)
{
/* Signal the destructor that no more threads are left. */
m_cv_no_threads.notify_all();
}
}
/* The worker get/execute task loop.*/
void thread_pool_generic::worker_main(worker_data *thread_var)
{
task* task;
if(m_worker_init_callback)
m_worker_init_callback();
while (get_task(thread_var, &task) && task)
{
task->execute();
}
if (m_worker_destroy_callback)
m_worker_destroy_callback();
worker_end(thread_var);
}
/*
Periodic job to fix thread count and concurrency,
in case of long tasks, etc
*/
void thread_pool_generic::maintainence()
{
/*
If pool is busy (i.e the its mutex is currently locked), we can
skip the maintainence task, some times, to lower mutex contention
*/
static int skip_counter;
const int MAX_SKIPS = 10;
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
if (skip_counter == MAX_SKIPS)
{
lk.lock();
}
else if (!lk.try_lock())
{
skip_counter++;
return;
}
skip_counter = 0;
m_timestamp = std::chrono::system_clock::now();
m_long_tasks_count = 0;
if (m_task_queue.empty())
{
m_last_activity = m_tasks_dequeued + m_wakeups;
return;
}
for (auto thread_data = m_active_threads.front();
thread_data;
thread_data = thread_data->m_next)
{
if (thread_data->is_executing_task() &&
(thread_data->is_long_task()
|| (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION)))
{
thread_data->m_state |= worker_data::LONG_TASK;
m_long_tasks_count++;
}
}
size_t thread_cnt = (int)thread_count();
if (m_active_threads.size() - m_long_tasks_count < m_concurrency*OVERSUBSCRIBE_FACTOR)
{
wake_or_create_thread();
return;
}
if (m_last_activity == m_tasks_dequeued + m_wakeups &&
m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
{
// no progress made since last iteration. create new
// thread
add_thread();
}
m_last_activity = m_tasks_dequeued + m_wakeups;
m_last_thread_count= thread_cnt;
}
/*
Heuristic used for thread creation throttling.
Returns interval in milliseconds between thread creation
(depending on number of threads already in the pool, and
desired concurrency level)
*/
static int throttling_interval_ms(size_t n_threads,size_t concurrency)
{
if (n_threads < concurrency*4)
return 0;
if (n_threads < concurrency*8)
return 50;
if (n_threads < concurrency*16)
return 100;
return 200;
}
/* Create a new worker.*/
bool thread_pool_generic::add_thread()
{
size_t n_threads = thread_count();
if (n_threads >= m_max_threads)
return false;
if (n_threads >= m_min_threads)
{
auto now = std::chrono::system_clock::now();
if (now - m_last_thread_creation <
std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency)))
{
/* Throttle thread creation.*/
return false;
}
}
worker_data *thread_data = m_thread_data_cache.get();
m_active_threads.push_back(thread_data);
try
{
std::thread thread(&thread_pool_generic::worker_main, this, thread_data);
m_last_thread_creation = std::chrono::system_clock::now();
thread.detach();
}
catch (std::system_error& e)
{
m_active_threads.erase(thread_data);
m_thread_data_cache.put(thread_data);
static bool warning_written;
if (!warning_written)
{
fprintf(stderr, "Warning : threadpool thread could not be created :%s,"
"current number of threads in pool %zu\n", e.what(), thread_count());
warning_written = true;
}
return false;
}
return true;
}
/** Wake a standby thread, and hand the given task over to this thread. */
bool thread_pool_generic::wake(worker_wake_reason reason, task *t)
{
assert(reason != WAKE_REASON_NONE);
if (m_standby_threads.empty())
return false;
auto var= m_standby_threads.back();
m_standby_threads.pop_back();
m_active_threads.push_back(var);
assert(var->m_wake_reason == WAKE_REASON_NONE);
var->m_wake_reason= reason;
var->m_cv.notify_one();
if (t)
{
var->m_task= t;
}
m_wakeups++;
return true;
}
thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
m_thread_data_cache(max_threads),
m_task_queue(10000),
m_standby_threads(),
m_active_threads(),
m_mtx(),
m_thread_timeout(std::chrono::milliseconds(60000)),
m_timer_interval(std::chrono::milliseconds(400)),
m_cv_no_threads(),
m_cv_timer(),
m_tasks_enqueued(),
m_tasks_dequeued(),
m_wakeups(),
m_spurious_wakeups(),
m_concurrency(std::thread::hardware_concurrency()),
m_in_shutdown(),
m_timestamp(),
m_long_tasks_count(),
m_last_thread_creation(),
m_min_threads(min_threads),
m_max_threads(max_threads),
m_last_thread_count(),
m_last_activity(),
m_maintaince_timer_task()
{
if (m_max_threads < m_concurrency)
m_concurrency = m_max_threads;
if (m_min_threads > m_concurrency)
m_concurrency = min_threads;
if (!m_concurrency)
m_concurrency = 1;
if (min_threads < max_threads)
{
m_maintaince_timer_task.reset(new timer_generic(thread_pool_generic::maintainence_func, this, nullptr));
m_maintaince_timer_task->set_time((int)m_timer_interval.count(), (int)m_timer_interval.count());
}
}
void thread_pool_generic::wake_or_create_thread()
{
assert(!m_task_queue.empty());
if (!m_standby_threads.empty())
{
auto t= m_task_queue.front();
m_task_queue.pop();
wake(WAKE_REASON_TASK, t);
}
else
{
add_thread();
}
}
/** Submit a new task*/
void thread_pool_generic::submit_task(task* task)
{
std::unique_lock<std::mutex> lk(m_mtx);
if (m_in_shutdown)
return;
task->add_ref();
m_tasks_enqueued++;
m_task_queue.push(task);
if (m_active_threads.size() - m_long_tasks_count < m_concurrency *OVERSUBSCRIBE_FACTOR)
wake_or_create_thread();
}
/**
Wake up all workers, and wait until they are gone
Stop the timer.
*/
thread_pool_generic::~thread_pool_generic()
{
/*
Stop AIO early.
This is needed to prevent AIO completion thread
from calling submit_task() on an object that is being destroyed.
*/
m_aio.reset();
/* Also stop the maintanence task early. */
m_maintaince_timer_task.reset();
std::unique_lock<std::mutex> lk(m_mtx);
m_in_shutdown= true;
/* Wake up idle threads. */
while (wake(WAKE_REASON_SHUTDOWN))
{
}
while (thread_count())
{
m_cv_no_threads.wait(lk);
}
lk.unlock();
}
thread_pool *create_thread_pool_generic(int min_threads, int max_threads)
{
return new thread_pool_generic(min_threads, max_threads);
}
} // namespace tpool
/* Copyright(C) 2019 MariaDB Corporation
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#pragma once
#include <vector>
#include <stack>
#include <mutex>
#include <condition_variable>
#include <assert.h>
#include <algorithm>
namespace tpool
{
enum cache_notification_mode
{
NOTIFY_ONE,
NOTIFY_ALL
};
/**
Generic "pointer" cache of a fixed size
with fast put/get operations.
Compared to STL containers, is faster/does not
do allocations. However, put() operation will wait
if there is no free items.
*/
template<typename T> class cache
{
std::mutex m_mtx;
std::condition_variable m_cv;
std::vector<T> m_base;
std::vector<T*> m_cache;
cache_notification_mode m_notification_mode;
int m_waiters;
bool is_full()
{
return m_cache.size() == m_base.size();
}
public:
cache(size_t count, cache_notification_mode mode= tpool::cache_notification_mode::NOTIFY_ALL):
m_mtx(), m_cv(), m_base(count),m_cache(count), m_notification_mode(mode),m_waiters()
{
for(size_t i = 0 ; i < count; i++)
m_cache[i]=&m_base[i];
}
T* get(bool blocking=true)
{
std::unique_lock<std::mutex> lk(m_mtx);
if (blocking)
{
while(m_cache.empty())
m_cv.wait(lk);
}
else
{
if(m_cache.empty())
return nullptr;
}
T* ret = m_cache.back();
m_cache.pop_back();
return ret;
}
void put(T *ele)
{
std::unique_lock<std::mutex> lk(m_mtx);
m_cache.push_back(ele);
if (m_notification_mode == NOTIFY_ONE)
m_cv.notify_one();
else if(m_cache.size() == 1)
m_cv.notify_all(); // Signal cache is not empty
else if(m_waiters && is_full())
m_cv.notify_all(); // Signal cache is full
}
bool contains(T* ele)
{
return ele >= &m_base[0] && ele <= &m_base[m_base.size() -1];
}
/* Wait until cache is full.*/
void wait()
{
std::unique_lock<std::mutex> lk(m_mtx);
m_waiters++;
while(!is_full())
m_cv.wait(lk);
m_waiters--;
}
};
/**
Circular, fixed size queue
used for the task queue.
Compared to STL queue, this one is
faster, and does not do memory allocations
*/
template <typename T> class circular_queue
{
public:
circular_queue(size_t N = 16)
: m_capacity(N + 1), m_buffer(m_capacity), m_head(), m_tail()
{
}
bool empty() { return m_head == m_tail; }
bool full() { return (m_head + 1) % m_capacity == m_tail; }
void clear() { m_head = m_tail = 0; }
void resize(size_t new_size)
{
auto current_size = size();
if (new_size <= current_size)
return;
size_t new_capacity = new_size - 1;
std::vector<T> new_buffer(new_capacity);
/* Figure out faster way to copy*/
size_t i = 0;
while (!empty())
{
T& ele = front();
pop();
new_buffer[i++] = ele;
}
m_buffer = new_buffer;
m_capacity = new_capacity;
m_tail = 0;
m_head = current_size;
}
void push(T ele)
{
if (full())
{
assert(size() == m_capacity - 1);
resize(size() + 1024);
}
m_buffer[m_head] = ele;
m_head = (m_head + 1) % m_capacity;
}
void push_front(T ele)
{
if (full())
{
resize(size() + 1024);
}
if (m_tail == 0)
m_tail = m_capacity - 1;
else
m_tail--;
m_buffer[m_tail] = ele;
}
T& front()
{
assert(!empty());
return m_buffer[m_tail];
}
void pop()
{
assert(!empty());
m_tail = (m_tail + 1) % m_capacity;
}
size_t size()
{
if (m_head < m_tail)
{
return m_capacity - m_tail + m_head;
}
else
{
return m_head - m_tail;
}
}
/*Iterator over elements in queue.*/
class iterator
{
size_t m_pos;
circular_queue<T>* m_queue;
public:
explicit iterator(size_t pos , circular_queue<T>* q) : m_pos(pos), m_queue(q) {}
iterator& operator++()
{
m_pos = (m_pos + 1) % m_queue->m_capacity;
return *this;
}
iterator operator++(int)
{
iterator retval = *this;
++(*this);
return retval;
}
bool operator==(iterator other) const
{
return m_pos == other.m_pos;
}
bool operator!=(iterator other) const
{
return !(*this == other);
}
T& operator*() const
{
return m_queue->m_buffer[m_pos];
}
};
iterator begin()
{
return iterator(m_tail, this);
}
iterator end()
{
return iterator(m_head, this);
}
private:
size_t m_capacity;
std::vector<T> m_buffer;
size_t m_head;
size_t m_tail;
};
/* Doubly linked list. Intrusive,
requires element to have m_next and m_prev pointers.
*/
template<typename T> class doubly_linked_list
{
public:
T* m_first;
T* m_last;
size_t m_count;
doubly_linked_list():m_first(),m_last(),m_count()
{}
void check()
{
assert(!m_first || !m_first->m_prev);
assert(!m_last || !m_last->m_next);
assert((!m_first && !m_last && m_count == 0)
|| (m_first != 0 && m_last != 0 && m_count > 0));
T* current = m_first;
for(size_t i=1; i< m_count;i++)
{
current = current->m_next;
}
assert(current == m_last);
current = m_last;
for (size_t i = 1; i < m_count; i++)
{
current = current->m_prev;
}
assert(current == m_first);
}
T* front()
{
return m_first;
}
size_t size()
{
return m_count;
}
void push_back(T* ele)
{
ele->m_prev = m_last;
if (m_last)
m_last->m_next = ele;
ele->m_next = 0;
m_last = ele;
if (!m_first)
m_first = m_last;
m_count++;
}
T* back()
{
return m_last;
}
bool empty()
{
return m_count == 0;
}
void pop_back()
{
m_last = m_last->m_prev;
if (m_last)
m_last->m_next = 0;
else
m_first = 0;
m_count--;
}
bool contains(T* ele)
{
if (!ele)
return false;
T* current = m_first;
while(current)
{
if(current == ele)
return true;
current = current->m_next;
}
return false;
}
void erase(T* ele)
{
assert(contains(ele));
if (ele == m_first)
{
m_first = ele->m_next;
if (m_first)
m_first->m_prev = 0;
else
m_last = 0;
}
else if (ele == m_last)
{
assert(ele->m_prev);
m_last = ele->m_prev;
m_last->m_next = 0;
}
else
{
assert(ele->m_next);
assert(ele->m_prev);
ele->m_next->m_prev = ele->m_prev;
ele->m_prev->m_next = ele->m_next;
}
m_count--;
}
};
}
/* Copyright(C) 2019 MariaDB
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include "tpool_structs.h"
#include <stdlib.h>
#include <tpool.h>
#include <windows.h>
#include <atomic>
/**
Implementation of tpool/aio based on Windows native threadpool.
*/
namespace tpool
{
/**
Pool, based on Windows native(Vista+) threadpool.
*/
class thread_pool_win : public thread_pool
{
/**
Handle per-thread init/term functions.
Since it is Windows that creates thread, and not us,
it is tricky. We employ thread local storage data
and check whether init function was called, inside every callback.
*/
struct tls_data
{
thread_pool_win *m_pool;
~tls_data()
{
/* Call thread termination function. */
if (!m_pool)
return;
if (m_pool->m_worker_destroy_callback)
m_pool->m_worker_destroy_callback();
m_pool->m_thread_count--;
}
/** This needs to be called before every IO or simple task callback.*/
void callback_prolog(thread_pool_win* pool)
{
assert(pool);
assert(!m_pool || (m_pool == pool));
if (m_pool)
{
// TLS data already initialized.
return;
}
m_pool = pool;
m_pool->m_thread_count++;
// Call the thread init function.
if (m_pool->m_worker_init_callback)
m_pool->m_worker_init_callback();
}
};
static thread_local struct tls_data tls_data;
/** Timer */
class native_timer : public timer
{
std::mutex m_mtx; // protects against parallel execution
std::mutex m_shutdown_mtx; // protects m_on
PTP_TIMER m_ptp_timer;
callback_func m_func;
void *m_data;
thread_pool_win& m_pool;
int m_period;
bool m_on;
static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE callback_instance, void *context,
PTP_TIMER callback_timer)
{
native_timer *timer= (native_timer *) context;
tls_data.callback_prolog(&timer->m_pool);
std::unique_lock<std::mutex> lk(timer->m_mtx, std::defer_lock);
if (!lk.try_lock())
{
/* Do not try to run timers in parallel */
return;
}
timer->m_func(timer->m_data);
if (timer->m_period)
timer->set_time(timer->m_period, timer->m_period);
}
public:
native_timer(thread_pool_win& pool, callback_func func, void* data) :
m_mtx(), m_func(func), m_data(data), m_pool(pool), m_period(), m_on(true)
{
m_ptp_timer= CreateThreadpoolTimer(timer_callback, this, &pool.m_env);
}
void set_time(int initial_delay_ms, int period_ms) override
{
std::unique_lock<std::mutex> lk(m_shutdown_mtx);
if (!m_on)
return;
long long initial_delay = -10000LL * initial_delay_ms;
SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0);
SetThreadpoolTimer(m_ptp_timer, (PFILETIME)&initial_delay, 0, 100);
m_period = period_ms;
}
void disarm() override
{
std::unique_lock<std::mutex> lk(m_shutdown_mtx);
m_on = false;
SetThreadpoolTimer(m_ptp_timer, NULL , 0, 0);
lk.unlock();
/* Don't do it in timer callback, that will hang*/
WaitForThreadpoolTimerCallbacks(m_ptp_timer, TRUE);
}
~native_timer()
{
disarm();
CloseThreadpoolTimer(m_ptp_timer);
}
};
/** AIO handler */
class native_aio : public aio
{
thread_pool_win& m_pool;
public:
native_aio(thread_pool_win &pool, int max_io)
: m_pool(pool)
{
}
/**
Submit async IO.
*/
virtual int submit_io(aiocb* cb) override
{
memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
ULARGE_INTEGER uli;
uli.QuadPart = cb->m_offset;
cb->Offset = uli.LowPart;
cb->OffsetHigh = uli.HighPart;
cb->m_internal = this;
StartThreadpoolIo(cb->m_fh.m_ptp_io);
BOOL ok;
if (cb->m_opcode == aio_opcode::AIO_PREAD)
ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
else
ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
if (ok || (GetLastError() == ERROR_IO_PENDING))
return 0;
CancelThreadpoolIo(cb->m_fh.m_ptp_io);
return -1;
}
/**
PTP_WIN32_IO_CALLBACK-typed function, required parameter for
CreateThreadpoolIo(). The user callback and other auxiliary data is put into
the extended OVERLAPPED parameter.
*/
static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped,
ULONG io_result, ULONG_PTR nbytes,
PTP_IO io)
{
aiocb* cb = (aiocb*)overlapped;
native_aio* aio = (native_aio*)cb->m_internal;
tls_data.callback_prolog(&aio->m_pool);
cb->m_err = io_result;
cb->m_ret_len = (int)nbytes;
cb->m_internal_task.m_func = cb->m_callback;
cb->m_internal_task.m_group = cb->m_group;
cb->m_internal_task.m_arg = cb;
cb->m_internal_task.execute();
}
/**
Binds the file handle via CreateThreadpoolIo().
*/
virtual int bind(native_file_handle& fd) override
{
fd.m_ptp_io =
CreateThreadpoolIo(fd.m_handle, io_completion_callback, 0, &(m_pool.m_env));
if (fd.m_ptp_io)
return 0;
return -1;
}
/**
Unbind the file handle via CloseThreadpoolIo.
*/
virtual int unbind(const native_file_handle& fd) override
{
if (fd.m_ptp_io)
CloseThreadpoolIo(fd.m_ptp_io);
return 0;
}
};
PTP_POOL m_ptp_pool;
TP_CALLBACK_ENVIRON m_env;
PTP_CLEANUP_GROUP m_cleanup;
const int TASK_CACHE_SIZE= 10000;
struct task_cache_entry
{
thread_pool_win *m_pool;
task* m_task;
};
cache<task_cache_entry> m_task_cache;
std::atomic<int> m_thread_count;
public:
thread_pool_win(int min_threads= 0, int max_threads= 0)
: m_task_cache(TASK_CACHE_SIZE),m_thread_count(0)
{
InitializeThreadpoolEnvironment(&m_env);
m_ptp_pool= CreateThreadpool(NULL);
m_cleanup= CreateThreadpoolCleanupGroup();
SetThreadpoolCallbackPool(&m_env, m_ptp_pool);
SetThreadpoolCallbackCleanupGroup(&m_env, m_cleanup, 0);
if (min_threads)
SetThreadpoolThreadMinimum(m_ptp_pool, min_threads);
if (max_threads)
SetThreadpoolThreadMaximum(m_ptp_pool, max_threads);
}
~thread_pool_win()
{
CloseThreadpoolCleanupGroupMembers(m_cleanup, TRUE, NULL);
CloseThreadpoolCleanupGroup(m_cleanup);
CloseThreadpool(m_ptp_pool);
// Wait until all threads finished and TLS destructors ran.
while(m_thread_count)
Sleep(1);
}
/**
PTP_SIMPLE_CALLBACK-typed function, used by TrySubmitThreadpoolCallback()
*/
static void CALLBACK task_callback(PTP_CALLBACK_INSTANCE, void *param)
{
auto entry= (task_cache_entry *) param;
auto task= entry->m_task;
tls_data.callback_prolog(entry->m_pool);
entry->m_pool->m_task_cache.put(entry);
task->execute();
}
virtual void submit_task(task *task) override
{
auto entry= m_task_cache.get();
task->add_ref();
entry->m_pool= this;
entry->m_task= task;
if (!TrySubmitThreadpoolCallback(task_callback, entry, &m_env))
abort();
}
aio *create_native_aio(int max_io) override
{
return new native_aio(*this, max_io);
}
timer* create_timer(callback_func func, void* data) override
{
return new native_timer(*this, func, data);
}
};
thread_local struct thread_pool_win::tls_data thread_pool_win::tls_data;
thread_pool *create_thread_pool_win(int min_threads, int max_threads)
{
return new thread_pool_win(min_threads, max_threads);
}
} // namespace tpool
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment