Commit b4b0bf47 authored by unknown's avatar unknown

Merge bk-internal.mysql.com:/home/bk/mysql-maria

into  mysql.com:/home/my/mysql-maria


storage/maria/ma_pagecache.c:
  Auto merged
parents fe892036 03437ea0
......@@ -281,9 +281,11 @@ typedef struct st_maria_extent_cursor
/* Position to all tails in the row. Updated when reading a row */
MARIA_RECORD_POS *tail_positions;
/* Current page */
my_off_t page;
ulonglong page;
/* How many pages in the page region */
uint page_count;
/* What kind of lock to use for tail pages */
enum pagecache_page_lock lock_for_tail_pages;
/* Total number of extents (i.e., entries in the 'extent' slot) */
uint extent_count;
/* <> 0 if current extent is a tail page; Set while using cursor */
......@@ -2435,7 +2437,7 @@ my_bool _ma_write_block_record(MARIA_HA *info __attribute__ ((unused)),
/**
@brief Remove row written by _ma_write_block_record()
@brief Remove row written by _ma_write_block_record() and log undo
@param info Maria handler
......@@ -2894,8 +2896,6 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
delete_tails(info, info->cur_row.tail_positions))
goto err;
info->s->state.split--;
if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err;
......@@ -3023,6 +3023,7 @@ static void init_extent(MARIA_EXTENT_CURSOR *extent, uchar *extent_info,
else
extent->page_count= page_count;
extent->tail_positions= tail_positions;
extent->lock_for_tail_pages= PAGECACHE_LOCK_LEFT_UNLOCKED;
}
......@@ -3050,6 +3051,8 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
{
MARIA_SHARE *share= info->s;
uchar *buff, *data;
MARIA_PINNED_PAGE page_link;
enum pagecache_page_lock lock;
DBUG_ENTER("read_next_extent");
if (!extent->page_count)
......@@ -3073,17 +3076,22 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
}
extent->first_extent= 0;
lock= PAGECACHE_LOCK_LEFT_UNLOCKED;
if (extent->tail)
lock= extent->lock_for_tail_pages;
DBUG_ASSERT(share->pagecache->block_size == share->block_size);
if (!(buff= pagecache_read(share->pagecache,
&info->dfile, extent->page, 0,
info->buff, share->page_type,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
lock, &page_link.link)))
{
/* check if we tried to read over end of file (ie: bad data in record) */
if ((extent->page + 1) * share->block_size > info->state->data_file_length)
goto crashed;
DBUG_RETURN(0);
}
if (!extent->tail)
{
/* Full data page */
......@@ -3095,7 +3103,14 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
info->cur_row.full_page_count++; /* For maria_chk */
DBUG_RETURN(extent->data_start= buff + LSN_SIZE + PAGE_TYPE_SIZE);
}
/* Found tail */
if (lock != PAGECACHE_LOCK_LEFT_UNLOCKED)
{
/* Read during redo */
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link);
}
if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != TAIL_PAGE)
goto crashed;
......@@ -3492,6 +3507,105 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
}
/** @brief Read positions to tail blocks and full blocks
@fn read_row_extent_info()
@param info Handler
@notes
This function is a simpler version of _ma_read_block_record2()
The data about the used pages is stored in info->cur_row.
@return
@retval 0 ok
@retval 1 Error. my_errno contains error number
*/
static my_bool read_row_extent_info(MARIA_HA *info, uchar *buff,
uint record_number)
{
MARIA_SHARE *share= info->s;
uchar *data, *end_of_data;
uint flag, row_extents, field_lengths;
MARIA_EXTENT_CURSOR extent;
DBUG_ENTER("read_row_extent_info");
if (!(data= get_record_position(buff, share->block_size,
record_number, &end_of_data)))
DBUG_RETURN(1); /* Wrong in record */
flag= (uint) (uchar) data[0];
/* Skip trans header */
data+= total_header_size[(flag & PRECALC_HEADER_BITMASK)];
row_extents= 0;
if (flag & ROW_FLAG_EXTENTS)
{
uint row_extent_size;
/*
Record is split over many data pages.
Get number of extents and first extent
*/
get_key_length(row_extents, data);
row_extent_size= row_extents * ROW_EXTENT_SIZE;
if (info->cur_row.extents_buffer_length < row_extent_size &&
_ma_alloc_buffer(&info->cur_row.extents,
&info->cur_row.extents_buffer_length,
row_extent_size))
DBUG_RETURN(1);
memcpy(info->cur_row.extents, data, ROW_EXTENT_SIZE);
data+= ROW_EXTENT_SIZE;
init_extent(&extent, info->cur_row.extents, row_extents,
info->cur_row.tail_positions);
extent.first_extent= 1;
}
else
(*info->cur_row.tail_positions)= 0;
info->cur_row.extents_count= row_extents;
if (share->base.max_field_lengths)
get_key_length(field_lengths, data);
if (share->calc_checksum)
info->cur_row.checksum= (uint) (uchar) *data++;
if (row_extents > 1)
{
MARIA_RECORD_POS *tail_pos;
uchar *extents, *end;
data+= share->base.null_bytes;
data+= share->base.pack_bytes;
data+= share->base.field_offsets * FIELD_OFFSET_SIZE;
/*
Read row extents (note that first extent was already read into
info->cur_row.extents above)
Lock tails with write lock as we will delete them later.
*/
extent.lock_for_tail_pages= PAGECACHE_LOCK_LEFT_WRITELOCKED;
if (read_long_data(info, info->cur_row.extents + ROW_EXTENT_SIZE,
(row_extents - 1) * ROW_EXTENT_SIZE,
&extent, &data, &end_of_data))
DBUG_RETURN(1);
/* Update tail_positions with pointer to tails */
tail_pos= info->cur_row.tail_positions;
for (extents= info->cur_row.extents, end= extents+ row_extents;
extents < end;
extents += ROW_EXTENT_SIZE)
{
ulonglong page= uint5korr(extents);
uint page_count= uint2korr(extents + ROW_EXTENT_PAGE_SIZE);
if (page_count & TAIL_BIT)
*(tail_pos++)= ma_recordpos(page, (page_count & ~TAIL_BIT));
}
*tail_pos= 0; /* End marker */
}
DBUG_RETURN(0);
}
/*
Read a record based on record position
......@@ -4575,3 +4689,62 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
}
DBUG_RETURN(0);
}
/****************************************************************************
Applying of UNDO entries
****************************************************************************/
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header)
{
ulonglong page;
uint record_number;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE], *buff;
my_bool res= 1;
MARIA_PINNED_PAGE page_link;
LSN lsn;
DBUG_ENTER("_ma_apply_undo_row_insert");
page= page_korr(header);
record_number= dirpos_korr(header + PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("Page: %lu record_number: %u", (ulong) page,
record_number));
if (!(buff= pagecache_read(info->s->pagecache,
&info->dfile, page, 0,
info->buff, info->s->page_type,
PAGECACHE_LOCK_WRITE,
&page_link.link)))
DBUG_RETURN(1);
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link);
if (read_row_extent_info(info, buff, record_number))
DBUG_RETURN(1);
if (delete_head_or_tail(info, page, record_number, 1, 1) ||
delete_tails(info, info->cur_row.tail_positions))
goto err;
if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err;
lsn_store(log_data + FILEID_STORE_SIZE, undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data))
goto err;
info->s->state.state.records--;
res= 0;
err:
_ma_unpin_all_pages(info, lsn);
DBUG_RETURN(res);
}
......@@ -187,3 +187,5 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
const uchar *header);
uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn,
const uchar *header);
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header);
......@@ -329,7 +329,7 @@ static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW=
"redo_undelete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_CLR_END=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, write_hook_for_redo, NULL, 1,
{LOGRECTYPE_FIXEDLENGTH, 9, 9, NULL, write_hook_for_redo, NULL, 0,
"clr_end", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_PURGE_END=
......@@ -6211,7 +6211,6 @@ static my_bool write_hook_for_undo(enum translog_record_type type
*/
}
/**
@brief Gives a 2-byte-id to MARIA_SHARE and logs this fact
......@@ -6353,7 +6352,6 @@ my_bool translog_is_file(uint file_no)
static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
{
TRANSLOG_ADDRESS addr;
uint min_file= 1, max_file;
DBUG_ENTER("translog_first_file");
if (!is_protected)
......
......@@ -353,7 +353,7 @@ typedef struct st_log_record_type_descriptor
/* a function to execute when we see the record during the REDO phase */
int (*record_execute_in_redo_phase)(const TRANSLOG_HEADER_BUFFER *);
/* a function to execute when we see the record during the UNDO phase */
int (*record_execute_in_undo_phase)(const TRANSLOG_HEADER_BUFFER *);
int (*record_execute_in_undo_phase)(const TRANSLOG_HEADER_BUFFER *, TRN *);
} LOG_DESC;
extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
......
......@@ -2885,6 +2885,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
&page_st);
DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
block->type == type ||
type == PAGECACHE_LSN_PAGE ||
type == PAGECACHE_READ_UNKNOWN_PAGE ||
block->type == PAGECACHE_READ_UNKNOWN_PAGE);
if (type != PAGECACHE_READ_UNKNOWN_PAGE ||
......
......@@ -49,27 +49,34 @@ static LSN current_group_end_lsn,
checkpoint_start= LSN_IMPOSSIBLE;
static FILE *tracef; /**< trace file for debugging */
#define prototype_exec_hook(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
#define prototype_exec_hook_dummy(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
#define prototype_redo_exec_hook(R) \
static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
#define prototype_redo_exec_hook_dummy(R) \
static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
__attribute ((unused)))
prototype_exec_hook(LONG_TRANSACTION_ID);
prototype_exec_hook_dummy(CHECKPOINT);
prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(REDO_DROP_TABLE);
prototype_exec_hook(FILE_ID);
prototype_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_BLOCKS);
prototype_exec_hook(REDO_DELETE_ALL);
prototype_exec_hook(UNDO_ROW_INSERT);
prototype_exec_hook(UNDO_ROW_DELETE);
prototype_exec_hook(UNDO_ROW_UPDATE);
prototype_exec_hook(UNDO_ROW_PURGE);
prototype_exec_hook(COMMIT);
#define prototype_undo_exec_hook(R) \
static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn)
prototype_redo_exec_hook(LONG_TRANSACTION_ID);
prototype_redo_exec_hook_dummy(CHECKPOINT);
prototype_redo_exec_hook(REDO_CREATE_TABLE);
prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_redo_exec_hook(FILE_ID);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_redo_exec_hook(REDO_PURGE_BLOCKS);
prototype_redo_exec_hook(REDO_DELETE_ALL);
prototype_redo_exec_hook(UNDO_ROW_INSERT);
prototype_redo_exec_hook(UNDO_ROW_DELETE);
prototype_redo_exec_hook(UNDO_ROW_UPDATE);
prototype_redo_exec_hook(UNDO_ROW_PURGE);
prototype_redo_exec_hook(COMMIT);
prototype_undo_exec_hook(UNDO_ROW_INSERT);
static int run_redo_phase(LSN lsn, my_bool apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
static int run_undo_phase(uint unfinished);
......@@ -159,6 +166,7 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
my_bool should_run_undo_phase)
{
int error= 0;
uint unfinished_trans;
DBUG_ENTER("maria_apply_log");
DBUG_ASSERT(apply || !should_run_undo_phase);
......@@ -199,7 +207,7 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
if (run_redo_phase(from_lsn, apply))
goto err;
uint unfinished_trans= end_of_redo_phase(should_run_undo_phase);
unfinished_trans= end_of_redo_phase(should_run_undo_phase);
if (unfinished_trans == (uint)-1)
goto err;
if (should_run_undo_phase)
......@@ -270,12 +278,12 @@ static int display_and_apply_record(const LOG_DESC *log_desc,
return 1;
}
if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
fprintf(tracef, "Got error when executing record\n");
fprintf(tracef, "Got error when executing redo on record\n");
return error;
}
prototype_exec_hook(LONG_TRANSACTION_ID)
prototype_redo_exec_hook(LONG_TRANSACTION_ID)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
......@@ -325,14 +333,14 @@ static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
}
prototype_exec_hook_dummy(CHECKPOINT)
prototype_redo_exec_hook_dummy(CHECKPOINT)
{
/* the only checkpoint we care about was found via control file, ignore */
return 0;
}
prototype_exec_hook(REDO_CREATE_TABLE)
prototype_redo_exec_hook(REDO_CREATE_TABLE)
{
File dfile= -1, kfile= -1;
char *linkname_ptr, filename[FN_REFLEN];
......@@ -458,7 +466,7 @@ prototype_exec_hook(REDO_CREATE_TABLE)
}
prototype_exec_hook(REDO_DROP_TABLE)
prototype_redo_exec_hook(REDO_DROP_TABLE)
{
char *name;
int error= 1;
......@@ -528,7 +536,7 @@ prototype_exec_hook(REDO_DROP_TABLE)
}
prototype_exec_hook(FILE_ID)
prototype_redo_exec_hook(FILE_ID)
{
uint16 sid;
int error= 1;
......@@ -656,7 +664,7 @@ static int new_table(uint16 sid, const char *name,
}
prototype_exec_hook(REDO_INSERT_ROW_HEAD)
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
{
int error= 1;
uchar *buff= NULL;
......@@ -701,7 +709,7 @@ prototype_exec_hook(REDO_INSERT_ROW_HEAD)
}
prototype_exec_hook(REDO_INSERT_ROW_TAIL)
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)
{
int error= 1;
uchar *buff;
......@@ -737,7 +745,7 @@ prototype_exec_hook(REDO_INSERT_ROW_TAIL)
}
prototype_exec_hook(REDO_PURGE_ROW_HEAD)
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
......@@ -753,7 +761,7 @@ prototype_exec_hook(REDO_PURGE_ROW_HEAD)
}
prototype_exec_hook(REDO_PURGE_ROW_TAIL)
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
......@@ -769,7 +777,7 @@ prototype_exec_hook(REDO_PURGE_ROW_TAIL)
}
prototype_exec_hook(REDO_PURGE_BLOCKS)
prototype_redo_exec_hook(REDO_PURGE_BLOCKS)
{
int error= 1;
uchar *buff;
......@@ -797,7 +805,7 @@ prototype_exec_hook(REDO_PURGE_BLOCKS)
}
prototype_exec_hook(REDO_DELETE_ALL)
prototype_redo_exec_hook(REDO_DELETE_ALL)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
......@@ -813,12 +821,12 @@ prototype_exec_hook(REDO_DELETE_ALL)
}
#define set_undo_lsn_for_active_trans(I, L) do { \
all_active_trans[I].undo_lsn= L; \
if (all_active_trans[I].first_undo_lsn == LSN_IMPOSSIBLE) \
all_active_trans[I].first_undo_lsn= L; } while (0)
#define set_undo_lsn_for_active_trans(TRID, LSN) do { \
all_active_trans[TRID].undo_lsn= LSN; \
if (all_active_trans[TRID].first_undo_lsn == LSN_IMPOSSIBLE) \
all_active_trans[TRID].first_undo_lsn= LSN; } while (0)
prototype_exec_hook(UNDO_ROW_INSERT)
prototype_redo_exec_hook(UNDO_ROW_INSERT)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
......@@ -843,12 +851,13 @@ prototype_exec_hook(UNDO_ROW_INSERT)
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
end:
return error;
}
prototype_exec_hook(UNDO_ROW_DELETE)
prototype_redo_exec_hook(UNDO_ROW_DELETE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
......@@ -868,7 +877,7 @@ prototype_exec_hook(UNDO_ROW_DELETE)
}
prototype_exec_hook(UNDO_ROW_UPDATE)
prototype_redo_exec_hook(UNDO_ROW_UPDATE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
......@@ -885,7 +894,7 @@ prototype_exec_hook(UNDO_ROW_UPDATE)
}
prototype_exec_hook(UNDO_ROW_PURGE)
prototype_redo_exec_hook(UNDO_ROW_PURGE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
......@@ -906,7 +915,7 @@ prototype_exec_hook(UNDO_ROW_PURGE)
}
prototype_exec_hook(COMMIT)
prototype_redo_exec_hook(COMMIT)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
......@@ -945,28 +954,55 @@ prototype_exec_hook(COMMIT)
}
prototype_undo_exec_hook(UNDO_ROW_INSERT)
{
my_bool error;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 1;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
/* Set undo to point to previous undo record */
info->trn= trn;
info->trn->undo_lsn= lsn_korr(rec->header);
error= _ma_apply_undo_row_insert(info, rec->lsn,
rec->header + LSN_STORE_SIZE +
FILEID_STORE_SIZE);
info->trn= 0;
return error;
}
static int run_redo_phase(LSN lsn, my_bool apply)
{
/* install hooks for execution */
#define install_exec_hook(R) \
#define install_redo_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
exec_LOGREC_ ## R;
install_exec_hook(LONG_TRANSACTION_ID);
install_exec_hook(CHECKPOINT);
install_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(REDO_DROP_TABLE);
install_exec_hook(FILE_ID);
install_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(REDO_INSERT_ROW_TAIL);
install_exec_hook(REDO_PURGE_ROW_HEAD);
install_exec_hook(REDO_PURGE_ROW_TAIL);
install_exec_hook(REDO_PURGE_BLOCKS);
install_exec_hook(REDO_DELETE_ALL);
install_exec_hook(UNDO_ROW_INSERT);
install_exec_hook(UNDO_ROW_DELETE);
install_exec_hook(UNDO_ROW_UPDATE);
install_exec_hook(UNDO_ROW_PURGE);
install_exec_hook(COMMIT);
exec_REDO_LOGREC_ ## R;
#define install_undo_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \
exec_UNDO_LOGREC_ ## R;
install_redo_exec_hook(LONG_TRANSACTION_ID);
install_redo_exec_hook(CHECKPOINT);
install_redo_exec_hook(REDO_CREATE_TABLE);
install_redo_exec_hook(REDO_DROP_TABLE);
install_redo_exec_hook(FILE_ID);
install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
install_redo_exec_hook(REDO_PURGE_BLOCKS);
install_redo_exec_hook(REDO_DELETE_ALL);
install_redo_exec_hook(UNDO_ROW_INSERT);
install_redo_exec_hook(UNDO_ROW_DELETE);
install_redo_exec_hook(UNDO_ROW_UPDATE);
install_redo_exec_hook(UNDO_ROW_PURGE);
install_redo_exec_hook(COMMIT);
install_undo_exec_hook(UNDO_ROW_INSERT);
current_group_end_lsn= LSN_IMPOSSIBLE;
......@@ -1178,10 +1214,6 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
}
}
/* we don't need all_tables anymore, maria_open_list is enough */
my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
all_tables= NULL;
/*
We could take a checkpoint here, in case of a crash during the UNDO
phase. The drawback is that a page which got a REDO (thus, flushed
......@@ -1207,7 +1239,23 @@ static int run_undo_phase(uint unfinished)
DBUG_ASSERT(trn != NULL);
llstr(trn->trid, llbuf);
fprintf(tracef, "Rolling back transaction of long id %s\n", llbuf);
/* of course we miss execution of UNDOs here */
/* Execute all undo entries */
while (trn->undo_lsn)
{
TRANSLOG_HEADER_BUFFER rec;
LOG_DESC *log_desc;
if (translog_read_record_header(trn->undo_lsn, &rec) ==
RECHEADER_READ_ERROR)
return 1;
log_desc= &log_record_type_descriptor[rec.type];
if (log_desc->record_execute_in_undo_phase(&rec, trn))
{
fprintf(tracef, "Got error when executing undo\n");
return 1;
}
}
if (trnman_rollback_trn(trn))
return 1;
/* We could want to span a few threads (4?) instead of 1 */
......
......@@ -38,7 +38,7 @@ static uint insert_count, update_count, remove_count;
static uint pack_keys=0, pack_seg=0, key_length;
static uint unique_key=HA_NOSAME;
static my_bool pagecacheing, null_fields, silent, skip_update, opt_unique,
verbose, skip_delete, transactional;
verbose, skip_delete, transactional, die_in_middle_of_transaction;
static MARIA_COLUMNDEF recinfo[4];
static MARIA_KEYDEF keyinfo[10];
static HA_KEYSEG keyseg[10];
......@@ -50,6 +50,19 @@ static void create_key(char *key,uint rownr);
static void create_record(char *record,uint rownr);
static void update_record(char *record);
/*
These are here only for testing of recovery with undo. We are not
including maria_def.h here as this test is also to be an example of
how to use maria outside of the maria directory
*/
extern int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index,
enum flush_type flush_type_for_data,
enum flush_type flush_type_for_index);
#define MARIA_FLUSH_DATA 1
int main(int argc,char *argv[])
{
MY_INIT(argv[0]);
......@@ -86,6 +99,9 @@ static int run_test(const char *filename)
MARIA_UNIQUEDEF uniquedef;
MARIA_CREATE_INFO create_info;
if (die_in_middle_of_transaction)
null_fields= 1;
bzero((char*) recinfo,sizeof(recinfo));
bzero((char*) &create_info,sizeof(create_info));
......@@ -198,6 +214,9 @@ static int run_test(const char *filename)
printf("J= %2d maria_write: %d errno: %d\n", j,error,my_errno);
}
if (maria_commit(file) || maria_begin(file))
goto err;
/* Insert 2 rows with null values */
if (null_fields)
{
......@@ -215,6 +234,17 @@ static int run_test(const char *filename)
flags[0]=2;
}
if (die_in_middle_of_transaction)
{
/*
Ensure we get changed pages and log to disk
As commit record is not done, the undo entries needs to be rolled back.
*/
_ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
FLUSH_RELEASE);
exit(1);
}
if (!skip_update)
{
if (opt_unique)
......@@ -627,6 +657,11 @@ static struct my_option my_long_options[] =
(uchar**) &skip_delete, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"skip-update", 'D', "Don't test updates", (uchar**) &skip_update,
(uchar**) &skip_update, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"test-undo", 'A',
"Abort hard after doing inserts. Used for testing recovery with undo",
(uchar**) &die_in_middle_of_transaction,
(uchar**) &die_in_middle_of_transaction,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"transactional", 'T',
"Test in transactional mode. (Only works with block format)",
(uchar**) &transactional, (uchar**) &transactional, 0, GET_BOOL, NO_ARG,
......
......@@ -93,8 +93,7 @@ int main(int argc, char **argv)
*/
fprintf(stdout, "TRACE of the last maria_read_log\n");
/* Until we have UNDO records, no UNDO phase */
if (maria_apply_log(lsn, opt_display_and_apply, stdout, FALSE))
if (maria_apply_log(lsn, opt_display_and_apply, stdout, TRUE))
goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment