Commit 77017191 authored by unknown's avatar unknown

WL#3071 - Maria checkpoint

- serializing calls to flush_pagecache_blocks_int() on the same file
to avoid known concurrency bugs
- having that, we can now enable the background thread, as the
flushes it does are now supposedly safe in concurrent situations.
- new type of flush FLUSH_KEEP_LAZY: when the background checkpoint
thread is flushing a packet of dirty pages between two checkpoints,
it uses this flush type, indeed if a file is already being flushed
by another thread it's smarter to move on to the next file than wait.
- maria_checkpoint_frequency renamed to maria_checkpoint_interval.


include/my_sys.h:
  new type of flushing for the page cache: FLUSH_KEEP_LAZY
mysql-test/r/maria.result:
  result update
mysys/mf_keycache.c:
  indentation. No FLUSH_KEEP_LAZY support in key cache.
storage/maria/ha_maria.cc:
  maria_checkpoint_frequency was somehow a hidden part of the
  Checkpoint API and that was not good. Now we have checkpoint_interval,
  local to ha_maria.cc, which serves as container for the user-visible
  maria_checkpoint_interval global variable; setting it calls
  update_checkpoint_interval which passes the new value to
  ma_checkpoint_init(). There is no hiding anymore.
  By default, enable background thread which does checkpoints
  every 30 seconds, and dirty page flush in between. That thread takes
  a checkpoint when it ends, so no need for maria_hton_panic to take one.
  The | is | and not ||, because maria_panic() must always be called.
  frequency->interval.
storage/maria/ma_checkpoint.c:
  Use FLUSH_KEEP_LAZY for background thread when it flushes packets of
  dirty pages between two checkpoints: it is smarter to move on to
  the next file than wait for it to have been completely flushed, which
  may take long.
  Comments about flush concurrency bugs moved from ma_pagecache.c.
  Removing out-of-date comment.
  frequency->interval.
  create_background_thread -> (interval>0).
  In ma_checkpoint_background(), some variables need to be preserved
  between iterations.
storage/maria/ma_checkpoint.h:
  new prototype
storage/maria/ma_pagecache.c:
  - concurrent calls of flush_pagecache_blocks_int() on the same file
  cause bugs (see @note in that function); we fix them by serializing
  in this situation. For that we use a global hash of (file, wqueue).
  When flush_pagecache_blocks_int() starts it looks into the hash,
  using the file as key. If not found, it inserts (file,wqueue) into the
  hash, flushes the file, and finally removes itself from the hash and
  wakes up any waiter in the queue. If found, it adds itself to the
  wqueue and waits.
  - As a by-product, we can remove changed_blocks_is_incomplete
  and replace it by scanning the hash, replace the sleep() by a queue wait.
  - new type of flush FLUSH_KEEP_LAZY: when flushing a file, if it's
  already being flushed by another thread (even partially), return
  immediately.
storage/maria/ma_pagecache.h:
  In pagecache, a hash of files currently being flushed (i.e. there
  is a call to flush_pagecache_blocks_int() for them).
storage/maria/ma_recovery.c:
  new prototype
storage/maria/ma_test1.c:
  new prototype
storage/maria/ma_test2.c:
  new prototype
parent 9bcbf851
......@@ -283,7 +283,13 @@ enum flush_type
FLUSH_IGNORE_CHANGED, /* remove block from the cache */
/* as my_disable_flush_pagecache_blocks is always 0, it is
strictly equivalent to FLUSH_KEEP */
FLUSH_FORCE_WRITE
FLUSH_FORCE_WRITE,
/**
@brief like FLUSH_KEEP but return immediately if file is already being
flushed (even partially) by another thread; only for page cache,
forbidden for key cache.
*/
FLUSH_KEEP_LAZY
};
typedef struct st_record_cache /* Used when cacheing records */
......
......@@ -1976,7 +1976,7 @@ drop table t1;
show variables like 'maria%';
Variable_name Value
maria_block_size 8192
maria_checkpoint_frequency 0
maria_checkpoint_interval 30
maria_max_sort_file_size 9223372036853727232
maria_pagecache_age_threshold 300
maria_pagecache_buffer_size 8384512
......
......@@ -3557,10 +3557,11 @@ static int flush_key_blocks_int(KEY_CACHE *keycache,
file, keycache->blocks_used, keycache->blocks_changed));
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
DBUG_EXECUTE("check_keycache",
test_key_cache(keycache, "start of flush_key_blocks", 0););
DBUG_EXECUTE("check_keycache",
test_key_cache(keycache, "start of flush_key_blocks", 0););
#endif
DBUG_ASSERT(type != FLUSH_KEEP_LAZY);
cache= cache_buff;
if (keycache->disk_blocks > 0 &&
(!my_disable_flush_key_blocks || type != FLUSH_KEEP))
......
......@@ -81,9 +81,11 @@ TYPELIB maria_stats_method_typelib=
maria_stats_method_names, NULL
};
static void update_checkpoint_frequency(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, void *save);
/** @brief Interval between background checkpoints in seconds */
static ulong checkpoint_interval;
static void update_checkpoint_interval(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, void *save);
static MYSQL_SYSVAR_ULONG(block_size, maria_block_size,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
......@@ -91,12 +93,11 @@ static MYSQL_SYSVAR_ULONG(block_size, maria_block_size,
MARIA_KEY_BLOCK_LENGTH, MARIA_MIN_KEY_BLOCK_LENGTH,
MARIA_MAX_KEY_BLOCK_LENGTH, MARIA_MIN_KEY_BLOCK_LENGTH);
static MYSQL_SYSVAR_ULONG(checkpoint_frequency, maria_checkpoint_frequency,
static MYSQL_SYSVAR_ULONG(checkpoint_interval, checkpoint_interval,
PLUGIN_VAR_RQCMDARG,
"Frequency of automatic checkpoints, in seconds;"
" 0 means 'no checkpoints'.",
/* disabled for now */
NULL, update_checkpoint_frequency, 0, 0, UINT_MAX, 1);
"Interval between automatic checkpoints, in seconds;"
" 0 means 'no automatic checkpoints'.",
NULL, update_checkpoint_interval, 30, 0, UINT_MAX, 1);
static MYSQL_SYSVAR_ULONGLONG(max_sort_file_size,
maria_max_temp_length, PLUGIN_VAR_RQCMDARG,
......@@ -2376,8 +2377,9 @@ bool ha_maria::check_if_incompatible_data(HA_CREATE_INFO *info,
static int maria_hton_panic(handlerton *hton, ha_panic_function flag)
{
ma_checkpoint_execute(CHECKPOINT_FULL, FALSE); /* can't catch error */
return maria_panic(flag);
/* If no background checkpoints, we need to do one now */
return ((checkpoint_interval == 0) ?
ma_checkpoint_execute(CHECKPOINT_FULL, FALSE) : 0) | maria_panic(flag);
}
......@@ -2440,7 +2442,7 @@ static int ha_maria_init(void *p)
MYSQL_VERSION_ID, server_id, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS) ||
maria_recover() ||
ma_checkpoint_init(TRUE);
ma_checkpoint_init(checkpoint_interval);
maria_multi_threaded= TRUE;
return res;
}
......@@ -2523,7 +2525,7 @@ my_bool ha_maria::register_query_cache_table(THD *thd, char *table_name,
static struct st_mysql_sys_var* system_variables[]= {
MYSQL_SYSVAR(block_size),
MYSQL_SYSVAR(checkpoint_frequency),
MYSQL_SYSVAR(checkpoint_interval),
MYSQL_SYSVAR(max_sort_file_size),
MYSQL_SYSVAR(pagecache_age_threshold),
MYSQL_SYSVAR(pagecache_buffer_size),
......@@ -2536,24 +2538,15 @@ static struct st_mysql_sys_var* system_variables[]= {
/**
@brief Updates the checkpoint frequency and restarts the background thread.
Background thread has a loop which correctness depends on a constant
checkpoint frequency. So when the user wants to modify it, we stop and
restart the thread.
@brief Updates the checkpoint interval and restarts the background thread.
*/
static void update_checkpoint_frequency(MYSQL_THD thd,
static void update_checkpoint_interval(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, void *save)
{
ulong new_value= (ulong)(*(long *)save), *dest= (ulong *)var_ptr;
if (new_value != *dest) /* it's actually a change */
{
ma_checkpoint_end();
*dest= new_value;
ma_checkpoint_init(TRUE);
}
ma_checkpoint_end();
ma_checkpoint_init(*(ulong *)var_ptr= (ulong)(*(long *)save));
}
static SHOW_VAR status_variables[]= {
......
......@@ -38,8 +38,6 @@
#include "ma_loghandler_lsn.h"
/** @brief Frequency of background checkpoints, in seconds */
ulong maria_checkpoint_frequency;
/** @brief type of checkpoint currently running */
static CHECKPOINT_LEVEL checkpoint_in_progress= CHECKPOINT_NONE;
/** @brief protects checkpoint_in_progress */
......@@ -310,31 +308,39 @@ end:
/**
@brief Initializes the checkpoint module
@param create_background_thread If one wants the module to now create a
thread which will periodically do
checkpoints, and flush dirty pages, in the
background.
@param interval If one wants the module to create a
thread which will periodically do
checkpoints, and flush dirty pages, in the
background, it should specify a non-zero
interval in seconds. The thread will then be
created and will take checkpoints separated by
approximately 'interval' second.
@note A checkpoint is taken only if there has been some significant
activity since the previous checkpoint. Between checkpoint N and N+1 the
thread flushes all dirty pages which were already dirty at the time of
checkpoint N.
@return Operation status
@retval 0 ok
@retval !=0 error
*/
int ma_checkpoint_init(my_bool create_background_thread)
int ma_checkpoint_init(ulong interval)
{
pthread_t th;
int res= 0;
DBUG_ENTER("ma_checkpoint_init");
checkpoint_inited= TRUE;
checkpoint_thread_die= 2; /* not yet born == dead */
if (maria_checkpoint_frequency == 0)
create_background_thread= FALSE;
if (pthread_mutex_init(&LOCK_checkpoint, MY_MUTEX_INIT_SLOW) ||
pthread_cond_init(&COND_checkpoint, 0))
res= 1;
else if (create_background_thread)
else if (interval > 0)
{
if (!(res= pthread_create(&th, NULL, ma_checkpoint_background, NULL)))
compile_time_assert(sizeof(void *) >= sizeof(ulong));
if (!(res= pthread_create(&th, NULL, ma_checkpoint_background,
(void *)interval)))
checkpoint_thread_die= 0; /* thread lives, will have to be killed */
}
DBUG_RETURN(res);
......@@ -489,12 +495,10 @@ filter_flush_data_file_evenly(enum pagecache_page_type type,
/**
@brief Background thread which does checkpoints and flushes periodically.
Takes a checkpoint every maria_checkpoint_frequency-th second. After taking
a checkpoint, all pages dirty at the time of that checkpoint are flushed
evenly until it is time to take another checkpoint
(maria_checkpoint_frequency seconds later). This ensures that the REDO
phase starts at earliest (in LSN time) at the next-to-last checkpoint
record ("two-checkpoint rule").
Takes a checkpoint. After this, all pages dirty at the time of that
checkpoint are flushed evenly until it is time to take another checkpoint.
This ensures that the REDO phase starts at earliest (in LSN time) at the
next-to-last checkpoint record ("two-checkpoint rule").
@note MikaelR questioned why the same thread does two different jobs, the
risk could be that while a checkpoint happens no LRD flushing happens.
......@@ -505,36 +509,46 @@ filter_flush_data_file_evenly(enum pagecache_page_type type,
writing 2 MB.
*/
pthread_handler_t ma_checkpoint_background(void *arg __attribute__((unused)))
pthread_handler_t ma_checkpoint_background(void *arg)
{
/** @brief At least this of log/page bytes written between checkpoints */
const uint checkpoint_min_activity= 2*1024*1024;
uint sleeps= 0;
/*
If the interval could be changed by the user while we are in this thread,
it could be annoying: for example it could cause "case 2" to be executed
right after "case 0", thus having 'dfile' unset. So the thread cares only
about the interval's value when it started.
*/
const ulong interval= (ulong)arg;
uint sleeps;
TRANSLOG_ADDRESS log_horizon_at_last_checkpoint= LSN_IMPOSSIBLE;
ulonglong pagecache_flushes_at_last_checkpoint= 0;
uint pages_bunch_size;
struct st_filter_param filter_param;
PAGECACHE_FILE *dfile; /**< data file currently being flushed */
PAGECACHE_FILE *kfile; /**< index file currently being flushed */
LINT_INIT(kfile);
LINT_INIT(dfile);
LINT_INIT(pages_bunch_size);
my_thread_init();
DBUG_PRINT("info",("Maria background checkpoint thread starts"));
for(;;)
DBUG_ASSERT(interval > 0);
/*
Recovery ended with all tables closed and a checkpoint: no need to take
one immediately.
*/
sleeps= 1;
pages_to_flush_before_next_checkpoint= 0;
for(;;) /* iterations of checkpoints and dirty page flushing */
{
#if 0 /* good for testing, to do a lot of checkpoints, finds a lot of bugs */
sleeps=0;
#endif
uint pages_bunch_size;
struct st_filter_param filter_param;
PAGECACHE_FILE *dfile; /**< data file currently being flushed */
PAGECACHE_FILE *kfile; /**< index file currently being flushed */
struct timespec abstime;
LINT_INIT(kfile);
LINT_INIT(dfile);
LINT_INIT(pages_bunch_size);
/*
If the frequency could be changed by the user while we are in this loop,
it could be annoying: for example it could cause "case 2" to be executed
right after "case 0", thus having 'dfile'
unset. update_checkpoint_frequency() takes care of stopping this thread.
*/
switch((sleeps++) % maria_checkpoint_frequency)
switch((sleeps++) % interval)
{
case 0:
/*
......@@ -578,22 +592,28 @@ pthread_handler_t ma_checkpoint_background(void *arg __attribute__((unused)))
case 1:
/* set up parameters for background page flushing */
filter_param.up_to_lsn= last_checkpoint_lsn;
pages_bunch_size= pages_to_flush_before_next_checkpoint /
maria_checkpoint_frequency;
pages_bunch_size= pages_to_flush_before_next_checkpoint / interval;
dfile= dfiles;
kfile= kfiles;
/* fall through */
default:
if (pages_bunch_size > 0)
{
DBUG_PRINT("info", ("Maria background checkpoint thread: %u pages",
pages_bunch_size));
/* flush a bunch of dirty pages */
filter_param.max_pages= pages_bunch_size;
filter_param.is_data_file= TRUE;
while (dfile != dfiles_end)
{
/*
We use FLUSH_KEEP_LAZY: if a file is already in flush, it's
smarter to move to the next file than wait for this one to be
completely flushed, which may take long.
*/
int res=
flush_pagecache_blocks_with_filter(maria_pagecache,
dfile, FLUSH_KEEP,
dfile, FLUSH_KEEP_LAZY,
filter_flush_data_file_evenly,
&filter_param);
/* note that it may just be a pinned page */
......@@ -609,7 +629,7 @@ pthread_handler_t ma_checkpoint_background(void *arg __attribute__((unused)))
{
int res=
flush_pagecache_blocks_with_filter(maria_pagecache,
dfile, FLUSH_KEEP,
dfile, FLUSH_KEEP_LAZY,
filter_flush_data_file_evenly,
&filter_param);
if (unlikely(res))
......@@ -640,22 +660,8 @@ pthread_handler_t ma_checkpoint_background(void *arg __attribute__((unused)))
pthread_mutex_unlock(&LOCK_checkpoint);
DBUG_PRINT("info",("Maria background checkpoint thread ends"));
/*
A last checkpoint, now that all tables should be closed; to have instant
recovery later. We always do it, because the test above about number of
log records or flushed pages is only approximative. For example, some log
records may have been written while ma_checkpoint_execute() above was
running, or some pages may have been flushed during this time. Thus it
could be that, while nothing has changed since that checkpoint's *end*, if
we recovered from that checkpoint we would have a non-empty dirty pages
list, REDOs to execute, and we don't want that, we want a clean shutdown
to have an empty recovery (simplifies upgrade/backups: one can just do a
clean shutdown, copy its tables to another system without copying the log
or control file and it will work because recovery will not need those).
Another reason why it's approximative is that a log record may have been
written above between ma_checkpoint_execute() and the
tranlog_get_horizon() which follows.
So, we have at least two checkpoints per start/stop of the engine, and
only two if the engine stays idle.
That's the final one, which guarantees that a clean shutdown always ends
with a checkpoint.
*/
ma_checkpoint_execute(CHECKPOINT_FULL, FALSE);
pthread_mutex_lock(&LOCK_checkpoint);
......@@ -829,6 +835,10 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
/* No need for a mutex to read the above, only us can write this flag */
continue;
}
/**
@todo We should not look at tables which didn't change since last
checkpoint.
*/
DBUG_PRINT("info",("looking at table '%s'", share->open_file_name));
if (state_copy == state_copies_end) /* we have no more cached states */
{
......@@ -1013,6 +1023,39 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
only a little slower than CHECKPOINT_INDIRECT).
*/
/*
PageCacheFlushConcurrencyBugs
Inside the page cache, calls to flush_pagecache_blocks_int() on the same
file are serialized. Examples of concurrency bugs which happened when we
didn't have this serialization:
- maria_chk_size() (via CHECK TABLE) happens concurrently with
Checkpoint: Checkpoint is flushing a page: it pins the page and is
pre-empted, maria_chk_size() wants to flush this page too so gets an
error because Checkpoint pinned this page. Such error makes
maria_chk_size() mark the table as corrupted.
- maria_close() happens concurrently with Checkpoint:
Checkpoint is flushing a page: it registers a request on the page, is
pre-empted ; maria_close() flushes this page too with FLUSH_RELEASE:
FLUSH_RELEASE will cause a free_block() which assumes the page is in the
LRU, but it is not (as Checkpoint registered a request). Crash.
- one thread is evicting a page of the file out of the LRU: it marks it
iPC_BLOCK_IN_SWITCH and is pre-empted. Then two other threads do flushes
of the same file concurrently (like above). Then one flusher sees the
page is in switch, removes it from changed_blocks[] and puts it in its
first_in_switch, so the other flusher will not see the page at all and
return too early. If it's maria_close() which returns too early, then
maria_close() may close the file descriptor, and the other flusher, and
the evicter will fail to write their page: corruption.
*/
/*
We do NOT use FLUSH_KEEP_LAZY because we must be sure that bitmap pages
have been flushed. That's a condition of correctness of Recovery: data
pages may have been all flushed, if we write the checkpoint record
Recovery will start from after their REDOs. If bitmap page was not
flushed, as the REDOs about it will be skipped, it will wrongly not be
recovered. If bitmap pages had a rec_lsn it would be different.
*/
/**
@todo we ignore the error because it may be just due a pinned page;
we should rather fix the function below to distinguish between
......
......@@ -32,7 +32,7 @@ typedef enum enum_ma_checkpoint_level {
} CHECKPOINT_LEVEL;
C_MODE_START
int ma_checkpoint_init(my_bool create_background_thread);
int ma_checkpoint_init(ulong interval);
void ma_checkpoint_end(void);
int ma_checkpoint_execute(CHECKPOINT_LEVEL level, my_bool no_wait);
C_MODE_END
......
......@@ -115,11 +115,6 @@
/* TODO: put it to my_static.c */
my_bool my_disable_flush_pagecache_blocks= 0;
/**
when flushing pages of a file, it can happen that we take some dirty blocks
out of changed_blocks[]; Checkpoint must not run at this moment.
*/
uint changed_blocks_is_incomplete= 0;
#define STRUCT_PTR(TYPE, MEMBER, a) \
(TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER))
......@@ -320,6 +315,22 @@ struct st_pagecache_block_link
LSN rec_lsn;
};
/** @brief information describing a run of flush_pagecache_blocks_int() */
struct st_file_in_flush
{
PAGECACHE_FILE file;
/**
@brief threads waiting for the thread currently flushing this file to be
done
*/
WQUEUE flush_queue;
/**
@brief if the thread currently flushing the file has a non-empty
first_in_switch list.
*/
my_bool first_in_switch;
};
#ifndef DBUG_OFF
/* debug checks */
......@@ -678,9 +689,14 @@ ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem,
pagecache->disk_blocks= -1;
if (! pagecache->inited)
{
if (pthread_mutex_init(&pagecache->cache_lock, MY_MUTEX_INIT_FAST) ||
hash_init(&pagecache->files_in_flush, &my_charset_bin, 32,
offsetof(struct st_file_in_flush, file),
sizeof(((struct st_file_in_flush *)NULL)->file),
NULL, NULL, 0))
goto err;
pagecache->inited= 1;
pagecache->in_init= 0;
pthread_mutex_init(&pagecache->cache_lock, MY_MUTEX_INIT_FAST);
pagecache->resize_queue.last_thread= NULL;
}
......@@ -1074,6 +1090,7 @@ void end_pagecache(PAGECACHE *pagecache, my_bool cleanup)
if (cleanup)
{
hash_free(&pagecache->files_in_flush);
pthread_mutex_destroy(&pagecache->cache_lock);
pagecache->inited= pagecache->can_be_used= 0;
PAGECACHE_DEBUG_CLOSE;
......@@ -3557,7 +3574,8 @@ static int flush_cached_blocks(PAGECACHE *pagecache,
wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
#endif
/* type will never be FLUSH_IGNORE_CHANGED here */
if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
if (! (type == FLUSH_KEEP || type == FLUSH_KEEP_LAZY ||
type == FLUSH_FORCE_WRITE))
{
pagecache->blocks_changed--;
pagecache->global_blocks_changed--;
......@@ -3581,7 +3599,8 @@ static int flush_cached_blocks(PAGECACHE *pagecache,
@param file handler for the file to flush to
@param flush_type type of the flush
@param filter optional function which tells what blocks to flush;
can be non-NULL only if FLUSH_KEEP or FLUSH_FORCE_WRITE.
can be non-NULL only if FLUSH_KEEP, FLUSH_KEEP_LAZY
or FLUSH_FORCE_WRITE.
@param filter_arg an argument to pass to 'filter'. Information about
the block will be passed too.
......@@ -3590,6 +3609,12 @@ static int flush_cached_blocks(PAGECACHE *pagecache,
both from flush_pagecache_blocks and flush_all_key_blocks (the later one
does the mutex lock in the resize_pagecache() function).
@note
This function can cause problems if two threads call it
concurrently on the same file (look for "PageCacheFlushConcurrencyBugs"
in ma_checkpoint.c); to avoid them, it has internal logic to serialize in
this situation.
@return Operation status
@retval 0 OK
@retval 1 Error
......@@ -3615,9 +3640,15 @@ static int flush_pagecache_blocks_int(PAGECACHE *pagecache,
cache= cache_buff;
if (pagecache->disk_blocks > 0 &&
(!my_disable_flush_pagecache_blocks || type != FLUSH_KEEP))
(!my_disable_flush_pagecache_blocks ||
(type != FLUSH_KEEP && type != FLUSH_KEEP_LAZY)))
{
/* Key cache exists and flush is not disabled */
/*
Key cache exists. If my_disable_flush_pagecache_blocks is true it
disables the operation but only FLUSH_KEEP[_LAZY]: other flushes still
need to be allowed: FLUSH_RELEASE has to free blocks, and
FLUSH_FORCE_WRITE is to overrule my_disable_flush_pagecache_blocks.
*/
int error= 0;
uint count= 0;
PAGECACHE_BLOCK_LINK **pos, **end;
......@@ -3626,33 +3657,66 @@ static int flush_pagecache_blocks_int(PAGECACHE *pagecache,
#if defined(PAGECACHE_DEBUG)
uint cnt= 0;
#endif
uint8 changed_blocks_is_incomplete_incremented= 0;
if (type != FLUSH_IGNORE_CHANGED)
#ifdef THREAD
struct st_file_in_flush us_flusher, *other_flusher;
us_flusher.file= *file;
us_flusher.flush_queue.last_thread= NULL;
us_flusher.first_in_switch= FALSE;
while ((other_flusher= (struct st_file_in_flush *)
hash_search(&pagecache->files_in_flush, (uchar *)file,
sizeof(*file))))
{
/**
Count how many key blocks we have to cache to be able
to flush all dirty pages with minimum seek moves.
/*
File is in flush already: wait, unless FLUSH_KEEP_LAZY. "Flusher"
means "who can mark PCBLOCK_IN_FLUSH", i.e. caller of
flush_pagecache_blocks_int().
*/
struct st_my_thread_var *thread;
if (type == FLUSH_KEEP_LAZY)
{
DBUG_PRINT("info",("FLUSH_KEEP_LAZY skips"));
DBUG_RETURN(0);
}
thread= my_thread_var;
wqueue_add_to_queue(&other_flusher->flush_queue, thread);
do
{
KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait1",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
}
while (thread->next);
}
/* we are the only flusher of this file now */
while (my_hash_insert(&pagecache->files_in_flush, (uchar *)&us_flusher))
{
/*
Out of memory, wait for flushers to empty the hash and retry; should
rarely happen. Other threads are flushing the file; when done, they
are going to remove themselves from the hash, and thus memory will
appear again. However, this memory may be stolen by yet another thread
(for a purpose unrelated to page cache), before we retry
hash_insert(). So the loop may run for long. Only if the thread was
killed do we abort the loop, returning 1 (error) which can cause the
table to be marked as corrupted (cf maria_chk_size(), maria_close())
and thus require a table check.
*/
DBUG_ASSERT(0);
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
if (my_thread_var->abort)
DBUG_RETURN(1); /* End if aborted by user */
sleep(10);
pagecache_pthread_mutex_lock(&pagecache->cache_lock);
}
#endif
@todo RECOVERY BUG
We will soon here put code to wait if another thread is flushing the
same file, to avoid concurrency bugs. Examples of concurrency bugs
which happened without serialization:
- assume maria_chk_size() (via CHECK TABLE) happens
concurrently with Checkpoint: Checkpoint may be flushing a page, and
maria_chk_size() wants to flush this page too so gets an error
because Checkpoint pinned this page. Such error leads to marking the
table corrupted.
- assume maria_close() happens concurrently with Checkpoint:
Checkpoint may be flushing a page, and maria_close() flushes this
page too with FLUSH_RELEASE: the FLUSH_RELEASE will cause a
free_block() which assumes the page is in the LRU, but it is not (as
Checkpoint is flushing it). Crash.
- assume two flushes of the same file happen concurrently (like
above), and a third thread is pushing a page of this file out of the
LRU and runs first. Then one flusher will remove the page from
changed_blocks[] and put it in its first_in_switch, so the other
flusher will not see the page at all and return too early.
if (type != FLUSH_IGNORE_CHANGED)
{
/*
Count how many key blocks we have to cache to be able
to flush all dirty pages with minimum seek moves.
*/
for (block= pagecache->changed_blocks[FILE_HASH(*file)] ;
block;
......@@ -3745,34 +3809,15 @@ restart:
free_block(pagecache, block);
}
}
else
else if (type != FLUSH_KEEP_LAZY)
{
/* Link the block into a list of blocks 'in switch' */
unlink_changed(block);
link_changed(block, &first_in_switch);
/*
We have just removed a page from the list of dirty pages
("changed_blocks") though it's still dirty (the flush by another
thread has not yet happened). Checkpoint will miss the page and so
must be blocked until that flush has happened.
Note that if there are two concurrent
flush_pagecache_blocks_int() on this file, then the first one may
move the block into its first_in_switch, and the second one would
just not see the block and wrongly consider its job done.
@todo RECOVERY Maria does protect such flushes with intern_lock,
but Checkpoint does not (Checkpoint makes sure that
changed_blocks_is_incomplete is 0 when it starts, but as
flush_cached_blocks() releases mutex, this may change...
*/
/**
@todo RECOVERY: check all places where we remove a page from the
list of dirty pages
Link the block into a list of blocks 'in switch', and then we will
wait for this list to be empty, which means they have been flushed
*/
if (unlikely(!changed_blocks_is_incomplete_incremented))
{
changed_blocks_is_incomplete_incremented= 1;
changed_blocks_is_incomplete++;
}
unlink_changed(block);
link_changed(block, &first_in_switch);
us_flusher.first_in_switch= TRUE;
}
}
}
......@@ -3794,7 +3839,7 @@ restart:
wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
do
{
KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait",
KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait2",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
......@@ -3810,10 +3855,10 @@ restart:
KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
#endif
}
changed_blocks_is_incomplete-=
changed_blocks_is_incomplete_incremented;
us_flusher.first_in_switch= FALSE;
/* The following happens very seldom */
if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
if (! (type == FLUSH_KEEP || type == FLUSH_KEEP_LAZY ||
type == FLUSH_FORCE_WRITE))
{
/*
this code would free all blocks while filter maybe handled only a
......@@ -3841,6 +3886,12 @@ restart:
}
}
}
#ifdef THREAD
/* wake up others waiting to flush this file */
hash_delete(&pagecache->files_in_flush, (uchar *)&us_flusher);
if (us_flusher.flush_queue.last_thread)
wqueue_release_queue(&us_flusher.flush_queue);
#endif
}
#ifndef DBUG_OFF
......@@ -3862,7 +3913,8 @@ restart:
@param file handler for the file to flush to
@param flush_type type of the flush
@param filter optional function which tells what blocks to flush;
can be non-NULL only if FLUSH_KEEP or FLUSH_FORCE_WRITE.
can be non-NULL only if FLUSH_KEEP, FLUSH_KEEP_LAZY
or FLUSH_FORCE_WRITE.
@param filter_arg an argument to pass to 'filter'. Information about
the block will be passed too.
......@@ -3965,16 +4017,42 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
of memory at most.
*/
pagecache_pthread_mutex_lock(&pagecache->cache_lock);
while (changed_blocks_is_incomplete > 0)
#ifdef THREAD
for (;;)
{
struct st_file_in_flush *other_flusher;
for (file_hash= 0;
(other_flusher= (struct st_file_in_flush *)
hash_element(&pagecache->files_in_flush, file_hash)) != NULL &&
!other_flusher->first_in_switch;
file_hash++)
{}
if (other_flusher == NULL)
break;
/*
Some pages are more recent in memory than on disk (=dirty) and are not
in "changed_blocks" so we cannot know them. Wait.
other_flusher.first_in_switch is true: some thread is flushing a file
and has removed dirty blocks from changed_blocks[] while they were still
dirty (they were being evicted (=>flushed) by yet another thread, which
may not have flushed the block yet so it may still be dirty).
If Checkpoint proceeds now, it will not see the page. If there is a
crash right after writing the checkpoint record, before the page is
flushed, at recovery the page will be wrongly ignored because it won't
be in the dirty pages list in the checkpoint record. So wait.
*/
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
sleep(1);
pagecache_pthread_mutex_lock(&pagecache->cache_lock);
{
struct st_my_thread_var *thread= my_thread_var;
wqueue_add_to_queue(&other_flusher->flush_queue, thread);
do
{
KEYCACHE_DBUG_PRINT("pagecache_collect_çhanged_blocks_with_lsn: wait",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
}
while (thread->next);
}
}
#endif
/* Count how many dirty pages are interesting */
for (file_hash= 0; file_hash < PAGECACHE_CHANGED_BLOCKS_HASH; file_hash++)
......
......@@ -22,6 +22,7 @@ C_MODE_START
#include "ma_loghandler_lsn.h"
#include <m_string.h>
#include <hash.h>
/* Type of the page */
enum pagecache_page_type
......@@ -159,6 +160,7 @@ typedef struct st_pagecache
my_bool resize_in_flush; /* true during flush of resize operation */
my_bool can_be_used; /* usage of cache for read/write is allowed */
my_bool in_init; /* Set to 1 in MySQL during init/resize */
HASH files_in_flush; /**< files in flush_pagecache_blocks_int() */
} PAGECACHE;
/** @brief Return values for PAGECACHE_FLUSH_FILTER */
......
......@@ -229,7 +229,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
if (!all_active_trans || !all_tables)
goto err;
if (take_checkpoints && ma_checkpoint_init(FALSE))
if (take_checkpoints && ma_checkpoint_init(0))
goto err;
recovery_message_printed= REC_MSG_NONE;
......
......@@ -83,7 +83,7 @@ int main(int argc,char *argv[])
translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
0, 0, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS) ||
(transactional && (trnman_init(0) || ma_checkpoint_init(FALSE))))
(transactional && (trnman_init(0) || ma_checkpoint_init(0))))
{
fprintf(stderr, "Error in initialization");
exit(1);
......
......@@ -98,7 +98,7 @@ int main(int argc, char *argv[])
translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
0, 0, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS) ||
(transactional && (trnman_init(0) || ma_checkpoint_init(FALSE))))
(transactional && (trnman_init(0) || ma_checkpoint_init(0))))
{
fprintf(stderr, "Error in initialization");
exit(1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment