Commit e39826c6 authored by sbains's avatar sbains

branches/innodb+: Create a separate purge thread to do the purge. Introduce

two new configuration parameters.

  1. innodb-purge-threads := [01] -- default is 0
  2. innodb-purge-batch-size := 20 ... 5000 -- default is 20

rb://271
parent b8e393db
...@@ -10669,6 +10669,23 @@ static MYSQL_SYSVAR_ULONG(io_capacity, srv_io_capacity, ...@@ -10669,6 +10669,23 @@ static MYSQL_SYSVAR_ULONG(io_capacity, srv_io_capacity,
"Number of IOPs the server can do. Tunes the background IO rate", "Number of IOPs the server can do. Tunes the background IO rate",
NULL, NULL, 200, 100, ~0L, 0); NULL, NULL, 200, 100, ~0L, 0);
static MYSQL_SYSVAR_ULONG(purge_batch_size, srv_purge_batch_size,
PLUGIN_VAR_OPCMDARG,
"Number of UNDO logs to purge in one batch from the history list. "
"Default is 20",
NULL, NULL,
20, /* Default setting */
1, /* Minimum value */
5000, 0); /* Maximum value */
static MYSQL_SYSVAR_ULONG(purge_threads, srv_n_purge_threads,
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
"Purge threads can be either 0 or 1. Default is 0.",
NULL, NULL,
0, /* Default setting */
0, /* Minimum value */
1, 0); /* Maximum value */
static MYSQL_SYSVAR_ULONG(fast_shutdown, innobase_fast_shutdown, static MYSQL_SYSVAR_ULONG(fast_shutdown, innobase_fast_shutdown,
PLUGIN_VAR_OPCMDARG, PLUGIN_VAR_OPCMDARG,
"Speeds up the shutdown process of the InnoDB storage engine. Possible " "Speeds up the shutdown process of the InnoDB storage engine. Possible "
...@@ -10982,6 +10999,8 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { ...@@ -10982,6 +10999,8 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(change_buffering), MYSQL_SYSVAR(change_buffering),
MYSQL_SYSVAR(read_ahead_threshold), MYSQL_SYSVAR(read_ahead_threshold),
MYSQL_SYSVAR(io_capacity), MYSQL_SYSVAR(io_capacity),
MYSQL_SYSVAR(purge_threads),
MYSQL_SYSVAR(purge_batch_size),
NULL NULL
}; };
......
...@@ -271,6 +271,12 @@ extern ulint srv_os_log_pending_writes; ...@@ -271,6 +271,12 @@ extern ulint srv_os_log_pending_writes;
log buffer and have to flush it */ log buffer and have to flush it */
extern ulint srv_log_waits; extern ulint srv_log_waits;
/* the number of purge threads to use from the worker pool (currently 0 or 1) */
extern ulint srv_n_purge_threads;
/* the number of records to purge in one batch */
extern ulint srv_purge_batch_size;
/* variable that counts amount of data read in total (in bytes) */ /* variable that counts amount of data read in total (in bytes) */
extern ulint srv_data_read; extern ulint srv_data_read;
...@@ -483,6 +489,12 @@ srv_master_thread( ...@@ -483,6 +489,12 @@ srv_master_thread(
void* arg); /*!< in: a dummy parameter required by void* arg); /*!< in: a dummy parameter required by
os_thread_create */ os_thread_create */
/*******************************************************************//** /*******************************************************************//**
Wakes up the purge thread if it's not already awake. */
UNIV_INTERN
void
srv_wake_purge_thread(void);
/*=======================*/
/*******************************************************************//**
Tells the Innobase server that there has been activity in the database Tells the Innobase server that there has been activity in the database
and wakes up the master thread if it is suspended (not sleeping). Used and wakes up the master thread if it is suspended (not sleeping). Used
in the MySQL interface. Note that there is a small chance that the master in the MySQL interface. Note that there is a small chance that the master
...@@ -498,6 +510,16 @@ UNIV_INTERN ...@@ -498,6 +510,16 @@ UNIV_INTERN
void void
srv_wake_master_thread(void); srv_wake_master_thread(void);
/*========================*/ /*========================*/
/*******************************************************************//**
Tells the purge thread that there has been activity in the database
and wakes up the purge thread if it is suspended (not sleeping). Note
that there is a small chance that the purge thread stays suspended
(we do not protect our operation with the kernel mutex, for
performace reasons). */
UNIV_INTERN
void
srv_wake_purge_thread_if_not_active(void);
/*=====================================*/
/*********************************************************************//** /*********************************************************************//**
Puts an OS thread to wait if there are too many concurrent threads Puts an OS thread to wait if there are too many concurrent threads
(>= srv_thread_concurrency) inside InnoDB. The threads wait in a FIFO queue. */ (>= srv_thread_concurrency) inside InnoDB. The threads wait in a FIFO queue. */
...@@ -604,6 +626,15 @@ void ...@@ -604,6 +626,15 @@ void
srv_export_innodb_status(void); srv_export_innodb_status(void);
/*==========================*/ /*==========================*/
/*********************************************************************//**
Asynchronous purge thread.
@return a dummy parameter */
UNIV_INTERN
os_thread_ret_t
srv_purge_thread(
/*=============*/
void* arg __attribute__((unused))); /*!< in: a dummy parameter
required by os_thread_create */
/** Thread slot in the thread table */ /** Thread slot in the thread table */
typedef struct srv_slot_struct srv_slot_t; typedef struct srv_slot_struct srv_slot_t;
......
...@@ -112,8 +112,10 @@ This function runs a purge batch. ...@@ -112,8 +112,10 @@ This function runs a purge batch.
@return number of undo log pages handled in the batch */ @return number of undo log pages handled in the batch */
UNIV_INTERN UNIV_INTERN
ulint ulint
trx_purge(void); trx_purge(
/*===========*/ /*======*/
ulint limit); /*!< in: the maximum number of records to
purge in one batch */
/******************************************************************//** /******************************************************************//**
Prints information of the purge system to stderr. */ Prints information of the purge system to stderr. */
UNIV_INTERN UNIV_INTERN
......
...@@ -246,6 +246,12 @@ that during a time of heavy update/insert activity. */ ...@@ -246,6 +246,12 @@ that during a time of heavy update/insert activity. */
UNIV_INTERN ulong srv_max_buf_pool_modified_pct = 75; UNIV_INTERN ulong srv_max_buf_pool_modified_pct = 75;
/* the number of purge threads to use from the worker pool (currently 0 or 1).*/
UNIV_INTERN ulint srv_n_purge_threads = 0;
/* the number of records to purge in one batch */
UNIV_INTERN ulint srv_purge_batch_size = 20;
/* variable counts amount of data read in total (in bytes) */ /* variable counts amount of data read in total (in bytes) */
UNIV_INTERN ulint srv_data_read = 0; UNIV_INTERN ulint srv_data_read = 0;
...@@ -704,6 +710,16 @@ are indexed by the type of the thread. */ ...@@ -704,6 +710,16 @@ are indexed by the type of the thread. */
UNIV_INTERN ulint srv_n_threads_active[SRV_MASTER + 1]; UNIV_INTERN ulint srv_n_threads_active[SRV_MASTER + 1];
UNIV_INTERN ulint srv_n_threads[SRV_MASTER + 1]; UNIV_INTERN ulint srv_n_threads[SRV_MASTER + 1];
/*********************************************************************//**
Asynchronous purge thread.
@return a dummy parameter */
UNIV_INTERN
os_thread_ret_t
srv_purge_thread(
/*=============*/
void* arg __attribute__((unused))); /*!< in: a dummy parameter
required by os_thread_create */
/*********************************************************************** /***********************************************************************
Prints counters for work done by srv_master_thread. */ Prints counters for work done by srv_master_thread. */
static static
...@@ -2369,6 +2385,30 @@ srv_active_wake_master_thread(void) ...@@ -2369,6 +2385,30 @@ srv_active_wake_master_thread(void)
} }
} }
/*******************************************************************//**
Tells the purge thread that there has been activity in the database
and wakes up the purge thread if it is suspended (not sleeping). Note
that there is a small chance that the purge thread stays suspended
(we do not protect our operation with the kernel mutex, for
performace reasons). */
UNIV_INTERN
void
srv_wake_purge_thread_if_not_active(void)
/*=====================================*/
{
ut_ad(!mutex_own(&kernel_mutex));
if (srv_n_purge_threads > 0
&& srv_n_threads_active[SRV_WORKER] == 0) {
mutex_enter(&kernel_mutex);
srv_release_threads(SRV_WORKER, 1);
mutex_exit(&kernel_mutex);
}
}
/*******************************************************************//** /*******************************************************************//**
Wakes up the master thread if it is suspended or being suspended. */ Wakes up the master thread if it is suspended or being suspended. */
UNIV_INTERN UNIV_INTERN
...@@ -2385,6 +2425,25 @@ srv_wake_master_thread(void) ...@@ -2385,6 +2425,25 @@ srv_wake_master_thread(void)
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
} }
/*******************************************************************//**
Wakes up the purge thread if it's not already awake. */
UNIV_INTERN
void
srv_wake_purge_thread(void)
/*=======================*/
{
ut_ad(!mutex_own(&kernel_mutex));
if (srv_n_purge_threads > 0) {
mutex_enter(&kernel_mutex);
srv_release_threads(SRV_WORKER, 1);
mutex_exit(&kernel_mutex);
}
}
/********************************************************************** /**********************************************************************
The master thread is tasked to ensure that flush of log file happens The master thread is tasked to ensure that flush of log file happens
once every second in the background. This is to ensure that not more once every second in the background. This is to ensure that not more
...@@ -2405,6 +2464,34 @@ srv_sync_log_buffer_in_background(void) ...@@ -2405,6 +2464,34 @@ srv_sync_log_buffer_in_background(void)
} }
} }
/********************************************************************//**
Do a full purge, reconfigure the purge sub-system if a dynamic
change is detected. */
static
void
srv_master_do_purge(void)
/*=====================*/
{
ulint n_pages_purged;
ut_ad(!mutex_own(&kernel_mutex));
ut_a(srv_n_purge_threads == 0);
do {
/* Check for shutdown and change in purge config. */
if (srv_fast_shutdown && srv_shutdown_state > 0) {
/* Nothing to purge. */
n_pages_purged = 0;
} else {
n_pages_purged = trx_purge(srv_purge_batch_size);
}
srv_sync_log_buffer_in_background();
} while (n_pages_purged > 0);
}
/*********************************************************************//** /*********************************************************************//**
The master thread controlling the server. The master thread controlling the server.
@return a dummy parameter */ @return a dummy parameter */
...@@ -2620,20 +2707,16 @@ loop: ...@@ -2620,20 +2707,16 @@ loop:
/* We run a full purge every 10 seconds, even if the server /* We run a full purge every 10 seconds, even if the server
were active */ were active */
do { if (srv_n_purge_threads == 0) {
srv_main_thread_op_info = "master purging";
srv_master_do_purge();
if (srv_fast_shutdown && srv_shutdown_state > 0) { if (srv_fast_shutdown && srv_shutdown_state > 0) {
goto background_loop; goto background_loop;
} }
}
srv_main_thread_op_info = "purging";
n_pages_purged = trx_purge();
/* Flush logs if needed */
srv_sync_log_buffer_in_background();
} while (n_pages_purged);
srv_main_thread_op_info = "flushing buffer pool pages"; srv_main_thread_op_info = "flushing buffer pool pages";
...@@ -2702,22 +2785,11 @@ background_loop: ...@@ -2702,22 +2785,11 @@ background_loop:
os_thread_sleep(100000); os_thread_sleep(100000);
} }
srv_main_thread_op_info = "purging"; if (srv_n_purge_threads == 0) {
srv_main_thread_op_info = "master purging";
/* Run a full purge */
do {
if (srv_fast_shutdown && srv_shutdown_state > 0) {
break;
}
srv_main_thread_op_info = "purging";
n_pages_purged = trx_purge();
/* Flush logs if needed */
srv_sync_log_buffer_in_background();
} while (n_pages_purged); srv_master_do_purge();
}
srv_main_thread_op_info = "reserving kernel mutex"; srv_main_thread_op_info = "reserving kernel mutex";
...@@ -2871,3 +2943,100 @@ suspend_thread: ...@@ -2871,3 +2943,100 @@ suspend_thread:
OS_THREAD_DUMMY_RETURN; /* Not reached, avoid compiler warning */ OS_THREAD_DUMMY_RETURN; /* Not reached, avoid compiler warning */
} }
/*********************************************************************//**
Asynchronous purge thread.
@return a dummy parameter */
UNIV_INTERN
os_thread_ret_t
srv_purge_thread(
/*=============*/
void* arg __attribute__((unused))) /*!< in: a dummy parameter
required by os_thread_create */
{
srv_slot_t* slot;
ulint slot_no = ULINT_UNDEFINED;
ulint n_total_purged = ULINT_UNDEFINED;
ut_a(srv_n_purge_threads == 1);
#ifdef UNIV_DEBUG_THREAD_CREATION
fprintf(stderr, "InnoDB: Purge thread running, id %lu\n",
os_thread_pf(os_thread_get_curr_id()));
#endif /* UNIV_DEBUG_THREAD_CREATION */
mutex_enter(&kernel_mutex);
slot_no = srv_table_reserve_slot(SRV_WORKER);
++srv_n_threads_active[SRV_WORKER];
mutex_exit(&kernel_mutex);
while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
ulint n_pages_purged;
/* If there are very few records to purge or the last
purge didn't purge any records then wait for activity.
We peek at the history len without holding any mutex
because in the worst case we will end up waiting for
the next purge event. */
if (trx_sys->rseg_history_len < srv_purge_batch_size
|| n_total_purged == 0) {
os_event_t event;
mutex_enter(&kernel_mutex);
event = srv_suspend_thread();
mutex_exit(&kernel_mutex);
os_event_wait(event);
}
/* Check for shutdown and whether we should do purge at all. */
if (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND
|| srv_shutdown_state != 0
|| srv_fast_shutdown) {
break;
}
n_total_purged = 0;
/* Purge until there are no more records to purge and there is
no change in configuration or server state. */
do {
n_pages_purged = trx_purge(srv_purge_batch_size);
n_total_purged += n_pages_purged;
} while (n_pages_purged > 0 && !srv_fast_shutdown);
srv_sync_log_buffer_in_background();
}
/* Free the thread local memory. */
thr_local_free(os_thread_get_curr_id());
mutex_enter(&kernel_mutex);
/* Free the slot for reuse. */
slot = srv_table_get_nth_slot(slot_no);
slot->in_use = FALSE;
mutex_exit(&kernel_mutex);
#ifdef UNIV_DEBUG_THREAD_CREATION
fprintf(stderr, "InnoDB: Purge thread exiting, id %lu\n",
os_thread_pf(os_thread_get_curr_id()));
#endif /* UNIV_DEBUG_THREAD_CREATION */
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit and not use return() to exit. */
os_thread_exit(NULL);
OS_THREAD_DUMMY_RETURN; /* Not reached, avoid compiler warning */
}
...@@ -1742,6 +1742,16 @@ innobase_start_or_create_for_mysql(void) ...@@ -1742,6 +1742,16 @@ innobase_start_or_create_for_mysql(void)
os_thread_create(&srv_master_thread, NULL, thread_ids os_thread_create(&srv_master_thread, NULL, thread_ids
+ (1 + SRV_MAX_N_IO_THREADS)); + (1 + SRV_MAX_N_IO_THREADS));
/* Currently we allow only a single purge thread. */
ut_a(srv_n_purge_threads == 0 || srv_n_purge_threads == 1);
/* If the user has requested a separate purge thread then
start the purge thread. */
if (srv_n_purge_threads == 1) {
os_thread_create(&srv_purge_thread, NULL, NULL);
}
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/* buf_debug_prints = TRUE; */ /* buf_debug_prints = TRUE; */
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
...@@ -1995,7 +2005,10 @@ innobase_shutdown_for_mysql(void) ...@@ -1995,7 +2005,10 @@ innobase_shutdown_for_mysql(void)
/* c. We wake the master thread so that it exits */ /* c. We wake the master thread so that it exits */
srv_wake_master_thread(); srv_wake_master_thread();
/* d. Exit the i/o threads */ /* d. We wake the purge thread so that it exits */
srv_wake_purge_thread();
/* e. Exit the i/o threads */
os_aio_wake_all_threads_at_shutdown(); os_aio_wake_all_threads_at_shutdown();
......
...@@ -41,7 +41,7 @@ Created 3/26/1996 Heikki Tuuri ...@@ -41,7 +41,7 @@ Created 3/26/1996 Heikki Tuuri
#include "row0purge.h" #include "row0purge.h"
#include "row0upd.h" #include "row0upd.h"
#include "trx0rec.h" #include "trx0rec.h"
#include "srv0que.h" #include "srv0srv.h"
#include "os0thread.h" #include "os0thread.h"
/** The global data structure coordinating a purge */ /** The global data structure coordinating a purge */
...@@ -364,6 +364,11 @@ trx_purge_add_update_undo_to_history( ...@@ -364,6 +364,11 @@ trx_purge_add_update_undo_to_history(
trx_sys->rseg_history_len++; trx_sys->rseg_history_len++;
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
if (!(trx_sys->rseg_history_len % srv_purge_batch_size)) {
/* Inform the purge thread that there is work to do. */
srv_wake_purge_thread_if_not_active();
}
/* Write the trx number to the undo log header */ /* Write the trx number to the undo log header */
mlog_write_dulint(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr); mlog_write_dulint(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);
/* Write information about delete markings to the undo log header */ /* Write information about delete markings to the undo log header */
...@@ -1096,8 +1101,10 @@ This function runs a purge batch. ...@@ -1096,8 +1101,10 @@ This function runs a purge batch.
@return number of undo log pages handled in the batch */ @return number of undo log pages handled in the batch */
UNIV_INTERN UNIV_INTERN
ulint ulint
trx_purge(void) trx_purge(
/*===========*/ /*======*/
ulint limit) /*!< in: the maximum number of records to
purge in one batch */
{ {
que_thr_t* thr; que_thr_t* thr;
/* que_thr_t* thr2; */ /* que_thr_t* thr2; */
...@@ -1158,9 +1165,7 @@ trx_purge(void) ...@@ -1158,9 +1165,7 @@ trx_purge(void)
purge_sys->state = TRX_PURGE_ON; purge_sys->state = TRX_PURGE_ON;
/* Handle at most 20 undo log pages in one purge batch */ purge_sys->handle_limit = purge_sys->n_pages_handled + limit;
purge_sys->handle_limit = purge_sys->n_pages_handled + 20;
old_pages_handled = purge_sys->n_pages_handled; old_pages_handled = purge_sys->n_pages_handled;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment