Commit 534417c2 authored by unknown's avatar unknown

WL#3072 - Maria Recovery

Bulk insert: don't log REDO/UNDO for rows, log one UNDO which will
truncate files; this is an optimization and a bugfix (table was left
half-repaired by crash).
Repair: mark table crashed-on-repair at start, bump skip_redo_lsn at start,
this is easier for recovery (tells it to skip old REDOs or even UNDO
phase) and user (tells it to repair) in case of crash, sync files
in the end.
Recovery skips missing or corrupted table and moves to next record
(in REDO or UNDO phase) to be more robust; warns if happens in UNDO phase.
Bugfix for UNDO_KEY_DELETE_WITH_ROOT (tested in ma_test_recovery)
and maria_enable_indexes().
Create missing bitmaps when needed (there can be more than one to create,
in rare cases), log a record for this.


include/myisamchk.h:
  new flag: bulk insert repair mustn't bump create_rename_lsn
mysql-test/lib/mtr_report.pl:
  skip normal warning in maria-recovery.test
mysql-test/r/maria-recovery.result:
  result: crash before bulk insert is committed, causes proper rollback,
  and crash right after OPTIMIZE replaces index file with new index file
  leads to table marked corrupted and recovery not failing.
mysql-test/t/maria-recovery.test:
  - can't check the table or it would commit the transaction,
  but check is made after recovery.
  - test of crash before bulk-insert-with-repair is committed
  (to see if it is rolled back), and of crash after OPTIMIZE has replaced
  index file but not finished all operations (to see if recovery fails -
  it used to assert when trying to execute an old REDO on the new
  index).
storage/maria/CMakeLists.txt:
  new file
storage/maria/Makefile.am:
  new file
storage/maria/ha_maria.cc:
  - If bulk insert on a transactional table using an index repair:
  table is initially empty, so don't log REDO/UNDO for data rows
  (optimization), just log an UNDO_BULK_INSERT_WITH_REPAIR
  which will, if executed, empty the data and index file. Re-enable
  logging in end_bulk_insert().
  - write log record for repair operation only after it's fully done,
  index sort including (maria_repair*() used to write the log record).
  - Adding back file->trn=NULL which was removed by mistake earlier.
storage/maria/ha_maria.h:
  new member (see ha_maria.cc)
storage/maria/ma_bitmap.c:
  Functions to create missing bitmaps:
  - one function which creates missing bitmaps in page cache, except
  the missing one with max offset which it does not put into page cache
  as it will be modified very soon.
  - one function which the one above calls, and creates bitmaps in page
  cache
  - one function to execute REDO_BITMAP_NEW_PAGE which uses the second
  one above.
storage/maria/ma_blockrec.c:
  - when logging REDO_DELETE_ALL, not only 'records' and 'checksum'
  has to be reset under log's mutex.
  - execution of REDO_INSERT_ROW_BLOBS now checks the dirty pages' list
  - execution of UNDO_BULK_INSERT_WITH_REPAIR
storage/maria/ma_blockrec.h:
  new functions
storage/maria/ma_check.c:
  - table-flush-before-repair is moved to a separate function reused
  by maria_sort_index(); syncing is added
  - maria_repair() is allowed to re-enable logging only if it is the one
  which disabled it.
  - "_ma_flush_table_files_after_repair" was a bad name, it's not after
  repair now, and it should not sync as we do more changes to the files
  shortly after (sync is postponed to when writing the log record)
  - REDO_REPAIR record should be written only after all repair
  operations (in particular after sorting index in ha_mara::repair())
  - close to the end of repair by sort, flushing of pages must happen
  also in the non-quick case, to prepare for the sync at end.
  - in parallel repair, some page flushes are not needed as done
  by initialize_variables_for_repair().
storage/maria/ma_create.c:
  Update skip_redo_lsn, create_rename_lsn optionally.
storage/maria/ma_delete_all.c:
  Need to sync files at end of maria_delete_all_rows(), if transactional.
storage/maria/ma_extra.c:
  During repair, we sometimes call _ma_flush_table_files() (via
  _ma_flush_table_files_before_swap()) while there is a WRITE_CACHE.
storage/maria/ma_key_recover.c:
  - when we see CLR_END for UNDO_BULK_INSERT_WITH_REPAIR, re-enable
  indices.
  - fixing bug: _ma_apply_undo_key_delete() parsed UNDO_KEY_DELETE_WITH_ROOT
  wrongly, leading to recovery failure
storage/maria/ma_key_recover.h:
  new prototype
storage/maria/ma_locking.c:
  DBUG_VOID_RETURN missing
storage/maria/ma_loghandler.c:
  UNDO for bulk insert with repair, and REDO for creating bitmaps.
  LOGREC_FIRST_FREE to not have to change the for() every time we
  add a new record type.
storage/maria/ma_loghandler.h:
  new UNDO and REDO
storage/maria/ma_open.c:
  Move share.kfile.file=kfile up a bit, so that _ma_update_state_lsns()
  can get its value, this fixes a bug where LSN_REPAIRED_BY_MARIA_CHK
  was not corrected on disk by maria_open().
  Store skip_redo_lsn in index' header.
  maria_enable_indexes() had a bug for BLOCK_RECORD, where an empty
  file has one page, not 0 bytes.
storage/maria/ma_recovery.c:
  - Skip a corrupted, missing, or repaired-with-maria_chk, table in
  recovery: don't fail, just go to next REDO or UNDO; but if an UNDO
  is skipped in UNDO phase we issue warnings.
  - Skip REDO|UNDO in REDO phase if <skip_redo_lsn.
  - If UNDO phase fails, delete transactions to not make trnman
  assert.
  - Update skip_redo_lsn when playing REDO_CREATE_TABLE
  - Don't record UNDOs for old transactions which we don't know (long_trid==0)
  - Bugfix for UNDO_KEY_DELETE_WITH_ROOT (see ma_key_recover.c)
  - Execution of UNDO_BULK_INSERT_WITH_REPAIR
  - Don't try to find a page number in REDO_DELETE_ALL
  - Pieces moved to ma_recovery_util.c
storage/maria/ma_rename.c:
  name change
storage/maria/ma_static.c:
  I modified layout of the index' header (inserted skip_redo_lsn in its middle)
storage/maria/ma_test2.c:
  allow breaking the test towards the end, tests execution of
  UNDO_KEY_DELETE_WITH_ROOT
storage/maria/ma_test_recovery.expected:
  6 as testflag instead of 4
storage/maria/ma_test_recovery:
  Increase the amount of rollback work to do when testing recovery
  with ma_test2; this reproduces the UNDO_KEY_DELETE_WITH_ROOT bug.
storage/maria/maria_chk.c:
  skip_redo_lsn should be updated too, for consistency.
  Write a REDO_REPAIR after all operations (including sort-records)
  have been done.
  No reason to flush blocks after maria_chk_data_link() and
  maria_sort_records(), there is maria_close() in the end.
  write_log_record() is a function, to not clutter maria_chk().
storage/maria/maria_def.h:
  New member skip_redo_lsn in the state, and comments
storage/maria/maria_pack.c:
  skip_redo_lsn should be updated too, for consistency
storage/maria/ma_recovery_util.c:
  _ma_redo_not_needed_for_page(), defined in ma_recovery.c, is needed
  by ma_blockrec.c; this causes link issues, resolved by putting
  _ma_redo_not_needed_for_page() into a new file (so that it is not
  in the same file as repair-related objects of ma_recovery.c).
storage/maria/ma_recovery_util.h:
  new file
parent 8382126a
......@@ -60,6 +60,8 @@
#define T_WAIT_FOREVER (1L << 30)
#define T_WRITE_LOOP ((ulong) 1L << 31)
#define T_ZEROFILL ((ulonglong) 1L << 32)
/** If repair should not bump create_rename_lsn */
#define T_NO_CREATE_RENAME_LSN ((ulonglong) 1L << 33)
#define T_REP_ANY (T_REP | T_REP_BY_SORT | T_REP_PARALLEL)
......
......@@ -366,7 +366,9 @@ sub mtr_report_stats ($) {
/Slave: Key column 'c6'.* 1072/ or
# maria-recovery.test has warning about missing log file
/Can't get stat of '.*maria_log.00*/
/Can't get stat of '.*maria_log.00/ or
# and about marked-corrupted table
/Table '.\/mysqltest\/t1' is crashed, skipping it. Please repair it with maria_chk -r/
)
{
next; # Skip these lines
......
......@@ -269,6 +269,39 @@ Checksum-check
ok
use mysqltest;
drop table t1;
Test of REPAIR's implicit commit
create table t1 (a varchar(100), key(a)) engine=maria;
insert into t1 values(3);
flush table t1;
* copied t1 for comparison
lock tables t1 write;
insert into t1 values (1);
repair table t1;
Table Op Msg_type Msg_text
mysqltest.t1 repair status OK
insert into t1 values(2);
select * from t1;
a
1
2
3
SET SESSION debug="+d,maria_flush_whole_log,maria_flush_whole_page_cache,maria_crash";
* crashing mysqld intentionally
set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
Checksum-check
failure
use mysqltest;
select * from t1;
a
1
3
drop table t1;
* TEST of recovery with blobs
* shut down mysqld, removed logs, restarted it
use mysqltest;
......@@ -329,9 +362,6 @@ select a,length(b) from t1;
a length(b)
1 8
2 5
check table t1;
Table Op Msg_type Msg_text
mysqltest.t1 check status OK
SET SESSION debug="+d,maria_flush_whole_log,maria_crash";
* crashing mysqld intentionally
set global maria_checkpoint_interval=1;
......@@ -346,6 +376,58 @@ Checksum-check
ok
use mysqltest;
drop table t1;
* TEST of recovery when crash before bulk-insert-with-repair is committed
create table t1 (a varchar(100), key(a)) engine=maria;
create table t2 (a varchar(100)) engine=myisam;
set rand_seed1=12, rand_seed2=254;
insert into t2 values (rand());
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t1 values(30);
flush table t1;
* copied t1 for comparison
lock tables t1 write, t2 read;
delete from t1 limit 1;
insert into t1 select * from t2;
SET SESSION debug="+d,maria_flush_whole_log,maria_flush_whole_page_cache,maria_crash";
* crashing mysqld intentionally
set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
Checksum-check
ok
use mysqltest;
show keys from t1;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment
t1 1 a 1 a A 1 NULL NULL YES BTREE
drop table t1;
* TEST of recovery when OPTIMIZE has replaced the index file and crash
create table t1 (a varchar(100), key(a)) engine=maria;
insert into t1 select (rand()) from t2;
flush table t1;
* copied t1 for comparison
SET SESSION debug="+d,maria_flush_whole_log,maria_flush_whole_page_cache,maria_crash_sort_index";
* crashing mysqld intentionally
optimize table t1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 check warning Table is marked as crashed and last repair failed
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
Checksum-check
ok
use mysqltest;
drop table t1, t2;
drop database mysqltest_for_feeding_recovery;
drop database mysqltest_for_comparison;
drop database mysqltest;
......@@ -235,7 +235,6 @@ let $mvr_crash_statement= set global maria_checkpoint_interval=1;
drop table t1;
--echo Test of REPAIR's implicit commit
let $mms_tables=1;
create table t1 (a varchar(100), key(a)) engine=maria;
let $mvr_restore_old_snapshot=0;
......@@ -284,7 +283,6 @@ while ($loop)
dec $loop;
}
select a,length(b) from t1;
check table t1;
# we want recovery to run on the first snapshot made above
let $mvr_restore_old_snapshot=1;
let $mms_compare_physically=0;
......@@ -293,6 +291,46 @@ let $mvr_crash_statement= set global maria_checkpoint_interval=1;
-- source include/maria_verify_recovery.inc
drop table t1;
--echo * TEST of recovery when crash before bulk-insert-with-repair is committed
create table t1 (a varchar(100), key(a)) engine=maria;
create table t2 (a varchar(100)) engine=myisam;
let $mvr_restore_old_snapshot=0;
let $mms_compare_physically=0;
let $mvr_crash_statement= set global maria_checkpoint_interval=1;
let $mvr_debug_option="+d,maria_flush_whole_log,maria_flush_whole_page_cache,maria_crash";
set rand_seed1=12, rand_seed2=254; # repeatable
insert into t2 values (rand());
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t2 select (rand()) from t2;
insert into t1 values(30);
-- source include/maria_make_snapshot_for_comparison.inc
lock tables t1 write, t2 read;
delete from t1 limit 1;
# 127 rows in t2, >100, so this will use repair-at-end
insert into t1 select * from t2;
-- source include/maria_verify_recovery.inc
show keys from t1; # should be enabled
drop table t1;
--echo * TEST of recovery when OPTIMIZE has replaced the index file and crash
create table t1 (a varchar(100), key(a)) engine=maria;
let $mvr_restore_old_snapshot=0;
let $mms_compare_physically=0;
let $mvr_crash_statement= optimize table t1;
let $mvr_debug_option="+d,maria_flush_whole_log,maria_flush_whole_page_cache,maria_crash_sort_index";
insert into t1 select (rand()) from t2;
-- source include/maria_make_snapshot_for_comparison.inc
# Recovery will not fix the table, but we expect to see it marked
# "crashed on repair".
# Because crash is mild, the table is actually not corrupted, so the
# "check table extended" done below fixes the table.
-- source include/maria_verify_recovery.inc
drop table t1, t2;
# clean up everything
let $mms_purpose=feeding_recovery;
eval drop database mysqltest_for_$mms_purpose;
......
......@@ -43,7 +43,7 @@ SET(MARIA_SOURCES ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c
ma_sp_key.c ma_control_file.c ma_loghandler.c
ma_pagecache.c ma_pagecaches.c
ma_checkpoint.c ma_recovery.c ma_commit.c ma_pagecrc.c
ha_maria.h maria_def.h
ha_maria.h maria_def.h ma_recovery_util.c
)
IF(NOT SOURCE_SUBLIBS)
......
......@@ -72,7 +72,8 @@ noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \
ma_control_file.h ha_maria.h ma_blockrec.h \
ma_loghandler.h ma_loghandler_lsn.h ma_pagecache.h \
ma_checkpoint.h ma_recovery.h ma_commit.h \
trnman_public.h ma_check_standalone.h ma_key_recover.h
trnman_public.h ma_check_standalone.h ma_key_recover.h \
ma_recovery_util.h
ma_test1_DEPENDENCIES= $(LIBRARIES)
ma_test1_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
$(top_builddir)/storage/myisam/libmyisam.a \
......@@ -133,7 +134,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \
ma_sp_key.c ma_control_file.c ma_loghandler.c \
ma_pagecache.c ma_pagecaches.c \
ma_checkpoint.c ma_recovery.c ma_commit.c \
ma_pagecrc.c
ma_pagecrc.c ma_recovery_util.c
CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA?
SUFFIXES = .sh
......
......@@ -650,7 +650,7 @@ int_table_flags(HA_NULL_IN_KEY | HA_CAN_FULLTEXT | HA_CAN_SQL_HANDLER |
HA_FILE_BASED | HA_CAN_GEOMETRY | MARIA_CANNOT_ROLLBACK |
HA_CAN_BIT_FIELD | HA_CAN_RTREEKEYS |
HA_HAS_RECORDS | HA_STATS_RECORDS_IS_EXACT),
can_enable_indexes(1)
can_enable_indexes(1), bulk_insert_with_repair_trans(FALSE)
{}
......@@ -1316,11 +1316,6 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize)
param.testflag &= ~(T_REP_BY_SORT | T_REP_PARALLEL);
error= maria_repair(&param, file, fixed_name,
test(param.testflag & T_QUICK));
/**
@todo RECOVERY BUG we do things with the index file
(maria_sort_index() after the above which already has logged the
record and bumped create_rename_lsn. Is it ok?
*/
}
param.testflag= save_testflag;
optimize_done= 1;
......@@ -1392,8 +1387,11 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize)
thd_proc_info(thd, old_proc_info);
if (!thd->locked_tables)
maria_lock_database(file, F_UNLCK);
DBUG_RETURN(error ? HA_ADMIN_FAILED :
!optimize_done ? HA_ADMIN_ALREADY_DONE : HA_ADMIN_OK);
error= error ? HA_ADMIN_FAILED :
(optimize_done ?
(write_log_record_for_repair(&param, file) ? HA_ADMIN_FAILED :
HA_ADMIN_OK) : HA_ADMIN_ALREADY_DONE);
DBUG_RETURN(error);
}
......@@ -1613,6 +1611,14 @@ int ha_maria::enable_indexes(uint mode)
param.op_name= "recreating_index";
param.testflag= (T_SILENT | T_REP_BY_SORT | T_QUICK |
T_CREATE_MISSING_KEYS | T_SAFE_REPAIR);
if (bulk_insert_with_repair_trans)
{
/*
Don't bump create_rename_lsn, because UNDO_BULK_INSERT_WITH_REPAIR
should not be skipped in case of crash during repair.
*/
param.testflag|= T_NO_CREATE_RENAME_LSN;
}
param.myf_rw &= ~MY_WAIT_IF_FULL;
param.sort_buffer_length= THDVAR(thd,sort_buffer_size);
param.stats_method= (enum_handler_stats_method)THDVAR(thd,stats_method);
......@@ -1705,6 +1711,7 @@ void ha_maria::start_bulk_insert(ha_rows rows)
can_enable_indexes= (maria_is_all_keys_active(file->s->state.key_map,
file->s->base.keys));
bulk_insert_with_repair_trans= FALSE;
if (!(specialflag & SPECIAL_SAFE_MODE))
{
......@@ -1716,7 +1723,23 @@ void ha_maria::start_bulk_insert(ha_rows rows)
*/
if (file->state->records == 0 && can_enable_indexes &&
(!rows || rows >= MARIA_MIN_ROWS_TO_DISABLE_INDEXES))
{
maria_disable_non_unique_index(file, rows);
if (file->s->now_transactional)
{
bulk_insert_with_repair_trans= TRUE;
write_log_record_for_bulk_insert_with_repair(file);
/*
Pages currently in the page cache have type PAGECACHE_LSN_PAGE, we
are not allowed to overwrite them with PAGECACHE_PLAIN_PAGE, so
throw them away. It is not losing data, because we just wrote and
forced an UNDO which will for sure empty the table if we crash.
*/
_ma_flush_table_files(file, MARIA_FLUSH_DATA|MARIA_FLUSH_INDEX,
FLUSH_IGNORE_CHANGED, FLUSH_IGNORE_CHANGED);
_ma_tmp_disable_logging_for_table(file, TRUE);
}
}
else if (!file->bulk_insert &&
(!rows || rows >= MARIA_MIN_ROWS_TO_USE_BULK_INSERT))
{
......@@ -1750,9 +1773,18 @@ int ha_maria::end_bulk_insert()
int err;
DBUG_ENTER("ha_maria::end_bulk_insert");
maria_end_bulk_insert(file);
err= maria_extra(file, HA_EXTRA_NO_CACHE, 0);
DBUG_RETURN(err ? err : can_enable_indexes ?
enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE) : 0);
if ((err= maria_extra(file, HA_EXTRA_NO_CACHE, 0)))
goto end;
if (can_enable_indexes)
err= enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
end:
if (bulk_insert_with_repair_trans)
{
DBUG_ASSERT(can_enable_indexes);
/* table was transactional just before start_bulk_insert() */
_ma_reenable_logging_for_table(file);
}
DBUG_RETURN(err);
}
......@@ -2131,7 +2163,7 @@ int ha_maria::external_lock(THD *thd, int lock_type)
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(thd, TRUE, maria_hton);
}
this->file->trn= trn;
file->trn= trn;
if (!trnman_increment_locked_tables(trn))
{
trans_register_ha(thd, FALSE, maria_hton);
......@@ -2156,6 +2188,7 @@ int ha_maria::external_lock(THD *thd, int lock_type)
{
_ma_reenable_logging_for_table(file);
/** @todo zero file->trn also in commit and rollback */
file->trn= NULL;
if (trn && trnman_has_locked_tables(trn))
{
if (!trnman_decrement_locked_tables(trn))
......
......@@ -40,6 +40,8 @@ class ha_maria :public handler
char *data_file_name, *index_file_name;
enum data_file_type data_file_type;
bool can_enable_indexes;
/** If a transactional table is doing bulk insert with repair */
bool bulk_insert_with_repair_trans;
int repair(THD * thd, HA_CHECK &param, bool optimize);
int zerofill(THD * thd, HA_CHECK_OPT *check_opt);
......
......@@ -130,7 +130,10 @@
/*#define WRONG_BITMAP_FLUSH 1*/ /*define only for provoking bugs*/
#undef WRONG_BITMAP_FLUSH
static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
static my_bool _ma_read_bitmap_page(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
pgcache_page_no_t page);
static my_bool _ma_bitmap_create_missing(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
pgcache_page_no_t page);
......@@ -454,6 +457,7 @@ static void _ma_bitmap_unpin_all(MARIA_SHARE *share)
void _ma_bitmap_delete_all(MARIA_SHARE *share)
{
MARIA_FILE_BITMAP *bitmap= &share->bitmap;
DBUG_ENTER("_ma_bitmap_delete_all");
if (bitmap->map) /* Not in create */
{
bzero(bitmap->map, bitmap->block_size);
......@@ -461,6 +465,7 @@ void _ma_bitmap_delete_all(MARIA_SHARE *share)
bitmap->page= 0;
bitmap->used_size= bitmap->total_size;
}
DBUG_VOID_RETURN;
}
......@@ -725,7 +730,7 @@ void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap, uchar *data,
Read a given bitmap page
SYNOPSIS
read_bitmap_page()
_ma_read_bitmap_page()
info Maria handler
bitmap Bitmap handler
page Page to read
......@@ -742,50 +747,22 @@ void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap, uchar *data,
1 error (Error writing old bitmap or reading bitmap page)
*/
static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
static my_bool _ma_read_bitmap_page(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
pgcache_page_no_t page)
{
my_off_t end_of_page= (page + 1) * bitmap->block_size;
MARIA_SHARE *share= info->s;
my_bool res;
DBUG_ENTER("_ma_read_bitmap_page");
DBUG_ASSERT(page % bitmap->pages_covered == 0);
DBUG_ASSERT(!bitmap->changed);
bitmap->page= page;
if (end_of_page > share->state.state.data_file_length)
if (((page + 1) * bitmap->block_size) > share->state.state.data_file_length)
{
/*
Inexistent or half-created page (could be crash in the middle of
_ma_bitmap_create_first(), before appending maria_bitmap_marker).
*/
/**
@todo RECOVERY BUG
We are updating data_file_length before writing any log record for the
row operation. What if now state is flushed by a checkpoint with the
new value, and crash before the checkpoint record is written, recovery
may not even open the table (no log records) so not fix
data_file_length ("WAL violation")?
Scenario: assume share->id==0, then:
thread 1 (here) thread 2 (checkpoint)
update data_file_length
copy state to memory, flush log
set share->id and write FILE_ID (not flushed)
see share->id!=0 so flush state
crash
FILE_ID will be missing, Recovery will not open table and not fix
data_file_length. This bug should be fixed with other "checkpoint vs
bitmap" bugs.
One possibility will be logging a standalone LOGREC_CREATE_BITMAP in a
separate transaction (using dummy_transaction_object).
*/
share->state.state.data_file_length= end_of_page;
bzero(bitmap->map, bitmap->block_size);
bitmap->used_size= 0;
#ifndef DBUG_OFF
memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
#endif
DBUG_RETURN(0);
/* Inexistent or half-created page */
res= _ma_bitmap_create_missing(info, bitmap, page);
DBUG_RETURN(res);
}
bitmap->used_size= bitmap->total_size;
DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size);
......@@ -842,7 +819,7 @@ static my_bool _ma_change_bitmap_page(MARIA_HA *info,
DBUG_RETURN(1);
bitmap->changed= 0;
}
DBUG_RETURN(_ma_read_bitmap_page(info->s, bitmap, page));
DBUG_RETURN(_ma_read_bitmap_page(info, bitmap, page));
}
......@@ -2635,3 +2612,211 @@ void _ma_bitmap_set_pagecache_callbacks(PAGECACHE_FILE *file,
file->flush_log_callback= flush_log_for_bitmap;
}
}
/**
Extends data file with zeroes and creates new bitmap pages into page cache.
Non-bitmap pages of zeroes are correct as they are marked empty in
bitmaps. Bitmap pages will not be zeroes: they will get their CRC fixed when
flushed. And if there is a crash before flush (so they are zeroes at
restart), a REDO will re-create them in page cache.
*/
static my_bool
_ma_bitmap_create_missing_into_pagecache(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap,
pgcache_page_no_t from,
pgcache_page_no_t to,
uchar *zeroes)
{
pgcache_page_no_t i;
/*
We use my_chsize() to not rely on OS filling gaps with zeroes and to have
sequential order in the file (allocate all new data and bitmap pages from
the filesystem).
*/
if (my_chsize(bitmap->file.file, (to + 1) * bitmap->block_size, 0,
MYF(MY_WME)))
goto err;
/* Write all bitmap pages in [from, to] */
for (i= from; i <= to; i+= bitmap->pages_covered)
{
/* no need to keep them pinned, they are new so flushable */
if (pagecache_write(share->pagecache,
&bitmap->file, i, 0,
zeroes, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0, LSN_IMPOSSIBLE))
goto err;
}
/*
Data pages after data_file_length are full of zeroes but that is allowed
as they are marked empty in the bitmap.
*/
return FALSE;
err:
return TRUE;
}
/**
Creates missing bitmaps when we extend the data file.
At run-time, when we need a new bitmap page we come here; and only one bitmap
page at a time is created.
In some recovery cases we insert at a large offset in the data file, way
beyond state.data_file_length, so can need to create more than one bitmap
page in one go. Known case is:
Start a transaction in Maria;
delete last row of very large table (with delete_row)
do a bulk insert
crash
Then UNDO_BULK_INSERT_WITH_REPAIR will truncate table files, and
UNDO_ROW_DELETE will want to put the row back to its original position,
extending the data file a lot: bitmap page*s* in the hole must be created,
or he table would look corrupted.
We need to log REDOs for bitmap creation, consider: we apply a REDO for a
data page, which creates the first data page covered by a new bitmap
not yet created. If the data page is flushed but the bitmap page is not and
there is a crash, re-execution of the REDO will complain about the zeroed
bitmap page (see it as corruption). Thus a REDO is needed to re-create the
bitmap.
@param info Maria handler
@param bitmap Bitmap handler
@param page Last bitmap page to create
@note When this function is called this must be true:
((page + 1) * bitmap->block_size > info->s->state.state.data_file_length)
*/
static my_bool _ma_bitmap_create_missing(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
pgcache_page_no_t page)
{
MARIA_SHARE *share= info->s;
uint block_size= bitmap->block_size;
pgcache_page_no_t from, to;
my_off_t data_file_length= share->state.state.data_file_length;
DBUG_ENTER("_ma_bitmap_create_missing");
/* First (in offset order) bitmap page to create */
if (data_file_length < block_size)
goto err; /* corrupted, should have first bitmap page */
from= (data_file_length / block_size - 1) / bitmap->pages_covered + 1;
from*= bitmap->pages_covered;
/*
page>=from because:
(page + 1) * bs > dfl, and page == k * pc so:
(k * pc + 1) * bs > dfl; k * pc + 1 > dfl / bs; k * pc > dfl / bs - 1
k > (dfl / bs - 1) / pc; k >= (dfl / bs - 1) / pc + 1
k * pc >= ((dfl / bs - 1) / pc + 1) * pc == from.
*/
DBUG_ASSERT(page >= from);
if (share->now_transactional)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
page_store(log_data + FILEID_STORE_SIZE, from);
page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
/*
We don't use info->trn so that this REDO is always executed even though
the UNDO does not reach disk due to crash. This is also consistent with
the fact that the new bitmap pages are not pinned.
*/
if (translog_write_record(&lsn, LOGREC_REDO_BITMAP_NEW_PAGE,
&dummy_transaction_object, info,
(translog_size_t)sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data, NULL))
goto err;
/*
No need to flush the log: the bitmap pages we are going to create will
flush it when they go to disk.
*/
}
/*
Last bitmap page. It has special creation: will go to the page cache
only later as we are going to modify it very soon.
*/
bzero(bitmap->map, bitmap->block_size);
bitmap->used_size= 0;
#ifndef DBUG_OFF
memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
#endif
/* Last bitmap page to create before 'page' */
DBUG_ASSERT(page >= bitmap->pages_covered);
to= page - bitmap->pages_covered;
/*
In run-time situations, from>=to is always false, i.e. we always create
one bitmap at a time ('page').
*/
if ((from <= to) &&
_ma_bitmap_create_missing_into_pagecache(share, bitmap, from, to,
bitmap->map))
goto err;
share->state.state.data_file_length= (page + 1) * bitmap->block_size;
DBUG_RETURN(FALSE);
err:
DBUG_RETURN(TRUE);
}
my_bool _ma_apply_redo_bitmap_new_page(MARIA_HA *info,
LSN lsn __attribute__ ((unused)),
const uchar *header)
{
MARIA_SHARE *share= info->s;
MARIA_FILE_BITMAP *bitmap= &share->bitmap;
my_bool error;
pgcache_page_no_t from, to, min_from;
DBUG_ENTER("_ma_apply_redo_bitmap_new_page");
from= page_korr(header);
to= page_korr(header + PAGE_STORE_SIZE);
DBUG_PRINT("info", ("from: %lu to: %lu", (ulong)from, (ulong)to));
if ((from > to) ||
(from % bitmap->pages_covered) != 0 ||
(to % bitmap->pages_covered) != 0)
{
error= TRUE; /* corrupted log record */
goto err;
}
min_from= (share->state.state.data_file_length / bitmap->block_size - 1) /
bitmap->pages_covered + 1;
min_from*= bitmap->pages_covered;
if (from < min_from)
{
DBUG_PRINT("info", ("overwrite bitmap pages from %lu", (ulong)min_from));
/*
We have to overwrite. It could be that there was a bitmap page in
memory, covering a data page which went to disk, then crash: the
bitmap page is now full of zeros and is ==min_from, we have to overwrite
it with correct checksum.
*/
}
share->state.changed|= STATE_CHANGED;
bzero(info->buff, bitmap->block_size);
if (!(error=
_ma_bitmap_create_missing_into_pagecache(share, bitmap, from, to,
info->buff)))
share->state.state.data_file_length= (to + 1) * bitmap->block_size;
err:
DBUG_RETURN(error);
}
......@@ -263,12 +263,16 @@
09 00 F4 1F Start position 9, length 8180
xx xx xx xx Checksum
A data page is allowed to have a wrong CRC and header as long as it is
marked empty in the bitmap and its directory's count is 0.
*/
#include "maria_def.h"
#include "ma_blockrec.h"
#include "trnman.h"
#include "ma_key_recover.h"
#include "ma_recovery_util.h"
#include <lf.h>
/*
......@@ -5643,8 +5647,8 @@ my_bool write_hook_for_undo(enum translog_record_type type
/**
@brief Sets the table's records count and checksum to 0, then calls the
generic REDO hook.
@brief Sets the table's records count and checksum and others to 0, then
calls the generic REDO hook.
@return Operation status, always 0 (success)
*/
......@@ -5655,9 +5659,8 @@ my_bool write_hook_for_redo_delete_all(enum translog_record_type type
__attribute__ ((unused)),
LSN *lsn, void *hook_arg)
{
MARIA_SHARE *share= tbl_info->s;
DBUG_ASSERT(tbl_info->state == &tbl_info->s->state.state);
share->state.state.records= share->state.state.checksum= 0;
_ma_reset_status(tbl_info);
return write_hook_for_redo(type, trn, tbl_info, lsn, hook_arg);
}
......@@ -6186,7 +6189,9 @@ err:
@brief Apply LOGREC_REDO_INSERT_ROW_BLOBS
@param info Maria handler
@param header Header (without FILEID)
@parma lsn LSN to put on pages
@param header Header (with FILEID)
@param redo_lsn REDO record's LSN
@note Write full pages (full head & blob pages)
......@@ -6196,17 +6201,21 @@ err:
*/
uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
LSN lsn, const uchar *header)
LSN lsn, const uchar *header,
LSN redo_lsn)
{
MARIA_SHARE *share= info->s;
const uchar *data;
uint data_size= FULL_PAGE_SIZE(share->block_size);
uint blob_count, ranges;
uint16 sid;
DBUG_ENTER("_ma_apply_redo_insert_row_blobs");
share->state.changed|= (STATE_CHANGED | STATE_NOT_ZEROFILLED |
STATE_NOT_MOVABLE);
sid= fileid_korr(header);
header+= FILEID_STORE_SIZE;
ranges= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE;
blob_count= pagerange_korr(header);
......@@ -6247,6 +6256,9 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
enum pagecache_page_pin unpin_method;
uint length;
if (_ma_redo_not_needed_for_page(sid, redo_lsn, page, FALSE))
continue;
if (((page + 1) * share->block_size) >
info->state->data_file_length)
{
......@@ -6355,7 +6367,7 @@ err:
Applying of UNDO entries
****************************************************************************/
/* Execute undo of a row insert (delete the inserted row) */
/** Execute undo of a row insert (delete the inserted row) */
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header)
......@@ -6416,7 +6428,7 @@ err:
}
/* Execute undo of a row delete (insert the row back where it was) */
/** Execute undo of a row delete (insert the row back where it was) */
my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
const uchar *header,
......@@ -6652,7 +6664,7 @@ err:
}
/*
/**
Execute undo of a row update
@fn _ma_apply_undo_row_update()
......@@ -6816,6 +6828,34 @@ err:
}
/**
Execute undo of a bulk insert which used repair
@return Operation status
@retval 0 OK
@retval 1 Error
*/
my_bool _ma_apply_undo_bulk_insert_with_repair(MARIA_HA *info,
LSN undo_lsn)
{
my_bool error;
LSN lsn;
DBUG_ENTER("_ma_apply_undo_bulk_insert_with_repair");
/*
We delete all rows, re-enable indices as bulk insert had disabled
non-unique ones.
*/
error= (maria_delete_all_rows(info) ||
maria_enable_indexes(info) ||
/* we enabled indices so need '2' below */
_ma_state_info_write(info->s, 1|2|4) ||
_ma_write_clr(info, undo_lsn, LOGREC_UNDO_BULK_INSERT_WITH_REPAIR,
FALSE, 0, &lsn, NULL));
DBUG_RETURN(error);
}
/**
@brief Get the TRANSLOG_ADDRESS to flush up to
......
......@@ -236,14 +236,18 @@ uint _ma_apply_redo_free_blocks(MARIA_HA *info, LSN lsn,
const uchar *header);
uint _ma_apply_redo_free_head_or_tail(MARIA_HA *info, LSN lsn,
const uchar *header);
uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
LSN lsn, const uchar *header);
uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info, LSN lsn,
const uchar *header, LSN redo_lsn);
my_bool _ma_apply_redo_bitmap_new_page(MARIA_HA *info, LSN lsn,
const uchar *header);
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header);
my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length);
my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length);
my_bool _ma_apply_undo_bulk_insert_with_repair(MARIA_HA *info,
LSN undo_lsn);
my_bool write_hook_for_redo(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
......
......@@ -93,10 +93,11 @@ static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info,
MARIA_HA *info, uchar *record);
static void copy_data_file_state(MARIA_STATE_INFO *to,
MARIA_STATE_INFO *from);
static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info);
static void report_keypage_fault(HA_CHECK *param, MARIA_HA *info,
my_off_t position);
static my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file);
static my_bool _ma_flush_table_files_before_swap(HA_CHECK *param,
MARIA_HA *info);
void maria_chk_init(HA_CHECK *param)
......@@ -2072,6 +2073,68 @@ err:
} /* maria_chk_data_link */
/**
Prepares a table for a repair or index sort: flushes pages, records durably
in the table that it is undergoing the operation (if that op crashes, that
info will serve for Recovery and the user).
If we start overwriting the index file, and crash then, old REDOs will
be tried and fail. To prevent that, we bump skip_redo_lsn, and thus we have
to flush and sync pages so that old REDOs can be skipped.
If this is not a bulk insert, which Recovery can handle gracefully (by
truncating files, see UNDO_BULK_INSERT_WITH_REPAIR) we also mark the table
crashed-on-repair, so that user knows it has to re-repair. If bulk insert we
shouldn't mark it crashed-on-repair, because if we did this, the UNDO phase
would skip the table (UNDO_BULK_INSERT_WITH_REPAIR would not be applied),
and maria_chk would not improve that.
If this is an OPTIMIZE which merely sorts index, we need to do the same
too: old REDOs should not apply to the new index file.
Only the flush is needed when in maria_chk which is not crash-safe.
@param info table
@param param repair parameters
@param discard_index if index pages can be thrown away
*/
static my_bool protect_against_repair_crash(MARIA_HA *info,
const HA_CHECK *param,
my_bool discard_index)
{
MARIA_SHARE *share= info->s;
/*
There are other than recovery-related reasons to do the writes below:
- the physical size of the data file is sometimes used during repair: we
need to flush to have it exact
- we flush the state because maria_open(HA_OPEN_COPY) will want to read
it from disk.
*/
if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
FLUSH_FORCE_WRITE,
discard_index ? FLUSH_IGNORE_CHANGED :
FLUSH_FORCE_WRITE) ||
(share->changed && _ma_state_info_write(share, 1|2|4)))
return TRUE;
/* In maria_chk this is not needed: */
if (maria_multi_threaded && share->base.born_transactional)
{
if ((param->testflag & T_NO_CREATE_RENAME_LSN) == 0)
{
/* this can be true only for a transactional table */
maria_mark_crashed_on_repair(info);
if (_ma_state_info_write(share, 1|4))
return TRUE;
}
if (translog_status == TRANSLOG_OK &&
_ma_update_state_lsns(share, translog_get_horizon(), FALSE, FALSE))
return TRUE;
if (_ma_sync_table_files(info))
return TRUE;
}
return FALSE;
}
/**
@brief Initialize variables for repair
*/
......@@ -2109,20 +2172,11 @@ static int initialize_variables_for_repair(HA_CHECK *param,
info->rec_cache.file= info->dfile.file;
info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
/* calculate max_records */
/*
The physical size of the data file is sometimes used during repair (see
sort_info.filelength further below); We need to flush to have it exact.
We flush the state because our maria_open(HA_OPEN_COPY) will want to read
it from disk. Index file will be recreated.
*/
if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
FLUSH_FORCE_WRITE,
(param->testflag & T_CREATE_MISSING_KEYS) ?
FLUSH_FORCE_WRITE : FLUSH_IGNORE_CHANGED) ||
(share->changed && _ma_state_info_write(share, 1|2|4)))
return(1);
if (protect_against_repair_crash(info, param, !test(param->testflag &
T_CREATE_MISSING_KEYS)))
return 1;
/* calculate max_records */
sort_info->filelength= my_seek(info->dfile.file, 0L, MY_SEEK_END, MYF(0));
if ((param->testflag & T_CREATE_MISSING_KEYS) ||
sort_info->org_data_file_type == COMPRESSED_RECORD)
......@@ -2172,7 +2226,7 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
char llbuff[22],llbuff2[22];
MARIA_SORT_INFO sort_info;
MARIA_SORT_PARAM sort_param;
my_bool block_record, scan_inited= 0;
my_bool block_record, scan_inited= 0, reenable_logging;
enum data_file_type org_data_file_type= share->data_file_type;
myf sync_dir= ((share->now_transactional && !share->temporary) ?
MY_SYNC_DIR : 0);
......@@ -2190,7 +2244,8 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
if (initialize_variables_for_repair(param, &sort_info, &sort_param, info,
rep_quick))
goto err;
if (share->now_transactional)
if ((reenable_logging= share->now_transactional))
_ma_tmp_disable_logging_for_table(info, 0);
new_header_length= ((param->testflag & T_UNPACK) ? 0L :
......@@ -2384,12 +2439,14 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
VOID(end_io_cache(&sort_info.new_info->rec_cache));
info->opt_flag&= ~WRITE_CACHE_USED;
/**
@todo RECOVERY BUG seems misplaced in some cases. We modify state after
writing it below. But if we move the call below too much down, flushing
of pages may happen too late, after files have been closed.
/*
As we have read the data file (sort_get_next_record()) we may have
cached, non-changed blocks of it in the page cache. We must throw them
away as we are going to close their descriptor ('new_file'). We also want
to flush any index block, so that it is ready for the upcoming sync.
*/
if (_ma_flush_table_files_after_repair(param, info))
if (_ma_flush_table_files_before_swap(param, info))
goto err;
if (!rep_quick)
......@@ -2460,7 +2517,7 @@ err:
if (! param->error_printed)
_ma_check_print_error(param,"%d for record at pos %s",my_errno,
llstr(sort_param.start_recpos,llbuff));
(void) _ma_flush_table_files_after_repair(param, info);
(void)_ma_flush_table_files_before_swap(param, info);
if (sort_info.new_info && sort_info.new_info != sort_info.info)
{
unuse_data_file_descriptor(sort_info.new_info);
......@@ -2473,14 +2530,8 @@ err:
}
maria_mark_crashed_on_repair(info);
}
else if (sync_dir)
{
/*
Now that we have flushed and forced everything, we can bump
create_rename_lsn:
*/
write_log_record_for_repair(param, info);
}
/* If caller had disabled logging it's not up to us to re-enable it */
if (reenable_logging)
_ma_reenable_logging_for_table(info);
my_free(sort_param.rec_buff, MYF(MY_ALLOW_ZERO_PTR));
......@@ -2632,30 +2683,28 @@ void maria_lock_memory(HA_CHECK *param __attribute__((unused)))
/**
Flush all changed blocks to disk so that we can say "at the end of repair,
the table is fully ok on disk".
Flush all changed blocks to disk.
It is a requirement for transactional tables.
We release blocks as it's unlikely that they would all be needed soon.
This function needs to be called before swapping data or index files or
syncing them.
@param param description of the repair operation
@param info table
*/
int _ma_flush_table_files_after_repair(HA_CHECK *param, MARIA_HA *info)
static my_bool _ma_flush_table_files_before_swap(HA_CHECK *param,
MARIA_HA *info)
{
MARIA_SHARE *share= info->s;
DBUG_ENTER("_ma_flush_table_files_after_repair");
DBUG_ENTER("_ma_flush_table_files_before_swap");
if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
FLUSH_RELEASE, FLUSH_RELEASE) ||
_ma_state_info_write(share, 1|4) ||
(share->base.born_transactional && _ma_sync_table_files(info)))
FLUSH_RELEASE, FLUSH_RELEASE))
{
_ma_check_print_error(param,"%d when trying to write bufferts",my_errno);
DBUG_RETURN(1);
_ma_check_print_error(param, "%d when trying to write buffers", my_errno);
DBUG_RETURN(TRUE);
}
DBUG_RETURN(0);
} /* _ma_flush_table_files_after_repair */
DBUG_RETURN(FALSE);
}
/* Sort index for more efficent reads */
......@@ -2683,6 +2732,9 @@ int maria_sort_index(HA_CHECK *param, register MARIA_HA *info, char *name)
if (!(param->testflag & T_SILENT))
printf("- Sorting index for MARIA-table '%s'\n",name);
if (protect_against_repair_crash(info, param, FALSE))
DBUG_RETURN(1);
/* Get real path for index file */
fn_format(param->temp_filename,name,"", MARIA_NAME_IEXT,2+4+32);
if ((new_file=my_create(fn_format(param->temp_filename,param->temp_filename,
......@@ -3423,6 +3475,9 @@ int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info,
(ulonglong) info->state->records);
maria_set_key_active(share->state.key_map, sort_param.key);
if (_ma_flush_table_files_before_swap(param, info))
goto err;
if (sort_param.fix_datafile)
{
param->read_cache.end_of_file=sort_param.filepos;
......@@ -3447,13 +3502,6 @@ int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info,
}
}
/** @todo RECOVERY BUG seems misplaced in some cases */
if (_ma_flush_table_files_after_repair(param, info))
{
_ma_check_print_error(param, "Got error when flushing caches");
goto err;
}
sort_info.new_info->state->data_file_length= sort_param.filepos;
if (sort_info.new_info != sort_info.info)
{
......@@ -3570,7 +3618,7 @@ err:
{
if (! param->error_printed)
_ma_check_print_error(param,"%d when fixing table",my_errno);
(void) _ma_flush_table_files_after_repair(param, info);
(void)_ma_flush_table_files_before_swap(param, info);
if (sort_info.new_info && sort_info.new_info != sort_info.info)
{
unuse_data_file_descriptor(sort_info.new_info);
......@@ -3602,7 +3650,6 @@ err:
fflush(DBUG_FILE);
abort();
});
write_log_record_for_repair(param, info);
}
share->state.changed|= STATE_NOT_SORTED_PAGES;
if (!rep_quick)
......@@ -3782,12 +3829,6 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
if (!(param->testflag & T_CREATE_MISSING_KEYS))
{
/*
Flush key cache for this file if we are calling this outside
maria_chk
*/
flush_pagecache_blocks(share->pagecache, &share->kfile,
FLUSH_IGNORE_CHANGED);
/* Clear the pointers to the given rows */
for (i=0 ; i < share->base.keys ; i++)
share->state.key_root[i]= HA_OFFSET_ERROR;
......@@ -3795,12 +3836,7 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
info->state->key_file_length=share->base.keystart;
}
else
{
if (flush_pagecache_blocks(share->pagecache, &share->kfile,
FLUSH_FORCE_WRITE))
goto err;
key_map= ~key_map; /* Create the missing keys */
}
param->read_cache.end_of_file= sort_info.filelength;
......@@ -3985,6 +4021,9 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
}
got_error=1; /* Assume the following may go wrong */
if (_ma_flush_table_files_before_swap(param, info))
goto err;
if (sort_param[0].fix_datafile)
{
/*
......@@ -4084,8 +4123,6 @@ err:
*/
if (!rep_quick)
VOID(end_io_cache(&new_data_cache));
/** @todo RECOVERY BUG seems misplaced in some cases */
got_error|= _ma_flush_table_files_after_repair(param, info);
if (!got_error)
{
/* Replace the actual file with the temporary file */
......@@ -4106,6 +4143,7 @@ err:
{
if (! param->error_printed)
_ma_check_print_error(param,"%d when fixing table",my_errno);
(void)_ma_flush_table_files_before_swap(param, info);
if (new_file >= 0)
{
VOID(my_close(new_file,MYF(0)));
......@@ -6153,16 +6191,7 @@ read_next_page:
/**
@brief Writes a LOGREC_REPAIR_TABLE record and updates create_rename_lsn
and is_of_horizon
REPAIR/OPTIMIZE have replaced the data/index file with a new file
and so, in this scenario:
@verbatim
CHECKPOINT - REDO_INSERT - COMMIT - ... - REPAIR - ... - crash
@endverbatim
we do not want Recovery to apply the REDO_INSERT to the table, as it would
then possibly wrongly extend the table. By updating create_rename_lsn at
the end of REPAIR, we know that REDO_INSERT will be skipped.
if needed (so that maria_read_log does not redo the repair).
@param param description of the REPAIR operation
@param info table
......@@ -6172,7 +6201,7 @@ read_next_page:
@retval 1 error (disk problem)
*/
static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
my_bool write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
{
MARIA_SHARE *share= info->s;
/* in case this is maria_chk or recovery... */
......@@ -6220,18 +6249,46 @@ static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data, NULL) ||
translog_flush(lsn)))
return 1;
return TRUE;
/*
The table's existence was made durable earlier (MY_SYNC_DIR passed to
maria_change_to_newfile()). _ma_flush_table_files_after_repair() was
called earlier, flushed and forced data+index+state. Old REDOs should
not be applied to the table:
maria_change_to_newfile()). All pages have been flushed, state too, we
need to force it to disk. Old REDOs should not be applied to the table,
which is already enforced as skip_redos_lsn was increased in
protect_against_repair_crash(). But if this is an explicit repair,
even UNDO phase should ignore this table: create_rename_lsn should be
increased, and this also serves for the REDO_REPAIR to be ignored by
maria_read_log.
The fully correct order would be: sync data and index file, remove crash
mark and update LSNs then write state and sync index file. But at this
point state (without crash mark) is already written.
*/
if (_ma_update_create_rename_lsn(share, lsn, TRUE))
return 1;
if ((!(param->testflag & T_NO_CREATE_RENAME_LSN) &&
_ma_update_state_lsns(share, lsn, FALSE, FALSE)) ||
_ma_sync_table_files(info))
return TRUE;
share->now_transactional= save_now_transactional;
}
return 0;
return FALSE;
}
my_bool write_log_record_for_bulk_insert_with_repair(MARIA_HA *info)
{
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE];
LSN lsn;
lsn_store(log_data, info->trn->undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
return translog_write_record(&lsn, LOGREC_UNDO_BULK_INSERT_WITH_REPAIR,
info->trn, info,
(translog_size_t)
log_array[TRANSLOG_INTERNAL_PARTS +
0].length,
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data + LSN_STORE_SIZE, NULL) ||
translog_flush(lsn); /* WAL */
}
......
......@@ -1077,7 +1077,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
DROP+CREATE happened (applying REDOs to the wrong table).
*/
share.kfile.file= file;
if (_ma_update_create_rename_lsn_sub(&share, lsn, FALSE))
if (_ma_update_state_lsns_sub(&share, lsn, FALSE, TRUE))
goto err;
my_free(log_data, MYF(0));
}
......@@ -1288,43 +1288,47 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile)
/**
@brief Writes create_rename_lsn and is_of_horizon to disk, can force.
@brief Writes create_rename_lsn, skip_redo_lsn and is_of_horizon to disk,
can force.
This is for special cases where:
- we don't want to write the full state to disk (so, not call
_ma_state_info_write()) because some parts of the state may be
currently inconsistent, or because it would be overkill
- we must sync these LSNs immediately for correctness.
It acquires intern_lock to protect the two LSNs and state write.
It acquires intern_lock to protect the LSNs and state write.
@param share table's share
@param do_sync if the write should be forced to disk
@param update_create_rename_lsn if this LSN should be updated or not
@return Operation status
@retval 0 ok
@retval 1 error (disk problem)
*/
int _ma_update_create_rename_lsn(MARIA_SHARE *share,
LSN lsn, my_bool do_sync)
int _ma_update_state_lsns(MARIA_SHARE *share, LSN lsn, my_bool do_sync,
my_bool update_create_rename_lsn)
{
int res;
pthread_mutex_lock(&share->intern_lock);
res= _ma_update_create_rename_lsn_sub(share, lsn, do_sync);
res= _ma_update_state_lsns_sub(share, lsn, do_sync,
update_create_rename_lsn);
pthread_mutex_unlock(&share->intern_lock);
return res;
}
/**
@brief Writes create_rename_lsn and is_of_horizon to disk, can force.
@brief Writes create_rename_lsn, skip_redo_lsn and is_of_horizon to disk,
can force.
Shortcut of _ma_update_create_rename_lsn() when we know that
intern_lock is not needed (when creating a table or opening it for the
first time).
Shortcut of _ma_update_state_lsns() when we know that intern_lock is not
needed (when creating a table or opening it for the first time).
@param share table's share
@param do_sync if the write should be forced to disk
@param update_create_rename_lsn if this LSN should be updated or not
@return Operation status
@retval 0 ok
......@@ -1338,15 +1342,18 @@ int _ma_update_create_rename_lsn(MARIA_SHARE *share,
*/
#pragma optimize("",off)
#endif
int _ma_update_create_rename_lsn_sub(MARIA_SHARE *share,
LSN lsn, my_bool do_sync)
int _ma_update_state_lsns_sub(MARIA_SHARE *share, LSN lsn, my_bool do_sync,
my_bool update_create_rename_lsn)
{
char buf[LSN_STORE_SIZE*2], *ptr;
char buf[LSN_STORE_SIZE * 3], *ptr;
File file= share->kfile.file;
DBUG_ASSERT(file >= 0);
for (ptr= buf; ptr < (buf + sizeof(buf)); ptr+= LSN_STORE_SIZE)
lsn_store(ptr, lsn);
share->state.is_of_horizon= share->state.create_rename_lsn= lsn;
share->state.skip_redo_lsn= share->state.is_of_horizon= lsn;
if (update_create_rename_lsn)
{
share->state.create_rename_lsn= lsn;
if (share->id != 0)
{
/*
......@@ -1360,6 +1367,9 @@ int _ma_update_create_rename_lsn_sub(MARIA_SHARE *share,
*/
translog_deassign_id_from_share(share);
}
}
else
lsn_store(buf, share->state.create_rename_lsn);
return my_pwrite(file, buf, sizeof(buf),
sizeof(share->state.header) +
MARIA_FILE_CREATE_RENAME_LSN_OFFSET, MYF(MY_NABP)) ||
......
......@@ -33,6 +33,7 @@ int maria_delete_all_rows(MARIA_HA *info)
{
MARIA_SHARE *share= info->s;
my_bool log_record;
LSN lsn;
DBUG_ENTER("maria_delete_all_rows");
if (share->options & HA_OPTION_READ_ONLY_DATA)
......@@ -54,9 +55,8 @@ int maria_delete_all_rows(MARIA_HA *info)
{
/*
This record will be used by Recovery to finish the deletion if it
crashed. We force it because it's a non-undoable operation.
crashed. We force it to have a complete history in the log.
*/
LSN lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[FILEID_STORE_SIZE];
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
......@@ -72,19 +72,17 @@ int maria_delete_all_rows(MARIA_HA *info)
inconsistent.
*/
}
/*
For recovery it matters that this is called after writing the log record,
so that resetting state.records and state.checksum actually happens under
log's mutex.
*/
else
{
/* Other branch called function below when writing log record, in hook */
_ma_reset_status(info);
}
/*
If we are using delayed keys or if the user has done changes to the tables
since it was locked then there may be key blocks in the page cache. Or
there may be data blocks there. We need to throw them away or they may
re-enter the emptied table later.
re-enter the emptied table or another table later.
*/
if (_ma_flush_table_files(info, MARIA_FLUSH_DATA|MARIA_FLUSH_INDEX,
FLUSH_IGNORE_CHANGED, FLUSH_IGNORE_CHANGED) ||
......@@ -95,12 +93,25 @@ int maria_delete_all_rows(MARIA_HA *info)
if (_ma_initialize_data_file(share, info->dfile.file))
goto err;
if (log_record)
{
/*
The operations above on the index/data file will be forced to disk at
Checkpoint or maria_close() time. So we can reset:
Because LOGREC_REDO_DELETE_ALL does not operate on pages, it has the
following problem:
delete_all; inserts (redo_insert); all pages get flushed; checkpoint:
the dirty pages list will be empty. In recovery, delete_all is executed,
but redo_insert are skipped (dirty pages list is empty).
To avoid this, we need to set skip_redo_lsn now, and thus need to sync
files.
*/
if (log_record)
my_bool error= _ma_state_info_write(share, 1|4) ||
_ma_update_state_lsns(share, lsn, FALSE, FALSE) ||
_ma_sync_table_files(info);
info->trn->rec_lsn= LSN_IMPOSSIBLE;
if (error)
goto err;
}
VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
#ifdef HAVE_MMAP
......
......@@ -550,14 +550,9 @@ int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index,
/* flush data file first because it's more critical */
if (flush_data_or_index & MARIA_FLUSH_DATA)
{
if (info->opt_flag & WRITE_CACHE_USED)
{
/* normally any code which creates a WRITE_CACHE destroys it later */
DBUG_ASSERT(0);
if (end_io_cache(&info->rec_cache))
if ((info->opt_flag & WRITE_CACHE_USED) &&
flush_io_cache(&info->rec_cache))
goto err;
info->opt_flag&= ~WRITE_CACHE_USED;
}
if (share->data_file_type == BLOCK_RECORD)
{
if(_ma_bitmap_flush(share) ||
......
......@@ -148,6 +148,7 @@ my_bool write_hook_for_clr_end(enum translog_record_type type
MARIA_SHARE *share= tbl_info->s;
struct st_msg_to_write_hook_for_clr_end *msg=
(struct st_msg_to_write_hook_for_clr_end *)hook_arg;
my_bool error= FALSE;
DBUG_ASSERT(trn->trid != 0);
trn->undo_lsn= msg->previous_undo_lsn;
......@@ -175,12 +176,18 @@ my_bool write_hook_for_clr_end(enum translog_record_type type
case LOGREC_UNDO_KEY_INSERT:
case LOGREC_UNDO_KEY_DELETE:
break;
case LOGREC_UNDO_BULK_INSERT_WITH_REPAIR:
error= (maria_enable_indexes(tbl_info) ||
/* we enabled indices, need '2' below */
_ma_state_info_write(share, 1|2|4));
/* no need for _ma_reset_status(): REDO_DELETE_ALL is just before us */
break;
default:
DBUG_ASSERT(0);
}
if (trn->undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
return 0;
return error;
}
......@@ -966,14 +973,17 @@ my_bool _ma_apply_undo_key_insert(MARIA_HA *info, LSN undo_lsn,
/**
@brief Undo of delete of key (ie, insert the deleted key)
@param with_root If the UNDO is UNDO_KEY_DELETE_WITH_ROOT
*/
my_bool _ma_apply_undo_key_delete(MARIA_HA *info, LSN undo_lsn,
const uchar *header, uint length)
const uchar *header, uint length,
my_bool with_root)
{
LSN lsn;
my_bool res;
uint keynr;
uint keynr, skip_bytes;
uchar key[HA_MAX_KEY_BUFF];
MARIA_SHARE *share= info->s;
my_off_t new_root;
......@@ -984,10 +994,12 @@ my_bool _ma_apply_undo_key_delete(MARIA_HA *info, LSN undo_lsn,
STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
STATE_NOT_MOVABLE);
keynr= key_nr_korr(header);
length-= KEY_NR_STORE_SIZE;
skip_bytes= KEY_NR_STORE_SIZE + (with_root ? PAGE_STORE_SIZE : 0);
header+= skip_bytes;
length-= skip_bytes;
/* We have to copy key as _ma_ck_real_write_btree() may change it */
memcpy(key, header + KEY_NR_STORE_SIZE, length);
memcpy(key, header, length);
DBUG_DUMP("key", key, length);
new_root= share->state.key_root[keynr];
......
......@@ -78,7 +78,8 @@ uint _ma_apply_redo_index(MARIA_HA *info,
my_bool _ma_apply_undo_key_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header, uint length);
my_bool _ma_apply_undo_key_delete(MARIA_HA *info, LSN undo_lsn,
const uchar *header, uint length);
const uchar *header, uint length,
my_bool with_root);
static inline void _ma_finalize_row(MARIA_HA *info)
{
......
......@@ -624,6 +624,7 @@ void _ma_mark_file_crashed(MARIA_SHARE *share)
sizeof(share->state.header) +
MARIA_FILE_CHANGED_OFFSET,
MYF(MY_NABP));
DBUG_VOID_RETURN;
}
......
......@@ -613,6 +613,18 @@ static LOG_DESC INIT_LOGREC_INCOMPLETE_GROUP=
NULL, NULL, NULL, 0,
"incomplete_group", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_BULK_INSERT_WITH_REPAIR=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
LSN_STORE_SIZE + FILEID_STORE_SIZE,
NULL, write_hook_for_undo, NULL, 1,
"undo_bulk_insert_with_repair", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_BITMAP_NEW_PAGE=
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2,
FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2,
NULL, NULL, NULL, 0,
"redo_create_bitmap", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL;
void translog_table_init()
......@@ -696,12 +708,14 @@ void translog_table_init()
INIT_LOGREC_INCOMPLETE_LOG;
log_record_type_descriptor[LOGREC_INCOMPLETE_GROUP]=
INIT_LOGREC_INCOMPLETE_GROUP;
for (i= LOGREC_INCOMPLETE_GROUP + 1;
i < LOGREC_NUMBER_OF_TYPES;
i++)
log_record_type_descriptor[LOGREC_UNDO_BULK_INSERT_WITH_REPAIR]=
INIT_LOGREC_UNDO_BULK_INSERT_WITH_REPAIR;
log_record_type_descriptor[LOGREC_REDO_BITMAP_NEW_PAGE]=
INIT_LOGREC_REDO_BITMAP_NEW_PAGE;
for (i= LOGREC_FIRST_FREE; i < LOGREC_NUMBER_OF_TYPES; i++)
log_record_type_descriptor[i].rclass= LOGRECTYPE_NOT_ALLOWED;
#ifndef DBUG_OFF
check_translog_description_table(LOGREC_INCOMPLETE_GROUP);
check_translog_description_table(LOGREC_FIRST_FREE -1);
#endif
};
......
......@@ -74,7 +74,7 @@ struct st_maria_handler;
/* Store methods to match the above sizes */
#define fileid_store(T,A) int2store(T,A)
#define page_store(T,A) int5store(T,A)
#define page_store(T,A) int5store(T,((ulonglong)(A)))
#define dirpos_store(T,A) ((*(uchar*) (T)) = A)
#define pagerange_store(T,A) int2store(T,A)
#define clr_type_store(T,A) ((*(uchar*) (T)) = A)
......@@ -112,8 +112,8 @@ enum translog_record_type
LOGREC_REDO_PURGE_ROW_TAIL,
LOGREC_REDO_FREE_BLOCKS,
LOGREC_REDO_FREE_HEAD_OR_TAIL,
LOGREC_REDO_DELETE_ROW,
LOGREC_REDO_UPDATE_ROW_HEAD,
LOGREC_REDO_DELETE_ROW, /* unused */
LOGREC_REDO_UPDATE_ROW_HEAD, /* unused */
LOGREC_REDO_INDEX,
LOGREC_REDO_INDEX_NEW_PAGE,
LOGREC_REDO_INDEX_FREE_PAGE,
......@@ -141,6 +141,9 @@ enum translog_record_type
LOGREC_LONG_TRANSACTION_ID,
LOGREC_INCOMPLETE_LOG,
LOGREC_INCOMPLETE_GROUP,
LOGREC_UNDO_BULK_INSERT_WITH_REPAIR,
LOGREC_REDO_BITMAP_NEW_PAGE,
LOGREC_FIRST_FREE,
LOGREC_RESERVED_FUTURE_EXTENSION= 63
};
#define LOGREC_NUMBER_OF_TYPES 64 /* Maximum, can't be extended */
......
......@@ -332,7 +332,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
if ((share->options & HA_OPTION_RELIES_ON_SQL_LAYER) &&
! (open_flags & HA_OPEN_FROM_SQL_LAYER))
{
DBUG_PRINT("error", ("table cannot be openned from non-sql layer"));
DBUG_PRINT("error", ("table cannot be opened from non-sql layer"));
my_errno= HA_ERR_UNSUPPORTED;
goto err;
}
......@@ -628,6 +628,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
LSN_STORE_SIZE + TRANSID_SIZE :
0) + KEYPAGE_KEYID_SIZE + KEYPAGE_FLAG_SIZE +
KEYPAGE_USED_SIZE);
share->kfile.file= kfile;
if (open_flags & HA_OPEN_COPY)
{
......@@ -652,13 +653,16 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
view of the server, including server's recovery) now.
*/
if ((open_flags & HA_OPEN_FROM_SQL_LAYER) || maria_in_recovery)
_ma_update_create_rename_lsn_sub(share, translog_get_horizon(),
TRUE);
_ma_update_state_lsns_sub(share, translog_get_horizon(),
TRUE, TRUE);
}
else if ((!LSN_VALID(share->state.create_rename_lsn) ||
!LSN_VALID(share->state.is_of_horizon) ||
(cmp_translog_addr(share->state.create_rename_lsn,
share->state.is_of_horizon) > 0)) &&
share->state.is_of_horizon) > 0) ||
!LSN_VALID(share->state.skip_redo_lsn) ||
(cmp_translog_addr(share->state.create_rename_lsn,
share->state.skip_redo_lsn) > 0)) &&
!(open_flags & HA_OPEN_FOR_REPAIR))
{
/*
......@@ -735,7 +739,6 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
share->tot_locks++;
}
share->kfile.file= kfile;
_ma_set_index_pagecache_callbacks(&share->kfile, share);
share->this_process=(ulong) getpid();
share->last_process= share->state.process;
......@@ -1140,12 +1143,12 @@ uint _ma_state_info_write_sub(File file, MARIA_STATE_INFO *state, uint pWrite)
mi_int2store(ptr,state->changed); ptr+= 2;
/*
if you change the offset of create_rename_lsn/is_of_horizon inside the
index file's header, fix ma_create + ma_rename + ma_delete_all +
backward-compatibility.
If you change the offset of these LSNs, note that some functions do a
direct write of them without going through this function.
*/
lsn_store(ptr, state->create_rename_lsn); ptr+= LSN_STORE_SIZE;
lsn_store(ptr, state->is_of_horizon); ptr+= LSN_STORE_SIZE;
lsn_store(ptr, state->skip_redo_lsn); ptr+= LSN_STORE_SIZE;
mi_rowstore(ptr,state->state.records); ptr+= 8;
mi_rowstore(ptr,state->state.del); ptr+= 8;
mi_rowstore(ptr,state->split); ptr+= 8;
......@@ -1211,6 +1214,7 @@ static uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state)
state->changed= mi_uint2korr(ptr); ptr+= 2;
state->create_rename_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE;
state->is_of_horizon= lsn_korr(ptr); ptr+= LSN_STORE_SIZE;
state->skip_redo_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE;
state->state.records= mi_rowkorr(ptr); ptr+= 8;
state->state.del = mi_rowkorr(ptr); ptr+= 8;
state->split = mi_rowkorr(ptr); ptr+= 8;
......@@ -1695,7 +1699,8 @@ int maria_enable_indexes(MARIA_HA *info)
int error= 0;
MARIA_SHARE *share= info->s;
if (share->state.state.data_file_length ||
if ((share->state.state.data_file_length !=
(share->data_file_type == BLOCK_RECORD ? share->block_size : 0)) ||
(share->state.state.key_file_length != share->base.keystart))
{
maria_print_error(info->s, HA_ERR_CRASHED);
......
......@@ -26,17 +26,13 @@
#include "ma_checkpoint.h"
#include "trnman.h"
#include "ma_key_recover.h"
#include "ma_recovery_util.h"
struct st_trn_for_recovery /* used only in the REDO phase */
{
LSN group_start_lsn, undo_lsn, first_undo_lsn;
TrID long_trid;
};
struct st_dirty_page /* used only in the REDO phase */
{
uint64 file_and_page_id;
LSN rec_lsn;
};
struct st_table_for_recovery /* used in the REDO and UNDO phase */
{
MARIA_HA *info;
......@@ -44,25 +40,19 @@ struct st_table_for_recovery /* used in the REDO and UNDO phase */
/* Variables used by all functions of this module. Ok as single-threaded */
static struct st_trn_for_recovery *all_active_trans;
static struct st_table_for_recovery *all_tables;
static HASH all_dirty_pages;
static struct st_dirty_page *dirty_pages_pool;
static LSN current_group_end_lsn;
/*
LSN after which dirty pages list does not apply. Can be slightly before
when ma_checkpoint_execute() started.
*/
static LSN checkpoint_start= LSN_IMPOSSIBLE;
#ifndef DBUG_OFF
/** Current group of REDOs is about this table and only this one */
static MARIA_HA *current_group_table;
#endif
static TrID max_long_trid= 0; /**< max long trid seen by REDO phase */
static FILE *tracef; /**< trace file for debugging */
static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
/** @brief to avoid writing a checkpoint if recovery did nothing. */
static my_bool checkpoint_useful;
/** @todo looks like duplicate of recovery_message_printed */
static my_bool procent_printed;
static my_bool in_redo_phase;
static my_bool trns_created;
static ulong skipped_undo_phase;
static ulonglong now; /**< for tracking execution time of phases */
static int (*save_error_handler_hook)(uint, const char *,myf);
static uint recovery_warnings; /**< count of warnings */
......@@ -86,6 +76,7 @@ prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_redo_exec_hook(FILE_ID);
prototype_redo_exec_hook(INCOMPLETE_LOG);
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
prototype_redo_exec_hook(UNDO_BULK_INSERT_WITH_REPAIR);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
......@@ -97,6 +88,7 @@ prototype_redo_exec_hook(REDO_DELETE_ALL);
prototype_redo_exec_hook(REDO_INDEX);
prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE);
prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE);
prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
prototype_redo_exec_hook(UNDO_ROW_INSERT);
prototype_redo_exec_hook(UNDO_ROW_DELETE);
prototype_redo_exec_hook(UNDO_ROW_UPDATE);
......@@ -111,6 +103,7 @@ prototype_undo_exec_hook(UNDO_ROW_UPDATE);
prototype_undo_exec_hook(UNDO_KEY_INSERT);
prototype_undo_exec_hook(UNDO_KEY_DELETE);
prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
prototype_undo_exec_hook(UNDO_BULK_INSERT_WITH_REPAIR);
static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
......@@ -134,6 +127,7 @@ static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
static int close_all_tables(void);
static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr);
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);
static void delete_all_transactions();
/** @brief global [out] buffer for translog_read_record(); never shrinks */
static struct
......@@ -160,54 +154,6 @@ static enum recovery_message_type
{
REC_MSG_NONE= 0, REC_MSG_REDO, REC_MSG_UNDO, REC_MSG_FLUSH
} recovery_message_printed;
/** @brief Prints to a trace file if it is not NULL */
void tprint(FILE *trace_file, const char *format, ...)
ATTRIBUTE_FORMAT(printf, 2, 3);
void tprint(FILE *trace_file __attribute__ ((unused)),
const char *format __attribute__ ((unused)), ...)
{
va_list args;
va_start(args, format);
DBUG_PRINT("info", ("%s", format));
if (trace_file != NULL)
{
if (procent_printed)
{
procent_printed= 0;
fputc('\n', trace_file);
}
vfprintf(trace_file, format, args);
}
va_end(args);
}
void eprint(FILE *trace_file, const char *format, ...)
ATTRIBUTE_FORMAT(printf, 2, 3);
void eprint(FILE *trace_file __attribute__ ((unused)),
const char *format __attribute__ ((unused)), ...)
{
va_list args;
va_start(args, format);
DBUG_PRINT("error", ("%s", format));
if (!trace_file)
trace_file= stderr;
if (procent_printed)
{
/* In silent mode, print on another line than the 0% 10% 20% line */
procent_printed= 0;
fputc('\n', trace_file);
}
vfprintf(trace_file , format, args);
fputc('\n', trace_file);
if (trace_file != stderr)
{
va_start(args, format);
my_printv_error(HA_ERR_INITIALIZATION, format, MYF(0), args);
}
va_end(args);
}
/* Hook to ensure we get nicer output if we get an error */
......@@ -269,15 +215,8 @@ int maria_recover(void)
if (warnings_count == 0)
tprint(trace_file, "SUCCESS\n");
else
{
tprint(trace_file, "DOUBTFUL (%u warnings, check previous output)\n",
warnings_count);
/*
We asked for execution of UNDOs, and skipped DDLs, so shouldn't get
any warnings.
*/
DBUG_ASSERT(0);
}
}
if (trace_file)
fclose(trace_file);
......@@ -339,8 +278,14 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
goto err;
recovery_message_printed= REC_MSG_NONE;
checkpoint_useful= trns_created= FALSE;
tracef= trace_file;
#ifdef INSTANT_FLUSH_OF_MESSAGES
/* enable this for instant flush of messages to trace file */
setbuf(tracef, NULL);
#endif
skip_DDLs= skip_DDLs_arg;
skipped_undo_phase= 0;
if (from_lsn == LSN_IMPOSSIBLE)
{
......@@ -359,6 +304,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
}
now= my_getsystime();
in_redo_phase= TRUE;
if (run_redo_phase(from_lsn, apply))
{
ma_message_no_user(0, "Redo phase failed");
......@@ -371,6 +317,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
ma_message_no_user(0, "End of redo phase failed");
goto err;
}
in_redo_phase= FALSE;
old_now= now;
now= my_getsystime();
......@@ -383,6 +330,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
*/
procent_printed= 1;
fprintf(stderr, " (%.1f seconds); ", phase_took);
fflush(stderr);
}
/**
......@@ -424,6 +372,17 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
recovery_warnings++;
}
if (skipped_undo_phase)
{
/*
We could want to print a list of tables for which UNDOs were skipped,
but not one line per skipped UNDO.
*/
eprint(tracef, "***WARNING: %lu UNDO records skipped in UNDO phase; some"
" tables may be left inconsistent!***", skipped_undo_phase);
recovery_warnings++;
}
old_now= now;
now= my_getsystime();
if (recovery_message_printed == REC_MSG_UNDO)
......@@ -431,6 +390,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
double phase_took= (now - old_now)/10000000.0;
procent_printed= 1;
fprintf(stderr, " (%.1f seconds); ", phase_took);
fflush(stderr);
}
/*
......@@ -450,6 +410,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
double phase_took= (now - old_now)/10000000.0;
procent_printed= 1;
fprintf(stderr, " (%.1f seconds); ", phase_took);
fflush(stderr);
}
if (take_checkpoints && checkpoint_useful)
......@@ -463,6 +424,8 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
err:
error= 1;
tprint(tracef, "\nRecovery of tables with transaction logs FAILED\n");
if (trns_created)
delete_all_transactions();
end:
error_handler_hook= save_error_handler_hook;
hash_free(&all_dirty_pages);
......@@ -484,6 +447,7 @@ end:
{
procent_printed= 0;
fprintf(stderr, "\n");
fflush(stderr);
}
if (!error)
ma_message_no_user(ME_JUST_INFO, "recovery done");
......@@ -600,6 +564,7 @@ prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP)
return 0;
}
prototype_redo_exec_hook(INCOMPLETE_LOG)
{
MARIA_HA *info;
......@@ -645,12 +610,12 @@ prototype_redo_exec_hook(INCOMPLETE_LOG)
prototype_redo_exec_hook(REDO_CREATE_TABLE)
{
File dfile= -1, kfile= -1;
char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *data_file_name,
*index_file_name;
char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *ptr2,
*data_file_name, *index_file_name;
uchar *kfile_header;
myf create_flag;
uint flags;
int error= 1, create_mode= O_RDWR | O_TRUNC;
int error= 1, create_mode= O_RDWR | O_TRUNC, i;
MARIA_HA *info= NULL;
uint kfile_size_before_extension, keystart;
......@@ -738,13 +703,14 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
ptr+= 2;
kfile_header= (uchar *)ptr;
ptr+= kfile_size_before_extension;
/* set create_rename_lsn (for maria_read_log to be idempotent) */
lsn_store(kfile_header + sizeof(info->s->state.header) +
MARIA_FILE_CREATE_RENAME_LSN_OFFSET, rec->lsn);
/* we also set is_of_horizon, like maria_create() does */
lsn_store(kfile_header + sizeof(info->s->state.header) +
MARIA_FILE_CREATE_RENAME_LSN_OFFSET + LSN_STORE_SIZE,
rec->lsn);
/* set header lsns */
ptr2= kfile_header + sizeof(info->s->state.header) +
MARIA_FILE_CREATE_RENAME_LSN_OFFSET;
for (i= 0; i<3; i++)
{
lsn_store(ptr2, rec->lsn);
ptr2+= LSN_STORE_SIZE;
}
data_file_name= ptr;
ptr+= strlen(data_file_name) + 1;
index_file_name= ptr;
......@@ -975,7 +941,7 @@ prototype_redo_exec_hook(REDO_RENAME_TABLE)
eprint(tracef, "Failed to open renamed table");
goto end;
}
if (_ma_update_create_rename_lsn(info->s, rec->lsn, TRUE))
if (_ma_update_state_lsns(info->s, rec->lsn, TRUE, TRUE))
goto end;
if (maria_close(info))
goto end;
......@@ -1051,7 +1017,8 @@ prototype_redo_exec_hook(REDO_REPAIR_TABLE)
else if (maria_repair(&param, info, name, quick_repair))
goto end;
if (_ma_update_create_rename_lsn(info->s, rec->lsn, TRUE))
if (_ma_update_state_lsns(info->s, rec->lsn, TRUE,
!(param.testflag & T_NO_CREATE_RENAME_LSN)))
goto end;
error= 0;
......@@ -1248,8 +1215,9 @@ static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id)
}
if (maria_is_crashed(info))
{
/** @todo what should we do? how to continue recovery? */
tprint(tracef, "Table is crashed, can't apply log records to it\n");
eprint(tracef, "Table '%s' is crashed, skipping it. Please repair it with"
" maria_chk -r", share->open_file_name);
error= -1; /* not fatal, try with other tables */
goto end;
}
/* don't log any records for this work */
......@@ -1295,7 +1263,7 @@ static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id)
/*
We don't set info->s->id, it would be useless (no logging in REDO phase);
if you change that, know that some records in REDO phase call
_ma_update_create_rename_lsn() which resets info->s->id.
_ma_update_state_lsns() which resets info->s->id.
*/
tprint(tracef, ", opened");
error= 0;
......@@ -1325,11 +1293,15 @@ prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
{
/*
Table was skipped at open time (because later dropped/renamed, not
transactional, or create_rename_lsn newer than LOGREC_FILE_ID); it is
not an error.
transactional, or create_rename_lsn newer than LOGREC_FILE_ID), or
record was skipped due to skip_redo_lsn; it is not an error.
*/
return 0;
}
/*
Note that REDO is per page, we still consider it if its transaction
committed long ago and is unknown.
*/
/*
If REDO's LSN is > page's LSN (read from disk), we are going to modify the
page and change its LSN. The normal runtime code stores the UNDO's LSN
......@@ -1434,7 +1406,7 @@ prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)
}
buff= log_record_buffer.str;
if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn,
buff + FILEID_STORE_SIZE))
buff, rec->lsn))
goto end;
error= 0;
......@@ -1604,10 +1576,56 @@ end:
}
#define set_undo_lsn_for_active_trans(TRID, LSN) do { \
all_active_trans[TRID].undo_lsn= LSN; \
if (all_active_trans[TRID].first_undo_lsn == LSN_IMPOSSIBLE) \
all_active_trans[TRID].first_undo_lsn= LSN; } while (0)
prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
return 0;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
eprint(tracef, "Failed to read record");
goto end;
}
if (cmp_translog_addr(rec->lsn, checkpoint_start) >= 0)
{
/*
Record is potentially after the bitmap flush made by Checkpoint, so has
to be replayed. It may overwrite a more recent state but that will be
corrected by all upcoming REDOs for data pages.
If the condition is false, we must not apply the record: it is unneeded
and nocive (may not be corrected as REDOs can be skipped due to
dirty-pages list).
*/
if (_ma_apply_redo_bitmap_new_page(info, current_group_end_lsn,
log_record_buffer.str +
FILEID_STORE_SIZE))
goto end;
}
error= 0;
end:
return error;
}
static inline void set_undo_lsn_for_active_trans(uint16 short_trid, LSN lsn)
{
if (all_active_trans[short_trid].long_trid == 0)
{
/* transaction unknown, so has committed or fully rolled back long ago */
return;
}
all_active_trans[short_trid].undo_lsn= lsn;
if (all_active_trans[short_trid].first_undo_lsn == LSN_IMPOSSIBLE)
all_active_trans[short_trid].first_undo_lsn= lsn;
}
prototype_redo_exec_hook(UNDO_ROW_INSERT)
{
......@@ -1619,8 +1637,8 @@ prototype_redo_exec_hook(UNDO_ROW_INSERT)
{
/*
Note that we set undo_lsn anyway. So that if the transaction is later
rolled back, this UNDO is tried for execution and we get an error (as it
would then be abnormal that info==NULL).
rolled back, this UNDO is tried for execution and we get a warning (as
it would then be abnormal that info==NULL).
*/
return 0;
}
......@@ -1815,6 +1833,17 @@ prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
}
prototype_redo_exec_hook(UNDO_BULK_INSERT_WITH_REPAIR)
{
/*
If the repair finished it wrote and sync the state. If it didn't finish,
we are going to empty the table and that will fix the state.
*/
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
return 0;
}
prototype_redo_exec_hook(COMMIT)
{
uint16 sid= rec->short_trid;
......@@ -1908,6 +1937,8 @@ prototype_redo_exec_hook(CLR_END)
page * share->block_size);
break;
}
case LOGREC_UNDO_BULK_INSERT_WITH_REPAIR:
break;
default:
DBUG_ASSERT(0);
}
......@@ -1923,6 +1954,20 @@ prototype_redo_exec_hook(CLR_END)
}
/**
In some cases we have to skip execution of an UNDO record during the UNDO
phase.
*/
static void skip_undo_record(LSN previous_undo_lsn, TRN *trn)
{
trn->undo_lsn= previous_undo_lsn;
if (previous_undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
skipped_undo_phase++;
}
prototype_undo_exec_hook(UNDO_ROW_INSERT)
{
my_bool error;
......@@ -1937,9 +1982,11 @@ prototype_undo_exec_hook(UNDO_ROW_INSERT)
Unlike for REDOs, if the table was skipped it is abnormal; we have a
transaction to rollback which used this table, as it is not rolled back
it was supposed to hold this table and so the table should still be
there.
there. Skip it (user may have repaired the table with maria_chk because
it was so badly corrupted that a previous recovery failed) but warn.
*/
return 1;
skip_undo_record(previous_undo_lsn, trn);
return 0;
}
share= info->s;
share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
......@@ -1973,7 +2020,7 @@ prototype_undo_exec_hook(UNDO_ROW_INSERT)
/* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n",
LSN_IN_PARTS(previous_undo_lsn));
LSN_IN_PARTS(trn->undo_lsn));
return error;
}
......@@ -1986,7 +2033,10 @@ prototype_undo_exec_hook(UNDO_ROW_DELETE)
MARIA_SHARE *share;
if (info == NULL)
return 1;
{
skip_undo_record(previous_undo_lsn, trn);
return 0;
}
share= info->s;
share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
......@@ -2009,7 +2059,7 @@ prototype_undo_exec_hook(UNDO_ROW_DELETE)
(LSN_STORE_SIZE + FILEID_STORE_SIZE));
info->trn= 0;
tprint(tracef, " rows' count %lu\n undo_lsn now LSN (%lu,0x%lx)\n",
(ulong)share->state.state.records, LSN_IN_PARTS(previous_undo_lsn));
(ulong)share->state.state.records, LSN_IN_PARTS(trn->undo_lsn));
return error;
}
......@@ -2022,7 +2072,10 @@ prototype_undo_exec_hook(UNDO_ROW_UPDATE)
MARIA_SHARE *share;
if (info == NULL)
return 1;
{
skip_undo_record(previous_undo_lsn, trn);
return 0;
}
share= info->s;
share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
......@@ -2045,7 +2098,7 @@ prototype_undo_exec_hook(UNDO_ROW_UPDATE)
(LSN_STORE_SIZE + FILEID_STORE_SIZE));
info->trn= 0;
tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n",
LSN_IN_PARTS(previous_undo_lsn));
LSN_IN_PARTS(trn->undo_lsn));
return error;
}
......@@ -2059,13 +2112,8 @@ prototype_undo_exec_hook(UNDO_KEY_INSERT)
if (info == NULL)
{
/*
Unlike for REDOs, if the table was skipped it is abnormal; we have a
transaction to rollback which used this table, as it is not rolled back
it was supposed to hold this table and so the table should still be
there.
*/
return 1;
skip_undo_record(previous_undo_lsn, trn);
return 0;
}
share= info->s;
......@@ -2091,7 +2139,7 @@ prototype_undo_exec_hook(UNDO_KEY_INSERT)
info->trn= 0;
/* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n",
LSN_IN_PARTS(previous_undo_lsn));
LSN_IN_PARTS(trn->undo_lsn));
return error;
}
......@@ -2105,13 +2153,8 @@ prototype_undo_exec_hook(UNDO_KEY_DELETE)
if (info == NULL)
{
/*
Unlike for REDOs, if the table was skipped it is abnormal; we have a
transaction to rollback which used this table, as it is not rolled back
it was supposed to hold this table and so the table should still be
there.
*/
return 1;
skip_undo_record(previous_undo_lsn, trn);
return 0;
}
share= info->s;
......@@ -2133,11 +2176,11 @@ prototype_undo_exec_hook(UNDO_KEY_DELETE)
log_record_buffer.str + LSN_STORE_SIZE +
FILEID_STORE_SIZE,
rec->record_length - LSN_STORE_SIZE -
FILEID_STORE_SIZE);
FILEID_STORE_SIZE, FALSE);
info->trn= 0;
/* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n",
LSN_IN_PARTS(previous_undo_lsn));
LSN_IN_PARTS(trn->undo_lsn));
return error;
}
......@@ -2151,13 +2194,8 @@ prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
if (info == NULL)
{
/*
Unlike for REDOs, if the table was skipped it is abnormal; we have a
transaction to rollback which used this table, as it is not rolled back
it was supposed to hold this table and so the table should still be
there.
*/
return 1;
skip_undo_record(previous_undo_lsn, trn);
return 0;
}
share= info->s;
......@@ -2177,17 +2215,43 @@ prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
info->trn= trn;
error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
log_record_buffer.str + LSN_STORE_SIZE +
FILEID_STORE_SIZE + PAGE_STORE_SIZE,
FILEID_STORE_SIZE,
rec->record_length - LSN_STORE_SIZE -
FILEID_STORE_SIZE - PAGE_STORE_SIZE);
FILEID_STORE_SIZE, TRUE);
info->trn= 0;
/* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n",
LSN_IN_PARTS(previous_undo_lsn));
LSN_IN_PARTS(trn->undo_lsn));
return error;
}
prototype_undo_exec_hook(UNDO_BULK_INSERT_WITH_REPAIR)
{
my_bool error;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
LSN previous_undo_lsn= lsn_korr(rec->header);
MARIA_SHARE *share;
if (info == NULL)
{
skip_undo_record(previous_undo_lsn, trn);
return 0;
}
share= info->s;
share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
info->trn= trn;
error= _ma_apply_undo_bulk_insert_with_repair(info, previous_undo_lsn);
info->trn= 0;
/* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
tprint(tracef, " undo_lsn now LSN (%lu,0x%lx)\n",
LSN_IN_PARTS(trn->undo_lsn));
return error;
}
static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
{
......@@ -2226,6 +2290,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
install_redo_exec_hook(REDO_INDEX);
install_redo_exec_hook(REDO_INDEX_NEW_PAGE);
install_redo_exec_hook(REDO_INDEX_FREE_PAGE);
install_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
install_redo_exec_hook(UNDO_ROW_INSERT);
install_redo_exec_hook(UNDO_ROW_DELETE);
install_redo_exec_hook(UNDO_ROW_UPDATE);
......@@ -2244,6 +2309,8 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
install_redo_exec_hook_shared(REDO_NEW_ROW_HEAD, REDO_INSERT_ROW_HEAD);
/* REDO_NEW_ROW_TAIL shares entry with REDO_INSERT_ROW_TAIL */
install_redo_exec_hook_shared(REDO_NEW_ROW_TAIL, REDO_INSERT_ROW_TAIL);
install_redo_exec_hook(UNDO_BULK_INSERT_WITH_REPAIR);
install_undo_exec_hook(UNDO_BULK_INSERT_WITH_REPAIR);
current_group_end_lsn= LSN_IMPOSSIBLE;
#ifndef DBUG_OFF
......@@ -2404,6 +2471,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
if (recovery_message_printed == REC_MSG_REDO)
{
fprintf(stderr, " 100%%");
fflush(stderr);
procent_printed= 1;
}
return 0;
......@@ -2441,6 +2509,8 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
if (prepare_for_undo_phase && trnman_init(max_long_trid))
return -1;
trns_created= TRUE;
for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
{
TrID long_trid= all_active_trans[sid].long_trid;
......@@ -2457,8 +2527,17 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
llstr(long_trid, llbuf);
tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n",
llbuf, sid);
/* dummy_transaction_object serves only for DDLs */
DBUG_ASSERT(long_trid != 0);
/*
dummy_transaction_object serves only for DDLs, where there is never a
rollback or incomplete group. And unknown transactions (which have
long_trid==0) should have undo_lsn==LSN_IMPOSSIBLE.
*/
if (long_trid ==0)
{
eprint(tracef, "Transaction with long_trid 0 should not roll back");
ALERT_USER();
return -1;
}
if (prepare_for_undo_phase)
{
if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
......@@ -2541,7 +2620,10 @@ static int run_undo_phase(uint uncommitted)
char llbuf[22];
TRN *trn;
if (recovery_message_printed == REC_MSG_UNDO)
{
fprintf(stderr, " %u", uncommitted);
fflush(stderr);
}
if ((uncommitted--) == 0)
break;
trn= trnman_get_any_trn();
......@@ -2584,6 +2666,27 @@ static int run_undo_phase(uint uncommitted)
}
/**
In case of error in recovery, deletes all transactions from the transaction
manager so that this module does not assert.
@note no checkpoint should be taken as those transactions matter for the
next recovery (they still haven't been properly dealt with).
*/
static void delete_all_transactions()
{
for( ; ; )
{
TRN *trn= trnman_get_any_trn();
if (trn == NULL)
break;
trn->undo_lsn= trn->first_undo_lsn= LSN_IMPOSSIBLE;
trnman_rollback_trn(trn); /* ignore error */
}
}
/**
@brief re-enables transactionality, updates is_of_horizon
......@@ -2623,12 +2726,12 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
uint16 sid;
pgcache_page_no_t page;
MARIA_HA *info;
MARIA_SHARE *share;
char llbuf[22];
my_bool index_page_redo_entry= 0;
my_bool index_page_redo_entry= FALSE, page_redo_entry= FALSE;
print_redo_phase_progress(rec->lsn);
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
switch (rec->type) {
/* not all REDO records have a page: */
case LOGREC_REDO_INDEX_NEW_PAGE:
......@@ -2642,13 +2745,19 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
case LOGREC_REDO_PURGE_ROW_TAIL:
case LOGREC_REDO_NEW_ROW_HEAD:
case LOGREC_REDO_NEW_ROW_TAIL:
case LOGREC_REDO_FREE_HEAD_OR_TAIL:
page_redo_entry= TRUE;
page= page_korr(rec->header + FILEID_STORE_SIZE);
llstr(page, llbuf);
tprint(tracef, " For page %s of table of short id %u", llbuf, sid);
break;
/* other types could print their info here too */
/*
For REDO_FREE_BLOCKS, no need to look at dirty pages list: it does not
read data pages, only reads/modifies bitmap page(s) which is cheap.
*/
default:
break;
}
tprint(tracef, " For table of short id %u", sid);
info= all_tables[sid].info;
#ifndef DBUG_OFF
DBUG_ASSERT(current_group_table == NULL || current_group_table == info);
......@@ -2659,8 +2768,10 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
tprint(tracef, ", table skipped, so skipping record\n");
return NULL;
}
tprint(tracef, ", '%s'", info->s->open_file_name);
if (cmp_translog_addr(rec->lsn, info->s->lsn_of_file_id) <= 0)
share= info->s;
tprint(tracef, ", '%s'", share->open_file_name);
DBUG_ASSERT(in_redo_phase);
if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
{
/*
This can happen only if processing a record before the checkpoint
......@@ -2673,34 +2784,31 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
DBUG_ASSERT(cmp_translog_addr(rec->lsn, checkpoint_start) < 0);
tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
" than record, skipping record",
LSN_IN_PARTS(info->s->lsn_of_file_id));
LSN_IN_PARTS(share->lsn_of_file_id));
return NULL;
}
if (cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
{
/* probably a bulk insert repair */
tprint(tracef, ", has skip_redo_lsn (%lu,0x%lx) more recent than"
" record, skipping record\n",
LSN_IN_PARTS(share->state.skip_redo_lsn));
return NULL;
}
/* detect if an open instance of a dropped table (internal bug) */
DBUG_ASSERT(info->s->last_version != 0);
if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
DBUG_ASSERT(share->last_version != 0);
if (page_redo_entry)
{
/*
64-bit key is formed like this:
Most significant byte: 0
Next byte: 0 if data page, 1 if index page
Next 2 bytes: table's short id
Next 4 bytes: page number
Consult dirty pages list.
REDO_INSERT_ROW_BLOBS will consult list by itself, as it covers several
pages.
*/
uint64 file_and_page_id=
(((uint64)((index_page_redo_entry << 16) | sid)) << 32) | page;
struct st_dirty_page *dirty_page= (struct st_dirty_page *)
hash_search(&all_dirty_pages,
(uchar *)&file_and_page_id, sizeof(file_and_page_id));
DBUG_PRINT("info", ("in dirty pages list: %d", dirty_page != NULL));
if ((dirty_page == NULL) ||
cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0)
{
tprint(tracef, ", ignoring because of dirty_pages list\n");
tprint(tracef, " page %s", llbuf);
if (_ma_redo_not_needed_for_page(sid, rec->lsn, page,
index_page_redo_entry))
return NULL;
}
}
/*
So we are going to read the page, and if its LSN is older than the
record's we will modify the page
......@@ -2716,12 +2824,14 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
{
uint16 sid;
MARIA_HA *info;
MARIA_SHARE *share;
sid= fileid_korr(rec->header + LSN_STORE_SIZE);
tprint(tracef, " For table of short id %u", sid);
info= all_tables[sid].info;
#ifndef DBUG_OFF
DBUG_ASSERT(current_group_table == NULL || current_group_table == info);
DBUG_ASSERT(!in_redo_phase ||
current_group_table == NULL || current_group_table == info);
current_group_table= info;
#endif
if (info == NULL)
......@@ -2729,15 +2839,25 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
tprint(tracef, ", table skipped, so skipping record\n");
return NULL;
}
tprint(tracef, ", '%s'", info->s->open_file_name);
if (cmp_translog_addr(rec->lsn, info->s->lsn_of_file_id) <= 0)
share= info->s;
tprint(tracef, ", '%s'", share->open_file_name);
if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
{
tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
" than record, skipping record",
LSN_IN_PARTS(info->s->lsn_of_file_id));
LSN_IN_PARTS(share->lsn_of_file_id));
return NULL;
}
if (in_redo_phase &&
cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
{
/* probably a bulk insert repair */
tprint(tracef, ", has skip_redo_lsn (%lu,0x%lx) more recent than"
" record, skipping record\n",
LSN_IN_PARTS(share->state.skip_redo_lsn));
return NULL;
}
DBUG_ASSERT(info->s->last_version != 0);
DBUG_ASSERT(share->last_version != 0);
_ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
tprint(tracef, ", applying record\n");
return info;
......@@ -2963,7 +3083,10 @@ static int close_all_tables(void)
for (list_element= maria_open_list ; ; list_element= next_open)
{
if (recovery_message_printed == REC_MSG_FLUSH)
{
fprintf(stderr, " %u", count--);
fflush(stderr);
}
if (list_element == NULL)
break;
next_open= list_element->next;
......@@ -3114,6 +3237,7 @@ static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
{
print_preamble();
fprintf(stderr, "recovered pages: 0%%");
fflush(stderr);
procent_printed= 1;
recovery_message_printed= REC_MSG_REDO;
}
......@@ -3137,6 +3261,7 @@ static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
{
percentage_printed= percentage_done;
fprintf(stderr, " %u%%", percentage_done);
fflush(stderr);
procent_printed= 1;
}
}
......
/* Copyright (C) 2006,2007,2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
Q: Why isn't ma_recovery_util.c simply moved to ma_recovery.c ?
A: ma_recovery.c, because it invokes objects from ma_check.c (like
maria_chk_init()) causes the following problem:
if a source file a.c of a program invokes a function defined in
ma_recovery.c, then a.o depends on ma_recovery.o which depends on
ma_check.o: linker thus brings in ma_check.o. That brings in the
dependencies of ma_check.o which are definitions of _ma_check_print_info()
etc; if a.o does not define them then the ones of ha_maria.o are used
i.e. ha_maria.o is linked into the program, and this brings in dependencies
of ha_maria.o on mysqld.o into the program's linking which thus fails, as
the program is not linked with mysqld.o.
Thus, while several functions defined in ma_recovery.c could be useful to
other files, they cannot be used by them.
So we are going to gradually move a great share of ma_recovery.c's exported
functions into the present file, to isolate the problematic components and
avoid the problem.
*/
#include "maria_def.h"
HASH all_dirty_pages;
struct st_dirty_page /* used only in the REDO phase */
{
uint64 file_and_page_id;
LSN rec_lsn;
};
/*
LSN after which dirty pages list does not apply. Can be slightly before
when ma_checkpoint_execute() started.
*/
LSN checkpoint_start= LSN_IMPOSSIBLE;
/** @todo looks like duplicate of recovery_message_printed */
my_bool procent_printed;
FILE *tracef; /**< trace file for debugging */
/** @brief Prints to a trace file if it is not NULL */
void tprint(FILE *trace_file __attribute__ ((unused)),
const char *format __attribute__ ((unused)), ...)
{
va_list args;
va_start(args, format);
DBUG_PRINT("info", ("%s", format));
if (trace_file != NULL)
{
if (procent_printed)
{
procent_printed= 0;
fputc('\n', trace_file);
}
vfprintf(trace_file, format, args);
}
va_end(args);
}
void eprint(FILE *trace_file __attribute__ ((unused)),
const char *format __attribute__ ((unused)), ...)
{
va_list args;
va_start(args, format);
DBUG_PRINT("error", ("%s", format));
if (!trace_file)
trace_file= stderr;
if (procent_printed)
{
/* In silent mode, print on another line than the 0% 10% 20% line */
procent_printed= 0;
fputc('\n', trace_file);
}
vfprintf(trace_file , format, args);
fputc('\n', trace_file);
if (trace_file != stderr)
{
va_start(args, format);
my_printv_error(HA_ERR_INITIALIZATION, format, MYF(0), args);
}
va_end(args);
fflush(trace_file);
}
/**
Tells if the dirty pages list found in checkpoint record allows to ignore a
REDO for a certain page.
@param shortid short id of the table
@param lsn REDO record's LSN
@param page page number
@param index TRUE if index page, FALSE if data page
*/
my_bool _ma_redo_not_needed_for_page(uint16 shortid, LSN lsn,
pgcache_page_no_t page,
my_bool index)
{
if (cmp_translog_addr(lsn, checkpoint_start) < 0)
{
/*
64-bit key is formed like this:
Most significant byte: 0
Next byte: 0 if data page, 1 if index page
Next 2 bytes: table's short id
Next 4 bytes: page number
*/
uint64 file_and_page_id=
(((uint64)((index << 16) | shortid)) << 32) | page;
struct st_dirty_page *dirty_page= (struct st_dirty_page *)
hash_search(&all_dirty_pages,
(uchar *)&file_and_page_id, sizeof(file_and_page_id));
DBUG_PRINT("info", ("in dirty pages list: %d", dirty_page != NULL));
if ((dirty_page == NULL) ||
cmp_translog_addr(lsn, dirty_page->rec_lsn) < 0)
{
tprint(tracef, ", ignoring because of dirty_pages list\n");
return TRUE;
}
}
return FALSE;
}
/* Copyright (C) 2006,2007,2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
struct st_dirty_page /* used only in the REDO phase */
{
uint64 file_and_page_id;
LSN rec_lsn;
};
extern HASH all_dirty_pages;
/*
LSN after which dirty pages list does not apply. Can be slightly before
when ma_checkpoint_execute() started.
*/
extern LSN checkpoint_start;
extern my_bool procent_printed;
extern FILE *tracef;
my_bool _ma_redo_not_needed_for_page(uint16 shortid, LSN lsn,
pgcache_page_no_t page,
my_bool index);
void tprint(FILE *trace_file, const char *format, ...)
ATTRIBUTE_FORMAT(printf, 2, 3);
void eprint(FILE *trace_file, const char *format, ...)
ATTRIBUTE_FORMAT(printf, 2, 3);
......@@ -95,7 +95,7 @@ int maria_rename(const char *old_name, const char *new_name)
store LSN into file, needed for Recovery to not be confused if a
RENAME happened (applying REDOs to the wrong table).
*/
if (_ma_update_create_rename_lsn(share, lsn, TRUE))
if (_ma_update_state_lsns(share, lsn, TRUE, TRUE))
{
maria_close(info);
DBUG_RETURN(1);
......
......@@ -25,7 +25,7 @@
LIST *maria_open_list=0;
uchar maria_file_magic[]=
{ (uchar) 254, (uchar) 254, (uchar) 9, '\002', };
{ (uchar) 254, (uchar) 254, (uchar) 9, '\003', };
uchar maria_pack_file_magic[]=
{ (uchar) 254, (uchar) 254, (uchar) 10, '\001', };
/* Unique number for this maria instance */
......
......@@ -893,6 +893,11 @@ int main(int argc, char *argv[])
maria_scan_end(file);
goto err;
}
if (testflag == 6)
goto end;
if (checkpoint == 6 && ma_checkpoint_execute(CHECKPOINT_MEDIUM, FALSE))
goto err;
end:
maria_scan_end(file);
if (die_in_middle_of_transaction)
......
......@@ -137,7 +137,7 @@ do
for test_undo in 1 2 3 4
do
# first iteration tests rollback of insert, second tests rollback of delete
set -- "ma_test1 $silent -M -T -c -N $blobs -H1" "--testflag=1" "--testflag=2 --test-undo=" "ma_test1 $silent -M -T -c -N $blobs -H2" "--testflag=3" "--testflag=4 --test-undo=" "ma_test1 $silent -M -T -c -N $blobs -H2 " "--testflag=2" "--testflag=3 --test-undo=" "ma_test2 $silent -L -K -W -P -M -T -c $blobs -H1" "-t1" "-t2 -A" "ma_test2 $silent -L -K -W -P -M -T -c $blobs -H1" "-t1" "-t4 -A"
set -- "ma_test1 $silent -M -T -c -N $blobs -H1" "--testflag=1" "--testflag=2 --test-undo=" "ma_test1 $silent -M -T -c -N $blobs -H2" "--testflag=3" "--testflag=4 --test-undo=" "ma_test1 $silent -M -T -c -N $blobs -H2 " "--testflag=2" "--testflag=3 --test-undo=" "ma_test2 $silent -L -K -W -P -M -T -c $blobs -H1" "-t1" "-t2 -A" "ma_test2 $silent -L -K -W -P -M -T -c $blobs -H1" "-t1" "-t6 -A"
# -N (create NULL fields) is needed because --test-undo adds it anyway
while [ $# != 0 ]
do
......
......@@ -75,7 +75,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t4 -A1 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t6 -A1 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -162,7 +162,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t4 -A2 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t6 -A2 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -249,7 +249,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t4 -A3 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t6 -A3 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -336,7 +336,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t4 -A4 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -t6 -A4 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -423,7 +423,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t4 -A1 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t6 -A1 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -510,7 +510,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t4 -A2 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t6 -A2 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -597,7 +597,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t4 -A3 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t6 -A3 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -684,7 +684,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t4 -A4 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -t6 -A4 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -771,7 +771,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t4 -A1 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t6 -A1 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -858,7 +858,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t4 -A2 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t6 -A2 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -945,7 +945,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t4 -A3 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t6 -A3 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -1032,7 +1032,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t4 -A4 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -H1 -t6 -A4 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -1119,7 +1119,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t4 -A1 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t6 -A1 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -1206,7 +1206,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t4 -A2 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t6 -A2 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -1293,7 +1293,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t4 -A3 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t6 -A3 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......@@ -1380,7 +1380,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
> Status: changed
========DIFF END=======
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t1 (commit at end)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t4 -A4 (additional aborted work)
TEST WITH ma_test2 -s -L -K -W -P -M -T -c -b32768 -H1 -t6 -A4 (additional aborted work)
Dying on request without maria_commit()/maria_close()
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
......
......@@ -71,6 +71,8 @@ static const char *record_formats[]=
};
static const char *maria_stats_method_str="nulls_unequal";
static char default_open_errmsg[]= "%d when opening MARIA-table '%s'";
static char default_close_errmsg[]= "%d when closing MARIA-table '%s'";
static void get_options(int *argc,char * * *argv);
static void print_version(void);
......@@ -84,6 +86,7 @@ static int sort_record_index(MARIA_SORT_PARAM *sort_param, MARIA_HA *info,
MARIA_KEYDEF *keyinfo,
my_off_t page, uchar *buff,uint sortkey,
File new_file, my_bool update_index);
static my_bool write_log_record(HA_CHECK *param);
HA_CHECK check_param;
......@@ -1105,7 +1108,7 @@ static int maria_chk(HA_CHECK *param, char *filename)
*/
if (share->base.born_transactional)
share->state.create_rename_lsn= share->state.is_of_horizon=
LSN_REPAIRED_BY_MARIA_CHK;
share->state.skip_redo_lsn= LSN_REPAIRED_BY_MARIA_CHK;
}
if (!error && (param->testflag & T_REP_ANY))
{
......@@ -1218,7 +1221,6 @@ static int maria_chk(HA_CHECK *param, char *filename)
(param->testflag & (T_EXTEND | T_MEDIUM)))
error|=maria_chk_data_link(param, info,
test(param->testflag & T_EXTEND));
error|= _ma_flush_table_files_after_repair(param, info);
VOID(end_io_cache(&param->read_cache));
}
if (!error)
......@@ -1258,8 +1260,7 @@ end2:
end_pagecache(maria_pagecache, 1);
if (maria_close(info))
{
_ma_check_print_error(param,"%d when closing MARIA-table '%s'",
my_errno,filename);
_ma_check_print_error(param, default_close_errmsg, my_errno, filename);
DBUG_RETURN(1);
}
if (error == 0)
......@@ -1272,6 +1273,12 @@ end2:
error|=maria_change_to_newfile(filename,MARIA_NAME_IEXT,INDEX_TMP_EXT,
MYF(0));
}
if (opt_transaction_logging &&
share->base.born_transactional && !error &&
(param->testflag & (T_REP_ANY | T_SORT_RECORDS | T_SORT_INDEX |
T_ZEROFILL)))
error= write_log_record(param);
VOID(fflush(stdout)); VOID(fflush(stderr));
if (param->error_printed)
{
......@@ -1745,7 +1752,7 @@ err:
my_free(sort_info.buff,MYF(MY_ALLOW_ZERO_PTR));
sort_info.buff=0;
share->state.sortkey=sort_key;
DBUG_RETURN(_ma_flush_table_files_after_repair(param, info) | got_error);
DBUG_RETURN(got_error);
} /* sort_records */
......@@ -1843,4 +1850,29 @@ err:
} /* sort_record_index */
static my_bool write_log_record(HA_CHECK *param)
{
/*
Now that all operations including O_NEW_DATA|INDEX are successfully
done, we can write a log record.
*/
MARIA_HA *info= maria_open(param->isam_file_name, O_RDWR, 0);
if (info == NULL)
_ma_check_print_error(param, default_open_errmsg, my_errno,
param->isam_file_name);
else
{
if (write_log_record_for_repair(param, info))
_ma_check_print_error(param, "%d when writing log record for"
" MARIA-table '%s'", my_errno,
param->isam_file_name);
else if (maria_close(info))
_ma_check_print_error(param, default_close_errmsg, my_errno,
param->isam_file_name);
else
return FALSE;
}
return TRUE;
}
#include "ma_check_standalone.h"
......@@ -109,9 +109,28 @@ typedef struct st_maria_state_info
uint sortkey; /* sorted by this key (not used) */
uint open_count;
uint changed; /* Changed since maria_chk */
LSN create_rename_lsn; /**< LSN when table was last created/renamed */
/**
Birthday of the table: no record in the log before this LSN should ever
be applied to the table. Updated when created, renamed, explicitely
repaired (REPAIR|OPTIMIZE TABLE, ALTER TABLE ENABLE KEYS, maria_chk).
*/
LSN create_rename_lsn;
/** @brief Log horizon when state was last updated on disk */
TRANSLOG_ADDRESS is_of_horizon;
/**
REDO phase should ignore any record before this LSN. UNDO phase
shouldn't, this is the difference with create_rename_lsn.
skip_redo_lsn >= create_rename_lsn.
The distinction is for these cases:
- after a repair at end of bulk insert (enabling indices), REDO phase
should skip the table but UNDO phase should not, so only skip_redo_lsn is
increased, not create_rename_lsn
- if one table is corrupted and so recovery fails, user may repair the
table with maria_chk and let recovery restart: that recovery should then
skip the repaired table even in the UNDO phase, so create_rename_lsn is
increased.
*/
LSN skip_redo_lsn;
/* the following isn't saved on disk */
uint state_diff_length; /* Should be 0 */
......@@ -121,7 +140,7 @@ typedef struct st_maria_state_info
#define MARIA_STATE_INFO_SIZE \
(24 + 2 + LSN_STORE_SIZE*2 + 4 + 11*8 + 4*4 + 8 + 3*4 + 5*8)
(24 + 2 + LSN_STORE_SIZE*3 + 4 + 11*8 + 4*4 + 8 + 3*4 + 5*8)
#define MARIA_FILE_OPEN_COUNT_OFFSET 0
#define MARIA_FILE_CHANGED_OFFSET 2
#define MARIA_FILE_CREATE_RENAME_LSN_OFFSET 4
......@@ -1054,6 +1073,7 @@ void _ma_check_print_warning _VARARGS((HA_CHECK *param, const char *fmt, ...))
ATTRIBUTE_FORMAT(printf, 2, 3);
void _ma_check_print_info _VARARGS((HA_CHECK *param, const char *fmt, ...))
ATTRIBUTE_FORMAT(printf, 2, 3);
my_bool write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info);
C_MODE_END
int _ma_flush_pending_blocks(MARIA_SORT_PARAM *param);
......@@ -1062,17 +1082,18 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param);
#ifdef THREAD
pthread_handler_t _ma_thr_find_all_keys(void *arg);
#endif
int _ma_flush_table_files_after_repair(HA_CHECK *param, MARIA_HA *info);
int _ma_sort_write_record(MARIA_SORT_PARAM *sort_param);
int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
size_t);
int _ma_sync_table_files(const MARIA_HA *info);
int _ma_initialize_data_file(MARIA_SHARE *share, File dfile);
int _ma_update_create_rename_lsn(MARIA_SHARE *share,
LSN lsn, my_bool do_sync);
int _ma_update_create_rename_lsn_sub(MARIA_SHARE *share,
LSN lsn, my_bool do_sync);
int _ma_update_state_lsns(MARIA_SHARE *share,
LSN lsn, my_bool do_sync, my_bool
update_create_rename_lsn);
int _ma_update_state_lsns_sub(MARIA_SHARE *share,
LSN lsn, my_bool do_sync, my_bool
update_create_rename_lsn);
void _ma_set_data_pagecache_callbacks(PAGECACHE_FILE *file,
MARIA_SHARE *share);
void _ma_set_index_pagecache_callbacks(PAGECACHE_FILE *file,
......@@ -1080,6 +1101,8 @@ void _ma_set_index_pagecache_callbacks(PAGECACHE_FILE *file,
void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
my_bool log_incomplete);
void _ma_reenable_logging_for_table(MARIA_HA *info);
my_bool write_log_record_for_bulk_insert_with_repair(MARIA_HA *info);
#define MARIA_NO_CRC_NORMAL_PAGE 0xffffffff
#define MARIA_NO_CRC_BITMAP_PAGE 0xfffffffe
......
......@@ -2977,7 +2977,7 @@ static int save_state(MARIA_HA *isam_file,PACK_MRG_INFO *mrg,
share->state.version=(ulong) time((time_t*) 0);
if (share->base.born_transactional)
share->state.create_rename_lsn= share->state.is_of_horizon=
LSN_REPAIRED_BY_MARIA_CHK;
share->state.skip_redo_lsn= LSN_REPAIRED_BY_MARIA_CHK;
if (! maria_is_all_keys_active(share->state.key_map, share->base.keys))
{
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment