/* Copyright (c) 2005 PrimeBase Technologies GmbH
 *
 * PrimeBase XT
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * 2005-01-24	Paul McCullagh
 *
 * H&G2JCtL
 */

#include "xt_config.h"

#include <stdio.h>
#ifndef XT_WIN
#include <unistd.h>
#include <signal.h>
#endif
#include <stdlib.h>

#ifndef DRIZZLED
#include "mysql_priv.h"
#endif

#include "ha_pbxt.h"

#include "filesys_xt.h"
#include "database_xt.h"
#include "memory_xt.h"
#include "strutil_xt.h"
#include "sortedlist_xt.h"
#include "util_xt.h"
#include "heap_xt.h"
#include "table_xt.h"
#include "trace_xt.h"
#include "myxt_xt.h"

static void dl_wake_co_thread(XTDatabaseHPtr db);

/*
 * --------------------------------------------------------------------------------
 * SEQUENTIAL READING
 */

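/*
 * Initialize the sequential reader: remember the database, allocate the
 * read buffer and reset the current position. Returns FALSE if the buffer
 * could not be allocated.
 */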
xtBool XTDataSeqRead::sl_seq_init(struct XTDatabase *db, size_t buffer_size)
{
	sl_db = db;
	sl_buffer_size = buffer_size;

	sl_log_file = NULL;
	sl_log_eof = 0;

	sl_buf_log_offset = 0;
	sl_buffer_len = 0;
	sl_buffer = (xtWord1 *) xt_malloc_ns(buffer_size);

	sl_rec_log_id = 0;
	sl_rec_log_offset = 0;
	sl_record_len = 0;
	sl_extra_garbage = 0;

	return sl_buffer != NULL;
}

void XTDataSeqRead::sl_seq_exit()
{
	if (sl_log_file) {
		xt_close_file_ns(sl_log_file);
		sl_log_file  = NULL;
	}
	if (sl_buffer) {
		xt_free_ns(sl_buffer);
		sl_buffer = NULL;
	}
}

XTOpenFilePtr XTDataSeqRead::sl_seq_open_file()
{
	return sl_log_file;
}

void XTDataSeqRead::sl_seq_pos(xtLogID *log_id, xtLogOffset *log_offset)
{
	*log_id = sl_rec_log_id;
	*log_offset = sl_rec_log_offset;
}

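/*
 * Position the reader on the given data log and offset. If the log has
 * changed, the previously open log file is closed and the new log is
 * opened (a missing file is tolerated if missing_ok is TRUE).
 */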
xtBool XTDataSeqRead::sl_seq_start(xtLogID log_id, xtLogOffset log_offset, xtBool missing_ok)
{
	if (sl_rec_log_id != log_id) {
		if (sl_log_file) {
			xt_close_file_ns(sl_log_file);
			sl_log_file  = NULL;
		}

		sl_rec_log_id = log_id;
		sl_buf_log_offset = sl_rec_log_offset;
		sl_buffer_len = 0;

		if (!sl_db->db_datalogs.dlc_open_log(&sl_log_file, log_id, missing_ok ? XT_FS_MISSING_OK : XT_FS_DEFAULT))
			return FAILED;
		if (sl_log_file)
			sl_log_eof = xt_seek_eof_file(NULL, sl_log_file);
	}
	sl_rec_log_offset = log_offset;
	sl_record_len = 0;
	return OK;
}

xtBool XTDataSeqRead::sl_rnd_read(xtLogOffset log_offset, size_t size, xtWord1 *buffer, size_t *data_read, struct XTThread *thread)
{
	if (!sl_log_file) {
		*data_read = 0;
		return OK;
	}
	return xt_pread_file(sl_log_file, log_offset, size, 0, buffer, data_read, &thread->st_statistics.st_data, thread);
}

/*
 * Unlike the transaction log sequential reader, this function only returns
 * the header of a record.
 *
 * {SKIP-GAPS}
 * This function now skips gaps. This should not be required, because in normal
 * operation, no gaps should be created.
 *
 * However, if this happens there is a danger that a valid record after the
 * gap will be lost.
 *
 * So, if we find an invalid record, we scan through the log to find the next
 * valid record. Note that there is still a danger that we will find
 * data that looks like a valid record, but is not.
 *
 * In this case, this "pseudo record" may cause the function to actually skip
 * valid records.
 *
 * Note, any such malfunction will eventually cause the record to be lost forever
 * after the garbage collector has run.
 */
xtBool XTDataSeqRead::sl_seq_next(XTXactLogBufferDPtr *ret_entry, struct XTThread *thread)
{
	XTXactLogBufferDPtr	record;
	size_t				tfer;
	size_t				len = 0;
	size_t				rec_offset;
	size_t				max_rec_len;
	xtBool				reread_from_buffer;
	xtWord4				size;
	xtLogOffset			gap_start = 0;

	/* Go to the next record (sl_record_len must be initialized
	 * to 0 for this to work).
	 */
	retry:
	sl_rec_log_offset += sl_record_len;
	sl_record_len = 0;

	if (sl_rec_log_offset < sl_buf_log_offset ||
		sl_rec_log_offset >= sl_buf_log_offset + (xtLogOffset) sl_buffer_len) {
		/* The current position is nowhere near the buffer, read data into the
		 * buffer:
		 */
		tfer = sl_buffer_size;
		if (!sl_rnd_read(sl_rec_log_offset, tfer, sl_buffer, &tfer, thread))
			return FAILED;
		sl_buf_log_offset = sl_rec_log_offset;
		sl_buffer_len = tfer;

		/* Should we go to the next log? */
		if (!tfer)
			goto return_empty;
	}

	/* The start of the record is in the buffer: */
	read_from_buffer:
	rec_offset = (size_t) (sl_rec_log_offset - sl_buf_log_offset);
	max_rec_len = sl_buffer_len - rec_offset;
	reread_from_buffer = FALSE;
	size = 0;

	/* Check the type of record: */
	record = (XTXactLogBufferDPtr) (sl_buffer + rec_offset);
	switch (record->xl.xl_status_1) {
		case XT_LOG_ENT_HEADER:
			if (sl_rec_log_offset != 0)
				goto scan_to_next_record;
			if (offsetof(XTXactLogHeaderDRec, xh_size_4) + 4 > max_rec_len) {
				reread_from_buffer = TRUE;
				goto read_more;
			}
			len = XT_GET_DISK_4(record->xh.xh_size_4);
			if (len > max_rec_len) {
				reread_from_buffer = TRUE;
				goto read_more;
			}

			if (record->xh.xh_checksum_1 != XT_CHECKSUM_1(sl_rec_log_id))
				goto return_empty;
			if (XT_LOG_HEAD_MAGIC(record, len) != XT_LOG_FILE_MAGIC)
				goto return_empty;
			if (len > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
				if (XT_GET_DISK_4(record->xh.xh_log_id_4) != sl_rec_log_id)
					goto return_empty;
			}
			break;
		case XT_LOG_ENT_EXT_REC_OK:
		case XT_LOG_ENT_EXT_REC_DEL:
			if (gap_start) {
				xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap in data log %lu, start: %llu, size: %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start, (u_llong) (sl_rec_log_offset - gap_start));
				gap_start = 0;
			}
			len = offsetof(XTactExtRecEntryDRec, er_data);
			if (len > max_rec_len) {
				reread_from_buffer = TRUE;
				goto read_more;
			}
			size = XT_GET_DISK_4(record->er.er_data_size_4);
			/* Verify the record as good as we can! */
			if (!size)
				goto scan_to_next_record;
			if (sl_rec_log_offset + (xtLogOffset) offsetof(XTactExtRecEntryDRec, er_data) + size > sl_log_eof)
				goto scan_to_next_record;
			if (!XT_GET_DISK_4(record->er.er_tab_id_4))
				goto scan_to_next_record;
			if (!XT_GET_DISK_4(record->er.er_rec_id_4))
				goto scan_to_next_record;
			break;
		default:
			/* Note, we no longer assume EOF.
			 * Instead, we skip to the next valid record. */
			goto scan_to_next_record;
	}

	if (len <= max_rec_len) {
		/* The record is completely in the buffer: */
		sl_record_len = len+size;
		*ret_entry = record;
		return OK;
	}
	
	read_more:
	/* The record is partially in the buffer. */
	memmove(sl_buffer, sl_buffer + rec_offset, max_rec_len);
	sl_buf_log_offset += rec_offset;
	sl_buffer_len = max_rec_len;

	/* Read the rest, as far as possible: */
	tfer = sl_buffer_size - max_rec_len;
	if (!sl_rnd_read(sl_buf_log_offset + max_rec_len, tfer, sl_buffer + max_rec_len, &tfer, thread))
		return FAILED;
	sl_buffer_len += tfer;

	if (sl_buffer_len < len)
		/* A partial record is in the log, must be the end of the log: */
		goto return_empty;

	if (reread_from_buffer)
		goto read_from_buffer;

	/* The record is now completely in the buffer: */
	sl_record_len = len;
	*ret_entry = (XTXactLogBufferDPtr) sl_buffer;
	return OK;

	scan_to_next_record:
	if (!gap_start) {
		gap_start = sl_rec_log_offset;
		xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap found in data log %lu, starting at offset %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start);
	}
	sl_record_len = 1;
	sl_extra_garbage++;
	goto retry;

	return_empty:
	if (gap_start) {
		xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap in data log %lu, start: %llu, size: %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start, (u_llong) (sl_rec_log_offset - gap_start));
		gap_start = 0;
	}
	*ret_entry = NULL;
	return OK;
}

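/*
 * Skip the given number of bytes. The skip takes effect when the reader
 * advances on the next call to sl_seq_next().
 */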
void XTDataSeqRead::sl_seq_skip(size_t size)
{
	sl_record_len += size;
}

void XTDataSeqRead::sl_seq_skip_to(off_t log_offset)
{
	if (log_offset >= sl_rec_log_offset)
		sl_record_len = (size_t) (log_offset - sl_rec_log_offset);
}

/*
 * --------------------------------------------------------------------------------
 * STATIC UTILITIES
 */

static xtBool dl_create_log_header(XTDataLogFilePtr data_log, XTOpenFilePtr of, XTThreadPtr thread)
{
	XTXactLogHeaderDRec	header;

	/* The header was not completely written, so write a new one: */
	memset(&header, 0, sizeof(XTXactLogHeaderDRec));
	header.xh_status_1 = XT_LOG_ENT_HEADER;
	header.xh_checksum_1 = XT_CHECKSUM_1(data_log->dlf_log_id);
	XT_SET_DISK_4(header.xh_size_4, sizeof(XTXactLogHeaderDRec));
	XT_SET_DISK_8(header.xh_free_space_8, 0);
	XT_SET_DISK_8(header.xh_file_len_8, sizeof(XTXactLogHeaderDRec));
	XT_SET_DISK_4(header.xh_log_id_4, data_log->dlf_log_id);
	XT_SET_DISK_2(header.xh_version_2, XT_LOG_VERSION_NO);
	XT_SET_DISK_4(header.xh_magic_4, XT_LOG_FILE_MAGIC);
	if (!xt_pwrite_file(of, 0, sizeof(XTXactLogHeaderDRec), &header, &thread->st_statistics.st_data, thread))
		return FAILED;
	if (!xt_flush_file(of, &thread->st_statistics.st_data, thread))
		return FAILED;
	return OK;
}

static xtBool dl_write_garbage_level(XTDataLogFilePtr data_log, XTOpenFilePtr of, xtBool flush, XTThreadPtr thread)
{
	XTXactLogHeaderDRec	header;

	/* Update the garbage count in the data log header: */
	XT_SET_DISK_8(header.xh_free_space_8, data_log->dlf_garbage_count);
	if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_free_space_8), 8, (xtWord1 *) &header.xh_free_space_8, &thread->st_statistics.st_data, thread))
		return FAILED;
	if (flush && !xt_flush_file(of, &thread->st_statistics.st_data, thread))
		return FAILED;
	return OK;
}

/*
 * {SKIP-GAPS}
 * Extra garbage is the amount of space skipped during recovery of the data
 * log file. We assume this space has not been counted as garbage,
 * and add it to the garbage count.
 *
 * This may mean that our estimate of the garbage is higher than it should
 * be, but that is better than the other way around.
 *
 * The fact is, there should not be any gaps in the data log files, so
 * this is actually an exception which should not occur.
 */
static xtBool dl_write_log_header(XTDataLogFilePtr data_log, XTOpenFilePtr of, xtLogOffset extra_garbage, XTThreadPtr thread)
{
	XTXactLogHeaderDRec	header;

	XT_SET_DISK_8(header.xh_file_len_8, data_log->dlf_log_eof);

	if (extra_garbage) {
		data_log->dlf_garbage_count += extra_garbage;
		if (data_log->dlf_garbage_count > data_log->dlf_log_eof)
			data_log->dlf_garbage_count = data_log->dlf_log_eof;
		XT_SET_DISK_8(header.xh_free_space_8, data_log->dlf_garbage_count);
		if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_free_space_8), 16, (xtWord1 *) &header.xh_free_space_8, &thread->st_statistics.st_data, thread))
			return FAILED;
	}
	else {
		if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_file_len_8), 8, (xtWord1 *) &header.xh_file_len_8, &thread->st_statistics.st_data, thread))
			return FAILED;
	}
	if (!xt_flush_file(of, &thread->st_statistics.st_data, thread))
		return FAILED;
	return OK;
}

static void dl_free_seq_read(XTThreadPtr self __attribute__((unused)), XTDataSeqReadPtr seq_read)
{
	seq_read->sl_seq_exit();
}

static void dl_recover_log(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogFilePtr data_log)
{
	XTDataSeqReadRec	seq_read;
	XTXactLogBufferDPtr	record;

	if (!seq_read.sl_seq_init(db, xt_db_log_buffer_size))
		xt_throw(self);
	pushr_(dl_free_seq_read, &seq_read);

	seq_read.sl_seq_start(data_log->dlf_log_id, 0, FALSE);

	for (;;) {
		if (!seq_read.sl_seq_next(&record, self))
			xt_throw(self);
		if (!record)
			break;
		switch (record->xh.xh_status_1) {
			case XT_LOG_ENT_HEADER:
				data_log->dlf_garbage_count = XT_GET_DISK_8(record->xh.xh_free_space_8);
				data_log->dlf_start_offset = XT_GET_DISK_8(record->xh.xh_comp_pos_8);
				seq_read.sl_seq_skip_to((off_t) XT_GET_DISK_8(record->xh.xh_file_len_8)); 
				break;
		}
	}

	ASSERT_NS(seq_read.sl_log_eof == seq_read.sl_rec_log_offset);
	data_log->dlf_log_eof = seq_read.sl_rec_log_offset;

	if (data_log->dlf_log_eof < (off_t) sizeof(XTXactLogHeaderDRec)) {
		data_log->dlf_log_eof = sizeof(XTXactLogHeaderDRec);
		if (!dl_create_log_header(data_log, seq_read.sl_log_file, self))
			xt_throw(self);
	}
	else {
		if (!dl_write_log_header(data_log, seq_read.sl_log_file, seq_read.sl_extra_garbage, self))
			xt_throw(self);
	}

	freer_(); // dl_free_seq_read(&seq_read)
}

/*
 * --------------------------------------------------------------------------------
 * D A T A   L O G   C A C H E
 */

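/*
 * Remove the data log from whichever state list (has-space, to-compact,
 * to-delete or deleted) it is currently on.
 */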
void XTDataLogCache::dls_remove_log(XTDataLogFilePtr data_log)
{
	xtLogID log_id = data_log->dlf_log_id;

	switch (data_log->dlf_state) {
		case XT_DL_HAS_SPACE:
			xt_sl_delete(NULL, dlc_has_space, &log_id);
			break;
		case XT_DL_TO_COMPACT:
			xt_sl_delete(NULL, dlc_to_compact, &log_id);
			break;
		case XT_DL_TO_DELETE:
			xt_sl_delete(NULL, dlc_to_delete, &log_id);
			break;
		case XT_DL_DELETED:
			xt_sl_delete(NULL, dlc_deleted, &log_id);
			break;
	}
}

int XTDataLogCache::dls_get_log_state(XTDataLogFilePtr data_log)
{
	if (data_log->dlf_to_much_garbage())
		return XT_DL_TO_COMPACT;
	if (data_log->dlf_space_avaliable() > 0)
		return XT_DL_HAS_SPACE;
	return XT_DL_READ_ONLY;
}

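/*
 * Move a data log to a new state and update the state lists accordingly.
 * XT_DL_UNKNOWN causes the state to be recalculated from the garbage level
 * and the available space. XT_DL_MAY_COMPACT is only honoured for logs that
 * are currently unknown, have space or are read-only.
 */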
xtBool XTDataLogCache::dls_set_log_state(XTDataLogFilePtr data_log, int state)
{
	xtLogID log_id = data_log->dlf_log_id;

	xt_lock_mutex_ns(&dlc_lock);
	if (state == XT_DL_MAY_COMPACT) {
		if (data_log->dlf_state != XT_DL_UNKNOWN &&
			data_log->dlf_state != XT_DL_HAS_SPACE &&
			data_log->dlf_state != XT_DL_READ_ONLY)
			goto ok;
		state = XT_DL_TO_COMPACT;
	}
	if (state == XT_DL_UNKNOWN)
		state = dls_get_log_state(data_log);
	switch (state) {
		case XT_DL_HAS_SPACE:
			if (data_log->dlf_state != XT_DL_HAS_SPACE) {
				dls_remove_log(data_log);
				if (!xt_sl_insert(NULL, dlc_has_space, &log_id, &log_id))
					goto failed;
			}
			break;
		case XT_DL_TO_COMPACT:
#ifdef DEBUG_LOG_DELETE
			printf("-- set to compact: %d\n", (int) log_id);
#endif
			if (data_log->dlf_state != XT_DL_TO_COMPACT) {
				dls_remove_log(data_log);
				if (!xt_sl_insert(NULL, dlc_to_compact, &log_id, &log_id))
					goto failed;
			}
			dl_wake_co_thread(dlc_db);
			break;
		case XT_DL_COMPACTED:
#ifdef DEBUG_LOG_DELETE
			printf("-- set compacted: %d\n", (int) log_id);
#endif
			if (data_log->dlf_state != state)
				dls_remove_log(data_log);
			break;
		case XT_DL_TO_DELETE:
#ifdef DEBUG_LOG_DELETE
			printf("-- set to delete log: %d\n", (int) log_id);
#endif
			if (data_log->dlf_state != XT_DL_TO_DELETE) {
				dls_remove_log(data_log);
				if (!xt_sl_insert(NULL, dlc_to_delete, &log_id, &log_id))
					goto failed;
			}
			break;
		case XT_DL_DELETED:
#ifdef DEBUG_LOG_DELETE
			printf("-- set DELETED log: %d\n", (int) log_id);
#endif
			if (data_log->dlf_state != XT_DL_DELETED) {
				dls_remove_log(data_log);
				if (!xt_sl_insert(NULL, dlc_deleted, &log_id, &log_id))
					goto failed;
			}
			break;
		default:
			if (data_log->dlf_state != state)
				dls_remove_log(data_log);
			break;
	}
	data_log->dlf_state = state;

	ok:
	xt_unlock_mutex_ns(&dlc_lock);
	return OK;

	failed:
	xt_unlock_mutex_ns(&dlc_lock);
	return FAILED;
}

static int dl_cmp_log_id(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
{
	xtLogID			log_id_a = *((xtLogID *) a);
	xtLogID			log_id_b = *((xtLogID *) b);

	if (log_id_a == log_id_b)
		return 0;
	if (log_id_a < log_id_b)
		return -1;
	return 1;
}

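/*
 * Initialize the data log cache: create the locks, condition variables and
 * state lists, then scan the data directory and recover every data log
 * file found there.
 */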
void XTDataLogCache::dlc_init(XTThreadPtr self, XTDatabaseHPtr db)
{
	XTOpenDirPtr		od;
	char				log_dir[PATH_MAX];
	char				*file;
	xtLogID				log_id;
	XTDataLogFilePtr	data_log= NULL;

	memset(this, 0, sizeof(XTDataLogCacheRec));
	dlc_db = db;
	try_(a) {
		xt_init_mutex_with_autoname(self, &dlc_lock);
		xt_init_cond(self, &dlc_cond);
		for (u_int i=0; i<XT_DL_NO_OF_SEGMENTS; i++) {
			xt_init_mutex_with_autoname(self, &dlc_segment[i].dls_lock);
			xt_init_cond(self, &dlc_segment[i].dls_cond);
		}
		dlc_has_space = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
		dlc_to_compact = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
		dlc_to_delete = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
		dlc_deleted = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
		xt_init_mutex_with_autoname(self, &dlc_mru_lock);
		xt_init_mutex_with_autoname(self, &dlc_head_lock);

		xt_strcpy(PATH_MAX, log_dir, dlc_db->db_main_path);
		xt_add_data_dir(PATH_MAX, log_dir);
		if (xt_fs_exists(log_dir)) {
			pushsr_(od, xt_dir_close, xt_dir_open(self, log_dir, NULL));
			while (xt_dir_next(self, od)) {
				file = xt_dir_name(self, od);
				if (xt_ends_with(file, ".xt")) {
					if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
						if (!dlc_get_data_log(&data_log, log_id, TRUE, NULL))
							xt_throw(self);
						dl_recover_log(self, db, data_log);
						if (!dls_set_log_state(data_log, XT_DL_UNKNOWN))
							xt_throw(self);
					}
				}
			}
			freer_();
		}
	}
	catch_(a) {
		dlc_exit(self);
		xt_throw(self);
	}
	cont_(a);
}

void XTDataLogCache::dlc_exit(XTThreadPtr self)
{
	XTDataLogFilePtr	data_log, tmp_data_log;
	XTOpenLogFilePtr	open_log, tmp_open_log;

	if (dlc_has_space) {
		xt_free_sortedlist(self, dlc_has_space);
		dlc_has_space = NULL;
	}
	if (dlc_to_compact) {
		xt_free_sortedlist(self, dlc_to_compact);
		dlc_to_compact = NULL;
	}
	if (dlc_to_delete) {
		xt_free_sortedlist(self, dlc_to_delete);
		dlc_to_delete = NULL;
	}
	if (dlc_deleted) {
		xt_free_sortedlist(self, dlc_deleted);
		dlc_deleted = NULL;
	}
	for (u_int i=0; i<XT_DL_NO_OF_SEGMENTS; i++) {
		for (u_int j=0; j<XT_DL_SEG_HASH_TABLE_SIZE; j++) {
			data_log = dlc_segment[i].dls_hash_table[j];
			while (data_log) {
				if (data_log->dlf_log_file) {
					xt_close_file_ns(data_log->dlf_log_file);
					data_log->dlf_log_file = NULL;
				}

				open_log = data_log->dlf_free_list;
				while (open_log) {
					if (open_log->odl_log_file)
						xt_close_file(self, open_log->odl_log_file);
					tmp_open_log = open_log;
					open_log = open_log->odl_next_free;
					xt_free(self, tmp_open_log);
				}
				tmp_data_log = data_log;
				data_log = data_log->dlf_next_hash;

				xt_free(self, tmp_data_log);
			}
		}
		xt_free_mutex(&dlc_segment[i].dls_lock);
		xt_free_cond(&dlc_segment[i].dls_cond);
	}
	xt_free_mutex(&dlc_head_lock);
	xt_free_mutex(&dlc_mru_lock);
	xt_free_mutex(&dlc_lock);
	xt_free_cond(&dlc_cond);
}

void XTDataLogCache::dlc_name(size_t size, char *path, xtLogID log_id)
{
	char name[50];

	sprintf(name, "dlog-%lu.xt", (u_long) log_id);
	xt_strcpy(size, path, dlc_db->db_main_path);
	xt_add_data_dir(size, path);
	xt_add_dir_char(size, path);
	xt_strcat(size, path, name);
}

xtBool XTDataLogCache::dlc_open_log(XTOpenFilePtr *fh, xtLogID log_id, int mode)
{
	char log_path[PATH_MAX];

	dlc_name(PATH_MAX, log_path, log_id);
	return xt_open_file_ns(fh, log_path, mode);
}

xtBool XTDataLogCache::dlc_unlock_log(XTDataLogFilePtr data_log)
{
	if (data_log->dlf_log_file) {
		xt_close_file_ns(data_log->dlf_log_file);
		data_log->dlf_log_file = NULL;
	}

	return dls_set_log_state(data_log, XT_DL_UNKNOWN);
}

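/*
 * Return a data log that may be written to exclusively: either an existing
 * log with enough space, or a newly created log. The log is returned in the
 * XT_DL_EXCLUSIVE state, with its file open.
 */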
XTDataLogFilePtr XTDataLogCache::dlc_get_log_for_writing(off_t space_required, struct XTThread *thread)
{
	xtLogID				log_id, *log_id_ptr = NULL;
	size_t				size;
	size_t				idx;
	XTDataLogFilePtr	data_log = NULL;

	xt_lock_mutex_ns(&dlc_lock);

	/* Look for an existing log with enough space: */
	size = xt_sl_get_size(dlc_has_space);
	for (idx=0; idx<size; idx++) {
		log_id_ptr = (xtLogID *) xt_sl_item_at(dlc_has_space, idx);
		if (!dlc_get_data_log(&data_log, *log_id_ptr, FALSE, NULL))
			goto failed;
		if (data_log) {
			if (data_log->dlf_space_avaliable() >= space_required)
				break;
			data_log = NULL;
		}
		else {
			ASSERT_NS(FALSE);
			xt_sl_delete_item_at(NULL, dlc_has_space, idx);
			idx--;
			size--;
		}
	}

	if (data_log) {
		/* Found a log: */
		if (!dlc_open_log(&data_log->dlf_log_file, *log_id_ptr, XT_FS_DEFAULT))
			goto failed;
		xt_sl_delete_item_at(NULL, dlc_has_space, idx);
	}
	else {
		/* Create a new log: */
		log_id = dlc_next_log_id;
		for (u_int i=0; i<XT_DL_MAX_LOG_ID; i++) {
			log_id++;
			if (log_id > XT_DL_MAX_LOG_ID)
				log_id = 1;
			if (!dlc_get_data_log(&data_log, log_id, FALSE, NULL))
				goto failed;
			if (!data_log)
				break;
		}
		if (data_log) {
			xt_register_ulxterr(XT_REG_CONTEXT, XT_ERR_LOG_MAX_EXCEEDED, (u_long) XT_DL_MAX_LOG_ID);
			goto failed;
		}
		if (!dlc_get_data_log(&data_log, log_id, TRUE, NULL))
			goto failed;
		if (!dlc_open_log(&data_log->dlf_log_file, log_id, XT_FS_CREATE | XT_FS_MAKE_PATH))
			goto failed;
		data_log->dlf_log_eof = sizeof(XTXactLogHeaderDRec);
		if (!dl_create_log_header(data_log, data_log->dlf_log_file, thread)) {
			xt_close_file_ns(data_log->dlf_log_file);
			goto failed;
		}
		/* By setting this late we ensure that the error
		 * will be repeated.
		 */ 
		dlc_next_log_id = log_id;
	}
	data_log->dlf_state = XT_DL_EXCLUSIVE;

	xt_unlock_mutex_ns(&dlc_lock);
	return data_log;

	failed:
	xt_unlock_mutex_ns(&dlc_lock);
	return NULL;
}

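/*
 * Find, and optionally create, the cache entry for a data log. If ret_seg
 * is given, the segment lock is returned to the caller still held.
 */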
xtBool XTDataLogCache::dlc_get_data_log(XTDataLogFilePtr *lf, xtLogID log_id, xtBool create, XTDataLogSegPtr *ret_seg)
{
	register XTDataLogSegPtr	seg;
	register u_int				hash_idx;
	register XTDataLogFilePtr	data_log;

	/* Which segment, and hash index: */
	seg = &dlc_segment[log_id & XT_DL_SEGMENT_MASK];
	hash_idx = (log_id >> XT_DL_SEGMENT_SHIFTS) % XT_DL_SEG_HASH_TABLE_SIZE;

	/* Lock the segment: */
	xt_lock_mutex_ns(&seg->dls_lock);

	/* Find the log file on the hash list: */
	data_log = seg->dls_hash_table[hash_idx];
	while (data_log) {
		if (data_log->dlf_log_id == log_id)
			break;
		data_log = data_log->dlf_next_hash;
	}

	if (!data_log && create) {
		/* Create a new log file structure: */
		if (!(data_log = (XTDataLogFilePtr) xt_calloc_ns(sizeof(XTDataLogFileRec))))
			goto failed;
		data_log->dlf_log_id = log_id;
		data_log->dlf_next_hash = seg->dls_hash_table[hash_idx];
		seg->dls_hash_table[hash_idx] = data_log;
	}

	if (ret_seg) {
		/* This gives the caller the lock: */
		*ret_seg = seg;
		*lf = data_log;
		return OK;
	}

	xt_unlock_mutex_ns(&seg->dls_lock);
	*lf = data_log;
	return OK;

	failed:
	xt_unlock_mutex_ns(&seg->dls_lock);
	return FAILED;
}

/*
 * If just_close is TRUE, the log is only closed: it may still be in
 * exclusive use by some thread, so the cache entry is left in place.
 * If just_close is FALSE, the log is being deleted, and its cache entry
 * is also removed from the hash list and freed.
 */
xtBool XTDataLogCache::dlc_remove_data_log(xtLogID log_id, xtBool just_close)
{
	register XTDataLogSegPtr	seg;
	register u_int				hash_idx;
	register XTDataLogFilePtr	data_log;
	XTOpenLogFilePtr			open_log, tmp_open_log;

	/* Which segment, and hash index: */
	seg = &dlc_segment[log_id & XT_DL_SEGMENT_MASK];
	hash_idx = (log_id >> XT_DL_SEGMENT_SHIFTS) % XT_DL_SEG_HASH_TABLE_SIZE;

	/* Lock the segment: */
	retry:
	xt_lock_mutex_ns(&seg->dls_lock);

	/* Find the log file on the hash list: */
	data_log = seg->dls_hash_table[hash_idx];
	while (data_log) {
		if (data_log->dlf_log_id == log_id)
			break;
		data_log = data_log->dlf_next_hash;
	}

	if (data_log) {
		xt_lock_mutex_ns(&dlc_mru_lock);

		open_log = data_log->dlf_free_list;
		while (open_log) {
			if (open_log->odl_log_file)
				xt_close_file_ns(open_log->odl_log_file);

			/* Remove from MRU list: */
			if (dlc_lru_open_log == open_log) {
				dlc_lru_open_log = open_log->odl_mr_used;
				ASSERT_NS(!open_log->odl_lr_used);
			}
			else if (open_log->odl_lr_used)
				open_log->odl_lr_used->odl_mr_used = open_log->odl_mr_used;
			if (dlc_mru_open_log == open_log) {
				dlc_mru_open_log = open_log->odl_lr_used;
				ASSERT_NS(!open_log->odl_mr_used);
			}
			else if (open_log->odl_mr_used)
				open_log->odl_mr_used->odl_lr_used = open_log->odl_lr_used;

			data_log->dlf_open_count--;
			tmp_open_log = open_log;
			open_log = open_log->odl_next_free;
			xt_free_ns(tmp_open_log);
		}
		data_log->dlf_free_list = NULL;

		xt_unlock_mutex_ns(&dlc_mru_lock);

		if (data_log->dlf_open_count) {
			if (!xt_timed_wait_cond_ns(&seg->dls_cond, &seg->dls_lock, 2000))
				goto failed;
			xt_unlock_mutex_ns(&seg->dls_lock);
			goto retry;
		}

		/* Close the exclusive file if required: */
		if (data_log->dlf_log_file) {
			xt_close_file_ns(data_log->dlf_log_file);
			data_log->dlf_log_file = NULL;
		}

		if (!just_close) {
			/* Remove the log from the hash list: */
			XTDataLogFilePtr ptr, pptr = NULL;

			ptr = seg->dls_hash_table[hash_idx];
			while (ptr) {
				if (ptr == data_log)
					break;
				pptr = ptr;
				ptr = ptr->dlf_next_hash;
			}
			
			if (ptr == data_log) {
				if (pptr)
					pptr->dlf_next_hash = ptr->dlf_next_hash;
				else
					seg->dls_hash_table[hash_idx] = ptr->dlf_next_hash;
			}

			xt_free_ns(data_log);
		}
	}

	xt_unlock_mutex_ns(&seg->dls_lock);
	return OK;

	failed:
	xt_unlock_mutex_ns(&seg->dls_lock);
	return FAILED;
}

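/*
 * Get an open file handle for the given data log from the pool of open
 * files. A handle is taken from the log's free list if possible, otherwise
 * the file is opened. The handle is made the most recently used entry if
 * required, and the pool is trimmed if it has grown beyond
 * XT_DL_LOG_POOL_SIZE.
 */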
xtBool XTDataLogCache::dlc_get_open_log(XTOpenLogFilePtr *ol, xtLogID log_id)
{
	register XTDataLogSegPtr	seg;
	register u_int				hash_idx;
	register XTDataLogFilePtr	data_log;
	register XTOpenLogFilePtr	open_log;
	char						path[PATH_MAX];

	/* Which segment, and hash index: */
	seg = &dlc_segment[log_id & XT_DL_SEGMENT_MASK];
	hash_idx = (log_id >> XT_DL_SEGMENT_SHIFTS) % XT_DL_SEG_HASH_TABLE_SIZE;

	/* Lock the segment: */
	xt_lock_mutex_ns(&seg->dls_lock);

	/* Find the log file on the hash list: */
	data_log = seg->dls_hash_table[hash_idx];
	while (data_log) {
		if (data_log->dlf_log_id == log_id)
			break;
		data_log = data_log->dlf_next_hash;
	}

	if (!data_log) {
		/* Create a new log file structure: */
		dlc_name(PATH_MAX, path, log_id);
		if (!xt_fs_exists(path)) {
			xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_DATA_LOG_NOT_FOUND, path);
			goto failed;
		}
		if (!(data_log = (XTDataLogFilePtr) xt_calloc_ns(sizeof(XTDataLogFileRec))))
			goto failed;
		data_log->dlf_log_id = log_id;
		data_log->dlf_next_hash = seg->dls_hash_table[hash_idx];
		seg->dls_hash_table[hash_idx] = data_log;
	}

	if ((open_log = data_log->dlf_free_list)) {
		/* Remove from the free list: */
		if ((data_log->dlf_free_list = open_log->odl_next_free))
			data_log->dlf_free_list->odl_prev_free = NULL;

		/* This file has been most recently used: */
		if (XT_TIME_DIFF(open_log->odl_ru_time, dlc_ru_now) > (XT_DL_LOG_POOL_SIZE >> 1)) {
			/* Move to the front of the MRU list: */
			xt_lock_mutex_ns(&dlc_mru_lock);

			open_log->odl_ru_time = ++dlc_ru_now;
			if (dlc_mru_open_log != open_log) {
				/* Remove from the MRU list: */
				if (dlc_lru_open_log == open_log) {
					dlc_lru_open_log = open_log->odl_mr_used;
					ASSERT_NS(!open_log->odl_lr_used);
				}
				else if (open_log->odl_lr_used)
					open_log->odl_lr_used->odl_mr_used = open_log->odl_mr_used;
				if (open_log->odl_mr_used)
					open_log->odl_mr_used->odl_lr_used = open_log->odl_lr_used;

				/* Make the file the most recently used: */
				if ((open_log->odl_lr_used = dlc_mru_open_log))
					dlc_mru_open_log->odl_mr_used = open_log;
				open_log->odl_mr_used = NULL;
				dlc_mru_open_log = open_log;
				if (!dlc_lru_open_log)
					dlc_lru_open_log = open_log;
			}
			xt_unlock_mutex_ns(&dlc_mru_lock);
		}
	}
	else {
		/* Create a new open file: */
		if (!(open_log = (XTOpenLogFilePtr) xt_calloc_ns(sizeof(XTOpenLogFileRec))))
			goto failed;
		dlc_name(PATH_MAX, path, log_id);
		if (!xt_open_file_ns(&open_log->odl_log_file, path, XT_FS_DEFAULT)) {
			xt_free_ns(open_log);
			goto failed;
		}
		open_log->olf_log_id = log_id;
		open_log->odl_data_log = data_log;
		data_log->dlf_open_count++;

		/* Make the new open file the most recently used: */
		xt_lock_mutex_ns(&dlc_mru_lock);
		open_log->odl_ru_time = ++dlc_ru_now;
		if ((open_log->odl_lr_used = dlc_mru_open_log))
			dlc_mru_open_log->odl_mr_used = open_log;
		open_log->odl_mr_used = NULL;
		dlc_mru_open_log = open_log;
		if (!dlc_lru_open_log)
			dlc_lru_open_log = open_log;
		dlc_open_count++;
		xt_unlock_mutex_ns(&dlc_mru_lock);
	}

	open_log->odl_in_use = TRUE;
	xt_unlock_mutex_ns(&seg->dls_lock);
	*ol = open_log;

	if (dlc_open_count > XT_DL_LOG_POOL_SIZE) {
		u_int	target = XT_DL_LOG_POOL_SIZE / 4 * 3;
		xtLogID	free_log_id;

		/* Remove some open files: */
		while (dlc_open_count > target) {
			XTOpenLogFilePtr to_free = dlc_lru_open_log;

			if (!to_free || to_free->odl_in_use)
				break;

			/* Dirty read the file ID: */
			free_log_id = to_free->olf_log_id;

			seg = &dlc_segment[free_log_id & XT_DL_SEGMENT_MASK];

			/* Lock the segment: */
			xt_lock_mutex_ns(&seg->dls_lock);

			/* Lock the MRU list: */
			xt_lock_mutex_ns(&dlc_mru_lock);

			/* Check if we have the same open file: */
			if (dlc_lru_open_log == to_free && !to_free->odl_in_use) {
				data_log = to_free->odl_data_log;
		
				/* Remove from the MRU list: */
				dlc_lru_open_log = to_free->odl_mr_used;
				ASSERT_NS(!to_free->odl_lr_used);

				if (dlc_mru_open_log == to_free) {
					dlc_mru_open_log = to_free->odl_lr_used;
					ASSERT_NS(!to_free->odl_mr_used);
				}
				else if (to_free->odl_mr_used)
					to_free->odl_mr_used->odl_lr_used = to_free->odl_lr_used;

				/* Remove from the free list of the file: */
				if (data_log->dlf_free_list == to_free) {
					data_log->dlf_free_list = to_free->odl_next_free;
					ASSERT_NS(!to_free->odl_prev_free);
				}
				else if (to_free->odl_prev_free)
					to_free->odl_prev_free->odl_next_free = to_free->odl_next_free;
				if (to_free->odl_next_free)
					to_free->odl_next_free->odl_prev_free = to_free->odl_prev_free;
				ASSERT_NS(data_log->dlf_open_count > 0);
				data_log->dlf_open_count--;
				dlc_open_count--;
			}
			else
				to_free = NULL;

			xt_unlock_mutex_ns(&dlc_mru_lock);
			xt_unlock_mutex_ns(&seg->dls_lock);

			if (to_free) {
				xt_close_file_ns(to_free->odl_log_file);
				xt_free_ns(to_free);
			}
		}
	}

	return OK;

	failed:
	xt_unlock_mutex_ns(&seg->dls_lock);
	return FAILED;
}

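/*
 * Return an open log handle to the free list of its data log and wake any
 * thread waiting for exclusive access to the log.
 */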
void XTDataLogCache::dlc_release_open_log(XTOpenLogFilePtr open_log)
{
	register XTDataLogSegPtr	seg;
	register XTDataLogFilePtr	data_log = open_log->odl_data_log;

	/* Which segment, and hash index: */
	seg = &dlc_segment[open_log->olf_log_id & XT_DL_SEGMENT_MASK];

	xt_lock_mutex_ns(&seg->dls_lock);
	open_log->odl_next_free = data_log->dlf_free_list;
	open_log->odl_prev_free = NULL;
	if (data_log->dlf_free_list)
		data_log->dlf_free_list->odl_prev_free = open_log;
	data_log->dlf_free_list = open_log;
	open_log->odl_in_use = FALSE;

	/* Wakeup any exclusive lockers: */
	if (!xt_broadcast_cond_ns(&seg->dls_cond))
		xt_log_and_clear_exception_ns();

	xt_unlock_mutex_ns(&seg->dls_lock);
}

/*
 * --------------------------------------------------------------------------------
 * D A T A   L O G   F I L E
 */

off_t XTDataLogFile::dlf_space_avaliable()
{
	if (dlf_log_eof < xt_db_data_log_threshold)
		return xt_db_data_log_threshold - dlf_log_eof;
	return 0;
}

xtBool XTDataLogFile::dlf_to_much_garbage()
{
	if (!dlf_log_eof)
		return FALSE;
	return dlf_garbage_count * 100 / dlf_log_eof >= xt_db_garbage_threshold;
}

/*
 * --------------------------------------------------------------------------------
 * D A T A   L O G   B U F F E R
 */

void XTDataLogBuffer::dlb_init(XTDatabaseHPtr db, size_t buffer_size)
{
	ASSERT_NS(!dlb_db);
	ASSERT_NS(!dlb_buffer_size);
	ASSERT_NS(!dlb_data_log);
	ASSERT_NS(!dlb_log_buffer);
	dlb_db = db;
	dlb_buffer_size = buffer_size;
}

void XTDataLogBuffer::dlb_exit(XTThreadPtr self)
{
	dlb_close_log(self);
	if (dlb_log_buffer) {
		xt_free(self, dlb_log_buffer);
		dlb_log_buffer = NULL;
	}
	dlb_db = NULL;
	dlb_buffer_offset = 0;
	dlb_buffer_size = 0;
	dlb_buffer_len = 0;
	dlb_flush_required = FALSE;
#ifdef DEBUG
	dlb_max_write_offset = 0;
#endif
}

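/*
 * Write back the header of the current write log, flush any buffered data,
 * and release the log back to the cache.
 */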
xtBool XTDataLogBuffer::dlb_close_log(XTThreadPtr thread)
{
	if (dlb_data_log) {
		if (dlb_data_log->dlf_log_file) {
			if (!dl_write_log_header(dlb_data_log, dlb_data_log->dlf_log_file, 0, thread))
				return FAILED;
		}

		/* Flush and commit the data in the old log: */
		if (!dlb_flush_log(TRUE, thread))
			return FAILED;

		if (!dlb_db->db_datalogs.dlc_unlock_log(dlb_data_log))
			return FAILED;
		dlb_data_log = NULL;
	}
	return OK;
}

/* When I use 'thread' instead of 'self', this means
 * that I will not throw an error.
 */
xtBool XTDataLogBuffer::dlb_get_log_offset(xtLogID *log_id, xtLogOffset *out_offset, size_t XT_UNUSED(req_size), struct XTThread *thread)
{
	/* Note, I am allowing a log to grow beyond the threshold.
	 * The amount depends on the maximum extended record size.
	 * If I don't, some logs will never fill up, because they only have
	 * a few more bytes available.
	 */
	if (!dlb_data_log || dlb_data_log->dlf_space_avaliable() == 0) {
		/* Release the old log: */
		if (!dlb_close_log(thread))
			return FAILED;

		if (!dlb_log_buffer) {
			if (!(dlb_log_buffer = (xtWord1 *) xt_malloc_ns(dlb_buffer_size)))
				return FAILED;
		}

		/* I could use req_size instead of 1, but this would mean some logs
		 * are never filled up.
		 */
		if (!(dlb_data_log = dlb_db->db_datalogs.dlc_get_log_for_writing(1, thread)))
			return FAILED;
#ifdef DEBUG
		dlb_max_write_offset = dlb_data_log->dlf_log_eof;
#endif
	}

	*log_id = dlb_data_log->dlf_log_id;
	*out_offset = dlb_data_log->dlf_log_eof;
	return OK;
}

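/*
 * Write out any buffered data, and if commit is TRUE, flush the log file
 * to disk.
 */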
xtBool XTDataLogBuffer::dlb_flush_log(xtBool commit, XTThreadPtr thread)
{
	if (!dlb_data_log || !dlb_data_log->dlf_log_file)
		return OK;

	if (dlb_buffer_len) {
		if (!xt_pwrite_file(dlb_data_log->dlf_log_file, dlb_buffer_offset, dlb_buffer_len, dlb_log_buffer, &thread->st_statistics.st_data, thread))
			return FAILED;
#ifdef DEBUG
		if (dlb_buffer_offset + (xtLogOffset) dlb_buffer_len > dlb_max_write_offset)
			dlb_max_write_offset = dlb_buffer_offset + (xtLogOffset) dlb_buffer_len;
#endif
		dlb_buffer_len = 0;
		dlb_flush_required = TRUE;
	}

	if (commit && dlb_flush_required) {
#ifdef DEBUG
		/* This would normally be equal. However, in the case
		 * where some other thread flushes the compactor's
		 * data log, the EOF can be greater than the
		 * write offset.
		 *
		 * This occurs because the flush can come between the 
		 * dlb_get_log_offset() and dlb_write_thru_log() calls.
		 */
		ASSERT_NS(dlb_data_log->dlf_log_eof >= dlb_max_write_offset);
#endif
		if (!xt_flush_file(dlb_data_log->dlf_log_file, &thread->st_statistics.st_data, thread))
			return FAILED;
		dlb_flush_required = FALSE;
	}
	return OK;
}

xtBool XTDataLogBuffer::dlb_write_thru_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOffset log_offset, size_t size, xtWord1 *data, XTThreadPtr thread)
{
	ASSERT_NS(log_id == dlb_data_log->dlf_log_id);

	if (dlb_buffer_len)
		dlb_flush_log(FALSE, thread);

	if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, size, data, &thread->st_statistics.st_data, thread))
		return FAILED;
	/* The increment of dlb_data_log->dlf_log_eof was moved here from dlb_get_log_offset()
	 * to ensure it is done only after a successful update of the log. Otherwise a
	 * gap occurs in the log, which causes EOF to be detected in the middle of the log.
	 */
	dlb_data_log->dlf_log_eof += size;
#ifdef DEBUG
	if (log_offset + (xtLogOffset) size > (xtLogOffset) dlb_max_write_offset)
		dlb_max_write_offset = log_offset + size;
#endif
	dlb_flush_required = TRUE;
	return OK;
}

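/*
 * Append a record to the write log. The data is buffered if it fits in the
 * log buffer, otherwise it is written directly to the file.
 */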
xtBool XTDataLogBuffer::dlb_append_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOffset log_offset, size_t size, xtWord1 *data, XTThreadPtr thread)
{
	ASSERT_NS(log_id == dlb_data_log->dlf_log_id);

	if (dlb_buffer_len) {
		/* Should be the case, we only write by appending: */
		ASSERT_NS(dlb_buffer_offset + (xtLogOffset) dlb_buffer_len == log_offset);
		/* Check if we are appending to the existing value in the buffer: */
		if (dlb_buffer_offset + (xtLogOffset) dlb_buffer_len == log_offset) {
			/* Can we just append: */
			if (dlb_buffer_size >= dlb_buffer_len + size) {
				memcpy(dlb_log_buffer + dlb_buffer_len, data, size);
				dlb_buffer_len += size;
				dlb_data_log->dlf_log_eof += size;
				return OK;
			}
		}
		if (dlb_flush_log(FALSE, thread) != OK)
			return FAILED;
	}
	
	ASSERT_NS(dlb_buffer_len == 0);
	
	if (dlb_buffer_size >= size) {
		dlb_buffer_offset = log_offset;
		dlb_buffer_len = size;
		memcpy(dlb_log_buffer, data, size);
		dlb_data_log->dlf_log_eof += size;
		return OK;
	}

	/* Write directly: */
	if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, size, data, &thread->st_statistics.st_data, thread))
		return FAILED;
#ifdef DEBUG
	if (log_offset + (xtLogOffset) size > (xtLogOffset) dlb_max_write_offset)
		dlb_max_write_offset = log_offset + size;
#endif
	dlb_flush_required = TRUE;
	dlb_data_log->dlf_log_eof += size;
	return OK;
}

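/*
 * Read extended record data. Data in the current write buffer is copied
 * directly; anything else is read from an open handle of the appropriate
 * data log, and any bytes beyond the end of that log are zero-filled.
 */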
xtBool XTDataLogBuffer::dlb_read_log(xtLogID log_id, xtLogOffset log_offset, size_t size, xtWord1 *data, XTThreadPtr thread)
{
	size_t				red_size;
	XTOpenLogFilePtr	open_log;

	if (dlb_data_log && log_id == dlb_data_log->dlf_log_id) {
		/* Reading from the write log, I can do this quicker: */
		if (dlb_buffer_len) {
			/* If it is in the buffer, then it is completely in the buffer. */
			if (log_offset >= dlb_buffer_offset) {
				if (log_offset + (xtLogOffset) size <= dlb_buffer_offset + (xtLogOffset) dlb_buffer_len) {
					memcpy(data, dlb_log_buffer + (log_offset - dlb_buffer_offset), size);
					return OK;
				}
				/* Should not happen, reading past EOF: */
				ASSERT_NS(FALSE);
				memset(data, 0, size);
				return OK;
			}
			/* In the write log, but not in the buffer:
			 * the record must then lie completely outside the buffer,
			 * because only whole records are written to the
			 * buffer:
			 */
			ASSERT_NS(log_offset + (xtLogOffset) size <= dlb_buffer_offset);
		}		
		return xt_pread_file(dlb_data_log->dlf_log_file, log_offset, size, size, data, NULL, &thread->st_statistics.st_data, thread);
	}

	/* Read from some other log: */
	if (!dlb_db->db_datalogs.dlc_get_open_log(&open_log, log_id))
		return FAILED;

	if (!xt_pread_file(open_log->odl_log_file, log_offset, size, 0, data, &red_size, &thread->st_statistics.st_data, thread)) {
		dlb_db->db_datalogs.dlc_release_open_log(open_log);
		return FAILED;
	}

	dlb_db->db_datalogs.dlc_release_open_log(open_log);

	if (red_size < size)
		memset(data + red_size, 0, size - red_size);

	return OK;
}

/*
 * We assume that the given reference may not be valid.
 * Only valid references actually cause a delete.
 * Invalid references are logged, and ignored.
 *
 * Note this routine does not lock the compactor.
 * This can lead to some incorrect calculation of the
 * amount of garbage, but nothing serious, I think.
 */
xtBool XTDataLogBuffer::dlb_delete_log(xtLogID log_id, xtLogOffset log_offset, size_t size, xtTableID tab_id, xtRecordID rec_id, XTThreadPtr thread)
{
	XTactExtRecEntryDRec	record;
	xtWord1					status = XT_LOG_ENT_EXT_REC_DEL;
	XTOpenLogFilePtr		open_log;
	xtBool					to_much_garbage;
	XTDataLogFilePtr		data_log;

	if (!dlb_read_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data), (xtWord1 *) &record, thread))
		return FAILED;

	/* Already deleted: */
	if (record.er_status_1 == XT_LOG_ENT_EXT_REC_DEL)
		return OK;

	if (record.er_status_1 != XT_LOG_ENT_EXT_REC_OK ||
		size != XT_GET_DISK_4(record.er_data_size_4) ||
		tab_id != XT_GET_DISK_4(record.er_tab_id_4) ||
		rec_id != XT_GET_DISK_4(record.er_rec_id_4)) {
		xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_EXT_RECORD);
		return FAILED;
	}

	if (dlb_data_log && log_id == dlb_data_log->dlf_log_id) {
		/* Writing to the write log, I can do this quicker: */
		if (dlb_buffer_len) {
			/* If it is in the buffer, then it is completely in the buffer. */
			if (log_offset >= dlb_buffer_offset) {
				if (log_offset + 1 <= dlb_buffer_offset + (xtLogOffset) dlb_buffer_len) {
					*(dlb_log_buffer + (log_offset - dlb_buffer_offset)) = XT_LOG_ENT_EXT_REC_DEL;
					goto inc_garbage_count;
				}
				/* Should not happen, writing past EOF: */
				ASSERT_NS(FALSE);
				return OK;
			}
			ASSERT_NS(log_offset + (xtLogOffset) size <= dlb_buffer_offset);
		}

		if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, 1, &status, &thread->st_statistics.st_data, thread))
			return FAILED;
		
		inc_garbage_count:
		xt_lock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
		dlb_data_log->dlf_garbage_count += offsetof(XTactExtRecEntryDRec, er_data) + size;
		ASSERT_NS(dlb_data_log->dlf_garbage_count < dlb_data_log->dlf_log_eof);
		if (!dl_write_garbage_level(dlb_data_log, dlb_data_log->dlf_log_file, FALSE, thread)) {
			xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
			return FAILED;
		}
		dlb_flush_required = TRUE;
		xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
		return OK;
	}

	/* Write to some other log, open the log: */
	if (!dlb_db->db_datalogs.dlc_get_open_log(&open_log, log_id))
		return FAILED;

	/* Write the status byte: */
	if (!xt_pwrite_file(open_log->odl_log_file, log_offset, 1, &status, &thread->st_statistics.st_data, thread))
		goto failed;

	data_log = open_log->odl_data_log;

	/* Adjust the garbage level in the header. */
	xt_lock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
	data_log->dlf_garbage_count += offsetof(XTactExtRecEntryDRec, er_data) + size;
	ASSERT_NS(data_log->dlf_garbage_count < data_log->dlf_log_eof);
	if (!dl_write_garbage_level(data_log, open_log->odl_log_file, FALSE, thread)) {
		xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
		goto failed;
	}
	to_much_garbage = data_log->dlf_to_much_garbage();
	xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);

	if (to_much_garbage &&
		(data_log->dlf_state == XT_DL_HAS_SPACE || data_log->dlf_state == XT_DL_READ_ONLY)) {
		/* There is too much garbage, it may be compacted. */
		if (!dlb_db->db_datalogs.dls_set_log_state(data_log, XT_DL_MAY_COMPACT))
			goto failed;
	}

	/* Release the open log: */
	dlb_db->db_datalogs.dlc_release_open_log(open_log);
	
	return OK;

	failed:
	dlb_db->db_datalogs.dlc_release_open_log(open_log);
	return FAILED;
}

/*
 * Delete all the extended data belonging to a particular
 * table.
 */
xtPublic void xt_dl_delete_ext_data(XTThreadPtr self, XTTableHPtr tab, xtBool XT_UNUSED(missing_ok), xtBool have_table_lock)
{
	XTOpenTablePtr	ot;
	xtRecordID		page_rec_id, offs_rec_id;
	XTTabRecExtDPtr	rec_buf;
	xtWord4			log_over_size;
	xtLogID			log_id;
	xtLogOffset		log_offset;
	xtWord1			*page_data;

	page_data = (xtWord1 *) xt_malloc(self, tab->tab_recs.tci_page_size);
	pushr_(xt_free, page_data);

	/* Scan the table, and remove all extended data... */
	if (!(ot = xt_open_table(tab))) {
		if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
			XT_FILE_NOT_FOUND(self->t_exception.e_sys_err))
			return;
		xt_throw(self);
	}
	ot->ot_thread = self;

	/* {LOCK-EXT-REC} This lock is to stop the compactor changing records 
	 * while we are doing the delete.
	 */
	xt_lock_mutex_ns(&tab->tab_db->db_co_ext_lock);

	page_rec_id = 1;
	while (page_rec_id < tab->tab_rec_eof_id) {
		/* NOTE: There is a good reason for using xt_tc_read_page().
		 * A deadlock can occur if using read, which can run out of
		 * memory, which waits for the freeer, which may need to
		 * open a table, which requires the db->db_tables lock,
		 * which is owned by this thread, when the function
		 * is called from drop table.
		 *
		 * xt_tc_read_page() should work because no more changes
		 * should happen to the table while we are dropping it.
		 */
		if (!tab->tab_recs.xt_tc_read_page(ot->ot_rec_file, page_rec_id, page_data, self))
			goto failed;

		for (offs_rec_id=0; offs_rec_id<tab->tab_recs.tci_rows_per_page && page_rec_id+offs_rec_id < tab->tab_rec_eof_id; offs_rec_id++) {
			rec_buf = (XTTabRecExtDPtr) (page_data + (offs_rec_id * tab->tab_recs.tci_rec_size));
			if (XT_REC_IS_EXT_DLOG(rec_buf->tr_rec_type_1)) {
				log_over_size = XT_GET_DISK_4(rec_buf->re_log_dat_siz_4);
				XT_GET_LOG_REF(log_id, log_offset, rec_buf);

				if (!self->st_dlog_buf.dlb_delete_log(log_id, log_offset, log_over_size, tab->tab_id, page_rec_id+offs_rec_id, self)) {
					if (self->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
						self->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
						xt_log_and_clear_exception(self);
				}
			}
		}

		page_rec_id += tab->tab_recs.tci_rows_per_page;
	}

	xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);

	xt_close_table(ot, TRUE, have_table_lock);
	
	freer_(); // xt_free(page_data)
	return;
	
	failed:
	xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);

	xt_close_table(ot, TRUE, have_table_lock);
	xt_throw(self);
}

/*
 * --------------------------------------------------------------------------------
 * GARBAGE COLLECTOR THREAD
 */

xtPublic void xt_dl_init_db(XTThreadPtr self, XTDatabaseHPtr db)
{
	xt_init_mutex_with_autoname(self, &db->db_co_ext_lock);
	xt_init_mutex_with_autoname(self, &db->db_co_dlog_lock);
}

xtPublic void xt_dl_exit_db(XTThreadPtr self, XTDatabaseHPtr db)
{
	xt_stop_compactor(self, db);	// Already done!
	db->db_co_thread = NULL;
	xt_free_mutex(&db->db_co_ext_lock);
	xt_free_mutex(&db->db_co_dlog_lock);
}

xtPublic void xt_dl_set_to_delete(XTThreadPtr self, XTDatabaseHPtr db, xtLogID log_id)
{
	XTDataLogFilePtr data_log;

	if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, FALSE, NULL))
		xt_throw(self);
	if (data_log) {
		if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_TO_DELETE))
			xt_throw(self);
	}
}

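/*
 * Append a human-readable status line for every data log in the cache to
 * the given string buffer.
 */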
xtPublic void xt_dl_log_status(XTThreadPtr self, XTDatabaseHPtr db, XTStringBufferPtr strbuf)
{
	XTSortedListPtr		list;
	XTDataLogFilePtr	data_log;
	XTDataLogSegPtr		seg;
	u_int				no_of_logs;
	xtLogID				*log_id_ptr;

	list = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
	pushr_(xt_free_sortedlist, list);

	for (u_int i=0; i<XT_DL_NO_OF_SEGMENTS; i++) {
		for (u_int j=0; j<XT_DL_SEG_HASH_TABLE_SIZE; j++) {
			seg = &db->db_datalogs.dlc_segment[i];
			data_log = seg->dls_hash_table[j];
			while (data_log) {
				xt_sl_insert(self, list, &data_log->dlf_log_id, &data_log->dlf_log_id);
				data_log = data_log->dlf_next_hash;
			}
		}
	}

	no_of_logs = xt_sl_get_size(list);
	for (u_int i=0; i<no_of_logs; i++) {
		log_id_ptr = (xtLogID *) xt_sl_item_at(list, i);
		if (!db->db_datalogs.dlc_get_data_log(&data_log, *log_id_ptr, FALSE, &seg))
			xt_throw(self);
		if (data_log) {
			xt_sb_concat(self, strbuf, "d-log: ");
			xt_sb_concat_int8(self, strbuf, data_log->dlf_log_id);
			xt_sb_concat(self, strbuf, " status=");
			switch (data_log->dlf_state) {
				case XT_DL_UNKNOWN:
					xt_sb_concat(self, strbuf, "?");
					break;
				case XT_DL_HAS_SPACE:
					xt_sb_concat(self, strbuf, "has-space ");
					break;
				case XT_DL_READ_ONLY:
					xt_sb_concat(self, strbuf, "read-only ");
					break;
				case XT_DL_TO_COMPACT:
					xt_sb_concat(self, strbuf, "to-compact");
					break;
				case XT_DL_COMPACTED:
					xt_sb_concat(self, strbuf, "compacted ");
					break;
				case XT_DL_TO_DELETE:
					xt_sb_concat(self, strbuf, "to-delete ");
					break;
				case XT_DL_DELETED:
					xt_sb_concat(self, strbuf, "deleted   ");
					break;
				case XT_DL_EXCLUSIVE:
					xt_sb_concat(self, strbuf, "x-locked  ");
					break;
			}
			xt_sb_concat(self, strbuf, " eof=");
			xt_sb_concat_int8(self, strbuf, data_log->dlf_log_eof);
			xt_sb_concat(self, strbuf, " garbage=");
			xt_sb_concat_int8(self, strbuf, data_log->dlf_garbage_count);
			xt_sb_concat(self, strbuf, " g%=");
			if (data_log->dlf_log_eof)
				xt_sb_concat_int8(self, strbuf, data_log->dlf_garbage_count * 100 / data_log->dlf_log_eof);
			else
				xt_sb_concat(self, strbuf, "100");
			xt_sb_concat(self, strbuf, " open=");
			xt_sb_concat_int8(self, strbuf, data_log->dlf_open_count);
			xt_sb_concat(self, strbuf, "\n");
		}
		xt_unlock_mutex_ns(&seg->dls_lock);
	}

	freer_(); // xt_free_sortedlist(list)
}

xtPublic void xt_dl_delete_logs(XTThreadPtr self, XTDatabaseHPtr db)
{
	char			path[PATH_MAX];
	XTOpenDirPtr	od;
	char			*file;
	xtLogID			log_id;

	xt_strcpy(PATH_MAX, path, db->db_main_path);
	xt_add_data_dir(PATH_MAX, path);
	if (!xt_fs_exists(path))
		return;
	pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
	while (xt_dir_next(self, od)) {
		file = xt_dir_name(self, od);
		if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
			if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))
				xt_log_and_clear_exception(self);
		}
		if (xt_ends_with(file, ".xt")) {
			xt_add_dir_char(PATH_MAX, path);
			xt_strcat(PATH_MAX, path, file);
			xt_fs_delete(self, path);
			xt_remove_last_name_of_path(path);
		}
	}
	freer_(); // xt_dir_close(od)

	/* I no longer attach the condition: !db->db_multi_path
	 * to removing this directory. This is because
	 * the pbxt directory must now be removed explicitly
	 * by drop database, or by deleting all the PBXT
	 * system tables.
	 */
	if (!xt_fs_rmdir(NULL, path))
		xt_log_and_clear_exception(self);
}

typedef struct XTCompactorState {
	XTSeqLogReadPtr			cs_seqread;
	XTOpenTablePtr			cs_ot;
	XTDataBufferRec			cs_databuf;
} XTCompactorStateRec, *XTCompactorStatePtr;

static void dl_free_compactor_state(XTThreadPtr self, XTCompactorStatePtr cs)
{
	if (cs->cs_seqread) {
		cs->cs_seqread->sl_seq_exit();
		delete cs->cs_seqread;
		cs->cs_seqread = NULL;
	}
	if (cs->cs_ot) {
		xt_db_return_table_to_pool(self, cs->cs_ot);
		cs->cs_ot = NULL;
	}
	xt_db_set_size(self, &cs->cs_databuf, 0);
}

static XTOpenTablePtr dl_cs_get_open_table(XTThreadPtr self, XTCompactorStatePtr cs, xtTableID tab_id)
{
	if (cs->cs_ot) {
		if (cs->cs_ot->ot_table->tab_id == tab_id)
			return cs->cs_ot;

		xt_db_return_table_to_pool(self, cs->cs_ot);
		cs->cs_ot = NULL;
	}

	if (!cs->cs_ot) {
		if (!(cs->cs_ot = xt_db_open_pool_table(self, self->st_database, tab_id, NULL, TRUE)))
			return NULL;
	}

	return cs->cs_ot;
}

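/*
 * Wait up to the given number of seconds for a signal that a data log is
 * ready to be compacted.
 */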
static void dl_co_wait(XTThreadPtr self, XTDatabaseHPtr db, u_int secs)
{
	xt_lock_mutex(self, &db->db_datalogs.dlc_lock);
	pushr_(xt_unlock_mutex, &db->db_datalogs.dlc_lock);
	if (!self->t_quit)
		xt_timed_wait_cond(self, &db->db_datalogs.dlc_cond, &db->db_datalogs.dlc_lock, secs * 1000);
	freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
}

/*
 * Collect all the garbage in a file by moving all valid records
 * into some other data log and updating the handles.
 */
static xtBool dl_collect_garbage(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogFilePtr data_log)
{
	XTXactLogBufferDPtr	record;
	size_t				size;
	xtTableID			tab_id;
	xtRecordID			rec_id;
	XTCompactorStateRec	cs;
	XTOpenTablePtr		ot;
	XTTableHPtr			tab;
	XTTabRecExtDRec		rec_buffer;
	size_t				src_size;
	xtLogID				src_log_id;
	xtLogOffset			src_log_offset;
	xtLogID				curr_log_id;
	xtLogOffset			curr_log_offset;
	xtLogID				dest_log_id = 0;
	xtLogOffset			dest_log_offset = 0;
	off_t				garbage_count = 0;

	memset(&cs, 0, sizeof(XTCompactorStateRec));

	if (!(cs.cs_seqread = new XTDataSeqRead()))
		xt_throw_errno(XT_CONTEXT, XT_ENOMEM);

	if (!cs.cs_seqread->sl_seq_init(db, xt_db_log_buffer_size)) {
		delete cs.cs_seqread;
		xt_throw(self);
	}
	pushr_(dl_free_compactor_state, &cs);

	if (!cs.cs_seqread->sl_seq_start(data_log->dlf_log_id, data_log->dlf_start_offset, FALSE))
		xt_throw(self);

	for (;;) {
		if (self->t_quit) {
			/* Flush the destination log: */
			xt_lock_mutex(self, &db->db_co_dlog_lock);
			pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
			if (!self->st_dlog_buf.dlb_flush_log(TRUE, self))
				xt_throw(self);
			freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)

			/* Flush the transaction log. */
			if (!xt_xlog_flush_log(db, self))
				xt_throw(self);

			xt_lock_mutex_ns(&db->db_datalogs.dlc_head_lock);
			data_log->dlf_garbage_count += garbage_count;
			ASSERT(data_log->dlf_garbage_count < data_log->dlf_log_eof);
			if (!dl_write_garbage_level(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
				xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
				xt_throw(self);
			}
			xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);

			freer_(); // dl_free_compactor_state(&cs)
			return FAILED;
		}
		if (!cs.cs_seqread->sl_seq_next(&record, self))
			xt_throw(self);
		cs.cs_seqread->sl_seq_pos(&curr_log_id, &curr_log_offset);
		if (!record) {
			data_log->dlf_start_offset = curr_log_offset;
			break;
		}
		switch (record->xh.xh_status_1) {
			case XT_LOG_ENT_EXT_REC_OK:
				size = XT_GET_DISK_4(record->er.er_data_size_4);
				tab_id = XT_GET_DISK_4(record->er.er_tab_id_4);
				rec_id = XT_GET_DISK_4(record->er.er_rec_id_4);
				
				if (!(ot = dl_cs_get_open_table(self, &cs, tab_id)))
					break;
				tab = ot->ot_table;
				
				/* All this is required for a valid record address: */
				if (!rec_id || rec_id >= tab->tab_rec_eof_id)
					break;

				/* {LOCK-EXT-REC} It is important to prevent the compactor from modifying
				 * a record that has been freed (and maybe allocated again).
				 *
				 * Consider the following sequence:
				 *
				 * 1. Compactor reads the record.
				 * 2. The record is freed and reallocated.
				 * 3. The compactor updates the record.
				 *
				 * To prevent this, the compactor locks out the
				 * sweeper using the db_co_ext_lock lock. The db_co_ext_lock lock
				 * prevents a extended record from being moved and removed at the
				 * same time.
				 *
				 * The compactor also checks the status of the record before
				 * moving a record.
				 */
				xt_lock_mutex(self, &db->db_co_ext_lock);
				pushr_(xt_unlock_mutex, &db->db_co_ext_lock);

				/* Read the record: */
				if (!xt_tab_get_rec_data(ot, rec_id, offsetof(XTTabRecExtDRec, re_data), (xtWord1 *) &rec_buffer)) {
					xt_log_and_clear_warning(self);
					freer_(); // xt_unlock_mutex(&db->db_co_ext_lockk)
					break;
				}

				/* [(7)] REMOVE is followed by FREE:
				if (XT_REC_IS_REMOVED(rec_buffer.tr_rec_type_1) || !XT_REC_IS_EXT_DLOG(rec_buffer.tr_rec_type_1)) {
				*/
				if (!XT_REC_IS_EXT_DLOG(rec_buffer.tr_rec_type_1)) {
					freer_(); // xt_unlock_mutex(&db->db_co_ext_lock)
					break;
				}

				XT_GET_LOG_REF(src_log_id, src_log_offset, &rec_buffer);
				src_size = (size_t) XT_GET_DISK_4(rec_buffer.re_log_dat_siz_4);

				/* Does the record agree with the current position: */
				if (curr_log_id != src_log_id ||
					curr_log_offset != src_log_offset ||
					size != src_size) {
					freer_(); // xt_unlock_mutex(&db->db_co_ext_lock)
					break;
				}

				size = offsetof(XTactExtRecEntryDRec, er_data) + size;

				/* Allocate space in a destination log: */
				xt_lock_mutex(self, &db->db_co_dlog_lock);
				pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
				if (!self->st_dlog_buf.dlb_get_log_offset(&dest_log_id, &dest_log_offset, size, self))
					xt_throw(self);
				freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)

				/* This record is referenced by the data: */
				xt_db_set_size(self, &cs.cs_databuf, size);
				if (!cs.cs_seqread->sl_rnd_read(src_log_offset, size, cs.cs_databuf.db_data, NULL, self))
					xt_throw(self);

				/* The problem with writing to the buffer here is that other
				 * threads want to read the data! */
				xt_lock_mutex(self, &db->db_co_dlog_lock);
				pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
				if (!self->st_dlog_buf.dlb_write_thru_log(dest_log_id, dest_log_offset, size, cs.cs_databuf.db_data, self))
					xt_throw(self);
				freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)

				/* Make sure we flush the compactor target log, before we
				 * flush the transaction log!!
				 * This is done here [(8)]
				 */

				XT_SET_LOG_REF(&rec_buffer, dest_log_id, dest_log_offset);
				xtOpSeqNo op_seq;
				if (!xt_tab_put_log_rec_data(ot, XT_LOG_ENT_REC_MOVED, 0, rec_id, 8, (xtWord1 *) &rec_buffer.re_log_id_2, &op_seq))
					xt_throw(self);
				tab->tab_co_op_seq = op_seq;

				/* Only records that were actually moved, count as garbage now!
				 * This means, lost records, remain "lost" as far as the garbage
				 * count is concerned!
				 */
				garbage_count += size;
				freer_(); // xt_unlock_mutex(&db->db_co_ext_lock)
				break;
		}
		data_log->dlf_start_offset = curr_log_offset;
	}

	/* Flush the destination log. */
	xt_lock_mutex(self, &db->db_co_dlog_lock);
	pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
	if (!self->st_dlog_buf.dlb_flush_log(TRUE, self))
		xt_throw(self);
	freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
	
	/* Flush the transaction log. */
	if (!xt_xlog_flush_log(db, self))
		xt_throw(self);

	/* Save state in source log header. */
	xt_lock_mutex_ns(&db->db_datalogs.dlc_head_lock);
	data_log->dlf_garbage_count += garbage_count;
	ASSERT(data_log->dlf_garbage_count < data_log->dlf_log_eof);
	if (!dl_write_garbage_level(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
		xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
		xt_throw(self);
	}
	xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);

	/* Wait for the writer to write all the changes.
	 * Then we can start the delete process for the log:
	 *
	 * Note, if we do not wait, it could be that some operations are held up
	 * by being out of sequence. This could cause the log to be deleted
	 * before all the operations have been performed (which are on a table
	 * basis).
	 *
	 */
	for (;;) {
		u_int			edx;
		XTTableEntryPtr tab_ptr;
		xtBool			wait;

		if (self->t_quit) {
			freer_(); // dl_free_compactor_state(&cs)
			return FAILED;
		}
		wait = FALSE;
		xt_ht_lock(self, db->db_tables);
		pushr_(xt_ht_unlock, db->db_tables);
		xt_enum_tables_init(&edx);
		while ((tab_ptr = xt_enum_tables_next(self, db, &edx))) {
			if (tab_ptr->te_table && tab_ptr->te_table->tab_co_op_seq > tab_ptr->te_table->tab_head_op_seq) {
				wait = TRUE;
				break;
			}
		}
		freer_(); // xt_ht_unlock(db->db_tables)
		
		if (!wait)
			break;

		/* Nobody will wake me, so check again shortly! */
		dl_co_wait(self, db, 1);		
	}

	db->db_datalogs.dls_set_log_state(data_log, XT_DL_COMPACTED);

#ifdef DEBUG_LOG_DELETE
	printf("-- MARK FOR DELETE IN LOG: %d\n", (int) data_log->dlf_log_id);
#endif
	/* Log that this log should be deleted on the next checkpoint: */
	// transaction log...
	XTXactNewLogEntryDRec	log_rec;
	log_rec.xl_status_1 = XT_LOG_ENT_DEL_LOG;
	log_rec.xl_checksum_1 = XT_CHECKSUM_1(data_log->dlf_log_id);
	XT_SET_DISK_4(log_rec.xl_log_id_4, data_log->dlf_log_id);
	if (!xt_xlog_log_data(self, sizeof(XTXactNewLogEntryDRec), (XTXactLogBufferDPtr) &log_rec, XT_XLOG_WRITE_AND_FLUSH)) {
		db->db_datalogs.dls_set_log_state(data_log, XT_DL_TO_COMPACT);
		xt_throw(self);
	}

	freer_(); // dl_free_compactor_state(&cs)
	return OK;
}

static void dl_co_not_busy(XTThreadPtr XT_UNUSED(self), XTDatabaseHPtr db)
{
	db->db_co_busy = FALSE;
}

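/*
 * Main loop of the compactor thread: repeatedly take the first log on the
 * to-compact list and collect its garbage, then wait to be woken (or poll
 * again after 2 minutes).
 */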
static void dl_co_main(XTThreadPtr self, xtBool once_off)
{
	XTDatabaseHPtr		db = self->st_database;
	xtLogID				*log_id_ptr, log_id;
	XTDataLogFilePtr	data_log = NULL;

	xt_set_low_priority(self);

	while (!self->t_quit) {
		while (!self->t_quit) {
			xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
			if ((log_id_ptr = (xtLogID *) xt_sl_first_item(db->db_datalogs.dlc_to_compact))) {
				log_id = *log_id_ptr;
			}
			else
				log_id = 0;
			xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
			if (!log_id)
				break;
			if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, FALSE, NULL))
				xt_throw(self);
			ASSERT(data_log);
			if (data_log) {
				db->db_co_busy = TRUE;
				pushr_(dl_co_not_busy, db);
				dl_collect_garbage(self, db, data_log);
				freer_(); // dl_co_not_busy(db)
			}
			else {
				xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
				xt_sl_delete(self, db->db_datalogs.dlc_to_compact, &log_id);
				xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
			}
		}

		if (once_off)
			break;

		/* Wait for a signal that a data log can be collected: */
		dl_co_wait(self, db, 120);
	}
}

static void *dl_run_co_thread(XTThreadPtr self)
{
	XTDatabaseHPtr	db = (XTDatabaseHPtr) self->t_data;
	int				count;
	void			*mysql_thread;

	if (!(mysql_thread = myxt_create_thread()))
		xt_throw(self);

	while (!self->t_quit) {
		try_(a) {
			/*
			 * The garbage collector requires that the database
			 * is in use because it opens tables and writes to
			 * the data and transaction logs.
			 */
			xt_use_database(self, db, XT_FOR_COMPACTOR);

			/* This action is both safe and required:
			 *
			 * safe: releasing the database is safe because as
			 * long as this thread is running the database
			 * reference is valid, and this reference cannot
			 * be the only one to the database because
			 * otherwise this thread would not be running.
			 *
			 * required: releasing the database is necessary
			 * otherwise we cannot close the database
			 * correctly because we only shutdown this
			 * thread when the database is closed and we
			 * only close the database when all references
			 * are removed.
			 */
			xt_heap_release(self, self->st_database);

			dl_co_main(self, FALSE);
		}
		catch_(a) {
			if (!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
				self->t_exception.e_sys_err == SIGTERM))
				xt_log_and_clear_exception(self);
		}
		cont_(a);

		/* Avoid releasing the database (done above) */
		self->st_database = NULL;
		xt_unuse_database(self, self);

		/* After an exception, pause before trying again... */
		/* Number of seconds */
#ifdef DEBUG
		count = 10;
#else
		count = 2*60;
#endif
		while (!self->t_quit && count > 0) {
			sleep(1);
			count--;
		}
	}

   /*
	* {MYSQL-THREAD-KILL}
	myxt_destroy_thread(mysql_thread, TRUE);
	*/
	return NULL;
}

static void dl_free_co_thread(XTThreadPtr self, void *data)
{
	XTDatabaseHPtr db = (XTDatabaseHPtr) data;

	if (db->db_co_thread) {
		xt_lock_mutex(self, &db->db_datalogs.dlc_lock);
		pushr_(xt_unlock_mutex, &db->db_datalogs.dlc_lock);
		db->db_co_thread = NULL;
		freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
	}
}

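/*
 * Create and start the compactor (garbage collector) thread for the
 * database.
 */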
xtPublic void xt_start_compactor(XTThreadPtr self, XTDatabaseHPtr db)
{
	char name[PATH_MAX];

	sprintf(name, "GC-%s", xt_last_directory_of_path(db->db_main_path));
	xt_remove_dir_char(name);
	db->db_co_thread = xt_create_daemon(self, name);
	xt_set_thread_data(db->db_co_thread, db, dl_free_co_thread);
	xt_run_thread(self, db->db_co_thread, dl_run_co_thread);
}

static void dl_wake_co_thread(XTDatabaseHPtr db)
{
	if (!xt_signal_cond(NULL, &db->db_datalogs.dlc_cond))
		xt_log_and_clear_exception_ns();
}

xtPublic void xt_stop_compactor(XTThreadPtr self, XTDatabaseHPtr db)
{
	XTThreadPtr thr_co;

	if (db->db_co_thread) {
		xt_lock_mutex(self, &db->db_datalogs.dlc_lock);
		pushr_(xt_unlock_mutex, &db->db_datalogs.dlc_lock);

		/* This pointer is safe as long as you hold the data log cache lock (dlc_lock). */
		if ((thr_co = db->db_co_thread)) {
			xtThreadID tid = thr_co->t_id;

			/* Make sure the thread quits when woken up. */
			xt_terminate_thread(self, thr_co);

			dl_wake_co_thread(db);
	
			freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)

			/*
			 * This seems to kill the whole server sometimes!!
			 * SIGTERM is going to a different thread??!
			xt_kill_thread(thread);
			 */
			xt_wait_for_thread(tid, FALSE);
	
			/* PMC - This should not be necessary to set the signal here, but in the
			 * debugger the handler is not called!!?
			thr_co->t_delayed_signal = SIGTERM;
			xt_kill_thread(thread);
			 */
			db->db_co_thread = NULL;
		}
		else
			freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
	}
}