Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
7f3950a2
Commit
7f3950a2
authored
Feb 06, 2014
by
Jan Lindström
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Moved mt-flush code to buf0mtflu.[cc|h] and cleaned it up. This is for
InnoDB.
parent
921d87d4
Changes
10
Show whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
729 additions
and
1587 deletions
+729
-1587
storage/innobase/CMakeLists.txt
storage/innobase/CMakeLists.txt
+1
-2
storage/innobase/buf/buf0flu.cc
storage/innobase/buf/buf0flu.cc
+6
-229
storage/innobase/buf/buf0mtflu.cc
storage/innobase/buf/buf0mtflu.cc
+555
-940
storage/innobase/include/buf0flu.h
storage/innobase/include/buf0flu.h
+49
-0
storage/innobase/include/buf0mtflu.h
storage/innobase/include/buf0mtflu.h
+95
-0
storage/innobase/include/srv0srv.h
storage/innobase/include/srv0srv.h
+1
-1
storage/innobase/include/srv0start.h
storage/innobase/include/srv0start.h
+2
-1
storage/innobase/srv/srv0srv.cc
storage/innobase/srv/srv0srv.cc
+3
-1
storage/innobase/srv/srv0start.cc
storage/innobase/srv/srv0start.cc
+14
-413
storage/xtradb/buf/buf0flu.cc
storage/xtradb/buf/buf0flu.cc
+3
-0
No files found.
storage/innobase/CMakeLists.txt
View file @
7f3950a2
...
...
@@ -278,8 +278,7 @@ SET(INNOBASE_SOURCES
buf/buf0flu.cc
buf/buf0lru.cc
buf/buf0rea.cc
# TODO: JAN uncomment
# buf/buf0mtflu.cc
buf/buf0mtflu.cc
data/data0data.cc
data/data0type.cc
dict/dict0boot.cc
...
...
storage/innobase/buf/buf0flu.cc
View file @
7f3950a2
...
...
@@ -32,6 +32,7 @@ Created 11/11/1995 Heikki Tuuri
#endif
#include "buf0buf.h"
#include "buf0mtflu.h"
#include "buf0checksum.h"
#include "srv0start.h"
#include "srv0srv.h"
...
...
@@ -1680,7 +1681,6 @@ pages: to avoid deadlocks, this function must be written so that it cannot
end up waiting for these latches! NOTE 2: in the case of a flush list flush,
the calling thread is not allowed to own any latches on pages!
@return number of blocks for which the write request was queued */
//static
ulint
buf_flush_batch
(
/*============*/
...
...
@@ -1737,7 +1737,6 @@ buf_flush_batch(
/******************************************************************//**
Gather the aggregated stats for both flush list and LRU list flushing */
//static
void
buf_flush_common
(
/*=============*/
...
...
@@ -1762,7 +1761,6 @@ buf_flush_common(
/******************************************************************//**
Start a buffer flush batch for LRU or flush list */
//static
ibool
buf_flush_start
(
/*============*/
...
...
@@ -1791,7 +1789,6 @@ buf_flush_start(
/******************************************************************//**
End a buffer flush batch for LRU or flush list */
//static
void
buf_flush_end
(
/*==========*/
...
...
@@ -1846,50 +1843,6 @@ buf_flush_wait_batch_end(
}
}
/* JAN: TODO: */
/*******************************************************************//**
This utility flushes dirty blocks from the end of the LRU list and also
puts replaceable clean pages from the end of the LRU list to the free
list.
NOTE: The calling thread is not allowed to own any latches on pages!
@return true if a batch was queued successfully. false if another batch
of same type was already running. */
static
bool
pgcomp_buf_flush_LRU
(
/*==========*/
buf_pool_t
*
buf_pool
,
/*!< in/out: buffer pool instance */
ulint
min_n
,
/*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
ulint
*
n_processed
)
/*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
{
ulint
page_count
;
if
(
n_processed
)
{
*
n_processed
=
0
;
}
if
(
!
buf_flush_start
(
buf_pool
,
BUF_FLUSH_LRU
))
{
return
(
false
);
}
page_count
=
buf_flush_batch
(
buf_pool
,
BUF_FLUSH_LRU
,
min_n
,
0
);
buf_flush_end
(
buf_pool
,
BUF_FLUSH_LRU
);
buf_flush_common
(
BUF_FLUSH_LRU
,
page_count
);
if
(
n_processed
)
{
*
n_processed
=
page_count
;
}
return
(
true
);
}
/* JAN: TODO: END: */
/*******************************************************************//**
This utility flushes dirty blocks from the end of the LRU list and also
puts replaceable clean pages from the end of the LRU list to the free
...
...
@@ -1932,125 +1885,6 @@ buf_flush_LRU(
return
(
true
);
}
/* JAN: TODO: */
/*******************************************************************//**/
extern
int
is_pgcomp_wrk_init_done
(
void
);
extern
int
pgcomp_flush_work_items
(
int
buf_pool_inst
,
int
*
pages_flushed
,
enum
buf_flush
flush_type
,
int
min_n
,
lsn_t
lsn_limit
);
#define MT_COMP_WATER_MARK 50
#ifdef UNIV_DEBUG
#include <time.h>
int
timediff
(
struct
timeval
*
g_time
,
struct
timeval
*
s_time
,
struct
timeval
*
d_time
)
{
if
(
g_time
->
tv_usec
<
s_time
->
tv_usec
)
{
int
nsec
=
(
s_time
->
tv_usec
-
g_time
->
tv_usec
)
/
1000000
+
1
;
s_time
->
tv_usec
-=
1000000
*
nsec
;
s_time
->
tv_sec
+=
nsec
;
}
if
(
g_time
->
tv_usec
-
s_time
->
tv_usec
>
1000000
)
{
int
nsec
=
(
s_time
->
tv_usec
-
g_time
->
tv_usec
)
/
1000000
;
s_time
->
tv_usec
+=
1000000
*
nsec
;
s_time
->
tv_sec
-=
nsec
;
}
d_time
->
tv_sec
=
g_time
->
tv_sec
-
s_time
->
tv_sec
;
d_time
->
tv_usec
=
g_time
->
tv_usec
-
s_time
->
tv_usec
;
return
0
;
}
#endif
static
os_fast_mutex_t
pgcomp_mtx
;
void
pgcomp_init
(
void
)
{
os_fast_mutex_init
(
PFS_NOT_INSTRUMENTED
,
&
pgcomp_mtx
);
}
void
pgcomp_deinit
(
void
)
{
os_fast_mutex_free
(
&
pgcomp_mtx
);
}
/*******************************************************************//**
Multi-threaded version of buf_flush_list
*/
UNIV_INTERN
bool
pgcomp_buf_flush_list
(
/*==================*/
ulint
min_n
,
/*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t
lsn_limit
,
/*!< in the case BUF_FLUSH_LIST all
blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
ulint
*
n_processed
)
/*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
{
ulint
i
;
bool
success
=
true
;
#ifdef UNIV_DEBUG
struct
timeval
p_start_time
,
p_end_time
,
d_time
;
#endif
int
cnt_flush
[
MTFLUSH_MAX_WORKER
];
if
(
n_processed
)
{
*
n_processed
=
0
;
}
if
(
min_n
!=
ULINT_MAX
)
{
/* Ensure that flushing is spread evenly amongst the
buffer pool instances. When min_n is ULINT_MAX
we need to flush everything up to the lsn limit
so no limit here. */
min_n
=
(
min_n
+
srv_buf_pool_instances
-
1
)
/
srv_buf_pool_instances
;
}
#ifdef UNIV_DEBUG
gettimeofday
(
&
p_start_time
,
0x0
);
#endif
os_fast_mutex_lock
(
&
pgcomp_mtx
);
pgcomp_flush_work_items
(
srv_buf_pool_instances
,
cnt_flush
,
BUF_FLUSH_LIST
,
min_n
,
lsn_limit
);
os_fast_mutex_unlock
(
&
pgcomp_mtx
);
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
if
(
n_processed
)
{
*
n_processed
+=
cnt_flush
[
i
];
}
if
(
cnt_flush
[
i
])
{
MONITOR_INC_VALUE_CUMULATIVE
(
MONITOR_FLUSH_BATCH_TOTAL_PAGE
,
MONITOR_FLUSH_BATCH_COUNT
,
MONITOR_FLUSH_BATCH_PAGES
,
cnt_flush
[
i
]);
}
}
#ifdef UNIV_DEBUG
gettimeofday
(
&
p_end_time
,
0x0
);
timediff
(
&
p_end_time
,
&
p_start_time
,
&
d_time
);
fprintf
(
stderr
,
"%s: [1] [*n_processed: (min:%lu)%lu %llu usec]
\n
"
,
__FUNCTION__
,
(
min_n
*
srv_buf_pool_instances
),
*
n_processed
,
(
unsigned
long
long
)(
d_time
.
tv_usec
+
(
d_time
.
tv_sec
*
1000000
)));
#endif
return
(
success
);
}
/* JAN: TODO: END: */
/*******************************************************************//**
This utility flushes dirty blocks from the end of the flush list of
all buffer pool instances.
...
...
@@ -2078,11 +1912,9 @@ buf_flush_list(
ulint
i
;
bool
success
=
true
;
/* JAN: TODO: */
if
(
is_pgcomp_wrk_init_done
())
{
return
(
pgcomp_buf_flush_list
(
min_n
,
lsn_limit
,
n_processed
));
if
(
buf_mtflu_init_done
())
{
return
(
buf_mtflu_flush_list
(
min_n
,
lsn_limit
,
n_processed
));
}
/* JAN: TODO: END: */
if
(
n_processed
)
{
*
n_processed
=
0
;
...
...
@@ -2237,60 +2069,6 @@ buf_flush_single_page_from_LRU(
return
(
freed
);
}
/* JAN: TODO: */
/*********************************************************************//**
pgcomp_Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
pgcomp_buf_flush_LRU_tail
(
void
)
/*====================*/
{
#ifdef UNIV_DEBUG
struct
timeval
p_start_time
,
p_end_time
,
d_time
;
#endif
ulint
total_flushed
=
0
,
i
=
0
;
int
cnt_flush
[
32
];
#ifdef UNIV_DEBUG
gettimeofday
(
&
p_start_time
,
0x0
);
#endif
ut_ad
(
is_pgcomp_wrk_init_done
());
os_fast_mutex_lock
(
&
pgcomp_mtx
);
pgcomp_flush_work_items
(
srv_buf_pool_instances
,
cnt_flush
,
BUF_FLUSH_LRU
,
srv_LRU_scan_depth
,
0
);
os_fast_mutex_unlock
(
&
pgcomp_mtx
);
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
if
(
cnt_flush
[
i
])
{
total_flushed
+=
cnt_flush
[
i
];
MONITOR_INC_VALUE_CUMULATIVE
(
MONITOR_LRU_BATCH_TOTAL_PAGE
,
MONITOR_LRU_BATCH_COUNT
,
MONITOR_LRU_BATCH_PAGES
,
cnt_flush
[
i
]);
}
}
#if UNIV_DEBUG
gettimeofday
(
&
p_end_time
,
0x0
);
timediff
(
&
p_end_time
,
&
p_start_time
,
&
d_time
);
fprintf
(
stderr
,
"[1] [*n_processed: (min:%lu)%lu %llu usec]
\n
"
,
(
srv_LRU_scan_depth
*
srv_buf_pool_instances
),
total_flushed
,
(
unsigned
long
long
)(
d_time
.
tv_usec
+
(
d_time
.
tv_sec
*
1000000
)));
#endif
return
(
total_flushed
);
}
/* JAN: TODO: END: */
/*********************************************************************//**
Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
...
...
@@ -2304,12 +2082,11 @@ buf_flush_LRU_tail(void)
/*====================*/
{
ulint
total_flushed
=
0
;
/* JAN: TODO: */
if
(
is_pgcomp_wrk
_init_done
())
if
(
buf_mtflu
_init_done
())
{
return
(
pgcomp_buf
_flush_LRU_tail
());
return
(
buf_mtflu
_flush_LRU_tail
());
}
/* JAN: TODO: END */
for
(
ulint
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
...
...
storage/innobase/buf/buf0mtflu.cc
View file @
7f3950a2
/*****************************************************************************
Copyright (C) 2013 Fusion-io. All Rights Reserved.
Copyright (C) 2013 SkySQL Ab. All Rights Reserved.
Copyright (C) 2013
, 2014,
Fusion-io. All Rights Reserved.
Copyright (C) 2013
, 2014,
SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
...
...
@@ -23,283 +23,180 @@ Multi-threaded flush method implementation
Created 06/11/2013 Dhananjoy Das DDas@fusionio.com
Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com
Modified 03/02/2014 Dhananjoy Das DDas@fusionio.com
Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
***********************************************************************/
#include <time.h>
#ifdef UNIV_PFS_MUTEX
/* Key to register fil_system_mutex with performance schema */
UNIV_INTERN
mysql_pfs_key_t
mtflush_mutex_key
;
#endif
/* UNIV_PFS_MUTEX */
/* Mutex to protect critical sections during multi-threaded flush */
ib_mutex_t
mt_flush_mutex
;
#include "buf0buf.h"
#include "buf0flu.h"
#include "buf0mtflu.h"
#include "buf0checksum.h"
#include "srv0start.h"
#include "srv0srv.h"
#include "page0zip.h"
#include "ut0byte.h"
#include "ut0lst.h"
#include "page0page.h"
#include "fil0fil.h"
#include "buf0lru.h"
#include "buf0rea.h"
#include "ibuf0ibuf.h"
#include "log0log.h"
#include "os0file.h"
#include "os0sync.h"
#include "trx0sys.h"
#include "srv0mon.h"
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
#include "fil0pagecompress.h"
#define MT_COMP_WATER_MARK 50
/* Work item status */
typedef
enum
{
WORK_ITEM_SET
=
0
,
/* Work item information set */
WORK_ITEM_START
=
1
,
/* Work item assigned to thread and
execution started */
WORK_ITEM_DONE
=
2
,
/* Work item execution done */
}
mtflu_witem_status_t
;
typedef
enum
wrk_status
{
WRK_ITEM_SET
=
0
,
/*!< Work item is set */
WRK_ITEM_START
=
1
,
/*!< Processing of work item has started */
WRK_ITEM_DONE
=
2
,
/*!< Processing is done usually set to
SUCCESS/FAILED */
WRK_ITEM_SUCCESS
=
2
,
/*!< Work item successfully processed */
WRK_ITEM_FAILED
=
3
,
/*!< Work item process failed */
WRK_ITEM_EXIT
=
4
,
/*!< Exiting */
WRK_ITEM_STATUS_UNDEFINED
}
wrk_status_t
;
/* Work item task type */
typedef
enum
mt_wrk_tsk
{
MT_WRK_NONE
=
0
,
/*!< Exit queue-wait */
MT_WRK_WRITE
=
1
,
/*!< Flush operation */
MT_WRK_READ
=
2
,
/*!< Read operation */
MT_WRK_UNDEFINED
}
mt_wrk_tsk_t
;
/* Work thread status */
typedef
enum
{
WORK_THREAD_NOT_INIT
=
0
,
/* Work thread not initialized */
WORK_THREAD_INITIALIZED
=
1
,
/* Work thread initialized */
WORK_THREAD_SIG_WAITING
=
2
,
/* Work thred signaled */
WORK_THREAD_RUNNING
=
3
,
/* Work thread running */
WORK_THREAD_NO_WORK
=
4
,
/* Work thread has no work to do */
}
mtflu_wthr_status_t
;
/* Structure containing multi-treaded flush thread information */
typedef
struct
{
os_thread_t
wthread_id
;
/* Thread id */
opq_t
*
wq
;
/* Write queue ? */
opq_t
*
cq
;
/* Commit queue ?*/
ib_mutex_t
thread_mutex
;
/* Mutex proecting below
structures */
mtflu_wthr_status_t
thread_status
;
/* Thread status */
ib_uint64_t
total_num_processed
;
/* Total number of
pages processed */
ib_uint64_t
cycle_num_processed
;
/* Numper of pages
processed on last
cycle */
ulint
check_wrk_done_count
;
/* Number of pages
to process in this
work item ? */
ulint
done_cnt_flag
;
/* Number of pages
processed in this
work item ?*/
}
mtflu_thread_t
;
struct
work_item_t
{
/****************************/
/* Need to group into struct*/
buf_pool_t
*
buf_pool
;
//buffer-pool instance
int
flush_type
;
//flush-type for buffer-pool flush operation
ulint
min
;
//minimum number of pages requested to be flushed
lsn_t
lsn_limit
;
//lsn limit for the buffer-pool flush operation
/****************************/
unsigned
long
result
;
//flush pages count
unsigned
long
t_usec
;
//time-taken in usec
os_thread_t
id_usr
;
/* thread-id
currently working , why ? */
mtflu_witem_status_t
wi_status
;
/* work item status */
UT_LIST_NODE_T
(
work_node_t
)
next
;
};
/* Multi-threaded flush system structure */
typedef
struct
{
int
pgc_n_threads
=
8
;
// ??? why what this is
mtflu_thread_t
pc_sync
[
PGCOMP_MAX_WORKER
];
wrk_t
work_items
[
PGCOMP_MAX_WORKER
];
int
pgcomp_wrk_initialized
=
-
1
;
/* ???? */
opq_t
wq
;
/* write queue ? */
opq_t
cq
;
/* commit queue ? */
}
mtflu_system_t
;
typedef
enum
op_q_status
{
Q_NOT_INIT
=
0
,
Q_EMPTY
=
1
,
Q_INITIALIZED
=
2
,
Q_PROCESS
=
3
,
Q_DONE
=
4
,
Q_ERROR
=
5
,
Q_STATUS_UNDEFINED
}
q_status_t
;
// NOTE: jan: could we use ut/ut0wqueue.(h|cc)
// NOTE: jan: here ????, it would handle waiting, signaling
// and contains simple interface
typedef
struct
op_queue
typedef
enum
wthr_status
{
WTHR_NOT_INIT
=
0
,
/*!< Work thread not initialized */
WTHR_INITIALIZED
=
1
,
/*!< Work thread initialized */
WTHR_SIG_WAITING
=
2
,
/*!< Work thread wating signal */
WTHR_RUNNING
=
3
,
/*!< Work thread running */
WTHR_NO_WORK
=
4
,
/*!< Work thread has no work */
WTHR_KILL_IT
=
5
,
/*!< Work thread should exit */
WTHR_STATUS_UNDEFINED
}
wthr_status_t
;
/* Write work task */
typedef
struct
wr_tsk
{
buf_pool_t
*
buf_pool
;
/*!< buffer-pool instance */
enum
buf_flush
flush_type
;
/*!< flush-type for buffer-pool
flush operation */
ulint
min
;
/*!< minimum number of pages
requested to be flushed */
lsn_t
lsn_limit
;
/*!< lsn limit for the buffer-pool
flush operation */
}
wr_tsk_t
;
/* Read work task */
typedef
struct
rd_tsk
{
buf_pool_t
*
page_pool
;
/*!< list of pages to decompress; */
}
rd_tsk_t
;
/* Work item */
typedef
struct
wrk_itm
{
ib_mutex_t
mtx
;
/* Mutex protecting below variables
*/
os_cond_t
cv
;
/* ? is waiting here ? */
q_status_t
flag
;
/* Operation queue status */
UT_LIST_BASE_NODE_T
(
work_item_t
)
work_list
;
}
opq_t
;
/*******************************************************************//**
Initialize multi-threaded flush.
*/
void
buf_mtflu_init
(
void
)
/*================*/
mt_wrk_tsk_t
tsk
;
/*!< Task type. Based on task-type
one of the entries wr_tsk/rd_tsk
will be used */
wr_tsk_t
wr
;
/*!< Flush page list */
rd_tsk_t
rd
;
/*!< Decompress page list */
ulint
n_flushed
;
/*!< Flushed pages count */
os_thread_t
id_usr
;
/*!< Thread-id currently working */
wrk_status_t
wi_status
;
/*!< Work item status */
struct
wrk_itm
*
next
;
/*!< Next work item */
}
wrk_t
;
/* Thread syncronization data */
typedef
struct
thread_sync
{
mutex_create
(
mtflush_mutex_key
,
&
mt_flush_mutex
,
SYNC_ANY_LATCH
);
}
os_thread_id_t
wthread_id
;
/*!< Identifier */
os_thread_t
wthread
;
/*!< Thread id */
ib_wqueue_t
*
wq
;
/*!< Work Queue */
ib_wqueue_t
*
wr_cq
;
/*!< Write Completion Queue */
ib_wqueue_t
*
rd_cq
;
/*!< Read Completion Queue */
wthr_status_t
wt_status
;
/*!< Worker thread status */
ulint
stat_universal_num_processed
;
/*!< Total number of pages
processed by this thread */
ulint
stat_cycle_num_processed
;
/*!< Number of pages processed
on this cycle */
mem_heap_t
*
wheap
;
/*!< Work heap where memory
is allocated */
wrk_t
*
work_item
;
/*!< Work items to be processed */
}
thread_sync_t
;
/* QUESTION: Is this array used from several threads concurrently ? */
// static wrk_t work_items[MTFLUSH_MAX_WORKER];
/* TODO: REALLY NEEDED ? */
static
int
mtflush_work_initialized
=
-
1
;
static
os_fast_mutex_t
mtflush_mtx
;
static
thread_sync_t
*
mtflush_ctx
=
NULL
;
/*******************************************************************//**
This utility flushes dirty blocks from the end of the LRU list and also
puts replaceable clean pages from the end of the LRU list to the free
list.
NOTE: The calling thread is not allowed to own any latches on pages!
@return true if a batch was queued successfully. false if another batch
of same type was already running. */
bool
buf_mtflu_flush_LRU
(
/*================*/
buf_pool_t
*
buf_pool
,
/*!< in/out: buffer pool instance */
ulint
min_n
,
/*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
ulint
*
n_processed
)
/*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
/******************************************************************//**
Initialize work items. */
static
void
mtflu_setup_work_items
(
/*===================*/
wrk_t
*
work_items
,
/*!< inout: Work items */
ulint
n_items
)
/*!< in: Number of work items */
{
ulint
page_count
;
if
(
n_processed
)
{
*
n_processed
=
0
;
}
if
(
!
buf_flush_start
(
buf_pool
,
BUF_FLUSH_LRU
))
{
return
(
false
);
}
page_count
=
buf_flush_batch
(
buf_pool
,
BUF_FLUSH_LRU
,
min_n
,
0
);
buf_flush_end
(
buf_pool
,
BUF_FLUSH_LRU
);
buf_flush_common
(
BUF_FLUSH_LRU
,
page_count
);
if
(
n_processed
)
{
*
n_processed
=
page_count
;
ulint
i
;
for
(
i
=
0
;
i
<
n_items
;
i
++
)
{
work_items
[
i
].
rd
.
page_pool
=
NULL
;
work_items
[
i
].
wr
.
buf_pool
=
NULL
;
work_items
[
i
].
n_flushed
=
0
;
work_items
[
i
].
id_usr
=
-
1
;
work_items
[
i
].
wi_status
=
WRK_ITEM_STATUS_UNDEFINED
;
work_items
[
i
].
next
=
&
work_items
[(
i
+
1
)
%
n_items
];
}
return
(
true
)
;
/* last node should be the tail */
work_items
[
n_items
-
1
].
next
=
NULL
;
}
#ifdef UNIV_DEBUG
/*******************************************************************//**
Utility function to calculate time difference between start time
and end time.
@return Time difference.
*/
UNIV_INTERN
/******************************************************************//**
Set multi-threaded flush work initialized. */
static
inline
void
mtflu_timediff
(
/*===========*/
struct
timeval
*
g_time
,
/*!< in/out: Start time*/
struct
timeval
*
s_time
,
/*!< in/out: End time */
struct
timeval
*
d_time
)
/*!< out: Time difference */
buf_mtflu_work_init
(
void
)
/*=====================*/
{
if
(
g_time
->
tv_usec
<
s_time
->
tv_usec
)
{
int
nsec
=
(
s_time
->
tv_usec
-
g_time
->
tv_usec
)
/
1000000
+
1
;
s_time
->
tv_usec
-=
1000000
*
nsec
;
s_time
->
tv_sec
+=
nsec
;
}
if
(
g_time
->
tv_usec
-
s_time
->
tv_usec
>
1000000
)
{
int
nsec
=
(
s_time
->
tv_usec
-
g_time
->
tv_usec
)
/
1000000
;
s_time
->
tv_usec
+=
1000000
*
nsec
;
s_time
->
tv_sec
-=
nsec
;
}
d_time
->
tv_sec
=
g_time
->
tv_sec
-
s_time
->
tv_sec
;
d_time
->
tv_usec
=
g_time
->
tv_usec
-
s_time
->
tv_usec
;
mtflush_work_initialized
=
1
;
}
#endif
/*******************************************************************//**
This utility flushes dirty blocks from the end of the flush list of
all buffer pool instances. This is multi-threaded version of buf_flush_list.
NOTE: The calling thread is not allowed to own any latches on pages!
@return true if a batch was queued successfully for each buffer pool
instance. false if another batch of same type was already running in
at least one of the buffer pool instance */
/******************************************************************//**
Return true if multi-threaded flush is initialized
@return true if initialized */
bool
buf_mtflu_flush_list
(
/*=================*/
ulint
min_n
,
/*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t
lsn_limit
,
/*!< in the case BUF_FLUSH_LIST all
blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
ulint
*
n_processed
)
/*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
buf_mtflu_init_done
(
void
)
/*=====================*/
{
ulint
i
;
bool
success
=
true
;
struct
timeval
p_start_time
,
p_end_time
,
d_time
;
if
(
n_processed
)
{
*
n_processed
=
0
;
}
if
(
min_n
!=
ULINT_MAX
)
{
/* Ensure that flushing is spread evenly amongst the
buffer pool instances. When min_n is ULINT_MAX
we need to flush everything up to the lsn limit
so no limit here. */
min_n
=
(
min_n
+
srv_buf_pool_instances
-
1
)
/
srv_buf_pool_instances
;
}
#ifdef UNIV_DEBUG
gettimeofday
(
&
p_start_time
,
0x0
);
#endif
if
(
is_pgcomp_wrk_init_done
()
&&
(
min_n
>
MT_COMP_WATER_MARK
))
{
int
cnt_flush
[
32
];
mutex_enter
(
&
mt_flush_mutex
);
#ifdef UNIV_DEBUG
fprintf
(
stderr
,
"Calling into wrk-pgcomp [min:%lu]"
,
min_n
);
#endif
pgcomp_flush_work_items
(
srv_buf_pool_instances
,
cnt_flush
,
BUF_FLUSH_LIST
,
min_n
,
lsn_limit
);
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
if
(
n_processed
)
{
*
n_processed
+=
cnt_flush
[
i
];
}
if
(
cnt_flush
[
i
])
{
MONITOR_INC_VALUE_CUMULATIVE
(
MONITOR_FLUSH_BATCH_TOTAL_PAGE
,
MONITOR_FLUSH_BATCH_COUNT
,
MONITOR_FLUSH_BATCH_PAGES
,
cnt_flush
[
i
]);
}
}
mutex_exit
(
&
pgcomp_mtx
);
#ifdef UNIV_DEBUG
gettimeofday
(
&
p_end_time
,
0x0
);
timediff
(
&
p_end_time
,
&
p_start_time
,
&
d_time
);
fprintf
(
stderr
,
"[1] [*n_processed: (min:%lu)%lu %llu usec]
\n
"
,
(
min_n
*
srv_buf_pool_instances
),
*
n_processed
,
(
unsigned
long
long
)(
d_time
.
tv_usec
+
(
d_time
.
tv_sec
*
1000000
)));
#endif
return
(
success
);
}
/* Flush to lsn_limit in all buffer pool instances */
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
buf_pool_t
*
buf_pool
;
ulint
page_count
=
0
;
return
(
mtflush_work_initialized
==
1
);
}
buf_pool
=
buf_pool_from_array
(
i
);
/******************************************************************//**
Fush buffer pool instance.
@return number of flushed pages, or 0 if error happened
*/
static
ulint
buf_mtflu_flush_pool_instance
(
/*==========================*/
wrk_t
*
work_item
)
/*!< inout: work item to be flushed */
{
ut_a
(
work_item
!=
NULL
);
ut_a
(
work_item
->
wr
.
buf_pool
!=
NULL
);
if
(
!
buf_flush_start
(
buf_pool
,
BUF_FLUSH_LIST
))
{
if
(
!
buf_flush_start
(
work_item
->
wr
.
buf_pool
,
work_item
->
wr
.
flush_type
))
{
/* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer
pool means we cannot guarantee that all pages
...
...
@@ -310,794 +207,512 @@ buf_mtflu_flush_list(
pools based on the assumption that it will
help in the retry which will follow the
failure. */
success
=
false
;
continue
;
}
page_count
=
buf_flush_batch
(
buf_pool
,
BUF_FLUSH_LIST
,
min_n
,
lsn_limit
);
buf_flush_end
(
buf_pool
,
BUF_FLUSH_LIST
);
buf_flush_common
(
BUF_FLUSH_LIST
,
page_count
);
if
(
n_processed
)
{
*
n_processed
+=
page_count
;
}
if
(
page_count
)
{
MONITOR_INC_VALUE_CUMULATIVE
(
MONITOR_FLUSH_BATCH_TOTAL_PAGE
,
MONITOR_FLUSH_BATCH_COUNT
,
MONITOR_FLUSH_BATCH_PAGES
,
page_count
);
}
}
#ifdef UNIV_DEBUG
gettimeofday
(
&
p_end_time
,
0x0
);
timediff
(
&
p_end_time
,
&
p_start_time
,
&
d_time
);
fprintf
(
stderr
,
"[2] [*n_processed: (min:%lu)%lu %llu usec]
\n
"
,
(
min_n
*
srv_buf_pool_instances
),
*
n_processed
,
(
unsigned
long
long
)(
d_time
.
tv_usec
+
(
d_time
.
tv_sec
*
1000000
)));
#endif
return
(
success
);
}
/*********************************************************************//**
Clear up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
ulint
buf_mtflu_flush_LRU_tail
(
void
)
/*==========================*/
{
ulint
total_flushed
=
0
,
i
=
0
;
int
cnt_flush
[
32
];
#ifdef UNIV_DEBUG
struct
timeval
p_start_time
,
p_end_time
,
d_time
;
gettimeofday
(
&
p_start_time
,
0x0
);
/* QUESTION: is this a really failure ? */
fprintf
(
stderr
,
"flush_start Failed, flush_type:%d
\n
"
,
work_item
->
wr
.
flush_type
);
#endif
assert
(
is_pgcomp_wrk_init_done
());
mutex_enter
(
&
pgcomp_mtx
);
pgcomp_flush_work_items
(
srv_buf_pool_instances
,
cnt_flush
,
BUF_FLUSH_LRU
,
srv_LRU_scan_depth
,
0
);
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
if
(
cnt_flush
[
i
])
{
total_flushed
+=
cnt_flush
[
i
];
MONITOR_INC_VALUE_CUMULATIVE
(
MONITOR_LRU_BATCH_TOTAL_PAGE
,
MONITOR_LRU_BATCH_COUNT
,
MONITOR_LRU_BATCH_PAGES
,
cnt_flush
[
i
]);
}
return
0
;
}
mutex_exit
(
&
pgcomp_mtx
);
#if UNIV_DEBUG
gettimeofday
(
&
p_end_time
,
0x0
);
timediff
(
&
p_end_time
,
&
p_start_time
,
&
d_time
);
if
(
work_item
->
wr
.
flush_type
==
BUF_FLUSH_LRU
)
{
/* srv_LRU_scan_depth can be arbitrarily large value.
* We cap it with current LRU size.
*/
buf_pool_mutex_enter
(
work_item
->
wr
.
buf_pool
);
work_item
->
wr
.
min
=
UT_LIST_GET_LEN
(
work_item
->
wr
.
buf_pool
->
LRU
);
buf_pool_mutex_exit
(
work_item
->
wr
.
buf_pool
);
work_item
->
wr
.
min
=
ut_min
(
srv_LRU_scan_depth
,
work_item
->
wr
.
min
);
}
fprintf
(
stderr
,
"[1] [*n_processed: (min:%lu)%lu %llu usec]
\n
"
,
(
srv_LRU_scan_depth
*
srv_buf_pool_instances
),
total_flushed
,
(
unsigned
long
long
)(
d_time
.
tv_usec
+
(
d_time
.
tv_sec
*
1000000
)));
#endif
work_item
->
n_flushed
=
buf_flush_batch
(
work_item
->
wr
.
buf_pool
,
work_item
->
wr
.
flush_type
,
work_item
->
wr
.
min
,
work_item
->
wr
.
lsn_limit
);
return
(
total_flushed
);
}
buf_flush_end
(
work_item
->
wr
.
buf_pool
,
work_item
->
wr
.
flush_type
);
buf_flush_common
(
work_item
->
wr
.
flush_type
,
work_item
->
n_flushed
);
/*******************************************************************//**
Set work done count to given count.
@return 1 if still work to do, 0 if no work left */
int
set_check_done_flag_count
(
int
cnt
)
/*================*/
{
return
(
check_wrk_done_count
=
cnt
);
}
/*******************************************************************//**
?
@return why ? */
int
set_pgcomp_wrk_init_done
(
void
)
/*================*/
{
pgcomp_wrk_initialized
=
1
;
return
0
;
}
/*******************************************************************//**
?
@return true if work is initialized */
bool
is_pgcomp_wrk_init_done
(
void
)
/*================*/
{
return
(
pgcomp_wrk_initialized
==
1
);
}
/*******************************************************************//**
Set current done pages count to the given value
@return number of pages flushed */
int
set_done_cnt_flag
(
int
val
)
/*================*/
#ifdef UNIV_DEBUG
/******************************************************************//**
Output work item list status,
*/
static
void
mtflu_print_work_list
(
/*==================*/
wrk_t
*
wi_list
)
/*!< in: Work item list */
{
/*
* Assumption: The thread calling into set_done_cnt_flag
* needs to have "cq.mtx" acquired, else not safe.
*/
done_cnt_flag
=
val
;
return
done_cnt_flag
;
}
wrk_t
*
wi
=
wi_list
;
ulint
i
=
0
;
/*******************************************************************//**
?
@return number of pages flushed */
int
cv_done_inc_flag_sig
(
thread_sync_t
*
ppc
)
/*================*/
{
mutex_enter
(
&
ppc
->
cq
->
mtx
);
ppc
->
stat_universal_num_processed
++
;
ppc
->
stat_cycle_num_processed
++
;
done_cnt_flag
++
;
if
(
!
(
done_cnt_flag
<=
check_wrk_done_count
))
{
fprintf
(
stderr
,
"ERROR: done_cnt:%d check_wrk_done_count:%d
\n
"
,
done_cnt_flag
,
check_wrk_done_count
);
if
(
!
wi_list
)
{
fprintf
(
stderr
,
"list NULL
\n
"
);
}
assert
(
done_cnt_flag
<=
check_wrk_done_count
);
mutex_exit
(
&
ppc
->
cq
->
mtx
);
if
(
done_cnt_flag
==
check_wrk_done_count
)
{
// why below does not need mutex protection ?
ppc
->
wq
->
flag
=
Q_DONE
;
mutex_enter
(
&
ppc
->
cq
->
mtx
);
ppc
->
cq
->
flag
=
Q_DONE
;
os_cond_signal
(
&
ppc
->
cq
->
cv
);
mutex_exit
(
&
ppc
->
cq
->
mtx
);
while
(
wi
)
{
fprintf
(
stderr
,
"-
\t
[%p]
\t
[%s]
\t
[%lu] > %p
\n
"
,
wi
,
(
wi
->
id_usr
==
-
1
)
?
"free"
:
"Busy"
,
wi
->
n_flushed
,
wi
->
next
);
wi
=
wi
->
next
;
i
++
;
}
return
(
done_cnt_flag
);
fprintf
(
stderr
,
"list len: %d
\n
"
,
i
);
}
#endif
/* UNIV_DEBUG */
/*******************************************************************//**
Remove work item from queue, in my opinion not needed after we use
UT_LIST
@return number of pages flushed */
int
q_remove_wrk
(
opq_t
*
q
,
wrk_t
**
wi
)
/*================*/
/******************************************************************//**
Worker function to wait for work items and processing them and
sending reply back.
*/
static
void
mtflush_service_io
(
/*===============*/
thread_sync_t
*
mtflush_io
)
/*!< inout: multi-threaded flush
syncronization data */
{
int
ret
=
0
;
wrk_t
*
work_item
=
NULL
;
ulint
n_flushed
=
0
;
ib_time_t
max_wait_usecs
=
5000000
;
if
(
!
wi
||
!
q
)
{
return
-
1
;
}
mtflush_io
->
wt_status
=
WTHR_SIG_WAITING
;
work_item
=
(
wrk_t
*
)
ib_wqueue_timedwait
(
mtflush_io
->
wq
,
max_wait_usecs
);
mutex_enter
(
&
q
->
mtx
);
assert
(
!
((
q
->
tail
==
NULL
)
&&
(
q
->
head
!=
NULL
)));
assert
(
!
((
q
->
tail
!=
NULL
)
&&
(
q
->
head
==
NULL
)));
/* get the first in the list*/
*
wi
=
q
->
head
;
if
(
q
->
head
)
{
ret
=
0
;
q
->
head
=
q
->
head
->
next
;
(
*
wi
)
->
next
=
NULL
;
if
(
!
q
->
head
)
{
q
->
tail
=
NULL
;
}
#ifdef UNIV_DEBUG
mtflu_print_work_list
(
mtflush_io
->
work_item
);
#endif
if
(
work_item
)
{
mtflush_io
->
wt_status
=
WTHR_RUNNING
;
}
else
{
q
->
tail
=
NULL
;
ret
=
1
;
/* indicating remove from queue failed */
/* Because of timeout this thread did not get any work */
mtflush_io
->
wt_status
=
WTHR_NO_WORK
;
return
;
}
mutex_exit
(
&
q
->
mtx
);
return
(
ret
);
}
/*******************************************************************//**
Return true if work item has being assigned to a thread or false
if work item is not assigned.
@return true if work is assigned, false if not */
bool
is_busy_wrk_itm
(
wrk_t
*
wi
)
/*================*/
{
if
(
!
wi
)
{
return
-
1
;
}
return
(
!
(
wi
->
id_usr
==
-
1
));
}
work_item
->
id_usr
=
mtflush_io
->
wthread
;
/*******************************************************************//**
Initialize work items.
@return why ? */
int
setup_wrk_itm
(
int
items
)
/*================*/
{
int
i
;
for
(
i
=
0
;
i
<
items
;
i
++
)
{
work_items
[
i
].
buf_pool
=
NULL
;
work_items
[
i
].
result
=
0
;
work_items
[
i
].
t_usec
=
0
;
work_items
[
i
].
id_usr
=
-
1
;
work_items
[
i
].
wi_status
=
WRK_ITEM_STATUS_UNDEFINED
;
work_items
[
i
].
next
=
&
work_items
[(
i
+
1
)
%
items
];
}
/* last node should be the tail */
work_items
[
items
-
1
].
next
=
NULL
;
return
0
;
}
switch
(
work_item
->
tsk
)
{
case
MT_WRK_NONE
:
ut_a
(
work_item
->
wi_status
==
WRK_ITEM_EXIT
);
work_item
->
wi_status
=
WRK_ITEM_SUCCESS
;
/* QUESTION: Why completed work items are inserted to
completion queue ? */
ib_wqueue_add
(
mtflush_io
->
wr_cq
,
work_item
,
mtflush_io
->
wheap
);
break
;
/*******************************************************************//**
Initialize queue
@return why ? */
int
init_queue
(
opq_t
*
q
)
/*================*/
{
if
(
!
q
)
{
return
-
1
;
case
MT_WRK_WRITE
:
work_item
->
wi_status
=
WRK_ITEM_START
;
/* Process work item */
/* QUESTION: Is this a really a error ? */
if
(
0
!=
(
n_flushed
=
buf_mtflu_flush_pool_instance
(
work_item
)))
{
fprintf
(
stderr
,
"FLUSH op failed ret:%lu
\n
"
,
n_flushed
);
work_item
->
wi_status
=
WRK_ITEM_FAILED
;
}
/* Initialize Queue mutex and CV */
q
->
mtx
=
os_mutex_create
();
os_cond_init
(
&
q
->
cv
);
q
->
flag
=
Q_INITIALIZED
;
q
->
head
=
q
->
tail
=
NULL
;
return
0
;
}
work_item
->
wi_status
=
WRK_ITEM_SUCCESS
;
ib_wqueue_add
(
mtflush_io
->
wr_cq
,
work_item
,
mtflush_io
->
wheap
);
break
;
/// NEEDED ?
#if 0
int drain_cq(opq_t *cq, int items)
{
int i=0;
case
MT_WRK_READ
:
/* Need to also handle the read case */
/* TODO: ? */
ut_a
(
0
);
/* completed task get added to rd_cq */
/* work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->rd_cq, work_item, mtflush_io->wheap);*/
break
;
if(!cq) {
return -1;
}
mutex_enter(&cq->mtx);
for(i=0; i<items; i++) {
work_items[i].result=0;
work_items[i].t_usec = 0;
work_items[i].id_usr = -1;
default:
/* None other than Write/Read handling planned */
ut_a
(
0
);
}
cq->head = cq->tail = NULL;
mutex_unlock(&cq->mtx);
return 0;
mtflush_io
->
wt_status
=
WTHR_NO_WORK
;
}
#endif
/*******************************************************************//**
Insert work item list to queue, not needed with UT_LIST
@return why ? */
int
q_insert_wrk_list
(
opq_t
*
q
,
wrk_t
*
w_list
)
/*================*/
/******************************************************************//**
Thead used to flush dirty pages when multi-threaded flush is
used.
@return a dummy parameter*/
extern
"C"
UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD
(
mtflush_io_thread
)(
/*==============================*/
void
*
arg
)
{
if
((
!
q
)
||
(
!
w_list
))
{
fprintf
(
stderr
,
"insert failed q:%p w:%p
\n
"
,
q
,
w_list
);
return
-
1
;
}
thread_sync_t
*
mtflush_io
=
((
thread_sync_t
*
)
arg
);
mutex_enter
(
&
q
->
mtx
);
assert
(
!
((
q
->
tail
==
NULL
)
&&
(
q
->
head
!=
NULL
)));
assert
(
!
((
q
->
tail
!=
NULL
)
&&
(
q
->
head
==
NULL
)));
/* list is empty */
if
(
!
q
->
tail
)
{
q
->
head
=
q
->
tail
=
w_list
;
}
else
{
/* added the first of the node to list */
assert
(
q
->
head
!=
NULL
);
q
->
tail
->
next
=
w_list
;
while
(
srv_shutdown_state
!=
SRV_SHUTDOWN_EXIT_THREADS
)
{
mtflush_service_io
(
mtflush_io
);
mtflush_io
->
stat_cycle_num_processed
=
0
;
}
/* move tail to the last node */
while
(
q
->
tail
->
next
)
{
q
->
tail
=
q
->
tail
->
next
;
/* This should make sure that all current work items are
processed before threads exit. */
while
(
!
ib_wqueue_is_empty
(
mtflush_io
->
wq
))
{
mtflush_service_io
(
mtflush_io
);
}
mutex_exit
(
&
q
->
mtx
);
return
0
;
os_thread_exit
(
NULL
);
OS_THREAD_DUMMY_RETURN
;
}
/*******************************************************************//**
Flush ?
@return why ? */
int
flush_pool_instance
(
wrk_t
*
wi
)
/*================*/
/******************************************************************//**
Add exit work item to work queue to signal multi-threded flush
threads that they should exit.
*/
void
buf_mtflu_io_thread_exit
(
void
)
/*==========================*/
{
struct
timeval
p_start_time
,
p_end_time
,
d_time
;
ulint
i
;
thread_sync_t
*
mtflush_io
=
mtflush_ctx
;
if
(
!
wi
)
{
fprintf
(
stderr
,
"work item invalid wi:%p
\n
"
,
wi
);
return
-
1
;
}
ut_a
(
mtflush_io
!=
NULL
);
wi
->
t_usec
=
0
;
if
(
!
buf_flush_start
(
wi
->
buf_pool
,
(
buf_flush_t
)
wi
->
flush_type
))
{
/* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer
pool means we cannot guarantee that all pages
up to lsn_limit has been flushed. We can
return right now with failure or we can try
to flush remaining buffer pools up to the
lsn_limit. We attempt to flush other buffer
pools based on the assumption that it will
help in the retry which will follow the
failure. */
fprintf
(
stderr
,
"flush_start Failed, flush_type:%d
\n
"
,
(
buf_flush_t
)
wi
->
flush_type
);
return
-
1
;
}
fprintf
(
stderr
,
"signal page_comp_io_threads to exit [%lu]
\n
"
,
srv_buf_pool_instances
);
#ifdef UNIV_DEBUG
/* Record time taken for the OP in usec */
gettimeofday
(
&
p_start_time
,
0x0
);
#endif
/* Send one exit work item/thread */
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
mtflush_io
->
work_item
[
i
].
wr
.
buf_pool
=
NULL
;
mtflush_io
->
work_item
[
i
].
rd
.
page_pool
=
NULL
;
mtflush_io
->
work_item
[
i
].
tsk
=
MT_WRK_NONE
;
mtflush_io
->
work_item
[
i
].
wi_status
=
WRK_ITEM_EXIT
;
if
((
buf_flush_t
)
wi
->
flush_type
==
BUF_FLUSH_LRU
)
{
/* srv_LRU_scan_depth can be arbitrarily large value.
* We cap it with current LRU size.
*/
buf_pool_mutex_enter
(
wi
->
buf_pool
);
wi
->
min
=
UT_LIST_GET_LEN
(
wi
->
buf_pool
->
LRU
);
buf_pool_mutex_exit
(
wi
->
buf_pool
);
wi
->
min
=
ut_min
(
srv_LRU_scan_depth
,
wi
->
min
);
ib_wqueue_add
(
mtflush_io
->
wq
,
(
void
*
)
&
(
mtflush_io
->
work_item
[
i
]),
mtflush_io
->
wheap
);
}
wi
->
result
=
buf_flush_batch
(
wi
->
buf_pool
,
(
buf_flush_t
)
wi
->
flush_type
,
wi
->
min
,
wi
->
lsn_limit
);
/* Wait until all work items on a work queue are processed */
while
(
!
ib_wqueue_is_empty
(
mtflush_io
->
wq
))
{
/* Wait about 1/2 sec */
os_thread_sleep
(
50000
);
}
buf_flush_end
(
wi
->
buf_pool
,
(
buf_flush_t
)
wi
->
flush_type
);
buf_flush_common
((
buf_flush_t
)
wi
->
flush_type
,
wi
->
result
);
ut_a
(
ib_wqueue_is_empty
(
mtflush_io
->
wq
));
#ifdef UNIV_DEBUG
gettimeofday
(
&
p_end_time
,
0x0
);
timediff
(
&
p_end_time
,
&
p_start_time
,
&
d_time
)
;
/* Collect all work done items */
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;)
{
wrk_t
*
work_item
;
wi
->
t_usec
=
(
unsigned
long
)(
d_time
.
tv_usec
+
(
d_time
.
tv_sec
*
1000000
));
#endif
return
0
;
}
work_item
=
(
wrk_t
*
)
ib_wqueue_timedwait
(
mtflush_io
->
wr_cq
,
50000
);
/*******************************************************************//**
?
@return why ? */
int
service_page_comp_io
(
thread_sync_t
*
ppc
)
/*================*/
{
wrk_t
*
wi
=
NULL
;
int
ret
=
0
;
struct
timespec
ts
;
mutex_enter
(
&
ppc
->
wq
->
mtx
);
do
{
ppc
->
wt_status
=
WTHR_SIG_WAITING
;
ret
=
os_cond_wait
(
&
ppc
->
wq
->
cv
,
&
ppc
->
wq
->
mtx
);
ppc
->
wt_status
=
WTHR_RUNNING
;
if
(
ret
==
ETIMEDOUT
)
{
fprintf
(
stderr
,
"ERROR ETIMEDOUT cnt_flag:[%d] ret:%d
\n
"
,
done_cnt_flag
,
ret
);
}
else
if
(
ret
==
EINVAL
||
ret
==
EPERM
)
{
fprintf
(
stderr
,
"ERROR EINVAL/EPERM cnt_flag:[%d] ret:%d
\n
"
,
done_cnt_flag
,
ret
);
}
if
(
ppc
->
wq
->
flag
==
Q_PROCESS
)
{
break
;
}
else
{
mutex_exit
(
&
ppc
->
wq
->
mtx
);
return
-
1
;
if
(
work_item
)
{
i
++
;
}
}
while
(
ppc
->
wq
->
flag
==
Q_PROCESS
&&
ret
==
0
);
mutex_exit
(
&
ppc
->
wq
->
mtx
);
while
(
ppc
->
cq
->
flag
==
Q_PROCESS
)
{
wi
=
NULL
;
/* Get the work item */
if
(
0
!=
(
ret
=
q_remove_wrk
(
ppc
->
wq
,
&
wi
)))
{
ppc
->
wt_status
=
WTHR_NO_WORK
;
return
-
1
;
}
assert
(
ret
==
0
);
assert
(
wi
!=
NULL
);
assert
(
0
==
is_busy_wrk_itm
(
wi
));
assert
(
wi
->
id_usr
==
-
1
);
ut_a
(
ib_wqueue_is_empty
(
mtflush_io
->
wr_cq
));
ut_a
(
ib_wqueue_is_empty
(
mtflush_io
->
rd_cq
));
wi
->
id_usr
=
ppc
->
wthread
;
wi
->
wi_status
=
WRK_ITEM_START
;
/* Free all queues */
ib_wqueue_free
(
mtflush_io
->
wq
);
ib_wqueue_free
(
mtflush_io
->
wr_cq
);
ib_wqueue_free
(
mtflush_io
->
rd_cq
);
/* Process work item */
if
(
0
!=
(
ret
=
flush_pool_instance
(
wi
)))
{
fprintf
(
stderr
,
"FLUSH op failed ret:%d
\n
"
,
ret
);
wi
->
wi_status
=
WRK_ITEM_FAILED
;
}
ret
=
q_insert_wrk_list
(
ppc
->
cq
,
wi
);
/* Free heap */
mem_heap_free
(
mtflush_io
->
wheap
);
assert
(
0
==
ret
);
assert
(
check_wrk_done_count
>=
done_cnt_flag
);
wi
->
wi_status
=
WRK_ITEM_SUCCESS
;
if
(
check_wrk_done_count
==
cv_done_inc_flag_sig
(
ppc
))
{
break
;
}
}
return
(
0
);
os_fast_mutex_free
(
&
mtflush_mtx
);
}
/******************************************************************//**
Thread main function for multi-threaded flush
@return
a dummy parameter
*/
extern
"C"
UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD
(
page_comp_io_thread
)(
/*==========================================
*/
void
*
arg
)
Initialize multi-threaded flush thread syncronization data.
@return
Initialized multi-threaded flush thread syncroniztion data.
*/
void
*
buf_mtflu_handler_init
(
/*===================*/
ulint
n_threads
,
/*!< in: Number of threads to create
*/
ulint
wrk_cnt
)
/*!< in: Number of work items */
{
thread_sync_t
*
ppc_io
=
((
thread_sync_t
*
)
arg
);
ulint
i
;
mem_heap_t
*
mtflush_heap
;
ib_wqueue_t
*
mtflush_work_queue
;
ib_wqueue_t
*
mtflush_write_comp_queue
;
ib_wqueue_t
*
mtflush_read_comp_queue
;
wrk_t
*
work_items
;
os_fast_mutex_init
(
PFS_NOT_INSTRUMENTED
,
&
mtflush_mtx
);
/* Create heap, work queue, write completion queue, read
completion queue for multi-threaded flush, and init
handler. */
mtflush_heap
=
mem_heap_create
(
0
);
ut_a
(
mtflush_heap
!=
NULL
);
mtflush_work_queue
=
ib_wqueue_create
();
ut_a
(
mtflush_work_queue
!=
NULL
);
mtflush_write_comp_queue
=
ib_wqueue_create
();
ut_a
(
mtflush_write_comp_queue
!=
NULL
);
mtflush_read_comp_queue
=
ib_wqueue_create
();
ut_a
(
mtflush_read_comp_queue
!=
NULL
);
mtflush_ctx
=
(
thread_sync_t
*
)
mem_heap_alloc
(
mtflush_heap
,
MTFLUSH_MAX_WORKER
*
sizeof
(
thread_sync_t
));
ut_a
(
mtflush_ctx
!=
NULL
);
work_items
=
(
wrk_t
*
)
mem_heap_alloc
(
mtflush_heap
,
MTFLUSH_MAX_WORKER
*
sizeof
(
wrk_t
));
ut_a
(
work_items
!=
NULL
);
/* Initialize work items */
mtflu_setup_work_items
(
work_items
,
MTFLUSH_MAX_WORKER
);
while
(
srv_shutdown_state
!=
SRV_SHUTDOWN_EXIT_THREADS
)
{
service_page_comp_io
(
ppc_io
);
ppc_io
->
stat_cycle_num_processed
=
0
;
}
os_thread_exit
(
NULL
);
OS_THREAD_DUMMY_RETURN
;
}
/* Create threads for page-compression-flush */
for
(
i
=
0
;
i
<
n_threads
;
i
++
)
{
os_thread_id_t
new_thread_id
;
mtflush_ctx
[
i
].
wq
=
mtflush_work_queue
;
mtflush_ctx
[
i
].
wr_cq
=
mtflush_write_comp_queue
;
mtflush_ctx
[
i
].
rd_cq
=
mtflush_read_comp_queue
;
mtflush_ctx
[
i
].
wheap
=
mtflush_heap
;
mtflush_ctx
[
i
].
wt_status
=
WTHR_INITIALIZED
;
mtflush_ctx
[
i
].
work_item
=
work_items
;
/*******************************************************************//**
Print queue work item
@return why ? */
int
print_queue_wrk_itm
(
opq_t
*
q
)
/*================*/
{
#if UNIV_DEBUG
wrk_t
*
wi
=
NULL
;
mtflush_ctx
[
i
].
wthread
=
os_thread_create
(
mtflush_io_thread
,
((
void
*
)(
mtflush_ctx
+
i
)),
&
new_thread_id
);
if
(
!
q
)
{
fprintf
(
stderr
,
"queue NULL
\n
"
);
return
-
1
;
mtflush_ctx
[
i
].
wthread_id
=
new_thread_id
;
}
if
(
!
q
->
head
||
!
q
->
tail
)
{
assert
(
!
(((
q
->
tail
==
NULL
)
&&
(
q
->
head
!=
NULL
))
&&
((
q
->
tail
!=
NULL
)
&&
(
q
->
head
==
NULL
))));
fprintf
(
stderr
,
"queue empty (h:%p t:%p)
\n
"
,
q
->
head
,
q
->
tail
);
return
0
;
}
buf_mtflu_work_init
();
mutex_enter
(
&
q
->
mtx
);
for
(
wi
=
q
->
head
;
(
wi
!=
NULL
)
;
wi
=
wi
->
next
)
{
//fprintf(stderr, "- [%p] %p %lu %luus [%ld] >%p\n",
// wi, wi->buf_pool, wi->result, wi->t_usec, wi->id_usr, wi->next);
fprintf
(
stderr
,
"- [%p] [%s] >%p
\n
"
,
wi
,
(
wi
->
id_usr
==
-
1
)
?
"free"
:
"Busy"
,
wi
->
next
);
}
mutex_exit
(
&
q
->
mtx
);
#endif
return
(
0
);
return
((
void
*
)
mtflush_ctx
);
}
/*******************************************************************//**
Print work list
@return why ? */
int
print_wrk_list
(
wrk_t
*
wi_list
)
/*================*/
/******************************************************************//**
Flush buffer pool instances.
@return number of pages flushed. */
ulint
buf_mtflu_flush_work_items
(
/*=======================*/
ulint
buf_pool_inst
,
/*!< in: Number of buffer pool instances */
ulint
*
per_pool_pages_flushed
,
/*!< out: Number of pages
flushed/instance */
enum
buf_flush
flush_type
,
/*!< in: Type of flush */
ulint
min_n
,
/*!< in: Wished minimum number of
blocks to be flushed */
lsn_t
lsn_limit
)
/*!< in: All blocks whose
oldest_modification is smaller than
this should be flushed (if their
number does not exceed min_n) */
{
wrk_t
*
wi
=
wi_list
;
int
i
=
0
;
if
(
!
wi_list
)
{
fprintf
(
stderr
,
"list NULL
\n
"
);
}
ulint
n_flushed
=
0
,
i
;
wrk_t
*
done_wi
;
while
(
wi
)
{
fprintf
(
stderr
,
"-
\t
[%p]
\t
[%s]
\t
[%lu]
\t
[%luus] > %p
\n
"
,
wi
,
(
wi
->
id_usr
==
-
1
)
?
"free"
:
"Busy"
,
wi
->
result
,
wi
->
t_usec
,
wi
->
next
);
wi
=
wi
->
next
;
i
++
;
for
(
i
=
0
;
i
<
buf_pool_inst
;
i
++
)
{
mtflush_ctx
->
work_item
[
i
].
tsk
=
MT_WRK_WRITE
;
mtflush_ctx
->
work_item
[
i
].
rd
.
page_pool
=
NULL
;
mtflush_ctx
->
work_item
[
i
].
wr
.
buf_pool
=
buf_pool_from_array
(
i
);
mtflush_ctx
->
work_item
[
i
].
wr
.
flush_type
=
flush_type
;
mtflush_ctx
->
work_item
[
i
].
wr
.
min
=
min_n
;
mtflush_ctx
->
work_item
[
i
].
wr
.
lsn_limit
=
lsn_limit
;
mtflush_ctx
->
work_item
[
i
].
id_usr
=
-
1
;
mtflush_ctx
->
work_item
[
i
].
wi_status
=
WRK_ITEM_SET
;
ib_wqueue_add
(
mtflush_ctx
->
wq
,
(
void
*
)(
&
(
mtflush_ctx
->
work_item
[
i
])),
mtflush_ctx
->
wheap
);
}
/* wait on the completion to arrive */
for
(
i
=
0
;
i
<
buf_pool_inst
;)
{
done_wi
=
(
wrk_t
*
)
ib_wqueue_timedwait
(
mtflush_ctx
->
wr_cq
,
50000
);
if
(
done_wi
!=
NULL
)
{
if
(
done_wi
->
n_flushed
==
0
)
{
per_pool_pages_flushed
[
i
]
=
0
;
}
else
{
per_pool_pages_flushed
[
i
]
=
done_wi
->
n_flushed
;
}
fprintf
(
stderr
,
"list len: %d
\n
"
,
i
);
return
0
;
}
/*******************************************************************//**
?
@return why ? */
int
pgcomp_handler
(
wrk_t
*
w_list
)
/*================*/
{
struct
timespec
ts
;
int
ret
=
0
,
t_flag
=
0
;
opq_t
*
wrk_q
=
NULL
,
*
comp_q
=
NULL
;
wrk_t
*
tw_list
=
NULL
;
wrk_q
=&
wq
;
comp_q
=&
cq
;
mutex_enter
(
&
wrk_q
->
mtx
);
/* setup work queue here.. */
wrk_q
->
flag
=
Q_EMPTY
;
mutex_exit
(
&
wrk_q
->
mtx
);
ret
=
q_insert_wrk_list
(
wrk_q
,
w_list
);
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"%s():work-queue setup FAILED wq:%p w_list:%p
\n
"
,
__FUNCTION__
,
&
wq
,
w_list
);
return
-
1
;
if
(
done_wi
->
id_usr
==
-
1
&&
done_wi
->
wi_status
==
WRK_ITEM_SET
)
{
fprintf
(
stderr
,
"**Set/Unused work_item[%d] flush_type=%lu
\n
"
,
i
,
done_wi
->
wr
.
flush_type
);
ut_a
(
0
);
}
retry_submit:
mutex_enter
(
&
wrk_q
->
mtx
);
/* setup work queue here.. */
wrk_q
->
flag
=
Q_INITIALIZED
;
mutex_exit
(
&
wrk_q
->
mtx
);
mutex_enter
(
&
comp_q
->
mtx
);
if
(
0
!=
set_done_cnt_flag
(
0
))
{
fprintf
(
stderr
,
"FAILED %s:%d
\n
"
,
__FILE__
,
__LINE__
);
mutex_exit
(
&
comp_q
->
mtx
);
return
-
1
;
}
comp_q
->
flag
=
Q_PROCESS
;
mutex_enter
(
&
comp_q
->
mtx
);
/* if threads are waiting request them to start */
mutex_enter
(
&
wrk_q
->
mtx
);
wrk_q
->
flag
=
Q_PROCESS
;
os_cond_broadcast
(
&
wrk_q
->
cv
);
mutex_exit
(
&
wrk_q
->
mtx
);
/* Wait on all worker-threads to complete */
mutex_enter
(
&
comp_q
->
mtx
);
if
(
comp_q
->
flag
!=
Q_DONE
)
{
do
{
os_cond_wait
(
&
comp_q
->
cv
,
&
comp_q
->
mtx
);
if
(
comp_q
->
flag
!=
Q_DONE
)
{
fprintf
(
stderr
,
"[1] cv wait on CQ failed flag:%d cnt:%d
\n
"
,
comp_q
->
flag
,
done_cnt_flag
);
if
(
done_cnt_flag
!=
srv_buf_pool_instances
)
{
fprintf
(
stderr
,
"[2] cv wait on CQ failed flag:%d cnt:%d
\n
"
,
comp_q
->
flag
,
done_cnt_flag
);
fprintf
(
stderr
,
"============
\n
"
);
print_wrk_list
(
w_list
);
fprintf
(
stderr
,
"============
\n
"
);
}
continue
;
}
else
if
(
done_cnt_flag
!=
srv_buf_pool_instances
)
{
fprintf
(
stderr
,
"[3]cv wait on CQ failed flag:%d cnt:%d
\n
"
,
comp_q
->
flag
,
done_cnt_flag
);
fprintf
(
stderr
,
"============
\n
"
);
print_wrk_list
(
w_list
);
fprintf
(
stderr
,
"============
\n
"
);
comp_q
->
flag
=
Q_INITIALIZED
;
mutex_exit
(
&
comp_q
->
mtx
);
goto
retry_submit
;
ut_ad
(
!
done_cnt_flag
);
continue
;
}
ut_ad
(
done_cnt_flag
==
srv_buf_pool_instances
);
n_flushed
+=
done_wi
->
n_flushed
;
/* Reset for next round*/
mtflush_ctx
->
work_item
[
i
].
id_usr
=
-
1
;
if
((
comp_q
->
flag
==
Q_DONE
)
&&
(
done_cnt_flag
==
srv_buf_pool_instances
))
{
break
;
}
}
while
((
comp_q
->
flag
==
Q_INITIALIZED
)
&&
(
done_cnt_flag
!=
srv_buf_pool_instances
));
}
else
{
fprintf
(
stderr
,
"[4] cv wait on CQ failed flag:%d cnt:%d
\n
"
,
comp_q
->
flag
,
done_cnt_flag
);
if
(
!
done_cnt_flag
)
{
fprintf
(
stderr
,
"============
\n
"
);
print_wrk_list
(
w_list
);
fprintf
(
stderr
,
"============
\n
"
);
comp_q
->
flag
=
Q_INITIALIZED
;
mutex_enter
(
&
comp_q
->
mtx
);
goto
retry_submit
;
ut_ad
(
!
done_cnt_flag
);
i
++
;
}
ut_ad
(
done_cnt_flag
==
srv_buf_pool_instances
);
}
mutex_exit
(
&
comp_q
->
mtx
);
mutex_enter
(
&
wrk_q
->
mtx
);
wrk_q
->
flag
=
Q_DONE
;
mutex_exit
(
&
wrk_q
->
mtx
);
return
0
;
return
(
n_flushed
);
}
/******************************************************************//**
@return a dummy parameter*/
int
pgcomp_handler_init
(
int
num_threads
,
int
wrk_cnt
,
opq_t
*
wq
,
opq_t
*
cq
)
/*******************************************************************//**
Flushes dirty blocks from the end of the LRU list and also
puts replaceable clean pages from the end of the LRU list to the free
list.
NOTE: The calling thread is not allowed to own any latches on pages!
@return true if a batch was queued successfully. false if another batch
of same type was already running. */
bool
buf_mtflu_flush_LRU
(
/*================*/
buf_pool_t
*
buf_pool
,
/*!< in/out: buffer pool instance */
ulint
min_n
,
/*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
ulint
*
n_processed
)
/*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
{
int
i
=
0
;
ulint
page_count
;
if
(
is_pgcomp_wrk_init_done
())
{
fprintf
(
stderr
,
"pgcomp_handler_init(): ERROR already initialized
\n
"
);
return
-
1
;
if
(
n_processed
)
{
*
n_processed
=
0
;
}
if
(
!
wq
||
!
cq
)
{
fprintf
(
stderr
,
"%s() FAILED wq:%p cq:%p
\n
"
,
__FUNCTION__
,
wq
,
cq
);
return
-
1
;
if
(
!
buf_flush_start
(
buf_pool
,
BUF_FLUSH_LRU
))
{
return
(
false
);
}
/* work-item setup */
setup_wrk_itm
(
wrk_cnt
);
page_count
=
buf_flush_batch
(
buf_pool
,
BUF_FLUSH_LRU
,
min_n
,
0
);
/* wq & cq setup */
init_queue
(
wq
);
init_queue
(
cq
);
buf_flush_end
(
buf_pool
,
BUF_FLUSH_LRU
);
/* Mark each of the thread sync entires */
for
(
i
=
0
;
i
<
PGCOMP_MAX_WORKER
;
i
++
)
{
pc_sync
[
i
].
wthread_id
=
i
;
}
buf_flush_common
(
BUF_FLUSH_LRU
,
page_count
);
/* Create threads for page-compression-flush */
for
(
i
=
0
;
i
<
num_threads
;
i
++
)
{
pc_sync
[
i
].
wthread_id
=
i
;
pc_sync
[
i
].
wq
=
wq
;
pc_sync
[
i
].
cq
=
cq
;
os_thread_create
(
page_comp_io_thread
,
((
void
*
)(
pc_sync
+
i
)),
thread_ids
+
START_PGCOMP_CNT
+
i
);
//pc_sync[i].wthread = thread_ids[START_PGCOMP_CNT + i];
pc_sync
[
i
].
wthread
=
(
START_PGCOMP_CNT
+
i
);
pc_sync
[
i
].
wt_status
=
WTHR_INITIALIZED
;
if
(
n_processed
)
{
*
n_processed
=
page_count
;
}
set_check_done_flag_count
(
wrk_cnt
);
set_pgcomp_wrk_init_done
();
return
0
;
return
(
true
);
}
/*******************************************************************//**
Print work thread status information
@return why ? */
int
wrk_thread_stat
(
thread_sync_t
*
wthr
,
unsigned
int
num_threads
)
/*================*/
Multi-threaded version of buf_flush_list
*/
bool
buf_mtflu_flush_list
(
/*=================*/
ulint
min_n
,
/*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t
lsn_limit
,
/*!< in the case BUF_FLUSH_LIST all
blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
ulint
*
n_processed
)
/*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
{
long
stat_tot
=
0
;
int
i
=
0
;
for
(
i
=
0
;
i
<
num_threads
;
i
++
)
{
stat_tot
+=
wthr
[
i
].
stat_universal_num_processed
;
fprintf
(
stderr
,
"[%d] stat [%lu]
\n
"
,
wthr
[
i
].
wthread_id
,
wthr
[
i
].
stat_universal_num_processed
)
;
ulint
i
;
bool
success
=
true
;
ulint
cnt_flush
[
MTFLUSH_MAX_WORKER
];
if
(
n_processed
)
{
*
n_processed
=
0
;
}
fprintf
(
stderr
,
"Stat-Total:%lu
\n
"
,
stat_tot
);
}
/*******************************************************************//**
Reset work items
@return why ? */
in
t
reset_wrk_itm
(
int
items
)
/*================*/
{
int
i
;
if
(
min_n
!=
ULINT_MAX
)
{
/* Ensure that flushing is spread evenly amongst the
buffer pool instances. When min_n is ULINT_MAX
we need to flush everything up to the lsn limi
t
so no limit here. */
min_n
=
(
min_n
+
srv_buf_pool_instances
-
1
)
/
srv_buf_pool_instances
;
}
mutex_enter
(
&
wq
.
mtx
);
wq
.
head
=
wq
.
tail
=
NULL
;
mutex_exit
(
&
wq
.
mtx
);
/* QUESTION: What is procted by below mutex ? */
os_fast_mutex_lock
(
&
mtflush_mtx
);
buf_mtflu_flush_work_items
(
srv_buf_pool_instances
,
cnt_flush
,
BUF_FLUSH_LIST
,
min_n
,
lsn_limit
);
os_fast_mutex_unlock
(
&
mtflush_mtx
);
mutex_enter
(
&
cq
.
mtx
);
for
(
i
=
0
;
i
<
items
;
i
++
)
{
work_items
[
i
].
id_usr
=
-
1
;
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
if
(
n_processed
)
{
*
n_processed
+=
cnt_flush
[
i
]
;
}
cq
.
head
=
cq
.
tail
=
NULL
;
mutex_exit
(
&
cq
.
mtx
);
return
0
;
if
(
cnt_flush
[
i
])
{
MONITOR_INC_VALUE_CUMULATIVE
(
MONITOR_FLUSH_BATCH_TOTAL_PAGE
,
MONITOR_FLUSH_BATCH_COUNT
,
MONITOR_FLUSH_BATCH_PAGES
,
cnt_flush
[
i
]);
}
}
#ifdef UNIV_DEBUG
fprintf
(
stderr
,
"%s: [1] [*n_processed: (min:%lu)%lu ]
\n
"
,
__FUNCTION__
,
(
min_n
*
srv_buf_pool_instances
),
*
n_processed
);
#endif
return
(
success
);
}
/*******************************************************************//**
?
@return why ? */
int
pgcomp_flush_work_items
(
/*================*/
int
buf_pool_inst
,
int
*
per_pool_pages_flushed
,
int
flush_type
,
int
min_n
,
lsn_t
lsn_limit
)
/*******************************************************************
**
//**
Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
buf_mtflu_flush_LRU_tail
(
void
)
/*==========================*/
{
int
ret
=
0
,
i
=
0
;
mutex_enter
(
&
wq
.
mtx
);
mutex_enter
(
&
cq
.
mtx
);
assert
(
wq
.
head
==
NULL
);
assert
(
wq
.
tail
==
NULL
);
if
(
cq
.
head
)
{
print_wrk_list
(
cq
.
head
);
}
assert
(
cq
.
head
==
NULL
);
assert
(
cq
.
tail
==
NULL
);
ulint
total_flushed
=
0
,
i
;
ulint
cnt_flush
[
MTFLUSH_MAX_WORKER
];
for
(
i
=
0
;
i
<
buf_pool_inst
;
i
++
)
{
work_items
[
i
].
buf_pool
=
buf_pool_from_array
(
i
);
work_items
[
i
].
flush_type
=
flush_type
;
work_items
[
i
].
min
=
min_n
;
work_items
[
i
].
lsn_limit
=
lsn_limit
;
work_items
[
i
].
id_usr
=
-
1
;
work_items
[
i
].
next
=
&
work_items
[(
i
+
1
)
%
buf_pool_inst
];
work_items
[
i
].
wi_status
=
WRK_ITEM_SET
;
}
work_items
[
i
-
1
].
next
=
NULL
;
ut_a
(
buf_mtflu_init_done
());
mutex_exit
(
&
cq
.
mtx
);
mutex_exit
(
&
wq
.
mtx
);
/* QUESTION: What is protected by below mutex ? */
os_fast_mutex_lock
(
&
mtflush_mtx
);
buf_mtflu_flush_work_items
(
srv_buf_pool_instances
,
cnt_flush
,
BUF_FLUSH_LRU
,
srv_LRU_scan_depth
,
0
);
os_fast_mutex_unlock
(
&
mtflush_mtx
);
pgcomp_handler
(
work_items
);
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
if
(
cnt_flush
[
i
])
{
total_flushed
+=
cnt_flush
[
i
];
mutex_enter
(
&
wq
.
mtx
);
mutex_enter
(
&
cq
.
mtx
);
/* collect data/results total pages flushed */
for
(
i
=
0
;
i
<
buf_pool_inst
;
i
++
)
{
if
(
work_items
[
i
].
result
==
-
1
)
{
ret
=
-
1
;
per_pool_pages_flushed
[
i
]
=
0
;
}
else
{
per_pool_pages_flushed
[
i
]
=
work_items
[
i
].
result
;
}
if
((
work_items
[
i
].
id_usr
==
-
1
)
&&
(
work_items
[
i
].
wi_status
==
WRK_ITEM_SET
))
{
fprintf
(
stderr
,
"**Set/Unused work_item[%d] flush_type=%d
\n
"
,
i
,
work_items
[
i
].
flush_type
);
assert
(
0
);
MONITOR_INC_VALUE_CUMULATIVE
(
MONITOR_LRU_BATCH_TOTAL_PAGE
,
MONITOR_LRU_BATCH_COUNT
,
MONITOR_LRU_BATCH_PAGES
,
cnt_flush
[
i
]);
}
}
wq
.
flag
=
cq
.
flag
=
Q_INITIALIZED
;
mutex_exit
(
&
cq
.
mtx
);
mutex_exit
(
&
wq
.
mtx
);
#if UNIV_DEBUG
/* Print work-list stats */
fprintf
(
stderr
,
"==wq== [DONE]
\n
"
);
print_wrk_list
(
wq
.
head
);
fprintf
(
stderr
,
"==cq== [DONE]
\n
"
);
print_wrk_list
(
cq
.
head
);
fprintf
(
stderr
,
"==worker-thread-stats==
\n
"
);
wrk_thread_stat
(
pc_sync
,
pgc_n_threads
);
fprintf
(
stderr
,
"[1] [*n_processed: (min:%lu)%lu ]
\n
"
,
(
srv_LRU_scan_depth
*
srv_buf_pool_instances
),
total_flushed
);
#endif
/* clear up work-queue for next flush */
reset_wrk_itm
(
buf_pool_inst
);
return
(
ret
);
return
(
total_flushed
);
}
/*********************************************************************//**
Set correct thread identifiers to io thread array based on
information we have. */
void
buf_mtflu_set_thread_ids
(
/*=====================*/
ulint
n_threads
,
/*!<in: Number of threads to fill */
void
*
ctx
,
/*!<in: thread context */
os_thread_id_t
*
thread_ids
)
/*!<in: thread id array */
{
thread_sync_t
*
mtflush_io
=
((
thread_sync_t
*
)
ctx
);
ulint
i
;
ut_a
(
mtflush_io
!=
NULL
);
ut_a
(
thread_ids
!=
NULL
);
for
(
i
=
0
;
i
<
n_threads
;
i
++
)
{
thread_ids
[
i
]
=
mtflush_io
[
i
].
wthread_id
;
}
}
storage/innobase/include/buf0flu.h
View file @
7f3950a2
/*****************************************************************************
Copyright (c) 1995, 2011, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2014, SkySQL Ab.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
...
...
@@ -274,6 +275,54 @@ buf_flush_get_dirty_pages_count(
#endif
/* !UNIV_HOTBACKUP */
/******************************************************************//**
Start a buffer flush batch for LRU or flush list */
ibool
buf_flush_start
(
/*============*/
buf_pool_t
*
buf_pool
,
/*!< buffer pool instance */
enum
buf_flush
flush_type
);
/*!< in: BUF_FLUSH_LRU
or BUF_FLUSH_LIST */
/******************************************************************//**
End a buffer flush batch for LRU or flush list */
void
buf_flush_end
(
/*==========*/
buf_pool_t
*
buf_pool
,
/*!< buffer pool instance */
enum
buf_flush
flush_type
);
/*!< in: BUF_FLUSH_LRU
or BUF_FLUSH_LIST */
/******************************************************************//**
Gather the aggregated stats for both flush list and LRU list flushing */
void
buf_flush_common
(
/*=============*/
enum
buf_flush
flush_type
,
/*!< in: type of flush */
ulint
page_count
);
/*!< in: number of pages flushed */
/*******************************************************************//**
This utility flushes dirty blocks from the end of the LRU list or flush_list.
NOTE 1: in the case of an LRU flush the calling thread may own latches to
pages: to avoid deadlocks, this function must be written so that it cannot
end up waiting for these latches! NOTE 2: in the case of a flush list flush,
the calling thread is not allowed to own any latches on pages!
@return number of blocks for which the write request was queued */
ulint
buf_flush_batch
(
/*============*/
buf_pool_t
*
buf_pool
,
/*!< in: buffer pool instance */
enum
buf_flush
flush_type
,
/*!< in: BUF_FLUSH_LRU or
BUF_FLUSH_LIST; if BUF_FLUSH_LIST,
then the caller must not own any
latches on pages */
ulint
min_n
,
/*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t
lsn_limit
);
/*!< in: in the case of BUF_FLUSH_LIST
all blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
#ifndef UNIV_NONINL
#include "buf0flu.ic"
#endif
...
...
storage/innobase/include/buf0mtflu.h
0 → 100644
View file @
7f3950a2
/*****************************************************************************
Copyright (C) 2014 SkySQL Ab. All Rights Reserved.
Copyright (C) 2014 Fusion-io. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*****************************************************************************/
/******************************************************************//**
@file include/buf0mtflu.h
Multi-threadef flush method interface function prototypes
Created 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
Dhananjoy Das DDas@fusionio.com
***********************************************************************/
#ifndef buf0mtflu_h
#define buf0mtflu_h
/******************************************************************//**
Add exit work item to work queue to signal multi-threded flush
threads that they should exit.
*/
void
buf_mtflu_io_thread_exit
(
void
);
/*===========================*/
/******************************************************************//**
Initialize multi-threaded flush thread syncronization data.
@return Initialized multi-threaded flush thread syncroniztion data. */
void
*
buf_mtflu_handler_init
(
/*===================*/
ulint
n_threads
,
/*!< in: Number of threads to create */
ulint
wrk_cnt
);
/*!< in: Number of work items */
/******************************************************************//**
Return true if multi-threaded flush is initialized
@return true if initialized, false if not */
bool
buf_mtflu_init_done
(
void
);
/*======================*/
/*********************************************************************//**
Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
buf_mtflu_flush_LRU_tail
(
void
);
/*===========================*/
/*******************************************************************//**
Multi-threaded version of buf_flush_list
*/
bool
buf_mtflu_flush_list
(
/*=================*/
ulint
min_n
,
/*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t
lsn_limit
,
/*!< in the case BUF_FLUSH_LIST all
blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
ulint
*
n_processed
);
/*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
/*********************************************************************//**
Set correct thread identifiers to io thread array based on
information we have. */
void
buf_mtflu_set_thread_ids
(
/*=====================*/
ulint
n_threads
,
/*!<in: Number of threads to fill */
void
*
ctx
,
/*!<in: thread context */
os_thread_id_t
*
thread_ids
);
/*!<in: thread id array */
#endif
storage/innobase/include/srv0srv.h
View file @
7f3950a2
...
...
@@ -259,7 +259,7 @@ extern my_bool srv_use_lz4;
/* Number of flush threads */
#define MTFLUSH_MAX_WORKER 64
extern
ulint
srv_mtflush_threads
;
extern
long
srv_mtflush_threads
;
#ifdef __WIN__
extern
ibool
srv_use_native_conditions
;
...
...
storage/innobase/include/srv0start.h
View file @
7f3950a2
...
...
@@ -37,7 +37,8 @@ Created 10/10/1995 Heikki Tuuri
#endif
/*********************************************************************//**
Normalizes a directory path for Windows: converts slashes to backslashes. */
Normalizes a directory path for Windows: converts slashes to backslashes.
*/
UNIV_INTERN
void
srv_normalize_path_for_win
(
...
...
storage/innobase/srv/srv0srv.cc
View file @
7f3950a2
...
...
@@ -3,7 +3,7 @@
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, 2009 Google Inc.
Copyright (c) 2009, Percona Inc.
Copyright (c) 2013, 2014, SkySQL Ab.
Copyright (c) 2013, 2014, SkySQL Ab.
All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
...
...
@@ -162,6 +162,8 @@ UNIV_INTERN my_bool srv_use_posix_fallocate = FALSE;
UNIV_INTERN
my_bool
srv_use_atomic_writes
=
FALSE
;
/* If this flag IS TRUE, then we use lz4 to compress/decompress pages */
UNIV_INTERN
my_bool
srv_use_lz4
=
FALSE
;
/* Number of threads used for multi-threaded flush */
UNIV_INTERN
long
srv_mtflush_threads
=
0
;
#ifdef __WIN__
/* Windows native condition variables. We use runtime loading / function
...
...
storage/innobase/srv/srv0start.cc
View file @
7f3950a2
...
...
@@ -70,6 +70,7 @@ Created 2/16/1996 Heikki Tuuri
# include "sync0sync.h"
# include "buf0flu.h"
# include "buf0rea.h"
# include "buf0mtflu.h"
# include "dict0boot.h"
# include "dict0load.h"
# include "dict0stats_bg.h"
...
...
@@ -130,6 +131,8 @@ static ulint n[SRV_MAX_N_IO_THREADS + 6];
/** 6 is the ? */
#define START_OLD_THREAD_CNT (SRV_MAX_N_IO_THREADS + 6 + 32)
static
os_thread_id_t
thread_ids
[
SRV_MAX_N_IO_THREADS
+
6
+
32
+
MTFLUSH_MAX_WORKER
];
/* Thread contex data for multi-threaded flush */
void
*
mtflush_ctx
=
NULL
;
/** We use this mutex to test the return value of pthread_mutex_trylock
on successful locking. HP-UX does NOT return 0, though Linux et al do. */
...
...
@@ -1434,403 +1437,6 @@ srv_start_wait_for_purge_to_start()
}
}
/* JAN: TODO: */
/**********************************************************************************/
#ifdef UNIV_DEBUG
extern
int
timediff
(
struct
timeval
*
g_time
,
struct
timeval
*
s_time
,
struct
timeval
*
d_time
);
#endif
extern
ibool
buf_flush_start
(
buf_pool_t
*
buf_pool
,
enum
buf_flush
flush_type
);
extern
void
buf_flush_end
(
buf_pool_t
*
buf_pool
,
enum
buf_flush
flush_type
);
extern
void
buf_flush_common
(
enum
buf_flush
flush_type
,
ulint
page_count
);
extern
ulint
buf_flush_batch
(
buf_pool_t
*
buf_pool
,
enum
buf_flush
flush_type
,
ulint
min_n
,
lsn_t
lsn_limit
);
extern
void
pgcomp_init
(
void
);
extern
void
pgcomp_deinit
(
void
);
typedef
enum
wrk_status
{
WRK_ITEM_SET
=
0
,
// wrk-item is set
WRK_ITEM_START
=
1
,
// processing of wrk-item has started
WRK_ITEM_DONE
=
2
,
// processing is done usually set to SUCCESS/FAILED
WRK_ITEM_SUCCESS
=
2
,
// Success processing the wrk-item
WRK_ITEM_FAILED
=
3
,
// status of failed
WRK_ITEM_EXIT
=
4
,
WRK_ITEM_STATUS_UNDEFINED
}
wrk_status_t
;
typedef
enum
mt_wrk_tsk
{
MT_WRK_NONE
=
0
,
// Exit queue-wait
MT_WRK_WRITE
=
1
,
// Flush operation
MT_WRK_READ
=
2
,
// Decompress operation
MT_WRK_UNDEFINED
}
mt_wrk_tsk_t
;
typedef
enum
wthr_status
{
WTHR_NOT_INIT
=
0
,
WTHR_INITIALIZED
=
1
,
WTHR_SIG_WAITING
=
2
,
WTHR_RUNNING
=
3
,
WTHR_NO_WORK
=
4
,
WTHR_KILL_IT
=
5
,
WTHR_STATUS_UNDEFINED
}
wthr_status_t
;
typedef
struct
wr_tsk
{
buf_pool_t
*
buf_pool
;
// buffer-pool instance
enum
buf_flush
flush_type
;
// flush-type for buffer-pool flush operation
ulint
min
;
//minimum number of pages requested to be flushed
lsn_t
lsn_limit
;
//lsn limit for the buffer-pool flush operation
}
wr_tsk_t
;
typedef
struct
rd_tsk
{
void
*
page_pool
;
//list of pages to decompress;
}
rd_tsk_t
;
typedef
struct
wrk_itm
{
mt_wrk_tsk_t
tsk
;
/* based on task-type one of the entries wr_tsk/rd_tsk will be used */
wr_tsk_t
wr
;
//flush page list
rd_tsk_t
rd
;
//decompress page list
unsigned
long
result
;
//flush pages count
unsigned
long
t_usec
;
//time-taken in usec
long
id_usr
;
//thread-id currently working
wrk_status_t
wi_status
;
//flag
struct
wrk_itm
*
next
;
}
wrk_t
;
typedef
struct
thread_sync
{
int
wthread_id
;
os_thread_t
wthread
;
ib_wqueue_t
*
wq
;
// work Queue
ib_wqueue_t
*
wr_cq
;
// Write Completion Queue
ib_wqueue_t
*
rd_cq
;
// Read Completion Queue
wthr_status_t
wt_status
;
// Worker Thread status
unsigned
long
stat_universal_num_processed
;
unsigned
long
stat_cycle_num_processed
;
}
thread_sync_t
;
/* Global XXX:DD needs to be cleaned */
ib_wqueue_t
*
wq
=
NULL
,
*
wr_cq
=
NULL
,
*
rd_cq
=
NULL
;
mem_heap_t
*
heap_allocated
=
NULL
;
thread_sync_t
pc_sync
[
MTFLUSH_MAX_WORKER
];
static
wrk_t
work_items
[
MTFLUSH_MAX_WORKER
];
static
int
pgcomp_wrk_initialized
=
-
1
;
ulint
srv_mtflush_threads
=
0
;
int
set_pgcomp_wrk_init_done
(
void
)
{
pgcomp_wrk_initialized
=
1
;
return
0
;
}
int
is_pgcomp_wrk_init_done
(
void
)
{
return
(
pgcomp_wrk_initialized
==
1
);
}
int
setup_wrk_itm
(
int
items
)
{
int
i
;
for
(
i
=
0
;
i
<
items
;
i
++
)
{
work_items
[
i
].
rd
.
page_pool
=
NULL
;
work_items
[
i
].
wr
.
buf_pool
=
NULL
;
work_items
[
i
].
t_usec
=
0
;
work_items
[
i
].
result
=
0
;
work_items
[
i
].
id_usr
=
-
1
;
work_items
[
i
].
wi_status
=
WRK_ITEM_STATUS_UNDEFINED
;
work_items
[
i
].
next
=
&
work_items
[(
i
+
1
)
%
items
];
}
/* last node should be the tail */
work_items
[
items
-
1
].
next
=
NULL
;
return
0
;
}
int
flush_pool_instance
(
wrk_t
*
wi
)
{
#ifdef UNIV_DEBUG
struct
timeval
p_start_time
,
p_end_time
,
d_time
;
#endif
if
(
!
wi
)
{
fprintf
(
stderr
,
"work item invalid wi:%p
\n
"
,
wi
);
return
-
1
;
}
if
(
!
wi
->
wr
.
buf_pool
)
{
fprintf
(
stderr
,
"work-item wi->buf_pool:%p [likely thread exit]
\n
"
,
wi
->
wr
.
buf_pool
);
return
-
1
;
}
wi
->
t_usec
=
0
;
if
(
!
buf_flush_start
(
wi
->
wr
.
buf_pool
,
wi
->
wr
.
flush_type
))
{
/* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer
pool means we cannot guarantee that all pages
up to lsn_limit has been flushed. We can
return right now with failure or we can try
to flush remaining buffer pools up to the
lsn_limit. We attempt to flush other buffer
pools based on the assumption that it will
help in the retry which will follow the
failure. */
fprintf
(
stderr
,
"flush_start Failed, flush_type:%d
\n
"
,
wi
->
wr
.
flush_type
);
return
-
1
;
}
#ifdef UNIV_DEBUG
/* Record time taken for the OP in usec */
gettimeofday
(
&
p_start_time
,
0x0
);
#endif
if
(
wi
->
wr
.
flush_type
==
BUF_FLUSH_LRU
)
{
/* srv_LRU_scan_depth can be arbitrarily large value.
* We cap it with current LRU size.
*/
buf_pool_mutex_enter
(
wi
->
wr
.
buf_pool
);
wi
->
wr
.
min
=
UT_LIST_GET_LEN
(
wi
->
wr
.
buf_pool
->
LRU
);
buf_pool_mutex_exit
(
wi
->
wr
.
buf_pool
);
wi
->
wr
.
min
=
ut_min
(
srv_LRU_scan_depth
,
wi
->
wr
.
min
);
}
wi
->
result
=
buf_flush_batch
(
wi
->
wr
.
buf_pool
,
wi
->
wr
.
flush_type
,
wi
->
wr
.
min
,
wi
->
wr
.
lsn_limit
);
buf_flush_end
(
wi
->
wr
.
buf_pool
,
wi
->
wr
.
flush_type
);
buf_flush_common
(
wi
->
wr
.
flush_type
,
wi
->
result
);
#ifdef UNIV_DEBUG
gettimeofday
(
&
p_end_time
,
0x0
);
timediff
(
&
p_end_time
,
&
p_start_time
,
&
d_time
);
wi
->
t_usec
=
(
unsigned
long
)(
d_time
.
tv_usec
+
(
d_time
.
tv_sec
*
1000000
));
#endif
return
0
;
}
int
service_page_comp_io
(
thread_sync_t
*
ppc
)
{
wrk_t
*
wi
=
NULL
;
int
ret
=
0
;
ppc
->
wt_status
=
WTHR_SIG_WAITING
;
wi
=
(
wrk_t
*
)
ib_wqueue_wait
(
ppc
->
wq
);
if
(
wi
)
{
ppc
->
wt_status
=
WTHR_RUNNING
;
}
else
{
fprintf
(
stderr
,
"%s:%d work-item is NULL
\n
"
,
__FILE__
,
__LINE__
);
ppc
->
wt_status
=
WTHR_NO_WORK
;
return
(
0
);
}
assert
(
wi
!=
NULL
);
wi
->
id_usr
=
ppc
->
wthread
;
switch
(
wi
->
tsk
)
{
case
MT_WRK_NONE
:
assert
(
wi
->
wi_status
==
WRK_ITEM_EXIT
);
wi
->
wi_status
=
WRK_ITEM_SUCCESS
;
ib_wqueue_add
(
ppc
->
wr_cq
,
wi
,
heap_allocated
);
break
;
case
MT_WRK_WRITE
:
wi
->
wi_status
=
WRK_ITEM_START
;
/* Process work item */
if
(
0
!=
(
ret
=
flush_pool_instance
(
wi
)))
{
fprintf
(
stderr
,
"FLUSH op failed ret:%d
\n
"
,
ret
);
wi
->
wi_status
=
WRK_ITEM_FAILED
;
}
wi
->
wi_status
=
WRK_ITEM_SUCCESS
;
ib_wqueue_add
(
ppc
->
wr_cq
,
wi
,
heap_allocated
);
break
;
case
MT_WRK_READ
:
/* Need to also handle the read case */
assert
(
0
);
/* completed task get added to rd_cq */
/* wi->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(ppc->rd_cq, wi, heap_allocated);*/
break
;
default:
/* None other than Write/Read handling planned */
assert
(
0
);
}
ppc
->
wt_status
=
WTHR_NO_WORK
;
return
(
0
);
}
void
page_comp_io_thread_exit
()
{
ulint
i
;
fprintf
(
stderr
,
"signal page_comp_io_threads to exit [%lu]
\n
"
,
srv_buf_pool_instances
);
for
(
i
=
0
;
i
<
srv_buf_pool_instances
;
i
++
)
{
work_items
[
i
].
wr
.
buf_pool
=
NULL
;
work_items
[
i
].
rd
.
page_pool
=
NULL
;
work_items
[
i
].
tsk
=
MT_WRK_NONE
;
work_items
[
i
].
wi_status
=
WRK_ITEM_EXIT
;
ib_wqueue_add
(
wq
,
(
void
*
)
&
work_items
[
i
],
heap_allocated
);
}
}
/******************************************************************//**
@return a dummy parameter*/
extern
"C"
UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD
(
page_comp_io_thread
)(
/*================================*/
void
*
arg
)
{
thread_sync_t
*
ppc_io
=
((
thread_sync_t
*
)
arg
);
while
(
srv_shutdown_state
!=
SRV_SHUTDOWN_EXIT_THREADS
)
{
service_page_comp_io
(
ppc_io
);
ppc_io
->
stat_cycle_num_processed
=
0
;
}
os_thread_exit
(
NULL
);
OS_THREAD_DUMMY_RETURN
;
}
int
print_wrk_list
(
wrk_t
*
wi_list
)
{
wrk_t
*
wi
=
wi_list
;
int
i
=
0
;
if
(
!
wi_list
)
{
fprintf
(
stderr
,
"list NULL
\n
"
);
}
while
(
wi
)
{
fprintf
(
stderr
,
"-
\t
[%p]
\t
[%s]
\t
[%lu]
\t
[%luus] > %p
\n
"
,
wi
,
(
wi
->
id_usr
==
-
1
)
?
"free"
:
"Busy"
,
wi
->
result
,
wi
->
t_usec
,
wi
->
next
);
wi
=
wi
->
next
;
i
++
;
}
fprintf
(
stderr
,
"list len: %d
\n
"
,
i
);
return
0
;
}
/******************************************************************//**
@return a dummy parameter*/
int
pgcomp_handler_init
(
int
num_threads
,
int
wrk_cnt
,
ib_wqueue_t
*
wq
,
ib_wqueue_t
*
wr_cq
,
ib_wqueue_t
*
rd_cq
)
{
int
i
=
0
;
if
(
is_pgcomp_wrk_init_done
())
{
fprintf
(
stderr
,
"pgcomp_handler_init(): ERROR already initialized
\n
"
);
return
-
1
;
}
if
(
!
wq
||
!
wr_cq
||
!
rd_cq
)
{
fprintf
(
stderr
,
"%s() FAILED wq:%p write-cq:%p read-cq:%p
\n
"
,
__FUNCTION__
,
wq
,
wr_cq
,
rd_cq
);
return
-
1
;
}
/* work-item setup */
setup_wrk_itm
(
wrk_cnt
);
/* Mark each of the thread sync entires */
for
(
i
=
0
;
i
<
MTFLUSH_MAX_WORKER
;
i
++
)
{
pc_sync
[
i
].
wthread_id
=
i
;
}
/* Create threads for page-compression-flush */
for
(
i
=
0
;
i
<
num_threads
;
i
++
)
{
pc_sync
[
i
].
wthread_id
=
i
;
pc_sync
[
i
].
wq
=
wq
;
pc_sync
[
i
].
wr_cq
=
wr_cq
;
pc_sync
[
i
].
rd_cq
=
rd_cq
;
os_thread_create
(
page_comp_io_thread
,
((
void
*
)(
pc_sync
+
i
)),
thread_ids
+
START_OLD_THREAD_CNT
+
i
);
pc_sync
[
i
].
wthread
=
(
START_OLD_THREAD_CNT
+
i
);
pc_sync
[
i
].
wt_status
=
WTHR_INITIALIZED
;
}
set_pgcomp_wrk_init_done
();
fprintf
(
stderr
,
"%s() Worker-Threads created..
\n
"
,
__FUNCTION__
);
return
0
;
}
int
wrk_thread_stat
(
thread_sync_t
*
wthr
,
unsigned
int
num_threads
)
{
ulong
stat_tot
=
0
;
ulint
i
=
0
;
for
(
i
=
0
;
i
<
num_threads
;
i
++
)
{
stat_tot
+=
wthr
[
i
].
stat_universal_num_processed
;
fprintf
(
stderr
,
"[%d] stat [%lu]
\n
"
,
wthr
[
i
].
wthread_id
,
wthr
[
i
].
stat_universal_num_processed
);
}
fprintf
(
stderr
,
"Stat-Total:%lu
\n
"
,
stat_tot
);
}
int
reset_wrk_itm
(
int
items
)
{
int
i
;
for
(
i
=
0
;
i
<
items
;
i
++
)
{
work_items
[
i
].
id_usr
=
-
1
;
}
return
0
;
}
int
pgcomp_flush_work_items
(
int
buf_pool_inst
,
int
*
per_pool_pages_flushed
,
enum
buf_flush
flush_type
,
int
min_n
,
lsn_t
lsn_limit
)
{
int
ret
=
0
,
i
=
0
;
wrk_t
*
done_wi
;
for
(
i
=
0
;
i
<
buf_pool_inst
;
i
++
)
{
work_items
[
i
].
tsk
=
MT_WRK_WRITE
;
work_items
[
i
].
rd
.
page_pool
=
NULL
;
work_items
[
i
].
wr
.
buf_pool
=
buf_pool_from_array
(
i
);
work_items
[
i
].
wr
.
flush_type
=
(
enum
buf_flush
)
flush_type
;
work_items
[
i
].
wr
.
min
=
min_n
;
work_items
[
i
].
wr
.
lsn_limit
=
lsn_limit
;
work_items
[
i
].
id_usr
=
-
1
;
work_items
[
i
].
next
=
&
work_items
[(
i
+
1
)
%
buf_pool_inst
];
work_items
[
i
].
wi_status
=
WRK_ITEM_SET
;
}
work_items
[
i
-
1
].
next
=
NULL
;
for
(
i
=
0
;
i
<
buf_pool_inst
;
i
++
)
{
ib_wqueue_add
(
wq
,
(
void
*
)(
&
work_items
[
i
]),
heap_allocated
);
}
/* wait on the completion to arrive */
for
(
i
=
0
;
i
<
buf_pool_inst
;
i
++
)
{
done_wi
=
(
wrk_t
*
)
ib_wqueue_wait
(
wr_cq
);
//fprintf(stderr, "%s: queue-wait DONE\n", __FUNCTION__);
ut_ad
(
done_wi
!=
NULL
);
}
/* collect data/results total pages flushed */
for
(
i
=
0
;
i
<
buf_pool_inst
;
i
++
)
{
if
(
work_items
[
i
].
result
==
-
1
)
{
ret
=
-
1
;
per_pool_pages_flushed
[
i
]
=
0
;
}
else
{
per_pool_pages_flushed
[
i
]
=
work_items
[
i
].
result
;
}
if
((
work_items
[
i
].
id_usr
==
-
1
)
&&
(
work_items
[
i
].
wi_status
==
WRK_ITEM_SET
))
{
fprintf
(
stderr
,
"**Set/Unused work_item[%d] flush_type=%d
\n
"
,
i
,
work_items
[
i
].
wr
.
flush_type
);
//assert(0);
}
}
//wrk_thread_stat(pc_sync, pgc_n_threads);
/* clear up work-queue for next flush */
reset_wrk_itm
(
buf_pool_inst
);
return
(
ret
);
}
/* JAN: TODO: END: */
/********************************************************************
Starts InnoDB and creates a new database if database files
are not found and the user wants.
...
...
@@ -2986,25 +2592,23 @@ innobase_start_or_create_for_mysql(void)
}
if
(
!
srv_read_only_mode
)
{
/* JAN: TODO: */
if
(
srv_buf_pool_instances
<=
MTFLUSH_MAX_WORKER
)
{
srv_mtflush_threads
=
srv_buf_pool_instances
;
}
/* else we default to 8 worker-threads */
heap_allocated
=
mem_heap_create
(
0
);
ut_a
(
heap_allocated
!=
NULL
);
wq
=
ib_wqueue_create
();
wr_cq
=
ib_wqueue_create
();
rd_cq
=
ib_wqueue_create
();
pgcomp_init
();
pgcomp_handler_init
(
srv_mtflush_threads
,
srv_buf_pool_instances
,
wq
,
wr_cq
,
rd_cq
);
mtflush_ctx
=
buf_mtflu_handler_init
(
srv_mtflush_threads
,
srv_buf_pool_instances
);
/* Set up the thread ids */
buf_mtflu_set_thread_ids
(
srv_mtflush_threads
,
mtflush_ctx
,
(
thread_ids
+
6
+
32
));
#if UNIV_DEBUG
fprintf
(
stderr
,
"%s:%d buf-pool-instances:%lu
\n
"
,
__FILE__
,
__LINE__
,
srv_buf_pool_instances
);
#endif
/* JAN: TODO: END */
os_thread_create
(
buf_flush_page_cleaner_thread
,
NULL
,
NULL
);
}
...
...
@@ -3272,15 +2876,12 @@ innobase_shutdown_for_mysql(void)
/* g. Exit the multi threaded flush threads */
page_comp
_io_thread_exit
();
buf_mtflu
_io_thread_exit
();
#ifdef UNIV_DEBUG
fprintf
(
stderr
,
"%s:%d os_thread_count:%lu
\n
"
,
__FUNCTION__
,
__LINE__
,
os_thread_count
);
#endif
/* h. Remove the mutex */
pgcomp_deinit
();
os_mutex_enter
(
os_sync_mutex
);
if
(
os_thread_count
==
0
)
{
...
...
storage/xtradb/buf/buf0flu.cc
View file @
7f3950a2
...
...
@@ -1862,6 +1862,9 @@ buf_flush_start(
/* There is already a flush batch of the same type running */
fprintf
(
stderr
,
"Error: flush_type %d n_flush %lu init_flush
\n
"
,
flush_type
,
buf_pool
->
n_flush
[
flush_type
],
buf_pool
->
init_flush
[
flush_type
]);
mutex_exit
(
&
buf_pool
->
flush_state_mutex
);
return
(
FALSE
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment