Commit 016c35a7 authored by Jan Lindström's avatar Jan Lindström

MDEV-13690: Remove unnecessary innodb_use_mtflush, innodb_mtflush_threads...

MDEV-13690: Remove unnecessary innodb_use_mtflush, innodb_mtflush_threads parameters and related code

Users can use innodb-page-cleaners instead.
parent 9ff7129f
select @@global.innodb_mtflush_threads;
@@global.innodb_mtflush_threads
8
select @@session.innodb_mtflush_threads;
ERROR HY000: Variable 'innodb_mtflush_threads' is a GLOBAL variable
show global variables like 'innodb_mtflush_threads';
Variable_name Value
innodb_mtflush_threads 8
show session variables like 'innodb_mtflush_threads';
Variable_name Value
innodb_mtflush_threads 8
select * from information_schema.global_variables where variable_name='innodb_mtflush_threads';
VARIABLE_NAME VARIABLE_VALUE
INNODB_MTFLUSH_THREADS 8
select * from information_schema.session_variables where variable_name='innodb_mtflush_threads';
VARIABLE_NAME VARIABLE_VALUE
INNODB_MTFLUSH_THREADS 8
set global innodb_mtflush_threads=1;
ERROR HY000: Variable 'innodb_mtflush_threads' is a read only variable
set session innodb_mtflush_threads=1;
ERROR HY000: Variable 'innodb_mtflush_threads' is a read only variable
select @@global.innodb_use_mtflush;
@@global.innodb_use_mtflush
0
select @@session.innodb_use_mtflush;
ERROR HY000: Variable 'innodb_use_mtflush' is a GLOBAL variable
show global variables like 'innodb_use_mtflush';
Variable_name Value
innodb_use_mtflush OFF
show session variables like 'innodb_use_mtflush';
Variable_name Value
innodb_use_mtflush OFF
select * from information_schema.global_variables where variable_name='innodb_use_mtflush';
VARIABLE_NAME VARIABLE_VALUE
INNODB_USE_MTFLUSH OFF
select * from information_schema.session_variables where variable_name='innodb_use_mtflush';
VARIABLE_NAME VARIABLE_VALUE
INNODB_USE_MTFLUSH OFF
set global innodb_use_mtflush=1;
ERROR HY000: Variable 'innodb_use_mtflush' is a read only variable
set session innodb_use_mtflush=1;
ERROR HY000: Variable 'innodb_use_mtflush' is a read only variable
...@@ -364,15 +364,6 @@ ...@@ -364,15 +364,6 @@
VARIABLE_COMMENT Maximum delay of user threads in micro-seconds VARIABLE_COMMENT Maximum delay of user threads in micro-seconds
NUMERIC_MIN_VALUE 0 NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 10000000 NUMERIC_MAX_VALUE 10000000
@@ -1636,7 +1636,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 8
VARIABLE_SCOPE GLOBAL
-VARIABLE_TYPE BIGINT
+VARIABLE_TYPE INT
VARIABLE_COMMENT DEPRECATED. Number of multi-threaded flush threads
NUMERIC_MIN_VALUE 1
NUMERIC_MAX_VALUE 64
@@ -1692,10 +1692,10 @@ @@ -1692,10 +1692,10 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 0 DEFAULT_VALUE 0
......
...@@ -1630,20 +1630,6 @@ NUMERIC_BLOCK_SIZE NULL ...@@ -1630,20 +1630,6 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL ENUM_VALUE_LIST NULL
READ_ONLY NO READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME INNODB_MTFLUSH_THREADS
SESSION_VALUE NULL
GLOBAL_VALUE 8
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 8
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BIGINT
VARIABLE_COMMENT DEPRECATED. Number of multi-threaded flush threads
NUMERIC_MIN_VALUE 1
NUMERIC_MAX_VALUE 64
NUMERIC_BLOCK_SIZE 0
ENUM_VALUE_LIST NULL
READ_ONLY YES
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME INNODB_OLD_BLOCKS_PCT VARIABLE_NAME INNODB_OLD_BLOCKS_PCT
SESSION_VALUE NULL SESSION_VALUE NULL
GLOBAL_VALUE 37 GLOBAL_VALUE 37
...@@ -2442,20 +2428,6 @@ NUMERIC_BLOCK_SIZE NULL ...@@ -2442,20 +2428,6 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST OFF,ON ENUM_VALUE_LIST OFF,ON
READ_ONLY YES READ_ONLY YES
COMMAND_LINE_ARGUMENT NONE COMMAND_LINE_ARGUMENT NONE
VARIABLE_NAME INNODB_USE_MTFLUSH
SESSION_VALUE NULL
GLOBAL_VALUE OFF
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE OFF
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BOOLEAN
VARIABLE_COMMENT DEPRECATED. Use multi-threaded flush. Default FALSE.
NUMERIC_MIN_VALUE NULL
NUMERIC_MAX_VALUE NULL
NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST OFF,ON
READ_ONLY YES
COMMAND_LINE_ARGUMENT NONE
VARIABLE_NAME INNODB_VERSION VARIABLE_NAME INNODB_VERSION
SESSION_VALUE NULL SESSION_VALUE NULL
GLOBAL_VALUE 5.7.19 GLOBAL_VALUE 5.7.19
......
--source include/have_innodb.inc
# bool readonly
#
# show values;
#
select @@global.innodb_mtflush_threads;
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
select @@session.innodb_mtflush_threads;
show global variables like 'innodb_mtflush_threads';
show session variables like 'innodb_mtflush_threads';
select * from information_schema.global_variables where variable_name='innodb_mtflush_threads';
select * from information_schema.session_variables where variable_name='innodb_mtflush_threads';
#
# show that it's read-only
#
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
set global innodb_mtflush_threads=1;
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
set session innodb_mtflush_threads=1;
--source include/have_innodb.inc
# bool readonly
#
# show values;
#
select @@global.innodb_use_mtflush;
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
select @@session.innodb_use_mtflush;
show global variables like 'innodb_use_mtflush';
show session variables like 'innodb_use_mtflush';
select * from information_schema.global_variables where variable_name='innodb_use_mtflush';
select * from information_schema.session_variables where variable_name='innodb_use_mtflush';
#
# show that it's read-only
#
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
set global innodb_use_mtflush=1;
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
set session innodb_use_mtflush=1;
...@@ -36,7 +36,6 @@ SET(INNOBASE_SOURCES ...@@ -36,7 +36,6 @@ SET(INNOBASE_SOURCES
buf/buf0flu.cc buf/buf0flu.cc
buf/buf0lru.cc buf/buf0lru.cc
buf/buf0rea.cc buf/buf0rea.cc
buf/buf0mtflu.cc
data/data0data.cc data/data0data.cc
data/data0type.cc data/data0type.cc
dict/dict0boot.cc dict/dict0boot.cc
......
...@@ -31,7 +31,6 @@ Created 11/11/1995 Heikki Tuuri ...@@ -31,7 +31,6 @@ Created 11/11/1995 Heikki Tuuri
#include "buf0flu.h" #include "buf0flu.h"
#include "buf0buf.h" #include "buf0buf.h"
#include "buf0mtflu.h"
#include "buf0checksum.h" #include "buf0checksum.h"
#include "srv0start.h" #include "srv0start.h"
#include "srv0srv.h" #include "srv0srv.h"
...@@ -2142,10 +2141,6 @@ buf_flush_lists( ...@@ -2142,10 +2141,6 @@ buf_flush_lists(
ulint n_flushed = 0; ulint n_flushed = 0;
bool success = true; bool success = true;
if (buf_mtflu_init_done()) {
return(buf_mtflu_flush_list(min_n, lsn_limit, n_processed));
}
if (n_processed) { if (n_processed) {
*n_processed = 0; *n_processed = 0;
} }
...@@ -2304,11 +2299,6 @@ buf_flush_LRU_list( ...@@ -2304,11 +2299,6 @@ buf_flush_LRU_list(
memset(&n, 0, sizeof(flush_counters_t)); memset(&n, 0, sizeof(flush_counters_t));
if(buf_mtflu_init_done())
{
return(buf_mtflu_flush_LRU_tail());
}
ut_ad(buf_pool); ut_ad(buf_pool);
/* srv_LRU_scan_depth can be arbitrarily large value. /* srv_LRU_scan_depth can be arbitrarily large value.
We cap it with current LRU size. */ We cap it with current LRU size. */
......
/*****************************************************************************
Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved.
Copyright (C) 2013, 2017, MariaDB Corporation. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*****************************************************************************/
/******************************************************************//**
@file buf/buf0mtflu.cc
Multi-threaded flush method implementation
Created 06/11/2013 Dhananjoy Das DDas@fusionio.com
Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com
Modified 03/02/2014 Dhananjoy Das DDas@fusionio.com
Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
***********************************************************************/
#include "buf0buf.h"
#include "buf0flu.h"
#include "buf0mtflu.h"
#include "buf0checksum.h"
#include "srv0start.h"
#include "srv0srv.h"
#include "page0zip.h"
#include "ut0byte.h"
#include "ut0lst.h"
#include "page0page.h"
#include "fil0fil.h"
#include "buf0lru.h"
#include "buf0rea.h"
#include "ibuf0ibuf.h"
#include "log0log.h"
#include "os0file.h"
#include "trx0sys.h"
#include "srv0mon.h"
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
#include "fil0pagecompress.h"
#define MT_COMP_WATER_MARK 50
/** Time to wait for a message. */
#define MT_WAIT_IN_USECS 5000000
/* Work item status */
typedef enum wrk_status {
WRK_ITEM_UNSET=0, /*!< Work item is not set */
WRK_ITEM_START=1, /*!< Processing of work item has started */
WRK_ITEM_DONE=2, /*!< Processing is done usually set to
SUCCESS/FAILED */
WRK_ITEM_SUCCESS=2, /*!< Work item successfully processed */
WRK_ITEM_FAILED=3, /*!< Work item process failed */
WRK_ITEM_EXIT=4, /*!< Exiting */
WRK_ITEM_SET=5, /*!< Work item is set */
WRK_ITEM_STATUS_UNDEFINED
} wrk_status_t;
/* Work item task type */
typedef enum mt_wrk_tsk {
MT_WRK_NONE=0, /*!< Exit queue-wait */
MT_WRK_WRITE=1, /*!< Flush operation */
MT_WRK_READ=2, /*!< Read operation */
MT_WRK_UNDEFINED
} mt_wrk_tsk_t;
/* Work thread status */
typedef enum wthr_status {
WTHR_NOT_INIT=0, /*!< Work thread not initialized */
WTHR_INITIALIZED=1, /*!< Work thread initialized */
WTHR_SIG_WAITING=2, /*!< Work thread wating signal */
WTHR_RUNNING=3, /*!< Work thread running */
WTHR_NO_WORK=4, /*!< Work thread has no work */
WTHR_KILL_IT=5, /*!< Work thread should exit */
WTHR_STATUS_UNDEFINED
} wthr_status_t;
/* Write work task */
typedef struct wr_tsk {
buf_pool_t *buf_pool; /*!< buffer-pool instance */
buf_flush_t flush_type; /*!< flush-type for buffer-pool
flush operation */
ulint min; /*!< minimum number of pages
requested to be flushed */
lsn_t lsn_limit; /*!< lsn limit for the buffer-pool
flush operation */
} wr_tsk_t;
/* Read work task */
typedef struct rd_tsk {
buf_pool_t *page_pool; /*!< list of pages to decompress; */
} rd_tsk_t;
/* Work item */
typedef struct wrk_itm
{
mt_wrk_tsk_t tsk; /*!< Task type. Based on task-type
one of the entries wr_tsk/rd_tsk
will be used */
wr_tsk_t wr; /*!< Flush page list */
rd_tsk_t rd; /*!< Decompress page list */
ulint n_flushed; /*!< Number of flushed pages */
ulint n_evicted; /*!< Number of evicted pages */
os_thread_id_t id_usr; /*!< Thread-id currently working */
wrk_status_t wi_status; /*!< Work item status */
mem_heap_t *wheap; /*!< Heap were to allocate memory
for queue nodes */
mem_heap_t *rheap;
} wrk_t;
struct thread_data_t
{
os_thread_id_t wthread_id; /*!< Identifier */
wthr_status_t wt_status; /*!< Worker thread status */
};
/** Flush dirty pages when multi-threaded flush is used. */
extern "C" UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD(mtflush_io_thread)(void* arg);
/** Thread syncronization data */
struct thread_sync_t
{
/** Constructor */
thread_sync_t(ulint n_threads, mem_heap_t* wheap, mem_heap_t* rheap) :
thread_global_mtx(), n_threads(n_threads),
wq(ib_wqueue_create()),
wr_cq(ib_wqueue_create()),
rd_cq(ib_wqueue_create()),
wheap(wheap), rheap(rheap), gwt_status(),
thread_data(static_cast<thread_data_t*>(
mem_heap_zalloc(wheap, n_threads
* sizeof *thread_data)))
{
ut_a(wq);
ut_a(wr_cq);
ut_a(rd_cq);
ut_a(thread_data);
mutex_create(LATCH_ID_MTFLUSH_THREAD_MUTEX,
&thread_global_mtx);
/* Create threads for page-compression-flush */
for(ulint i = 0; i < n_threads; i++) {
thread_data[i].wt_status = WTHR_INITIALIZED;
os_thread_create(mtflush_io_thread, this,
&thread_data[i].wthread_id);
}
}
/** Destructor */
~thread_sync_t()
{
ut_a(ib_wqueue_is_empty(wq));
ut_a(ib_wqueue_is_empty(wr_cq));
ut_a(ib_wqueue_is_empty(rd_cq));
/* Free all queues */
ib_wqueue_free(wq);
ib_wqueue_free(wr_cq);
ib_wqueue_free(rd_cq);
mutex_free(&thread_global_mtx);
mem_heap_free(rheap);
mem_heap_free(wheap);
}
/* Global variables used by all threads */
ib_mutex_t thread_global_mtx; /*!< Mutex used protecting below
variables */
ulint n_threads; /*!< Number of threads */
ib_wqueue_t *wq; /*!< Work Queue */
ib_wqueue_t *wr_cq; /*!< Write Completion Queue */
ib_wqueue_t *rd_cq; /*!< Read Completion Queue */
mem_heap_t* wheap; /*!< Work heap where memory
is allocated */
mem_heap_t* rheap; /*!< Work heap where memory
is allocated */
wthr_status_t gwt_status; /*!< Global thread status */
/* Variables used by only one thread at a time */
thread_data_t* thread_data; /*!< Thread specific data */
};
static thread_sync_t* mtflush_ctx;
static ib_mutex_t mtflush_mtx;
/******************************************************************//**
Return true if multi-threaded flush is initialized
@return true if initialized */
bool
buf_mtflu_init_done(void)
/*=====================*/
{
return(mtflush_ctx != NULL);
}
/******************************************************************//**
Fush buffer pool instance.
@return number of flushed pages, or 0 if error happened
*/
static
ulint
buf_mtflu_flush_pool_instance(
/*==========================*/
wrk_t *work_item) /*!< inout: work item to be flushed */
{
flush_counters_t n;
ut_a(work_item != NULL);
ut_a(work_item->wr.buf_pool != NULL);
if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) {
/* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer
pool means we cannot guarantee that all pages
up to lsn_limit has been flushed. We can
return right now with failure or we can try
to flush remaining buffer pools up to the
lsn_limit. We attempt to flush other buffer
pools based on the assumption that it will
help in the retry which will follow the
failure. */
#ifdef UNIV_MTFLUSH_DEBUG
fprintf(stderr, "InnoDB: Note: buf flush start failed there is already active flush for this buffer pool.\n");
#endif
return 0;
}
memset(&n, 0, sizeof(flush_counters_t));
if (work_item->wr.flush_type == BUF_FLUSH_LRU) {
/* srv_LRU_scan_depth can be arbitrarily large value.
* We cap it with current LRU size.
*/
buf_pool_mutex_enter(work_item->wr.buf_pool);
work_item->wr.min = UT_LIST_GET_LEN(work_item->wr.buf_pool->LRU);
buf_pool_mutex_exit(work_item->wr.buf_pool);
work_item->wr.min = ut_min((ulint)srv_LRU_scan_depth,(ulint)work_item->wr.min);
}
buf_flush_batch(work_item->wr.buf_pool,
work_item->wr.flush_type,
work_item->wr.min,
work_item->wr.lsn_limit,
&n);
buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type);
buf_flush_common(work_item->wr.flush_type, n.flushed);
work_item->n_flushed = n.flushed;
work_item->n_evicted = n.evicted;
return work_item->n_flushed;
}
/******************************************************************//**
Worker function to wait for work items and processing them and
sending reply back.
*/
static
void
mtflush_service_io(
/*===============*/
thread_sync_t* mtflush_io, /*!< inout: multi-threaded flush
syncronization data */
thread_data_t* thread_data) /* Thread status data */
{
wrk_t *work_item = NULL;
ulint n_flushed=0;
ut_a(mtflush_io != NULL);
ut_a(thread_data != NULL);
thread_data->wt_status = WTHR_SIG_WAITING;
work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);
if (work_item == NULL) {
work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq);
}
if (work_item) {
thread_data->wt_status = WTHR_RUNNING;
} else {
/* Thread did not get any work */
thread_data->wt_status = WTHR_NO_WORK;
return;
}
if (work_item->wi_status != WRK_ITEM_EXIT) {
work_item->wi_status = WRK_ITEM_SET;
}
#ifdef UNIV_MTFLUSH_DEBUG
ut_a(work_item->id_usr == 0);
#endif
work_item->id_usr = os_thread_get_curr_id();
/* This works as a producer/consumer model, where in tasks are
* inserted into the work-queue (wq) and completions are based
* on the type of operations performed and as a result the WRITE/
* compression/flush operation completions get posted to wr_cq.
* And READ/decompress operations completions get posted to rd_cq.
* in future we may have others.
*/
switch(work_item->tsk) {
case MT_WRK_NONE:
ut_a(work_item->wi_status == WRK_ITEM_EXIT);
work_item->wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
thread_data->wt_status = WTHR_KILL_IT;
break;
case MT_WRK_WRITE:
ut_a(work_item->wi_status == WRK_ITEM_SET);
work_item->wi_status = WRK_ITEM_START;
/* Process work item */
if (0 == (n_flushed = buf_mtflu_flush_pool_instance(work_item))) {
work_item->wi_status = WRK_ITEM_FAILED;
}
work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
break;
case MT_WRK_READ:
ut_a(0);
break;
default:
/* None other than Write/Read handling planned */
ut_a(0);
break;
}
}
/** Flush dirty pages when multi-threaded flush is used. */
extern "C" UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD(mtflush_io_thread)(void* arg)
{
thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
thread_data_t *this_thread_data = NULL;
ulint i;
/* Find correct slot for this thread */
mutex_enter(&(mtflush_io->thread_global_mtx));
for(i=0; i < mtflush_io->n_threads; i ++) {
if (mtflush_io->thread_data[i].wthread_id == os_thread_get_curr_id()) {
break;
}
}
ut_a(i <= mtflush_io->n_threads);
this_thread_data = &mtflush_io->thread_data[i];
mutex_exit(&(mtflush_io->thread_global_mtx));
while (TRUE) {
#ifdef UNIV_MTFLUSH_DEBUG
fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(),
ib_wqueue_len(mtflush_io->wq),
ib_wqueue_len(mtflush_io->wr_cq));
#endif /* UNIV_MTFLUSH_DEBUG */
mtflush_service_io(mtflush_io, this_thread_data);
if (this_thread_data->wt_status == WTHR_KILL_IT) {
break;
}
}
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
}
/******************************************************************//**
Add exit work item to work queue to signal multi-threded flush
threads that they should exit.
*/
void
buf_mtflu_io_thread_exit(void)
/*==========================*/
{
ulint i;
thread_sync_t* mtflush_io = mtflush_ctx;
wrk_t* work_item = NULL;
ut_a(mtflush_io != NULL);
/* Allocate work items for shutdown message */
work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap, sizeof(wrk_t)*srv_mtflush_threads);
/* Confirm if the io-thread KILL is in progress, bailout */
if (mtflush_io->gwt_status == WTHR_KILL_IT) {
return;
}
mtflush_io->gwt_status = WTHR_KILL_IT;
/* This lock is to safequard against timing bug: flush request take
this mutex before sending work items to be processed by flush
threads. Inside flush thread we assume that work queue contains only
a constant number of items. Thus, we may not install new work items
below before all previous ones are processed. This mutex is released
by flush request after all work items sent to flush threads have
been processed. Thus, we can get this mutex if and only if work
queue is empty. */
mutex_enter(&mtflush_mtx);
/* Make sure the work queue is empty */
ut_a(ib_wqueue_is_empty(mtflush_io->wq));
/* Send one exit work item/thread */
for (i=0; i < (ulint)srv_mtflush_threads; i++) {
work_item[i].tsk = MT_WRK_NONE;
work_item[i].wi_status = WRK_ITEM_EXIT;
work_item[i].wheap = mtflush_io->wheap;
work_item[i].rheap = mtflush_io->rheap;
work_item[i].id_usr = 0;
ib_wqueue_add(mtflush_io->wq,
(void *)&(work_item[i]),
mtflush_io->wheap);
}
/* Requests sent */
mutex_exit(&mtflush_mtx);
/* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait */
os_thread_sleep(MT_WAIT_IN_USECS);
}
ut_a(ib_wqueue_is_empty(mtflush_io->wq));
/* Collect all work done items */
for (i=0; i < (ulint)srv_mtflush_threads;) {
wrk_t* work_item = NULL;
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, MT_WAIT_IN_USECS);
/* If we receive reply to work item and it's status is exit,
thead has processed this message and existed */
if (work_item && work_item->wi_status == WRK_ITEM_EXIT) {
i++;
}
}
/* Wait about 1/2 sec to allow threads really exit */
os_thread_sleep(MT_WAIT_IN_USECS);
/* Make sure that work queue is empty */
while(!ib_wqueue_is_empty(mtflush_io->wq))
{
ib_wqueue_nowait(mtflush_io->wq);
}
mtflush_ctx->~thread_sync_t();
mtflush_ctx = NULL;
mutex_free(&mtflush_mtx);
}
/******************************************************************//**
Initialize multi-threaded flush thread syncronization data.
@return Initialized multi-threaded flush thread syncroniztion data. */
void*
buf_mtflu_handler_init(
/*===================*/
ulint n_threads, /*!< in: Number of threads to create */
ulint wrk_cnt) /*!< in: Number of work items */
{
mem_heap_t* mtflush_heap;
mem_heap_t* mtflush_heap2;
/* Create heap, work queue, write completion queue, read
completion queue for multi-threaded flush, and init
handler. */
mtflush_heap = mem_heap_create(0);
ut_a(mtflush_heap != NULL);
mtflush_heap2 = mem_heap_create(0);
ut_a(mtflush_heap2 != NULL);
mutex_create(LATCH_ID_MTFLUSH_MUTEX, &mtflush_mtx);
mtflush_ctx = new (mem_heap_zalloc(mtflush_heap, sizeof *mtflush_ctx))
thread_sync_t(n_threads, mtflush_heap, mtflush_heap2);
return((void *)mtflush_ctx);
}
/******************************************************************//**
Flush buffer pool instances.
@return number of pages flushed. */
ulint
buf_mtflu_flush_work_items(
/*=======================*/
ulint buf_pool_inst, /*!< in: Number of buffer pool instances */
flush_counters_t *per_pool_cnt, /*!< out: Number of pages
flushed or evicted /instance */
buf_flush_t flush_type, /*!< in: Type of flush */
ulint min_n, /*!< in: Wished minimum number of
blocks to be flushed */
lsn_t lsn_limit) /*!< in: All blocks whose
oldest_modification is smaller than
this should be flushed (if their
number does not exceed min_n) */
{
ulint n_flushed=0, i;
mem_heap_t* work_heap;
mem_heap_t* reply_heap;
wrk_t work_item[MTFLUSH_MAX_WORKER];
if (mtflush_ctx->gwt_status == WTHR_KILL_IT) {
return 0;
}
/* Allocate heap where all work items used and queue
node items areallocated */
work_heap = mem_heap_create(0);
reply_heap = mem_heap_create(0);
for(i=0;i<buf_pool_inst; i++) {
work_item[i].tsk = MT_WRK_WRITE;
work_item[i].wr.buf_pool = buf_pool_from_array(i);
work_item[i].wr.flush_type = flush_type;
work_item[i].wr.min = min_n;
work_item[i].wr.lsn_limit = lsn_limit;
work_item[i].wi_status = WRK_ITEM_UNSET;
work_item[i].wheap = work_heap;
work_item[i].rheap = reply_heap;
work_item[i].n_flushed = 0;
work_item[i].n_evicted = 0;
work_item[i].id_usr = 0;
ib_wqueue_add(mtflush_ctx->wq,
(void *)(work_item + i),
work_heap);
}
/* wait on the completion to arrive */
for(i=0; i< buf_pool_inst;) {
wrk_t *done_wi = NULL;
done_wi = (wrk_t *)ib_wqueue_wait(mtflush_ctx->wr_cq);
if (done_wi != NULL) {
per_pool_cnt[i].flushed = done_wi->n_flushed;
per_pool_cnt[i].evicted = done_wi->n_evicted;
#ifdef UNIV_MTFLUSH_DEBUG
if((int)done_wi->id_usr == 0 &&
(done_wi->wi_status == WRK_ITEM_SET ||
done_wi->wi_status == WRK_ITEM_UNSET)) {
fprintf(stderr,
"**Set/Unused work_item[%lu] flush_type=%d\n",
i,
done_wi->wr.flush_type);
ut_a(0);
}
#endif
n_flushed+= done_wi->n_flushed+done_wi->n_evicted;
i++;
}
}
/* Release used work_items and queue nodes */
mem_heap_free(work_heap);
mem_heap_free(reply_heap);
return(n_flushed);
}
/*******************************************************************//**
Multi-threaded version of buf_flush_list
*/
bool
buf_mtflu_flush_list(
/*=================*/
ulint min_n, /*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all
blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
ulint* n_processed) /*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
{
ulint i;
bool success = true;
flush_counters_t cnt[MTFLUSH_MAX_WORKER];
if (n_processed) {
*n_processed = 0;
}
if (min_n != ULINT_MAX) {
/* Ensure that flushing is spread evenly amongst the
buffer pool instances. When min_n is ULINT_MAX
we need to flush everything up to the lsn limit
so no limit here. */
min_n = (min_n + srv_buf_pool_instances - 1)
/ srv_buf_pool_instances;
}
/* This lock is to safequard against re-entry if any. */
mutex_enter(&mtflush_mtx);
buf_mtflu_flush_work_items(srv_buf_pool_instances,
cnt, BUF_FLUSH_LIST,
min_n, lsn_limit);
mutex_exit(&mtflush_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) {
if (n_processed) {
*n_processed += cnt[i].flushed+cnt[i].evicted;
}
if (cnt[i].flushed) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_FLUSH_BATCH_TOTAL_PAGE,
MONITOR_FLUSH_BATCH_COUNT,
MONITOR_FLUSH_BATCH_PAGES,
cnt[i].flushed);
}
if(cnt[i].evicted) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_LRU_BATCH_EVICT_TOTAL_PAGE,
MONITOR_LRU_BATCH_EVICT_COUNT,
MONITOR_LRU_BATCH_EVICT_PAGES,
cnt[i].evicted);
}
}
#ifdef UNIV_MTFLUSH_DEBUG
fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu ]\n",
__FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed);
#endif
return(success);
}
/*********************************************************************//**
Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
buf_mtflu_flush_LRU_tail(void)
/*==========================*/
{
ulint total_flushed=0, i;
flush_counters_t cnt[MTFLUSH_MAX_WORKER];
ut_a(buf_mtflu_init_done());
/* At shutdown do not send requests anymore */
if (!mtflush_ctx || mtflush_ctx->gwt_status == WTHR_KILL_IT) {
return (total_flushed);
}
/* This lock is to safeguard against re-entry if any */
mutex_enter(&mtflush_mtx);
buf_mtflu_flush_work_items(srv_buf_pool_instances,
cnt, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
mutex_exit(&mtflush_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) {
total_flushed += cnt[i].flushed+cnt[i].evicted;
if (cnt[i].flushed) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_LRU_BATCH_FLUSH_TOTAL_PAGE,
MONITOR_LRU_BATCH_FLUSH_COUNT,
MONITOR_LRU_BATCH_FLUSH_PAGES,
cnt[i].flushed);
}
if(cnt[i].evicted) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_LRU_BATCH_EVICT_TOTAL_PAGE,
MONITOR_LRU_BATCH_EVICT_COUNT,
MONITOR_LRU_BATCH_EVICT_PAGES,
cnt[i].evicted);
}
}
#if UNIV_MTFLUSH_DEBUG
fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu ]\n", (
srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed);
#endif
return(total_flushed);
}
/*********************************************************************//**
Set correct thread identifiers to io thread array based on
information we have. */
void
buf_mtflu_set_thread_ids(
/*=====================*/
ulint n_threads, /*!<in: Number of threads to fill */
void* ctx, /*!<in: thread context */
os_thread_id_t* thread_ids) /*!<in: thread id array */
{
thread_sync_t *mtflush_io = ((thread_sync_t *)ctx);
ulint i;
ut_a(mtflush_io != NULL);
ut_a(thread_ids != NULL);
for(i = 0; i < n_threads; i++) {
thread_ids[i] = mtflush_io->thread_data[i].wthread_id;
}
}
...@@ -3645,16 +3645,6 @@ static uint innobase_partition_flags() ...@@ -3645,16 +3645,6 @@ static uint innobase_partition_flags()
return (0); return (0);
} }
static const char* deprecated_use_mtflush
= "Using innodb_use_mtflush is deprecated"
" and the parameter will be removed in MariaDB 10.3."
" Use innodb-page-cleaners instead. ";
static const char* deprecated_mtflush_threads
= "Using innodb_mtflush_threads is deprecated"
" and the parameter will be removed in MariaDB 10.3."
" Use innodb-page-cleaners instead. ";
/** Update log_checksum_algorithm_ptr with a pointer to the function /** Update log_checksum_algorithm_ptr with a pointer to the function
corresponding to whether checksums are enabled. corresponding to whether checksums are enabled.
@param[in,out] thd client session, or NULL if at startup @param[in,out] thd client session, or NULL if at startup
...@@ -3992,14 +3982,6 @@ innobase_init( ...@@ -3992,14 +3982,6 @@ innobase_init(
DBUG_RETURN(innobase_init_abort()); DBUG_RETURN(innobase_init_abort());
} }
if (srv_use_mtflush) {
ib::warn() << deprecated_use_mtflush;
}
if (srv_use_mtflush && srv_mtflush_threads != MTFLUSH_DEFAULT_WORKER) {
ib::warn() << deprecated_mtflush_threads;
}
if (innobase_change_buffering) { if (innobase_change_buffering) {
ulint use; ulint use;
...@@ -21012,20 +20994,6 @@ static MYSQL_SYSVAR_ENUM(compression_algorithm, innodb_compression_algorithm, ...@@ -21012,20 +20994,6 @@ static MYSQL_SYSVAR_ENUM(compression_algorithm, innodb_compression_algorithm,
PAGE_ZLIB_ALGORITHM, PAGE_ZLIB_ALGORITHM,
&page_compression_algorithms_typelib); &page_compression_algorithms_typelib);
static MYSQL_SYSVAR_LONG(mtflush_threads, srv_mtflush_threads,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"DEPRECATED. Number of multi-threaded flush threads",
NULL, NULL,
MTFLUSH_DEFAULT_WORKER, /* Default setting */
1, /* Minimum setting */
MTFLUSH_MAX_WORKER, /* Max setting */
0);
static MYSQL_SYSVAR_BOOL(use_mtflush, srv_use_mtflush,
PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY,
"DEPRECATED. Use multi-threaded flush. Default FALSE.",
NULL, NULL, FALSE);
static MYSQL_SYSVAR_ULONG(fatal_semaphore_wait_threshold, srv_fatal_semaphore_wait_threshold, static MYSQL_SYSVAR_ULONG(fatal_semaphore_wait_threshold, srv_fatal_semaphore_wait_threshold,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"Maximum number of seconds that semaphore times out in InnoDB.", "Maximum number of seconds that semaphore times out in InnoDB.",
...@@ -21324,8 +21292,6 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { ...@@ -21324,8 +21292,6 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
/* Table page compression feature */ /* Table page compression feature */
MYSQL_SYSVAR(compression_default), MYSQL_SYSVAR(compression_default),
MYSQL_SYSVAR(compression_algorithm), MYSQL_SYSVAR(compression_algorithm),
MYSQL_SYSVAR(mtflush_threads),
MYSQL_SYSVAR(use_mtflush),
/* Encryption feature */ /* Encryption feature */
MYSQL_SYSVAR(encrypt_tables), MYSQL_SYSVAR(encrypt_tables),
MYSQL_SYSVAR(encryption_threads), MYSQL_SYSVAR(encryption_threads),
......
/*****************************************************************************
Copyright (C) 2014 SkySQL Ab. All Rights Reserved.
Copyright (C) 2014 Fusion-io. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*****************************************************************************/
/******************************************************************//**
@file include/buf0mtflu.h
Multi-threadef flush method interface function prototypes
Created 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
Dhananjoy Das DDas@fusionio.com
***********************************************************************/
#ifndef buf0mtflu_h
#define buf0mtflu_h
/******************************************************************//**
Add exit work item to work queue to signal multi-threded flush
threads that they should exit.
*/
void
buf_mtflu_io_thread_exit(void);
/*===========================*/
/******************************************************************//**
Initialize multi-threaded flush thread syncronization data.
@return Initialized multi-threaded flush thread syncroniztion data. */
void*
buf_mtflu_handler_init(
/*===================*/
ulint n_threads, /*!< in: Number of threads to create */
ulint wrk_cnt); /*!< in: Number of work items */
/******************************************************************//**
Return true if multi-threaded flush is initialized
@return true if initialized, false if not */
bool
buf_mtflu_init_done(void);
/*======================*/
/*********************************************************************//**
Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
buf_mtflu_flush_LRU_tail(void);
/*===========================*/
/*******************************************************************//**
Multi-threaded version of buf_flush_list
*/
bool
buf_mtflu_flush_list(
/*=================*/
ulint min_n, /*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all
blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
ulint* n_processed); /*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
/*********************************************************************//**
Set correct thread identifiers to io thread array based on
information we have. */
void
buf_mtflu_set_thread_ids(
/*=====================*/
ulint n_threads, /*!<in: Number of threads to fill */
void* ctx, /*!<in: thread context */
os_thread_id_t* thread_ids); /*!<in: thread id array */
#endif
...@@ -275,16 +275,6 @@ extern my_bool srv_use_atomic_writes; ...@@ -275,16 +275,6 @@ extern my_bool srv_use_atomic_writes;
/* Compression algorithm*/ /* Compression algorithm*/
extern ulong innodb_compression_algorithm; extern ulong innodb_compression_algorithm;
/* Number of flush threads */
#define MTFLUSH_MAX_WORKER 64
#define MTFLUSH_DEFAULT_WORKER 8
/* Number of threads used for multi-threaded flush */
extern long srv_mtflush_threads;
/* If this flag is TRUE, then we will use multi threaded flush. */
extern my_bool srv_use_mtflush;
/** TRUE if the server was successfully started */ /** TRUE if the server was successfully started */
extern bool srv_was_started; extern bool srv_was_started;
......
...@@ -379,8 +379,6 @@ enum latch_id_t { ...@@ -379,8 +379,6 @@ enum latch_id_t {
LATCH_ID_SCRUB_STAT_MUTEX, LATCH_ID_SCRUB_STAT_MUTEX,
LATCH_ID_DEFRAGMENT_MUTEX, LATCH_ID_DEFRAGMENT_MUTEX,
LATCH_ID_BTR_DEFRAGMENT_MUTEX, LATCH_ID_BTR_DEFRAGMENT_MUTEX,
LATCH_ID_MTFLUSH_THREAD_MUTEX,
LATCH_ID_MTFLUSH_MUTEX,
LATCH_ID_FIL_CRYPT_MUTEX, LATCH_ID_FIL_CRYPT_MUTEX,
LATCH_ID_FIL_CRYPT_STAT_MUTEX, LATCH_ID_FIL_CRYPT_STAT_MUTEX,
LATCH_ID_FIL_CRYPT_DATA_MUTEX, LATCH_ID_FIL_CRYPT_DATA_MUTEX,
......
...@@ -172,10 +172,6 @@ my_bool srv_numa_interleave; ...@@ -172,10 +172,6 @@ my_bool srv_numa_interleave;
my_bool srv_use_atomic_writes; my_bool srv_use_atomic_writes;
/** innodb_compression_algorithm; used with page compression */ /** innodb_compression_algorithm; used with page compression */
ulong innodb_compression_algorithm; ulong innodb_compression_algorithm;
/** innodb_mtflush_threads; number of threads used for multi-threaded flush */
long srv_mtflush_threads;
/** innodb_use_mtflush; whether to use multi threaded flush. */
my_bool srv_use_mtflush;
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/** Used by SET GLOBAL innodb_master_thread_disabled_debug = X. */ /** Used by SET GLOBAL innodb_master_thread_disabled_debug = X. */
......
...@@ -83,7 +83,6 @@ Created 2/16/1996 Heikki Tuuri ...@@ -83,7 +83,6 @@ Created 2/16/1996 Heikki Tuuri
#include "os0proc.h" #include "os0proc.h"
#include "buf0flu.h" #include "buf0flu.h"
#include "buf0rea.h" #include "buf0rea.h"
#include "buf0mtflu.h"
#include "dict0boot.h" #include "dict0boot.h"
#include "dict0load.h" #include "dict0load.h"
#include "dict0stats_bg.h" #include "dict0stats_bg.h"
...@@ -191,9 +190,7 @@ static ulint n[SRV_MAX_N_IO_THREADS + 6]; ...@@ -191,9 +190,7 @@ static ulint n[SRV_MAX_N_IO_THREADS + 6];
/** io_handler_thread identifiers, 32 is the maximum number of purge threads */ /** io_handler_thread identifiers, 32 is the maximum number of purge threads */
/** 6 is the ? */ /** 6 is the ? */
#define START_OLD_THREAD_CNT (SRV_MAX_N_IO_THREADS + 6 + 32) #define START_OLD_THREAD_CNT (SRV_MAX_N_IO_THREADS + 6 + 32)
static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32 + MTFLUSH_MAX_WORKER]; static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32];
/* Thread contex data for multi-threaded flush */
void *mtflush_ctx=NULL;
/** Thead handles */ /** Thead handles */
static os_thread_t thread_handles[SRV_MAX_N_IO_THREADS + 6 + 32]; static os_thread_t thread_handles[SRV_MAX_N_IO_THREADS + 6 + 32];
...@@ -1307,10 +1304,6 @@ srv_shutdown_all_bg_threads() ...@@ -1307,10 +1304,6 @@ srv_shutdown_all_bg_threads()
} }
os_event_set(buf_flush_event); os_event_set(buf_flush_event);
if (srv_use_mtflush) {
buf_mtflu_io_thread_exit();
}
} }
if (!os_thread_count) { if (!os_thread_count) {
...@@ -2633,19 +2626,6 @@ innobase_start_or_create_for_mysql() ...@@ -2633,19 +2626,6 @@ innobase_start_or_create_for_mysql()
if (!srv_read_only_mode) { if (!srv_read_only_mode) {
/* wake main loop of page cleaner up */ /* wake main loop of page cleaner up */
os_event_set(buf_flush_event); os_event_set(buf_flush_event);
if (srv_use_mtflush) {
/* Start multi-threaded flush threads */
mtflush_ctx = buf_mtflu_handler_init(
srv_mtflush_threads,
srv_buf_pool_instances);
/* Set up the thread ids */
buf_mtflu_set_thread_ids(
srv_mtflush_threads,
mtflush_ctx,
(thread_ids + 6 + 32));
}
} }
if (srv_print_verbose_log) { if (srv_print_verbose_log) {
......
...@@ -1552,10 +1552,6 @@ sync_latch_meta_init() ...@@ -1552,10 +1552,6 @@ sync_latch_meta_init()
PFS_NOT_INSTRUMENTED); PFS_NOT_INSTRUMENTED);
LATCH_ADD_MUTEX(BTR_DEFRAGMENT_MUTEX, SYNC_NO_ORDER_CHECK, LATCH_ADD_MUTEX(BTR_DEFRAGMENT_MUTEX, SYNC_NO_ORDER_CHECK,
PFS_NOT_INSTRUMENTED); PFS_NOT_INSTRUMENTED);
LATCH_ADD_MUTEX(MTFLUSH_THREAD_MUTEX, SYNC_NO_ORDER_CHECK,
PFS_NOT_INSTRUMENTED);
LATCH_ADD_MUTEX(MTFLUSH_MUTEX, SYNC_NO_ORDER_CHECK,
PFS_NOT_INSTRUMENTED);
LATCH_ADD_MUTEX(FIL_CRYPT_MUTEX, SYNC_NO_ORDER_CHECK, LATCH_ADD_MUTEX(FIL_CRYPT_MUTEX, SYNC_NO_ORDER_CHECK,
PFS_NOT_INSTRUMENTED); PFS_NOT_INSTRUMENTED);
LATCH_ADD_MUTEX(FIL_CRYPT_STAT_MUTEX, SYNC_NO_ORDER_CHECK, LATCH_ADD_MUTEX(FIL_CRYPT_STAT_MUTEX, SYNC_NO_ORDER_CHECK,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment