Commit 03437ea0 authored by unknown's avatar unknown

Added UNDO handling of insert during recovery


storage/maria/ma_blockrec.c:
  Added UNDO handling of insert during recovery
  To do this, I also had to add write locking of tail pages during undo phase (As we need to access the same page twice if extents are split over two pages)
  Another way to handle the undo of insert would be to store the extent information as part of the UNDO_INSERT block.
storage/maria/ma_blockrec.h:
  Added new prototype
storage/maria/ma_loghandler.c:
  Changed type of CLR_END (to avoid crash in log handler)
  Removed not used variable
storage/maria/ma_loghandler.h:
  Added TRN argument to record_execute_in_undo_phase()
storage/maria/ma_pagecache.c:
  Hack for undo phase of recovery.  During REDO we work with PLAIN pages, but UNDO works with LSN pages, which caused an abort when trying to access a cached page.
storage/maria/ma_recovery.c:
  Added execution of UNDO_ROW_INSERT
storage/maria/ma_test1.c:
  Added option --test-undo for testing recovery with undo
storage/maria/maria_read_log.c:
  Added processing of undos
parent 572ce24f
......@@ -281,9 +281,11 @@ typedef struct st_maria_extent_cursor
/* Position to all tails in the row. Updated when reading a row */
MARIA_RECORD_POS *tail_positions;
/* Current page */
my_off_t page;
ulonglong page;
/* How many pages in the page region */
uint page_count;
/* What kind of lock to use for tail pages */
enum pagecache_page_lock lock_for_tail_pages;
/* Total number of extents (i.e., entries in the 'extent' slot) */
uint extent_count;
/* <> 0 if current extent is a tail page; Set while using cursor */
......@@ -2435,7 +2437,7 @@ my_bool _ma_write_block_record(MARIA_HA *info __attribute__ ((unused)),
/**
@brief Remove row written by _ma_write_block_record()
@brief Remove row written by _ma_write_block_record() and log undo
@param info Maria handler
......@@ -2466,8 +2468,8 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
if (block->used & BLOCKUSED_TAIL)
{
/*
block->page_count is set to the tail directory entry number in
write_block_record()
block->page_count is set to the tail directory entry number in
write_block_record()
*/
if (delete_head_or_tail(info, block->page, block->page_count & ~TAIL_BIT,
0, 0))
......@@ -2894,8 +2896,6 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
delete_tails(info, info->cur_row.tail_positions))
goto err;
info->s->state.split--;
if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err;
......@@ -3023,6 +3023,7 @@ static void init_extent(MARIA_EXTENT_CURSOR *extent, uchar *extent_info,
else
extent->page_count= page_count;
extent->tail_positions= tail_positions;
extent->lock_for_tail_pages= PAGECACHE_LOCK_LEFT_UNLOCKED;
}
......@@ -3050,6 +3051,8 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
{
MARIA_SHARE *share= info->s;
uchar *buff, *data;
MARIA_PINNED_PAGE page_link;
enum pagecache_page_lock lock;
DBUG_ENTER("read_next_extent");
if (!extent->page_count)
......@@ -3073,17 +3076,22 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
}
extent->first_extent= 0;
lock= PAGECACHE_LOCK_LEFT_UNLOCKED;
if (extent->tail)
lock= extent->lock_for_tail_pages;
DBUG_ASSERT(share->pagecache->block_size == share->block_size);
if (!(buff= pagecache_read(share->pagecache,
&info->dfile, extent->page, 0,
info->buff, share->page_type,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
lock, &page_link.link)))
{
/* check if we tried to read over end of file (ie: bad data in record) */
if ((extent->page + 1) * share->block_size > info->state->data_file_length)
goto crashed;
DBUG_RETURN(0);
}
if (!extent->tail)
{
/* Full data page */
......@@ -3095,7 +3103,14 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
info->cur_row.full_page_count++; /* For maria_chk */
DBUG_RETURN(extent->data_start= buff + LSN_SIZE + PAGE_TYPE_SIZE);
}
/* Found tail */
if (lock != PAGECACHE_LOCK_LEFT_UNLOCKED)
{
/* Read during redo */
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link);
}
if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != TAIL_PAGE)
goto crashed;
......@@ -3492,6 +3507,105 @@ err:
}
/** @brief Read positions to tail blocks and full blocks
@fn read_row_extent_info()
@param info Handler
@notes
This function is a simpler version of _ma_read_block_record2()
The data about the used pages is stored in info->cur_row.
@return
@retval 0 ok
@retval 1 Error. my_errno contains error number
*/
static my_bool read_row_extent_info(MARIA_HA *info, uchar *buff,
uint record_number)
{
MARIA_SHARE *share= info->s;
uchar *data, *end_of_data;
uint flag, row_extents, field_lengths;
MARIA_EXTENT_CURSOR extent;
DBUG_ENTER("read_row_extent_info");
if (!(data= get_record_position(buff, share->block_size,
record_number, &end_of_data)))
DBUG_RETURN(1); /* Wrong in record */
flag= (uint) (uchar) data[0];
/* Skip trans header */
data+= total_header_size[(flag & PRECALC_HEADER_BITMASK)];
row_extents= 0;
if (flag & ROW_FLAG_EXTENTS)
{
uint row_extent_size;
/*
Record is split over many data pages.
Get number of extents and first extent
*/
get_key_length(row_extents, data);
row_extent_size= row_extents * ROW_EXTENT_SIZE;
if (info->cur_row.extents_buffer_length < row_extent_size &&
_ma_alloc_buffer(&info->cur_row.extents,
&info->cur_row.extents_buffer_length,
row_extent_size))
DBUG_RETURN(1);
memcpy(info->cur_row.extents, data, ROW_EXTENT_SIZE);
data+= ROW_EXTENT_SIZE;
init_extent(&extent, info->cur_row.extents, row_extents,
info->cur_row.tail_positions);
extent.first_extent= 1;
}
else
(*info->cur_row.tail_positions)= 0;
info->cur_row.extents_count= row_extents;
if (share->base.max_field_lengths)
get_key_length(field_lengths, data);
if (share->calc_checksum)
info->cur_row.checksum= (uint) (uchar) *data++;
if (row_extents > 1)
{
MARIA_RECORD_POS *tail_pos;
uchar *extents, *end;
data+= share->base.null_bytes;
data+= share->base.pack_bytes;
data+= share->base.field_offsets * FIELD_OFFSET_SIZE;
/*
Read row extents (note that first extent was already read into
info->cur_row.extents above)
Lock tails with write lock as we will delete them later.
*/
extent.lock_for_tail_pages= PAGECACHE_LOCK_LEFT_WRITELOCKED;
if (read_long_data(info, info->cur_row.extents + ROW_EXTENT_SIZE,
(row_extents - 1) * ROW_EXTENT_SIZE,
&extent, &data, &end_of_data))
DBUG_RETURN(1);
/* Update tail_positions with pointer to tails */
tail_pos= info->cur_row.tail_positions;
for (extents= info->cur_row.extents, end= extents+ row_extents;
extents < end;
extents += ROW_EXTENT_SIZE)
{
ulonglong page= uint5korr(extents);
uint page_count= uint2korr(extents + ROW_EXTENT_PAGE_SIZE);
if (page_count & TAIL_BIT)
*(tail_pos++)= ma_recordpos(page, (page_count & ~TAIL_BIT));
}
*tail_pos= 0; /* End marker */
}
DBUG_RETURN(0);
}
/*
Read a record based on record position
......@@ -4575,3 +4689,62 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
}
DBUG_RETURN(0);
}
/****************************************************************************
Applying of UNDO entries
****************************************************************************/
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header)
{
ulonglong page;
uint record_number;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE], *buff;
my_bool res= 1;
MARIA_PINNED_PAGE page_link;
LSN lsn;
DBUG_ENTER("_ma_apply_undo_row_insert");
page= page_korr(header);
record_number= dirpos_korr(header + PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("Page: %lu record_number: %u", (ulong) page,
record_number));
if (!(buff= pagecache_read(info->s->pagecache,
&info->dfile, page, 0,
info->buff, info->s->page_type,
PAGECACHE_LOCK_WRITE,
&page_link.link)))
DBUG_RETURN(1);
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link);
if (read_row_extent_info(info, buff, record_number))
DBUG_RETURN(1);
if (delete_head_or_tail(info, page, record_number, 1, 1) ||
delete_tails(info, info->cur_row.tail_positions))
goto err;
if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err;
lsn_store(log_data + FILEID_STORE_SIZE, undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data))
goto err;
info->s->state.state.records--;
res= 0;
err:
_ma_unpin_all_pages(info, lsn);
DBUG_RETURN(res);
}
......@@ -187,3 +187,5 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
const uchar *header);
uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn,
const uchar *header);
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header);
......@@ -329,7 +329,7 @@ static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW=
"redo_undelete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_CLR_END=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, write_hook_for_redo, NULL, 1,
{LOGRECTYPE_FIXEDLENGTH, 9, 9, NULL, write_hook_for_redo, NULL, 0,
"clr_end", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_PURGE_END=
......@@ -6211,7 +6211,6 @@ static my_bool write_hook_for_undo(enum translog_record_type type
*/
}
/**
@brief Gives a 2-byte-id to MARIA_SHARE and logs this fact
......@@ -6353,7 +6352,6 @@ my_bool translog_is_file(uint file_no)
static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
{
TRANSLOG_ADDRESS addr;
uint min_file= 1, max_file;
DBUG_ENTER("translog_first_file");
if (!is_protected)
......
......@@ -342,8 +342,8 @@ typedef struct st_log_record_type_descriptor
/* HOOK for reading headers */
read_rec_hook read_hook;
/*
For pseudo fixed records number of compressed LSNs followed by
system header
For pseudo fixed records number of compressed LSNs followed by
system header
*/
int16 compressed_LSN;
/* the rest is for maria_read_log & Recovery */
......@@ -353,7 +353,7 @@ typedef struct st_log_record_type_descriptor
/* a function to execute when we see the record during the REDO phase */
int (*record_execute_in_redo_phase)(const TRANSLOG_HEADER_BUFFER *);
/* a function to execute when we see the record during the UNDO phase */
int (*record_execute_in_undo_phase)(const TRANSLOG_HEADER_BUFFER *);
int (*record_execute_in_undo_phase)(const TRANSLOG_HEADER_BUFFER *, TRN *);
} LOG_DESC;
extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
......
......@@ -2885,6 +2885,7 @@ restart:
&page_st);
DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
block->type == type ||
type == PAGECACHE_LSN_PAGE ||
type == PAGECACHE_READ_UNKNOWN_PAGE ||
block->type == PAGECACHE_READ_UNKNOWN_PAGE);
if (type != PAGECACHE_READ_UNKNOWN_PAGE ||
......
......@@ -49,27 +49,34 @@ static LSN current_group_end_lsn,
checkpoint_start= LSN_IMPOSSIBLE;
static FILE *tracef; /**< trace file for debugging */
#define prototype_exec_hook(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
#define prototype_exec_hook_dummy(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
#define prototype_redo_exec_hook(R) \
static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
#define prototype_redo_exec_hook_dummy(R) \
static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
__attribute ((unused)))
prototype_exec_hook(LONG_TRANSACTION_ID);
prototype_exec_hook_dummy(CHECKPOINT);
prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(REDO_DROP_TABLE);
prototype_exec_hook(FILE_ID);
prototype_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_BLOCKS);
prototype_exec_hook(REDO_DELETE_ALL);
prototype_exec_hook(UNDO_ROW_INSERT);
prototype_exec_hook(UNDO_ROW_DELETE);
prototype_exec_hook(UNDO_ROW_UPDATE);
prototype_exec_hook(UNDO_ROW_PURGE);
prototype_exec_hook(COMMIT);
#define prototype_undo_exec_hook(R) \
static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn)
prototype_redo_exec_hook(LONG_TRANSACTION_ID);
prototype_redo_exec_hook_dummy(CHECKPOINT);
prototype_redo_exec_hook(REDO_CREATE_TABLE);
prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_redo_exec_hook(FILE_ID);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_redo_exec_hook(REDO_PURGE_BLOCKS);
prototype_redo_exec_hook(REDO_DELETE_ALL);
prototype_redo_exec_hook(UNDO_ROW_INSERT);
prototype_redo_exec_hook(UNDO_ROW_DELETE);
prototype_redo_exec_hook(UNDO_ROW_UPDATE);
prototype_redo_exec_hook(UNDO_ROW_PURGE);
prototype_redo_exec_hook(COMMIT);
prototype_undo_exec_hook(UNDO_ROW_INSERT);
static int run_redo_phase(LSN lsn, my_bool apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
static int run_undo_phase(uint unfinished);
......@@ -159,6 +166,7 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
my_bool should_run_undo_phase)
{
int error= 0;
uint unfinished_trans;
DBUG_ENTER("maria_apply_log");
DBUG_ASSERT(apply || !should_run_undo_phase);
......@@ -199,7 +207,7 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
if (run_redo_phase(from_lsn, apply))
goto err;
uint unfinished_trans= end_of_redo_phase(should_run_undo_phase);
unfinished_trans= end_of_redo_phase(should_run_undo_phase);
if (unfinished_trans == (uint)-1)
goto err;
if (should_run_undo_phase)
......@@ -270,12 +278,12 @@ static int display_and_apply_record(const LOG_DESC *log_desc,
return 1;
}
if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
fprintf(tracef, "Got error when executing record\n");
fprintf(tracef, "Got error when executing redo on record\n");
return error;
}
prototype_exec_hook(LONG_TRANSACTION_ID)
prototype_redo_exec_hook(LONG_TRANSACTION_ID)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
......@@ -325,14 +333,14 @@ static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
}
prototype_exec_hook_dummy(CHECKPOINT)
prototype_redo_exec_hook_dummy(CHECKPOINT)
{
/* the only checkpoint we care about was found via control file, ignore */
return 0;
}
prototype_exec_hook(REDO_CREATE_TABLE)
prototype_redo_exec_hook(REDO_CREATE_TABLE)
{
File dfile= -1, kfile= -1;
char *linkname_ptr, filename[FN_REFLEN];
......@@ -458,7 +466,7 @@ end:
}
prototype_exec_hook(REDO_DROP_TABLE)
prototype_redo_exec_hook(REDO_DROP_TABLE)
{
char *name;
int error= 1;
......@@ -528,7 +536,7 @@ end:
}
prototype_exec_hook(FILE_ID)
prototype_redo_exec_hook(FILE_ID)
{
uint16 sid;
int error= 1;
......@@ -656,7 +664,7 @@ end:
}
prototype_exec_hook(REDO_INSERT_ROW_HEAD)
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
{
int error= 1;
uchar *buff= NULL;
......@@ -701,7 +709,7 @@ end:
}
prototype_exec_hook(REDO_INSERT_ROW_TAIL)
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)
{
int error= 1;
uchar *buff;
......@@ -737,7 +745,7 @@ end:
}
prototype_exec_hook(REDO_PURGE_ROW_HEAD)
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
......@@ -753,7 +761,7 @@ end:
}
prototype_exec_hook(REDO_PURGE_ROW_TAIL)
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
......@@ -769,7 +777,7 @@ end:
}
prototype_exec_hook(REDO_PURGE_BLOCKS)
prototype_redo_exec_hook(REDO_PURGE_BLOCKS)
{
int error= 1;
uchar *buff;
......@@ -797,7 +805,7 @@ end:
}
prototype_exec_hook(REDO_DELETE_ALL)
prototype_redo_exec_hook(REDO_DELETE_ALL)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
......@@ -813,12 +821,12 @@ end:
}
#define set_undo_lsn_for_active_trans(I, L) do { \
all_active_trans[I].undo_lsn= L; \
if (all_active_trans[I].first_undo_lsn == LSN_IMPOSSIBLE) \
all_active_trans[I].first_undo_lsn= L; } while (0)
#define set_undo_lsn_for_active_trans(TRID, LSN) do { \
all_active_trans[TRID].undo_lsn= LSN; \
if (all_active_trans[TRID].first_undo_lsn == LSN_IMPOSSIBLE) \
all_active_trans[TRID].first_undo_lsn= LSN; } while (0)
prototype_exec_hook(UNDO_ROW_INSERT)
prototype_redo_exec_hook(UNDO_ROW_INSERT)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
......@@ -843,12 +851,13 @@ prototype_exec_hook(UNDO_ROW_INSERT)
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
end:
return error;
}
prototype_exec_hook(UNDO_ROW_DELETE)
prototype_redo_exec_hook(UNDO_ROW_DELETE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
......@@ -868,7 +877,7 @@ end:
}
prototype_exec_hook(UNDO_ROW_UPDATE)
prototype_redo_exec_hook(UNDO_ROW_UPDATE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
......@@ -885,7 +894,7 @@ end:
}
prototype_exec_hook(UNDO_ROW_PURGE)
prototype_redo_exec_hook(UNDO_ROW_PURGE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
......@@ -906,7 +915,7 @@ end:
}
prototype_exec_hook(COMMIT)
prototype_redo_exec_hook(COMMIT)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
......@@ -945,28 +954,55 @@ prototype_exec_hook(COMMIT)
}
prototype_undo_exec_hook(UNDO_ROW_INSERT)
{
my_bool error;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 1;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
/* Set undo to point to previous undo record */
info->trn= trn;
info->trn->undo_lsn= lsn_korr(rec->header);
error= _ma_apply_undo_row_insert(info, rec->lsn,
rec->header + LSN_STORE_SIZE +
FILEID_STORE_SIZE);
info->trn= 0;
return error;
}
static int run_redo_phase(LSN lsn, my_bool apply)
{
/* install hooks for execution */
#define install_exec_hook(R) \
#define install_redo_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
exec_LOGREC_ ## R;
install_exec_hook(LONG_TRANSACTION_ID);
install_exec_hook(CHECKPOINT);
install_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(REDO_DROP_TABLE);
install_exec_hook(FILE_ID);
install_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(REDO_INSERT_ROW_TAIL);
install_exec_hook(REDO_PURGE_ROW_HEAD);
install_exec_hook(REDO_PURGE_ROW_TAIL);
install_exec_hook(REDO_PURGE_BLOCKS);
install_exec_hook(REDO_DELETE_ALL);
install_exec_hook(UNDO_ROW_INSERT);
install_exec_hook(UNDO_ROW_DELETE);
install_exec_hook(UNDO_ROW_UPDATE);
install_exec_hook(UNDO_ROW_PURGE);
install_exec_hook(COMMIT);
exec_REDO_LOGREC_ ## R;
#define install_undo_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \
exec_UNDO_LOGREC_ ## R;
install_redo_exec_hook(LONG_TRANSACTION_ID);
install_redo_exec_hook(CHECKPOINT);
install_redo_exec_hook(REDO_CREATE_TABLE);
install_redo_exec_hook(REDO_DROP_TABLE);
install_redo_exec_hook(FILE_ID);
install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
install_redo_exec_hook(REDO_PURGE_BLOCKS);
install_redo_exec_hook(REDO_DELETE_ALL);
install_redo_exec_hook(UNDO_ROW_INSERT);
install_redo_exec_hook(UNDO_ROW_DELETE);
install_redo_exec_hook(UNDO_ROW_UPDATE);
install_redo_exec_hook(UNDO_ROW_PURGE);
install_redo_exec_hook(COMMIT);
install_undo_exec_hook(UNDO_ROW_INSERT);
current_group_end_lsn= LSN_IMPOSSIBLE;
......@@ -1178,10 +1214,6 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
}
}
/* we don't need all_tables anymore, maria_open_list is enough */
my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
all_tables= NULL;
/*
We could take a checkpoint here, in case of a crash during the UNDO
phase. The drawback is that a page which got a REDO (thus, flushed
......@@ -1207,7 +1239,23 @@ static int run_undo_phase(uint unfinished)
DBUG_ASSERT(trn != NULL);
llstr(trn->trid, llbuf);
fprintf(tracef, "Rolling back transaction of long id %s\n", llbuf);
/* of course we miss execution of UNDOs here */
/* Execute all undo entries */
while (trn->undo_lsn)
{
TRANSLOG_HEADER_BUFFER rec;
LOG_DESC *log_desc;
if (translog_read_record_header(trn->undo_lsn, &rec) ==
RECHEADER_READ_ERROR)
return 1;
log_desc= &log_record_type_descriptor[rec.type];
if (log_desc->record_execute_in_undo_phase(&rec, trn))
{
fprintf(tracef, "Got error when executing undo\n");
return 1;
}
}
if (trnman_rollback_trn(trn))
return 1;
/* We could want to span a few threads (4?) instead of 1 */
......
......@@ -38,7 +38,7 @@ static uint insert_count, update_count, remove_count;
static uint pack_keys=0, pack_seg=0, key_length;
static uint unique_key=HA_NOSAME;
static my_bool pagecacheing, null_fields, silent, skip_update, opt_unique,
verbose, skip_delete, transactional;
verbose, skip_delete, transactional, die_in_middle_of_transaction;
static MARIA_COLUMNDEF recinfo[4];
static MARIA_KEYDEF keyinfo[10];
static HA_KEYSEG keyseg[10];
......@@ -50,6 +50,19 @@ static void create_key(char *key,uint rownr);
static void create_record(char *record,uint rownr);
static void update_record(char *record);
/*
These are here only for testing of recovery with undo. We are not
including maria_def.h here as this test is also to be an example of
how to use maria outside of the maria directory
*/
extern int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index,
enum flush_type flush_type_for_data,
enum flush_type flush_type_for_index);
#define MARIA_FLUSH_DATA 1
int main(int argc,char *argv[])
{
MY_INIT(argv[0]);
......@@ -86,6 +99,9 @@ static int run_test(const char *filename)
MARIA_UNIQUEDEF uniquedef;
MARIA_CREATE_INFO create_info;
if (die_in_middle_of_transaction)
null_fields= 1;
bzero((char*) recinfo,sizeof(recinfo));
bzero((char*) &create_info,sizeof(create_info));
......@@ -198,6 +214,9 @@ static int run_test(const char *filename)
printf("J= %2d maria_write: %d errno: %d\n", j,error,my_errno);
}
if (maria_commit(file) || maria_begin(file))
goto err;
/* Insert 2 rows with null values */
if (null_fields)
{
......@@ -215,6 +234,17 @@ static int run_test(const char *filename)
flags[0]=2;
}
if (die_in_middle_of_transaction)
{
/*
Ensure we get changed pages and log to disk
As commit record is not done, the undo entries needs to be rolled back.
*/
_ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
FLUSH_RELEASE);
exit(1);
}
if (!skip_update)
{
if (opt_unique)
......@@ -627,6 +657,11 @@ static struct my_option my_long_options[] =
(uchar**) &skip_delete, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"skip-update", 'D', "Don't test updates", (uchar**) &skip_update,
(uchar**) &skip_update, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"test-undo", 'A',
"Abort hard after doing inserts. Used for testing recovery with undo",
(uchar**) &die_in_middle_of_transaction,
(uchar**) &die_in_middle_of_transaction,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"transactional", 'T',
"Test in transactional mode. (Only works with block format)",
(uchar**) &transactional, (uchar**) &transactional, 0, GET_BOOL, NO_ARG,
......
......@@ -93,8 +93,7 @@ int main(int argc, char **argv)
*/
fprintf(stdout, "TRACE of the last maria_read_log\n");
/* Until we have UNDO records, no UNDO phase */
if (maria_apply_log(lsn, opt_display_and_apply, stdout, FALSE))
if (maria_apply_log(lsn, opt_display_and_apply, stdout, TRUE))
goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment