Commit 8b896992 authored by Marko Mäkelä

Merge 10.5 into 10.6

parents 581aebe2 657fcdf4
......@@ -4157,7 +4157,7 @@ void btr_cur_upd_rec_in_place(rec_t *rec, const dict_index_t *index,
}
ulint l = rec_get_1byte_offs_flag(rec)
? (n + 1) : (n + 1) * 2;
byte* b = &rec[-REC_N_OLD_EXTRA_BYTES - l];
byte* b = rec - REC_N_OLD_EXTRA_BYTES - l;
compile_time_assert(REC_1BYTE_SQL_NULL_MASK << 8
== REC_2BYTE_SQL_NULL_MASK);
mtr->write<1>(*block, b,
......@@ -4180,7 +4180,7 @@ void btr_cur_upd_rec_in_place(rec_t *rec, const dict_index_t *index,
ut_ad(len == rec_get_nth_field_size(rec, n));
ulint l = rec_get_1byte_offs_flag(rec)
? (n + 1) : (n + 1) * 2;
byte* b = &rec[-REC_N_OLD_EXTRA_BYTES - l];
byte* b = rec - REC_N_OLD_EXTRA_BYTES - l;
compile_time_assert(REC_1BYTE_SQL_NULL_MASK << 8
== REC_2BYTE_SQL_NULL_MASK);
mtr->write<1>(*block, b,
......
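Both btr_cur_upd_rec_in_place() hunks above replace the array-subscript form &rec[-REC_N_OLD_EXTRA_BYTES - l] with plain pointer arithmetic. The address is identical, but with l being an unsigned ulint the subscript expression computes its index through unsigned negation, which undefined-behavior sanitizers flag; stepwise pointer subtraction stays well defined as long as the bytes precede rec inside the same page buffer. A standalone sketch of the equivalence (buffer and helper are illustrative, not MariaDB code):

#include <cstddef>

constexpr std::size_t REC_N_OLD_EXTRA_BYTES= 6;  // as in the old InnoDB record format

// rec points into the interior of a page, so bytes before the record
// origin exist; the null-flags byte sits l + 6 bytes before it.
static unsigned char *null_bits(unsigned char *rec, std::size_t l)
{
  return rec - REC_N_OLD_EXTRA_BYTES - l;        // same address as &rec[-6 - l]
}

int main()
{
  unsigned char page[64]= {};
  unsigned char *rec= page + 32;                 // record origin inside the buffer
  *null_bits(rec, 2)= 0x80;                      // set a SQL NULL flag bit
  return page[32 - 6 - 2] == 0x80 ? 0 : 1;
}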
......@@ -126,6 +126,20 @@ static void buf_flush_validate_skip()
}
#endif /* UNIV_DEBUG */
/** Wake up the page cleaner if needed */
inline void buf_pool_t::page_cleaner_wakeup()
{
if (page_cleaner_idle() &&
(srv_max_dirty_pages_pct_lwm == 0.0 ||
srv_max_dirty_pages_pct_lwm <=
double(UT_LIST_GET_LEN(buf_pool.flush_list)) * 100.0 /
double(UT_LIST_GET_LEN(buf_pool.LRU) + UT_LIST_GET_LEN(buf_pool.free))))
{
page_cleaner_is_idle= false;
mysql_cond_signal(&do_flush_list);
}
}
/** Insert a modified block into the flush list.
@param[in,out] block modified block
@param[in] lsn oldest modification */
......@@ -145,6 +159,7 @@ void buf_flush_insert_into_flush_list(buf_block_t* block, lsn_t lsn)
UT_LIST_ADD_FIRST(buf_pool.flush_list, &block->page);
ut_d(buf_flush_validate_skip());
buf_pool.page_cleaner_wakeup();
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
}
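The wakeup test added in buf_pool_t::page_cleaner_wakeup() compares the dirty-page percentage against innodb_max_dirty_pages_pct_lwm and signals the sleeping page cleaner only when the low-water mark is crossed (or is 0, meaning unset). A reduced standalone model of the same computation, with invented buffer-pool sizes:

#include <cassert>

static double srv_max_dirty_pages_pct_lwm= 10.0;  // hypothetical setting

static bool should_wake(double flush_list, double lru, double free_pages)
{
  // dirty pages as a percentage of all managed pages
  const double dirty_pct= flush_list * 100.0 / (lru + free_pages);
  // a zero low-water mark disables the threshold: always wake when idle
  return srv_max_dirty_pages_pct_lwm == 0.0 ||
         dirty_pct >= srv_max_dirty_pages_pct_lwm;
}

int main()
{
  assert(!should_wake(500, 8000, 2000));   //  5% dirty: cleaner keeps sleeping
  assert(should_wake(1500, 8000, 2000));   // 15% dirty: crosses the 10% mark
  return 0;
}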
......@@ -2067,8 +2082,12 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
else if (srv_shutdown_state > SRV_SHUTDOWN_INITIATED)
break;
mysql_cond_timedwait(&buf_pool.do_flush_list, &buf_pool.flush_list_mutex,
&abstime);
if (buf_pool.page_cleaner_idle())
mysql_cond_wait(&buf_pool.do_flush_list, &buf_pool.flush_list_mutex);
else
mysql_cond_timedwait(&buf_pool.do_flush_list, &buf_pool.flush_list_mutex,
&abstime);
set_timespec(abstime, 1);
lsn_limit= buf_flush_sync_lsn;
......@@ -2091,6 +2110,8 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
/* wake up buf_flush_wait_flushed() */
mysql_cond_broadcast(&buf_pool.done_flush_list);
}
unemployed:
buf_pool.page_cleaner_set_idle(true);
continue;
}
......@@ -2101,13 +2122,14 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
double(UT_LIST_GET_LEN(buf_pool.LRU) + UT_LIST_GET_LEN(buf_pool.free));
if (dirty_pct < srv_max_dirty_pages_pct_lwm && !lsn_limit)
continue;
goto unemployed;
const lsn_t oldest_lsn= buf_pool.get_oldest_modification(0);
if (UNIV_UNLIKELY(lsn_limit != 0) && oldest_lsn >= lsn_limit)
buf_flush_sync_lsn= 0;
buf_pool.page_cleaner_set_idle(false);
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
ulint n_flushed;
......@@ -2159,6 +2181,11 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
goto do_checkpoint;
}
}
else
{
mysql_mutex_lock(&buf_pool.flush_list_mutex);
goto unemployed;
}
#ifdef UNIV_DEBUG
while (innodb_page_cleaner_disabled_debug && !buf_flush_sync_lsn &&
......
......@@ -76,13 +76,12 @@ uncompressed and compressed data), which must be clean. */
/* @{ */
/** Number of intervals for which we keep the history of these stats.
Each interval is 1 second, defined by the rate at which
srv_error_monitor_thread() calls buf_LRU_stat_update(). */
static const ulint BUF_LRU_STAT_N_INTERVAL = 50;
Updated at SRV_MONITOR_INTERVAL (the buf_LRU_stat_update() call rate). */
static constexpr ulint BUF_LRU_STAT_N_INTERVAL= 4;
/** Co-efficient with which we multiply I/O operations to equate them
with page_zip_decompress() operations. */
static const ulint BUF_LRU_IO_TO_UNZIP_FACTOR = 50;
static constexpr ulint BUF_LRU_IO_TO_UNZIP_FACTOR= 50;
/** Sampled values buf_LRU_stat_cur.
Not protected by any mutex. Updated by buf_LRU_stat_update(). */
......
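The shrink from 50 one-second intervals to 4 matches the timer consolidation later in this commit: buf_LRU_stat_update() is now driven at SRV_MONITOR_INTERVAL (15 seconds, 4 times per minute) instead of once per second, so the averaged history still spans about a minute. The arithmetic as compile-time checks:

constexpr unsigned SRV_MONITOR_INTERVAL= 15000;      // milliseconds, from this commit
constexpr unsigned long BUF_LRU_STAT_N_INTERVAL= 4;  // new value, was 50

// old window: 50 intervals * 1 s = 50 s; new window: 4 * 15 s = 60 s
static_assert(BUF_LRU_STAT_N_INTERVAL * SRV_MONITOR_INTERVAL / 1000 == 60,
              "LRU running averages still cover roughly one minute");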
......@@ -16948,6 +16948,7 @@ innodb_max_dirty_pages_pct_update(
in_val);
srv_max_dirty_pages_pct_lwm = in_val;
mysql_cond_signal(&buf_pool.do_flush_list);
}
srv_max_buf_pool_modified_pct = in_val;
......@@ -16981,6 +16982,7 @@ innodb_max_dirty_pages_pct_lwm_update(
}
srv_max_dirty_pages_pct_lwm = in_val;
mysql_cond_signal(&buf_pool.do_flush_list);
}
/*************************************************************//**
......
......@@ -1937,10 +1937,29 @@ class buf_pool_t
FlushHp flush_hp;
/** modified blocks (a subset of LRU) */
UT_LIST_BASE_NODE_T(buf_page_t) flush_list;
private:
/** whether the page cleaner needs wakeup from indefinite sleep */
bool page_cleaner_is_idle;
public:
/** signalled to wake up the page_cleaner; protected by flush_list_mutex */
mysql_cond_t do_flush_list;
/** @return whether the page cleaner must sleep due to being idle */
bool page_cleaner_idle() const
{
mysql_mutex_assert_owner(&flush_list_mutex);
return page_cleaner_is_idle;
}
/** Wake up the page cleaner if needed */
inline void page_cleaner_wakeup();
/** Register whether an explicit wakeup of the page cleaner is needed */
void page_cleaner_set_idle(bool deep_sleep)
{
mysql_mutex_assert_owner(&flush_list_mutex);
page_cleaner_is_idle= deep_sleep;
}
// n_flush_LRU + n_flush_list is approximately COUNT(io_fix()==BUF_IO_WRITE)
// in flush_list
......
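The page_cleaner_is_idle flag and the do_flush_list condition variable declared above form a guarded handshake: the cleaner marks itself idle and blocks with no timeout, and writers signal only when the flag says a wakeup is needed, so the hot path of inserting a dirty page usually skips the signal. A reduced model using standard C++ primitives in place of mysql_mutex_t/mysql_cond_t (structure assumed from the hunks in this commit):

#include <mutex>
#include <condition_variable>

struct cleaner_model
{
  std::mutex flush_list_mutex;
  std::condition_variable do_flush_list;
  bool page_cleaner_is_idle= false;
  bool work_pending= false;

  // producer side, as in buf_flush_insert_into_flush_list():
  // signal only when the cleaner is in its indefinite sleep
  void add_dirty_page()
  {
    std::lock_guard<std::mutex> g(flush_list_mutex);
    work_pending= true;
    if (page_cleaner_is_idle)
    {
      page_cleaner_is_idle= false;
      do_flush_list.notify_one();
    }
  }

  // consumer side, as in buf_flush_page_cleaner(): when idle, sleep
  // until signalled instead of polling once per second
  void cleaner_wait()
  {
    std::unique_lock<std::mutex> lk(flush_list_mutex);
    page_cleaner_is_idle= true;
    do_flush_list.wait(lk, [this] { return work_pending; });
    work_pending= false;
  }
};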
......@@ -237,8 +237,7 @@ class IORequest
@param off byte offset from the start (SEEK_SET)
@param len size of the hole in bytes
@return DB_SUCCESS or error code */
dberr_t punch_hole(os_offset_t off, ulint len) const
MY_ATTRIBUTE((nonnull));
dberr_t punch_hole(os_offset_t off, ulint len) const;
public:
/** Page to be written on write operation */
......@@ -265,8 +264,7 @@ struct os_file_size_t {
os_offset_t m_alloc_size;
};
/** Win NT does not allow more than 64 */
static const ulint OS_AIO_N_PENDING_IOS_PER_THREAD = 256;
constexpr ulint OS_AIO_N_PENDING_IOS_PER_THREAD= 256;
extern ulint os_n_file_reads;
extern ulint os_n_file_writes;
......
......@@ -30,9 +30,9 @@ class rw_lock
/** Available lock */
static constexpr uint32_t UNLOCKED= 0;
/** Flag to indicate that write_lock() is being held */
static constexpr uint32_t WRITER= 1 << 31;
static constexpr uint32_t WRITER= 1U << 31;
/** Flag to indicate that write_lock_wait() is pending */
static constexpr uint32_t WRITER_WAITING= 1 << 30;
static constexpr uint32_t WRITER_WAITING= 1U << 30;
/** Flag to indicate that write_lock() or write_lock_wait() is pending */
static constexpr uint32_t WRITER_PENDING= WRITER | WRITER_WAITING;
......
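The U suffixes added to the rw_lock flags are a correctness fix, not cosmetics: shifting a plain int literal into bit 31 overflows the signed type, which is undefined behavior in C++11 and only implementation-defined before C++20. A minimal demonstration:

#include <cstdint>

static constexpr std::uint32_t WRITER= 1U << 31;   // well defined: 0x80000000
// static constexpr std::uint32_t BAD= 1 << 31;    // shifts into the sign bit
static_assert(WRITER == 0x80000000U, "top bit set without signed overflow");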
......@@ -702,13 +702,6 @@ Complete the shutdown tasks such as background DROP TABLE,
and optionally change buffer merge (on innodb_fast_shutdown=0). */
void srv_shutdown(bool ibuf_merge);
/*************************************************************************
A task which prints warnings about semaphore waits which have lasted
too long. These can be used to track bugs which cause hangs.
*/
void srv_error_monitor_task(void*);
} /* extern "C" */
#ifdef UNIV_DEBUG
......@@ -900,12 +893,14 @@ struct srv_slot_t{
extern tpool::thread_pool *srv_thread_pool;
extern std::unique_ptr<tpool::timer> srv_master_timer;
extern std::unique_ptr<tpool::timer> srv_error_monitor_timer;
extern std::unique_ptr<tpool::timer> srv_monitor_timer;
/** The interval at which srv_monitor_task is invoked, in milliseconds */
constexpr unsigned SRV_MONITOR_INTERVAL= 15000; /* 4 times per minute */
static inline void srv_monitor_timer_schedule_now()
{
srv_monitor_timer->set_time(0, 5000);
srv_monitor_timer->set_time(0, SRV_MONITOR_INTERVAL);
}
static inline void srv_start_periodic_timer(std::unique_ptr<tpool::timer>& t,
void (*func)(void*), int period)
......
......@@ -160,8 +160,7 @@ trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no,
NOTE: This corresponds to a redo log record and must not be changed!
@see mtr_t::undo_create()
@param[in,out] block undo log page */
void trx_undo_page_init(const buf_block_t &block)
MY_ATTRIBUTE((nonnull));
void trx_undo_page_init(const buf_block_t &block);
/** Allocate an undo log page.
@param[in,out] undo undo log
......
......@@ -1009,7 +1009,6 @@ ATTRIBUTE_COLD void logs_empty_and_mark_files_at_shutdown()
!srv_read_only_mode && srv_fast_shutdown < 2) {
buf_dump_start();
}
srv_error_monitor_timer.reset();
srv_monitor_timer.reset();
lock_sys.timeout_timer.reset();
if (do_srv_shutdown) {
......
......@@ -2683,7 +2683,7 @@ bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse,
data_len-= enc_hdr_l >> 3;
data= &static_cast<const byte*>(data)[enc_hdr_l >> 3];
memcpy(buf, &prev_rec[-REC_N_NEW_EXTRA_BYTES - hdr_c], hdr_c);
memcpy(buf, prev_rec - REC_N_NEW_EXTRA_BYTES - hdr_c, hdr_c);
buf+= hdr_c;
*buf++= static_cast<byte>((enc_hdr_l & 3) << 4); /* info_bits; n_owned=0 */
*buf++= static_cast<byte>(h >> 5); /* MSB of heap number */
......
......@@ -1820,8 +1820,8 @@ row_merge_read_clustered_index(
based on that. */
clust_index = dict_table_get_first_index(old_table);
const ulint old_trx_id_col = DATA_TRX_ID - DATA_N_SYS_COLS
+ ulint(old_table->n_cols);
const ulint old_trx_id_col = ulint(old_table->n_cols)
- (DATA_N_SYS_COLS - DATA_TRX_ID);
ut_ad(old_table->cols[old_trx_id_col].mtype == DATA_SYS);
ut_ad(old_table->cols[old_trx_id_col].prtype
== (DATA_TRX_ID | DATA_NOT_NULL));
......
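The reordered expression yields the same column index while keeping every intermediate value non-negative; the old spelling evaluated DATA_TRX_ID - DATA_N_SYS_COLS (a negative int) first and relied on unsigned wraparound when adding n_cols. A worked standalone check using InnoDB's values DATA_TRX_ID = 1 and DATA_N_SYS_COLS = 3 (the table width is invented):

#include <cassert>

constexpr unsigned long DATA_TRX_ID= 1;      // position among the system columns
constexpr unsigned long DATA_N_SYS_COLS= 3;  // DB_ROW_ID, DB_TRX_ID, DB_ROLL_PTR

int main()
{
  const unsigned long n_cols= 10;            // hypothetical table
  // new form: 10 - (3 - 1) = 8, with no negative intermediates
  const unsigned long old_trx_id_col= n_cols - (DATA_N_SYS_COLS - DATA_TRX_ID);
  assert(old_trx_id_col == 8);               // DB_TRX_ID sits 2 slots from the end
  return 0;
}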
......@@ -186,7 +186,7 @@ tpool::thread_pool* srv_thread_pool;
/** Maximum number of times allowed to conditionally acquire
mutex before switching to blocking wait on the mutex */
#define MAX_MUTEX_NOWAIT 20
#define MAX_MUTEX_NOWAIT 2
/** Check whether the number of failed nonblocking mutex
acquisition attempts exceeds maximum allowed value. If so,
......@@ -555,8 +555,7 @@ struct purge_coordinator_state
static purge_coordinator_state purge_state;
/** threadpool timer for srv_error_monitor_task(). */
std::unique_ptr<tpool::timer> srv_error_monitor_timer;
/** threadpool timer for srv_monitor_task() */
std::unique_ptr<tpool::timer> srv_monitor_timer;
......@@ -769,16 +768,11 @@ srv_boot(void)
/******************************************************************//**
Refreshes the values used to calculate per-second averages. */
static
void
srv_refresh_innodb_monitor_stats(void)
/*==================================*/
static void srv_refresh_innodb_monitor_stats(time_t current_time)
{
mutex_enter(&srv_innodb_monitor_mutex);
time_t current_time = time(NULL);
if (difftime(current_time, srv_last_monitor_time) <= 60) {
if (difftime(current_time, srv_last_monitor_time) < 60) {
/* We referesh InnoDB Monitor values so that averages are
printed from at most 60 last seconds */
mutex_exit(&srv_innodb_monitor_mutex);
......@@ -1309,26 +1303,18 @@ struct srv_monitor_state_t
static srv_monitor_state_t monitor_state;
/** A task which prints the info output by various InnoDB monitors.*/
void srv_monitor_task(void*)
static void srv_monitor()
{
double time_elapsed;
time_t current_time;
ut_ad(!srv_read_only_mode);
current_time = time(NULL);
time_elapsed = difftime(current_time, monitor_state.last_monitor_time);
time_t current_time = time(NULL);
if (time_elapsed > 15) {
if (difftime(current_time, monitor_state.last_monitor_time) >= 15) {
monitor_state.last_monitor_time = current_time;
if (srv_print_innodb_monitor) {
/* Reset mutex_skipped counter everytime
srv_print_innodb_monitor changes. This is to
ensure we will not be blocked by lock_sys.mutex
for short duration information printing,
such as requested by sync_array_print_long_waits() */
for short duration information printing */
if (!monitor_state.last_srv_print_monitor) {
monitor_state.mutex_skipped = 0;
monitor_state.last_srv_print_monitor = true;
......@@ -1366,14 +1352,14 @@ void srv_monitor_task(void*)
}
}
srv_refresh_innodb_monitor_stats();
srv_refresh_innodb_monitor_stats(current_time);
}
/*********************************************************************//**
A task which prints warnings about semaphore waits which have lasted
too long. These can be used to track bugs which cause hangs.
*/
void srv_error_monitor_task(void*)
void srv_monitor_task(void*)
{
/* number of successive fatal timeouts observed */
static ulint fatal_cnt;
......@@ -1408,20 +1394,17 @@ void srv_error_monitor_task(void*)
if (sync_array_print_long_waits(&waiter, &sema)
&& sema == old_sema && os_thread_eq(waiter, old_waiter)) {
#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
if (os_event_is_set(srv_allow_writes_event)) {
#endif /* WITH_WSREP */
fatal_cnt++;
#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
} else {
fprintf(stderr,
"WSREP: avoiding InnoDB self crash due to long "
"semaphore wait of > %lu seconds\n"
"Server is processing SST donor operation, "
"fatal_cnt now: " ULINTPF,
srv_fatal_semaphore_wait_threshold, fatal_cnt);
}
if (!os_event_is_set(srv_allow_writes_event)) {
fprintf(stderr,
"WSREP: avoiding InnoDB self crash due to "
"long semaphore wait of > %lu seconds\n"
"Server is processing SST donor operation, "
"fatal_cnt now: " ULINTPF,
srv_fatal_semaphore_wait_threshold, fatal_cnt);
return;
}
#endif /* WITH_WSREP */
if (fatal_cnt > 10) {
if (fatal_cnt++) {
ib::fatal() << "Semaphore wait has lasted > "
<< srv_fatal_semaphore_wait_threshold
<< " seconds. We intentionally crash the"
......@@ -1432,6 +1415,8 @@ void srv_error_monitor_task(void*)
old_waiter = waiter;
old_sema = sema;
}
srv_monitor();
}
/******************************************************************//**
......
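Besides folding the old srv_error_monitor_task() into srv_monitor_task(), the hunk above changes how many consecutive sightings of the same stuck waiter are tolerated: previously the 1-second task crashed once fatal_cnt exceeded 10, now the 15-second task crashes on the second sighting via if (fatal_cnt++). A minimal model of the new counting:

#include <cassert>

static unsigned fatal_cnt;        // reset when the observed waiter or semaphore changes

// returns true when the watchdog should deliberately crash the server
static bool long_wait_observed()
{
  return fatal_cnt++ != 0;        // first sighting arms, the second one fires
}

int main()
{
  assert(!long_wait_observed());  // seen once: remember waiter and semaphore
  assert(long_wait_observed());   // still stuck one interval later: fatal path
  return 0;
}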
......@@ -339,7 +339,7 @@ static dberr_t create_log_file(bool create_new_db, lsn_t lsn,
@param[in,out] logfile0 name of the first log file
@return error code
@retval DB_SUCCESS on successful operation */
MY_ATTRIBUTE((warn_unused_result, nonnull))
MY_ATTRIBUTE((warn_unused_result))
static dberr_t create_log_file_rename(lsn_t lsn, std::string &logfile0)
{
ut_ad(!srv_log_file_created);
......@@ -1813,8 +1813,8 @@ dberr_t srv_start(bool create_new_db)
DBUG_EXECUTE_IF("innodb_skip_monitors", goto skip_monitors;);
/* Create the task which warns of long semaphore waits */
srv_start_periodic_timer(srv_error_monitor_timer, srv_error_monitor_task, 1000);
srv_start_periodic_timer(srv_monitor_timer, srv_monitor_task, 5000);
srv_start_periodic_timer(srv_monitor_timer, srv_monitor_task,
SRV_MONITOR_INTERVAL);
#ifndef DBUG_OFF
skip_monitors:
......
......@@ -893,6 +893,7 @@ sync_array_print_long_waits_low(
#else
# define SYNC_ARRAY_TIMEOUT 240
#endif
const time_t now = time(NULL);
for (ulint i = 0; i < arr->n_cells; i++) {
......@@ -908,7 +909,7 @@ sync_array_print_long_waits_low(
continue;
}
double diff = difftime(time(NULL), cell->reservation_time);
double diff = difftime(now, cell->reservation_time);
if (diff > SYNC_ARRAY_TIMEOUT) {
ib::warn() << "A long semaphore wait:";
......@@ -982,12 +983,6 @@ sync_array_print_long_waits(
}
if (noticed) {
fprintf(stderr,
"InnoDB: ###### Starts InnoDB Monitor"
" for 30 secs to print diagnostic info:\n");
my_bool old_val = srv_print_innodb_monitor;
/* If some crucial semaphore is reserved, then also the InnoDB
Monitor can hang, and we do not get diagnostics. Since in
many cases an InnoDB hang is caused by a pwrite() or a pread()
......@@ -1000,14 +995,7 @@ sync_array_print_long_waits(
MONITOR_VALUE(MONITOR_OS_PENDING_READS),
MONITOR_VALUE(MONITOR_OS_PENDING_WRITES));
srv_print_innodb_monitor = TRUE;
lock_wait_timeout_task(nullptr);
srv_print_innodb_monitor = static_cast<my_bool>(old_val);
fprintf(stderr,
"InnoDB: ###### Diagnostic info printed"
" to the standard error stream\n");
}
return(fatal);
......
......@@ -2452,7 +2452,7 @@ trx_undo_prev_version_build(
== rec_get_nth_field_size(rec, n));
ulint l = rec_get_1byte_offs_flag(*old_vers)
? (n + 1) : (n + 1) * 2;
(*old_vers)[-REC_N_OLD_EXTRA_BYTES - l]
*(*old_vers - REC_N_OLD_EXTRA_BYTES - l)
&= byte(~REC_1BYTE_SQL_NULL_MASK);
}
}
......
/* Copyright(C) 2019 MariaDB Corporation.
/* Copyright (C) 2019, 2020, MariaDB Corporation.
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
......@@ -14,133 +14,153 @@ along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include "tpool_structs.h"
#include <stdlib.h>
#include <signal.h>
#include <assert.h>
#include "tpool.h"
#include <thread>
#ifdef LINUX_NATIVE_AIO
#include <libaio.h>
# include <thread>
# include <atomic>
# include <libaio.h>
# include <sys/syscall.h>
/**
Invoke the io_getevents() system call.
@param ctx context from io_setup()
@param min_nr minimum number of completion events to wait for
@param nr maximum number of completion events to collect
@param ev the collected events
In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
the io_getevents() implementation in libaio was "optimized" so that it
would elide the system call when there are no outstanding requests
and a timeout was specified.
The libaio code for dereferencing ctx would occasionally trigger
SIGSEGV if io_destroy() was concurrently invoked from another thread.
Hence, we use the raw system call.
*/
static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
{
int saved_errno= errno;
int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
min_nr, nr, ev, 0);
if (ret < 0)
{
ret= -errno;
errno= saved_errno;
}
return ret;
}
#endif
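my_getevents() keeps libaio's convention of returning -errno rather than the raw system call's -1 with errno set, and it preserves the caller's errno. A self-contained sketch of the same raw-syscall pattern against the kernel ABI (Linux only, error handling reduced to asserts, not MariaDB code):

#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <cassert>

int main()
{
  aio_context_t ctx= 0;
  assert(syscall(__NR_io_setup, 128, &ctx) == 0);
  io_event ev[8];
  // min_nr = 0: return at once with whatever has completed (nothing here)
  // instead of blocking in the kernel
  long n= syscall(__NR_io_getevents, ctx, 0L, 8L, ev, nullptr);
  assert(n == 0);
  assert(syscall(__NR_io_destroy, ctx) == 0);
  return 0;
}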
/*
Linux AIO implementation, based on native AIO.
Needs libaio.h and -laio at the compile time.
submit_io() is used to submit async IO.
io_submit() is used to submit async IO.
There is a single thread, that collects the completion notification
with io_getevent(), and forwards io completion callback
A single thread will collect the completion notification
with io_getevents() and forward io completion callback to
the worker threadpool.
*/
namespace tpool
{
#ifdef LINUX_NATIVE_AIO
class aio_linux : public aio
class aio_linux final : public aio
{
thread_pool* m_pool;
thread_pool *m_pool;
io_context_t m_io_ctx;
bool m_in_shutdown;
std::thread m_getevent_thread;
static std::atomic<bool> shutdown_in_progress;
static void getevent_thread_routine(aio_linux* aio)
static void getevent_thread_routine(aio_linux *aio)
{
/* We collect this many events at a time. os_aio_init() would
multiply OS_AIO_N_PENDING_IOS_PER_THREAD by the number of read and write threads
and ultimately pass it to io_setup() via thread_pool::configure_aio(). */
constexpr unsigned MAX_EVENTS= 256;
io_event events[MAX_EVENTS];
for (;;)
{
io_event event;
struct timespec ts{0, 500000000};
int ret = io_getevents(aio->m_io_ctx, 1, 1, &event, &ts);
if (aio->m_in_shutdown)
break;
if (ret > 0)
{
aiocb* iocb = (aiocb*)event.obj;
long long res = event.res;
if (res < 0)
switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
case -EINTR:
continue;
case -EINVAL:
if (shutdown_in_progress)
return;
/* fall through */
default:
if (ret < 0)
{
iocb->m_err = static_cast<int>(-res);
iocb->m_ret_len = 0;
fprintf(stderr, "io_getevents returned %d\n", ret);
abort();
return;
}
else
for (int i= 0; i < ret; i++)
{
iocb->m_ret_len = ret;
iocb->m_err = 0;
const io_event &event= events[i];
aiocb *iocb= static_cast<aiocb*>(event.obj);
if (static_cast<int>(event.res) < 0)
{
iocb->m_err= -event.res;
iocb->m_ret_len= 0;
}
else
{
iocb->m_ret_len= event.res;
iocb->m_err= 0;
}
iocb->m_internal_task.m_func= iocb->m_callback;
iocb->m_internal_task.m_arg= iocb;
iocb->m_internal_task.m_group= iocb->m_group;
aio->m_pool->submit_task(&iocb->m_internal_task);
}
iocb->m_internal_task.m_func = iocb->m_callback;
iocb->m_internal_task.m_arg = iocb;
iocb->m_internal_task.m_group = iocb->m_group;
aio->m_pool->submit_task(&iocb->m_internal_task);
continue;
}
switch (ret)
{
case -EAGAIN:
usleep(1000);
continue;
case -EINTR:
case 0:
continue;
default:
fprintf(stderr, "io_getevents returned %d\n", ret);
abort();
}
}
}
public:
aio_linux(io_context_t ctx, thread_pool* pool)
aio_linux(io_context_t ctx, thread_pool *pool)
: m_pool(pool), m_io_ctx(ctx),
m_in_shutdown(), m_getevent_thread(getevent_thread_routine, this)
m_getevent_thread(getevent_thread_routine, this)
{
}
~aio_linux()
{
m_in_shutdown = true;
m_getevent_thread.join();
shutdown_in_progress= true;
io_destroy(m_io_ctx);
m_getevent_thread.join();
shutdown_in_progress= false;
}
// Inherited via aio
virtual int submit_io(aiocb* cb) override
int submit_io(aiocb *cb) override
{
if (cb->m_opcode == aio_opcode::AIO_PREAD)
io_prep_pread((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
cb->m_offset);
else
io_prep_pwrite((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
cb->m_offset);
int ret;
ret = io_submit(m_io_ctx, 1, (iocb * *)& cb);
io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len,
cb->m_offset);
if (cb->m_opcode != aio_opcode::AIO_PREAD)
cb->aio_lio_opcode= IO_CMD_PWRITE;
iocb *icb= static_cast<iocb*>(cb);
int ret= io_submit(m_io_ctx, 1, &icb);
if (ret == 1)
return 0;
errno = -ret;
errno= -ret;
return -1;
}
// Inherited via aio
virtual int bind(native_file_handle& fd) override
{
return 0;
}
virtual int unbind(const native_file_handle& fd) override
{
return 0;
}
int bind(native_file_handle&) override { return 0; }
int unbind(const native_file_handle&) override { return 0; }
};
aio* create_linux_aio(thread_pool* pool, int max_io)
std::atomic<bool> aio_linux::shutdown_in_progress;
aio *create_linux_aio(thread_pool *pool, int max_io)
{
io_context_t ctx;
memset(&ctx, 0, sizeof(ctx));
int ret = io_setup(max_io, &ctx);
if (ret)
memset(&ctx, 0, sizeof ctx);
if (int ret= io_setup(max_io, &ctx))
{
fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
return nullptr;
......@@ -148,9 +168,6 @@ aio* create_linux_aio(thread_pool* pool, int max_io)
return new aio_linux(ctx, pool);
}
#else
aio* create_linux_aio(thread_pool* pool, int max_aio)
{
return nullptr;
}
aio *create_linux_aio(thread_pool*, int) { return nullptr; }
#endif
}
......@@ -230,19 +230,19 @@ class thread_pool_generic : public thread_pool
/** Maximimum number of threads in this pool. */
unsigned int m_max_threads;
/* Maintainence related statistics (see maintainence()) */
/* maintenance related statistics (see maintenance()) */
size_t m_last_thread_count;
unsigned long long m_last_activity;
std::unique_ptr<timer> m_maintaince_timer_task;
std::unique_ptr<timer> m_maintenance_timer_task;
void worker_main(worker_data *thread_data);
void worker_end(worker_data* thread_data);
/* Checks threadpool responsiveness, adjusts thread_counts */
void maintainence();
static void maintainence_func(void* arg)
void maintenance();
static void maintenance_func(void* arg)
{
((thread_pool_generic *)arg)->maintainence();
((thread_pool_generic *)arg)->maintenance();
}
bool add_thread();
bool wake(worker_wake_reason reason, task *t = nullptr);
......@@ -528,11 +528,11 @@ void thread_pool_generic::worker_main(worker_data *thread_var)
Periodic job to fix thread count and concurrency,
in case of long tasks, etc
*/
void thread_pool_generic::maintainence()
void thread_pool_generic::maintenance()
{
/*
If pool is busy (i.e the its mutex is currently locked), we can
skip the maintainence task, some times, to lower mutex contention
skip the maintenance task, some times, to lower mutex contention
*/
static int skip_counter;
const int MAX_SKIPS = 10;
......@@ -691,7 +691,7 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
m_max_threads(max_threads),
m_last_thread_count(),
m_last_activity(),
m_maintaince_timer_task()
m_maintenance_timer_task()
{
if (m_max_threads < m_concurrency)
......@@ -703,8 +703,8 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
if (min_threads < max_threads)
{
m_maintaince_timer_task.reset(new timer_generic(thread_pool_generic::maintainence_func, this, nullptr));
m_maintaince_timer_task->set_time((int)m_timer_interval.count(), (int)m_timer_interval.count());
m_maintenance_timer_task.reset(new timer_generic(thread_pool_generic::maintenance_func, this, nullptr));
m_maintenance_timer_task->set_time((int)m_timer_interval.count(), (int)m_timer_interval.count());
}
}
......@@ -792,7 +792,7 @@ thread_pool_generic::~thread_pool_generic()
m_aio.reset();
/* Also stop the maintanence task early. */
m_maintaince_timer_task.reset();
m_maintenance_timer_task.reset();
std::unique_lock<std::mutex> lk(m_mtx);
m_in_shutdown= true;
......