Commit 044c4103 authored by unknown's avatar unknown

Added undo of deleted row

Added part of undo of update row
Extended ma_test1 for recovery testing
Some bug fixes


storage/maria/ha_maria.cc:
  Ignore 'state.split' in case of block records
storage/maria/ma_bitmap.c:
  Added return value for _ma_bitmap_find_place() for how much data we should put on head page
storage/maria/ma_blockrec.c:
  Added undo of deleted row.
  - Added logging of CLR_END records in write_block_record()
  - Split ma_write_init_block_record() to two functions to get better code reuse
  - Added _ma_apply_undo_row_delete()
  - Added ma_get_length()
  
  Added 'empty' prototype for undo_row_update()
  
  Fixed bug when moving data withing a head/tail page.
  Fixed bug when reading a page with bigger LSN but of different type than was expected.
  Store undo_lsn first in CLR_END record
  
  Simplified some code by adding local variables.
  Changed log format for UNDO_ROW_DELETE to store total length of used blobs
storage/maria/ma_blockrec.h:
  Added prototypes for undo code.
storage/maria/ma_pagecache.c:
  Allow plain page to change to LSN page (needed in recovery to apply UNDO)
storage/maria/ma_recovery.c:
  Added undo handling of UNDO_ROW_DELETE and UNDO_ROW_UPDATE
storage/maria/ma_test1.c:
  Extended --test-undo option to allow us to die after insert or after delete.
  Fixed bug in printing key values when using -v
storage/maria/maria_def.h:
  Moved some variables around to be getter alignment
  Added length_buff buffer to be used during undo handling
parent b4b0bf47
......@@ -1219,7 +1219,9 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize)
}
if (!do_optimize ||
((file->state->del || share->state.split != file->state->records) &&
((file->state->del ||
((file->s->data_file_type != BLOCK_RECORD) &&
share->state.split != file->state->records)) &&
(!(param.testflag & T_QUICK) ||
(share->state.changed & (STATE_NOT_OPTIMIZED_KEYS |
STATE_NOT_OPTIMIZED_ROWS)))))
......
......@@ -1421,6 +1421,8 @@ static my_bool write_rest_of_head(MARIA_HA *info, uint position,
RETURN
0 ok
row->space_on_head_page contains minimum number of bytes we
expect to put on the head page.
1 error
*/
......@@ -1457,6 +1459,7 @@ my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1;
if (find_head(info, (uint) row->total_length, position))
goto abort;
row->space_on_head_page= row->total_length;
goto end;
}
......@@ -1474,6 +1477,7 @@ my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1;
if (find_head(info, head_length, position))
goto abort;
row->space_on_head_page= head_length;
goto end;
}
......@@ -1490,6 +1494,7 @@ my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
position= ELEMENTS_RESERVED_FOR_MAIN_PART -2; /* Only head and tail */
if (find_head(info, row_length, position))
goto abort;
row->space_on_head_page= row_length;
rest_length= head_length - row_length;
if (write_rest_of_head(info, position, rest_length))
goto abort;
......
......@@ -449,7 +449,7 @@ my_bool _ma_init_block_record(MARIA_HA *info)
&info->log_row_parts,
sizeof(*info->log_row_parts) *
(TRANSLOG_INTERNAL_PARTS + 2 +
info->s->base.fields + 2),
info->s->base.fields + 3),
&info->update_field_data,
(info->s->base.fields * 4 +
info->s->base.max_field_lengths + 1 + 4),
......@@ -655,9 +655,6 @@ static my_bool extend_area_on_page(uchar *buff, uchar *dir,
}
/*
Check that a region is all zero
......@@ -726,6 +723,23 @@ void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn)
}
#ifdef NOT_YET_NEEDED
/* Calculate empty space on a page */
static uint empty_space_on_page(uchar *buff, uint block_size)
{
enum en_page_type;
page_type= (enum en_page_type) (buff[PAGE_TYPE_OFFSET] &
~(uchar) PAGE_CAN_BE_COMPACTED);
if (page_type == UNALLOCATED_PAGE)
return block_size;
if ((uint) page_type <= TAIL_PAGE)
return uint2korr(buff+EMPTY_SPACE_OFFSET);
return 0; /* Blob page */
}
#endif
/*
Find free position in directory
......@@ -917,8 +931,8 @@ static void calc_record_size(MARIA_HA *info, const uchar *record,
{
uint length, field_length_data_length;
const uchar *field_pos= record + column->offset;
/* 256 is correct as this includes the length uchar */
/* 256 is correct as this includes the length uchar */
field_length_data[0]= field_pos[0];
if (column->length <= 256)
{
......@@ -1228,17 +1242,21 @@ static my_bool get_head_or_tail_page(MARIA_HA *info,
if (res->length < length)
{
if (res->empty_space + res->length < length)
if (res->empty_space + res->length >= length)
{
compact_page(res->buff, block_size, res->rownr, 1);
/* All empty space are now after current position */
dir= (res->buff + block_size - DIR_ENTRY_SIZE * res->rownr -
PAGE_SUFFIX_SIZE);
DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
res->length= res->empty_space= uint2korr(dir+2);
}
if (res->length < length)
{
DBUG_PRINT("error", ("length: %u res->length: %u empty_space: %u",
length, res->length, res->empty_space));
goto crashed; /* Wrong bitmap information */
}
}
res->dir= dir;
res->data= res->buff + uint2korr(dir);
}
......@@ -1631,6 +1649,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
/**
@brief Write a record to a (set of) pages
@fn write_block_record()
@param info Maria handler
@param old_record Original record in case of update; NULL in case of
insert
......@@ -1640,6 +1659,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
@param map_blocks On which pages the record should be stored
@param row_pos Position on head page where to put head part of
record
@param undo_lsn <> 0 if we are in UNDO
@note
On return all pinned pages are released.
......@@ -1654,7 +1674,8 @@ static my_bool write_block_record(MARIA_HA *info,
MARIA_ROW *row,
MARIA_BITMAP_BLOCKS *bitmap_blocks,
my_bool head_block_is_read,
struct st_row_pos_info *row_pos)
struct st_row_pos_info *row_pos,
LSN undo_lsn)
{
uchar *data, *end_of_data, *tmp_data_used, *tmp_data;
uchar *row_extents_first_part, *row_extents_second_part;
......@@ -1862,7 +1883,7 @@ static my_bool write_block_record(MARIA_HA *info,
{
/* Update page directory */
uint length= (uint) (data - row_pos->data);
DBUG_PRINT("info", ("head length: %u", length));
DBUG_PRINT("info", ("Used head length on page: %u", length));
if (length < info->s->base.min_row_length)
{
uint diff_length= info->s->base.min_row_length - length;
......@@ -2256,12 +2277,31 @@ static my_bool write_block_record(MARIA_HA *info,
goto disk_err;
}
/* Write UNDO record */
/* Write UNDO or CLR record */
lsn= 0;
if (share->now_transactional)
{
LEX_STRING *log_array= info->log_row_parts;
if (undo_lsn)
{
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE];
/* undo_lsn must be first for compression to work */
lsn_store(log_data, undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data+ FILEID_STORE_SIZE))
goto disk_err;
}
else
{
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
LEX_STRING *log_array= info->log_row_parts;
/* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_INSERT share same header */
lsn_store(log_data, info->trn->undo_lsn);
......@@ -2299,8 +2339,8 @@ static my_bool write_block_record(MARIA_HA *info,
goto disk_err;
}
}
_ma_unpin_all_pages(info, info->trn->undo_lsn);
}
_ma_unpin_all_pages(info, lsn);
if (tmp_data_used)
{
......@@ -2379,7 +2419,49 @@ disk_err:
/*
Write a record (to get the row id for it)
@brief Write a record
@fn allocate_and_write_block_record()
@param info Maria handler
@param record Record to write
@param row Information about fields in 'record'
@param undo_lsn <> 0 if in undo
@return
@retval 0 ok
@retval 1 Error
*/
static my_bool allocate_and_write_block_record(MARIA_HA *info,
const uchar *record,
MARIA_ROW *row,
LSN undo_lsn)
{
struct st_row_pos_info row_pos;
MARIA_BITMAP_BLOCKS *blocks= &row->insert_blocks;
DBUG_ENTER("allocate_and_write_block_record");
if (_ma_bitmap_find_place(info, row, blocks))
DBUG_RETURN(1); /* Error reading bitmap */
/* page will be pinned & locked by get_head_or_tail_page */
if (get_head_or_tail_page(info, blocks->block, info->buff,
row->space_on_head_page, HEAD_PAGE,
PAGECACHE_LOCK_WRITE, &row_pos))
DBUG_RETURN(1);
row->lastpos= ma_recordpos(blocks->block->page, row_pos.rownr);
if (info->s->calc_checksum)
row->checksum= (info->s->calc_checksum)(info,record);
if (write_block_record(info, (uchar*) 0, record, row,
blocks, blocks->block->org_bitmap_value != 0,
&row_pos, undo_lsn))
DBUG_RETURN(1); /* Error reading bitmap */
DBUG_PRINT("exit", ("Rowid: %lu", (ulong) row->lastpos));
DBUG_RETURN(0);
}
/*
Write a record and return rowid for it
SYNOPSIS
_ma_write_init_block_record()
......@@ -2397,27 +2479,11 @@ disk_err:
MARIA_RECORD_POS _ma_write_init_block_record(MARIA_HA *info,
const uchar *record)
{
MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks;
struct st_row_pos_info row_pos;
DBUG_ENTER("_ma_write_init_block_record");
calc_record_size(info, record, &info->cur_row);
if (_ma_bitmap_find_place(info, &info->cur_row, blocks))
DBUG_RETURN(HA_OFFSET_ERROR); /* Error reading bitmap */
/* page will be pinned & locked by get_head_or_tail_page */
if (get_head_or_tail_page(info, blocks->block, info->buff,
info->s->base.min_row_length, HEAD_PAGE,
PAGECACHE_LOCK_WRITE, &row_pos))
if (allocate_and_write_block_record(info, record, &info->cur_row, 0))
DBUG_RETURN(HA_OFFSET_ERROR);
info->cur_row.lastpos= ma_recordpos(blocks->block->page, row_pos.rownr);
if (info->s->calc_checksum)
info->cur_row.checksum= (info->s->calc_checksum)(info,record);
if (write_block_record(info, (uchar*) 0, record, &info->cur_row,
blocks, blocks->block->org_bitmap_value != 0,
&row_pos))
DBUG_RETURN(HA_OFFSET_ERROR); /* Error reading bitmap */
DBUG_PRINT("exit", ("Rowid: %lu", (ulong) info->cur_row.lastpos));
info->s->state.split++;
DBUG_RETURN(info->cur_row.lastpos);
}
......@@ -2603,7 +2669,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
if (cur_row->extents_count && free_full_pages(info, cur_row))
goto err;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks,
1, &row_pos));
1, &row_pos, 0));
}
/*
Allocate all size in block for record
......@@ -2636,7 +2702,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
row_pos.data= buff + uint2korr(dir);
row_pos.length= head_length;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, 1,
&row_pos));
&row_pos, 0));
err:
_ma_unpin_all_pages(info, 0);
......@@ -3218,6 +3284,7 @@ static my_bool read_long_data(MARIA_HA *info, uchar *to, ulong length,
cur_row.extents_counts contains number of extents
cur_row.empty_bits is set to empty bits
cur_row.field_lengths contains packed length of all fields
cur_row.blob_length contains total length of all blobs.
RETURN
0 ok
......@@ -3233,6 +3300,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
my_bool found_blob= 0;
MARIA_EXTENT_CURSOR extent;
MARIA_COLUMNDEF *column, *end_column;
MARIA_ROW *cur_row= &info->cur_row;
DBUG_ENTER("_ma_read_block_record2");
LINT_INIT(field_length_data);
......@@ -3242,8 +3310,9 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
flag= (uint) (uchar) data[0];
cur_null_bytes= share->base.original_null_bytes;
null_bytes= share->base.null_bytes;
info->cur_row.head_length= (uint) (end_of_data - data);
info->cur_row.full_page_count= info->cur_row.tail_count= 0;
cur_row->head_length= (uint) (end_of_data - data);
cur_row->full_page_count= cur_row->tail_count= 0;
cur_row->blob_length= 0;
/* Skip trans header (for now, until we have MVCC csupport) */
data+= total_header_size[(flag & PRECALC_HEADER_BITMASK)];
......@@ -3259,22 +3328,22 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
Get number of extents and first extent
*/
get_key_length(row_extents, data);
info->cur_row.extents_count= row_extents;
cur_row->extents_count= row_extents;
row_extent_size= row_extents * ROW_EXTENT_SIZE;
if (info->cur_row.extents_buffer_length < row_extent_size &&
_ma_alloc_buffer(&info->cur_row.extents,
&info->cur_row.extents_buffer_length,
if (cur_row->extents_buffer_length < row_extent_size &&
_ma_alloc_buffer(&cur_row->extents,
&cur_row->extents_buffer_length,
row_extent_size))
DBUG_RETURN(my_errno);
memcpy(info->cur_row.extents, data, ROW_EXTENT_SIZE);
memcpy(cur_row->extents, data, ROW_EXTENT_SIZE);
data+= ROW_EXTENT_SIZE;
init_extent(&extent, info->cur_row.extents, row_extents,
info->cur_row.tail_positions);
init_extent(&extent, cur_row->extents, row_extents,
cur_row->tail_positions);
}
else
{
info->cur_row.extents_count= 0;
(*info->cur_row.tail_positions)= 0;
cur_row->extents_count= 0;
(*cur_row->tail_positions)= 0;
extent.page_count= 0;
extent.extent_count= 1;
}
......@@ -3284,7 +3353,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
if (share->base.max_field_lengths)
{
get_key_length(field_lengths, data);
info->cur_row.field_lengths_length= field_lengths;
cur_row->field_lengths_length= field_lengths;
#ifdef SANITY_CHECKS
if (field_lengths > share->base.max_field_lengths)
goto err;
......@@ -3292,7 +3361,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
}
if (share->calc_checksum)
info->cur_row.checksum= (uint) (uchar) *data++;
cur_row->checksum= (uint) (uchar) *data++;
/* data now points on null bits */
memcpy(record, data, cur_null_bytes);
if (unlikely(cur_null_bytes != null_bytes))
......@@ -3305,7 +3374,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
}
data+= null_bytes;
/* We copy the empty bits to be able to use them for delete/update */
memcpy(info->cur_row.empty_bits, data, share->base.pack_bytes);
memcpy(cur_row->empty_bits, data, share->base.pack_bytes);
data+= share->base.pack_bytes;
/* TODO: Use field offsets, instead of just skipping them */
......@@ -3313,11 +3382,11 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
/*
Read row extents (note that first extent was already read into
info->cur_row.extents above)
cur_row->extents above)
*/
if (row_extents > 1)
{
if (read_long_data(info, info->cur_row.extents + ROW_EXTENT_SIZE,
if (read_long_data(info, cur_row->extents + ROW_EXTENT_SIZE,
(row_extents - 1) * ROW_EXTENT_SIZE,
&extent, &data, &end_of_data))
DBUG_RETURN(my_errno);
......@@ -3342,7 +3411,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
/* Read array of field lengths. This may be stored in several extents */
if (field_lengths)
{
field_length_data= info->cur_row.field_lengths;
field_length_data= cur_row->field_lengths;
if (read_long_data(info, field_length_data, field_lengths, &extent,
&data, &end_of_data))
DBUG_RETURN(my_errno);
......@@ -3356,7 +3425,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
uchar *field_pos= record + column->offset;
/* First check if field is present in record */
if ((record[column->null_pos] & column->null_bit) ||
(info->cur_row.empty_bits[column->empty_pos] & column->empty_bit))
(cur_row->empty_bits[column->empty_pos] & column->empty_bit))
{
if (type == FIELD_SKIP_ENDSPACE)
bfill(record + column->offset, column->length, ' ');
......@@ -3432,13 +3501,14 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
{
uint size_length;
if ((record[blob_field->null_pos] & blob_field->null_bit) ||
(info->cur_row.empty_bits[blob_field->empty_pos] &
(cur_row->empty_bits[blob_field->empty_pos] &
blob_field->empty_bit))
continue;
size_length= blob_field->length - portable_sizeof_char_ptr;
blob_lengths+= _ma_calc_blob_length(size_length, length_data);
length_data+= size_length;
}
cur_row->blob_length= blob_lengths;
DBUG_PRINT("info", ("Total blob length: %lu", blob_lengths));
if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
blob_lengths))
......@@ -4012,6 +4082,37 @@ uint ma_calc_length_for_store_length(ulong nr)
}
/* Retrive a stored number */
static ulong ma_get_length(uchar **packet)
{
reg1 uchar *pos= *packet;
if (*pos < 251)
{
(*packet)++;
return (ulong) *pos;
}
if (*pos == 251)
{
(*packet)+= 2;
return (ulong) pos[1];
}
if (*pos == 252)
{
(*packet)+= 3;
return (ulong) uint2korr(pos+1);
}
if (*pos == 253)
{
(*packet)+= 4;
return (ulong) uint3korr(pos+1);
}
DBUG_ASSERT(*pos == 254);
(*packet)+= 5;
return (ulong) uint4korr(pos+1);
}
/*
Fill array with pointers to field parts to be stored in log for insert
......@@ -4058,7 +4159,7 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record,
if (share->base.max_field_lengths)
{
/* Store field lenghts, with a prefix of number of bytes */
/* Store length of all not empty char, varchar and blob fields */
log_parts->str= field_lengths-2;
log_parts->length= info->cur_row.field_lengths_length+2;
int2store(log_parts->str, info->cur_row.field_lengths_length);
......@@ -4066,6 +4167,17 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record,
log_parts++;
}
if (share->base.blobs)
{
/* Store total blob length to make buffer allocation easier during undo */
log_parts->str= info->length_buff;
log_parts->length= (uint) (ma_store_length(log_parts->str,
info->cur_row.blob_length) -
(uchar*) log_parts->str);
row_length+= log_parts->length;
log_parts++;
}
/* Handle constant length fields that are always present */
for (column= share->columndef,
end_column= column+ share->base.fixed_not_null_fields;
......@@ -4574,14 +4686,18 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
{
MARIA_SHARE *share= info->s;
ulonglong page;
uint record_number, empty_space;
uint rownr, empty_space;
uint block_size= share->block_size;
uchar *buff= info->keyread_buff;
DBUG_ENTER("_ma_apply_redo_purge_row_head_or_tail");
info->keyread_buff_used= 1;
page= page_korr(header);
record_number= dirpos_korr(header+PAGE_STORE_SIZE);
rownr= dirpos_korr(header+PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u",
(ulong) ma_recordpos(page, rownr),
(ulong) page, rownr));
info->keyread_buff_used= 1;
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
......@@ -4589,18 +4705,27 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
DBUG_RETURN(my_errno);
DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == (uchar) page_type);
if (lsn_korr(buff) >= lsn)
{
/* Already applied */
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
/*
Already applied
Note that in case the page is not anymore a head or tail page
a future redo will fix the bitmap.
*/
if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type)
{
empty_space= uint2korr(buff+EMPTY_SPACE_OFFSET);
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE,
empty_space))
DBUG_RETURN(my_errno);
}
DBUG_RETURN(0);
}
if (delete_dir_entry(buff, block_size, record_number, &empty_space) < 0)
DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == (uchar) page_type);
if (delete_dir_entry(buff, block_size, rownr, &empty_space) < 0)
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
lsn_store(buff, lsn);
......@@ -4698,7 +4823,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header)
{
ulonglong page;
uint record_number;
uint rownr;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE], *buff;
my_bool res= 1;
......@@ -4707,9 +4832,8 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
DBUG_ENTER("_ma_apply_undo_row_insert");
page= page_korr(header);
record_number= dirpos_korr(header + PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("Page: %lu record_number: %u", (ulong) page,
record_number));
rownr= dirpos_korr(header + PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("Page: %lu rownr: %u", (ulong) page, rownr));
if (!(buff= pagecache_read(info->s->pagecache,
&info->dfile, page, 0,
......@@ -4722,24 +4846,25 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link);
if (read_row_extent_info(info, buff, record_number))
if (read_row_extent_info(info, buff, rownr))
DBUG_RETURN(1);
if (delete_head_or_tail(info, page, record_number, 1, 1) ||
if (delete_head_or_tail(info, page, rownr, 1, 1) ||
delete_tails(info, info->cur_row.tail_positions))
goto err;
if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err;
lsn_store(log_data + FILEID_STORE_SIZE, undo_lsn);
/* undo_lsn must be first for compression to work */
lsn_store(log_data, undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data))
log_data+ FILEID_STORE_SIZE))
goto err;
info->s->state.state.records--;
......@@ -4748,3 +4873,183 @@ err:
_ma_unpin_all_pages(info, lsn);
DBUG_RETURN(res);
}
/* Execute undo of a row delete (insert the row back somewhere) */
my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length)
{
uchar *record;
const uchar *null_bits, *field_length_data;
MARIA_SHARE *share= info->s;
MARIA_ROW row;
uint *null_field_lengths;
ulong *blob_lengths;
MARIA_COLUMNDEF *column, *end_column;
DBUG_ENTER("_ma_apply_undo_row_delete");
/*
Use cur row as a base; We need to make a copy as we will change
some buffers to point directly to 'header'
*/
memcpy(&row, &info->cur_row, sizeof(row));
null_field_lengths= row.null_field_lengths;
blob_lengths= row.blob_lengths;
/*
Fill in info->cur_row with information about the row, like in
calc_record_size(), to be used by write_block_record()
*/
row.normal_length= row.char_length= row.varchar_length=
row.blob_length= row.extents_count= row.field_lengths_length= 0;
null_bits= header;
header+= share->base.null_bytes;
row.empty_bits= (uchar*) header;
header+= share->base.pack_bytes;
if (share->base.max_field_lengths)
{
row.field_lengths_length= uint2korr(header);
row.field_lengths= (uchar*) header + 2 ;
header+= 2 + row.field_lengths_length;
}
if (share->base.blobs)
row.blob_length= ma_get_length((uchar**) &header);
/* We need to build up a record (without blobs) in rec_buff */
if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
length - row.blob_length))
DBUG_RETURN(1);
record= info->rec_buff;
memcpy(record, null_bits, share->base.null_bytes);
/* Copy field information from header to record */
/* Handle constant length fields that are always present */
for (column= share->columndef,
end_column= column+ share->base.fixed_not_null_fields;
column < end_column;
column++)
{
memcpy(record + column->offset, header, column->length);
header+= column->length;
}
/* Handle NULL fields and CHAR/VARCHAR fields */
field_length_data= row.field_lengths;
for (end_column= share->columndef + share->base.fields;
column < end_column;
column++, null_field_lengths++)
{
if ((record[column->null_pos] & column->null_bit) ||
row.empty_bits[column->empty_pos] & column->empty_bit)
{
if (column->type != FIELD_BLOB)
*null_field_lengths= 0;
else
*blob_lengths++= 0;
if (share->calc_checksum)
bzero(record + column->offset, column->length);
continue;
}
switch ((enum en_fieldtype) column->type) {
case FIELD_CHECK:
case FIELD_NORMAL: /* Fixed length field */
case FIELD_ZERO:
case FIELD_SKIP_PRESPACE: /* Not packed */
case FIELD_SKIP_ZERO: /* Fixed length field */
row.normal_length+= column->length;
*null_field_lengths= column->length;
memcpy(record + column->offset, header, column->length);
header+= column->length;
break;
case FIELD_SKIP_ENDSPACE: /* CHAR */
if (column->length <= 255)
length= (uint) *field_length_data++;
else
{
length= uint2korr(field_length_data);
field_length_data+= 2;
}
row.char_length+= length;
*null_field_lengths= length;
memcpy(record + column->offset, header, length);
if (share->calc_checksum)
bfill(record + column->offset + length, (column->length - length),
' ');
header+= length;
break;
case FIELD_VARCHAR:
{
uint length;
uchar *field_pos= record + column->offset;
/* 256 is correct as this includes the length uchar */
if (column->length <= 256)
{
field_pos[0]= *field_length_data;
length= (uint) *field_length_data++;
}
else
{
field_pos[0]= field_length_data[0];
field_pos[1]= field_length_data[1];
length= uint2korr(field_length_data);
field_length_data+= 2;
}
row.varchar_length+= length;
*null_field_lengths= length;
memcpy(record + column->offset, header, length);
header+= length;
break;
}
case FIELD_BLOB:
{
/* Copy length of blob and pointer to blob data to record */
uchar *field_pos= record + column->offset;
uint size_length= column->length - portable_sizeof_char_ptr;
ulong blob_length= _ma_calc_blob_length(size_length, field_length_data);
memcpy(field_pos, field_length_data, size_length);
field_length_data+= size_length;
memcpy(field_pos + size_length, &header, sizeof(&header));
header+= blob_length;
*blob_lengths++= blob_length;
row.blob_length+= blob_length;
break;
}
default:
DBUG_ASSERT(0);
}
}
row.head_length= (row.base_length +
share->base.fixed_not_null_fields_length +
row.field_lengths_length +
size_to_store_key_length(row.field_lengths_length) +
row.normal_length +
row.char_length + row.varchar_length);
row.total_length= (row.head_length + row.blob_length);
if (row.total_length < share->base.min_row_length)
row.total_length= share->base.min_row_length;
/* Row is now up to date. Time to insert the record */
DBUG_RETURN(allocate_and_write_block_record(info, record, &row, undo_lsn));
}
/* Execute undo of a row update */
my_bool _ma_apply_undo_row_update(MARIA_HA *info __attribute__ ((unused)),
LSN undo_lsn __attribute__ ((unused)),
const uchar *header __attribute__ ((unused)),
size_t length __attribute__ ((unused)))
{
DBUG_ENTER("_ma_apply_undo_row_update");
fprintf(stderr, "Undo of row update is not yet done\n");
exit(1);
DBUG_RETURN(0);
}
......@@ -189,3 +189,7 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn,
const uchar *header);
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header);
my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length);
my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length);
......@@ -3258,7 +3258,9 @@ restart:
DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
block->type == PAGECACHE_READ_UNKNOWN_PAGE ||
block->type == type);
block->type == type ||
(block->type == PAGECACHE_PLAIN_PAGE &&
type == PAGECACHE_LSN_PAGE));
block->type= type;
if (make_lock_and_pin(pagecache, block,
......
......@@ -76,6 +76,8 @@ prototype_redo_exec_hook(UNDO_ROW_UPDATE);
prototype_redo_exec_hook(UNDO_ROW_PURGE);
prototype_redo_exec_hook(COMMIT);
prototype_undo_exec_hook(UNDO_ROW_INSERT);
prototype_undo_exec_hook(UNDO_ROW_DELETE);
prototype_undo_exec_hook(UNDO_ROW_UPDATE);
static int run_redo_phase(LSN lsn, my_bool apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
......@@ -977,6 +979,88 @@ prototype_undo_exec_hook(UNDO_ROW_INSERT)
}
prototype_undo_exec_hook(UNDO_ROW_DELETE)
{
my_bool error;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 1;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
return 1;
}
/* Set undo to point to previous undo record */
info->trn= trn;
info->trn->undo_lsn= lsn_korr(rec->header);
/*
For now we skip the page and directory entry. This is to be used
later when we mark rows as deleted.
*/
error= _ma_apply_undo_row_delete(info, rec->lsn,
log_record_buffer.str + LSN_STORE_SIZE +
FILEID_STORE_SIZE + PAGE_STORE_SIZE +
DIRPOS_STORE_SIZE,
rec->record_length -
(LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE));
info->trn= 0;
return error;
}
prototype_undo_exec_hook(UNDO_ROW_UPDATE)
{
my_bool error;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 1;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
return 1;
}
/* Set undo to point to previous undo record */
info->trn= trn;
info->trn->undo_lsn= lsn_korr(rec->header);
/*
For now we skip the page and directory entry. This is to be used
later when we mark rows as deleted.
*/
error= _ma_apply_undo_row_update(info, rec->lsn,
log_record_buffer.str + LSN_STORE_SIZE +
FILEID_STORE_SIZE + PAGE_STORE_SIZE +
DIRPOS_STORE_SIZE,
rec->record_length -
(LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE));
info->trn= 0;
return error;
}
static int run_redo_phase(LSN lsn, my_bool apply)
{
/* install hooks for execution */
......@@ -1003,6 +1087,8 @@ static int run_redo_phase(LSN lsn, my_bool apply)
install_redo_exec_hook(UNDO_ROW_PURGE);
install_redo_exec_hook(COMMIT);
install_undo_exec_hook(UNDO_ROW_INSERT);
install_undo_exec_hook(UNDO_ROW_DELETE);
install_undo_exec_hook(UNDO_ROW_UPDATE);
current_group_end_lsn= LSN_IMPOSSIBLE;
......
......@@ -37,8 +37,9 @@ static enum data_file_type record_type= DYNAMIC_RECORD;
static uint insert_count, update_count, remove_count;
static uint pack_keys=0, pack_seg=0, key_length;
static uint unique_key=HA_NOSAME;
static my_bool pagecacheing, null_fields, silent, skip_update, opt_unique,
verbose, skip_delete, transactional, die_in_middle_of_transaction;
static uint die_in_middle_of_transaction;
static my_bool pagecacheing, null_fields, silent, skip_update, opt_unique;
static my_bool verbose, skip_delete, transactional;
static MARIA_COLUMNDEF recinfo[4];
static MARIA_KEYDEF keyinfo[10];
static HA_KEYSEG keyseg[10];
......@@ -94,6 +95,7 @@ static int run_test(const char *filename)
{
MARIA_HA *file;
int i,j,error,deleted,rec_length,uniques=0;
uint offset_to_key;
ha_rows found,row_count;
char record[MAX_REC_LENGTH],key[MAX_REC_LENGTH],read_record[MAX_REC_LENGTH];
MARIA_UNIQUEDEF uniquedef;
......@@ -182,6 +184,10 @@ static int run_test(const char *filename)
else
uniques=0;
offset_to_key= test(null_fields);
if (key_field == FIELD_BLOB)
offset_to_key+= 2;
if (!silent)
printf("- Creating maria file\n");
create_info.max_rows=(ulong) (rec_pointer_size ?
......@@ -234,7 +240,7 @@ static int run_test(const char *filename)
flags[0]=2;
}
if (die_in_middle_of_transaction)
if (die_in_middle_of_transaction == 1)
{
/*
Ensure we get changed pages and log to disk
......@@ -242,6 +248,7 @@ static int run_test(const char *filename)
*/
_ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
FLUSH_RELEASE);
printf("Dying on request after insert without maria_close()\n");
exit(1);
}
......@@ -333,14 +340,14 @@ static int run_test(const char *filename)
if (verbose || (flags[j] >= 1 ||
(error && my_errno != HA_ERR_KEY_NOT_FOUND)))
printf("key: '%.*s' maria_rkey: %3d errno: %3d\n",
(int) key_length,key+test(null_fields),error,my_errno);
(int) key_length,key+offset_to_key,error,my_errno);
}
else
{
error=maria_delete(file,read_record);
if (verbose || error)
printf("key: '%.*s' maria_delete: %3d errno: %3d\n",
(int) key_length, key+test(null_fields), error, my_errno);
(int) key_length, key+offset_to_key, error, my_errno);
if (! error)
{
deleted++;
......@@ -348,6 +355,18 @@ static int run_test(const char *filename)
}
}
}
if (die_in_middle_of_transaction == 2)
{
/*
Ensure we get changed pages and log to disk
As commit record is not done, the undo entries needs to be rolled back.
*/
_ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
FLUSH_RELEASE);
printf("Dying on request after delete without maria_close()\n");
exit(1);
}
}
if (!silent)
printf("- Reading rows with key\n");
......@@ -362,7 +381,7 @@ static int run_test(const char *filename)
(error && (flags[i] != 0 || my_errno != HA_ERR_KEY_NOT_FOUND)))
{
printf("key: '%.*s' maria_rkey: %3d errno: %3d record: %s\n",
(int) key_length,key+test(null_fields),error,my_errno,record+1);
(int) key_length,key+offset_to_key,error,my_errno,record+1);
}
}
......@@ -661,7 +680,7 @@ static struct my_option my_long_options[] =
"Abort hard after doing inserts. Used for testing recovery with undo",
(uchar**) &die_in_middle_of_transaction,
(uchar**) &die_in_middle_of_transaction,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
0, GET_INT, OPT_ARG, 0, 0, 0, 0, 0, 0},
{"transactional", 'T',
"Test in transactional mode. (Only works with block format)",
(uchar**) &transactional, (uchar**) &transactional, 0, GET_BOOL, NO_ARG,
......@@ -749,6 +768,12 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
case 'K': /* Use key cacheing */
pagecacheing=1;
break;
case 'A':
if (!argument)
die_in_middle_of_transaction= 1;
else
die_in_middle_of_transaction= atoi(argument);
break;
case 'V':
printf("test1 Ver 1.2 \n");
exit(0);
......
......@@ -373,6 +373,7 @@ typedef struct st_maria_row
uint field_lengths_length; /* Length of data in field_lengths */
uint extents_count; /* number of extents in 'extents' */
uint full_page_count, tail_count; /* For maria_chk */
uint space_on_head_page;
} MARIA_ROW;
/* Data to scan row in blocked format */
......@@ -434,6 +435,8 @@ struct st_maria_info
ulong packed_length, blob_length; /* Length of found, packed record */
size_t rec_buff_size;
PAGECACHE_FILE dfile; /* The datafile */
IO_CACHE rec_cache; /* When cacheing records */
LIST open_list;
uint opt_flag; /* Optim. for space/speed */
uint update; /* If file changed since open */
int lastinx; /* Last used index */
......@@ -449,8 +452,6 @@ struct st_maria_info
uint data_changed; /* Somebody has changed data */
uint save_update; /* When using KEY_READ */
int save_lastinx;
LIST open_list;
IO_CACHE rec_cache; /* When cacheing records */
uint preload_buff_size; /* When preloading indexes */
myf lock_wait; /* is 0 or MY_DONT_WAIT */
my_bool was_locked; /* Was locked in panic */
......@@ -468,6 +469,7 @@ struct st_maria_info
THR_LOCK_DATA lock;
#endif
uchar *maria_rtree_recursion_state; /* For RTREE */
uchar length_buff[5]; /* temp buff to store blob lengths */
int maria_rtree_recursion_depth;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment