Commit 13f45b16 authored by unknown's avatar unknown

WL#3072 Maria recovery:

fix for bug: if a crash happened right after writing a REDO like this:
REDO - UNDO - REDO*, then recovery would ignore the last REDO* (ok),
rollback: REDO - UNDO - REDO* - REDO - CLR, and a next recovery would
thus execute REDO* instead of skipping it again. Recovery now logs
LOGREC_INCOMPLETE_GROUP when it meets REDO* for the first time,
to draw a boundary and ensure it is always skipped. Tested by hand.
Note: ma_test_all fails "maria_chk: error: Key 1 - Found too many records"
not due to this patch (failed before).


BitKeeper/triggers/post-commit:
  no truncation of the commit mail, or how to review patches?
mysql-test/include/maria_verify_recovery.inc:
  let caller choose the statement used to crash (sometimes we
  want the crash to happen at special places)
mysql-test/t/maria-recovery.test:
  user of maria_verify_recovery.inc now specifies statement which the
  script should use for crashing.
storage/maria/ma_bitmap.c:
  it's easier to search for all places using functions from the bitmap
  module (like in ma_blockrec.c) if those exported functions all start
  with "_ma_bitmap": renaming some of them.
  Assertion that when we read a bitmap page, overwriting bitmap->map,
  we are not losing information (i.e. bitmap->changed is false).
storage/maria/ma_blockrec.c:
  update to new names. Adding code (disabled, protected by a #ifdef)
  that I use to test certain crash scenarios (more to come).
storage/maria/ma_blockrec.h:
  update to new names
storage/maria/ma_checkpoint.c:
  update to new names
storage/maria/ma_extra.c:
  update to new names
storage/maria/ma_loghandler.c:
  new LOGREC_INCOMPLETE_GROUP
storage/maria/ma_loghandler.h:
  new LOGREC_INCOMPLETE_GROUP
storage/maria/ma_recovery.c:
  When at the end of the REDO phase we have identified some transactions
  with incomplete REDO groups (REDOs without an UNDO or CLR_END),
  for each of them we log LOGREC_INCOMPLETE_GROUP. This way, the
  upcoming UNDO phase can write more records for such transaction,
  a future recovery won't pair the incomplete group with the
  CLR_END (as there is LOGREC_INCOMPLETE_GROUP to draw a boundary).
parent 12a81c39
......@@ -97,7 +97,7 @@ see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
EOF
bk changes -v -r+
bk rset -r+ -ah | bk gnupatch -h -dup -T
) | bk sed -e ${LIMIT}q > $BKROOT/BitKeeper/tmp/commits.txt
) > $BKROOT/BitKeeper/tmp/commits.txt
$SENDMAIL -t < $BKROOT/BitKeeper/tmp/commits.txt
......
......@@ -4,11 +4,11 @@
# API:
# 1) set $mms_tables to N, the script will cover tables mysqltest.t1,...tN
# 2) set $mvr_debug_option to the crash way
# 3) set $mvr_restore_old_snapshot to 1 if you want recovery to run on
# 3) set $mvr_crash_statement to the statement which will trigger a crash
# 4) set $mvr_restore_old_snapshot to 1 if you want recovery to run on
# an old copy of tables and of the control file, 0 for normal recovery.
# 4) set $mms_compare_physically to 1 if you want a physical byte-for-byte
# 5) set $mms_compare_physically to 1 if you want a physical byte-for-byte
# comparison with expected table. Checksum comparison is always done.
# "mvr" is a namespace for Maria_Verify_Recovery
connection admin;
......@@ -34,7 +34,7 @@ system echo wait-maria_verify_recovery.inc >> $MYSQLTEST_VARDIR/tmp/master0.expe
eval SET SESSION debug=$mvr_debug_option;
--echo * crashing mysqld intentionally
--error 2013
set global maria_checkpoint_interval=1; # this will crash (DBUG magic)
eval $mvr_crash_statement; # this will crash (DBUG magic)
if ($mvr_restore_old_snapshot)
{
......
......@@ -38,6 +38,7 @@ let $mvr_restore_old_snapshot=1;
# produce a physically identical table.
let $mms_compare_physically=1;
let $mvr_debug_option="+d,maria_flush_whole_log,maria_crash";
let $mvr_crash_statement= set global maria_checkpoint_interval=1;
# the script below will trigger recovery and compare checksums
-- source include/maria_verify_recovery.inc
let $mms_compare_physically=0;
......@@ -58,6 +59,7 @@ let $mvr_restore_old_snapshot=0;
# UNDO phase prevents physical comparison, normally,
# so we'll only use checksums to compare.
let $mms_compare_physically=0;
let $mvr_crash_statement= set global maria_checkpoint_interval=1;
# Note that we don't remove logs between iterations. Test is
# cumulative (each new recovery processes more log records than the previous).
......@@ -134,6 +136,7 @@ SELECT LENGTH(b) FROM t1 WHERE i=3;
let $mvr_restore_old_snapshot=1;
let $mms_compare_physically=0;
let $mvr_debug_option="+d,maria_flush_whole_log,maria_crash";
let $mvr_crash_statement= set global maria_checkpoint_interval=1;
-- source include/maria_verify_recovery.inc
SELECT LENGTH(b) FROM t1 WHERE i=3;
drop table t1;
......
......@@ -229,7 +229,7 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file)
my_bool _ma_bitmap_end(MARIA_SHARE *share)
{
my_bool res= _ma_flush_bitmap(share);
my_bool res= _ma_bitmap_flush(share);
pthread_mutex_destroy(&share->bitmap.bitmap_lock);
my_free((uchar*) share->bitmap.map, MYF(MY_ALLOW_ZERO_PTR));
share->bitmap.map= 0;
......@@ -241,11 +241,11 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share)
Send updated bitmap to the page cache
SYNOPSIS
_ma_flush_bitmap()
_ma_bitmap_flush()
share Share handler
NOTES
In the future, _ma_flush_bitmap() will be called to flush changes don't
In the future, _ma_bitmap_flush() will be called to flush changes don't
by this thread (ie, checking the changed flag is ok). The reason we
check it again in the mutex is that if someone else did a flush at the
same time, we don't have to do the write.
......@@ -255,10 +255,10 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share)
1 error
*/
my_bool _ma_flush_bitmap(MARIA_SHARE *share)
my_bool _ma_bitmap_flush(MARIA_SHARE *share)
{
my_bool res= 0;
DBUG_ENTER("_ma_flush_bitmap");
DBUG_ENTER("_ma_bitmap_flush");
if (share->bitmap.changed)
{
pthread_mutex_lock(&share->bitmap.bitmap_lock);
......@@ -585,6 +585,7 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
my_bool res;
DBUG_ENTER("_ma_read_bitmap_page");
DBUG_ASSERT(page % bitmap->pages_covered == 0);
DBUG_ASSERT(!bitmap->changed);
bitmap->page= page;
if (end_of_page > share->state.state.data_file_length)
......@@ -713,7 +714,7 @@ static my_bool _ma_change_bitmap_page(MARIA_HA *info,
RETURN
0 ok
1 error (either couldn't save old bitmap or read new one
1 error (either couldn't save old bitmap or read new one)
*/
static my_bool move_to_next_bitmap(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap)
......@@ -1824,7 +1825,7 @@ static uint get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
Mark all pages in a region as free
SYNOPSIS
_ma_reset_full_page_bits()
_ma_bitmap_reset_full_page_bits()
info Maria handler
bitmap Bitmap handler
page Start page
......@@ -1839,13 +1840,14 @@ static uint get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
1 Error (when reading bitmap)
*/
my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
my_bool _ma_bitmap_reset_full_page_bits(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count)
{
ulonglong bitmap_page;
uint offset, bit_start, bit_count, tmp;
uchar *data;
DBUG_ENTER("_ma_reset_full_page_bits");
DBUG_ENTER("_ma_bitmap_reset_full_page_bits");
DBUG_PRINT("enter", ("page: %lu page_count: %u", (ulong) page, page_count));
safe_mutex_assert_owner(&info->s->bitmap.bitmap_lock);
......@@ -1899,7 +1901,7 @@ my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
Set all pages in a region as used
SYNOPSIS
_ma_set_full_page_bits()
_ma_bitmap_set_full_page_bits()
info Maria handler
bitmap Bitmap handler
page Start page
......@@ -1914,13 +1916,14 @@ my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
1 Error (when reading bitmap)
*/
my_bool _ma_set_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count)
{
ulonglong bitmap_page;
uint offset, bit_start, bit_count, tmp;
uchar *data;
DBUG_ENTER("_ma_set_full_page_bits");
DBUG_ENTER("_ma_bitmap_set_full_page_bits");
DBUG_PRINT("enter", ("page: %lu page_count: %u", (ulong) page, page_count));
safe_mutex_assert_owner(&info->s->bitmap.bitmap_lock);
......@@ -2058,7 +2061,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks)
goto err;
}
if (!(block->used & BLOCKUSED_USED) &&
_ma_reset_full_page_bits(info, bitmap,
_ma_bitmap_reset_full_page_bits(info, bitmap,
block->page, page_count))
goto err;
}
......@@ -2105,7 +2108,8 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents,
continue; /* Not used extent */
if (pagecache_delete_pages(info->s->pagecache, &info->dfile, page,
page_count, PAGECACHE_LOCK_WRITE, 1) ||
_ma_reset_full_page_bits(info, &info->s->bitmap, page, page_count))
_ma_bitmap_reset_full_page_bits(info, &info->s->bitmap, page,
page_count))
{
pthread_mutex_unlock(&info->s->bitmap.bitmap_lock);
DBUG_RETURN(1);
......@@ -2122,7 +2126,7 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents,
SYNOPSIS
_ma_bitmap_set()
info Mari handler
info Maria handler
page Adress to page
head 1 if page is a head page, 0 if tail page
empty_space How much empty space there is on page
......
......@@ -1830,8 +1830,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
res= 1;
}
pthread_mutex_lock(&info->s->bitmap.bitmap_lock);
if (_ma_reset_full_page_bits(info, &info->s->bitmap, page,
count))
if (_ma_bitmap_reset_full_page_bits(info, &info->s->bitmap, page, count))
res= 1;
pthread_mutex_unlock(&info->s->bitmap.bitmap_lock);
DBUG_RETURN(res);
......@@ -2357,6 +2356,24 @@ static my_bool write_block_record(MARIA_HA *info,
goto disk_err;
}
#ifdef RECOVERY_EXTRA_DEBUG
if (info->trn->undo_lsn != LSN_IMPOSSIBLE)
{
/* Stop right after the REDO; testing incomplete log record groups */
DBUG_EXECUTE_IF("maria_flush_whole_log",
{
DBUG_PRINT("maria_flush_whole_log", ("now"));
translog_flush(translog_get_horizon());
});
DBUG_EXECUTE_IF("maria_crash",
{
DBUG_PRINT("maria_crash", ("now"));
fflush(DBUG_FILE);
abort();
});
}
#endif
/* Increase data file size, if extended */
position= (my_off_t) head_block->page * block_size;
if (info->state->data_file_length <= position)
......@@ -2677,6 +2694,24 @@ static my_bool allocate_and_write_block_record(MARIA_HA *info,
if (_ma_bitmap_find_place(info, row, blocks))
DBUG_RETURN(1); /* Error reading bitmap */
#ifdef RECOVERY_EXTRA_DEBUG
/* Send this over-allocated bitmap to disk and crash, see if recovers */
DBUG_EXECUTE_IF("maria_flush_bitmap",
{
DBUG_PRINT("maria_flush_bitmap", ("now"));
_ma_bitmap_flush(info->s);
_ma_flush_table_files(info, MARIA_FLUSH_DATA |
MARIA_FLUSH_INDEX,
FLUSH_KEEP, FLUSH_KEEP);
});
DBUG_EXECUTE_IF("maria_crash",
{
DBUG_PRINT("maria_crash", ("now"));
fflush(DBUG_FILE);
abort();
});
#endif
/* page will be pinned & locked by get_head_or_tail_page */
if (get_head_or_tail_page(info, blocks->block, info->buff,
row->space_on_head_page, HEAD_PAGE,
......@@ -4108,7 +4143,7 @@ my_bool _ma_scan_init_block_record(MARIA_HA *info)
We have to flush bitmap as we will read the bitmap from the page cache
while scanning rows
*/
DBUG_RETURN(_ma_flush_bitmap(info->s));
DBUG_RETURN(_ma_bitmap_flush(info->s));
}
......@@ -5329,7 +5364,7 @@ uint _ma_apply_redo_free_blocks(MARIA_HA *info,
/** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_reset_full_page_bits(info, &share->bitmap, start_page,
res= _ma_bitmap_reset_full_page_bits(info, &share->bitmap, start_page,
page_range);
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res)
......@@ -5404,7 +5439,7 @@ uint _ma_apply_redo_free_head_or_tail(MARIA_HA *info, LSN lsn,
}
/** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_reset_full_page_bits(info, &share->bitmap, page, 1);
res= _ma_bitmap_reset_full_page_bits(info, &share->bitmap, page, 1);
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res)
DBUG_RETURN(res);
......@@ -5553,7 +5588,7 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
}
/** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_set_full_page_bits(info, &share->bitmap, start_page,
res= _ma_bitmap_set_full_page_bits(info, &share->bitmap, start_page,
page_range);
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res)
......
......@@ -170,7 +170,7 @@ my_bool _ma_compare_block_record(register MARIA_HA *info,
/* ma_bitmap.c */
my_bool _ma_bitmap_init(MARIA_SHARE *share, File file);
my_bool _ma_bitmap_end(MARIA_SHARE *share);
my_bool _ma_flush_bitmap(MARIA_SHARE *share);
my_bool _ma_bitmap_flush(MARIA_SHARE *share);
void _ma_bitmap_reset_cache(MARIA_SHARE *share);
my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
MARIA_BITMAP_BLOCKS *result_blocks);
......@@ -179,9 +179,11 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents,
uint count);
my_bool _ma_bitmap_set(MARIA_HA *info, ulonglong pos, my_bool head,
uint empty_space);
my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
my_bool _ma_bitmap_reset_full_page_bits(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count);
my_bool _ma_set_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count);
uint _ma_free_size_to_head_pattern(MARIA_FILE_BITMAP *bitmap, uint size);
my_bool _ma_bitmap_find_new_place(MARIA_HA *info, MARIA_ROW *new_row,
......
......@@ -1070,7 +1070,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
over-allocation if crash); see also _ma_change_bitmap_page().
*/
sync_error|=
_ma_flush_bitmap(share); /* after that, all is in page cache */
_ma_bitmap_flush(share); /* after that, all is in page cache */
DBUG_ASSERT(share->pagecache == maria_pagecache);
}
if (share->in_checkpoint & MARIA_CHECKPOINT_SHOULD_FREE_ME)
......
......@@ -572,7 +572,7 @@ int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index,
}
if (share->data_file_type == BLOCK_RECORD)
{
if(_ma_flush_bitmap(share) ||
if(_ma_bitmap_flush(share) ||
flush_pagecache_blocks(share->pagecache, &info->dfile,
flush_type_for_data))
goto err;
......
......@@ -531,6 +531,11 @@ static LOG_DESC INIT_LOGREC_INCOMPLETE_LOG=
NULL, NULL, NULL, 0,
"incomplete_log", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_INCOMPLETE_GROUP=
{LOGRECTYPE_FIXEDLENGTH, 0, 0,
NULL, NULL, NULL, 0,
"incomplete_group", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL;
static void loghandler_init()
......@@ -610,12 +615,14 @@ static void loghandler_init()
INIT_LOGREC_LONG_TRANSACTION_ID;
log_record_type_descriptor[LOGREC_INCOMPLETE_LOG]=
INIT_LOGREC_INCOMPLETE_LOG;
for (i= LOGREC_INCOMPLETE_LOG + 1;
log_record_type_descriptor[LOGREC_INCOMPLETE_GROUP]=
INIT_LOGREC_INCOMPLETE_GROUP;
for (i= LOGREC_INCOMPLETE_GROUP + 1;
i < LOGREC_NUMBER_OF_TYPES;
i++)
log_record_type_descriptor[i].rclass= LOGRECTYPE_NOT_ALLOWED;
DBUG_EXECUTE("info",
check_translog_description_table(LOGREC_INCOMPLETE_LOG););
check_translog_description_table(LOGREC_INCOMPLETE_GROUP););
};
......
......@@ -133,6 +133,7 @@ enum translog_record_type
LOGREC_FILE_ID,
LOGREC_LONG_TRANSACTION_ID,
LOGREC_INCOMPLETE_LOG,
LOGREC_INCOMPLETE_GROUP,
LOGREC_RESERVED_FUTURE_EXTENSION= 63
};
#define LOGREC_NUMBER_OF_TYPES 64 /* Maximum, can't be extended */
......
......@@ -80,6 +80,7 @@ prototype_redo_exec_hook(REDO_REPAIR_TABLE);
prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_redo_exec_hook(FILE_ID);
prototype_redo_exec_hook(INCOMPLETE_LOG);
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
......@@ -108,7 +109,7 @@ prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
static int run_undo_phase(uint unfinished);
static int run_undo_phase(uint uncommitted);
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number);
......@@ -276,7 +277,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
my_bool take_checkpoints, uint *warnings_count)
{
int error= 0;
uint unfinished_trans;
uint uncommitted_trans;
ulonglong old_now;
DBUG_ENTER("maria_apply_log");
......@@ -326,7 +327,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
if (run_redo_phase(from_lsn, apply))
goto err;
if ((unfinished_trans=
if ((uncommitted_trans=
end_of_redo_phase(should_run_undo_phase)) == (uint)-1)
goto err;
......@@ -366,13 +367,13 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
if (should_run_undo_phase)
{
if (run_undo_phase(unfinished_trans))
if (run_undo_phase(uncommitted_trans))
goto err;
}
else if (unfinished_trans > 0)
else if (uncommitted_trans > 0)
{
tprint(tracef, "***WARNING: %u unfinished transactions; some tables may"
" be left inconsistent!***\n", unfinished_trans);
tprint(tracef, "***WARNING: %u uncommitted transactions; some tables may"
" be left inconsistent!***\n", uncommitted_trans);
warnings++;
}
......@@ -481,7 +482,7 @@ prototype_redo_exec_hook(LONG_TRANSACTION_ID)
LSN gslsn= all_active_trans[sid].group_start_lsn;
if (gslsn != LSN_IMPOSSIBLE)
{
tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n",
LSN_IN_PARTS(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
......@@ -538,6 +539,12 @@ prototype_redo_exec_hook_dummy(CHECKPOINT)
}
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP)
{
/* abortion was already made */
return 0;
}
prototype_redo_exec_hook(INCOMPLETE_LOG)
{
MARIA_HA *info;
......@@ -1687,7 +1694,6 @@ prototype_redo_exec_hook(COMMIT)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
char llbuf[22];
if (long_trid == 0)
{
......@@ -1696,19 +1702,8 @@ prototype_redo_exec_hook(COMMIT)
return 0;
}
llstr(long_trid, llbuf);
tprint(tracef, "Transaction long_trid %s short_trid %u committed", llbuf, sid);
if (gslsn != LSN_IMPOSSIBLE)
{
/*
It's not an error, it may be that trn got a disk error when writing to a
table, so an unfinished group staid in the log.
*/
tprint(tracef, ", with group at LSN (%lu,0x%lx) short_trid %u aborted\n",
LSN_IN_PARTS(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
tprint(tracef, "\n");
tprint(tracef, "Transaction long_trid %s short_trid %u committed\n",
llbuf, sid);
bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
#ifdef MARIA_VERSIONING
/*
......@@ -2096,6 +2091,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
install_redo_exec_hook(REDO_DROP_TABLE);
install_redo_exec_hook(FILE_ID);
install_redo_exec_hook(INCOMPLETE_LOG);
install_redo_exec_hook(INCOMPLETE_GROUP);
install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
install_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
......@@ -2154,8 +2150,8 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
/*
A complete group is a set of log records with an "end mark" record
(e.g. a set of REDOs for an operation, terminated by an UNDO for this
operation); if there is no "end mark" record the group is incomplete
and won't be executed.
operation); if there is no "end mark" record the group is incomplete and
won't be executed.
*/
if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
(log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
......@@ -2168,8 +2164,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
can happen if the transaction got a table write error, then
unlocked tables thus wrote a COMMIT record.
*/
tprint(tracef, "\nDiscarding unfinished group before this record\n");
ALERT_USER();
tprint(tracef, "\nDiscarding incomplete group before this record\n");
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
......@@ -2285,14 +2280,14 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
/**
@brief Informs about any aborted groups or unfinished transactions,
@brief Informs about any aborted groups or uncommitted transactions,
prepares for the UNDO phase if needed.
@note Observe that it may init trnman.
*/
static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
{
uint sid, unfinished= 0;
uint sid, uncommitted= 0;
char llbuf[22];
LSN addr;
......@@ -2316,12 +2311,15 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
LSN gslsn= all_active_trans[sid].group_start_lsn;
TRN *trn;
if (gslsn != LSN_IMPOSSIBLE)
tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
{
tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n",
LSN_IN_PARTS(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
{
llstr(long_trid, llbuf);
tprint(tracef, "Transaction long_trid %s short_trid %u unfinished\n",
tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n",
llbuf, sid);
/* dummy_transaction_object serves only for DDLs */
DBUG_ASSERT(long_trid != 0);
......@@ -2332,9 +2330,24 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
trn->undo_lsn= all_active_trans[sid].undo_lsn;
trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn |
TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */
if (gslsn != LSN_IMPOSSIBLE)
{
/*
UNDO phase will log some records. So, a future recovery may see:
REDO(from incomplete group) - REDO(from rollback) - CLR_END
and thus execute the first REDO (finding it in "a complete
group"). To prevent that:
*/
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS];
LSN lsn;
if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP,
trn, NULL, 0,
TRANSLOG_INTERNAL_PARTS, log_array,
NULL, NULL))
return -1;
}
}
/* otherwise we will just warn about it */
unfinished++;
uncommitted++;
}
#ifdef MARIA_VERSIONING
/*
......@@ -2366,13 +2379,13 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
translog_assign_id_to_share_from_recovery(info->s, sid);
}
}
return unfinished;
return uncommitted;
}
static int run_undo_phase(uint unfinished)
static int run_undo_phase(uint uncommitted)
{
if (unfinished > 0)
if (uncommitted > 0)
{
checkpoint_useful= TRUE;
if (tracef != stdout)
......@@ -2382,12 +2395,12 @@ static int run_undo_phase(uint unfinished)
fprintf(stderr, "transactions to roll back:");
recovery_message_printed= REC_MSG_UNDO;
}
tprint(tracef, "%u transactions will be rolled back\n", unfinished);
tprint(tracef, "%u transactions will be rolled back\n", uncommitted);
for( ; ; )
{
if (recovery_message_printed == REC_MSG_UNDO)
fprintf(stderr, " %u", unfinished);
if ((unfinished--) == 0)
fprintf(stderr, " %u", uncommitted);
if ((uncommitted--) == 0)
break;
char llbuf[22];
TRN *trn= trnman_get_any_trn();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment