Commit 7c273b82 authored by unknown's avatar unknown

Very simple log files purger added.

File number types fixed.


storage/maria/unittest/Makefile.am:
  Test of purger added.
storage/maria/unittest/ma_test_loghandler_purge-t.c:
  New BitKeeper file ``storage/maria/unittest/ma_test_loghandler_purge-t.c''
parent d430e5bf
......@@ -172,6 +172,13 @@ struct st_translog_descriptor
*/
pthread_mutex_t unfinished_files_lock;
DYNAMIC_ARRAY unfinished_files;
/* Purger data: minimum file in the log (or 0 if unknown) */
uint32 min_file_number;
/* Protect purger from many calls and it's data */
pthread_mutex_t purger_lock;
/* last low water mark checked */
LSN last_lsn_checked;
};
static struct st_translog_descriptor log_descriptor;
......@@ -743,13 +750,13 @@ my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc, File file)
@retval 1 Error
*/
static my_bool translog_set_lsn_for_files(ulong from_file, ulong to_file,
static my_bool translog_set_lsn_for_files(uint32 from_file, uint32 to_file,
LSN lsn, my_bool is_locked)
{
ulong file;
uint32 file;
DBUG_ENTER("translog_set_lsn_for_files");
DBUG_PRINT("enter", ("From: %lu to: %lu lsn: (%lu,0x%lx) locked: %d",
from_file, to_file,
(ulong) from_file, (ulong) to_file,
(ulong) LSN_FILE_NO(lsn), (ulong) LSN_OFFSET(lsn),
is_locked));
DBUG_ASSERT(from_file <= to_file);
......@@ -758,7 +765,7 @@ static my_bool translog_set_lsn_for_files(ulong from_file, ulong to_file,
/* Checks the current file (not finished yet file) */
if (!is_locked)
translog_lock();
if (to_file == (ulong) LSN_FILE_NO(log_descriptor.horizon))
if (to_file == (uint32) LSN_FILE_NO(log_descriptor.horizon))
{
if (likely(cmp_translog_addr(lsn, log_descriptor.max_lsn) > 0))
log_descriptor.max_lsn= lsn;
......@@ -788,8 +795,8 @@ static my_bool translog_set_lsn_for_files(ulong from_file, ulong to_file,
/* descriptor of file in unfinished_files */
struct st_file_counter
{
ulong file; /* file number */
ulong counter; /* counter for started writes */
uint32 file; /* file number */
uint32 counter; /* counter for started writes */
};
......@@ -799,14 +806,14 @@ struct st_file_counter
@param file log file number
*/
static void translog_mark_file_unfinished(ulong file)
static void translog_mark_file_unfinished(uint32 file)
{
int place, i;
struct st_file_counter fc, *fc_ptr;
fc.file= file; fc.counter= 1;
DBUG_ENTER("translog_mark_file_unfinished");
DBUG_PRINT("enter", ("file: %lu", file));
DBUG_PRINT("enter", ("file: %lu", (ulong) file));
pthread_mutex_lock(&log_descriptor.unfinished_files_lock);
......@@ -871,13 +878,13 @@ end:
@param file log file number
*/
static void translog_mark_file_finished(ulong file)
static void translog_mark_file_finished(uint32 file)
{
int i;
struct st_file_counter *fc_ptr;
DBUG_ENTER("translog_mark_file_finished");
DBUG_PRINT("enter", ("file: %lu", file));
DBUG_PRINT("enter", ("file: %lu", (ulong) file));
pthread_mutex_lock(&log_descriptor.unfinished_files_lock);
......@@ -913,11 +920,11 @@ static void translog_mark_file_finished(ulong file)
@retval # LSN of the record which parts stored in this file
*/
LSN translog_get_file_max_lsn_stored(ulong file)
LSN translog_get_file_max_lsn_stored(uint32 file)
{
ulong limit= FILENO_IMPOSSIBLE;
uint32 limit= FILENO_IMPOSSIBLE;
DBUG_ENTER("translog_get_file_max_lsn_stored");
DBUG_PRINT("enter", ("file: %lu", file));
DBUG_PRINT("enter", ("file: %lu", (ulong)file));
pthread_mutex_lock(&log_descriptor.unfinished_files_lock);
......@@ -2540,10 +2547,14 @@ my_bool translog_init(const char *directory,
MY_MUTEX_INIT_FAST) ||
pthread_mutex_init(&log_descriptor.unfinished_files_lock,
MY_MUTEX_INIT_FAST) ||
pthread_mutex_init(&log_descriptor.purger_lock,
MY_MUTEX_INIT_FAST) ||
init_dynamic_array(&log_descriptor.unfinished_files,
sizeof(struct st_file_counter),
10, 10 CALLER_INFO))
DBUG_RETURN(1);
log_descriptor.min_file_number= 0;
log_descriptor.last_lsn_checked= LSN_IMPOSSIBLE;
/* Directory to store files */
unpack_dirname(log_descriptor.directory, directory);
......@@ -2919,6 +2930,7 @@ void translog_destroy()
pthread_mutex_destroy(&log_descriptor.sent_to_file_lock);
pthread_mutex_destroy(&log_descriptor.file_header_lock);
pthread_mutex_destroy(&log_descriptor.unfinished_files_lock);
pthread_mutex_destroy(&log_descriptor.purger_lock);
delete_dynamic(&log_descriptor.unfinished_files);
my_close(log_descriptor.directory_fd, MYF(MY_WME));
......@@ -6318,39 +6330,45 @@ my_bool translog_is_file(uint file_no)
MY_STAT stat_buff;
char path[FN_REFLEN];
return (test(my_stat(translog_filename_by_fileno(file_no, path),
&stat_buff, MYF(MY_WME))));
&stat_buff, MYF(0))));
}
/**
@brief returns the LSN of the first record starting in this log
@brief returns minimum log file number
@retval LSN_ERROR Error
@retval LSN_IMPOSSIBLE no log
@retval # LSN of the first record
@param horizon the end of the log
@param is_protected true if it is under purge_log protection
@retval minimum file number
@retval 0 no files found
*/
LSN translog_first_lsn_in_log()
static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
{
TRANSLOG_ADDRESS addr, horizon= translog_get_horizon();
TRANSLOG_VALIDATOR_DATA data;
uint min_file= 1, max_file= LSN_FILE_NO(horizon);
uint chunk_type;
uint16 chunk_offset;
uchar *page;
TRANSLOG_SCANNER_DATA scanner;
DBUG_ENTER("translog_first_lsn_in_log");
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)",
(ulong) LSN_FILE_NO(addr), (ulong) LSN_OFFSET(addr)));
TRANSLOG_ADDRESS addr;
uint min_file= 1, max_file;
DBUG_ENTER("translog_first_file");
if (!is_protected)
pthread_mutex_lock(&log_descriptor.purger_lock);
if (log_descriptor.min_file_number &&
translog_is_file(log_descriptor.min_file_number))
{
DBUG_PRINT("info", ("cached %lu",
(ulong) log_descriptor.min_file_number));
if (!is_protected)
pthread_mutex_unlock(&log_descriptor.purger_lock);
DBUG_RETURN(log_descriptor.min_file_number);
}
if (addr == MAKE_LSN(1, TRANSLOG_PAGE_SIZE))
max_file= LSN_FILE_NO(horizon);
if (MAKE_LSN(1, TRANSLOG_PAGE_SIZE) >= horizon)
{
/* there is no first page yet */
DBUG_RETURN(LSN_IMPOSSIBLE);
DBUG_RETURN(0);
}
/*TODO: lock loghandler purger when it will be created */
/* binary search for last file */
while (min_file != max_file && min_file != (max_file - 1))
{
......@@ -6364,8 +6382,41 @@ LSN translog_first_lsn_in_log()
else
min_file= test;
}
log_descriptor.min_file_number= max_file;
if (!is_protected)
pthread_mutex_unlock(&log_descriptor.purger_lock);
DBUG_RETURN(max_file);
}
/**
@brief returns the LSN of the first record starting in this log
@retval LSN_ERROR Error
@retval LSN_IMPOSSIBLE no log
@retval # LSN of the first record
*/
addr= MAKE_LSN(max_file, TRANSLOG_PAGE_SIZE); /* the first page of the file */
LSN translog_first_lsn_in_log()
{
TRANSLOG_ADDRESS addr, horizon= translog_get_horizon();
TRANSLOG_VALIDATOR_DATA data;
uint file;
uint chunk_type;
uint16 chunk_offset;
uchar *page;
TRANSLOG_SCANNER_DATA scanner;
DBUG_ENTER("translog_first_lsn_in_log");
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)",
(ulong) LSN_FILE_NO(addr), (ulong) LSN_OFFSET(addr)));
if (!(file= translog_first_file(horizon, 0)))
{
/* log has no records yet */
DBUG_RETURN(LSN_IMPOSSIBLE);
}
addr= MAKE_LSN(file, TRANSLOG_PAGE_SIZE); /* the first page of the file */
data.addr= &addr;
if ((page= translog_get_page(&data, scanner.buffer)) == NULL ||
(chunk_offset= translog_get_first_chunk_offset(page)) == 0)
......@@ -6415,7 +6466,7 @@ LSN translog_first_theoretical_lsn()
DBUG_RETURN(LSN_IMPOSSIBLE);
if (addr == MAKE_LSN(1, TRANSLOG_PAGE_SIZE))
{
/* there is no first page yet */
/* log has no records yet */
DBUG_RETURN(MAKE_LSN(1, TRANSLOG_PAGE_SIZE +
log_descriptor.page_overhead));
}
......@@ -6428,3 +6479,55 @@ LSN translog_first_theoretical_lsn()
DBUG_RETURN(MAKE_LSN(1, TRANSLOG_PAGE_SIZE +
page_overhead[page[TRANSLOG_PAGE_FLAGS]]));
}
/**
@brief Check given low water mark and purge files if it is need
@param low the last (minimum) LSN which is need
@retval 0 OK
@retval 1 Error
*/
my_bool translog_purge(LSN low)
{
uint32 last_need_file= LSN_FILE_NO(low);
TRANSLOG_ADDRESS horizon= translog_get_horizon();
int rc= 0;
DBUG_ENTER("translog_purge");
DBUG_PRINT("enter", ("low: (%lu,0x%lx)",
(ulong)LSN_FILE_NO(low),
(ulong)LSN_OFFSET(low)));
pthread_mutex_lock(&log_descriptor.purger_lock);
if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file)
{
uint32 i;
uint32 min_file= translog_first_file(horizon, 1);
DBUG_ASSERT(min_file != 0); /* log is already started */
for(i= min_file; i < last_need_file && rc == 0; i++)
{
LSN lsn= translog_get_file_max_lsn_stored(i);
if (lsn == LSN_IMPOSSIBLE)
break; /* files are still in writing */
if (lsn == LSN_ERROR)
{
rc= 1;
break;
}
if (cmp_translog_addr(lsn, low) >= 0)
break;
DBUG_PRINT("info", ("purge file %lu", (ulong) i));
{
char path[FN_REFLEN], *file_name;
file_name= translog_filename_by_fileno(i, path);
rc= test(my_delete(file_name, MYF(MY_WME)));
}
}
}
pthread_mutex_unlock(&log_descriptor.purger_lock);
DBUG_RETURN(rc);
}
......@@ -250,7 +250,9 @@ extern my_bool translog_init_scanner(LSN lsn,
extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
TRANSLOG_HEADER_BUFFER *buff);
extern LSN translog_get_file_max_lsn_stored(ulong file);
extern LSN translog_get_file_max_lsn_stored(uint32 file);
extern my_bool translog_purge(LSN low);
extern my_bool translog_is_file(uint file_no);
extern my_bool translog_lock();
extern my_bool translog_unlock();
extern void translog_lock_assert_owner();
......
......@@ -45,7 +45,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler_long-t-big \
ma_test_loghandler_noflush-t \
ma_test_loghandler_first_lsn-t \
ma_test_loghandler_max_lsn-t
ma_test_loghandler_max_lsn-t \
ma_test_loghandler_purge-t
ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c
ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c
......@@ -56,6 +57,7 @@ ma_test_loghandler_long_t_big_CPPFLAGS = -DLONG_LOG_TEST
ma_test_loghandler_noflush_t_SOURCES = ma_test_loghandler_noflush-t.c ma_maria_log_cleanup.c
ma_test_loghandler_first_lsn_t_SOURCES = ma_test_loghandler_first_lsn-t.c ma_maria_log_cleanup.c
ma_test_loghandler_max_lsn_t_SOURCES = ma_test_loghandler_max_lsn-t.c ma_maria_log_cleanup.c
ma_test_loghandler_purge_t_SOURCES = ma_test_loghandler_purge-t.c ma_maria_log_cleanup.c
ma_pagecache_single_src = ma_pagecache_single.c test_file.c
ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c
......
#include "../maria_def.h"
#include <stdio.h>
#include <errno.h>
#include <tap.h>
#include "../trnman.h"
extern my_bool maria_log_remove();
#ifndef DBUG_OFF
static const char *default_dbug_option;
#endif
#define PCACHE_SIZE (1024*1024*10)
#define PCACHE_PAGE TRANSLOG_PAGE_SIZE
#define LOG_FILE_SIZE (4*1024L*1024L)
#define LOG_FLAGS 0
#define LONG_BUFFER_SIZE (LOG_FILE_SIZE + LOG_FILE_SIZE / 2)
int main(int argc __attribute__((unused)), char *argv[])
{
ulong i;
uint pagen;
uchar long_tr_id[6];
PAGECACHE pagecache;
LSN lsn;
LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 1];
uchar *long_buffer= malloc(LONG_BUFFER_SIZE);
MY_INIT(argv[0]);
plan(4);
bzero(&pagecache, sizeof(pagecache));
bzero(long_buffer, LONG_BUFFER_SIZE);
maria_data_root= ".";
if (maria_log_remove())
exit(1);
bzero(long_tr_id, 6);
#ifndef DBUG_OFF
#if defined(__WIN__)
default_dbug_option= "d:t:i:O,\\ma_test_loghandler.trace";
#else
default_dbug_option= "d:t:i:o,/tmp/ma_test_loghandler.trace";
#endif
if (argc > 1)
{
DBUG_SET(default_dbug_option);
DBUG_SET_INITIAL(default_dbug_option);
}
#endif
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1);
}
if ((pagen= init_pagecache(&pagecache, PCACHE_SIZE, 0, 0,
PCACHE_PAGE)) == 0)
{
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1);
}
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
/* write more then 1 file */
int4store(long_tr_id, 0);
parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id;
parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6;
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
exit(1);
}
translog_purge(lsn);
if (!translog_is_file(1))
{
fprintf(stderr, "First file was removed after first record\n");
translog_destroy();
exit(1);
}
ok(1, "First is not removed");
for(i= 0; i < LOG_FILE_SIZE/6 && LSN_FILE_NO(lsn) == 1; i++)
{
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
exit(1);
}
}
translog_purge(lsn);
if (translog_is_file(1))
{
fprintf(stderr, "First file was not removed.\n");
translog_destroy();
exit(1);
}
ok(1, "First file is removed");
parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_buffer;
parts[TRANSLOG_INTERNAL_PARTS + 0].length= LONG_BUFFER_SIZE;
if (translog_write_record(&lsn,
LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, LONG_BUFFER_SIZE,
TRANSLOG_INTERNAL_PARTS + 1, parts, NULL))
{
fprintf(stderr, "Can't write variable record\n");
translog_destroy();
exit(1);
}
translog_purge(lsn);
if (!translog_is_file(2) || !translog_is_file(3))
{
fprintf(stderr, "Second file (%d) or third file (%d) is not present.\n",
translog_is_file(2), translog_is_file(3));
translog_destroy();
exit(1);
}
ok(1, "Second and third files are not removed");
int4store(long_tr_id, 0);
parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id;
parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6;
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
{
fprintf(stderr, "Can't write last record\n");
translog_destroy();
exit(1);
}
translog_purge(lsn);
if (translog_is_file(2))
{
fprintf(stderr, "Second file is not removed\n");
translog_destroy();
exit(1);
}
ok(1, "Second file is removed");
translog_destroy();
end_pagecache(&pagecache, 1);
ma_control_file_end();
if (maria_log_remove())
exit(1);
exit(0);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment