Commit 6b89901c authored by unknown's avatar unknown

Merge desktop.sanja.is.com.ua:/home/bell/mysql/bk/mysql-maria

into  desktop.sanja.is.com.ua:/home/bell/mysql/bk/work-maria-testfail


mysql-test/t/maria-recovery.test:
  Auto merged
storage/maria/ma_loghandler.c:
  Auto merged
parents 01ea6c1e 77e08a74
set storage_engine=maria;
affected rows: 0
set global maria_log_file_size=4294967296;
affected rows: 0
drop table if exists t1, t2;
affected rows: 0
create table t1(a char(3));
......
set global storage_engine=maria;
set session storage_engine=maria;
set global maria_log_file_size=4294967296;
drop table if exists t1;
SET SQL_WARNINGS=1;
RESET MASTER;
......
set global storage_engine=maria;
set session storage_engine=maria;
set global maria_log_file_size=4294967296;
drop table if exists t1,t2;
SET SQL_WARNINGS=1;
CREATE TABLE t1 (
......@@ -32,51 +33,60 @@ insert into t2 select * from t1;
insert into t1 select * from t2;
insert into t2 select * from t1;
insert into t1 select * from t2;
set global maria_log_file_size=16777216;
set global maria_checkpoint_interval=30;
SHOW ENGINE maria logs;
Type Name Status
maria master-data/maria_log.00000002 in use
insert into t2 select * from t1;
insert into t1 select * from t2;
set global maria_checkpoint_interval=2;
set global maria_checkpoint_interval=30;
SHOW ENGINE maria logs;
Type Name Status
maria master-data/maria_log.00000001 in use
maria master-data/maria_log.00000004 in use
set global maria_log_file_size=16777216;
select @@global.maria_log_file_size;
@@global.maria_log_file_size
16777216
set global maria_checkpoint_interval=30;
SHOW ENGINE maria logs;
Type Name Status
maria master-data/maria_log.00000002 in use
maria master-data/maria_log.00000004 in use
set global maria_log_file_size=8388608;
select @@global.maria_log_file_size;
@@global.maria_log_file_size
8388608
set global maria_log_purge_type=at_flush;
insert into t1 select * from t2;
set global maria_checkpoint_interval=30;
SHOW ENGINE maria logs;
Type Name Status
maria master-data/maria_log.00000002 free
maria master-data/maria_log.00000003 free
maria master-data/maria_log.00000004 free
maria master-data/maria_log.00000005 in use
maria master-data/maria_log.00000005 free
maria master-data/maria_log.00000006 free
maria master-data/maria_log.00000007 free
maria master-data/maria_log.00000008 in use
flush logs;
SHOW ENGINE maria logs;
Type Name Status
maria master-data/maria_log.00000005 in use
maria master-data/maria_log.00000008 in use
set global maria_log_file_size=16777216;
set global maria_log_purge_type=external;
insert into t1 select * from t2;
set global maria_checkpoint_interval=30;
SHOW ENGINE maria logs;
Type Name Status
maria master-data/maria_log.00000005 free
maria master-data/maria_log.00000006 in use
maria master-data/maria_log.00000008 free
maria master-data/maria_log.00000009 in use
flush logs;
SHOW ENGINE maria logs;
Type Name Status
maria master-data/maria_log.00000005 free
maria master-data/maria_log.00000006 in use
maria master-data/maria_log.00000008 free
maria master-data/maria_log.00000009 in use
set global maria_log_purge_type=immediate;
insert into t1 select * from t2;
set global maria_checkpoint_interval=30;
SHOW ENGINE maria logs;
Type Name Status
maria master-data/maria_log.00000007 in use
maria master-data/maria_log.00000008 in use
maria master-data/maria_log.00000011 in use
drop table t1, t2;
set global storage_engine=maria;
set session storage_engine=maria;
set global maria_log_file_size=4294967296;
drop table if exists t1,t2;
SET SQL_WARNINGS=1;
CREATE TABLE t1 (
......@@ -2048,7 +2049,7 @@ show variables like 'maria%';
Variable_name Value
maria_block_size 8192
maria_checkpoint_interval 30
maria_log_file_size 1073741824
maria_log_file_size 4294959104
maria_log_purge_type immediate
maria_max_sort_file_size 9223372036853727232
maria_pagecache_age_threshold 300
......
set global maria_log_file_size=4294967296;
use test;
drop table if exists t1, t9 ;
create table t1
......
# Test of scenarios potentially too big for --valgrind or --mem
-- source include/have_maria.inc
enable_info;
set storage_engine=maria;
set global maria_log_file_size=4294967296;
disable_warnings;
drop table if exists t1, t2;
enable_warnings;
......
......@@ -9,6 +9,8 @@ let $default=`select @@global.storage_engine`;
set global storage_engine=maria;
set session storage_engine=maria;
set global maria_log_file_size=4294967296;
# Initialise
--disable_warnings
drop table if exists t1;
......
......@@ -6,6 +6,7 @@ set session storage_engine=maria;
let $def_logsize=`select @@global.maria_log_file_size`;
let $def_checkinterval=`select @@global.maria_checkpoint_interval`;
set global maria_log_file_size=4294967296;
# Initialise
--disable_warnings
drop table if exists t1,t2;
......@@ -19,6 +20,7 @@ CREATE TABLE t2 (
STRING_DATA char(255) default NULL
);
INSERT INTO t1 VALUES ('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA');
INSERT INTO t1 VALUES ('DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD');
insert into t2 select * from t1;
......@@ -43,16 +45,23 @@ insert into t2 select * from t1;
insert into t1 select * from t2;
insert into t2 select * from t1;
insert into t1 select * from t2;
set global maria_log_file_size=16777216;
eval set global maria_checkpoint_interval=$def_checkinterval;
--replace_regex /Size +[0-9]+ ; .+master-data/master-data/
SHOW ENGINE maria logs;
insert into t2 select * from t1;
insert into t1 select * from t2;
set global maria_checkpoint_interval=2;
eval set global maria_checkpoint_interval=$def_checkinterval;
--replace_regex /Size +[0-9]+ ; .+master-data/master-data/
SHOW ENGINE maria logs;
set global maria_log_file_size=16777216;
select @@global.maria_log_file_size;
sleep 7;
eval set global maria_checkpoint_interval=$def_checkinterval;
--replace_regex /Size +[0-9]+ ; .+master-data/master-data/
SHOW ENGINE maria logs;
set global maria_log_file_size=8388608;
......@@ -60,7 +69,7 @@ select @@global.maria_log_file_size;
set global maria_log_purge_type=at_flush;
insert into t1 select * from t2;
sleep 7;
eval set global maria_checkpoint_interval=$def_checkinterval;
--replace_regex /Size +[0-9]+ ; .+master-data/master-data/
SHOW ENGINE maria logs;
flush logs;
......@@ -70,7 +79,7 @@ SHOW ENGINE maria logs;
set global maria_log_file_size=16777216;
set global maria_log_purge_type=external;
insert into t1 select * from t2;
sleep 7;
eval set global maria_checkpoint_interval=$def_checkinterval;
--replace_regex /Size +[0-9]+ ; .+master-data/master-data/
SHOW ENGINE maria logs;
flush logs;
......@@ -79,7 +88,7 @@ SHOW ENGINE maria logs;
set global maria_log_purge_type=immediate;
insert into t1 select * from t2;
sleep 7;
eval set global maria_checkpoint_interval=$def_checkinterval;
--replace_regex /Size +[0-9]+ ; .+master-data/master-data/
SHOW ENGINE maria logs;
......
......@@ -5,6 +5,8 @@
--source include/have_debug.inc
--source include/have_maria.inc
set global maria_log_file_size=4294967296;
--disable_warnings
drop database if exists mysqltest;
--enable_warnings
......
......@@ -9,6 +9,8 @@ let $default=`select @@global.storage_engine`;
set global storage_engine=maria;
set session storage_engine=maria;
set global maria_log_file_size=4294967296;
# Initialise
--disable_warnings
drop table if exists t1,t2;
......
......@@ -8,6 +8,8 @@
# NOTE: PLEASE SEE ps_1general.test (bottom)
# BEFORE ADDING NEW TEST CASES HERE !!!
set global maria_log_file_size=4294967296;
use test;
-- source include/have_maria.inc
......
......@@ -90,8 +90,9 @@ struct st_translog_buffer
struct st_my_thread_var *waiting_flush;
/*
Pointer on the buffer which overlap with this one (due to flush of
loghandler) and have to be written first (because contain old content of
page which present in both buffers)
loghandler, the last page of that buffer is the same as the first page
of this buffer) and have to be written first (because contain old
content of page which present in both buffers)
*/
struct st_translog_buffer *overlay;
#ifndef DBUG_OFF
......@@ -99,8 +100,25 @@ struct st_translog_buffer
#endif
/*
Lock for the buffer.
Current buffer also lock the whole handler (if one want lock the handler
one should lock the current buffer)
one should lock the current buffer).
Buffers are locked only in one direction (with overflow and beginning
from the first buffer). If we keep lock on buffer N we can lock only
buffer N+1 (never N-1).
One thread do not lock more then 2 buffer in a time, so to make dead
lock it should be N thread (where N equal number of buffers) takes one
buffer and try to lock next. But it is impossible because there is only
2 cases when thread take 2 buffers: 1) one thread finishes current
buffer (where horizon is) and start next (to which horizon moves). 2)
flush start from buffer after current (oldest) and go till the current
crabbing by buffer sequence. And there is only one flush in a moment
(they are serialised).
Because of above and number of buffers equal 5 we can't get dead lock (it is
impossible to get all 5 buffers locked simultaneously).
*/
pthread_mutex_t mutex;
/* Cache for current log. */
......@@ -201,7 +219,12 @@ struct st_translog_descriptor
TRANSLOG_ADDRESS previous_flush_horizon;
/* All what is after this address is not sent to disk yet */
TRANSLOG_ADDRESS in_buffers_only;
/* protection of sent_to_file and in_buffers_only */
pthread_mutex_t sent_to_disk_lock;
/*
Protect flushed (see above) and for flush serialization (will
be removed in v1.5
*/
pthread_mutex_t log_flush_lock;
/* Protects changing of headers of finished files (max_lsn) */
......@@ -628,8 +651,9 @@ static void loghandler_init()
i < LOGREC_NUMBER_OF_TYPES;
i++)
log_record_type_descriptor[i].rclass= LOGRECTYPE_NOT_ALLOWED;
DBUG_EXECUTE("info",
check_translog_description_table(LOGREC_INCOMPLETE_GROUP););
#ifndef DBUG_OFF
check_translog_description_table(LOGREC_INCOMPLETE_GROUP);
#endif
};
......@@ -654,7 +678,6 @@ const char *maria_data_root;
cursor cursor which will be checked
*/
#ifndef DBUG_OFF
static void translog_check_cursor(struct st_buffer_cursor *cursor)
{
DBUG_ASSERT(cursor->chaser ||
......@@ -665,7 +688,6 @@ static void translog_check_cursor(struct st_buffer_cursor *cursor)
cursor->current_page_fill % TRANSLOG_PAGE_SIZE);
DBUG_ASSERT(cursor->current_page_fill <= TRANSLOG_PAGE_SIZE);
}
#endif
/*
@brief Get file name of the log by log number
......@@ -1248,19 +1270,17 @@ static my_bool translog_create_new_file()
}
/*
Lock the loghandler buffer
/**
@brief Locks the loghandler buffer.
SYNOPSIS
translog_buffer_lock()
buffer This buffer which should be locked
@param buffer This buffer which should be locked
RETURN
0 OK
1 Error
@note See comment before buffer 'mutex' variable.
@retval 0 OK
@retval 1 Error
*/
#ifndef DBUG_OFF
static my_bool translog_buffer_lock(struct st_translog_buffer *buffer)
{
my_bool res;
......@@ -1271,10 +1291,6 @@ static my_bool translog_buffer_lock(struct st_translog_buffer *buffer)
res= (pthread_mutex_lock(&buffer->mutex) != 0);
DBUG_RETURN(res);
}
#else
#define translog_buffer_lock(B) \
pthread_mutex_lock(&((B)->mutex))
#endif
/*
......@@ -1289,7 +1305,6 @@ static my_bool translog_buffer_lock(struct st_translog_buffer *buffer)
1 Error
*/
#ifndef DBUG_OFF
static my_bool translog_buffer_unlock(struct st_translog_buffer *buffer)
{
my_bool res;
......@@ -1300,10 +1315,6 @@ static my_bool translog_buffer_unlock(struct st_translog_buffer *buffer)
res= (pthread_mutex_unlock(&buffer->mutex) != 0);
DBUG_RETURN(res);
}
#else
#define translog_buffer_unlock(B) \
pthread_mutex_unlock(&((B)->mutex))
#endif
/*
......@@ -1357,8 +1368,8 @@ static void translog_new_page_header(TRANSLOG_ADDRESS *horizon,
higher level pseudo random value generator
*/
uint16 tmp_time= time(NULL);
int2store(ptr, tmp_time);
ptr+= (TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE) * 2;
ptr[0]= tmp_time & 0xFF;
ptr+= TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
}
{
uint len= (ptr - cursor->ptr);
......@@ -1372,7 +1383,7 @@ static void translog_new_page_header(TRANSLOG_ADDRESS *horizon,
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
cursor->chaser, (ulong) cursor->buffer->size,
(ulong) (cursor->ptr - cursor->buffer->buffer)));
DBUG_EXECUTE("info", translog_check_cursor(cursor););
translog_check_cursor(cursor);
DBUG_VOID_RETURN;
}
......@@ -1394,16 +1405,19 @@ static void translog_put_sector_protection(uchar *page,
struct st_buffer_cursor *cursor)
{
uchar *table= page + log_descriptor.page_overhead -
(TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE) * 2;
uint16 value= uint2korr(table) + cursor->write_counter;
TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
uint i, offset;
uint16 last_protected_sector= ((cursor->previous_offset - 1) /
DISK_DRIVE_SECTOR_SIZE);
uint16 start_sector= cursor->previous_offset / DISK_DRIVE_SECTOR_SIZE;
uint i, offset;
uint8 value= table[0] + cursor->write_counter;
DBUG_ENTER("translog_put_sector_protection");
if (start_sector == 0)
start_sector= 1; /* First sector is protected */
{
/* First sector is protected by file & page numbers in the page header. */
start_sector= 1;
}
DBUG_PRINT("enter", ("Write counter:%u value:%u offset:%u, "
"last protected:%u start sector:%u",
......@@ -1413,28 +1427,22 @@ static void translog_put_sector_protection(uchar *page,
(uint) last_protected_sector, (uint) start_sector));
if (last_protected_sector == start_sector)
{
i= last_protected_sector * 2;
i= last_protected_sector;
offset= last_protected_sector * DISK_DRIVE_SECTOR_SIZE;
/* restore data, because we modified sector which was protected */
if (offset < cursor->previous_offset)
page[offset]= table[i];
offset++;
if (offset < cursor->previous_offset)
page[offset]= table[i + 1];
}
for (i= start_sector * 2, offset= start_sector * DISK_DRIVE_SECTOR_SIZE;
i < (TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE) * 2;
(i+= 2), (offset+= DISK_DRIVE_SECTOR_SIZE))
for (i= start_sector, offset= start_sector * DISK_DRIVE_SECTOR_SIZE;
i < TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
i++, (offset+= DISK_DRIVE_SECTOR_SIZE))
{
DBUG_PRINT("info", ("sector:%u offset:%u data 0x%x%x",
i / 2, offset, (uint) page[offset],
(uint) page[offset + 1]));
DBUG_PRINT("info", ("sector:%u offset:%u data 0x%x",
i, offset, (uint) page[offset]));
table[i]= page[offset];
table[i + 1]= page[offset + 1];
int2store(page + offset, value);
DBUG_PRINT("info", ("sector:%u offset:%u data 0x%x%x",
i / 2, offset, (uint) page[offset],
(uint) page[offset + 1]));
page[offset]= value;
DBUG_PRINT("info", ("sector:%u offset:%u data 0x%x",
i, offset, (uint) page[offset]));
}
DBUG_VOID_RETURN;
}
......@@ -1487,7 +1495,7 @@ static void translog_finish_page(TRANSLOG_ADDRESS *horizon,
(ulong) (cursor->ptr -cursor->buffer->buffer),
(uint) cursor->current_page_fill, (uint) left));
DBUG_ASSERT(LSN_FILE_NO(*horizon) == LSN_FILE_NO(cursor->buffer->offset));
DBUG_EXECUTE("info", translog_check_cursor(cursor););
translog_check_cursor(cursor);
if (cursor->protected)
{
DBUG_PRINT("info", ("Already protected and finished"));
......@@ -1512,7 +1520,7 @@ static void translog_finish_page(TRANSLOG_ADDRESS *horizon,
(ulong) cursor->buffer, cursor->chaser,
(ulong) cursor->buffer->size,
(ulong) (cursor->ptr - cursor->buffer->buffer)));
DBUG_EXECUTE("info", translog_check_cursor(cursor););
translog_check_cursor(cursor);
}
/*
When we are finishing the page other thread might not finish the page
......@@ -1665,7 +1673,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
cursor->chaser, (ulong) cursor->buffer->size,
(ulong) (cursor->ptr - cursor->buffer->buffer)));
DBUG_EXECUTE("info", translog_check_cursor(cursor););
translog_check_cursor(cursor);
DBUG_VOID_RETURN;
}
......@@ -2087,11 +2095,14 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
It is possible for single request for flush and destroying the
loghandler.
*/
translog_buffer_unlock(buffer);
DBUG_RETURN(0);
}
}
/*
Send page by page in the pagecache what we are going to write on the
disk
*/
file.file= buffer->file;
for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE;
i < buffer->size;
......@@ -2277,23 +2288,29 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
{
uint i, offset;
uchar *table= page_pos;
uint16 current= uint2korr(table);
for (i= 2, offset= DISK_DRIVE_SECTOR_SIZE;
i < (TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE) * 2;
i+= 2, offset+= DISK_DRIVE_SECTOR_SIZE)
uint8 current= table[0];
for (i= 1, offset= DISK_DRIVE_SECTOR_SIZE;
i < TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
i++, offset+= DISK_DRIVE_SECTOR_SIZE)
{
/*
TODO: add chunk counting for "suspecting" sectors (difference is
more than 1-2)
more than 1-2), if difference more then present chunks then it is
the problem.
*/
uint16 test= uint2korr(page + offset);
uint8 test= page[offset];
DBUG_PRINT("info", ("sector: #%u offset: %u current: %lx "
"read: 0x%x stored: 0x%x%x",
i / 2, offset, (ulong) current,
i, offset, (ulong) current,
(uint) uint2korr(page + offset), (uint) table[i],
(uint) table[i + 1]));
/*
3 is minimal possible record length. So we can have "distance"
between 2 sectors value more then DISK_DRIVE_SECTOR_SIZE / 3
only if it is old value, i.e. the sector was not written.
*/
if (((test < current) &&
(LL(0xFFFF) - current + test > DISK_DRIVE_SECTOR_SIZE / 3)) ||
(0xFFL - current + test > DISK_DRIVE_SECTOR_SIZE / 3)) ||
((test >= current) &&
(test - current > DISK_DRIVE_SECTOR_SIZE / 3)))
{
......@@ -2303,30 +2320,26 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
DBUG_RETURN(0);
}
/* Return value on the page */
/* Restore value on the page */
page[offset]= table[i];
page[offset + 1]= table[i + 1];
current= test;
DBUG_PRINT("info", ("sector: #%u offset: %u current: %lx "
"read: 0x%x stored: 0x%x%x",
i / 2, offset, (ulong) current,
(uint) uint2korr(page + offset), (uint) table[i],
(uint) table[i + 1]));
"read: 0x%x stored: 0x%x",
i, offset, (ulong) current,
(uint) page[offset], (uint) table[i]));
}
}
DBUG_RETURN(0);
}
/*
Lock the loghandler
/**
@brief Locks the loghandler.
SYNOPSIS
translog_lock()
@note See comment before buffer 'mutex' variable.
RETURN
0 OK
1 Error
@retval 0 OK
@retval 1 Error
*/
my_bool translog_lock()
......@@ -2454,7 +2467,7 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
last_protected_sector= ((log_descriptor.bc.previous_offset - 1) /
DISK_DRIVE_SECTOR_SIZE);
table= buffer + log_descriptor.page_overhead -
(TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE) * 2;
TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE;
}
DBUG_ASSERT(buffer_unlock == curr_buffer);
......@@ -2477,13 +2490,11 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
last_protected_sector));
for (i= 1; i <= last_protected_sector; i++)
{
uint idx= i * 2;
uint offset= i * DISK_DRIVE_SECTOR_SIZE;
DBUG_PRINT("info", ("Sector %u: 0x%02x%02x <- 0x%02x%02x",
i, buffer[offset], buffer[offset + 1],
table[idx], table[idx + 1]));
buffer[offset]= table[idx];
buffer[offset + 1]= table[idx + 1];
DBUG_PRINT("info", ("Sector %u: 0x%02x <- 0x%02x",
i, buffer[offset],
table[i]));
buffer[offset]= table[i];
}
}
else
......@@ -2525,19 +2536,15 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
file.file= log_descriptor.log_file_num[cache_index];
buffer=
(uchar*) (direct_link ?
pagecache_valid_read(log_descriptor.pagecache, &file,
(uchar*) pagecache_valid_read(log_descriptor.pagecache, &file,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, NULL,
3, (direct_link ? NULL : (char*) buffer),
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_READ, direct_link,
&translog_page_validator, (uchar*) data) :
pagecache_valid_read(log_descriptor.pagecache, &file,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, (char*) buffer,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, direct_link,
&translog_page_validator, (uchar*) data));
(direct_link ?
PAGECACHE_LOCK_READ :
PAGECACHE_LOCK_LEFT_UNLOCKED),
direct_link,
&translog_page_validator, (uchar*) data);
DBUG_PRINT("info", ("Direct link is assigned to : 0x%lx * 0x%lx",
(ulong) direct_link,
(ulong)(direct_link ? *direct_link : NULL)));
......@@ -2586,18 +2593,17 @@ static void translog_free_link(PAGECACHE_BLOCK_LINK *direct_link)
}
/*
Finds last full page of the given log file
/**
@brief Finds last full page of the given log file.
SYNOPSIS
translog_get_last_page_addr()
addr address structure to fill with data, which contain
@param addr address structure to fill with data, which contain
file number of the log file
last_page_ok assigned 1 if last page was OK
@param last_page_ok Result of the check whether last page OK.
(for now only we check only that file length
divisible on page length).
RETURN
0 OK
1 Error
@retval 0 OK
@retval 1 Error
*/
static my_bool translog_get_last_page_addr(TRANSLOG_ADDRESS *addr,
......@@ -2651,7 +2657,7 @@ static uint translog_variable_record_length_bytes(translog_size_t length)
}
/*
/**
@brief Gets header of this chunk.
@param chunk The pointer to the chunk beginning
......@@ -2680,6 +2686,11 @@ static uint16 translog_get_chunk_header_length(uchar *chunk)
if (chunk_len)
{
/* TODO: fine header end */
/*
The last chunk of multi-group record can be base for it header
calculation (we skip to the first group to read the header) so if we
stuck here something is wrong.
*/
DBUG_ASSERT(0);
DBUG_RETURN(0); /* Keep compiler happy */
}
......@@ -2755,8 +2766,11 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
(page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr),
log_write_flags)) ||
my_sync(fd, MYF(MY_WME)) ||
my_close(fd, MYF(MY_WME)))
my_close(fd, MYF(MY_WME)) ||
(sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
my_sync(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD))))
DBUG_RETURN(1);
/* fix the horizon */
log_descriptor.horizon= addr;
/* fix the buffer data */
......@@ -2867,8 +2881,8 @@ my_bool translog_init(const char *directory,
if (i & TRANSLOG_PAGE_CRC)
page_overhead[i]+= CRC_LENGTH;
if (i & TRANSLOG_SECTOR_PROTECTION)
page_overhead[i]+= (TRANSLOG_PAGE_SIZE /
DISK_DRIVE_SECTOR_SIZE) * 2;
page_overhead[i]+= TRANSLOG_PAGE_SIZE /
DISK_DRIVE_SECTOR_SIZE;
}
log_descriptor.page_overhead= page_overhead[flags];
log_descriptor.page_capacity_chunk_2=
......@@ -2935,6 +2949,7 @@ my_bool translog_init(const char *directory,
DBUG_ASSERT(LSN_OFFSET(sure_page) % TRANSLOG_PAGE_SIZE != 0);
sure_page-= LSN_OFFSET(sure_page) % TRANSLOG_PAGE_SIZE;
}
/* Set horizon to the beginning of the last file first */
log_descriptor.horizon= last_page= MAKE_LSN(last_logno, 0);
if (translog_get_last_page_addr(&last_page, &pageok))
DBUG_RETURN(1);
......@@ -2962,7 +2977,12 @@ my_bool translog_init(const char *directory,
/* TODO: check page size */
last_valid_page= LSN_IMPOSSIBLE;
/* scan and validate pages */
/*
Scans and validate pages. We need it to show "outside" only for sure
valid part of the log. If the log was damaged then fixed we have to
cut off damaged part before some other process start write something
in the log.
*/
do
{
TRANSLOG_ADDRESS current_file_last_page;
......@@ -3074,7 +3094,7 @@ my_bool translog_init(const char *directory,
(ulong) log_descriptor.bc.buffer->size,
(ulong) (log_descriptor.bc.ptr - log_descriptor.bc.
buffer->buffer)));
DBUG_EXECUTE("info", translog_check_cursor(&log_descriptor.bc););
translog_check_cursor(&log_descriptor.bc);
}
if (!old_log_was_recovered && old_flags == flags)
{
......@@ -3127,8 +3147,12 @@ my_bool translog_init(const char *directory,
log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */
log_descriptor.previous_flush_horizon= log_descriptor.horizon;
/*
horizon is (potentially) address of the next LSN we need decrease
it to signal that all LSNs before it are flushed
Now 'flushed' is set to 'horizon' value, but 'horizon' is (potentially)
address of the next LSN and we want indicate that all LSNs that are
already on the disk are flushed so we need decrease horizon on 1 (we are
sure that there is no LSN on the disk which is greater then 'flushed'
and there will not be LSN created that is equal or less then the value
of the 'flushed').
*/
log_descriptor.flushed--; /* offset decreased */
log_descriptor.sent_to_disk--; /* offset decreased */
......@@ -3371,7 +3395,9 @@ void translog_destroy()
@param prev_buffer Buffer which should be flushed will be assigned here.
This is always set (to NULL if nothing to flush).
@note handler should be locked.
@note We do not want to flush the buffer immediately because we want to
let caller of this function first advance 'horizon' pointer and unlock the
loghandler and only then flush the log which can take some time.
@retval 0 OK
@retval 1 Error
......@@ -3464,7 +3490,7 @@ static my_bool translog_write_data_on_page(TRANSLOG_ADDRESS *horizon,
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
cursor->chaser, (ulong) cursor->buffer->size,
(ulong) (cursor->ptr - cursor->buffer->buffer)));
DBUG_EXECUTE("info", translog_check_cursor(cursor););
translog_check_cursor(cursor);
DBUG_RETURN(0);
}
......@@ -3502,7 +3528,7 @@ static my_bool translog_write_parts_on_page(TRANSLOG_ADDRESS *horizon,
(ulong) (cursor->ptr - cursor->buffer->buffer)));
DBUG_ASSERT(length > 0);
DBUG_ASSERT(length + cursor->current_page_fill <= TRANSLOG_PAGE_SIZE);
DBUG_ASSERT(length + cursor->ptr <=cursor->buffer->buffer +
DBUG_ASSERT(length + cursor->ptr <= cursor->buffer->buffer +
TRANSLOG_WRITE_BUFFER);
do
......@@ -3551,6 +3577,10 @@ static my_bool translog_write_parts_on_page(TRANSLOG_ADDRESS *horizon,
cursor->current_page_fill+= length;
if (!cursor->chaser)
cursor->buffer->size+= length;
/*
We do not not updating parts->total_record_length here because it is
need only before writing record to have total length
*/
DBUG_PRINT("info", ("Write parts buffer #%u: 0x%lx "
"chaser: %d Size: %lu (%lu) "
"Horizon: (%lu,0x%lx) buff offset: 0x%lx",
......@@ -3560,7 +3590,7 @@ static my_bool translog_write_parts_on_page(TRANSLOG_ADDRESS *horizon,
LSN_IN_PARTS(*horizon),
(ulong) (LSN_OFFSET(cursor->buffer->offset) +
cursor->buffer->size)));
DBUG_EXECUTE("info", translog_check_cursor(cursor););
translog_check_cursor(cursor);
DBUG_RETURN(0);
}
......@@ -3785,6 +3815,14 @@ static my_bool translog_advance_pointer(uint pages, uint16 last_page_data)
(uint) last_page_data));
translog_lock_assert_owner();
/*
The loop will be executed 1-3 times. Usually we advance the
pointer to fill only the current buffer (if we have more then 1/2 of
buffer free or 2 buffers (rest of current and all next). In case of
really huge record end where we write last group with "table of
content" of all groups and ignore buffer borders we can occupy
3 buffers.
*/
for (;;)
{
uint8 new_buffer_no;
......@@ -3882,7 +3920,7 @@ static my_bool translog_advance_pointer(uint pages, uint16 last_page_data)
DBUG_PRINT("info",
("pointer moved to: (%lu, 0x%lx)",
LSN_IN_PARTS(log_descriptor.horizon)));
DBUG_EXECUTE("info", translog_check_cursor(&log_descriptor.bc););
translog_check_cursor(&log_descriptor.bc);
log_descriptor.bc.protected= 0;
DBUG_RETURN(0);
}
......@@ -3901,8 +3939,11 @@ static my_bool translog_advance_pointer(uint pages, uint16 last_page_data)
number of bytes left on the current page
*/
#define translog_get_current_page_rest() \
(TRANSLOG_PAGE_SIZE - log_descriptor.bc.current_page_fill)
static uint translog_get_current_page_rest()
{
return (TRANSLOG_PAGE_SIZE - log_descriptor.bc.current_page_fill);
}
/*
Get buffer rest in full pages
......@@ -3916,10 +3957,12 @@ static my_bool translog_advance_pointer(uint pages, uint16 last_page_data)
number of full pages left on the current buffer
*/
#define translog_get_current_buffer_rest() \
((log_descriptor.bc.buffer->buffer + TRANSLOG_WRITE_BUFFER - \
log_descriptor.bc.ptr) / \
TRANSLOG_PAGE_SIZE)
static uint translog_get_current_buffer_rest()
{
return ((log_descriptor.bc.buffer->buffer + TRANSLOG_WRITE_BUFFER -
log_descriptor.bc.ptr) /
TRANSLOG_PAGE_SIZE);
}
/*
Calculate possible group size without first (current) page
......@@ -4098,10 +4141,6 @@ translog_write_variable_record_1group(LSN *lsn,
if (!(rc= translog_buffer_lock(cursor.buffer)))
{
/*
Check if we wrote something on 1:st not full page and need to reconstruct
CRC and sector protection
*/
translog_buffer_decrease_writers(cursor.buffer);
}
rc|= translog_buffer_unlock(cursor.buffer);
......@@ -4194,83 +4233,61 @@ translog_write_variable_record_1chunk(LSN *lsn,
That is, LSN is encoded in 2..5 bytes, and the number of bytes minus 2
is stored in the first two bits.
@note function made to write the result in backward direction with no
special sense or tricks both directions are equal in complicity
@retval # pointer on coded LSN
*/
static uchar *translog_put_LSN_diff(LSN base_lsn, LSN lsn, uchar *dst)
{
uint64 diff;
DBUG_ENTER("translog_put_LSN_diff");
DBUG_PRINT("enter", ("Base: (0x%lu,0x%lx) val: (0x%lu,0x%lx) dst: 0x%lx",
LSN_IN_PARTS(base_lsn), LSN_IN_PARTS(lsn),
(ulong) dst));
if (LSN_FILE_NO(base_lsn) == LSN_FILE_NO(lsn))
DBUG_ASSERT(base_lsn > lsn);
diff= base_lsn - lsn;
DBUG_PRINT("info", ("Diff: 0x%llx", (ulonglong) diff));
if (diff <= 0x3FFF)
{
uint32 diff;
DBUG_ASSERT(base_lsn > lsn);
diff= base_lsn - lsn;
DBUG_PRINT("info", ("File is the same. Diff: 0x%lx", (ulong) diff));
if (diff <= 0x3FFF)
{
dst-= 2;
/*
Note we store this high uchar first to ensure that first uchar has
0 in the 3 upper bits.
*/
dst[0]= diff >> 8;
dst[1]= (diff & 0xFF);
}
else if (diff <= 0x3FFFFF)
{
dst-= 3;
dst[0]= 0x40 | (diff >> 16);
int2store(dst + 1, diff & 0xFFFF);
}
else if (diff <= 0x3FFFFFFF)
{
dst-= 4;
dst[0]= 0x80 | (diff >> 24);
int3store(dst + 1, diff & 0xFFFFFF);
}
else
{
dst-= 5;
dst[0]= 0xC0;
int4store(dst + 1, diff);
}
dst-= 2;
/*
Note we store this high uchar first to ensure that first uchar has
0 in the 3 upper bits.
*/
dst[0]= diff >> 8;
dst[1]= (diff & 0xFF);
}
else
else if (diff <= 0x3FFFFFL)
{
uint32 diff;
uint32 offset_diff;
ulonglong base_offset= LSN_OFFSET(base_lsn);
DBUG_ASSERT(base_lsn > lsn);
diff= LSN_FILE_NO(base_lsn) - LSN_FILE_NO(lsn);
DBUG_PRINT("info", ("File is different. Diff: 0x%lx", (ulong) diff));
dst-= 3;
dst[0]= 0x40 | (diff >> 16);
int2store(dst + 1, diff & 0xFFFF);
}
else if (diff <= 0x3FFFFFFFL)
{
dst-= 4;
dst[0]= 0x80 | (diff >> 24);
int3store(dst + 1, diff & 0xFFFFFFL);
}
else if (diff <= LL(0x3FFFFFFFFF))
if (base_offset < LSN_OFFSET(lsn))
{
/* take 1 from file offset */
diff--;
base_offset+= LL(0x100000000);
}
offset_diff= base_offset - LSN_OFFSET(lsn);
if (diff > 0x3f)
{
/*
It is full LSN after special 1 diff (which is impossible
in real life)
*/
dst-= 2 + LSN_STORE_SIZE;
dst[0]= 0;
dst[1]= 1;
lsn_store(dst + 2, lsn);
}
else
{
dst-= 5;
*dst= (0xC0 | diff);
int4store(dst + 1, offset_diff);
}
{
dst-= 5;
dst[0]= 0xC0 | (diff >> 32);
int4store(dst + 1, diff & 0xFFFFFFFFL);
}
else
{
/*
It is full LSN after special 1 diff (which is impossible
in real life)
*/
dst-= 2 + LSN_STORE_SIZE;
dst[0]= 0;
dst[1]= 1;
lsn_store(dst + 2, lsn);
}
DBUG_PRINT("info", ("new dst: 0x%lx", (ulong) dst));
DBUG_RETURN(dst);
......@@ -4440,6 +4457,10 @@ static void translog_relative_LSN_encode(struct st_translog_parts *parts,
uchar *src_ptr;
uchar *dst_ptr= compressed_LSNs + (MAX_NUMBER_OF_LSNS_PER_RECORD *
COMPRESSED_LSN_MAX_STORE_SIZE);
/*
We write the result in backward direction with no special sense or
tricks both directions are equal in complicity
*/
for (src_ptr= buffer + lsns_len - LSN_STORE_SIZE;
src_ptr >= (uchar*) buffer;
src_ptr-= LSN_STORE_SIZE)
......@@ -4531,6 +4552,13 @@ translog_write_variable_record_mgroup(LSN *lsn,
if (record_rest < buffer_rest)
{
/*
The record (group 1 type) is larger than the free space on the page
- we need to split it in two. But when we split it in two, the first
part is big enough to hold all the data of the record (because the
header of the first part of the split is smaller than the header of
the record as a whole when it takes only one chunk)
*/
DBUG_PRINT("info", ("too many free space because changing header"));
buffer_rest-= log_descriptor.page_capacity_chunk_2;
DBUG_ASSERT(record_rest >= buffer_rest);
......@@ -6404,7 +6432,7 @@ static void translog_force_current_buffer_to_finish()
DBUG_ASSERT(log_descriptor.bc.ptr !=NULL);
DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) ==
LSN_FILE_NO(log_descriptor.bc.buffer->offset));
DBUG_EXECUTE("info", translog_check_cursor(&log_descriptor.bc););
translog_check_cursor(&log_descriptor.bc);
DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE);
if (left != 0)
{
......@@ -7176,6 +7204,7 @@ uint32 translog_get_file_size()
void translog_set_file_size(uint32 size)
{
struct st_translog_buffer *old_buffer= NULL;
DBUG_ENTER("translog_set_file_size");
translog_lock();
DBUG_PRINT("enter", ("Size: %lu", (ulong) size));
......@@ -7185,11 +7214,17 @@ void translog_set_file_size(uint32 size)
/* if current file longer then finish it*/
if (LSN_OFFSET(log_descriptor.horizon) >= log_descriptor.log_file_max_size)
{
struct st_translog_buffer *old_buffer= log_descriptor.bc.buffer;
old_buffer= log_descriptor.bc.buffer;
translog_buffer_next(&log_descriptor.horizon, &log_descriptor.bc, 1);
translog_buffer_unlock(old_buffer);
}
translog_unlock();
if (old_buffer)
{
translog_buffer_lock(old_buffer);
translog_buffer_flush(old_buffer);
translog_buffer_unlock(old_buffer);
}
DBUG_VOID_RETURN;
}
......@@ -25,7 +25,12 @@
/* transaction log default flags (TODO: make it global variable) */
#define TRANSLOG_DEFAULT_FLAGS 0
/* Transaction log flags */
/*
Transaction log flags.
We allow all kind protections to be switched on together for people who
really unsure in their hardware/OS.
*/
#define TRANSLOG_PAGE_CRC 1
#define TRANSLOG_SECTOR_PROTECTION (1<<1)
#define TRANSLOG_RECORD_CRC (1<<2)
......@@ -81,7 +86,7 @@ struct st_maria_handler;
#define dirpos_korr(P) (*(uchar *) (P))
#define pagerange_korr(P) uint2korr(P)
#define clr_type_korr(P) (*(uchar *) (P))
#define key_nr_korr(P) ((P)[0])
#define key_nr_korr(P) (*(uchar *) (P))
#define ha_checksum_korr(P) uint4korr(P)
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment