Commit c88a0d48 authored by Jan Lindström's avatar Jan Lindström

Temporal fix for flush thread hang.

Added option to disable multi-threaded flush with innodb_use_mtflush = 0
option, by default multi-threaded flush is used.

Updated innochecksum tool, still it does not support new checksums.
parent b620e736
......@@ -72,10 +72,24 @@ IF(CMAKE_SYSTEM_NAME STREQUAL "SunOS")
ENDIF()
ENDIF()
IF(WITH_INNOBASE_STORAGE_ENGINE)
# Add path to the InnoDB headers
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/innobase/include)
# We use the InnoDB code directly in case the code changes.
ADD_DEFINITIONS("-DUNIV_INNOCHECKSUM")
SET(INNOBASE_SOURCES
../storage/innobase/buf/buf0checksum.cc
../storage/innobase/ut/ut0crc32.cc
../storage/innobase/ut/ut0ut.cc
)
MYSQL_ADD_EXECUTABLE(innochecksum innochecksum.cc ${INNOBASE_SOURCES})
TARGET_LINK_LIBRARIES(innochecksum mysys mysys_ssl)
ENDIF()
MYSQL_ADD_EXECUTABLE(replace replace.c COMPONENT Server)
TARGET_LINK_LIBRARIES(replace mysys)
IF(UNIX)
MYSQL_ADD_EXECUTABLE(innochecksum innochecksum.c)
MYSQL_ADD_EXECUTABLE(resolve_stack_dump resolve_stack_dump.c)
TARGET_LINK_LIBRARIES(resolve_stack_dump mysys)
......
/*
Copyright (c) 2005, 2011, Oracle and/or its affiliates
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
InnoDB offline file checksum utility. 85% of the code in this file
was taken wholesale fron the InnoDB codebase.
The final 15% was originally written by Mark Smith of Danga
Interactive, Inc. <junior@danga.com>
Published with a permission.
*/
#include <my_global.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
/* all of these ripped from InnoDB code from MySQL 4.0.22 */
#define UT_HASH_RANDOM_MASK 1463735687
#define UT_HASH_RANDOM_MASK2 1653893711
#define FIL_PAGE_LSN 16
#define FIL_PAGE_FILE_FLUSH_LSN 26
#define FIL_PAGE_OFFSET 4
#define FIL_PAGE_DATA 38
#define FIL_PAGE_END_LSN_OLD_CHKSUM 8
#define FIL_PAGE_SPACE_OR_CHKSUM 0
#define UNIV_PAGE_SIZE (2 * 8192)
/* command line argument to do page checks (that's it) */
/* another argument to specify page ranges... seek to right spot and go from there */
typedef unsigned long int ulint;
/* innodb function in name; modified slightly to not have the ASM version (lots of #ifs that didn't apply) */
ulint mach_read_from_4(uchar *b)
{
return( ((ulint)(b[0]) << 24)
+ ((ulint)(b[1]) << 16)
+ ((ulint)(b[2]) << 8)
+ (ulint)(b[3])
);
}
ulint
ut_fold_ulint_pair(
/*===============*/
/* out: folded value */
ulint n1, /* in: ulint */
ulint n2) /* in: ulint */
{
return(((((n1 ^ n2 ^ UT_HASH_RANDOM_MASK2) << 8) + n1)
^ UT_HASH_RANDOM_MASK) + n2);
}
ulint
ut_fold_binary(
/*===========*/
/* out: folded value */
uchar* str, /* in: string of bytes */
ulint len) /* in: length */
{
ulint i;
ulint fold= 0;
for (i= 0; i < len; i++)
{
fold= ut_fold_ulint_pair(fold, (ulint)(*str));
str++;
}
return(fold);
}
ulint
buf_calc_page_new_checksum(
/*=======================*/
/* out: checksum */
uchar* page) /* in: buffer page */
{
ulint checksum;
/* Since the fields FIL_PAGE_FILE_FLUSH_LSN and ..._ARCH_LOG_NO
are written outside the buffer pool to the first pages of data
files, we have to skip them in the page checksum calculation.
We must also skip the field FIL_PAGE_SPACE_OR_CHKSUM where the
checksum is stored, and also the last 8 bytes of page because
there we store the old formula checksum. */
checksum= ut_fold_binary(page + FIL_PAGE_OFFSET,
FIL_PAGE_FILE_FLUSH_LSN - FIL_PAGE_OFFSET)
+ ut_fold_binary(page + FIL_PAGE_DATA,
UNIV_PAGE_SIZE - FIL_PAGE_DATA
- FIL_PAGE_END_LSN_OLD_CHKSUM);
checksum= checksum & 0xFFFFFFFF;
return(checksum);
}
ulint
buf_calc_page_old_checksum(
/*=======================*/
/* out: checksum */
uchar* page) /* in: buffer page */
{
ulint checksum;
checksum= ut_fold_binary(page, FIL_PAGE_FILE_FLUSH_LSN);
checksum= checksum & 0xFFFFFFFF;
return(checksum);
}
int main(int argc, char **argv)
{
FILE *f; /* our input file */
uchar *p; /* storage of pages read */
int bytes; /* bytes read count */
ulint ct; /* current page number (0 based) */
int now; /* current time */
int lastt; /* last time */
ulint oldcsum, oldcsumfield, csum, csumfield, logseq, logseqfield; /* ulints for checksum storage */
struct stat st; /* for stat, if you couldn't guess */
unsigned long long int size; /* size of file (has to be 64 bits) */
ulint pages; /* number of pages in file */
ulint start_page= 0, end_page= 0, use_end_page= 0; /* for starting and ending at certain pages */
off_t offset= 0;
int just_count= 0; /* if true, just print page count */
int verbose= 0;
int debug= 0;
int c;
int fd;
/* remove arguments */
while ((c= getopt(argc, argv, "cvds:e:p:")) != -1)
{
switch (c)
{
case 'v':
verbose= 1;
break;
case 'c':
just_count= 1;
break;
case 's':
start_page= atoi(optarg);
break;
case 'e':
end_page= atoi(optarg);
use_end_page= 1;
break;
case 'p':
start_page= atoi(optarg);
end_page= atoi(optarg);
use_end_page= 1;
break;
case 'd':
debug= 1;
break;
case ':':
fprintf(stderr, "option -%c requires an argument\n", optopt);
return 1;
break;
case '?':
fprintf(stderr, "unrecognized option: -%c\n", optopt);
return 1;
break;
}
}
/* debug implies verbose... */
if (debug) verbose= 1;
/* make sure we have the right arguments */
if (optind >= argc)
{
printf("InnoDB offline file checksum utility.\n");
printf("usage: %s [-c] [-s <start page>] [-e <end page>] [-p <page>] [-v] [-d] <filename>\n", argv[0]);
printf("\t-c\tprint the count of pages in the file\n");
printf("\t-s n\tstart on this page number (0 based)\n");
printf("\t-e n\tend at this page number (0 based)\n");
printf("\t-p n\tcheck only this page (0 based)\n");
printf("\t-v\tverbose (prints progress every 5 seconds)\n");
printf("\t-d\tdebug mode (prints checksums for each page)\n");
return 1;
}
/* stat the file to get size and page count */
if (stat(argv[optind], &st))
{
perror("error statting file");
return 1;
}
size= st.st_size;
pages= size / UNIV_PAGE_SIZE;
if (just_count)
{
printf("%lu\n", pages);
return 0;
}
else if (verbose)
{
printf("file %s = %llu bytes (%lu pages)...\n", argv[optind], size, pages);
printf("checking pages in range %lu to %lu\n", start_page, use_end_page ? end_page : (pages - 1));
}
/* open the file for reading */
f= fopen(argv[optind], "r");
if (!f)
{
perror("error opening file");
return 1;
}
/* seek to the necessary position */
if (start_page)
{
fd= fileno(f);
if (!fd)
{
perror("unable to obtain file descriptor number");
return 1;
}
offset= (off_t)start_page * (off_t)UNIV_PAGE_SIZE;
if (lseek(fd, offset, SEEK_SET) != offset)
{
perror("unable to seek to necessary offset");
return 1;
}
}
/* allocate buffer for reading (so we don't realloc every time) */
p= (uchar *)malloc(UNIV_PAGE_SIZE);
/* main checksumming loop */
ct= start_page;
lastt= 0;
while (!feof(f))
{
bytes= fread(p, 1, UNIV_PAGE_SIZE, f);
if (!bytes && feof(f)) return 0;
if (bytes != UNIV_PAGE_SIZE)
{
fprintf(stderr, "bytes read (%d) doesn't match universal page size (%d)\n", bytes, UNIV_PAGE_SIZE);
return 1;
}
/* check the "stored log sequence numbers" */
logseq= mach_read_from_4(p + FIL_PAGE_LSN + 4);
logseqfield= mach_read_from_4(p + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM + 4);
if (debug)
printf("page %lu: log sequence number: first = %lu; second = %lu\n", ct, logseq, logseqfield);
if (logseq != logseqfield)
{
fprintf(stderr, "page %lu invalid (fails log sequence number check)\n", ct);
return 1;
}
/* check old method of checksumming */
oldcsum= buf_calc_page_old_checksum(p);
oldcsumfield= mach_read_from_4(p + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM);
if (debug)
printf("page %lu: old style: calculated = %lu; recorded = %lu\n", ct, oldcsum, oldcsumfield);
if (oldcsumfield != mach_read_from_4(p + FIL_PAGE_LSN) && oldcsumfield != oldcsum)
{
fprintf(stderr, "page %lu invalid (fails old style checksum)\n", ct);
return 1;
}
/* now check the new method */
csum= buf_calc_page_new_checksum(p);
csumfield= mach_read_from_4(p + FIL_PAGE_SPACE_OR_CHKSUM);
if (debug)
printf("page %lu: new style: calculated = %lu; recorded = %lu\n", ct, csum, csumfield);
if (csumfield != 0 && csum != csumfield)
{
fprintf(stderr, "page %lu invalid (fails new style checksum)\n", ct);
return 1;
}
/* end if this was the last page we were supposed to check */
if (use_end_page && (ct >= end_page))
return 0;
/* do counter increase and progress printing */
ct++;
if (verbose)
{
if (ct % 64 == 0)
{
now= time(0);
if (!lastt) lastt= now;
if (now - lastt >= 1)
{
printf("page %lu okay: %.3f%% done\n", (ct - 1), (float) ct / pages * 100);
lastt= now;
}
}
}
}
return 0;
}
This diff is collapsed.
......@@ -134,6 +134,7 @@ typedef struct thread_sync
static int mtflush_work_initialized = -1;
static os_fast_mutex_t mtflush_mtx;
static os_fast_mutex_t mtflush_mtx_wait;
static thread_sync_t* mtflush_ctx=NULL;
/******************************************************************//**
......@@ -180,7 +181,9 @@ buf_mtflu_flush_pool_instance(
pools based on the assumption that it will
help in the retry which will follow the
failure. */
#ifdef UNIV_DEBUG
fprintf(stderr, "InnoDB: Note: buf flush start failed there is already active flush for this buffer pool.\n");
#endif
return 0;
}
......@@ -223,12 +226,16 @@ mtflush_service_io(
mtflush_io->wt_status = WTHR_SIG_WAITING;
/* TODO: Temporal fix for the hang bug. This needs a real fix. */
os_fast_mutex_lock(&mtflush_mtx_wait);
work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);
if (work_item == NULL) {
work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq);
}
os_fast_mutex_unlock(&mtflush_mtx_wait);
if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING;
} else {
......@@ -237,6 +244,10 @@ mtflush_service_io(
return;
}
if (work_item->wi_status != WRK_ITEM_EXIT) {
work_item->wi_status = WRK_ITEM_SET;
}
work_item->id_usr = os_thread_get_curr_id();
/* This works as a producer/consumer model, where in tasks are
......@@ -253,7 +264,7 @@ mtflush_service_io(
work_item->wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap);
mtflush_io->wt_status = WTHR_KILL_IT;
return;
break;
case MT_WRK_WRITE:
ut_a(work_item->wi_status == WRK_ITEM_SET);
......@@ -273,9 +284,9 @@ mtflush_service_io(
default:
/* None other than Write/Read handling planned */
ut_a(0);
break;
}
mtflush_io->wt_status = WTHR_NO_WORK;
}
/******************************************************************//**
......@@ -289,6 +300,7 @@ DECLARE_THREAD(mtflush_io_thread)(
void * arg)
{
thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
ulint n_timeout = 0;
#ifdef UNIV_DEBUG
ib_uint64_t stat_universal_num_processed = 0;
ib_uint64_t stat_cycle_num_processed = 0;
......@@ -296,8 +308,32 @@ DECLARE_THREAD(mtflush_io_thread)(
#endif
while (TRUE) {
#ifdef UNIV_DEBUG
fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(),
ib_wqueue_len(mtflush_io->wq),
ib_wqueue_len(mtflush_io->wr_cq));
#endif /* UNIV_DEBUG */
mtflush_service_io(mtflush_io);
#ifdef UNIV_DEBUG
if (mtflush_io->wt_status == WTHR_NO_WORK) {
n_timeout++;
if (n_timeout > 10) {
fprintf(stderr, "InnoDB: Note: Thread %lu has not received "
" work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(),
ib_wqueue_len(mtflush_io->wq),
ib_wqueue_len(mtflush_io->wr_cq));
n_timeout = 0;
}
} else {
n_timeout = 0;
}
#endif /* UNIV_DEBUG */
if (mtflush_io->wt_status == WTHR_KILL_IT) {
break;
}
......@@ -379,6 +415,7 @@ buf_mtflu_io_thread_exit(void)
ib_wqueue_free(mtflush_io->rd_cq);
os_fast_mutex_free(&mtflush_mtx);
os_fast_mutex_free(&mtflush_mtx_wait);
/* Free heap */
mem_heap_free(mtflush_io->wheap);
......@@ -400,6 +437,7 @@ buf_mtflu_handler_init(
ib_wqueue_t* mtflush_read_comp_queue;
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx_wait);
/* Create heap, work queue, write completion queue, read
completion queue for multi-threaded flush, and init
......@@ -465,16 +503,15 @@ buf_mtflu_flush_work_items(
node items areallocated */
work_heap = mem_heap_create(0);
work_item = (wrk_t*)mem_heap_alloc(work_heap, sizeof(wrk_t)*buf_pool_inst);
memset(work_item, 0, sizeof(wrk_t)*buf_pool_inst);
for(i=0;i<buf_pool_inst; i++) {
work_item[i].tsk = MT_WRK_WRITE;
work_item[i].rd.page_pool = NULL;
work_item[i].wr.buf_pool = buf_pool_from_array(i);
work_item[i].wr.flush_type = flush_type;
work_item[i].wr.min = min_n;
work_item[i].wr.lsn_limit = lsn_limit;
work_item[i].id_usr = -1;
work_item[i].wi_status = WRK_ITEM_SET;
work_item[i].wi_status = WRK_ITEM_UNSET;
work_item[i].wheap = work_heap;
ib_wqueue_add(mtflush_ctx->wq,
......@@ -490,14 +527,18 @@ buf_mtflu_flush_work_items(
if (done_wi != NULL) {
per_pool_pages_flushed[i] = done_wi->n_flushed;
if((int)done_wi->id_usr == -1 &&
done_wi->wi_status == WRK_ITEM_SET ) {
#ifdef UNIV_DEBUG
/* TODO: Temporal fix for hang. This is really a bug. */
if((int)done_wi->id_usr == 0 &&
(done_wi->wi_status == WRK_ITEM_SET ||
done_wi->wi_status == WRK_ITEM_UNSET)) {
fprintf(stderr,
"**Set/Unused work_item[%lu] flush_type=%d\n",
i,
done_wi->wr.flush_type);
ut_a(0);
}
#endif
n_flushed+= done_wi->n_flushed;
i++;
......
......@@ -16610,6 +16610,11 @@ static MYSQL_SYSVAR_LONG(mtflush_threads, srv_mtflush_threads,
MTFLUSH_MAX_WORKER, /* Max setting */
0);
static MYSQL_SYSVAR_BOOL(use_mtflush, srv_use_mtflush,
PLUGIN_VAR_OPCMDARG ,
"Use multi-threaded flush. Default TRUE.",
NULL, NULL, TRUE);
static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(additional_mem_pool_size),
MYSQL_SYSVAR(api_trx_level),
......@@ -16762,6 +16767,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(use_lz4),
#endif
MYSQL_SYSVAR(mtflush_threads),
MYSQL_SYSVAR(use_mtflush),
NULL
};
......
......@@ -1008,6 +1008,8 @@ Release fil_system mutex */
void
fil_system_exit(void);
/*==================*/
#ifndef UNIV_INNOCHECKSUM
/*******************************************************************//**
Returns the table space by a given id, NULL if not found. */
fil_space_t*
......@@ -1020,5 +1022,5 @@ char*
fil_space_name(
/*===========*/
fil_space_t* space); /*!< in: space */
#endif
#endif /* fil0fil_h */
......@@ -257,8 +257,13 @@ extern my_bool srv_use_lz4;
/* Number of flush threads */
#define MTFLUSH_MAX_WORKER 64
#define MTFLUSH_DEFAULT_WORKER 8
/* Number of threads used for multi-threaded flush */
extern long srv_mtflush_threads;
/* If this flag is TRUE, then we will use multi threaded flush. */
extern my_bool srv_use_mtflush;
#ifdef __WIN__
extern ibool srv_use_native_conditions;
#endif /* __WIN__ */
......
......@@ -150,6 +150,15 @@ ib_list_is_empty(
/* out: TRUE if empty else */
const ib_list_t* list); /* in: list */
/********************************************************************
Get number of items on list.
@return number of items on list */
UNIV_INLINE
ulint
ib_list_len(
/*========*/
const ib_list_t* list); /*<! in: list */
/* List. */
struct ib_list_t {
ib_list_node_t* first; /*!< first node */
......
......@@ -58,3 +58,23 @@ ib_list_is_empty(
{
return(!(list->first || list->last));
}
/********************************************************************
Get number of items on list.
@return number of items on list */
UNIV_INLINE
ulint
ib_list_len(
/*========*/
const ib_list_t* list) /*<! in: list */
{
ulint len = 0;
ib_list_node_t* node = list->first;
while(node) {
len++;
node = node->next;
}
return (len);
}
......@@ -103,6 +103,14 @@ ib_wqueue_nowait(
/*=============*/
ib_wqueue_t* wq); /*<! in: work queue */
/********************************************************************
Get number of items on queue.
@return number of items on queue */
ulint
ib_wqueue_len(
/*==========*/
ib_wqueue_t* wq); /*<! in: work queue */
/* Work queue. */
struct ib_wqueue_t {
......
......@@ -158,9 +158,11 @@ UNIV_INTERN my_bool srv_use_posix_fallocate = FALSE;
/* If this flag is TRUE, then we disable doublewrite buffer */
UNIV_INTERN my_bool srv_use_atomic_writes = FALSE;
/* If this flag IS TRUE, then we use lz4 to compress/decompress pages */
UNIV_INTERN my_bool srv_use_lz4 = FALSE;
UNIV_INTERN my_bool srv_use_lz4 = FALSE;
/* Number of threads used for multi-threaded flush */
UNIV_INTERN long srv_mtflush_threads = MTFLUSH_DEFAULT_WORKER;
/* If this flag is TRUE, then we will use multi threaded flush. */
UNIV_INTERN my_bool srv_use_mtflush = TRUE;
#ifdef __WIN__
/* Windows native condition variables. We use runtime loading / function
......
......@@ -2593,19 +2593,23 @@ innobase_start_or_create_for_mysql(void)
if (!srv_read_only_mode) {
/* Start multi-threaded flush threads */
mtflush_ctx = buf_mtflu_handler_init(srv_mtflush_threads,
srv_buf_pool_instances);
/* Set up the thread ids */
buf_mtflu_set_thread_ids(srv_mtflush_threads,
mtflush_ctx,
(thread_ids + 6 + 32));
if (srv_use_mtflush) {
/* Start multi-threaded flush threads */
mtflush_ctx = buf_mtflu_handler_init(
srv_mtflush_threads,
srv_buf_pool_instances);
/* Set up the thread ids */
buf_mtflu_set_thread_ids(
srv_mtflush_threads,
mtflush_ctx,
(thread_ids + 6 + SRV_MAX_N_PURGE_THREADS));
#if UNIV_DEBUG
fprintf(stderr, "InnoDB: Note: %s:%d buf-pool-instances:%lu mtflush_threads %lu\n",
__FILE__, __LINE__, srv_buf_pool_instances, srv_mtflush_threads);
fprintf(stderr, "InnoDB: Note: %s:%d buf-pool-instances:%lu mtflush_threads %lu\n",
__FILE__, __LINE__, srv_buf_pool_instances, srv_mtflush_threads);
#endif
}
os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL);
}
......@@ -2871,9 +2875,12 @@ innobase_shutdown_for_mysql(void)
logs_empty_and_mark_files_at_shutdown() and should have
already quit or is quitting right now. */
/* g. Exit the multi threaded flush threads */
buf_mtflu_io_thread_exit();
if (srv_use_mtflush) {
/* g. Exit the multi threaded flush threads */
buf_mtflu_io_thread_exit();
}
#ifdef UNIV_DEBUG
fprintf(stderr, "InnoDB: Note: %s:%d os_thread_count:%lu \n", __FUNCTION__, __LINE__, os_thread_count);
......
......@@ -205,3 +205,20 @@ ib_wqueue_is_empty(
{
return(ib_list_is_empty(wq->items));
}
/********************************************************************
Get number of items on queue.
@return number of items on queue */
ulint
ib_wqueue_len(
/*==========*/
ib_wqueue_t* wq) /*<! in: work queue */
{
ulint len = 0;
mutex_enter(&wq->mutex);
len = ib_list_len(wq->items);
mutex_exit(&wq->mutex);
return(len);
}
......@@ -134,6 +134,7 @@ typedef struct thread_sync
static int mtflush_work_initialized = -1;
static os_fast_mutex_t mtflush_mtx;
static os_fast_mutex_t mtflush_mtx_wait;
static thread_sync_t* mtflush_ctx=NULL;
/******************************************************************//**
......@@ -182,7 +183,9 @@ buf_mtflu_flush_pool_instance(
pools based on the assumption that it will
help in the retry which will follow the
failure. */
#ifdef UNIV_DEBUG
fprintf(stderr, "InnoDB: Note: buf flush start failed there is already active flush for this buffer pool.\n");
#endif
return 0;
}
......@@ -228,12 +231,16 @@ mtflush_service_io(
mtflush_io->wt_status = WTHR_SIG_WAITING;
/* TODO: Temporal fix for the hang bug. This needs a real fix. */
os_fast_mutex_lock(&mtflush_mtx_wait);
work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);
if (work_item == NULL) {
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, MT_WAIT_IN_USECS);
}
os_fast_mutex_unlock(&mtflush_mtx_wait);
if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING;
} else {
......@@ -242,6 +249,10 @@ mtflush_service_io(
return;
}
if (work_item->wi_status != WRK_ITEM_EXIT) {
work_item->wi_status = WRK_ITEM_SET;
}
work_item->id_usr = os_thread_get_curr_id();
/* This works as a producer/consumer model, where in tasks are
......@@ -258,7 +269,7 @@ mtflush_service_io(
work_item->wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap);
mtflush_io->wt_status = WTHR_KILL_IT;
return;
break;
case MT_WRK_WRITE:
ut_a(work_item->wi_status == WRK_ITEM_SET);
......@@ -278,9 +289,9 @@ mtflush_service_io(
default:
/* None other than Write/Read handling planned */
ut_a(0);
break;
}
mtflush_io->wt_status = WTHR_NO_WORK;
}
/******************************************************************//**
......@@ -302,13 +313,16 @@ DECLARE_THREAD(mtflush_io_thread)(
#endif
while (TRUE) {
#ifdef UNIV_DEBUG
fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(),
ib_wqueue_len(mtflush_io->wq),
ib_wqueue_len(mtflush_io->wr_cq));
#endif /* UNIV_DEBUG */
mtflush_service_io(mtflush_io);
#ifdef UNIV_DEBUG
if (mtflush_io->wt_status == WTHR_NO_WORK) {
n_timeout++;
......@@ -323,6 +337,7 @@ DECLARE_THREAD(mtflush_io_thread)(
} else {
n_timeout = 0;
}
#endif /* UNIV_DEBUG */
if (mtflush_io->wt_status == WTHR_KILL_IT) {
break;
......@@ -405,6 +420,7 @@ buf_mtflu_io_thread_exit(void)
ib_wqueue_free(mtflush_io->rd_cq);
os_fast_mutex_free(&mtflush_mtx);
os_fast_mutex_free(&mtflush_mtx_wait);
/* Free heap */
mem_heap_free(mtflush_io->wheap);
......@@ -426,6 +442,7 @@ buf_mtflu_handler_init(
ib_wqueue_t* mtflush_read_comp_queue;
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx_wait);
/* Create heap, work queue, write completion queue, read
completion queue for multi-threaded flush, and init
......@@ -491,16 +508,15 @@ buf_mtflu_flush_work_items(
node items areallocated */
work_heap = mem_heap_create(0);
work_item = (wrk_t*)mem_heap_alloc(work_heap, sizeof(wrk_t)*buf_pool_inst);
memset(work_item, 0, sizeof(wrk_t)*buf_pool_inst);
for(i=0;i<buf_pool_inst; i++) {
work_item[i].tsk = MT_WRK_WRITE;
work_item[i].rd.page_pool = NULL;
work_item[i].wr.buf_pool = buf_pool_from_array(i);
work_item[i].wr.flush_type = flush_type;
work_item[i].wr.min = min_n;
work_item[i].wr.lsn_limit = lsn_limit;
work_item[i].id_usr = -1;
work_item[i].wi_status = WRK_ITEM_SET;
work_item[i].wi_status = WRK_ITEM_UNSET;
work_item[i].wheap = work_heap;
ib_wqueue_add(mtflush_ctx->wq,
......@@ -516,14 +532,18 @@ buf_mtflu_flush_work_items(
if (done_wi != NULL) {
per_pool_pages_flushed[i] = done_wi->n_flushed;
if((int)done_wi->id_usr == -1 &&
done_wi->wi_status == WRK_ITEM_SET ) {
#ifdef UNIV_DEBUG
/* TODO: Temporal fix for hang. This is really a bug. */
if((int)done_wi->id_usr == 0 &&
(done_wi->wi_status == WRK_ITEM_SET ||
done_wi->wi_status == WRK_ITEM_UNSET)) {
fprintf(stderr,
"**Set/Unused work_item[%lu] flush_type=%d\n",
i,
done_wi->wr.flush_type);
ut_a(0);
}
#endif
n_flushed+= done_wi->n_flushed;
i++;
......
......@@ -17971,6 +17971,11 @@ static MYSQL_SYSVAR_LONG(mtflush_threads, srv_mtflush_threads,
MTFLUSH_MAX_WORKER, /* Max setting */
0);
static MYSQL_SYSVAR_BOOL(use_mtflush, srv_use_mtflush,
PLUGIN_VAR_OPCMDARG ,
"Use multi-threaded flush. Default TRUE.",
NULL, NULL, TRUE);
static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(log_block_size),
MYSQL_SYSVAR(additional_mem_pool_size),
......@@ -18168,6 +18173,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(use_lz4),
#endif
MYSQL_SYSVAR(mtflush_threads),
MYSQL_SYSVAR(use_mtflush),
NULL
};
......
......@@ -1042,6 +1042,8 @@ Release fil_system mutex */
void
fil_system_exit(void);
/*==================*/
#ifndef UNIV_INNOCHECKSUM
/*******************************************************************//**
Returns the table space by a given id, NULL if not found. */
fil_space_t*
......@@ -1054,5 +1056,5 @@ char*
fil_space_name(
/*===========*/
fil_space_t* space); /*!< in: space */
#endif
#endif /* fil0fil_h */
......@@ -277,8 +277,13 @@ extern my_bool srv_use_lz4;
/* Number of flush threads */
#define MTFLUSH_MAX_WORKER 64
#define MTFLUSH_DEFAULT_WORKER 8
/* Number of threads used for multi-threaded flush */
extern long srv_mtflush_threads;
/* If this flag is TRUE, then we will use multi threaded flush. */
extern my_bool srv_use_mtflush;
/** Server undo tablespaces directory, can be absolute path. */
extern char* srv_undo_dir;
......
......@@ -151,7 +151,7 @@ ib_list_is_empty(
const ib_list_t* list); /* in: list */
/********************************************************************
Get number of items on list.
Get number of items on list.
@return number of items on list */
UNIV_INLINE
ulint
......
......@@ -60,7 +60,7 @@ ib_list_is_empty(
}
/********************************************************************
Get number of items on list.
Get number of items on list.
@return number of items on list */
UNIV_INLINE
ulint
......
......@@ -105,7 +105,7 @@ ib_wqueue_nowait(
/********************************************************************
Get number of items on queue.
Get number of items on queue.
@return number of items on queue */
ulint
ib_wqueue_len(
......
......@@ -176,9 +176,11 @@ UNIV_INTERN my_bool srv_use_posix_fallocate = FALSE;
/* If this flag is TRUE, then we disable doublewrite buffer */
UNIV_INTERN my_bool srv_use_atomic_writes = FALSE;
/* If this flag IS TRUE, then we use lz4 to compress/decompress pages */
UNIV_INTERN my_bool srv_use_lz4 = FALSE;
UNIV_INTERN my_bool srv_use_lz4 = FALSE;
/* Number of threads used for multi-threaded flush */
UNIV_INTERN long srv_mtflush_threads = MTFLUSH_DEFAULT_WORKER;
/* If this flag is TRUE, then we will use multi threaded flush. */
UNIV_INTERN my_bool srv_use_mtflush = TRUE;
#ifdef __WIN__
/* Windows native condition variables. We use runtime loading / function
......
......@@ -2719,19 +2719,23 @@ innobase_start_or_create_for_mysql(void)
if (!srv_read_only_mode) {
/* Start multi-threaded flush threads */
mtflush_ctx = buf_mtflu_handler_init(srv_mtflush_threads,
srv_buf_pool_instances);
/* Set up the thread ids */
buf_mtflu_set_thread_ids(srv_mtflush_threads,
mtflush_ctx,
(thread_ids + 6 + SRV_MAX_N_PURGE_THREADS));
if (srv_use_mtflush) {
/* Start multi-threaded flush threads */
mtflush_ctx = buf_mtflu_handler_init(
srv_mtflush_threads,
srv_buf_pool_instances);
/* Set up the thread ids */
buf_mtflu_set_thread_ids(
srv_mtflush_threads,
mtflush_ctx,
(thread_ids + 6 + SRV_MAX_N_PURGE_THREADS));
#if UNIV_DEBUG
fprintf(stderr, "InnoDB: Note: %s:%d buf-pool-instances:%lu mtflush_threads %lu\n",
__FILE__, __LINE__, srv_buf_pool_instances, srv_mtflush_threads);
fprintf(stderr, "InnoDB: Note: %s:%d buf-pool-instances:%lu mtflush_threads %lu\n",
__FILE__, __LINE__, srv_buf_pool_instances, srv_mtflush_threads);
#endif
}
os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL);
}
......@@ -3004,9 +3008,12 @@ innobase_shutdown_for_mysql(void)
logs_empty_and_mark_files_at_shutdown() and should have
already quit or is quitting right now. */
/* g. Exit the multi threaded flush threads */
buf_mtflu_io_thread_exit();
if (srv_use_mtflush) {
/* g. Exit the multi threaded flush threads */
buf_mtflu_io_thread_exit();
}
#ifdef UNIV_DEBUG
fprintf(stderr, "InnoDB: Note: %s:%d os_thread_count:%lu \n", __FUNCTION__, __LINE__, os_thread_count);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment