/* Copyright (c) 2005 PrimeBase Technologies GmbH, Germany
 *
 * PrimeBase XT
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * 2005-05-24	Paul McCullagh
 *
 * H&G2JCtL
 */

#include "xt_config.h"

#ifdef DRIZZLED
#include <bitset>
#endif

#ifndef XT_WIN
#include <unistd.h>
#endif

#include <stdio.h>
#include <time.h>

#include "pthread_xt.h"
#include "thread_xt.h"
#include "filesys_xt.h"
#include "cache_xt.h"
#include "table_xt.h"
#include "trace_xt.h"
#include "util_xt.h"

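/* Compute (now - start) for the 32-bit usage counter below,
 * allowing for wrap-around of the counter.
 */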
#define XT_TIME_DIFF(start, now) (\
	((xtWord4) (now) < (xtWord4) (start)) ? \
	((xtWord4) 0XFFFFFFFF - ((xtWord4) (start) - (xtWord4) (now))) : \
	((xtWord4) (now) - (xtWord4) (start)))

/*
 * -----------------------------------------------------------------------
 * D I S K   C A C H E
 */

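/* The index cache is divided into IDX_CAC_SEGMENT_COUNT segments.
 * The low bits of a block's hash value select the segment (and with it
 * the segment lock), the remaining bits index the segment's own hash
 * table. Different segments can therefore be accessed concurrently.
 */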
#define IDX_CAC_SEGMENT_COUNT		((off_t) 1 << XT_INDEX_CACHE_SEGMENT_SHIFTS)
#define IDX_CAC_SEGMENT_MASK		(IDX_CAC_SEGMENT_COUNT - 1)

#ifdef XT_NO_ATOMICS
#define IDX_CAC_USE_PTHREAD_RW
#else
//#define IDX_CAC_USE_RWMUTEX
//#define IDX_CAC_USE_PTHREAD_RW
//#define IDX_USE_SPINXSLOCK
#define IDX_CAC_USE_XSMUTEX
#endif

#ifdef IDX_CAC_USE_XSMUTEX
#define IDX_CAC_LOCK_TYPE				XTXSMutexRec
#define IDX_CAC_INIT_LOCK(s, i)			xt_xsmutex_init_with_autoname(s, &(i)->cs_lock)
#define IDX_CAC_FREE_LOCK(s, i)			xt_xsmutex_free(s, &(i)->cs_lock)	
#define IDX_CAC_READ_LOCK(i, o)			xt_xsmutex_slock(&(i)->cs_lock, (o)->t_id)
#define IDX_CAC_WRITE_LOCK(i, o)		xt_xsmutex_xlock(&(i)->cs_lock, (o)->t_id)
#define IDX_CAC_UNLOCK(i, o)			xt_xsmutex_unlock(&(i)->cs_lock, (o)->t_id)
#elif defined(IDX_CAC_USE_PTHREAD_RW)
#define IDX_CAC_LOCK_TYPE				xt_rwlock_type
#define IDX_CAC_INIT_LOCK(s, i)			xt_init_rwlock(s, &(i)->cs_lock)
#define IDX_CAC_FREE_LOCK(s, i)			xt_free_rwlock(&(i)->cs_lock)	
#define IDX_CAC_READ_LOCK(i, o)			xt_slock_rwlock_ns(&(i)->cs_lock)
#define IDX_CAC_WRITE_LOCK(i, o)		xt_xlock_rwlock_ns(&(i)->cs_lock)
#define IDX_CAC_UNLOCK(i, o)			xt_unlock_rwlock_ns(&(i)->cs_lock)
#elif defined(IDX_CAC_USE_RWMUTEX)
#define IDX_CAC_LOCK_TYPE				XTRWMutexRec
#define IDX_CAC_INIT_LOCK(s, i)			xt_rwmutex_init_with_autoname(s, &(i)->cs_lock)
#define IDX_CAC_FREE_LOCK(s, i)			xt_rwmutex_free(s, &(i)->cs_lock)	
#define IDX_CAC_READ_LOCK(i, o)			xt_rwmutex_slock(&(i)->cs_lock, (o)->t_id)
#define IDX_CAC_WRITE_LOCK(i, o)		xt_rwmutex_xlock(&(i)->cs_lock, (o)->t_id)
#define IDX_CAC_UNLOCK(i, o)			xt_rwmutex_unlock(&(i)->cs_lock, (o)->t_id)
#elif defined(IDX_CAC_USE_SPINXSLOCK)
#define IDX_CAC_LOCK_TYPE				XTSpinXSLockRec
#define IDX_CAC_INIT_LOCK(s, i)			xt_spinxslock_init_with_autoname(s, &(i)->cs_lock)
#define IDX_CAC_FREE_LOCK(s, i)			xt_spinxslock_free(s, &(i)->cs_lock)	
#define IDX_CAC_READ_LOCK(i, s)			xt_spinxslock_slock(&(i)->cs_lock, (s)->t_id)
#define IDX_CAC_WRITE_LOCK(i, s)		xt_spinxslock_xlock(&(i)->cs_lock, (s)->t_id)
#define IDX_CAC_UNLOCK(i, s)			xt_spinxslock_unlock(&(i)->cs_lock, (s)->t_id)
#endif

#define ID_HANDLE_USE_SPINLOCK
//#define ID_HANDLE_USE_PTHREAD_RW

#if defined(ID_HANDLE_USE_PTHREAD_RW)
#define ID_HANDLE_LOCK_TYPE				xt_mutex_type
#define ID_HANDLE_INIT_LOCK(s, i)		xt_init_mutex_with_autoname(s, i)
#define ID_HANDLE_FREE_LOCK(s, i)		xt_free_mutex(i)	
#define ID_HANDLE_LOCK(i)				xt_lock_mutex_ns(i)
#define ID_HANDLE_UNLOCK(i)				xt_unlock_mutex_ns(i)
#elif defined(ID_HANDLE_USE_SPINLOCK)
#define ID_HANDLE_LOCK_TYPE				XTSpinLockRec
#define ID_HANDLE_INIT_LOCK(s, i)		xt_spinlock_init_with_autoname(s, i)
#define ID_HANDLE_FREE_LOCK(s, i)		xt_spinlock_free(s, i)	
#define ID_HANDLE_LOCK(i)				xt_spinlock_lock(i)
#define ID_HANDLE_UNLOCK(i)				xt_spinlock_unlock(i)
#endif

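/* Index handles are distributed over this many slots, selected by
 * the block address modulo XT_HANDLE_SLOTS (see xt_ind_get_handle()).
 * Each slot has its own list lock, which reduces contention on the
 * handle lists.
 */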
#define XT_HANDLE_SLOTS					37

/*
#ifdef DEBUG
#define XT_INIT_HANDLE_COUNT			0
#define XT_INIT_HANDLE_BLOCKS			0
#else
#define XT_INIT_HANDLE_COUNT			40
#define XT_INIT_HANDLE_BLOCKS			10
#endif
*/

/* A disk cache segment. The cache is divided into a number of segments
 * to improve concurrency.
 */
typedef struct DcSegment {
	IDX_CAC_LOCK_TYPE	cs_lock;						/* The cache segment lock. */
	XTIndBlockPtr		*cs_hash_table;
} DcSegmentRec, *DcSegmentPtr;

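/* A handle slot: the list lock, the free handle blocks, the free handles
 * and the handles currently in use for this slot.
 */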
typedef struct DcHandleSlot {
	ID_HANDLE_LOCK_TYPE	hs_handles_lock;
	XTIndHandleBlockPtr	hs_free_blocks;
	XTIndHandlePtr		hs_free_handles;
	XTIndHandlePtr		hs_used_handles;
} DcHandleSlotRec, *DcHandleSlotPtr;

typedef struct DcGlobals {
	xt_mutex_type		cg_lock;						/* The public cache lock. */
	DcSegmentRec		cg_segment[IDX_CAC_SEGMENT_COUNT];
	XTIndBlockPtr		cg_blocks;
#ifdef XT_USE_DIRECT_IO_ON_INDEX
	xtWord1				*cg_buffer;
#endif
	XTIndBlockPtr		cg_free_list;
	xtWord4				cg_free_count;
	xtWord4				cg_ru_now;						/* A counter as described by Jim Starkey (my thanks) */
	XTIndBlockPtr		cg_lru_block;
	XTIndBlockPtr		cg_mru_block;
	xtWord4				cg_hash_size;
	xtWord4				cg_block_count;
	xtWord4				cg_max_free;
#ifdef DEBUG_CHECK_IND_CACHE
	u_int				cg_reserved_by_ots;				/* Number of blocks reserved by open tables. */
	u_int				cg_read_count;					/* Number of blocks being read. */
#endif

	/* Index cache handles: */
	DcHandleSlotRec		cg_handle_slot[XT_HANDLE_SLOTS];
} DcGlobalsRec;

static DcGlobalsRec	ind_cac_globals;

#ifdef XT_USE_MYSYS
#ifdef xtPublic
#undef xtPublic
#endif
#include "my_global.h"
#include "my_sys.h"
#include "keycache.h"
KEY_CACHE my_cache;
#undef	pthread_rwlock_rdlock
#undef	pthread_rwlock_wrlock
#undef	pthread_rwlock_unlock
#undef	pthread_mutex_lock
#undef	pthread_mutex_unlock
#undef	pthread_cond_wait
#undef	pthread_cond_broadcast
#undef	xt_mutex_type
#define xtPublic
#endif

/*
 * -----------------------------------------------------------------------
 * INDEX CACHE HANDLES
 */

static XTIndHandlePtr ind_alloc_handle()
{
	XTIndHandlePtr handle;

	if (!(handle = (XTIndHandlePtr) xt_calloc_ns(sizeof(XTIndHandleRec))))
		return NULL;
	xt_spinlock_init_with_autoname(NULL, &handle->ih_lock);
	return handle;
}

static void ind_free_handle(XTIndHandlePtr handle)
{
	xt_spinlock_free(NULL, &handle->ih_lock);
	xt_free_ns(handle);
}

static void ind_handle_exit(XTThreadPtr self)
{
	DcHandleSlotPtr		hs;
	XTIndHandlePtr		handle;
	XTIndHandleBlockPtr	hptr;

	for (int i=0; i<XT_HANDLE_SLOTS; i++) {
		hs = &ind_cac_globals.cg_handle_slot[i];

		while (hs->hs_used_handles) {
			handle = hs->hs_used_handles;
			xt_ind_release_handle(handle, FALSE, self);
		}

		while (hs->hs_free_blocks) {
			hptr = hs->hs_free_blocks;
			hs->hs_free_blocks = hptr->hb_next;
			xt_free(self, hptr);
		}

		while (hs->hs_free_handles) {
			handle = hs->hs_free_handles;
			hs->hs_free_handles = handle->ih_next;
			ind_free_handle(handle);
		}

		ID_HANDLE_FREE_LOCK(self, &hs->hs_handles_lock);
	}
}

static void ind_handle_init(XTThreadPtr self)
{
	DcHandleSlotPtr		hs;

	for (int i=0; i<XT_HANDLE_SLOTS; i++) {
		hs = &ind_cac_globals.cg_handle_slot[i];
		memset(hs, 0, sizeof(DcHandleSlotRec));
		ID_HANDLE_INIT_LOCK(self, &hs->hs_handles_lock);
	}
}

//#define CHECK_HANDLE_STRUCTS

#ifdef CHECK_HANDLE_STRUCTS
static int gdummy = 0;

static void ic_stop_here()
{
	gdummy = gdummy + 1;
	printf("Nooo %d!\n", gdummy);
}

static void ic_check_handle_structs()
{
	DcHandleSlotPtr		hs;
	XTIndHandlePtr		handle, phandle;
	XTIndHandleBlockPtr	hptr, phptr;
	int					count;
	int					ctest;

	/* Note: this checks all handle slots, although the caller only
	 * holds the list lock of one slot. This is debug-only code!
	 */
	for (int i=0; i<XT_HANDLE_SLOTS; i++) {
		hs = &ind_cac_globals.cg_handle_slot[i];

		count = 0;
		phandle = NULL;
		handle = hs->hs_used_handles;
		while (handle) {
			if (handle == phandle)
				ic_stop_here();
			if (handle->ih_prev != phandle)
				ic_stop_here();
			if (handle->ih_cache_reference) {
				ctest = handle->x.ih_cache_block->cb_handle_count;
				if (ctest == 0 || ctest > 100)
					ic_stop_here();
			}
			else {
				ctest = handle->x.ih_handle_block->hb_ref_count;
				if (ctest == 0 || ctest > 100)
					ic_stop_here();
			}
			phandle = handle;
			handle = handle->ih_next;
			count++;
			if (count > 1000)
				ic_stop_here();
		}

		count = 0;
		phptr = NULL;
		hptr = hs->hs_free_blocks;
		while (hptr) {
			if (hptr == phptr)
				ic_stop_here();
			phptr = hptr;
			hptr = hptr->hb_next;
			count++;
			if (count > 1000)
				ic_stop_here();
		}

		count = 0;
		phandle = NULL;
		handle = hs->hs_free_handles;
		while (handle) {
			if (handle == phandle)
				ic_stop_here();
			phandle = handle;
			handle = handle->ih_next;
			count++;
			if (count > 1000)
				ic_stop_here();
		}
	}
}
#endif

/*
 * Get a handle to the index block.
 * This function is called by index scanners (readers).
 */
xtPublic XTIndHandlePtr xt_ind_get_handle(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref)
{
	DcHandleSlotPtr	hs;
	XTIndHandlePtr	handle;

	hs = &ind_cac_globals.cg_handle_slot[iref->ir_block->cb_address % XT_HANDLE_SLOTS];

	ASSERT_NS(iref->ir_xlock == FALSE);
	ASSERT_NS(iref->ir_updated == FALSE);
	ID_HANDLE_LOCK(&hs->hs_handles_lock);
#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	if ((handle = hs->hs_free_handles))
		hs->hs_free_handles = handle->ih_next;
	else {
		if (!(handle = ind_alloc_handle())) {
			ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
			xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
			return NULL;
		}
	}
	if (hs->hs_used_handles)
		hs->hs_used_handles->ih_prev = handle;
	handle->ih_next = hs->hs_used_handles;
	handle->ih_prev = NULL;
	handle->ih_address = iref->ir_block->cb_address;
	handle->ih_cache_reference = TRUE;
	handle->x.ih_cache_block = iref->ir_block;
	handle->ih_branch = iref->ir_branch;
	/* {HANDLE-COUNT-USAGE}
	 * This is safe because:
	 *
	 * I have an Slock on the cache block, and I have
	 * at least an Slock on the index.
	 * So this excludes anyone who reads
	 * cb_handle_count in this index
	 * (all cache block writers, and the block freer).
	 *
	 * The increment is safe because I have the list
	 * lock (hs_handles_lock), which is required by anyone else
	 * who increments or decrements this value.
	 */
	iref->ir_block->cb_handle_count++;
	hs->hs_used_handles = handle;
#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
	xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
	return handle;
}

xtPublic void xt_ind_release_handle(XTIndHandlePtr handle, xtBool have_lock, XTThreadPtr thread)
{
	DcHandleSlotPtr	hs;
	XTIndBlockPtr	block = NULL;
	u_int		hash_idx = 0;
	DcSegmentPtr	seg = NULL;
	XTIndBlockPtr	xblock;

	/* The lock order is:
	 * 1. Cache segment lock (cs_lock) - only taken here by ind_free_block()!
	 * 2. S/Xlock on the cache block (cb_lock)
	 * 3. List lock (hs_handles_lock).
	 * 4. Handle lock (ih_lock)
	 */
	if (!have_lock)
		xt_spinlock_lock(&handle->ih_lock);

	/* Get the lock on the cache page if required: */
	if (handle->ih_cache_reference) {
		u_int			file_id;
		xtIndexNodeID	address;

		block = handle->x.ih_cache_block;

		file_id = block->cb_file_id;
		address = block->cb_address;
		hash_idx = XT_NODE_ID(address) + (file_id * 223);
		seg = &ind_cac_globals.cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
		hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % ind_cac_globals.cg_hash_size;
	}

	xt_spinlock_unlock(&handle->ih_lock);

	/* Because of the lock order, I have to release the
	 * handle before I get a lock on the cache block.
	 *
	 * But, by doing this, the cache block may be gone!
	 */
	if (block) {
		IDX_CAC_READ_LOCK(seg, thread);
		xblock = seg->cs_hash_table[hash_idx];
		while (xblock) {
			if (block == xblock) {
				/* Found the block... 
				 * {HANDLE-COUNT-SLOCK}
				 * 04.05.2009, changed to slock.
				 */
				XT_IPAGE_READ_LOCK(&block->cb_lock);
				goto block_found;
			}
			xblock = xblock->cb_next;
		}
		block = NULL;
		block_found:
		IDX_CAC_UNLOCK(seg, thread);
	}

	hs = &ind_cac_globals.cg_handle_slot[handle->ih_address % XT_HANDLE_SLOTS];

	ID_HANDLE_LOCK(&hs->hs_handles_lock);
#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif

	/* I don't need to lock the handle because I have locked
	 * the list, and no other thread can change the
	 * handle without first getting a lock on the list.
	 *
	 * In addition, the caller is the only owner of the
	 * handle, and the only thread with an independent
	 * reference to the handle.
	 * All other access occurs via the list.
	 */

	/* Remove the reference to the cache or a handle block: */
	if (handle->ih_cache_reference) {
		ASSERT_NS(block == handle->x.ih_cache_block);
		ASSERT_NS(block && block->cb_handle_count > 0);
		/* {HANDLE-COUNT-USAGE}
		 * This is safe here because I have excluded
		 * all readers by taking an Xlock on the
		 * cache block (CHANGED - see below).
		 *
		 * {HANDLE-COUNT-SLOCK}
		 * 04.05.2009, changed to slock.
		 * Should be OK, because:
		 * I have the list lock (hs_handles_lock),
		 * which prevents concurrent updates to cb_handle_count.
		 *
		 * I also have a read lock on the cache block,
		 * but not a lock on the index. As a result, we cannot
		 * exclude all index writers (and readers of
		 * cb_handle_count).
		 */
		block->cb_handle_count--;
	}
	else {
		XTIndHandleBlockPtr	hptr = handle->x.ih_handle_block;

		ASSERT_NS(!handle->ih_cache_reference);
		ASSERT_NS(hptr->hb_ref_count > 0);
		hptr->hb_ref_count--;
		if (!hptr->hb_ref_count) {
			/* Put it back on the free list: */
			hptr->hb_next = hs->hs_free_blocks;
			hs->hs_free_blocks = hptr;
		}
	}

	/* Unlink the handle: */
	if (handle->ih_next)
		handle->ih_next->ih_prev = handle->ih_prev;
	if (handle->ih_prev)
		handle->ih_prev->ih_next = handle->ih_next;
	if (hs->hs_used_handles == handle)
		hs->hs_used_handles = handle->ih_next;

	/* Put it on the free list: */
	handle->ih_next = hs->hs_free_handles;
	hs->hs_free_handles = handle;

#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	ID_HANDLE_UNLOCK(&hs->hs_handles_lock);

	if (block)
		XT_IPAGE_UNLOCK(&block->cb_lock, FALSE);
}

/* Call this function before a referenced cache block is modified!
 * This function is called by index updaters.
 */
xtPublic xtBool xt_ind_copy_on_write(XTIndReferencePtr iref)
{
	DcHandleSlotPtr		hs;
	XTIndHandleBlockPtr	hptr;
	u_int				branch_size;
	XTIndHandlePtr		handle;
	u_int				i = 0;

	hs = &ind_cac_globals.cg_handle_slot[iref->ir_block->cb_address % XT_HANDLE_SLOTS];

	ID_HANDLE_LOCK(&hs->hs_handles_lock);

	/* {HANDLE-COUNT-USAGE}
	 * This is only called by updaters of this index block, or
	 * the freer, which holds an Xlock on the index block.
	 * These are all mutually exclusive for the index block.
	 *
	 * {HANDLE-COUNT-SLOCK}
	 * Do this check again, after we have the list lock (hs_handles_lock).
	 * There is a small chance that the count has changed since we
	 * last checked, because xt_ind_release_handle() only holds
	 * an slock on the index page.
	 *
	 * An updater can sometimes have an XLOCK on the index and an slock
	 * on the cache block. In this case xt_ind_release_handle()
	 * could have run through.
	 */
	if (!iref->ir_block->cb_handle_count) {
		ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
		return OK;
	}

#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	if ((hptr = hs->hs_free_blocks))
		hs->hs_free_blocks = hptr->hb_next;
	else {
		if (!(hptr = (XTIndHandleBlockPtr) xt_malloc_ns(sizeof(XTIndHandleBlockRec)))) {
			ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
			return FAILED;
		}
	}

	branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref->ir_branch->tb_size_2));
	memcpy(&hptr->hb_branch, iref->ir_branch, branch_size);
	hptr->hb_ref_count = iref->ir_block->cb_handle_count;

	handle = hs->hs_used_handles;
	while (handle) {
		if (handle->ih_branch == iref->ir_branch) {
			i++;
			xt_spinlock_lock(&handle->ih_lock);
			ASSERT_NS(handle->ih_cache_reference);
			handle->ih_cache_reference = FALSE;
			handle->x.ih_handle_block = hptr;
			handle->ih_branch = &hptr->hb_branch;
			xt_spinlock_unlock(&handle->ih_lock);
#ifndef DEBUG
			if (i == hptr->hb_ref_count)
				break;
#endif
		}
		handle = handle->ih_next;
	}
#ifdef DEBUG
	ASSERT_NS(hptr->hb_ref_count == i);
#endif
	/* {HANDLE-COUNT-USAGE}
	 * It is safe to modify cb_handle_count when I have the
	 * list lock, and I have excluded all readers!
	 */
	iref->ir_block->cb_handle_count = 0;
#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	ID_HANDLE_UNLOCK(&hs->hs_handles_lock);

	return OK;
}

xtPublic void xt_ind_lock_handle(XTIndHandlePtr handle)
{
	xt_spinlock_lock(&handle->ih_lock);
}

xtPublic void xt_ind_unlock_handle(XTIndHandlePtr handle)
{
	xt_spinlock_unlock(&handle->ih_lock);
}

/*
 * -----------------------------------------------------------------------
 * INIT/EXIT
 */

/*
 * Initialize the disk cache.
 */
xtPublic void xt_ind_init(XTThreadPtr self, size_t cache_size)
{
	XTIndBlockPtr	block;

#ifdef XT_USE_MYSYS
	init_key_cache(&my_cache, 1024, cache_size, 100, 300);
#endif
	/* Memory is devoted to the page data alone; I no longer count the size
	 * of the directory, or the page overhead: */
	ind_cac_globals.cg_block_count = cache_size / XT_INDEX_PAGE_SIZE;
	ind_cac_globals.cg_hash_size = ind_cac_globals.cg_block_count / (IDX_CAC_SEGMENT_COUNT >> 1);
	ind_cac_globals.cg_max_free = ind_cac_globals.cg_block_count / 10;
	if (ind_cac_globals.cg_max_free < 8)
		ind_cac_globals.cg_max_free = 8;
	if (ind_cac_globals.cg_max_free > 128)
		ind_cac_globals.cg_max_free = 128;

	try_(a) {
		for (u_int i=0; i<IDX_CAC_SEGMENT_COUNT; i++) {
			ind_cac_globals.cg_segment[i].cs_hash_table = (XTIndBlockPtr *) xt_calloc(self, ind_cac_globals.cg_hash_size * sizeof(XTIndBlockPtr));
			IDX_CAC_INIT_LOCK(self, &ind_cac_globals.cg_segment[i]);
		}

		block = (XTIndBlockPtr) xt_malloc(self, ind_cac_globals.cg_block_count * sizeof(XTIndBlockRec));
		ind_cac_globals.cg_blocks = block;
		xt_init_mutex_with_autoname(self, &ind_cac_globals.cg_lock);
#ifdef XT_USE_DIRECT_IO_ON_INDEX
		xtWord1 *buffer;
#ifdef XT_WIN
		size_t	psize = 512;
#else
		size_t	psize = getpagesize();
#endif
		size_t	diff;

		buffer = (xtWord1 *) xt_malloc(self, (ind_cac_globals.cg_block_count * XT_INDEX_PAGE_SIZE));
		diff = (size_t) buffer % psize;
		if (diff != 0) {
			xt_free(self, buffer);
			buffer = (xtWord1 *) xt_malloc(self, (ind_cac_globals.cg_block_count * XT_INDEX_PAGE_SIZE) + psize);
			diff = (size_t) buffer % psize;
			if (diff != 0)
				diff = psize - diff;
		}
		ind_cac_globals.cg_buffer = buffer;
		buffer += diff;
#endif

		for (u_int i=0; i<ind_cac_globals.cg_block_count; i++) {
			XT_IPAGE_INIT_LOCK(self, &block->cb_lock);
			block->cb_state = IDX_CAC_BLOCK_FREE;
			block->cb_next = ind_cac_globals.cg_free_list;
#ifdef XT_USE_DIRECT_IO_ON_INDEX
			block->cb_data = buffer;
			buffer += XT_INDEX_PAGE_SIZE;
#endif
			ind_cac_globals.cg_free_list = block;
			block++;
		}
		ind_cac_globals.cg_free_count = ind_cac_globals.cg_block_count;
#ifdef DEBUG_CHECK_IND_CACHE
		ind_cac_globals.cg_reserved_by_ots = 0;
#endif
		ind_handle_init(self);
	}
	catch_(a) {
		xt_ind_exit(self);
		throw_();
	}
	cont_(a);
}

xtPublic void xt_ind_exit(XTThreadPtr self)
{
#ifdef XT_USE_MYSYS
	end_key_cache(&my_cache, 1);
#endif
	for (u_int i=0; i<IDX_CAC_SEGMENT_COUNT; i++) {
		if (ind_cac_globals.cg_segment[i].cs_hash_table) {
			xt_free(self, ind_cac_globals.cg_segment[i].cs_hash_table);
			ind_cac_globals.cg_segment[i].cs_hash_table = NULL;
			IDX_CAC_FREE_LOCK(self, &ind_cac_globals.cg_segment[i]);
		}
	}

	if (ind_cac_globals.cg_blocks) {
		xt_free(self, ind_cac_globals.cg_blocks);
		ind_cac_globals.cg_blocks = NULL;
		xt_free_mutex(&ind_cac_globals.cg_lock);
	}
#ifdef XT_USE_DIRECT_IO_ON_INDEX
	if (ind_cac_globals.cg_buffer) {
		xt_free(self, ind_cac_globals.cg_buffer);
		ind_cac_globals.cg_buffer = NULL;
	}
#endif
	ind_handle_exit(self);

	memset(&ind_cac_globals, 0, sizeof(ind_cac_globals));
}

xtPublic xtInt8 xt_ind_get_usage()
{
	xtInt8 size = 0;

	size = (xtInt8) (ind_cac_globals.cg_block_count - ind_cac_globals.cg_free_count) * (xtInt8) XT_INDEX_PAGE_SIZE;
	return size;
}

xtPublic xtInt8 xt_ind_get_size()
{
	xtInt8 size = 0;

	size = (xtInt8) ind_cac_globals.cg_block_count * (xtInt8) XT_INDEX_PAGE_SIZE;
	return size;
}

/*
 * -----------------------------------------------------------------------
 * INDEX CHECKING
 */

xtPublic void xt_ind_check_cache(XTIndexPtr ind)
{
	XTIndBlockPtr	block;
	u_int			free_count, inuse_count, clean_count;
	xtBool			check_count = FALSE;

	if (ind == (XTIndex *) 1) {
		ind = NULL;
		check_count = TRUE;
	}

	// Check the dirty list:
	if (ind) {
		u_int cnt = 0;

		block = ind->mi_dirty_list;
		while (block) {
			cnt++;
			ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_DIRTY);
			block = block->cb_dirty_next;
		}
		ASSERT_NS(ind->mi_dirty_blocks == cnt);
	}

	xt_lock_mutex_ns(&ind_cac_globals.cg_lock);

	// Check the free list:
	free_count = 0;
	block = ind_cac_globals.cg_free_list;
	while (block) {
		free_count++;
		ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FREE);
		block = block->cb_next;
	}
	ASSERT_NS(ind_cac_globals.cg_free_count == free_count);

	/* Check the LRU list: */
	XTIndBlockPtr list_block, plist_block;
	
	plist_block = NULL;
	list_block = ind_cac_globals.cg_lru_block;
	if (list_block) {
		ASSERT_NS(ind_cac_globals.cg_mru_block != NULL);
		ASSERT_NS(ind_cac_globals.cg_mru_block->cb_mr_used == NULL);
		ASSERT_NS(list_block->cb_lr_used == NULL);
		inuse_count = 0;
		clean_count = 0;
		while (list_block) {
			inuse_count++;
			ASSERT_NS(list_block->cb_state == IDX_CAC_BLOCK_DIRTY || list_block->cb_state == IDX_CAC_BLOCK_CLEAN);
			if (list_block->cb_state == IDX_CAC_BLOCK_CLEAN)
				clean_count++;
			ASSERT_NS(block != list_block);
			ASSERT_NS(list_block->cb_lr_used == plist_block);
			plist_block = list_block;
			list_block = list_block->cb_mr_used;
		}
		ASSERT_NS(ind_cac_globals.cg_mru_block == plist_block);
	}
	else {
		inuse_count = 0;
		clean_count = 0;
		ASSERT_NS(ind_cac_globals.cg_mru_block == NULL);
	}

#ifdef DEBUG_CHECK_IND_CACHE
	ASSERT_NS(free_count + inuse_count + ind_cac_globals.cg_reserved_by_ots + ind_cac_globals.cg_read_count == ind_cac_globals.cg_block_count);
#endif
	xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);
	if (check_count) {
		/* We have just flushed, check how much is now free/clean. */
		if (free_count + clean_count < 10) {
			/* This could be a problem: */
			printf("Cache very low!\n");
		}
	}
}

#ifdef XXXXDEBUG
static void ind_cac_check_on_dirty_list(DcSegmentPtr seg, XTIndBlockPtr block)
{
	XTIndBlockPtr	list_block, plist_block;
	xtBool		found = FALSE;
	
	plist_block = NULL;
	list_block = seg->cs_dirty_list[block->cb_file_id % XT_INDEX_CACHE_FILE_SLOTS];
	while (list_block) {
		ASSERT_NS(list_block->cb_state == IDX_CAC_BLOCK_DIRTY);
		ASSERT_NS(list_block->cb_dirty_prev == plist_block);
		if (list_block == block)
			found = TRUE;
		plist_block = list_block;
		list_block = list_block->cb_dirty_next;
	}
	ASSERT_NS(found);
}

static void ind_cac_check_dirty_list(DcSegmentPtr seg, XTIndBlockPtr block)
{
	XTIndBlockPtr list_block, plist_block;
	
	for (u_int j=0; j<XT_INDEX_CACHE_FILE_SLOTS; j++) {
		plist_block = NULL;
		list_block = seg->cs_dirty_list[j];
		while (list_block) {
			ASSERT_NS(list_block->cb_state == IDX_CAC_BLOCK_DIRTY);
			ASSERT_NS(block != list_block);
			ASSERT_NS(list_block->cb_dirty_prev == plist_block);
			plist_block = list_block;
			list_block = list_block->cb_dirty_next;
		}
	}
}

#endif

/*
 * -----------------------------------------------------------------------
 * FREEING INDEX CACHE
 */

/*
 * This function returns TRUE if the block is freed.
 * This function returns FALSE if the block cannot be found, or the
 * block is not clean.
 *
 * We also return FALSE if we cannot copy the block to the handle
 * (if this is required). This will be due to out-of-memory!
 */
static xtBool ind_free_block(XTOpenTablePtr ot, XTIndBlockPtr block)
{
	XTIndBlockPtr	xblock, pxblock;
	u_int			hash_idx;
	u_int			file_id;
	xtIndexNodeID	address;
	DcSegmentPtr	seg;

#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	file_id = block->cb_file_id;
	address = block->cb_address;

	hash_idx = XT_NODE_ID(address) + (file_id * 223);
	seg = &ind_cac_globals.cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % ind_cac_globals.cg_hash_size;

	IDX_CAC_WRITE_LOCK(seg, ot->ot_thread);

	pxblock = NULL;
	xblock = seg->cs_hash_table[hash_idx];
	while (xblock) {
		if (block == xblock) {
			/* Found the block... */
			XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
			if (block->cb_state != IDX_CAC_BLOCK_CLEAN) {
				/* This block cannot be freed: */
				XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
				IDX_CAC_UNLOCK(seg, ot->ot_thread);
#ifdef DEBUG_CHECK_IND_CACHE
				xt_ind_check_cache(NULL);
#endif
				return FALSE;
			}
			
			goto free_the_block;
		}
		pxblock = xblock;
		xblock = xblock->cb_next;
	}

	IDX_CAC_UNLOCK(seg, ot->ot_thread);

	/* Not found (this can happen, if block was freed by another thread) */
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return FALSE;

	free_the_block:

	/* If the block is referenced by a handle, then we
	 * have to copy the data to the handle before we
	 * free the page:
	 */
	/* {HANDLE-COUNT-USAGE}
	 * This access is safe because:
	 *
	 * We have an Xlock on the cache block, which excludes
	 * all other writers that want to change the cache block
	 * and also all readers of the cache block, because
	 * they all have at least an Slock on the cache block.
	 */
	if (block->cb_handle_count) {
		XTIndReferenceRec	iref;
		
		iref.ir_xlock = TRUE;
		iref.ir_updated = FALSE;
		iref.ir_block = block;
		iref.ir_branch = (XTIdxBranchDPtr) block->cb_data;
		if (!xt_ind_copy_on_write(&iref)) {
			XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
			return FALSE;
		}
	}

	/* Block is clean, remove from the hash table: */
	if (pxblock)
		pxblock->cb_next = block->cb_next;
	else
		seg->cs_hash_table[hash_idx] = block->cb_next;

	xt_lock_mutex_ns(&ind_cac_globals.cg_lock);

	/* Remove from the MRU list: */
	if (ind_cac_globals.cg_lru_block == block)
		ind_cac_globals.cg_lru_block = block->cb_mr_used;
	if (ind_cac_globals.cg_mru_block == block)
		ind_cac_globals.cg_mru_block = block->cb_lr_used;
	
	/* Note: I am updating blocks for which I have no lock
	 * here. But this is OK, because I hold the lock
	 * on the MRU list.
	 */
	if (block->cb_lr_used)
		block->cb_lr_used->cb_mr_used = block->cb_mr_used;
	if (block->cb_mr_used)
		block->cb_mr_used->cb_lr_used = block->cb_lr_used;

	/* The block is now free: */
	block->cb_next = ind_cac_globals.cg_free_list;
	ind_cac_globals.cg_free_list = block;
	ind_cac_globals.cg_free_count++;
	block->cb_state = IDX_CAC_BLOCK_FREE;
	IDX_TRACE("%d- f%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(block->cb_data));

	/* Unlock BEFORE the block is reused! */
	XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);

	xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);

	IDX_CAC_UNLOCK(seg, ot->ot_thread);

#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return TRUE;
}

#define IND_CACHE_MAX_BLOCKS_TO_FREE		100

/*
 * Return the number of blocks freed.
 *
 * The idea is to grab a list of blocks to free.
 * The list consists of the LRU blocks that are
 * clean.
 *
 * Free as many as possible (up to a maximum of blocks_required)
 * from the list, even if LRU position has changed
 * (or we have a race if there are too few blocks).
 * However, if the block cannot be found, or is dirty
 * we must skip it.
 *
 * Repeat until we find no blocks for the list, or
 * we have freed 'blocks_required'.
 *
 * 'not_this' is a block that must not be freed because
 * it is locked by the calling thread!
 */
static u_int ind_cac_free_lru_blocks(XTOpenTablePtr ot, u_int blocks_required, XTIdxBranchDPtr not_this)
{
	register DcGlobalsRec	*dcg = &ind_cac_globals;
	XTIndBlockPtr			to_free[IND_CACHE_MAX_BLOCKS_TO_FREE];
	int						count;
	XTIndBlockPtr			block;
	u_int					blocks_freed = 0;
	XTIndBlockPtr			locked_block;

#ifdef XT_USE_DIRECT_IO_ON_INDEX
#error This will not work!
#endif
	locked_block = (XTIndBlockPtr) ((xtWord1 *) not_this - offsetof(XTIndBlockRec, cb_data));

	retry:
	xt_lock_mutex_ns(&ind_cac_globals.cg_lock);
	block = dcg->cg_lru_block;
	count = 0;
	while (block && count < IND_CACHE_MAX_BLOCKS_TO_FREE) {
		if (block != locked_block && block->cb_state == IDX_CAC_BLOCK_CLEAN) {
			to_free[count] = block;
			count++;
		}
		block = block->cb_mr_used;
	}
	xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);

	if (!count)
		return blocks_freed;

	for (int i=0; i<count; i++) {
		if (ind_free_block(ot, to_free[i]))
			blocks_freed++;
		if (blocks_freed >= blocks_required &&
			ind_cac_globals.cg_free_count >= ind_cac_globals.cg_max_free + blocks_required)
			return blocks_freed;
	}

	goto retry;
}

/*
 * -----------------------------------------------------------------------
 * MAIN CACHE FUNCTIONS
 */

/*
 * Fetch the block. Note, if we are about to write the block
 * then there is no need to read it from disk!
 */
static XTIndBlockPtr ind_cac_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, DcSegmentPtr *ret_seg, xtBool read_data)
{
	register XTOpenFilePtr	file = ot->ot_ind_file;
	register XTIndBlockPtr	block, new_block;
	register DcSegmentPtr	seg;
	register u_int			hash_idx;
	register DcGlobalsRec	*dcg = &ind_cac_globals;
	size_t					red_size;

#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	/* Address, plus file ID multiplied by my favorite prime number! */
	hash_idx = XT_NODE_ID(address) + (file->fr_id * 223);
	seg = &dcg->cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % dcg->cg_hash_size;
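	/* For illustration only (the actual constants depend on the build):
	 * with XT_INDEX_CACHE_SEGMENT_SHIFTS = 3 (8 segments) and
	 * cg_hash_size = 1024, node 1000 of file 3 gives
	 * hash_idx = 1000 + 3*223 = 1669, segment 1669 & 7 = 5, and
	 * bucket (1669 >> 3) % 1024 = 208 in that segment's hash table.
	 */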

	IDX_CAC_READ_LOCK(seg, ot->ot_thread);
	block = seg->cs_hash_table[hash_idx];
	while (block) {
		if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
			ASSERT_NS(block->cb_state != IDX_CAC_BLOCK_FREE);

			/* Check how recently this page has been used: */
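			/* The block is only moved to the MRU end of the list if its
			 * last-use stamp is older than half the number of blocks in
			 * the cache. This avoids taking cg_lock on every cache hit
			 * for blocks that have been used recently.
			 */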
			if (XT_TIME_DIFF(block->cb_ru_time, dcg->cg_ru_now) > (dcg->cg_block_count >> 1)) {
				xt_lock_mutex_ns(&dcg->cg_lock);

				/* Move to the front of the MRU list: */
				block->cb_ru_time = ++dcg->cg_ru_now;
				if (dcg->cg_mru_block != block) {
					/* Remove from the MRU list: */
					if (dcg->cg_lru_block == block)
						dcg->cg_lru_block = block->cb_mr_used;
					if (block->cb_lr_used)
						block->cb_lr_used->cb_mr_used = block->cb_mr_used;
					if (block->cb_mr_used)
						block->cb_mr_used->cb_lr_used = block->cb_lr_used;

					/* Make the block the most recently used: */
					if ((block->cb_lr_used = dcg->cg_mru_block))
						dcg->cg_mru_block->cb_mr_used = block;
					block->cb_mr_used = NULL;
					dcg->cg_mru_block = block;
					if (!dcg->cg_lru_block)
						dcg->cg_lru_block = block;
				}

				xt_unlock_mutex_ns(&dcg->cg_lock);
			}
		
			*ret_seg = seg;
#ifdef DEBUG_CHECK_IND_CACHE
			xt_ind_check_cache(NULL);
#endif
			ot->ot_thread->st_statistics.st_ind_cache_hit++;
			return block;
		}
		block = block->cb_next;
	}
	
	/* Block not found... */
	IDX_CAC_UNLOCK(seg, ot->ot_thread);

	/* Check the open table reserve list first: */
	if ((new_block = ot->ot_ind_res_bufs)) {
		ot->ot_ind_res_bufs = new_block->cb_next;
		ot->ot_ind_res_count--;
#ifdef DEBUG_CHECK_IND_CACHE
		xt_lock_mutex_ns(&dcg->cg_lock);
		dcg->cg_reserved_by_ots--;
		dcg->cg_read_count++;
		xt_unlock_mutex_ns(&dcg->cg_lock);
#endif
		goto use_free_block;
	}

	free_some_blocks:
	if (!dcg->cg_free_list) {
		if (!ind_cac_free_lru_blocks(ot, 1, NULL)) {
			if (!dcg->cg_free_list) {
				xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_INDEX_CACHE);
#ifdef DEBUG_CHECK_IND_CACHE
				xt_ind_check_cache(NULL);
#endif
				return NULL;
			}
		}
	}

	/* Get a free block: */
	xt_lock_mutex_ns(&dcg->cg_lock);
	if (!(new_block = dcg->cg_free_list)) {
		xt_unlock_mutex_ns(&dcg->cg_lock);
		goto free_some_blocks;
	}
	ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_FREE);
	dcg->cg_free_list = new_block->cb_next;
	dcg->cg_free_count--;
#ifdef DEBUG_CHECK_IND_CACHE
	dcg->cg_read_count++;
#endif
	xt_unlock_mutex_ns(&dcg->cg_lock);

	use_free_block:
	new_block->cb_address = address;
	new_block->cb_file_id = file->fr_id;
	new_block->cb_state = IDX_CAC_BLOCK_CLEAN;
	new_block->cb_handle_count = 0;
	new_block->cp_flush_seq = 0;
	new_block->cp_del_count = 0;
	new_block->cb_dirty_next = NULL;
	new_block->cb_dirty_prev = NULL;

	if (read_data) {
		if (!xt_pread_file(file, xt_ind_node_to_offset(ot->ot_table, address), XT_INDEX_PAGE_SIZE, 0, new_block->cb_data, &red_size, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread)) {
			xt_lock_mutex_ns(&dcg->cg_lock);
			new_block->cb_next = dcg->cg_free_list;
			dcg->cg_free_list = new_block;
			dcg->cg_free_count++;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_read_count--;
#endif
			new_block->cb_state = IDX_CAC_BLOCK_FREE;
			IDX_TRACE("%d- F%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
			xt_unlock_mutex_ns(&dcg->cg_lock);
#ifdef DEBUG_CHECK_IND_CACHE
			xt_ind_check_cache(NULL);
#endif
			return NULL;
		}
		IDX_TRACE("%d- R%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
		ot->ot_thread->st_statistics.st_ind_cache_miss++;
	}
	else
		red_size = 0;
	// PMC - I don't think this is required! memset(new_block->cb_data + red_size, 0, XT_INDEX_PAGE_SIZE - red_size);

	IDX_CAC_WRITE_LOCK(seg, ot->ot_thread);
	block = seg->cs_hash_table[hash_idx];
	while (block) {
		if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
			/* Oops, someone else was faster! */
			xt_lock_mutex_ns(&dcg->cg_lock);
			new_block->cb_next = dcg->cg_free_list;
			dcg->cg_free_list = new_block;
			dcg->cg_free_count++;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_read_count--;
#endif
			new_block->cb_state = IDX_CAC_BLOCK_FREE;
			IDX_TRACE("%d- F%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
			xt_unlock_mutex_ns(&dcg->cg_lock);
			goto done_ok;
		}
		block = block->cb_next;
	}
	block = new_block;

	/* Make the block the most recently used: */
	xt_lock_mutex_ns(&dcg->cg_lock);
	block->cb_ru_time = ++dcg->cg_ru_now;
	if ((block->cb_lr_used = dcg->cg_mru_block))
		dcg->cg_mru_block->cb_mr_used = block;
	block->cb_mr_used = NULL;
	dcg->cg_mru_block = block;
	if (!dcg->cg_lru_block)
		dcg->cg_lru_block = block;
#ifdef DEBUG_CHECK_IND_CACHE
	dcg->cg_read_count--;
#endif
	xt_unlock_mutex_ns(&dcg->cg_lock);

	/* {LAZY-DEL-INDEX-ITEMS}
	 * Conditionally count the number of deleted entries in the index:
	 * We do this before other threads can read the block.
	 */
	if (ind->mi_lazy_delete && read_data)
		xt_ind_count_deleted_items(ot->ot_table, ind, block);

	/* Add to the hash table: */
	block->cb_next = seg->cs_hash_table[hash_idx];
	seg->cs_hash_table[hash_idx] = block;

	done_ok:
	*ret_seg = seg;
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return block;
}

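/*
 * Find a block in the cache, without reading it from disk.
 * If the page is in the cache, the block and its segment (still
 * read locked) are returned. Otherwise both are set to NULL and
 * no lock is held.
 */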
static xtBool ind_cac_get(XTOpenTablePtr ot, xtIndexNodeID address, DcSegmentPtr *ret_seg, XTIndBlockPtr *ret_block)
{
	register XTOpenFilePtr	file = ot->ot_ind_file;
	register XTIndBlockPtr	block;
	register DcSegmentPtr	seg;
	register u_int			hash_idx;
	register DcGlobalsRec	*dcg = &ind_cac_globals;

	hash_idx = XT_NODE_ID(address) + (file->fr_id * 223);
	seg = &dcg->cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % dcg->cg_hash_size;

	IDX_CAC_READ_LOCK(seg, ot->ot_thread);
	block = seg->cs_hash_table[hash_idx];
	while (block) {
		if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
			ASSERT_NS(block->cb_state != IDX_CAC_BLOCK_FREE);

			*ret_seg = seg;
			*ret_block = block;
			return OK;
		}
		block = block->cb_next;
	}
	IDX_CAC_UNLOCK(seg, ot->ot_thread);
	
	/* Block not found: */
	*ret_seg = NULL;
	*ret_block = NULL;
	return OK;
}

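/*
 * Write an index page. The block is fetched without reading it from
 * disk (it is about to be overwritten), the given data is copied in,
 * and the block is marked dirty and put on the index dirty list, if
 * it is not already on it.
 */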
xtPublic xtBool xt_ind_write(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, size_t size, xtWord1 *data)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!(block = ind_cac_fetch(ot, ind, address, &seg, FALSE)))
		return FAILED;

	XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
	ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN || block->cb_state == IDX_CAC_BLOCK_DIRTY);
	memcpy(block->cb_data, data, size);
	block->cp_flush_seq = ot->ot_table->tab_ind_flush_seq;
	if (block->cb_state != IDX_CAC_BLOCK_DIRTY) {
		TRACK_BLOCK_WRITE(offset);
		xt_spinlock_lock(&ind->mi_dirty_lock);
		if ((block->cb_dirty_next = ind->mi_dirty_list))
			ind->mi_dirty_list->cb_dirty_prev = block;
		block->cb_dirty_prev = NULL;
		ind->mi_dirty_list = block;
		ind->mi_dirty_blocks++;
		xt_spinlock_unlock(&ind->mi_dirty_lock);
		block->cb_state = IDX_CAC_BLOCK_DIRTY;
	}
	XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
	IDX_CAC_UNLOCK(seg, ot->ot_thread);
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed++;
#endif
	return OK;
}

/*
 * Update the cache, if in RAM.
 */
xtPublic xtBool xt_ind_write_cache(XTOpenTablePtr ot, xtIndexNodeID address, size_t size, xtWord1 *data)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!ind_cac_get(ot, address, &seg, &block))
		return FAILED;

	if (block) {
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
		ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN || block->cb_state == IDX_CAC_BLOCK_DIRTY);
		memcpy(block->cb_data, data, size);
		XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
		IDX_CAC_UNLOCK(seg, ot->ot_thread);
	}

	return OK;
}

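/*
 * Mark a cached page as clean, and remove it from the index dirty
 * list. If the page is not in cache, there is nothing to do.
 */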
xtPublic xtBool xt_ind_clean(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!ind_cac_get(ot, address, &seg, &block))
		return FAILED;
	if (block) {
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
		ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN || block->cb_state == IDX_CAC_BLOCK_DIRTY);

		if (block->cb_state == IDX_CAC_BLOCK_DIRTY) {
			/* Take the block off the dirty list: */
			xt_spinlock_lock(&ind->mi_dirty_lock);
			if (block->cb_dirty_next)
				block->cb_dirty_next->cb_dirty_prev = block->cb_dirty_prev;
			if (block->cb_dirty_prev)
				block->cb_dirty_prev->cb_dirty_next = block->cb_dirty_next;
			if (ind->mi_dirty_list == block)
				ind->mi_dirty_list = block->cb_dirty_next;
			ind->mi_dirty_blocks--;
			xt_spinlock_unlock(&ind->mi_dirty_lock);
			block->cb_state = IDX_CAC_BLOCK_CLEAN;
		}
		XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);

		IDX_CAC_UNLOCK(seg, ot->ot_thread);
	}

	return OK;
}

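/*
 * Copy the first 'size' bytes of an index page out of the cache,
 * reading the page into the cache first, if required.
 */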
xtPublic xtBool xt_ind_read_bytes(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, size_t size, xtWord1 *data)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!(block = ind_cac_fetch(ot, ind, address, &seg, TRUE)))
		return FAILED;

	XT_IPAGE_READ_LOCK(&block->cb_lock);
	memcpy(data, block->cb_data, size);
	XT_IPAGE_UNLOCK(&block->cb_lock, FALSE);
	IDX_CAC_UNLOCK(seg, ot->ot_thread);
	return OK;
}

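/*
 * Fetch an index page and lock it according to 'ltype':
 * XT_LOCK_READ takes a shared lock, XT_LOCK_WRITE an exclusive lock.
 * XT_XLOCK_LEAF only takes an exclusive lock if the page is a leaf,
 * and XT_XLOCK_DEL_LEAF does the same, unless a lazy delete is
 * possible, in which case a shared lock is enough.
 * On success the page reference is returned in 'iref'.
 */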
xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, XTPageLockType ltype, XTIndReferencePtr iref)
{
	register XTIndBlockPtr	block;
	DcSegmentPtr			seg;
	xtWord2					branch_size;
	xtBool					xlock = FALSE;

#ifdef DEBUG
	ASSERT_NS(iref->ir_xlock == 2);
	ASSERT_NS(iref->ir_updated == 2);
#endif
	if (!(block = ind_cac_fetch(ot, ind, address, &seg, TRUE)))
		return FAILED;

	branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
	if (XT_GET_INDEX_BLOCK_LEN(branch_size) < 2 || XT_GET_INDEX_BLOCK_LEN(branch_size) > XT_INDEX_PAGE_SIZE) {
		IDX_CAC_UNLOCK(seg, ot->ot_thread);
		xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
		return FAILED;
	}

	switch (ltype) {
		case XT_LOCK_READ:
			break;
		case XT_LOCK_WRITE:
			xlock = TRUE;
			break;
		case XT_XLOCK_LEAF:
			if (!XT_IS_NODE(branch_size))
				xlock = TRUE;
			break;
		case XT_XLOCK_DEL_LEAF:
			if (!XT_IS_NODE(branch_size)) {
				if (ot->ot_table->tab_dic.dic_no_lazy_delete)
					xlock = TRUE;
				else {
					/*
					 * {LAZY-DEL-INDEX-ITEMS}
					 *
					 * We are fetching a page for delete purposes.
					 * We decide here whether to do a lazy delete,
					 * or whether to compact the node.
					 *
					 * A lazy delete just requires a shared lock.
					 *
					 */
					if (ind->mi_lazy_delete) {
						/* If the number of deleted items is greater than
						 * half of the number of items that can fit in the
						 * page, then we will compact the node.
						 */
						if (!xt_idx_lazy_delete_on_leaf(ind, block, XT_GET_INDEX_BLOCK_LEN(branch_size)))
							xlock = TRUE;
					}
					else
						xlock = TRUE;
				}
			}
			break;
	}

	if ((iref->ir_xlock = xlock))
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
	else
		XT_IPAGE_READ_LOCK(&block->cb_lock);

	IDX_CAC_UNLOCK(seg, ot->ot_thread);

	/* {DIRECT-IO}
	 * Direct I/O requires that the buffer is 512 byte aligned.
	 * To do this, cb_data is turned into a pointer, instead
	 * of an array.
	 * As a result, we need to pass a pointer to both the
	 * cache block and the cache block data:
	 */
	iref->ir_updated = FALSE;
	iref->ir_block = block;
	iref->ir_branch = (XTIdxBranchDPtr) block->cb_data;
	return OK;
}

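/*
 * Release a page fetched with xt_ind_fetch(). If the page was updated
 * it is marked dirty and added to the index dirty list, before the
 * page lock is released.
 */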
xtPublic xtBool xt_ind_release(XTOpenTablePtr ot, XTIndexPtr ind, XTPageUnlockType XT_NDEBUG_UNUSED(utype), XTIndReferencePtr iref)
{
	register XTIndBlockPtr	block;

	block = iref->ir_block;

#ifdef DEBUG
	ASSERT_NS(iref->ir_xlock != 2);
	ASSERT_NS(iref->ir_updated != 2);
	if (iref->ir_updated)
		ASSERT_NS(utype == XT_UNLOCK_R_UPDATE || utype == XT_UNLOCK_W_UPDATE);
	else
		ASSERT_NS(utype == XT_UNLOCK_READ || utype == XT_UNLOCK_WRITE);
	if (iref->ir_xlock)
		ASSERT_NS(utype == XT_UNLOCK_WRITE || utype == XT_UNLOCK_W_UPDATE);
	else
		ASSERT_NS(utype == XT_UNLOCK_READ || utype == XT_UNLOCK_R_UPDATE);
#endif
	if (iref->ir_updated) {
		/* The page was updated: */
		ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN || block->cb_state == IDX_CAC_BLOCK_DIRTY);
		block->cp_flush_seq = ot->ot_table->tab_ind_flush_seq;
		if (block->cb_state != IDX_CAC_BLOCK_DIRTY) {
			TRACK_BLOCK_WRITE(offset);
			xt_spinlock_lock(&ind->mi_dirty_lock);
			if ((block->cb_dirty_next = ind->mi_dirty_list))
				ind->mi_dirty_list->cb_dirty_prev = block;
			block->cb_dirty_prev = NULL;
			ind->mi_dirty_list = block;
			ind->mi_dirty_blocks++;
			xt_spinlock_unlock(&ind->mi_dirty_lock);
			block->cb_state = IDX_CAC_BLOCK_DIRTY;
		}
	}

	XT_IPAGE_UNLOCK(&block->cb_lock, iref->ir_xlock);
#ifdef DEBUG
	iref->ir_xlock = 2;
	iref->ir_updated = 2;
#endif
	return OK;
}

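/*
 * Reserve 'count' free cache blocks on the open table's private list
 * (ot_ind_res_bufs), freeing LRU blocks if required. 'not_this' is a
 * block that may not be freed, because the caller has it locked.
 */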
xtPublic xtBool xt_ind_reserve(XTOpenTablePtr ot, u_int count, XTIdxBranchDPtr not_this)
{
	register XTIndBlockPtr	block;
	register DcGlobalsRec	*dcg = &ind_cac_globals;

#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_reserved = count;
	ot->ot_ind_reads = 0;
#endif
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	while (ot->ot_ind_res_count < count) {
		if (!dcg->cg_free_list) {
			if (!ind_cac_free_lru_blocks(ot, count - ot->ot_ind_res_count, not_this)) {
				if (!dcg->cg_free_list) {
					xt_ind_free_reserved(ot);
					xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_INDEX_CACHE);
#ifdef DEBUG_CHECK_IND_CACHE
					xt_ind_check_cache(NULL);
#endif
					return FAILED;
				}
			}
		}

		/* Get a free block: */
		xt_lock_mutex_ns(&dcg->cg_lock);
		while (ot->ot_ind_res_count < count && (block = dcg->cg_free_list)) {
			ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FREE);
			dcg->cg_free_list = block->cb_next;
			dcg->cg_free_count--;
			block->cb_next = ot->ot_ind_res_bufs;
			ot->ot_ind_res_bufs = block;
			ot->ot_ind_res_count++;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_reserved_by_ots++;
#endif
		}
		xt_unlock_mutex_ns(&dcg->cg_lock);
	}
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return OK;
}

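/*
 * Return the blocks still reserved by this open table to the global
 * free list.
 */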
xtPublic void xt_ind_free_reserved(XTOpenTablePtr ot)
{
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	if (ot->ot_ind_res_bufs) {
		register XTIndBlockPtr	block, fblock;
		register DcGlobalsRec	*dcg = &ind_cac_globals;

		xt_lock_mutex_ns(&dcg->cg_lock);
		block = ot->ot_ind_res_bufs;
		while (block) {
			fblock = block;
			block = block->cb_next;

			fblock->cb_next = dcg->cg_free_list;
			dcg->cg_free_list = fblock;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_reserved_by_ots--;
#endif
			dcg->cg_free_count++;
		}
		xt_unlock_mutex_ns(&dcg->cg_lock);
		ot->ot_ind_res_bufs = NULL;
		ot->ot_ind_res_count = 0;
	}
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
}

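/*
 * Give up the reserved blocks, but only if the global free list is
 * empty.
 */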
xtPublic void xt_ind_unreserve(XTOpenTablePtr ot)
{
	if (!ind_cac_globals.cg_free_list)
		xt_ind_free_reserved(ot);
}