mf_keycache.c 82.9 KB
Newer Older
unknown's avatar
unknown committed
1 2 3 4 5 6 7 8
/* Copyright (C) 2000 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
unknown's avatar
unknown committed
9
   but WITHOUT ANY WARRANTY; without even the implied warranty of
unknown's avatar
unknown committed
10 11 12 13 14 15
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
unknown's avatar
unknown committed
16 17

/*
unknown's avatar
unknown committed
18 19
  These functions handle keyblock caching for ISAM and MyISAM tables.

20 21
  One cache can handle many files.
  It must contain buffers of the same blocksize.
unknown's avatar
unknown committed
22
  init_key_cache() should be used to init cache handler.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39

  The free list (free_block_list) is a stack like structure.
  When a block is freed by free_block(), it is pushed onto the stack.
  When a new block is required it is first tried to pop one from the stack.
  If the stack is empty, it is tried to get a never-used block from the pool.
  If this is empty too, then a block is taken from the LRU ring, flushing it
  to disk, if necessary. This is handled in find_key_block().
  With the new free list, the blocks can have three temperatures:
  hot, warm and cold (which is free). This is remembered in the block header
  by the enum BLOCK_TEMPERATURE temperature variable. Remembering the 
  temperature is necessary to correctly count the number of warm blocks, 
  which is required to decide when blocks are allowed to become hot. Whenever 
  a block is inserted to another (sub-)chain, we take the old and new 
  temperature into account to decide if we got one more or less warm block.
  blocks_unused is the sum of never used blocks in the pool and of currently
  free blocks. blocks_used is the number of blocks fetched from the pool and
  as such gives the maximum number of in-use blocks at any time.
40
*/
unknown's avatar
unknown committed
41 42

#include "mysys_priv.h"
unknown's avatar
unknown committed
43
#include <keycache.h>
unknown's avatar
unknown committed
44 45 46
#include "my_static.h"
#include <m_string.h>
#include <errno.h>
47 48 49
#include <assert.h>
#include <stdarg.h>

50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
/*
  Some compilation flags have been added specifically for this module
  to control the following:
  - not to let a thread to yield the control when reading directly
    from key cache, which might improve performance in many cases;
    to enable this add:
    #define SERIALIZED_READ_FROM_CACHE
  - to set an upper bound for number of threads simultaneously
    using the key cache; this setting helps to determine an optimal
    size for hash table and improve performance when the number of
    blocks in the key cache much less than the number of threads
    accessing it;
    to set this number equal to <N> add
      #define MAX_THREADS <N>
  - to substitute calls of pthread_cond_wait for calls of
    pthread_cond_timedwait (wait with timeout set up);
    this setting should be used only when you want to trap a deadlock
    situation, which theoretically should not happen;
    to set timeout equal to <T> seconds add
      #define KEYCACHE_TIMEOUT <T>
  - to enable the module traps and to send debug information from
    key cache module to a special debug log add:
      #define KEYCACHE_DEBUG
    the name of this debug log file <LOG NAME> can be set through:
      #define KEYCACHE_DEBUG_LOG  <LOG NAME>
    if the name is not defined, it's set by default;
    if the KEYCACHE_DEBUG flag is not set up and we are in a debug
    mode, i.e. when ! defined(DBUG_OFF), the debug information from the
    module is sent to the regular debug log.

  Example of the settings:
    #define SERIALIZED_READ_FROM_CACHE
    #define MAX_THREADS   100
    #define KEYCACHE_TIMEOUT  1
    #define KEYCACHE_DEBUG
    #define KEYCACHE_DEBUG_LOG  "my_key_cache_debug.log"
86
*/
unknown's avatar
unknown committed
87 88

#if defined(MSDOS) && !defined(M_IC80386)
89
/* we need much memory */
unknown's avatar
unknown committed
90 91
#undef my_malloc_lock
#undef my_free_lock
92 93 94 95 96 97 98 99 100 101 102 103
#define my_malloc_lock(A,B)  halloc((long) (A/IO_SIZE),IO_SIZE)
#define my_free_lock(A,B)    hfree(A)
#endif /* defined(MSDOS) && !defined(M_IC80386) */

/*
  Recover a pointer to the enclosing TYPE object from 'a', a pointer to its
  MEMBER field. The whole expansion is parenthesized so that the result can
  be used directly inside a larger expression (e.g. followed by '->').
*/
#define STRUCT_PTR(TYPE, MEMBER, a)                                           \
          ((TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER)))

/* types of condition variables */
#define  COND_FOR_REQUESTED 0
#define  COND_FOR_SAVED     1
#define  COND_FOR_READERS   2

104 105 106
typedef pthread_cond_t KEYCACHE_CONDVAR;

/* descriptor of the page in the key cache block buffer */
unknown's avatar
unknown committed
107
struct st_keycache_page
108
{
109 110
  int file;               /* file to which the page belongs to  */
  my_off_t filepos;       /* position of the page in the file   */
unknown's avatar
unknown committed
111
};
112

113
/* element in the chain of a hash table bucket */
unknown's avatar
unknown committed
114
struct st_hash_link
115
{
116 117 118 119 120
  struct st_hash_link *next, **prev; /* to connect links in the same bucket  */
  struct st_block_link *block;       /* reference to the block for the page: */
  File file;                         /* from such a file                     */
  my_off_t diskpos;                  /* with such an offset                  */
  uint requests;                     /* number of requests for the page      */
unknown's avatar
unknown committed
121
};
122 123 124 125 126 127 128 129 130 131 132 133 134 135

/* simple states of a block (bit flags, each value is a distinct power of 2) */
#define BLOCK_ERROR       1   /* an error occurred when performing disk i/o  */
#define BLOCK_READ        2   /* the page is in the block buffer             */
#define BLOCK_IN_SWITCH   4   /* block is preparing to read new page         */
#define BLOCK_REASSIGNED  8   /* block does not accept requests for old page */
#define BLOCK_IN_FLUSH   16   /* block is in flush operation                 */
#define BLOCK_CHANGED    32   /* block buffer contains a dirty page          */

/* page status, returned by find_key_block */
#define PAGE_READ               0
#define PAGE_TO_BE_READ         1
#define PAGE_WAIT_TO_BE_READ    2

136 137 138
/* block temperature determines in which (sub-)chain the block currently is */
enum BLOCK_TEMPERATURE { BLOCK_COLD /*free*/ , BLOCK_WARM , BLOCK_HOT };

139
/* key cache block */
unknown's avatar
unknown committed
140
struct st_block_link
141
{
142 143 144 145 146 147 148 149 150 151 152
  struct st_block_link
    *next_used, **prev_used;   /* to connect links in the LRU chain (ring)   */
  struct st_block_link
    *next_changed, **prev_changed; /* for lists of file dirty/clean blocks   */
  struct st_hash_link *hash_link; /* backward ptr to referring hash_link     */
  KEYCACHE_WQUEUE wqueue[2]; /* queues on waiting requests for new/old pages */
  uint requests;          /* number of requests for the block                */
  byte *buffer;           /* buffer for the block page                       */
  uint offset;            /* beginning of modified data in the buffer        */
  uint length;            /* end of data in the buffer                       */
  uint status;            /* state of the block                              */
153
  enum BLOCK_TEMPERATURE temperature; /* block temperature: cold, warm, hot */
154 155
  uint hits_left;         /* number of hits left until promotion             */
  ulonglong last_hit_time; /* timestamp of the last hit                      */
156
  KEYCACHE_CONDVAR *condvar; /* condition variable for 'no readers' event    */
unknown's avatar
unknown committed
157
};
158

unknown's avatar
unknown committed
159 160 161
KEY_CACHE dflt_key_cache_var;
KEY_CACHE *dflt_key_cache= &dflt_key_cache_var;

162 163
#define FLUSH_CACHE         2000            /* sort this many blocks at once */

unknown's avatar
unknown committed
164
static int flush_all_key_blocks(KEY_CACHE *keycache);
unknown's avatar
unknown committed
165
static void link_into_queue(KEYCACHE_WQUEUE *wqueue,
unknown's avatar
unknown committed
166
                                   struct st_my_thread_var *thread);
unknown's avatar
unknown committed
167
static void unlink_from_queue(KEYCACHE_WQUEUE *wqueue,
unknown's avatar
unknown committed
168 169
                                     struct st_my_thread_var *thread);
static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block);
170
static void test_key_cache(KEY_CACHE *keycache,
unknown's avatar
unknown committed
171 172
                           const char *where, my_bool lock);

173
/*
  Hash bucket index for the page at offset 'pos' of file 'f'.
  NOTE: the expansion refers to a variable named 'keycache', which must be
  in scope wherever the macro is used. The mask assumes hash_entries is a
  power of 2.
*/
#define KEYCACHE_HASH(f, pos)                                                 \
(((ulong) ((pos) >> keycache->key_cache_shift)+                               \
                                     (ulong) (f)) & (keycache->hash_entries-1))
/*
  Map a file to a slot in 0 .. CHANGED_BLOCKS_HASH-1 (presumably the index
  into the changed_blocks[] / file_blocks[] arrays - confirm against users).
*/
#define FILE_HASH(f)                 ((uint) (f) & (CHANGED_BLOCKS_HASH-1))
unknown's avatar
unknown committed
177

178
/* Name of the debug log used when KEYCACHE_DEBUG_LOG is not set explicitly */
#define DEFAULT_KEYCACHE_DEBUG_LOG  "keycache_debug.log"

#if defined(KEYCACHE_DEBUG) && ! defined(KEYCACHE_DEBUG_LOG)
#define KEYCACHE_DEBUG_LOG  DEFAULT_KEYCACHE_DEBUG_LOG
#endif

/*
  KEYCACHE_DEBUG_OPEN opens the debug log once (no-op if already open);
  KEYCACHE_DEBUG_CLOSE closes it and resets the handle.
  Both expand to nothing when no debug log is configured.
*/
#if defined(KEYCACHE_DEBUG_LOG)
static FILE *keycache_debug_log=NULL;
static void keycache_debug_print _VARARGS((const char *fmt,...));
#define KEYCACHE_DEBUG_OPEN                                                   \
          if (!keycache_debug_log) keycache_debug_log=fopen(KEYCACHE_DEBUG_LOG, "w")

#define KEYCACHE_DEBUG_CLOSE                                                  \
          if (keycache_debug_log) { fclose(keycache_debug_log); keycache_debug_log=0; }
#else
#define KEYCACHE_DEBUG_OPEN
#define KEYCACHE_DEBUG_CLOSE
#endif /* defined(KEYCACHE_DEBUG_LOG) */

197
/*
  Debug print / assert helpers. With both KEYCACHE_DEBUG_LOG and
  KEYCACHE_DEBUG defined they write to the key cache debug log (the assert
  closes the log before aborting); otherwise they map to the regular
  DBUG_PRINT / DBUG_ASSERT.
  NOTE: KEYCACHE_DBUG_ASSERT evaluates its argument twice in the first form.
*/
#if defined(KEYCACHE_DEBUG_LOG) && defined(KEYCACHE_DEBUG)
#define KEYCACHE_DBUG_PRINT(l, m)                                             \
            { if (keycache_debug_log) fprintf(keycache_debug_log, "%s: ", l); \
              keycache_debug_print m; }

#define KEYCACHE_DBUG_ASSERT(a)                                               \
            { if (! (a) && keycache_debug_log) fclose(keycache_debug_log);    \
              assert(a); }
#else
#define KEYCACHE_DBUG_PRINT(l, m)  DBUG_PRINT(l, m)
#define KEYCACHE_DBUG_ASSERT(a)    DBUG_ASSERT(a)
#endif /* defined(KEYCACHE_DEBUG_LOG) && defined(KEYCACHE_DEBUG) */

/*
  Thread tracing helpers: print the id of the thread that most recently
  executed KEYCACHE_THREAD_TRACE_BEGIN. They expand to nothing when neither
  KEYCACHE_DEBUG nor the DBUG package is enabled.
  NOTE: keycache_thread_id is a single file-scope variable shared by all
  threads.
*/
#if defined(KEYCACHE_DEBUG) || !defined(DBUG_OFF)
static long keycache_thread_id;
#define KEYCACHE_THREAD_TRACE(l)                                              \
             KEYCACHE_DBUG_PRINT(l,("|thread %ld",keycache_thread_id))

/* my_thread_var is evaluated once and reused through the local */
#define KEYCACHE_THREAD_TRACE_BEGIN(l)                                        \
            { struct st_my_thread_var *thread_var= my_thread_var;             \
              keycache_thread_id= thread_var->id;                             \
              KEYCACHE_DBUG_PRINT(l,("[thread %ld",keycache_thread_id)) }

#define KEYCACHE_THREAD_TRACE_END(l)                                          \
            KEYCACHE_DBUG_PRINT(l,("]thread %ld",keycache_thread_id))
#else
#define KEYCACHE_THREAD_TRACE_BEGIN(l)
#define KEYCACHE_THREAD_TRACE_END(l)
#define KEYCACHE_THREAD_TRACE(l)
#endif /* defined(KEYCACHE_DEBUG) || !defined(DBUG_OFF) */

/*
  Index of block 'b' / hash link 'h' within the arrays starting at
  keycache->block_root / keycache->hash_link_root.
  NOTE: both expansions refer to a variable named 'keycache', which must be
  in scope wherever the macros are used.
*/
#define BLOCK_NUMBER(b)                                                       \
  ((uint) (((char*)(b)-(char *) keycache->block_root)/sizeof(BLOCK_LINK)))
#define HASH_LINK_NUMBER(h)                                                   \
  ((uint) (((char*)(h)-(char *) keycache->hash_link_root)/sizeof(HASH_LINK)))
232 233 234 235 236 237

/*
  Use a file-local wrapper around pthread_cond_wait when a timeout is
  requested (not on Windows) or when key cache debugging is enabled;
  otherwise call pthread_cond_wait directly.
*/
#if (defined(KEYCACHE_TIMEOUT) && !defined(__WIN__)) || defined(KEYCACHE_DEBUG)
static int keycache_pthread_cond_wait(pthread_cond_t *cond,
                                      pthread_mutex_t *mutex);
#else
#define  keycache_pthread_cond_wait pthread_cond_wait
#endif

240 241 242 243 244 245 246 247 248 249 250 251 252 253
/*
  With KEYCACHE_DEBUG the mutex/condition primitives go through file-local
  wrapper functions (declared here, defined elsewhere in this file);
  otherwise the keycache_pthread_* names are plain aliases of the pthread
  calls.
*/
#if defined(KEYCACHE_DEBUG)
static int keycache_pthread_mutex_lock(pthread_mutex_t *mutex);
static void keycache_pthread_mutex_unlock(pthread_mutex_t *mutex);
static int keycache_pthread_cond_signal(pthread_cond_t *cond);
static int keycache_pthread_cond_broadcast(pthread_cond_t *cond);
#else
#define keycache_pthread_mutex_lock pthread_mutex_lock
#define keycache_pthread_mutex_unlock pthread_mutex_unlock
#define keycache_pthread_cond_signal pthread_cond_signal
#define keycache_pthread_cond_broadcast pthread_cond_broadcast
#endif /* defined(KEYCACHE_DEBUG) */

/*
  Return twice the most significant set bit of a value

  SYNOPSIS
    next_power()
    value               number to round up

  RETURN VALUE
    For value > 0: the smallest power of 2 that is strictly greater than
    value (so an exact power of 2 is doubled, e.g. 8 -> 16).
    For value == 0: 2.

  NOTES.
    If the most significant bit of uint is set in value, the shift
    overflows and the result wraps around to 0.
*/

static uint next_power(uint value)
{
  uint highest_bit= 1;
  while (value)
  {
    /* After the last iteration this holds the single highest set bit */
    highest_bit= value;
    value&= value-1;                    /* clear the lowest set bit */
  }
  return (highest_bit << 1);
}
unknown's avatar
unknown committed
262 263


264
/*
265 266 267
  Initialize a key cache

  SYNOPSIS
unknown's avatar
unknown committed
268 269
    init_key_cache()
    keycache			pointer to a key cache data structure
unknown's avatar
unknown committed
270 271 272 273
    key_cache_block_size	size of blocks to keep cached data
    use_mem                 	total memory to use for the key cache
    division_limit		division limit (may be zero)
    age_threshold		age threshold (may be zero)
274 275 276 277 278 279

  RETURN VALUE
    number of blocks in the key cache, if successful,
    0 - otherwise.

  NOTES.
unknown's avatar
unknown committed
280 281 282 283
    if keycache->key_cache_inited != 0 we assume that the key cache
    is already initialized.  This is for now used by myisamchk, but shouldn't
    be something that a program should rely on!

284 285
    It's assumed that no two threads call this function simultaneously
    referring to the same key cache handle.
286

287
*/
288

unknown's avatar
unknown committed
289 290 291
int init_key_cache(KEY_CACHE *keycache, uint key_cache_block_size,
		   ulong use_mem, uint division_limit,
		   uint age_threshold)
unknown's avatar
unknown committed
292
{
293 294
  uint blocks, hash_links, length;
  int error;
unknown's avatar
unknown committed
295
  DBUG_ENTER("init_key_cache");
296
  DBUG_ASSERT(key_cache_block_size >= 512);
297

298
  KEYCACHE_DEBUG_OPEN;
unknown's avatar
unknown committed
299
  if (keycache->key_cache_inited && keycache->disk_blocks > 0)
unknown's avatar
unknown committed
300
  {
301 302
    DBUG_PRINT("warning",("key cache already in use"));
    DBUG_RETURN(0);
unknown's avatar
unknown committed
303
  }
unknown's avatar
unknown committed
304

unknown's avatar
unknown committed
305 306 307
  keycache->global_cache_w_requests= keycache->global_cache_r_requests= 0;
  keycache->global_cache_read= keycache->global_cache_write= 0;
  keycache->disk_blocks= -1;
unknown's avatar
unknown committed
308
  if (! keycache->key_cache_inited)
unknown's avatar
unknown committed
309
  {
310
    keycache->key_cache_inited= 1;
unknown's avatar
unknown committed
311
    keycache->in_init= 0;
unknown's avatar
unknown committed
312
    pthread_mutex_init(&keycache->cache_lock, MY_MUTEX_INIT_FAST);
unknown's avatar
unknown committed
313
    keycache->resize_queue.last_thread= NULL;
unknown's avatar
unknown committed
314
  }
315

unknown's avatar
unknown committed
316 317 318 319 320
  keycache->key_cache_mem_size= use_mem;
  keycache->key_cache_block_size= key_cache_block_size;
  keycache->key_cache_shift= my_bit_log2(key_cache_block_size);
  DBUG_PRINT("info", ("key_cache_block_size: %u",
		      key_cache_block_size));
321

322
  blocks= (uint) (use_mem / (sizeof(BLOCK_LINK) + 2 * sizeof(HASH_LINK) +
323
			     sizeof(HASH_LINK*) * 5/4 + key_cache_block_size));
324
  /* It doesn't make sense to have too few blocks (less than 8) */
unknown's avatar
unknown committed
325
  if (blocks >= 8 && keycache->disk_blocks < 0)
unknown's avatar
unknown committed
326
  {
327
    for ( ; ; )
unknown's avatar
unknown committed
328
    {
329
      /* Set my_hash_entries to the next bigger 2 power */
330
      if ((keycache->hash_entries= next_power(blocks)) < blocks * 5/4)
unknown's avatar
unknown committed
331
        keycache->hash_entries<<= 1;
332
      hash_links= 2 * blocks;
333 334
#if defined(MAX_THREADS)
      if (hash_links < MAX_THREADS + blocks - 1)
335
        hash_links= MAX_THREADS + blocks - 1;
336
#endif
337 338
      while ((length= (ALIGN_SIZE(blocks * sizeof(BLOCK_LINK)) +
		       ALIGN_SIZE(hash_links * sizeof(HASH_LINK)) +
339
		       ALIGN_SIZE(sizeof(HASH_LINK*) *
340
                                  keycache->hash_entries))) +
unknown's avatar
unknown committed
341
	     ((ulong) blocks << keycache->key_cache_shift) > use_mem)
342 343
        blocks--;
      /* Allocate memory for cache page buffers */
344 345 346
      if ((keycache->block_mem=
	   my_malloc_lock((ulong) blocks * keycache->key_cache_block_size,
			  MYF(0))))
347
      {
348
        /*
349 350
	  Allocate memory for blocks, hash_links and hash entries;
	  For each block 2 hash links are allocated
351
        */
unknown's avatar
unknown committed
352 353
        if ((keycache->block_root= (BLOCK_LINK*) my_malloc((uint) length,
                                                           MYF(0))))
354
          break;
unknown's avatar
unknown committed
355
        my_free_lock(keycache->block_mem, MYF(0));
356
      }
357
      if (blocks < 8)
unknown's avatar
unknown committed
358
      {
unknown's avatar
unknown committed
359
        my_errno= ENOMEM;
360
        goto err;
unknown's avatar
unknown committed
361
      }
362
      blocks= blocks / 4*3;
unknown's avatar
unknown committed
363
    }
364
    keycache->blocks_unused= (ulong) blocks;
unknown's avatar
unknown committed
365 366 367 368 369 370
    keycache->disk_blocks= (int) blocks;
    keycache->hash_links= hash_links;
    keycache->hash_root= (HASH_LINK**) ((char*) keycache->block_root +
				        ALIGN_SIZE(blocks*sizeof(BLOCK_LINK)));
    keycache->hash_link_root= (HASH_LINK*) ((char*) keycache->hash_root +
				            ALIGN_SIZE((sizeof(HASH_LINK*) *
371
							keycache->hash_entries)));
unknown's avatar
unknown committed
372
    bzero((byte*) keycache->block_root,
373
	  keycache->disk_blocks * sizeof(BLOCK_LINK));
unknown's avatar
unknown committed
374
    bzero((byte*) keycache->hash_root,
375
          keycache->hash_entries * sizeof(HASH_LINK*));
unknown's avatar
unknown committed
376
    bzero((byte*) keycache->hash_link_root,
377
	  keycache->hash_links * sizeof(HASH_LINK));
unknown's avatar
unknown committed
378 379 380
    keycache->hash_links_used= 0;
    keycache->free_hash_list= NULL;
    keycache->blocks_used= keycache->blocks_changed= 0;
unknown's avatar
unknown committed
381

382
    keycache->global_blocks_changed= 0;
unknown's avatar
unknown committed
383 384
    keycache->blocks_available=0;		/* For debugging */

385
    /* The LRU chain is empty after initialization */
386 387
    keycache->used_last= NULL;
    keycache->used_ins= NULL;
388
    keycache->free_block_list= NULL;
389 390
    keycache->keycache_time= 0;
    keycache->warm_blocks= 0;
unknown's avatar
unknown committed
391 392
    keycache->min_warm_blocks= (division_limit ?
				blocks * division_limit / 100 + 1 :
393
				blocks);
unknown's avatar
unknown committed
394 395
    keycache->age_threshold= (age_threshold ?
			      blocks * age_threshold / 100 :
396
			      blocks);
397

unknown's avatar
unknown committed
398 399 400 401
    keycache->cnt_for_resize_op= 0;
    keycache->resize_in_flush= 0;
    keycache->can_be_used= 1;

unknown's avatar
unknown committed
402 403
    keycache->waiting_for_hash_link.last_thread= NULL;
    keycache->waiting_for_block.last_thread= NULL;
404
    DBUG_PRINT("exit",
unknown's avatar
unknown committed
405 406
	       ("disk_blocks: %d  block_root: %lx  hash_entries: %d\
 hash_root: %lx hash_links: %d hash_link_root %lx",
407 408 409
		keycache->disk_blocks, keycache->block_root,
		keycache->hash_entries, keycache->hash_root,
		keycache->hash_links, keycache->hash_link_root));
unknown's avatar
unknown committed
410 411 412 413
    bzero((gptr) keycache->changed_blocks,
	  sizeof(keycache->changed_blocks[0]) * CHANGED_BLOCKS_HASH);
    bzero((gptr) keycache->file_blocks,
	  sizeof(keycache->file_blocks[0]) * CHANGED_BLOCKS_HASH);
unknown's avatar
unknown committed
414
  }
unknown's avatar
unknown committed
415 416

  keycache->blocks= keycache->disk_blocks > 0 ? keycache->disk_blocks : 0;
417
  DBUG_RETURN((int) keycache->disk_blocks);
418

unknown's avatar
unknown committed
419
err:
unknown's avatar
unknown committed
420
  error= my_errno;
421
  keycache->disk_blocks= 0;
unknown's avatar
unknown committed
422
  keycache->blocks=  0;
unknown's avatar
unknown committed
423
  if (keycache->block_mem)
424
  {
unknown's avatar
unknown committed
425
    my_free_lock((gptr) keycache->block_mem, MYF(0));
426 427 428 429
    keycache->block_mem= NULL;
  }
  if (keycache->block_root)
  {
unknown's avatar
unknown committed
430
    my_free((gptr) keycache->block_root, MYF(0));
431 432
    keycache->block_root= NULL;
  }
unknown's avatar
unknown committed
433
  my_errno= error;
unknown's avatar
unknown committed
434
  keycache->can_be_used= 0;
unknown's avatar
unknown committed
435
  DBUG_RETURN(0);
436
}
unknown's avatar
unknown committed
437 438


unknown's avatar
unknown committed
439
/*
440 441 442 443
  Resize a key cache

  SYNOPSIS
    resize_key_cache()
unknown's avatar
unknown committed
444 445
    keycache     	        pointer to a key cache data structure
    key_cache_block_size        size of blocks to keep cached data
unknown's avatar
unknown committed
446 447 448
    use_mem			total memory to use for the new key cache
    division_limit		new division limit (if not zero)
    age_threshold		new age threshold (if not zero)
449 450 451 452 453 454 455

  RETURN VALUE
    number of blocks in the key cache, if successful,
    0 - otherwise.

  NOTES.
    The function first compares the memory size and the block size parameters
unknown's avatar
unknown committed
456 457 458 459 460 461
    with the key cache values.

    If they differ the function free the the memory allocated for the
    old key cache blocks by calling the end_key_cache function and
    then rebuilds the key cache with new blocks by calling
    init_key_cache.
unknown's avatar
unknown committed
462 463 464 465

    The function starts the operation only when all other threads
    performing operations with the key cache let her to proceed
    (when cnt_for_resize=0).
unknown's avatar
unknown committed
466
*/
467

unknown's avatar
unknown committed
468 469 470
int resize_key_cache(KEY_CACHE *keycache, uint key_cache_block_size,
		     ulong use_mem, uint division_limit,
		     uint age_threshold)
unknown's avatar
unknown committed
471
{
472
  int blocks;
unknown's avatar
unknown committed
473 474
  struct st_my_thread_var *thread;
  KEYCACHE_WQUEUE *wqueue;
475
  DBUG_ENTER("resize_key_cache");
unknown's avatar
unknown committed
476

unknown's avatar
unknown committed
477
  if (!keycache->key_cache_inited)
unknown's avatar
unknown committed
478
    DBUG_RETURN(keycache->disk_blocks);
unknown's avatar
unknown committed
479 480 481 482 483 484 485
  
  if(key_cache_block_size == keycache->key_cache_block_size &&
     use_mem == keycache->key_cache_mem_size)
  {
    change_key_cache_param(keycache, division_limit, age_threshold);
    DBUG_RETURN(keycache->disk_blocks);
  }    
486

unknown's avatar
unknown committed
487
  keycache_pthread_mutex_lock(&keycache->cache_lock);
unknown's avatar
unknown committed
488 489

  wqueue= &keycache->resize_queue;
unknown's avatar
unknown committed
490
  thread= my_thread_var;
unknown's avatar
unknown committed
491 492 493 494 495 496 497 498
  link_into_queue(wqueue, thread);

  while (wqueue->last_thread->next != thread)
  {
    keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
  }

  keycache->resize_in_flush= 1;
unknown's avatar
unknown committed
499
  if (flush_all_key_blocks(keycache))
unknown's avatar
unknown committed
500
  {
501
    /* TODO: if this happens, we should write a warning in the log file ! */
unknown's avatar
unknown committed
502 503
    keycache->resize_in_flush= 0;
    blocks= 0;
unknown's avatar
unknown committed
504
    keycache->can_be_used= 0;
unknown's avatar
unknown committed
505
    goto finish;
unknown's avatar
unknown committed
506
  }
unknown's avatar
unknown committed
507 508 509 510 511 512 513
  keycache->resize_in_flush= 0;
  keycache->can_be_used= 0;   
  while (keycache->cnt_for_resize_op)
  {
    keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
  }

unknown's avatar
unknown committed
514
  end_key_cache(keycache, 0);			/* Don't free mutex */
unknown's avatar
unknown committed
515
  /* The following will work even if use_mem is 0 */
unknown's avatar
unknown committed
516 517
  blocks= init_key_cache(keycache, key_cache_block_size, use_mem,
			 division_limit, age_threshold);
unknown's avatar
unknown committed
518 519 520 521 522 523

finish:
  unlink_from_queue(wqueue, thread);
  /* Signal for the next resize request to proceeed if any */
  if (wqueue->last_thread)
    keycache_pthread_cond_signal(&wqueue->last_thread->next->suspend);
unknown's avatar
unknown committed
524
  keycache_pthread_mutex_unlock(&keycache->cache_lock);
525
  return blocks;
unknown's avatar
unknown committed
526 527 528
}


unknown's avatar
unknown committed
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549
/*
  Count one more operation that a key cache resize has to wait for
  (resize_key_cache() waits until cnt_for_resize_op drops back to zero)
*/
static inline void inc_counter_for_resize_op(KEY_CACHE *keycache)
{
  keycache->cnt_for_resize_op+= 1;
}


/*
  Count one operation less that a key cache resize has to wait for.
  When the counter reaches zero and a resize request is queued, wake up
  the thread at the head of the resize queue.
*/
static inline void dec_counter_for_resize_op(KEY_CACHE *keycache)
{
  struct st_my_thread_var *queue_tail;

  keycache->cnt_for_resize_op--;
  if (keycache->cnt_for_resize_op)
    return;                             /* other operations still running */

  queue_tail= keycache->resize_queue.last_thread;
  if (queue_tail)
    keycache_pthread_cond_signal(&queue_tail->next->suspend);
}

550
/*
551
  Change the key cache parameters
552 553 554

  SYNOPSIS
    change_key_cache_param()
unknown's avatar
unknown committed
555
    keycache			pointer to a key cache data structure 
unknown's avatar
unknown committed
556 557
    division_limit		new division limit (if not zero)
    age_threshold		new age threshold (if not zero)
558 559 560 561 562 563 564

  RETURN VALUE
    none

  NOTES.
    Presently the function resets the key cache parameters
    concerning midpoint insertion strategy - division_limit and
unknown's avatar
unknown committed
565
    age_threshold.
566 567
*/

unknown's avatar
unknown committed
568 569
void change_key_cache_param(KEY_CACHE *keycache, uint division_limit,
			    uint age_threshold)
570
{
unknown's avatar
unknown committed
571
  DBUG_ENTER("change_key_cache_param");
572

unknown's avatar
unknown committed
573
  keycache_pthread_mutex_lock(&keycache->cache_lock);
unknown's avatar
unknown committed
574 575 576 577 578 579
  if (division_limit)
    keycache->min_warm_blocks= (keycache->disk_blocks *
				division_limit / 100 + 1);
  if (age_threshold)
    keycache->age_threshold=   (keycache->disk_blocks *
				age_threshold / 100);
unknown's avatar
unknown committed
580
  keycache_pthread_mutex_unlock(&keycache->cache_lock);
unknown's avatar
unknown committed
581
  DBUG_VOID_RETURN;
582 583 584
}


585
/*
586
  Remove key_cache from memory
587 588 589

  SYNOPSIS
    end_key_cache()
unknown's avatar
unknown committed
590 591
    keycache		key cache handle
    cleanup		Complete free (Free also mutex for key cache)
592 593 594

  RETURN VALUE
    none
595
*/
596

unknown's avatar
unknown committed
597
void end_key_cache(KEY_CACHE *keycache, my_bool cleanup)
unknown's avatar
unknown committed
598 599
{
  DBUG_ENTER("end_key_cache");
600 601
  DBUG_PRINT("enter", ("key_cache: %lx", keycache));

unknown's avatar
unknown committed
602
  if (!keycache->key_cache_inited)
603
    DBUG_VOID_RETURN;
unknown's avatar
unknown committed
604

unknown's avatar
unknown committed
605
  if (keycache->disk_blocks > 0)
unknown's avatar
unknown committed
606
  {
unknown's avatar
unknown committed
607
    if (keycache->block_mem)
unknown's avatar
unknown committed
608
    {
unknown's avatar
unknown committed
609
      my_free_lock((gptr) keycache->block_mem, MYF(0));
610
      keycache->block_mem= NULL;
unknown's avatar
unknown committed
611
      my_free((gptr) keycache->block_root, MYF(0));
612
      keycache->block_root= NULL;
unknown's avatar
unknown committed
613
    }
unknown's avatar
unknown committed
614
    keycache->disk_blocks= -1;
unknown's avatar
unknown committed
615 616
    /* Reset blocks_changed to be safe if flush_all_key_blocks is called */
    keycache->blocks_changed= 0;
unknown's avatar
unknown committed
617
  }
unknown's avatar
unknown committed
618 619 620 621

  DBUG_PRINT("status",
    ("used: %d  changed: %d  w_requests: %ld  \
writes: %ld  r_requests: %ld  reads: %ld",
622
      keycache->blocks_used, keycache->global_blocks_changed,
unknown's avatar
unknown committed
623 624 625
      keycache->global_cache_w_requests, keycache->global_cache_write,
      keycache->global_cache_r_requests, keycache->global_cache_read));

unknown's avatar
unknown committed
626 627
  if (cleanup)
  {
unknown's avatar
unknown committed
628
    pthread_mutex_destroy(&keycache->cache_lock);
unknown's avatar
unknown committed
629 630
    keycache->key_cache_inited= 0;
    KEYCACHE_DEBUG_CLOSE;
unknown's avatar
unknown committed
631
  }
unknown's avatar
unknown committed
632 633 634 635
  DBUG_VOID_RETURN;
} /* end_key_cache */


636
/*
637 638 639 640
  Link a thread into double-linked queue of waiting threads.

  SYNOPSIS
    link_into_queue()
641 642
      wqueue              pointer to the queue structure
      thread              pointer to the thread to be added to the queue
643 644 645 646 647 648 649 650

  RETURN VALUE
    none

  NOTES.
    Queue is represented by a circular list of the thread structures
    The list is double-linked of the type (**prev,*next), accessed by
    a pointer to the last element.
651
*/
652

653
static void link_into_queue(KEYCACHE_WQUEUE *wqueue,
654
                                   struct st_my_thread_var *thread)
655
{
656
  struct st_my_thread_var *last;
657
  if (! (last= wqueue->last_thread))
658 659
  {
    /* Queue is empty */
660 661
    thread->next= thread;
    thread->prev= &thread->next;
662 663
  }
  else
664
  {
665 666 667 668
    thread->prev= last->next->prev;
    last->next->prev= &thread->next;
    thread->next= last->next;
    last->next= thread;
669
  }
670
  wqueue->last_thread= thread;
671 672 673
}

/*
674
  Unlink a thread from double-linked queue of waiting threads
675 676 677

  SYNOPSIS
    unlink_from_queue()
678 679
      wqueue              pointer to the queue structure
      thread              pointer to the thread to be removed from the queue
680 681 682 683 684 685

  RETURN VALUE
    none

  NOTES.
    See NOTES for link_into_queue
686
*/
687

688
static void unlink_from_queue(KEYCACHE_WQUEUE *wqueue,
689
                                     struct st_my_thread_var *thread)
690
{
691 692 693
  KEYCACHE_DBUG_PRINT("unlink_from_queue", ("thread %ld", thread->id));
  if (thread->next == thread)
    /* The queue contains only one member */
694
    wqueue->last_thread= NULL;
695
  else
696
  {
697
    thread->next->prev= thread->prev;
698 699
    *thread->prev=thread->next;
    if (wqueue->last_thread == thread)
700 701
      wqueue->last_thread= STRUCT_PTR(struct st_my_thread_var, next,
                                      thread->prev);
702
  }
703
  thread->next= NULL;
704 705 706 707
}


/*
708
  Add a thread to single-linked queue of waiting threads
709 710 711

  SYNOPSIS
    add_to_queue()
712 713
      wqueue              pointer to the queue structure
      thread              pointer to the thread to be added to the queue
714 715 716 717 718 719 720 721

  RETURN VALUE
    none

  NOTES.
    Queue is represented by a circular list of the thread structures
    The list is single-linked of the type (*next), accessed by a pointer
    to the last element.
722
*/
723

724 725
static inline void add_to_queue(KEYCACHE_WQUEUE *wqueue,
                                struct st_my_thread_var *thread)
726
{
727
  struct st_my_thread_var *last;
728 729
  if (! (last= wqueue->last_thread))
    thread->next= thread;
730
  else
731
  {
732 733
    thread->next= last->next;
    last->next= thread;
734
  }
735
  wqueue->last_thread= thread;
736 737 738 739
}


/*
740
  Remove all threads from queue signaling them to proceed
741 742 743

  SYNOPSIS
    realease_queue()
744 745
      wqueue              pointer to the queue structure
      thread              pointer to the thread to be added to the queue
746 747 748 749 750 751 752

  RETURN VALUE
    none

  NOTES.
    See notes for add_to_queue
    When removed from the queue each thread is signaled via condition
753
    variable thread->suspend.
754
*/
755 756 757

static void release_queue(KEYCACHE_WQUEUE *wqueue)
{
unknown's avatar
unknown committed
758 759
  struct st_my_thread_var *last= wqueue->last_thread;
  struct st_my_thread_var *next= last->next;
760 761 762 763 764 765 766
  struct st_my_thread_var *thread;
  do
  {
    thread=next;
    keycache_pthread_cond_signal(&thread->suspend);
    KEYCACHE_DBUG_PRINT("release_queue: signal", ("thread %ld", thread->id));
    next=thread->next;
767
    thread->next= NULL;
768 769
  }
  while (thread != last);
770
  wqueue->last_thread= NULL;
771 772 773 774
}


/*
775
  Unlink a block from the chain of dirty/clean blocks
776
*/
777

778
static inline void unlink_changed(BLOCK_LINK *block)
unknown's avatar
unknown committed
779
{
780
  if (block->next_changed)
781 782
    block->next_changed->prev_changed= block->prev_changed;
  *block->prev_changed= block->next_changed;
unknown's avatar
unknown committed
783 784 785
}


786
/*
787
  Link a block into the chain of dirty/clean blocks
788
*/
789

790
static inline void link_changed(BLOCK_LINK *block, BLOCK_LINK **phead)
unknown's avatar
unknown committed
791
{
792 793
  block->prev_changed= phead;
  if ((block->next_changed= *phead))
794
    (*phead)->prev_changed= &block->next_changed;
795
  *phead= block;
unknown's avatar
unknown committed
796 797
}

798 799

/*
800 801
  Unlink a block from the chain of dirty/clean blocks, if it's asked for,
  and link it to the chain of clean blocks for the specified file
802
*/
803

unknown's avatar
unknown committed
804 805
static void link_to_file_list(KEY_CACHE *keycache,
                              BLOCK_LINK *block, int file, my_bool unlink)
unknown's avatar
unknown committed
806
{
807 808
  if (unlink)
    unlink_changed(block);
809
  link_changed(block, &keycache->file_blocks[FILE_HASH(file)]);
810 811
  if (block->status & BLOCK_CHANGED)
  {
812
    block->status&= ~BLOCK_CHANGED;
unknown's avatar
unknown committed
813
    keycache->blocks_changed--;
unknown's avatar
unknown committed
814
    keycache->global_blocks_changed--;
815
  }
unknown's avatar
unknown committed
816 817
}

818

819 820 821
/*
  Unlink a block from the chain of clean blocks for the specified
  file and link it to the chain of dirty blocks for this file
822
*/
823

unknown's avatar
unknown committed
824 825
static inline void link_to_changed_list(KEY_CACHE *keycache,
                                        BLOCK_LINK *block)
unknown's avatar
unknown committed
826
{
827
  unlink_changed(block);
unknown's avatar
unknown committed
828 829
  link_changed(block,
               &keycache->changed_blocks[FILE_HASH(block->hash_link->file)]);
830
  block->status|=BLOCK_CHANGED;
unknown's avatar
unknown committed
831
  keycache->blocks_changed++;
unknown's avatar
unknown committed
832
  keycache->global_blocks_changed++;
unknown's avatar
unknown committed
833 834 835
}


836
/*
837 838 839 840 841
  Link a block to the LRU chain at the beginning or at the end of
  one of two parts.

  SYNOPSIS
    link_block()
842
      keycache            pointer to a key cache data structure
843 844 845 846 847 848 849 850
      block               pointer to the block to link to the LRU chain
      hot                 <-> to link the block into the hot subchain
      at_end              <-> to link the block at the end of the subchain

  RETURN VALUE
    none

  NOTES.
851 852 853
    The LRU chain is represented by a curcular list of block structures.
    The list is double-linked of the type (**prev,*next) type.
    The LRU chain is divided into two parts - hot and warm.
854
    There are two pointers to access the last blocks of these two
855
    parts. The beginning of the warm part follows right after the
856 857 858 859 860 861 862 863 864 865 866 867 868
    end of the hot part.
    Only blocks of the warm part can be used for replacement.
    The first block from the beginning of this subchain is always
    taken for eviction (keycache->last_used->next)

    LRU chain:       +------+   H O T    +------+
                +----| end  |----...<----| beg  |----+
                |    +------+last        +------+    |
                v<-link in latest hot (new end)      |
                |     link in latest warm (new end)->^
                |    +------+  W A R M   +------+    |
                +----| beg  |---->...----| end  |----+
                     +------+            +------+ins
869
                  first for eviction
870
*/
871

872 873
static void link_block(KEY_CACHE *keycache, BLOCK_LINK *block, my_bool hot,
                       my_bool at_end)
874
{
875 876 877
  BLOCK_LINK *ins;
  BLOCK_LINK **pins;

878
  KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests));
879 880
  if (!hot && keycache->waiting_for_block.last_thread) {
    /* Signal that in the LRU warm sub-chain an available block has appeared */
unknown's avatar
unknown committed
881 882 883 884
    struct st_my_thread_var *last_thread=
                               keycache->waiting_for_block.last_thread;
    struct st_my_thread_var *first_thread= last_thread->next;
    struct st_my_thread_var *next_thread= first_thread;
885 886 887 888
    HASH_LINK *hash_link= (HASH_LINK *) first_thread->opt_info;
    struct st_my_thread_var *thread;
    do
    {
889 890
      thread= next_thread;
      next_thread= thread->next;
891
      /*
892 893 894 895 896 897
         We notify about the event all threads that ask
         for the same page as the first thread in the queue
      */
      if ((HASH_LINK *) thread->opt_info == hash_link)
      {
        keycache_pthread_cond_signal(&thread->suspend);
unknown's avatar
unknown committed
898
        unlink_from_queue(&keycache->waiting_for_block, thread);
899 900
        block->requests++;
      }
901
    }
902
    while (thread != last_thread);
unknown's avatar
unknown committed
903
    hash_link->block= block;
904 905
    KEYCACHE_THREAD_TRACE("link_block: after signaling");
#if defined(KEYCACHE_DEBUG)
906
    KEYCACHE_DBUG_PRINT("link_block",
907
        ("linked,unlinked block %u  status=%x  #requests=%u  #available=%u",
unknown's avatar
unknown committed
908
         BLOCK_NUMBER(block), block->status,
unknown's avatar
unknown committed
909
         block->requests, keycache->blocks_available));
unknown's avatar
unknown committed
910
#endif
911 912
    return;
  }
913
  pins= hot ? &keycache->used_ins : &keycache->used_last;
914
  ins= *pins;
915
  if (ins)
916
  {
917 918 919 920
    ins->next_used->prev_used= &block->next_used;
    block->next_used= ins->next_used;
    block->prev_used= &ins->next_used;
    ins->next_used= block;
921
    if (at_end)
922
      *pins= block;
923 924 925 926
  }
  else
  {
    /* The LRU chain is empty */
927
    keycache->used_last= keycache->used_ins= block->next_used= block;
unknown's avatar
unknown committed
928
    block->prev_used= &block->next_used;
929 930 931
  }
  KEYCACHE_THREAD_TRACE("link_block");
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
932
  keycache->blocks_available++;
933
  KEYCACHE_DBUG_PRINT("link_block",
934
      ("linked block %u:%1u  status=%x  #requests=%u  #available=%u",
935
       BLOCK_NUMBER(block), at_end, block->status,
unknown's avatar
unknown committed
936
       block->requests, keycache->blocks_available));
unknown's avatar
unknown committed
937 938
  KEYCACHE_DBUG_ASSERT((ulong) keycache->blocks_available <=
                       keycache->blocks_used);
939 940
#endif
}
unknown's avatar
unknown committed
941

942 943

/*
944
  Unlink a block from the LRU chain
945 946 947

  SYNOPSIS
    unlink_block()
948
      keycache            pointer to a key cache data structure
949 950 951 952 953 954 955
      block               pointer to the block to unlink from the LRU chain

  RETURN VALUE
    none

  NOTES.
    See NOTES for link_block
956
*/
957

unknown's avatar
unknown committed
958
static void unlink_block(KEY_CACHE *keycache, BLOCK_LINK *block)
959 960 961
{
  if (block->next_used == block)
    /* The list contains only one member */
962
    keycache->used_last= keycache->used_ins= NULL;
963
  else
964
  {
unknown's avatar
unknown committed
965 966 967 968
    block->next_used->prev_used= block->prev_used;
    *block->prev_used= block->next_used;
    if (keycache->used_last == block)
      keycache->used_last= STRUCT_PTR(BLOCK_LINK, next_used, block->prev_used);
969 970
    if (keycache->used_ins == block)
      keycache->used_ins=STRUCT_PTR(BLOCK_LINK, next_used, block->prev_used);
971
  }
unknown's avatar
unknown committed
972
  block->next_used= NULL;
973

974 975
  KEYCACHE_THREAD_TRACE("unlink_block");
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
976
  keycache->blocks_available--;
977
  KEYCACHE_DBUG_PRINT("unlink_block",
978
    ("unlinked block %u  status=%x   #requests=%u  #available=%u",
979
     BLOCK_NUMBER(block), block->status,
unknown's avatar
unknown committed
980 981
     block->requests, keycache->blocks_available));
  KEYCACHE_DBUG_ASSERT(keycache->blocks_available >= 0);
982 983 984 985 986
#endif
}


/*
987
  Register requests for a block
988
*/
unknown's avatar
unknown committed
989
static void reg_requests(KEY_CACHE *keycache, BLOCK_LINK *block, int count)
990 991 992
{
  if (! block->requests)
    /* First request for the block unlinks it */
unknown's avatar
unknown committed
993
    unlink_block(keycache, block);
994 995 996 997
  block->requests+=count;
}


998 999 1000
/*
  Unregister request for a block
  linking it to the LRU chain if it's the last request
1001 1002 1003 1004

  SYNOPSIS

    unreg_block()
1005
      keycache            pointer to a key cache data structure
1006 1007 1008 1009 1010 1011
      block               pointer to the block to link to the LRU chain
      at_end              <-> to link the block at the end of the LRU chain

  RETURN VALUE
    none

1012
  NOTES.
1013 1014 1015
    Every linking to the LRU chain decrements by one a special block
    counter (if it's positive). If the at_end parameter is TRUE the block is
    added either at the end of warm sub-chain or at the end of hot sub-chain.
1016 1017
    It is added to the hot subchain if its counter is zero and number of
    blocks in warm sub-chain is not less than some low limit (determined by
1018 1019
    the division_limit parameter). Otherwise the block is added to the warm
    sub-chain. If the at_end parameter is FALSE the block is always added
1020
    at beginning of the warm sub-chain.
1021 1022 1023 1024
    Thus a warm block can be promoted to the hot sub-chain when its counter
    becomes zero for the first time.
    At the same time  the block at the very beginning of the hot subchain
    might be moved to the beginning of the warm subchain if it stays untouched
1025
    for a too long time (this time is determined by parameter age_threshold).
1026
*/
1027

unknown's avatar
unknown committed
1028 1029
static inline void unreg_request(KEY_CACHE *keycache,
                                 BLOCK_LINK *block, int at_end)
1030 1031
{
  if (! --block->requests)
1032 1033 1034 1035 1036 1037 1038 1039
  {
    my_bool hot;
    if (block->hits_left)
      block->hits_left--;
    hot= !block->hits_left && at_end &&
      keycache->warm_blocks > keycache->min_warm_blocks;
    if (hot)
    {
1040 1041 1042
      if (block->temperature == BLOCK_WARM)
        keycache->warm_blocks--;
      block->temperature= BLOCK_HOT;
1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
      KEYCACHE_DBUG_PRINT("unreg_request", ("#warm_blocks=%u",
                           keycache->warm_blocks));
    }
    link_block(keycache, block, hot, (my_bool)at_end);
    block->last_hit_time= keycache->keycache_time;
    if (++keycache->keycache_time - keycache->used_ins->last_hit_time >
	keycache->age_threshold)
    {
      block= keycache->used_ins;
      unlink_block(keycache, block);
      link_block(keycache, block, 0, 0);
1054 1055 1056 1057 1058
      if (block->temperature != BLOCK_WARM)
      {
        keycache->warm_blocks++;
        block->temperature= BLOCK_WARM;
      }
1059 1060 1061 1062
      KEYCACHE_DBUG_PRINT("unreg_request", ("#warm_blocks=%u",
                           keycache->warm_blocks));
    }
  }
1063 1064 1065
}

/*
1066
  Remove a reader of the page in block
1067
*/
1068

1069
static inline void remove_reader(BLOCK_LINK *block)
1070
{
1071 1072 1073 1074 1075 1076
  if (! --block->hash_link->requests && block->condvar)
    keycache_pthread_cond_signal(block->condvar);
}


/*
1077 1078
  Wait until the last reader of the page in block
  signals on its termination
1079
*/
1080

unknown's avatar
unknown committed
1081
static inline void wait_for_readers(KEY_CACHE *keycache, BLOCK_LINK *block)
1082
{
unknown's avatar
unknown committed
1083
  struct st_my_thread_var *thread= my_thread_var;
1084 1085
  while (block->hash_link->requests)
  {
1086
    block->condvar= &thread->suspend;
unknown's avatar
unknown committed
1087
    keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
1088
    block->condvar= NULL;
1089 1090 1091 1092 1093
  }
}


/*
1094
  Add a hash link to a bucket in the hash_table
1095
*/
1096

1097 1098 1099
static inline void link_hash(HASH_LINK **start, HASH_LINK *hash_link)
{
  if (*start)
unknown's avatar
unknown committed
1100 1101 1102 1103
    (*start)->prev= &hash_link->next;
  hash_link->next= *start;
  hash_link->prev= start;
  *start= hash_link;
1104 1105 1106 1107
}


/*
1108
  Remove a hash link from the hash table
1109
*/
1110

unknown's avatar
unknown committed
1111
static void unlink_hash(KEY_CACHE *keycache, HASH_LINK *hash_link)
1112 1113 1114 1115
{
  KEYCACHE_DBUG_PRINT("unlink_hash", ("file %u, filepos %lu #requests=%u",
      (uint) hash_link->file,(ulong) hash_link->diskpos, hash_link->requests));
  KEYCACHE_DBUG_ASSERT(hash_link->requests == 0);
unknown's avatar
unknown committed
1116 1117 1118 1119
  if ((*hash_link->prev= hash_link->next))
    hash_link->next->prev= hash_link->prev;
  hash_link->block= NULL;
  if (keycache->waiting_for_hash_link.last_thread)
1120
  {
unknown's avatar
unknown committed
1121
    /* Signal that a free hash link has appeared */
unknown's avatar
unknown committed
1122 1123 1124 1125
    struct st_my_thread_var *last_thread=
                               keycache->waiting_for_hash_link.last_thread;
    struct st_my_thread_var *first_thread= last_thread->next;
    struct st_my_thread_var *next_thread= first_thread;
1126 1127 1128
    KEYCACHE_PAGE *first_page= (KEYCACHE_PAGE *) (first_thread->opt_info);
    struct st_my_thread_var *thread;

unknown's avatar
unknown committed
1129 1130
    hash_link->file= first_page->file;
    hash_link->diskpos= first_page->filepos;
1131 1132 1133
    do
    {
      KEYCACHE_PAGE *page;
unknown's avatar
unknown committed
1134
      thread= next_thread;
1135
      page= (KEYCACHE_PAGE *) thread->opt_info;
unknown's avatar
unknown committed
1136
      next_thread= thread->next;
1137
      /*
1138 1139 1140 1141 1142 1143
         We notify about the event all threads that ask
         for the same page as the first thread in the queue
      */
      if (page->file == hash_link->file && page->filepos == hash_link->diskpos)
      {
        keycache_pthread_cond_signal(&thread->suspend);
unknown's avatar
unknown committed
1144
        unlink_from_queue(&keycache->waiting_for_hash_link, thread);
1145 1146 1147
      }
    }
    while (thread != last_thread);
unknown's avatar
unknown committed
1148 1149 1150
    link_hash(&keycache->hash_root[KEYCACHE_HASH(hash_link->file,
					         hash_link->diskpos)],
              hash_link);
1151
    return;
1152
  }
unknown's avatar
unknown committed
1153 1154
  hash_link->next= keycache->free_hash_list;
  keycache->free_hash_list= hash_link;
1155 1156
}

1157

1158
/*
1159
  Get the hash link for a page
1160
*/
1161

1162
static HASH_LINK *get_hash_link(KEY_CACHE *keycache,
unknown's avatar
unknown committed
1163
                                int file, my_off_t filepos)
1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179
{
  reg1 HASH_LINK *hash_link, **start;
  KEYCACHE_PAGE page;
#if defined(KEYCACHE_DEBUG)
  int cnt;
#endif

  KEYCACHE_DBUG_PRINT("get_hash_link", ("file %u, filepos %lu",
                      (uint) file,(ulong) filepos));

restart:
  /*
     Find the bucket in the hash table for the pair (file, filepos);
     start contains the head of the bucket list,
     hash_link points to the first member of the list
  */
unknown's avatar
unknown committed
1180
  hash_link= *(start= &keycache->hash_root[KEYCACHE_HASH(file, filepos)]);
1181
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
1182
  cnt= 0;
1183 1184 1185 1186 1187 1188 1189 1190
#endif
  /* Look for an element for the pair (file, filepos) in the bucket chain */
  while (hash_link &&
         (hash_link->diskpos != filepos || hash_link->file != file))
  {
    hash_link= hash_link->next;
#if defined(KEYCACHE_DEBUG)
    cnt++;
unknown's avatar
unknown committed
1191
    if (! (cnt <= keycache->hash_links_used))
1192 1193
    {
      int i;
unknown's avatar
unknown committed
1194 1195
      for (i=0, hash_link= *start ;
           i < cnt ; i++, hash_link= hash_link->next)
1196 1197 1198 1199 1200
      {
        KEYCACHE_DBUG_PRINT("get_hash_link", ("file %u, filepos %lu",
            (uint) hash_link->file,(ulong) hash_link->diskpos));
      }
    }
unknown's avatar
unknown committed
1201
    KEYCACHE_DBUG_ASSERT(cnt <= keycache->hash_links_used);
1202 1203 1204
#endif
  }
  if (! hash_link)
1205 1206
  {
    /* There is no hash link in the hash table for the pair (file, filepos) */
unknown's avatar
unknown committed
1207
    if (keycache->free_hash_list)
1208
    {
unknown's avatar
unknown committed
1209
      hash_link= keycache->free_hash_list;
1210
      keycache->free_hash_list= hash_link->next;
1211
    }
unknown's avatar
unknown committed
1212
    else if (keycache->hash_links_used < keycache->hash_links)
1213
    {
unknown's avatar
unknown committed
1214
      hash_link= &keycache->hash_link_root[keycache->hash_links_used++];
1215 1216
    }
    else
1217 1218
    {
      /* Wait for a free hash link */
unknown's avatar
unknown committed
1219
      struct st_my_thread_var *thread= my_thread_var;
1220
      KEYCACHE_DBUG_PRINT("get_hash_link", ("waiting"));
1221 1222
      page.file= file;
      page.filepos= filepos;
1223
      thread->opt_info= (void *) &page;
unknown's avatar
unknown committed
1224 1225
      link_into_queue(&keycache->waiting_for_hash_link, thread);
      keycache_pthread_cond_wait(&thread->suspend,
unknown's avatar
unknown committed
1226
                                 &keycache->cache_lock);
unknown's avatar
unknown committed
1227
      thread->opt_info= NULL;
1228 1229
      goto restart;
    }
unknown's avatar
unknown committed
1230 1231
    hash_link->file= file;
    hash_link->diskpos= filepos;
1232 1233 1234 1235
    link_hash(start, hash_link);
  }
  /* Register the request for the page */
  hash_link->requests++;
1236

1237 1238 1239 1240 1241
  return hash_link;
}


/*
1242 1243
  Get a block for the file page requested by a keycache read/write operation;
  If the page is not in the cache return a free block, if there is none
1244
  return the lru block after saving its buffer if the page is dirty.
1245

1246 1247 1248
  SYNOPSIS

    find_key_block()
1249
      keycache            pointer to a key cache data structure
1250 1251 1252 1253
      file                handler for the file to read page from
      filepos             position of the page in the file
      init_hits_left      how initialize the block counter for the page
      wrmode              <-> get for writing
1254
      page_st        out  {PAGE_READ,PAGE_TO_BE_READ,PAGE_WAIT_TO_BE_READ}
1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265

  RETURN VALUE
    Pointer to the found block if successful, 0 - otherwise

  NOTES.
    For the page from file positioned at filepos the function checks whether
    the page is in the key cache specified by the first parameter.
    If this is the case it immediately returns the block.
    If not, the function first chooses  a block for this page. If there is
    no not used blocks in the key cache yet, the function takes the block
    at the very beginning of the warm sub-chain. It saves the page in that
1266
    block if it's dirty before returning the pointer to it.
1267 1268 1269
    The function returns in the page_st parameter the following values:
      PAGE_READ         - if page already in the block,
      PAGE_TO_BE_READ   - if it is to be read yet by the current thread
1270
      WAIT_TO_BE_READ   - if it is to be read by another thread
1271 1272 1273
    If an error occurs THE BLOCK_ERROR bit is set in the block status.
    It might happen that there are no blocks in LRU chain (in warm part) -
    all blocks  are unlinked for some read/write operations. Then the function
1274
    waits until first of this operations links any block back.
1275 1276
*/

unknown's avatar
unknown committed
1277
static BLOCK_LINK *find_key_block(KEY_CACHE *keycache,
1278 1279
                                  File file, my_off_t filepos,
                                  int init_hits_left,
1280 1281 1282 1283
                                  int wrmode, int *page_st)
{
  HASH_LINK *hash_link;
  BLOCK_LINK *block;
unknown's avatar
unknown committed
1284
  int error= 0;
1285
  int page_status;
1286

1287 1288 1289
  DBUG_ENTER("find_key_block");
  KEYCACHE_THREAD_TRACE("find_key_block:begin");
  DBUG_PRINT("enter", ("file %u, filepos %lu, wrmode %lu",
1290
               (uint) file, (ulong) filepos, (uint) wrmode));
1291
  KEYCACHE_DBUG_PRINT("find_key_block", ("file %u, filepos %lu, wrmode %lu",
1292
                      (uint) file, (ulong) filepos, (uint) wrmode));
1293
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
1294 1295
  DBUG_EXECUTE("check_keycache2",
               test_key_cache(keycache, "start of find_key_block", 0););
unknown's avatar
unknown committed
1296
#endif
1297

1298 1299
restart:
  /* Find the hash link for the requested page (file, filepos) */
unknown's avatar
unknown committed
1300
  hash_link= get_hash_link(keycache, file, filepos);
1301

unknown's avatar
unknown committed
1302 1303
  page_status= -1;
  if ((block= hash_link->block) &&
1304
      block->hash_link == hash_link && (block->status & BLOCK_READ))
unknown's avatar
unknown committed
1305
    page_status= PAGE_READ;
1306

unknown's avatar
unknown committed
1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354
  if (wrmode && keycache->resize_in_flush)
  {
    /* This is a write request during the flush phase of a resize operation */

    if (page_status != PAGE_READ)
    {
      /* We don't need the page in the cache: we are going to write on disk */
      hash_link->requests--;
      unlink_hash(keycache, hash_link);
      return 0;
    }
    if (!(block->status & BLOCK_IN_FLUSH))
    { 
      hash_link->requests--;
      /*
        Remove block to invalidate the page in the block buffer
        as we are going to write directly on disk.
        Although we have an exlusive lock for the updated key part
        the control can be yieded by the current thread as we might
        have unfinished readers of other key parts in the block
        buffer. Still we are guaranteed not to have any readers
        of the key part we are writing into until the block is
        removed from the cache as we set the BLOCL_REASSIGNED
        flag (see the code below that handles reading requests). 
      */
      free_block(keycache, block); 
      return 0;
    }
    /* Wait intil the page is flushed on disk */
    hash_link->requests--;
    {
      struct st_my_thread_var *thread= my_thread_var;
      add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
      do
      {
        keycache_pthread_cond_wait(&thread->suspend,
                                   &keycache->cache_lock);
      }
      while(thread->next);
    }
    /* Invalidate page in the block if it has not been done yet */
    if (block->status)
      free_block(keycache, block);
    return 0;
  }
    
  if (page_status == PAGE_READ &&
      (block->status & (BLOCK_IN_SWITCH | BLOCK_REASSIGNED)))
1355 1356
  {
    /* This is a request for a page to be removed from cache */
unknown's avatar
unknown committed
1357

1358
    KEYCACHE_DBUG_PRINT("find_key_block",
1359
             ("request for old page in block %u",BLOCK_NUMBER(block)));
1360
    /*
1361 1362 1363 1364
       Only reading requests can proceed until the old dirty page is flushed,
       all others are to be suspended, then resubmitted
    */
    if (!wrmode && !(block->status & BLOCK_REASSIGNED))
1365
      reg_requests(keycache, block, 1);
1366 1367 1368
    else
    {
      hash_link->requests--;
1369
      KEYCACHE_DBUG_PRINT("find_key_block",
1370 1371
                          ("request waiting for old page to be saved"));
      {
unknown's avatar
unknown committed
1372
        struct st_my_thread_var *thread= my_thread_var;
1373 1374 1375 1376 1377
        /* Put the request into the queue of those waiting for the old page */
        add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
        /* Wait until the request can be resubmitted */
        do
        {
unknown's avatar
unknown committed
1378
          keycache_pthread_cond_wait(&thread->suspend,
unknown's avatar
unknown committed
1379
                                     &keycache->cache_lock);
1380 1381 1382
        }
        while(thread->next);
      }
1383
      KEYCACHE_DBUG_PRINT("find_key_block",
1384 1385 1386 1387 1388 1389
                          ("request for old page resubmitted"));
      /* Resubmit the request */
      goto restart;
    }
  }
  else
1390 1391
  {
    /* This is a request for a new page or for a page not to be removed */
1392
    if (! block)
1393 1394
    {
      /* No block is assigned for the page yet */
1395
      if (keycache->blocks_unused)
1396
      {
1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414
        if (keycache->free_block_list)
        {
          /* There is a block in the free list. */
          block= keycache->free_block_list;
          keycache->free_block_list= block->next_used;
          block->next_used= NULL;
        }
        else
        {
          /* There are some never used blocks, take first of them */
          block= &keycache->block_root[keycache->blocks_used];
          block->buffer= ADD_TO_PTR(keycache->block_mem,
                                    ((ulong) keycache->blocks_used*
                                     keycache->key_cache_block_size),
                                    byte*);
          keycache->blocks_used++;
        }
        keycache->blocks_unused--;
unknown's avatar
unknown committed
1415 1416 1417 1418
        block->status= 0;
        block->length= 0;
        block->offset= keycache->key_cache_block_size;
        block->requests= 1;
1419
        block->temperature= BLOCK_COLD;
1420 1421
        block->hits_left= init_hits_left;
        block->last_hit_time= 0;
unknown's avatar
unknown committed
1422 1423
        link_to_file_list(keycache, block, file, 0);
        block->hash_link= hash_link;
1424
        hash_link->block= block;
unknown's avatar
unknown committed
1425
        page_status= PAGE_TO_BE_READ;
1426
        KEYCACHE_DBUG_PRINT("find_key_block",
1427 1428
                            ("got free or never used block %u",
                             BLOCK_NUMBER(block)));
1429 1430
      }
      else
1431 1432
      {
	/* There are no never used blocks, use a block from the LRU chain */
unknown's avatar
unknown committed
1433

1434
        /*
unknown's avatar
unknown committed
1435 1436 1437
          Wait until a new block is added to the LRU chain;
          several threads might wait here for the same page,
          all of them must get the same block
1438
        */
1439

unknown's avatar
unknown committed
1440
        if (! keycache->used_last)
1441
        {
unknown's avatar
unknown committed
1442 1443 1444
          struct st_my_thread_var *thread= my_thread_var;
          thread->opt_info= (void *) hash_link;
          link_into_queue(&keycache->waiting_for_block, thread);
1445
          do
1446
          {
unknown's avatar
unknown committed
1447
            keycache_pthread_cond_wait(&thread->suspend,
unknown's avatar
unknown committed
1448
                                       &keycache->cache_lock);
1449 1450
          }
          while (thread->next);
unknown's avatar
unknown committed
1451
          thread->opt_info= NULL;
1452
        }
unknown's avatar
unknown committed
1453
        block= hash_link->block;
1454 1455
        if (! block)
        {
1456 1457
          /*
             Take the first block from the LRU chain
1458 1459
             unlinking it from the chain
          */
unknown's avatar
unknown committed
1460
          block= keycache->used_last->next_used;
1461 1462
          block->hits_left= init_hits_left;
          block->last_hit_time= 0;
unknown's avatar
unknown committed
1463 1464
          reg_requests(keycache, block,1);
          hash_link->block= block;
1465
        }
1466 1467 1468 1469 1470

        if (block->hash_link != hash_link &&
	    ! (block->status & BLOCK_IN_SWITCH) )
        {
	  /* this is a primary request for a new page */
1471
          block->status|= BLOCK_IN_SWITCH;
1472 1473

          KEYCACHE_DBUG_PRINT("find_key_block",
unknown's avatar
unknown committed
1474
                        ("got block %u for new page", BLOCK_NUMBER(block)));
1475

1476
          if (block->status & BLOCK_CHANGED)
1477 1478 1479
          {
	    /* The block contains a dirty page - push it out of the cache */

unknown's avatar
unknown committed
1480
            KEYCACHE_DBUG_PRINT("find_key_block", ("block is dirty"));
1481

unknown's avatar
unknown committed
1482
            keycache_pthread_mutex_unlock(&keycache->cache_lock);
1483 1484 1485
            /*
	      The call is thread safe because only the current
	      thread might change the block->hash_link value
1486
            */
1487 1488 1489 1490 1491
	    error= my_pwrite(block->hash_link->file,
			     block->buffer+block->offset,
			     block->length - block->offset,
			     block->hash_link->diskpos+ block->offset,
			     MYF(MY_NABP | MY_WAIT_IF_FULL));
unknown's avatar
unknown committed
1492
            keycache_pthread_mutex_lock(&keycache->cache_lock);
unknown's avatar
unknown committed
1493
	    keycache->global_cache_write++;
1494
          }
1495

unknown's avatar
unknown committed
1496
          block->status|= BLOCK_REASSIGNED;
1497 1498
          if (block->hash_link)
          {
1499 1500 1501 1502 1503
            /*
	      Wait until all pending read requests
	      for this page are executed
	      (we could have avoided this waiting, if we had read
	      a page in the cache in a sweep, without yielding control)
1504
            */
unknown's avatar
unknown committed
1505
            wait_for_readers(keycache, block);
1506

1507
            /* Remove the hash link for this page from the hash table */
unknown's avatar
unknown committed
1508
            unlink_hash(keycache, block->hash_link);
1509 1510 1511 1512
            /* All pending requests for this page must be resubmitted */
            if (block->wqueue[COND_FOR_SAVED].last_thread)
              release_queue(&block->wqueue[COND_FOR_SAVED]);
          }
unknown's avatar
unknown committed
1513 1514 1515 1516 1517 1518 1519
          link_to_file_list(keycache, block, file,
                            (my_bool)(block->hash_link ? 1 : 0));
          block->status= error? BLOCK_ERROR : 0;
          block->length= 0;
          block->offset= keycache->key_cache_block_size;
          block->hash_link= hash_link;
          page_status= PAGE_TO_BE_READ;
1520

1521 1522 1523 1524 1525 1526
          KEYCACHE_DBUG_ASSERT(block->hash_link->block == block);
          KEYCACHE_DBUG_ASSERT(hash_link->block->hash_link == hash_link);
        }
        else
        {
          /* This is for secondary requests for a new page only */
unknown's avatar
unknown committed
1527 1528 1529
            page_status= block->hash_link == hash_link &&
                           (block->status & BLOCK_READ) ?
                              PAGE_READ : PAGE_WAIT_TO_BE_READ;
1530 1531
        }
      }
unknown's avatar
unknown committed
1532
      keycache->global_cache_read++;
1533 1534 1535
    }
    else
    {
unknown's avatar
unknown committed
1536
      reg_requests(keycache, block, 1);
1537 1538 1539 1540 1541
      page_status = block->hash_link == hash_link &&
                    (block->status & BLOCK_READ) ?
                      PAGE_READ : PAGE_WAIT_TO_BE_READ;
    }
  }
1542

1543 1544
  KEYCACHE_DBUG_ASSERT(page_status != -1);
  *page_st=page_status;
unknown's avatar
unknown committed
1545 1546 1547
  KEYCACHE_DBUG_PRINT("find_key_block",
                      ("file %u, filepos %lu, page_status %lu",
                      (uint) file,(ulong) filepos,(uint) page_status));
1548

1549
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
1550 1551
  DBUG_EXECUTE("check_keycache2",
               test_key_cache(keycache, "end of find_key_block",0););
1552 1553 1554 1555
#endif
  KEYCACHE_THREAD_TRACE("find_key_block:end");
  DBUG_RETURN(block);
}
unknown's avatar
unknown committed
1556 1557


1558
/*
1559 1560 1561 1562 1563
  Read into a key cache block buffer from disk.

  SYNOPSIS

    read_block()
1564
      keycache            pointer to a key cache data structure
1565
      block               block to which buffer the data is to be read
1566 1567 1568 1569
      read_length         size of data to be read
      min_length          at least so much data must be read
      primary             <-> the current thread will read the data

1570 1571 1572 1573 1574 1575 1576 1577 1578 1579
  RETURN VALUE
    None

  NOTES.
    The function either reads a page data from file to the block buffer,
    or waits until another thread reads it. What page to read is determined
    by a block parameter - reference to a hash link for this page.
    If an error occurs THE BLOCK_ERROR bit is set in the block status.
    We do not report error when the size of successfully read
    portion is less than read_length, but not less than min_length.
1580
*/
1581

unknown's avatar
unknown committed
1582 1583
static void read_block(KEY_CACHE *keycache,
                       BLOCK_LINK *block, uint read_length,
1584 1585 1586
                       uint min_length, my_bool primary)
{
  uint got_length;
1587

unknown's avatar
unknown committed
1588
  /* On entry cache_lock is locked */
1589

1590 1591
  KEYCACHE_THREAD_TRACE("read_block");
  if (primary)
1592 1593 1594 1595
  {
    /*
      This code is executed only by threads
      that submitted primary requests
1596
    */
1597 1598

    KEYCACHE_DBUG_PRINT("read_block",
1599
                        ("page to be read by primary request"));
1600

1601
    /* Page is not in buffer yet, is to be read from disk */
unknown's avatar
unknown committed
1602
    keycache_pthread_mutex_unlock(&keycache->cache_lock);
unknown's avatar
unknown committed
1603 1604
    got_length= my_pread(block->hash_link->file, block->buffer,
                         read_length, block->hash_link->diskpos, MYF(0));
unknown's avatar
unknown committed
1605
    keycache_pthread_mutex_lock(&keycache->cache_lock);
1606
    if (got_length < min_length)
unknown's avatar
unknown committed
1607
      block->status|= BLOCK_ERROR;
1608 1609
    else
    {
unknown's avatar
unknown committed
1610 1611
      block->status= BLOCK_READ;
      block->length= got_length;
1612
    }
1613
    KEYCACHE_DBUG_PRINT("read_block",
1614 1615 1616 1617 1618
                        ("primary request: new page in cache"));
    /* Signal that all pending requests for this page now can be processed */
    if (block->wqueue[COND_FOR_REQUESTED].last_thread)
      release_queue(&block->wqueue[COND_FOR_REQUESTED]);
  }
1619 1620 1621 1622 1623
  else
  {
    /*
      This code is executed only by threads
      that submitted secondary requests
1624
    */
1625
    KEYCACHE_DBUG_PRINT("read_block",
1626 1627
                      ("secondary request waiting for new page to be read"));
    {
1628
      struct st_my_thread_var *thread= my_thread_var;
1629
      /* Put the request into a queue and wait until it can be processed */
unknown's avatar
unknown committed
1630
      add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
1631 1632
      do
      {
unknown's avatar
unknown committed
1633
        keycache_pthread_cond_wait(&thread->suspend,
unknown's avatar
unknown committed
1634
                                   &keycache->cache_lock);
1635 1636 1637
      }
      while (thread->next);
    }
1638
    KEYCACHE_DBUG_PRINT("read_block",
1639 1640 1641 1642 1643 1644
                        ("secondary request: new page in cache"));
  }
}


/*
1645
  Read a block of data from a cached file into a buffer;
1646 1647 1648 1649

  SYNOPSIS

    key_cache_read()
1650
      keycache            pointer to a key cache data structure
1651 1652 1653
      file                handler for the file for the block of data to be read
      filepos             position of the block of data in the file
      level               determines the weight of the data
1654
      buff                buffer to where the data must be placed
1655
      length              length of the buffer
1656 1657 1658
      block_length        length of the block in the key cache buffer
      return_buffer       return pointer to the key cache buffer with the data

1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669
  RETURN VALUE
    Returns address from where the data is placed if sucessful, 0 - otherwise.

  NOTES.
    The function ensures that a block of data of size length from file
    positioned at filepos is in the buffers for some key cache blocks.
    Then the function either copies the data into the buffer buff, or,
    if return_buffer is TRUE, it just returns the pointer to the key cache
    buffer with the data.
    Filepos must be a multiple of 'block_length', but it doesn't
    have to be a multiple of key_cache_block_size;
1670
*/
unknown's avatar
unknown committed
1671

unknown's avatar
unknown committed
1672
byte *key_cache_read(KEY_CACHE *keycache,
1673 1674
                     File file, my_off_t filepos, int level,
                     byte *buff, uint length,
1675 1676
		     uint block_length __attribute__((unused)),
		     int return_buffer __attribute__((unused)))
unknown's avatar
unknown committed
1677 1678
{
  int error=0;
unknown's avatar
unknown committed
1679 1680
  uint offset= 0;
  byte *start= buff;
unknown's avatar
unknown committed
1681 1682
  DBUG_ENTER("key_cache_read");
  DBUG_PRINT("enter", ("file %u, filepos %lu, length %u",
1683
               (uint) file, (ulong) filepos, length));
1684

unknown's avatar
unknown committed
1685
  if (keycache->can_be_used)
1686 1687
  {
    /* Key cache is used */
1688
    reg1 BLOCK_LINK *block;
unknown's avatar
unknown committed
1689
    uint read_length;
1690 1691
    uint status;
    int page_st;
1692

1693
    /* Read data in key_cache_block_size increments */
unknown's avatar
unknown committed
1694 1695
    do
    {
unknown's avatar
unknown committed
1696
      keycache_pthread_mutex_lock(&keycache->cache_lock);
unknown's avatar
unknown committed
1697
      if (!keycache->can_be_used)	
unknown's avatar
unknown committed
1698 1699 1700 1701
      {
	keycache_pthread_mutex_unlock(&keycache->cache_lock);
	goto no_key_cache;
      }
unknown's avatar
unknown committed
1702 1703
      read_length= length > keycache->key_cache_block_size ?
                   keycache->key_cache_block_size : length;
1704
      KEYCACHE_DBUG_ASSERT(read_length > 0);
unknown's avatar
unknown committed
1705 1706 1707 1708 1709 1710 1711
      offset= (uint) (filepos & (keycache->key_cache_block_size-1));
      filepos-= offset;
#ifndef THREAD
      if (block_length > keycache->key_cache_block_size || offset)
	return_buffer=0;
#endif

unknown's avatar
unknown committed
1712
      inc_counter_for_resize_op(keycache);
unknown's avatar
unknown committed
1713
      keycache->global_cache_r_requests++;
1714
      block=find_key_block(keycache, file, filepos, level, 0, &page_st);
unknown's avatar
unknown committed
1715
      if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
unknown's avatar
unknown committed
1716
      {
1717
        /* The requested page is to be read into the block buffer */
1718
        read_block(keycache, block,
unknown's avatar
unknown committed
1719
                   keycache->key_cache_block_size, read_length+offset,
1720
                   (my_bool)(page_st == PAGE_TO_BE_READ));
unknown's avatar
unknown committed
1721
      }
1722 1723
      else if (! (block->status & BLOCK_ERROR) &&
               block->length < read_length + offset)
1724 1725 1726 1727 1728
      {
        /*
           Impossible if nothing goes wrong:
           this could only happen if we are using a file with
           small key blocks and are trying to read outside the file
1729
        */
unknown's avatar
unknown committed
1730 1731
        my_errno= -1;
        block->status|= BLOCK_ERROR;
unknown's avatar
unknown committed
1732
      }
1733

unknown's avatar
unknown committed
1734
      if (! ((status= block->status) & BLOCK_ERROR))
unknown's avatar
unknown committed
1735
      {
1736
#ifndef THREAD
1737
        if (! return_buffer)
1738 1739 1740
#endif
        {
#if !defined(SERIALIZED_READ_FROM_CACHE)
unknown's avatar
unknown committed
1741
          keycache_pthread_mutex_unlock(&keycache->cache_lock);
1742
#endif
1743

1744 1745
          /* Copy data from the cache buffer */
          if (!(read_length & 511))
unknown's avatar
unknown committed
1746
            bmove512(buff, block->buffer+offset, read_length);
1747
          else
unknown's avatar
unknown committed
1748
            memcpy(buff, block->buffer+offset, (size_t) read_length);
1749 1750

#if !defined(SERIALIZED_READ_FROM_CACHE)
unknown's avatar
unknown committed
1751
          keycache_pthread_mutex_lock(&keycache->cache_lock);
1752 1753
#endif
        }
unknown's avatar
unknown committed
1754
      }
1755

1756
      remove_reader(block);
1757 1758 1759
      /*
         Link the block into the LRU chain
         if it's the last submitted request for the block
1760
      */
1761
      unreg_request(keycache, block, 1);
1762

unknown's avatar
unknown committed
1763 1764
      dec_counter_for_resize_op(keycache);

unknown's avatar
unknown committed
1765
      keycache_pthread_mutex_unlock(&keycache->cache_lock);
1766

1767 1768
      if (status & BLOCK_ERROR)
        DBUG_RETURN((byte *) 0);
1769

1770
#ifndef THREAD
unknown's avatar
unknown committed
1771
      /* This is only true if we where able to read everything in one block */
1772
      if (return_buffer)
unknown's avatar
unknown committed
1773
	return (block->buffer);
unknown's avatar
unknown committed
1774
#endif
unknown's avatar
unknown committed
1775 1776
      buff+= read_length;
      filepos+= read_length;
1777

unknown's avatar
unknown committed
1778
    } while ((length-= read_length));
unknown's avatar
unknown committed
1779
    DBUG_RETURN(start);
unknown's avatar
unknown committed
1780
  }
1781

unknown's avatar
unknown committed
1782 1783 1784 1785 1786 1787
no_key_cache:					/* Key cache is not used */

  /* We can't use mutex here as the key cache may not be initialized */
  keycache->global_cache_r_requests++;
  keycache->global_cache_read++;
  if (my_pread(file, (byte*) buff, length, filepos+offset, MYF(MY_NABP)))
unknown's avatar
unknown committed
1788
    error= 1;
unknown's avatar
unknown committed
1789
  DBUG_RETURN(error ? (byte*) 0 : start);
1790
}
unknown's avatar
unknown committed
1791 1792


unknown's avatar
unknown committed
1793 1794 1795 1796
/*
  Insert a block of file data from a buffer into key cache

  SYNOPSIS
1797
    key_cache_insert()
unknown's avatar
unknown committed
1798 1799 1800 1801 1802 1803 1804 1805 1806 1807
    keycache            pointer to a key cache data structure
    file                handler for the file to insert data from
    filepos             position of the block of data in the file to insert
    level               determines the weight of the data
    buff                buffer to read data from
    length              length of the data in the buffer

  NOTES
    This is used by MyISAM to move all blocks from a index file to the key
    cache
1808

unknown's avatar
unknown committed
1809
  RETURN VALUE
1810
    0 if a success, 1 - otherwise.
unknown's avatar
unknown committed
1811 1812
*/

unknown's avatar
unknown committed
1813
int key_cache_insert(KEY_CACHE *keycache,
1814 1815
                     File file, my_off_t filepos, int level,
                     byte *buff, uint length)
unknown's avatar
unknown committed
1816 1817 1818 1819 1820
{
  DBUG_ENTER("key_cache_insert");
  DBUG_PRINT("enter", ("file %u, filepos %lu, length %u",
               (uint) file,(ulong) filepos, length));

unknown's avatar
unknown committed
1821
  if (keycache->can_be_used)
unknown's avatar
unknown committed
1822 1823 1824 1825 1826
  {
    /* Key cache is used */
    reg1 BLOCK_LINK *block;
    uint read_length;
    int page_st;
unknown's avatar
unknown committed
1827
    int error;
unknown's avatar
unknown committed
1828 1829 1830

    do
    {
unknown's avatar
unknown committed
1831 1832
      uint offset;
      keycache_pthread_mutex_lock(&keycache->cache_lock);
unknown's avatar
unknown committed
1833
      if (!keycache->can_be_used)	
unknown's avatar
unknown committed
1834 1835 1836 1837
      {
	keycache_pthread_mutex_unlock(&keycache->cache_lock);
	DBUG_RETURN(0);
      }
unknown's avatar
unknown committed
1838 1839
      read_length= length > keycache->key_cache_block_size ?
                   keycache->key_cache_block_size : length;
unknown's avatar
unknown committed
1840
      KEYCACHE_DBUG_ASSERT(read_length > 0);
unknown's avatar
unknown committed
1841 1842 1843 1844
      offset= (uint) (filepos & (keycache->key_cache_block_size-1));
      /* Read data into key cache from buff in key_cache_block_size incr. */
      filepos-= offset;

unknown's avatar
unknown committed
1845
      inc_counter_for_resize_op(keycache);
unknown's avatar
unknown committed
1846
      keycache->global_cache_r_requests++;
1847
      block= find_key_block(keycache, file, filepos, level, 0, &page_st);
unknown's avatar
unknown committed
1848 1849 1850 1851
      if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
      {
        /* The requested page is to be read into the block buffer */
#if !defined(SERIALIZED_READ_FROM_CACHE)
unknown's avatar
unknown committed
1852
        keycache_pthread_mutex_unlock(&keycache->cache_lock);
unknown's avatar
unknown committed
1853 1854 1855 1856 1857 1858 1859 1860 1861
#endif

        /* Copy data from buff */
        if (!(read_length & 511))
          bmove512(block->buffer+offset, buff, read_length);
        else
          memcpy(block->buffer+offset, buff, (size_t) read_length);

#if !defined(SERIALIZED_READ_FROM_CACHE)
unknown's avatar
unknown committed
1862
        keycache_pthread_mutex_lock(&keycache->cache_lock);
unknown's avatar
unknown committed
1863 1864 1865 1866 1867 1868 1869 1870 1871 1872
#endif
        block->status= BLOCK_READ;
        block->length= read_length+offset;
      }

      remove_reader(block);
      /*
         Link the block into the LRU chain
         if it's the last submitted request for the block
      */
1873
      unreg_request(keycache, block, 1);
unknown's avatar
unknown committed
1874

unknown's avatar
unknown committed
1875
      error= (block->status & BLOCK_ERROR);
unknown's avatar
unknown committed
1876 1877 1878

      dec_counter_for_resize_op(keycache);      

unknown's avatar
unknown committed
1879
      keycache_pthread_mutex_unlock(&keycache->cache_lock);
unknown's avatar
unknown committed
1880

unknown's avatar
unknown committed
1881
      if (error)
unknown's avatar
unknown committed
1882 1883
        DBUG_RETURN(1);

unknown's avatar
unknown committed
1884 1885
      buff+= read_length;
      filepos+= read_length;
unknown's avatar
unknown committed
1886 1887 1888 1889 1890 1891 1892

    } while ((length-= read_length));
  }
  DBUG_RETURN(0);
}


1893
/*
1894 1895
  Write a buffer into a cached file.

1896 1897 1898
  SYNOPSIS

    key_cache_write()
1899
      keycache            pointer to a key cache data structure
1900 1901 1902
      file                handler for the file to write data to
      filepos             position in the file to write data to
      level               determines the weight of the data
1903
      buff                buffer with the data
1904 1905
      length              length of the buffer
      dont_write          if is 0 then all dirty pages involved in writing
1906 1907
                          should have been flushed from key cache

1908 1909 1910 1911 1912 1913 1914
  RETURN VALUE
    0 if a success, 1 - otherwise.

  NOTES.
    The function copies the data of size length from buff into buffers
    for key cache blocks that are  assigned to contain the portion of
    the file starting with position filepos.
1915
    It ensures that this data is flushed to the file if dont_write is FALSE.
1916 1917
    Filepos must be a multiple of 'block_length', but it doesn't
    have to be a multiple of key_cache_block_size;
1918
*/
1919

unknown's avatar
unknown committed
1920
int key_cache_write(KEY_CACHE *keycache,
1921 1922
                    File file, my_off_t filepos, int level,
                    byte *buff, uint length,
1923 1924
                    uint block_length  __attribute__((unused)),
                    int dont_write)
unknown's avatar
unknown committed
1925
{
1926
  reg1 BLOCK_LINK *block;
unknown's avatar
unknown committed
1927
  int error=0;
unknown's avatar
unknown committed
1928
  DBUG_ENTER("key_cache_write");
1929 1930 1931 1932
  DBUG_PRINT("enter",
	     ("file %u  filepos %lu  length  %u  block_length %u  key_block_length: %u",
	      (uint) file, (ulong) filepos, length, block_length,
	      keycache ? keycache->key_cache_block_size : 0));
unknown's avatar
unknown committed
1933 1934

  if (!dont_write)
1935 1936
  {
    /* Force writing from buff into disk */
unknown's avatar
unknown committed
1937
    keycache->global_cache_write++;
unknown's avatar
unknown committed
1938
    if (my_pwrite(file, buff, length, filepos, MYF(MY_NABP | MY_WAIT_IF_FULL)))
unknown's avatar
unknown committed
1939
      DBUG_RETURN(1);
unknown's avatar
unknown committed
1940
  }
1941

unknown's avatar
unknown committed
1942
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
1943 1944
  DBUG_EXECUTE("check_keycache",
               test_key_cache(keycache, "start of key_cache_write", 1););
unknown's avatar
unknown committed
1945
#endif
1946

unknown's avatar
unknown committed
1947
  if (keycache->can_be_used)
1948 1949
  {
    /* Key cache is used */
unknown's avatar
unknown committed
1950
    uint read_length;
1951
    int page_st;
1952

unknown's avatar
unknown committed
1953 1954
    do
    {
unknown's avatar
unknown committed
1955 1956
      uint offset;
      keycache_pthread_mutex_lock(&keycache->cache_lock);
unknown's avatar
unknown committed
1957
      if (!keycache->can_be_used)	
unknown's avatar
unknown committed
1958 1959 1960 1961
      {
	keycache_pthread_mutex_unlock(&keycache->cache_lock);
	goto no_key_cache;
      }
unknown's avatar
unknown committed
1962
      read_length= length > keycache->key_cache_block_size ?
unknown's avatar
unknown committed
1963
	keycache->key_cache_block_size : length;
1964
      KEYCACHE_DBUG_ASSERT(read_length > 0);
unknown's avatar
unknown committed
1965 1966 1967 1968
      offset= (uint) (filepos & (keycache->key_cache_block_size-1));
      /* Write data in key_cache_block_size increments */
      filepos-= offset;

unknown's avatar
unknown committed
1969
      inc_counter_for_resize_op(keycache);
unknown's avatar
unknown committed
1970
      keycache->global_cache_w_requests++;
1971
      block= find_key_block(keycache, file, filepos, level, 1, &page_st);
unknown's avatar
unknown committed
1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987
      if (!block)
      {
        /* It happens only for requests submitted during resize operation */
        dec_counter_for_resize_op(keycache);
	keycache_pthread_mutex_unlock(&keycache->cache_lock);
	if (dont_write)
        {
          keycache->global_cache_w_requests++;
          keycache->global_cache_write++;
          if (my_pwrite(file, (byte*) buff, length, filepos,
		        MYF(MY_NABP | MY_WAIT_IF_FULL)))
            error=1;
	}
        goto next_block;
      }

unknown's avatar
unknown committed
1988
      if (block->status != BLOCK_ERROR && page_st != PAGE_READ &&
unknown's avatar
unknown committed
1989 1990 1991 1992
          (offset || read_length < keycache->key_cache_block_size))
        read_block(keycache, block,
                   offset + read_length >= keycache->key_cache_block_size?
                   offset : keycache->key_cache_block_size,
1993
                   offset,(my_bool)(page_st == PAGE_TO_BE_READ));
1994

1995
      if (!dont_write)
1996 1997 1998
      {
	/* buff has been written to disk at start */
        if ((block->status & BLOCK_CHANGED) &&
unknown's avatar
unknown committed
1999 2000
            (!offset && read_length >= keycache->key_cache_block_size))
             link_to_file_list(keycache, block, block->hash_link->file, 1);
2001 2002
      }
      else if (! (block->status & BLOCK_CHANGED))
unknown's avatar
unknown committed
2003
        link_to_changed_list(keycache, block);
2004

unknown's avatar
unknown committed
2005
      set_if_smaller(block->offset, offset);
unknown's avatar
unknown committed
2006
      set_if_bigger(block->length, read_length+offset);
2007

2008
      if (! (block->status & BLOCK_ERROR))
unknown's avatar
unknown committed
2009
      {
2010
        if (!(read_length & 511))
2011
	  bmove512(block->buffer+offset, buff, read_length);
2012
        else
unknown's avatar
unknown committed
2013
          memcpy(block->buffer+offset, buff, (size_t) read_length);
unknown's avatar
unknown committed
2014
      }
2015 2016 2017

      block->status|=BLOCK_READ;

2018 2019
      /* Unregister the request */
      block->hash_link->requests--;
unknown's avatar
unknown committed
2020
      unreg_request(keycache, block, 1);
2021

2022 2023
      if (block->status & BLOCK_ERROR)
      {
unknown's avatar
unknown committed
2024
        keycache_pthread_mutex_unlock(&keycache->cache_lock);
unknown's avatar
unknown committed
2025
        error= 1;
2026 2027
        break;
      }
2028

unknown's avatar
unknown committed
2029
      dec_counter_for_resize_op(keycache);
2030

unknown's avatar
unknown committed
2031 2032 2033
      keycache_pthread_mutex_unlock(&keycache->cache_lock);
    
    next_block:
unknown's avatar
unknown committed
2034 2035 2036
      buff+= read_length;
      filepos+= read_length;
      offset= 0;
2037

unknown's avatar
unknown committed
2038
    } while ((length-= read_length));
unknown's avatar
unknown committed
2039
    goto end;
unknown's avatar
unknown committed
2040
  }
unknown's avatar
unknown committed
2041 2042 2043 2044

no_key_cache:
  /* Key cache is not used */
  if (dont_write)
2045
  {
unknown's avatar
unknown committed
2046 2047 2048 2049 2050
    keycache->global_cache_w_requests++;
    keycache->global_cache_write++;
    if (my_pwrite(file, (byte*) buff, length, filepos,
		  MYF(MY_NABP | MY_WAIT_IF_FULL)))
      error=1;
2051
  }
unknown's avatar
unknown committed
2052

unknown's avatar
unknown committed
2053
end:
unknown's avatar
unknown committed
2054
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
2055 2056
  DBUG_EXECUTE("exec",
               test_key_cache(keycache, "end of key_cache_write", 1););
unknown's avatar
unknown committed
2057
#endif
2058 2059
  DBUG_RETURN(error);
}
unknown's avatar
unknown committed
2060 2061


2062 2063 2064
/*
  Free block: remove reference to it from hash table,
  remove it from the chain file of dirty/clean blocks
2065
  and add it to the free list.
2066 2067
*/

unknown's avatar
unknown committed
2068
static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block)
unknown's avatar
unknown committed
2069
{
2070
  KEYCACHE_THREAD_TRACE("free block");
2071 2072 2073
  KEYCACHE_DBUG_PRINT("free_block",
                      ("block %u to be freed",BLOCK_NUMBER(block)));
  if (block->hash_link)
unknown's avatar
unknown committed
2074
  {
unknown's avatar
unknown committed
2075 2076 2077
    block->status|= BLOCK_REASSIGNED;
    wait_for_readers(keycache, block);
    unlink_hash(keycache, block->hash_link);
unknown's avatar
unknown committed
2078
  }
2079

2080
  unlink_changed(block);
unknown's avatar
unknown committed
2081 2082 2083
  block->status= 0;
  block->length= 0;
  block->offset= keycache->key_cache_block_size;
2084
  KEYCACHE_THREAD_TRACE("free block");
2085
  KEYCACHE_DBUG_PRINT("free_block",
2086
                      ("block is freed"));
unknown's avatar
unknown committed
2087 2088
  unreg_request(keycache, block, 0);
  block->hash_link= NULL;
2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099

  /* Remove the free block from the LRU ring. */
  unlink_block(keycache, block);
  if (block->temperature == BLOCK_WARM)
    keycache->warm_blocks--;
  block->temperature= BLOCK_COLD;
  /* Insert the free block in the free list. */
  block->next_used= keycache->free_block_list;
  keycache->free_block_list= block;
  /* Keep track of the number of currently unused blocks. */
  keycache->blocks_unused++;
unknown's avatar
unknown committed
2100 2101 2102
}


2103
/*
  Comparison function for sorting an array of block pointers by the
  disk position of the page each block is linked to (ascending).

  Returns -1, 0 or 1 as the disk position of *a is lower than, equal to
  or higher than the disk position of *b.
*/

static int cmp_sec_link(BLOCK_LINK **a, BLOCK_LINK **b)
{
  if ((*a)->hash_link->diskpos < (*b)->hash_link->diskpos)
    return -1;
  if ((*a)->hash_link->diskpos > (*b)->hash_link->diskpos)
    return 1;
  return 0;
}

unknown's avatar
unknown committed
2109

2110 2111 2112
/*
  Flush a portion of changed blocks to disk,
  free used blocks if requested
2113
*/
2114

unknown's avatar
unknown committed
2115 2116
static int flush_cached_blocks(KEY_CACHE *keycache,
                               File file, BLOCK_LINK **cache,
2117 2118
                               BLOCK_LINK **end,
                               enum flush_type type)
unknown's avatar
unknown committed
2119
{
2120
  int error;
unknown's avatar
unknown committed
2121 2122
  int last_errno= 0;
  uint count= end-cache;
2123

2124
  /* Don't lock the cache during the flush */
unknown's avatar
unknown committed
2125
  keycache_pthread_mutex_unlock(&keycache->cache_lock);
2126 2127 2128
  /*
     As all blocks referred in 'cache' are marked by BLOCK_IN_FLUSH
     we are guarunteed no thread will change them
2129
  */
unknown's avatar
unknown committed
2130
  qsort((byte*) cache, count, sizeof(*cache), (qsort_cmp) cmp_sec_link);
2131

unknown's avatar
unknown committed
2132
  keycache_pthread_mutex_lock(&keycache->cache_lock);
2133
  for ( ; cache != end ; cache++)
unknown's avatar
unknown committed
2134
  {
2135
    BLOCK_LINK *block= *cache;
2136 2137

    KEYCACHE_DBUG_PRINT("flush_cached_blocks",
2138
                        ("block %u to be flushed", BLOCK_NUMBER(block)));
unknown's avatar
unknown committed
2139
    keycache_pthread_mutex_unlock(&keycache->cache_lock);
2140 2141 2142 2143
    error= my_pwrite(file,
		     block->buffer+block->offset,
		     block->length - block->offset,
                     block->hash_link->diskpos+ block->offset,
unknown's avatar
unknown committed
2144
                     MYF(MY_NABP | MY_WAIT_IF_FULL));
unknown's avatar
unknown committed
2145
    keycache_pthread_mutex_lock(&keycache->cache_lock);
unknown's avatar
unknown committed
2146
    keycache->global_cache_write++;
2147
    if (error)
unknown's avatar
unknown committed
2148
    {
2149
      block->status|= BLOCK_ERROR;
unknown's avatar
unknown committed
2150
      if (!last_errno)
unknown's avatar
unknown committed
2151
        last_errno= errno ? errno : -1;
2152
    }
unknown's avatar
unknown committed
2153 2154 2155 2156 2157 2158
    /* 
      Let to proceed for possible waiting requests to write to the block page.
      It might happen only during an operation to resize the key cache.
    */
    if (block->wqueue[COND_FOR_SAVED].last_thread)
      release_queue(&block->wqueue[COND_FOR_SAVED]);
2159 2160 2161
    /* type will never be FLUSH_IGNORE_CHANGED here */
    if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
    {
unknown's avatar
unknown committed
2162
      keycache->blocks_changed--;
unknown's avatar
unknown committed
2163
      keycache->global_blocks_changed--;
unknown's avatar
unknown committed
2164
      free_block(keycache, block);
2165
    }
2166
    else
2167
    {
unknown's avatar
unknown committed
2168 2169 2170
      block->status&= ~BLOCK_IN_FLUSH;
      link_to_file_list(keycache, block, file, 1);
      unreg_request(keycache, block, 1);
unknown's avatar
unknown committed
2171
    }
2172

unknown's avatar
unknown committed
2173 2174 2175 2176 2177
  }
  return last_errno;
}


2178
/*
  Flush all key blocks for a file to disk, but don't do any mutex locks

  SYNOPSIS
    flush_key_blocks_int()
      keycache            pointer to a key cache data structure
      file                handler for the file to flush to
      flush_type          type of the flush

  NOTES
    This function doesn't do any mutex locks because it needs to be called both
    from flush_key_blocks and flush_all_key_blocks (the latter one does the
    mutex lock in the resize_key_cache() function).

  RETURN
    0   ok
    1  error
*/

unknown's avatar
unknown committed
2196
static int flush_key_blocks_int(KEY_CACHE *keycache,
2197
				File file, enum flush_type type)
unknown's avatar
unknown committed
2198
{
2199
  BLOCK_LINK *cache_buff[FLUSH_CACHE],**cache;
unknown's avatar
unknown committed
2200
  int last_errno= 0;
2201
  DBUG_ENTER("flush_key_blocks_int");
unknown's avatar
unknown committed
2202
  DBUG_PRINT("enter",("file: %d  blocks_used: %d  blocks_changed: %d",
unknown's avatar
unknown committed
2203
              file, keycache->blocks_used, keycache->blocks_changed));
2204

unknown's avatar
unknown committed
2205
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
unknown's avatar
unknown committed
2206 2207
    DBUG_EXECUTE("check_keycache",
                 test_key_cache(keycache, "start of flush_key_blocks", 0););
unknown's avatar
unknown committed
2208
#endif
2209

unknown's avatar
unknown committed
2210 2211
  cache= cache_buff;
  if (keycache->disk_blocks > 0 &&
2212
      (!my_disable_flush_key_blocks || type != FLUSH_KEEP))
2213 2214
  {
    /* Key cache exists and flush is not disabled */
unknown's avatar
unknown committed
2215 2216
    int error= 0;
    uint count= 0;
2217
    BLOCK_LINK **pos,**end;
unknown's avatar
unknown committed
2218
    BLOCK_LINK *first_in_switch= NULL;
2219 2220 2221 2222
    BLOCK_LINK *block, *next;
#if defined(KEYCACHE_DEBUG)
    uint cnt=0;
#endif
2223

unknown's avatar
unknown committed
2224 2225
    if (type != FLUSH_IGNORE_CHANGED)
    {
2226
      /*
2227 2228 2229
         Count how many key blocks we have to cache to be able
         to flush all dirty pages with minimum seek moves
      */
unknown's avatar
unknown committed
2230
      for (block= keycache->changed_blocks[FILE_HASH(file)] ;
2231
           block ;
unknown's avatar
unknown committed
2232
           block= block->next_changed)
unknown's avatar
unknown committed
2233
      {
2234
        if (block->hash_link->file == file)
2235
        {
2236
          count++;
unknown's avatar
unknown committed
2237
          KEYCACHE_DBUG_ASSERT(count<= keycache->blocks_used);
2238
        }
unknown's avatar
unknown committed
2239
      }
2240
      /* Allocate a new buffer only if its bigger than the one we have */
2241
      if (count > FLUSH_CACHE &&
unknown's avatar
unknown committed
2242 2243
          !(cache= (BLOCK_LINK**) my_malloc(sizeof(BLOCK_LINK*)*count,
                                            MYF(0))))
2244
      {
unknown's avatar
unknown committed
2245 2246
        cache= cache_buff;
        count= FLUSH_CACHE;
2247
      }
unknown's avatar
unknown committed
2248
    }
2249

2250 2251
    /* Retrieve the blocks and write them to a buffer to be flushed */
restart:
unknown's avatar
unknown committed
2252 2253
    end= (pos= cache)+count;
    for (block= keycache->changed_blocks[FILE_HASH(file)] ;
2254
         block ;
unknown's avatar
unknown committed
2255
         block= next)
unknown's avatar
unknown committed
2256
    {
2257 2258
#if defined(KEYCACHE_DEBUG)
      cnt++;
unknown's avatar
unknown committed
2259
      KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
2260
#endif
unknown's avatar
unknown committed
2261
      next= block->next_changed;
2262
      if (block->hash_link->file == file)
unknown's avatar
unknown committed
2263
      {
2264
        /*
2265 2266 2267 2268 2269
           Mark the block with BLOCK_IN_FLUSH in order not to let
           other threads to use it for new pages and interfere with
           our sequence of flushing dirty file pages
        */
        block->status|= BLOCK_IN_FLUSH;
2270

2271
        if (! (block->status & BLOCK_IN_SWITCH))
2272 2273 2274 2275 2276
        {
	  /*
	    We care only for the blocks for which flushing was not
	    initiated by other threads as a result of page swapping
          */
unknown's avatar
unknown committed
2277
          reg_requests(keycache, block, 1);
2278 2279 2280
          if (type != FLUSH_IGNORE_CHANGED)
          {
	    /* It's not a temporary file */
2281
            if (pos == end)
2282 2283 2284 2285
            {
	      /*
		This happens only if there is not enough
		memory for the big block
2286
              */
2287
              if ((error= flush_cached_blocks(keycache, file, cache,
unknown's avatar
unknown committed
2288
                                              end,type)))
2289 2290 2291 2292 2293
                last_errno=error;
              /*
		Restart the scan as some other thread might have changed
		the changed blocks chain: the blocks that were in switch
		state before the flush started have to be excluded
2294 2295 2296
              */
              goto restart;
            }
unknown's avatar
unknown committed
2297
            *pos++= block;
2298 2299 2300 2301
          }
          else
          {
            /* It's a temporary file */
unknown's avatar
unknown committed
2302
            keycache->blocks_changed--;
unknown's avatar
unknown committed
2303
	    keycache->global_blocks_changed--;
unknown's avatar
unknown committed
2304
            free_block(keycache, block);
2305 2306 2307
          }
        }
        else
2308 2309
        {
	  /* Link the block into a list of blocks 'in switch' */
2310
          unlink_changed(block);
unknown's avatar
unknown committed
2311
          link_changed(block, &first_in_switch);
2312
        }
unknown's avatar
unknown committed
2313 2314 2315 2316
      }
    }
    if (pos != cache)
    {
unknown's avatar
unknown committed
2317 2318
      if ((error= flush_cached_blocks(keycache, file, cache, pos, type)))
        last_errno= error;
2319 2320 2321 2322 2323
    }
    /* Wait until list of blocks in switch is empty */
    while (first_in_switch)
    {
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
2324
      cnt= 0;
2325
#endif
unknown's avatar
unknown committed
2326
      block= first_in_switch;
2327
      {
unknown's avatar
unknown committed
2328
        struct st_my_thread_var *thread= my_thread_var;
2329 2330 2331
        add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
        do
        {
unknown's avatar
unknown committed
2332
          keycache_pthread_cond_wait(&thread->suspend,
unknown's avatar
unknown committed
2333
                                     &keycache->cache_lock);
2334 2335 2336 2337 2338
        }
        while (thread->next);
      }
#if defined(KEYCACHE_DEBUG)
      cnt++;
unknown's avatar
unknown committed
2339
      KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
2340
#endif
unknown's avatar
unknown committed
2341 2342
    }
    /* The following happens very seldom */
2343
    if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
unknown's avatar
unknown committed
2344
    {
2345 2346 2347
#if defined(KEYCACHE_DEBUG)
      cnt=0;
#endif
unknown's avatar
unknown committed
2348
      for (block= keycache->file_blocks[FILE_HASH(file)] ;
2349
           block ;
unknown's avatar
unknown committed
2350
           block= next)
unknown's avatar
unknown committed
2351
      {
2352 2353
#if defined(KEYCACHE_DEBUG)
        cnt++;
unknown's avatar
unknown committed
2354
        KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
2355
#endif
unknown's avatar
unknown committed
2356
        next= block->next_changed;
2357 2358 2359 2360
        if (block->hash_link->file == file &&
            (! (block->status & BLOCK_CHANGED)
             || type == FLUSH_IGNORE_CHANGED))
        {
unknown's avatar
unknown committed
2361 2362
          reg_requests(keycache, block, 1);
          free_block(keycache, block);
2363
        }
unknown's avatar
unknown committed
2364 2365
      }
    }
2366
  }
2367

unknown's avatar
unknown committed
2368
#ifndef DBUG_OFF
unknown's avatar
unknown committed
2369 2370
  DBUG_EXECUTE("check_keycache",
               test_key_cache(keycache, "end of flush_key_blocks", 0););
unknown's avatar
unknown committed
2371 2372
#endif
  if (cache != cache_buff)
unknown's avatar
unknown committed
2373
    my_free((gptr) cache, MYF(0));
unknown's avatar
unknown committed
2374
  if (last_errno)
2375
    errno=last_errno;                /* Return first error */
unknown's avatar
unknown committed
2376
  DBUG_RETURN(last_errno != 0);
unknown's avatar
unknown committed
2377 2378 2379
}


2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392
/*
  Flush all blocks for a file to disk

  SYNOPSIS

    flush_key_blocks()
      keycache            pointer to a key cache data structure
      file                handler for the file to flush to
      flush_type          type of the flush

  RETURN
    0   ok
    1  error
unknown's avatar
unknown committed
2393
*/
2394

unknown's avatar
unknown committed
2395
int flush_key_blocks(KEY_CACHE *keycache,
2396 2397 2398 2399
                     File file, enum flush_type type)
{
  int res;
  DBUG_ENTER("flush_key_blocks");
unknown's avatar
unknown committed
2400
  DBUG_PRINT("enter", ("keycache: %lx", keycache));
2401

unknown's avatar
unknown committed
2402
  if (keycache->disk_blocks <= 0)
2403
    DBUG_RETURN(0);
unknown's avatar
unknown committed
2404 2405 2406 2407 2408
  keycache_pthread_mutex_lock(&keycache->cache_lock);
  inc_counter_for_resize_op(keycache);
  res= flush_key_blocks_int(keycache, file, type);
  dec_counter_for_resize_op(keycache);
  keycache_pthread_mutex_unlock(&keycache->cache_lock);
2409 2410 2411 2412
  DBUG_RETURN(res);
}


2413 2414
/*
  Flush all blocks in the key cache to disk
2415
*/
2416

unknown's avatar
unknown committed
2417
static int flush_all_key_blocks(KEY_CACHE *keycache)
{
#if defined(KEYCACHE_DEBUG)
  uint cnt=0;
#endif

  /* Keep going for as long as the cache reports changed blocks */
  while (keycache->blocks_changed > 0)
  {
    /* Start at the block that follows used_last in the next_used ring */
    BLOCK_LINK *candidate= keycache->used_last->next_used;

    /*
      Advance until a block that has a hash link is found, or until
      used_last itself has been examined
    */
    while (!candidate->hash_link && candidate != keycache->used_last)
      candidate= candidate->next_used;

    if (candidate->hash_link)
    {
#if defined(KEYCACHE_DEBUG)
      cnt++;
      KEYCACHE_DBUG_ASSERT(cnt <= keycache->blocks_used);
#endif
      /* Flush and release every block of the file this block belongs to */
      if (flush_key_blocks_int(keycache, candidate->hash_link->file,
                               FLUSH_RELEASE))
        return 1;
    }
  }
  return 0;
}
unknown's avatar
unknown committed
2444 2445


2446 2447
#ifndef DBUG_OFF
/*
2448
  Test if disk-cache is ok
unknown's avatar
unknown committed
2449
*/
unknown's avatar
unknown committed
2450
static void test_key_cache(KEY_CACHE *keycache __attribute__((unused)),
unknown's avatar
unknown committed
2451
                           const char *where __attribute__((unused)),
2452 2453 2454
                           my_bool lock __attribute__((unused)))
{
  /* TODO */
2455
}
2456
#endif
unknown's avatar
unknown committed
2457

2458 2459 2460 2461 2462 2463
#if defined(KEYCACHE_TIMEOUT)

#define KEYCACHE_DUMP_FILE  "keycache_dump.txt"
#define MAX_QUEUE_LEN  100


unknown's avatar
unknown committed
2464
/*
  Write a textual snapshot of the key cache state to KEYCACHE_DUMP_FILE

  SYNOPSIS
    keycache_dump()
      keycache            pointer to a key cache data structure

  NOTES
    The dump lists the calling thread, the queue of threads waiting for
    a hash link, the queue of threads waiting for a block, every used
    block together with its two wait queues, and finally the chain of
    blocks linked through next_used. At most MAX_QUEUE_LEN entries are
    printed per wait queue.
    If the dump file cannot be opened nothing is written.

    NOTE(review): waiting_for_hash_link and waiting_for_block are
    referenced here as plain names, not through 'keycache'; confirm
    that this matches where these queues are declared.
*/

static void keycache_dump(KEY_CACHE *keycache)
{
  FILE *keycache_dump_file=fopen(KEYCACHE_DUMP_FILE, "w");
  struct st_my_thread_var *thread_var= my_thread_var;
  struct st_my_thread_var *last;
  struct st_my_thread_var *thread;
  BLOCK_LINK *block;
  HASH_LINK *hash_link;
  KEYCACHE_PAGE *page;
  uint i;
  uint queue_len;               /* entries printed for the current queue */

  /* Without a dump file there is nowhere to write to */
  if (!keycache_dump_file)
    return;

  /* Identify the thread that produces the dump */
  fprintf(keycache_dump_file, "thread:%u\n", thread_var->id);

  queue_len= 0;
  thread=last=waiting_for_hash_link.last_thread;
  fprintf(keycache_dump_file, "queue of threads waiting for hash link\n");
  if (thread)
    do
    {
      thread=thread->next;
      page= (KEYCACHE_PAGE *) thread->opt_info;
      fprintf(keycache_dump_file,
              "thread:%u, (file,filepos)=(%u,%lu)\n",
              thread->id,(uint) page->file,(ulong) page->filepos);
      if (++queue_len == MAX_QUEUE_LEN)
        break;
    }
    while (thread != last);

  queue_len= 0;
  thread=last=waiting_for_block.last_thread;
  fprintf(keycache_dump_file, "queue of threads waiting for block\n");
  if (thread)
    do
    {
      thread=thread->next;
      hash_link= (HASH_LINK *) thread->opt_info;
      fprintf(keycache_dump_file,
        "thread:%u hash_link:%u (file,filepos)=(%u,%lu)\n",
        thread->id, (uint) HASH_LINK_NUMBER(hash_link),
        (uint) hash_link->file,(ulong) hash_link->diskpos);
      if (++queue_len == MAX_QUEUE_LEN)
        break;
    }
    while (thread != last);

  for (i=0 ; i< keycache->blocks_used ; i++)
  {
    int j;
    block= &keycache->block_root[i];
    hash_link= block->hash_link;
    fprintf(keycache_dump_file,
            "block:%u hash_link:%d status:%x #requests=%u waiting_for_readers:%d\n",
            i, (int) (hash_link ? HASH_LINK_NUMBER(hash_link) : -1),
            block->status, block->requests, block->condvar ? 1 : 0);
    for (j=0 ; j < 2; j++)
    {
      KEYCACHE_WQUEUE *wqueue=&block->wqueue[j];
      thread= last= wqueue->last_thread;
      fprintf(keycache_dump_file, "queue #%d\n", j);
      if (thread)
      {
        /*
          Use a separate counter for the queue length: the block
          index 'i' must not be touched while walking the queue
        */
        queue_len= 0;
        do
        {
          thread=thread->next;
          fprintf(keycache_dump_file,
                  "thread:%u\n", thread->id);
          if (++queue_len == MAX_QUEUE_LEN)
            break;
        }
        while (thread != last);
      }
    }
  }
  fprintf(keycache_dump_file, "LRU chain:");
  block= keycache->used_last;
  if (block)
  {
    /* The chain is circular: stop once used_last has been printed */
    do
    {
      block= block->next_used;
      fprintf(keycache_dump_file,
              "block:%u, ", BLOCK_NUMBER(block));
    }
    while (block != keycache->used_last);
  }
  fprintf(keycache_dump_file, "\n");

  fclose(keycache_dump_file);
}

2555
#endif /* defined(KEYCACHE_TIMEOUT) */
unknown's avatar
unknown committed
2556

2557
#if defined(KEYCACHE_TIMEOUT) && !defined(__WIN__)
unknown's avatar
unknown committed
2558 2559


2560
static int keycache_pthread_cond_wait(pthread_cond_t *cond,
2561 2562 2563 2564 2565 2566 2567 2568 2569
                                      pthread_mutex_t *mutex)
{
  int rc;
  struct timeval  now;            /* time when we started waiting        */
  struct timespec timeout;        /* timeout value for the wait function */
  struct timezone tz;
#if defined(KEYCACHE_DEBUG)
  int cnt=0;
#endif
2570 2571

  /* Get current time */
2572 2573
  gettimeofday(&now, &tz);
  /* Prepare timeout value */
2574 2575
  timeout.tv_sec= now.tv_sec + KEYCACHE_TIMEOUT;
  timeout.tv_nsec= now.tv_usec * 1000; /* timeval uses microseconds.         */
2576 2577 2578 2579 2580 2581 2582 2583 2584
                                        /* timespec uses nanoseconds.         */
                                        /* 1 nanosecond = 1000 micro seconds. */
  KEYCACHE_THREAD_TRACE_END("started waiting");
#if defined(KEYCACHE_DEBUG)
  cnt++;
  if (cnt % 100 == 0)
    fprintf(keycache_debug_log, "waiting...\n");
    fflush(keycache_debug_log);
#endif
2585
  rc= pthread_cond_timedwait(cond, mutex, &timeout);
2586 2587 2588
  KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");
#if defined(KEYCACHE_DEBUG)
  if (rc == ETIMEDOUT)
2589
  {
2590 2591 2592 2593 2594
    fprintf(keycache_debug_log,"aborted by keycache timeout\n");
    fclose(keycache_debug_log);
    abort();
  }
#endif
2595

2596 2597
  if (rc == ETIMEDOUT)
    keycache_dump();
2598

2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612
#if defined(KEYCACHE_DEBUG)
  KEYCACHE_DBUG_ASSERT(rc != ETIMEDOUT);
#else
  assert(rc != ETIMEDOUT);
#endif
  return rc;
}
#else
#if defined(KEYCACHE_DEBUG)
/*
  Debug wrapper around pthread_cond_wait() that emits thread trace
  records before and after the wait.

  RETURN
    The return code of pthread_cond_wait()
*/

static int keycache_pthread_cond_wait(pthread_cond_t *cond,
                                      pthread_mutex_t *mutex)
{
  int wait_rc;

  KEYCACHE_THREAD_TRACE_END("started waiting");
  wait_rc= pthread_cond_wait(cond, mutex);
  KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");

  return wait_rc;
}
#endif
#endif /* defined(KEYCACHE_TIMEOUT) && !defined(__WIN__) */
unknown's avatar
unknown committed
2619

2620
#if defined(KEYCACHE_DEBUG)
unknown's avatar
unknown committed
2621 2622


2623
/*
  Debug wrapper around pthread_mutex_lock(): the trace record is
  emitted after the lock call has returned.

  RETURN
    The return code of pthread_mutex_lock()
*/

static int keycache_pthread_mutex_lock(pthread_mutex_t *mutex)
{
  int lock_rc= pthread_mutex_lock(mutex);

  KEYCACHE_THREAD_TRACE_BEGIN("");
  return lock_rc;
}
unknown's avatar
unknown committed
2630 2631


2632 2633 2634 2635 2636
/*
  Debug wrapper around pthread_mutex_unlock(): the trace record is
  emitted before the mutex is released. The result of the unlock call
  is not reported to the caller.
*/

static void keycache_pthread_mutex_unlock(pthread_mutex_t *mutex)
{
  KEYCACHE_THREAD_TRACE_END("");
  (void) pthread_mutex_unlock(mutex);
}
unknown's avatar
unknown committed
2637 2638


2639
/*
  Debug wrapper around pthread_cond_signal() that emits a "signal"
  trace record first.

  RETURN
    The return code of pthread_cond_signal()
*/

static int keycache_pthread_cond_signal(pthread_cond_t *cond)
{
  KEYCACHE_THREAD_TRACE("signal");
  return pthread_cond_signal(cond);
}
unknown's avatar
unknown committed
2646 2647


2648 2649 2650 2651
/*
  Debug wrapper around pthread_cond_broadcast() that emits a trace
  record first.

  NOTE(review): the trace label is "signal", the same as in
  keycache_pthread_cond_signal(); confirm whether that is intended.

  RETURN
    The return code of pthread_cond_broadcast()
*/

static int keycache_pthread_cond_broadcast(pthread_cond_t *cond)
{
  KEYCACHE_THREAD_TRACE("signal");
  return pthread_cond_broadcast(cond);
}
unknown's avatar
unknown committed
2655

2656
#if defined(KEYCACHE_DEBUG_LOG)
unknown's avatar
unknown committed
2657 2658


2659 2660 2661 2662 2663
/*
  Write one printf-style formatted line to the key cache debug log

  SYNOPSIS
    keycache_debug_print()
      fmt                 printf-style format string
      ...                 arguments consumed by fmt

  NOTES
    A newline is appended after the formatted text.
    Nothing is written when keycache_debug_log is not set.
*/

static void keycache_debug_print(const char * fmt,...)
{
  va_list ap;

  if (!keycache_debug_log)
    return;

  va_start(ap,fmt);
  VOID(vfprintf(keycache_debug_log, fmt, ap));
  VOID(fputc('\n',keycache_debug_log));
  va_end(ap);
}
#endif /* defined(KEYCACHE_DEBUG_LOG) */
unknown's avatar
unknown committed
2671

2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682
#if defined(KEYCACHE_DEBUG_LOG)


/*
  Close the key cache debug log, if one is open

  NOTES
    The handle is reset after closing so that the
    'if (keycache_debug_log)' checks made here and in
    keycache_debug_print() do not operate on a closed stream, and so
    that a second call is harmless.
*/

void keycache_debug_log_close(void)
{
  if (keycache_debug_log)
  {
    fclose(keycache_debug_log);
    keycache_debug_log= NULL;
  }
}
#endif /* defined(KEYCACHE_DEBUG_LOG) */

#endif /* defined(KEYCACHE_DEBUG) */