Commit bbcd1ecc authored by unknown's avatar unknown

mf_keycache.c, keycache.h:

  Fix for the resize key cache operation.


include/keycache.h:
  Fix for the resize key cache operation.
mysys/mf_keycache.c:
  Fix for the resize key cache operation.
parent 920c30b4
......@@ -45,6 +45,8 @@ typedef struct st_keycache_wqueue
typedef struct st_key_cache
{
my_bool key_cache_inited;
my_bool resize_in_flush; /* true during flush of resize operation */
my_bool can_be_used; /* usage of cache for read/write is allowed */
uint key_cache_shift;
ulong key_cache_mem_size; /* specified size of the cache memory */
uint key_cache_block_size; /* size of the page buffer of a cache block */
......@@ -58,6 +60,7 @@ typedef struct st_key_cache
ulong blocks_used; /* number of currently used blocks */
ulong blocks_changed; /* number of currently dirty blocks */
ulong warm_blocks; /* number of blocks in warm sub-chain */
ulong cnt_for_resize_op; /* counter to block resize operation */
long blocks_available; /* number of blocks available in the LRU chain */
HASH_LINK **hash_root; /* arr. of entries into hash table buckets */
HASH_LINK *hash_link_root; /* memory for hash table links */
......@@ -67,6 +70,7 @@ typedef struct st_key_cache
BLOCK_LINK *used_last; /* ptr to the last block of the LRU chain */
BLOCK_LINK *used_ins; /* ptr to the insertion block in LRU chain */
pthread_mutex_t cache_lock; /* to lock access to the cache structure */
KEYCACHE_WQUEUE resize_queue; /* threads waiting during resize operation */
KEYCACHE_WQUEUE waiting_for_hash_link; /* waiting for a free hash link */
KEYCACHE_WQUEUE waiting_for_block; /* requests waiting for a free block */
BLOCK_LINK *changed_blocks[CHANGED_BLOCKS_HASH]; /* hash for dirty file bl.*/
......
......@@ -141,6 +141,11 @@ KEY_CACHE *dflt_key_cache= &dflt_key_cache_var;
#define FLUSH_CACHE 2000 /* sort this many blocks at once */
static int flush_all_key_blocks(KEY_CACHE *keycache);
static inline void link_into_queue(KEYCACHE_WQUEUE *wqueue,
struct st_my_thread_var *thread);
static inline void unlink_from_queue(KEYCACHE_WQUEUE *wqueue,
struct st_my_thread_var *thread);
static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block);
static void test_key_cache(KEY_CACHE *keycache,
const char *where, my_bool lock);
......@@ -225,10 +230,10 @@ static int keycache_pthread_cond_broadcast(pthread_cond_t *cond);
static uint next_power(uint value)
{
uint old_value=1;
uint old_value= 1;
while (value)
{
old_value=value;
old_value= value;
value&= value-1;
}
return (old_value << 1);
......@@ -239,8 +244,8 @@ static uint next_power(uint value)
Initialize a key cache
SYNOPSIS
init_ky_cache()
keycache pointer to the key cache handle to initialize
init_key_cache()
keycache pointer to a key cache data structure
key_cache_block_size size of blocks to keep cached data
use_mem total memory to use for the key cache
division_limit division limit (may be zero)
......@@ -366,11 +371,16 @@ int init_key_cache(KEY_CACHE *keycache, uint key_cache_block_size,
blocks * age_threshold / 100 :
blocks);
keycache->cnt_for_resize_op= 0;
keycache->resize_in_flush= 0;
keycache->can_be_used= 1;
keycache->resize_queue.last_thread= NULL;
keycache->waiting_for_hash_link.last_thread= NULL;
keycache->waiting_for_block.last_thread= NULL;
DBUG_PRINT("exit",
("disk_blocks: %d block_root: %lx hash_entries: %d hash_root: %lx \
hash_links: %d hash_link_root %lx",
("disk_blocks: %d block_root: %lx hash_entries: %d\
hash_root: %lx hash_links: %d hash_link_root %lx",
keycache->disk_blocks, keycache->block_root,
keycache->hash_entries, keycache->hash_root,
keycache->hash_links, keycache->hash_link_root));
......@@ -398,6 +408,7 @@ int init_key_cache(KEY_CACHE *keycache, uint key_cache_block_size,
keycache->block_root= NULL;
}
my_errno= error;
keycache->can_be_used= 0;
DBUG_RETURN(0);
}
......@@ -407,8 +418,8 @@ int init_key_cache(KEY_CACHE *keycache, uint key_cache_block_size,
SYNOPSIS
resize_key_cache()
keycache in/out key cache handle
key_cache_block_size size of blocks to keep cached data
keycache pointer to a key cache data structure
key_cache_block_size size of blocks to keep cached data
use_mem total memory to use for the new key cache
division_limit new division limit (if not zero)
age_threshold new age threshold (if not zero)
......@@ -425,6 +436,10 @@ int init_key_cache(KEY_CACHE *keycache, uint key_cache_block_size,
old key cache blocks by calling the end_key_cache function and
then rebuilds the key cache with new blocks by calling
init_key_cache.
The function starts the operation only when all other threads
performing operations with the key cache let her to proceed
(when cnt_for_resize=0).
*/
int resize_key_cache(KEY_CACHE *keycache, uint key_cache_block_size,
......@@ -432,35 +447,90 @@ int resize_key_cache(KEY_CACHE *keycache, uint key_cache_block_size,
uint age_threshold)
{
int blocks;
struct st_my_thread_var *thread;
KEYCACHE_WQUEUE *wqueue;
DBUG_ENTER("resize_key_cache");
if (!keycache->key_cache_inited ||
(key_cache_block_size == keycache->key_cache_block_size &&
use_mem == keycache->key_cache_mem_size))
if (!keycache->key_cache_inited)
DBUG_RETURN(keycache->disk_blocks);
if(key_cache_block_size == keycache->key_cache_block_size &&
use_mem == keycache->key_cache_mem_size)
{
change_key_cache_param(keycache, division_limit, age_threshold);
DBUG_RETURN(keycache->disk_blocks);
}
keycache_pthread_mutex_lock(&keycache->cache_lock);
wqueue= &keycache->resize_queue;
thread=my_thread_var;
link_into_queue(wqueue, thread);
while (wqueue->last_thread->next != thread)
{
keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
}
keycache->resize_in_flush= 1;
if (flush_all_key_blocks(keycache))
{
/* TODO: if this happens, we should write a warning in the log file ! */
keycache_pthread_mutex_unlock(&keycache->cache_lock);
DBUG_RETURN(0);
keycache->resize_in_flush= 0;
blocks= 0;
goto finish;
}
keycache->resize_in_flush= 0;
keycache->can_be_used= 0;
while (keycache->cnt_for_resize_op)
{
keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
}
end_key_cache(keycache, 0); /* Don't free mutex */
/* the following will work even if use_mem is 0 */
/* The following will work even if use_mem is 0 */
blocks= init_key_cache(keycache, key_cache_block_size, use_mem,
division_limit, age_threshold);
finish:
unlink_from_queue(wqueue, thread);
/* Signal for the next resize request to proceeed if any */
if (wqueue->last_thread)
keycache_pthread_cond_signal(&wqueue->last_thread->next->suspend);
keycache->can_be_used= blocks <= 0;
keycache_pthread_mutex_unlock(&keycache->cache_lock);
return blocks;
}
/*
Increment counter blocking resize key cache operation
*/
static inline void inc_counter_for_resize_op(KEY_CACHE *keycache)
{
keycache->cnt_for_resize_op++;
}
/*
Decrement counter blocking resize key cache operation;
Signal the operation to proceed when counter becomes equal zero
*/
static inline void dec_counter_for_resize_op(KEY_CACHE *keycache)
{
struct st_my_thread_var *last_thread;
if (!--keycache->cnt_for_resize_op &&
(last_thread= keycache->resize_queue.last_thread))
keycache_pthread_cond_signal(&last_thread->next->suspend);
}
/*
Change the key cache parameters
SYNOPSIS
change_key_cache_param()
keycache key cache handle
keycache pointer to a key cache data structure
division_limit new division limit (if not zero)
age_threshold new age threshold (if not zero)
......@@ -478,12 +548,14 @@ void change_key_cache_param(KEY_CACHE *keycache, uint division_limit,
{
DBUG_ENTER("change_key_cache_param");
keycache_pthread_mutex_lock(&keycache->cache_lock);
if (division_limit)
keycache->min_warm_blocks= (keycache->disk_blocks *
division_limit / 100 + 1);
if (age_threshold)
keycache->age_threshold= (keycache->disk_blocks *
age_threshold / 100);
keycache_pthread_mutex_unlock(&keycache->cache_lock);
DBUG_VOID_RETURN;
}
......@@ -1018,7 +1090,7 @@ static void unlink_hash(KEY_CACHE *keycache, HASH_LINK *hash_link)
hash_link->block= NULL;
if (keycache->waiting_for_hash_link.last_thread)
{
/* Signal that A free hash link appeared */
/* Signal that a free hash link has appeared */
struct st_my_thread_var *last_thread=
keycache->waiting_for_hash_link.last_thread;
struct st_my_thread_var *first_thread= last_thread->next;
......@@ -1204,9 +1276,57 @@ static BLOCK_LINK *find_key_block(KEY_CACHE *keycache,
block->hash_link == hash_link && (block->status & BLOCK_READ))
page_status= PAGE_READ;
if (page_status == PAGE_READ && (block->status & BLOCK_IN_SWITCH))
if (wrmode && keycache->resize_in_flush)
{
/* This is a write request during the flush phase of a resize operation */
if (page_status != PAGE_READ)
{
/* We don't need the page in the cache: we are going to write on disk */
hash_link->requests--;
unlink_hash(keycache, hash_link);
return 0;
}
if (!(block->status & BLOCK_IN_FLUSH))
{
hash_link->requests--;
/*
Remove block to invalidate the page in the block buffer
as we are going to write directly on disk.
Although we have an exlusive lock for the updated key part
the control can be yieded by the current thread as we might
have unfinished readers of other key parts in the block
buffer. Still we are guaranteed not to have any readers
of the key part we are writing into until the block is
removed from the cache as we set the BLOCL_REASSIGNED
flag (see the code below that handles reading requests).
*/
free_block(keycache, block);
return 0;
}
/* Wait intil the page is flushed on disk */
hash_link->requests--;
{
struct st_my_thread_var *thread= my_thread_var;
add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
do
{
keycache_pthread_cond_wait(&thread->suspend,
&keycache->cache_lock);
}
while(thread->next);
}
/* Invalidate page in the block if it has not been done yet */
if (block->status)
free_block(keycache, block);
return 0;
}
if (page_status == PAGE_READ &&
(block->status & (BLOCK_IN_SWITCH | BLOCK_REASSIGNED)))
{
/* This is a request for a page to be removed from cache */
KEYCACHE_DBUG_PRINT("find_key_block",
("request for old page in block %u",BLOCK_NUMBER(block)));
/*
......@@ -1270,10 +1390,11 @@ static BLOCK_LINK *find_key_block(KEY_CACHE *keycache,
else
{
/* There are no never used blocks, use a block from the LRU chain */
/*
Wait until a new block is added to the LRU chain;
several threads might wait here for the same page,
all of them must get the same block
Wait until a new block is added to the LRU chain;
several threads might wait here for the same page,
all of them must get the same block
*/
if (! keycache->used_last)
......@@ -1521,7 +1642,7 @@ byte *key_cache_read(KEY_CACHE *keycache,
DBUG_PRINT("enter", ("file %u, filepos %lu, length %u",
(uint) file, (ulong) filepos, length));
if (keycache->disk_blocks > 0)
if (keycache->can_be_used)
{
/* Key cache is used */
reg1 BLOCK_LINK *block;
......@@ -1533,7 +1654,7 @@ byte *key_cache_read(KEY_CACHE *keycache,
do
{
keycache_pthread_mutex_lock(&keycache->cache_lock);
if (keycache->disk_blocks <= 0) /* Resize failed */
if (!keycache->can_be_used)
{
keycache_pthread_mutex_unlock(&keycache->cache_lock);
goto no_key_cache;
......@@ -1548,6 +1669,7 @@ byte *key_cache_read(KEY_CACHE *keycache,
return_buffer=0;
#endif
inc_counter_for_resize_op(keycache);
keycache->global_cache_r_requests++;
block=find_key_block(keycache, file, filepos, level, 0, &page_st);
if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
......@@ -1598,6 +1720,8 @@ byte *key_cache_read(KEY_CACHE *keycache,
*/
unreg_request(keycache, block, 1);
dec_counter_for_resize_op(keycache);
keycache_pthread_mutex_unlock(&keycache->cache_lock);
if (status & BLOCK_ERROR)
......@@ -1654,7 +1778,7 @@ int key_cache_insert(KEY_CACHE *keycache,
DBUG_PRINT("enter", ("file %u, filepos %lu, length %u",
(uint) file,(ulong) filepos, length));
if (keycache->disk_blocks > 0)
if (keycache->can_be_used)
{
/* Key cache is used */
reg1 BLOCK_LINK *block;
......@@ -1666,7 +1790,7 @@ int key_cache_insert(KEY_CACHE *keycache,
{
uint offset;
keycache_pthread_mutex_lock(&keycache->cache_lock);
if (keycache->disk_blocks <= 0) /* Resize failed */
if (!keycache->can_be_used)
{
keycache_pthread_mutex_unlock(&keycache->cache_lock);
DBUG_RETURN(0);
......@@ -1678,6 +1802,7 @@ int key_cache_insert(KEY_CACHE *keycache,
/* Read data into key cache from buff in key_cache_block_size incr. */
filepos-= offset;
inc_counter_for_resize_op(keycache);
keycache->global_cache_r_requests++;
block= find_key_block(keycache, file, filepos, level, 0, &page_st);
if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
......@@ -1708,6 +1833,9 @@ int key_cache_insert(KEY_CACHE *keycache,
unreg_request(keycache, block, 1);
error= (block->status & BLOCK_ERROR);
dec_counter_for_resize_op(keycache);
keycache_pthread_mutex_unlock(&keycache->cache_lock);
if (error)
......@@ -1776,7 +1904,7 @@ int key_cache_write(KEY_CACHE *keycache,
test_key_cache(keycache, "start of key_cache_write", 1););
#endif
if (keycache->disk_blocks > 0)
if (keycache->can_be_used)
{
/* Key cache is used */
uint read_length;
......@@ -1786,7 +1914,7 @@ int key_cache_write(KEY_CACHE *keycache,
{
uint offset;
keycache_pthread_mutex_lock(&keycache->cache_lock);
if (keycache->disk_blocks <= 0) /* Resize failed */
if (!keycache->can_be_used)
{
keycache_pthread_mutex_unlock(&keycache->cache_lock);
goto no_key_cache;
......@@ -1798,8 +1926,25 @@ int key_cache_write(KEY_CACHE *keycache,
/* Write data in key_cache_block_size increments */
filepos-= offset;
inc_counter_for_resize_op(keycache);
keycache->global_cache_w_requests++;
block= find_key_block(keycache, file, filepos, level, 1, &page_st);
if (!block)
{
/* It happens only for requests submitted during resize operation */
dec_counter_for_resize_op(keycache);
keycache_pthread_mutex_unlock(&keycache->cache_lock);
if (dont_write)
{
keycache->global_cache_w_requests++;
keycache->global_cache_write++;
if (my_pwrite(file, (byte*) buff, length, filepos,
MYF(MY_NABP | MY_WAIT_IF_FULL)))
error=1;
}
goto next_block;
}
if (block->status != BLOCK_ERROR && page_st != PAGE_READ &&
(offset || read_length < keycache->key_cache_block_size))
read_block(keycache, block,
......@@ -1841,8 +1986,11 @@ int key_cache_write(KEY_CACHE *keycache,
break;
}
keycache_pthread_mutex_unlock(&keycache->cache_lock);
dec_counter_for_resize_op(keycache);
keycache_pthread_mutex_unlock(&keycache->cache_lock);
next_block:
buff+= read_length;
filepos+= read_length;
offset= 0;
......@@ -1951,6 +2099,12 @@ static int flush_cached_blocks(KEY_CACHE *keycache,
if (!last_errno)
last_errno= errno ? errno : -1;
}
/*
Let to proceed for possible waiting requests to write to the block page.
It might happen only during an operation to resize the key cache.
*/
if (block->wqueue[COND_FOR_SAVED].last_thread)
release_queue(&block->wqueue[COND_FOR_SAVED]);
/* type will never be FLUSH_IGNORE_CHANGED here */
if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
{
......@@ -2187,18 +2341,20 @@ static int flush_key_blocks_int(KEY_CACHE *keycache,
1 error
*/
int flush_key_blocks(KEY_CACHE *key_cache,
int flush_key_blocks(KEY_CACHE *keycache,
File file, enum flush_type type)
{
int res;
DBUG_ENTER("flush_key_blocks");
DBUG_PRINT("enter", ("key_cache: %lx", key_cache));
DBUG_PRINT("enter", ("keycache: %lx", keycache));
if (key_cache->disk_blocks <= 0)
if (keycache->disk_blocks <= 0)
DBUG_RETURN(0);
keycache_pthread_mutex_lock(&key_cache->cache_lock);
res= flush_key_blocks_int(key_cache, file, type);
keycache_pthread_mutex_unlock(&key_cache->cache_lock);
keycache_pthread_mutex_lock(&keycache->cache_lock);
inc_counter_for_resize_op(keycache);
res= flush_key_blocks_int(keycache, file, type);
dec_counter_for_resize_op(keycache);
keycache_pthread_mutex_unlock(&keycache->cache_lock);
DBUG_RETURN(res);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment