Commit 2cc2f3e7 authored by unknown's avatar unknown

Generalized the way update and redo extends the size of a directory record.


storage/maria/ma_blockrec.c:
  Generalized the way update and redo extends the size of a directory record.
  This will (for now) ensure that data files are idenitical after normal run and after a apply-log run.
storage/maria/ma_open.c:
  Disabled reservation of transid on rows (for now) as these are not yet used.
  (I had to disable this as otherwise update thougth rows had grown in size when they hadn't and we had thus different row sizes on update and redo, which caused different block information)
storage/maria/ma_test1.c:
  Added comment
storage/maria/ma_test2.c:
  Do commit on error/abort
storage/maria/ma_test_all.sh:
  Some more testing (to cover a bug that was not found in previous runs)
storage/maria/ma_test_recovery:
  More tests
parent f7b766c0
......@@ -504,11 +504,12 @@ void _ma_end_block_record(MARIA_HA *info)
****************************************************************************/
/*
Return the next used uchar on the page after a directory entry.
Return the next unused postion on the page after a directory entry.
SYNOPSIS
start_of_next_entry()
dir Directory entry to be used
dir Directory entry to be used. This can not be the
the last entry on the page!
RETURN
# Position in page where next entry starts.
......@@ -530,6 +531,20 @@ static inline uint start_of_next_entry(uchar *dir)
}
/*
Return the offset where the previous entry ends (before on page)
SYNOPSIS
end_of_previous_entry()
dir Address for current directory entry
end Address to last directory entry
RETURN
# Position where previous entry ends (smallest address on page)
Everything between # and current entry are free to be used.
*/
static inline uint end_of_previous_entry(uchar *dir, uchar *end)
{
uchar *pos;
......@@ -537,14 +552,108 @@ static inline uint end_of_previous_entry(uchar *dir, uchar *end)
{
uint offset;
if ((offset= uint2korr(pos)))
{
return offset + uint2korr(pos+2);
}
}
return PAGE_HEADER_SIZE;
}
/**
@brief Extend a record area to fit a given size block
@fn extend_area_on_page()
@param buff Page buffer
@param dir Pointer to dir entry in buffer
@param rownr Row number we working on
@param block_size Block size of buffer
@param request_length How much data we want to put at [dir]
@param empty_space Total empty space in buffer
IMPLEMENTATION
The logic is as follows (same as in _ma_update_block_record())
- If new data fits in old block, use old block.
- Extend block with empty space before block. If enough, use it.
- Extend block with empty space after block. If enough, use it.
- Use compact_page() to get all empty space at dir.
RETURN
@retval 0 ok
@retval ret_offset Pointer to store offset to found area
@retval ret_length Pointer to store length of found area
@retval [dir] rec_offset is store here too
@retval 1 error (wrong info in block)
*/
static my_bool extend_area_on_page(uchar *buff, uchar *dir,
uint rownr, uint block_size,
uint request_length,
uint *empty_space, uint *ret_offset,
uint *ret_length)
{
uint rec_offset, length;
DBUG_ENTER("extend_area_on_page");
rec_offset= uint2korr(dir);
length= uint2korr(dir + 2);
DBUG_PRINT("enter", ("rec_offset: %u length: %u request_length: %u",
rec_offset, length, request_length));
*empty_space+= length;
if (length < request_length)
{
uint max_entry= (uint) ((uchar*) buff)[DIR_COUNT_OFFSET];
uint old_rec_offset;
/*
New data did not fit in old position.
Find first possible position where to put new data.
*/
old_rec_offset= rec_offset;
rec_offset= end_of_previous_entry(dir, buff + block_size -
PAGE_SUFFIX_SIZE);
length+= (uint) (old_rec_offset - rec_offset);
/*
old_rec_offset is 0 if we are doing an insert into a not allocated block.
This can only happen during REDO of INSERT
*/
if (!old_rec_offset || length < request_length)
{
/*
Did not fit in current block + empty space. Extend with
empty space after block.
*/
if (rownr == max_entry - 1)
{
/* Last entry; Everything is free between this and directory */
length= ((block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE * max_entry) -
rec_offset);
}
else
length= start_of_next_entry(dir) - rec_offset;
DBUG_ASSERT((int) length > 0);
if (length < request_length)
{
/* Not enough continues space, compact page to get more */
int2store(dir, rec_offset);
compact_page(buff, block_size, rownr, 1);
rec_offset= uint2korr(dir);
length= uint2korr(dir+2);
if (length < request_length)
DBUG_RETURN(1); /* Error in block */
*empty_space= length; /* All space is here */
}
}
}
int2store(dir, rec_offset);
*ret_offset= rec_offset;
*ret_length= length;
DBUG_RETURN(0);
}
/*
Check that a region is all zero
......@@ -1610,7 +1719,7 @@ static my_bool write_block_record(MARIA_HA *info,
row->head_length))
DBUG_RETURN(1);
tmp_data_used= 0; /* Either 0 or last used uchar in 'data' */
tmp_data_used= 0; /* Either 0 or last used uchar in 'data' */
tmp_data= data;
if (row_extents_in_use)
......@@ -2447,7 +2556,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
if ((org_empty_size + cur_row->head_length) >= new_row->total_length)
{
uint empty, offset, length;
uint rec_offset, length;
MARIA_BITMAP_BLOCK block;
/*
......@@ -2456,27 +2565,18 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
*/
block.org_bitmap_value= _ma_free_size_to_head_pattern(&share->bitmap,
org_empty_size);
offset= uint2korr(dir);
length= uint2korr(dir + 2);
empty= 0;
if (new_row->total_length > length)
{
/* See if there is empty space after */
if (rownr != (uint) ((uchar *) buff)[DIR_COUNT_OFFSET] - 1)
empty= start_of_next_entry(dir) - (offset + length);
if (new_row->total_length > length + empty)
{
compact_page(buff, share->block_size, rownr, 1);
org_empty_size= 0;
length= uint2korr(dir + 2);
}
}
if (extend_area_on_page(buff, dir, rownr, share->block_size,
new_row->total_length, &org_empty_size,
&rec_offset, &length))
DBUG_RETURN(1);
row_pos.buff= buff;
row_pos.rownr= rownr;
row_pos.empty_space= org_empty_size + length;
row_pos.empty_space= org_empty_size;
row_pos.dir= dir;
row_pos.data= buff + uint2korr(dir);
row_pos.length= length + empty;
row_pos.data= buff + rec_offset;
row_pos.length= length;
blocks->block= &block;
blocks->count= 1;
block.page= page;
......@@ -2545,13 +2645,13 @@ err:
0 ok
1 Page is now empty
*/
static int delete_dir_entry(uchar *buff, uint block_size, uint record_number,
uint *empty_space_res)
{
uint number_of_records= (uint) ((uchar *) buff)[DIR_COUNT_OFFSET];
uint length, empty_space;
uchar *dir;
uchar *dir, *org_dir;
DBUG_ENTER("delete_dir_entry");
#ifdef SANITY_CHECKS
......@@ -2567,9 +2667,8 @@ static int delete_dir_entry(uchar *buff, uint block_size, uint record_number,
#endif
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
dir= (buff + block_size - DIR_ENTRY_SIZE * record_number -
DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
dir[0]= dir[1]= 0; /* Delete entry */
org_dir= dir= (buff + block_size - DIR_ENTRY_SIZE * record_number -
DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
length= uint2korr(dir + 2);
if (record_number == number_of_records - 1)
......@@ -2582,21 +2681,24 @@ static int delete_dir_entry(uchar *buff, uint block_size, uint record_number,
dir+= DIR_ENTRY_SIZE;
empty_space+= DIR_ENTRY_SIZE;
} while (dir < end && dir[0] == 0 && dir[1] == 0);
if (number_of_records == 0)
{
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
*empty_space_res= block_size;
DBUG_RETURN(1);
}
buff[DIR_COUNT_OFFSET]= (uchar) number_of_records;
}
empty_space+= length;
if (number_of_records != 0)
{
/* Update directory */
int2store(buff + EMPTY_SPACE_OFFSET, empty_space);
buff[PAGE_TYPE_OFFSET]|= (uchar) PAGE_CAN_BE_COMPACTED;
*empty_space_res= empty_space;
DBUG_RETURN(0);
}
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
*empty_space_res= block_size;
DBUG_RETURN(1);
/* Update directory */
org_dir[0]= org_dir[1]= 0; org_dir[2]= org_dir[3]= 0; /* Delete entry */
int2store(buff + EMPTY_SPACE_OFFSET, empty_space);
buff[PAGE_TYPE_OFFSET]|= (uchar) PAGE_CAN_BE_COMPACTED;
*empty_space_res= empty_space;
DBUG_RETURN(0);
}
......@@ -2654,7 +2756,7 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
page_store(log_data+ FILEID_STORE_SIZE, page);
dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE,
record_number);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, (head ? LOGREC_REDO_PURGE_ROW_HEAD :
......@@ -3972,7 +4074,7 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
*/
start_field_data= field_data= info->update_field_data + 4;
log_parts++;
if (memcmp(oldrec, newrec, share->base.null_bytes))
{
/* Store changed null bits */
......@@ -3993,7 +4095,7 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
if (memcmp(oldrec + column->offset, newrec + column->offset,
column->length))
{
field_data= ma_store_length(field_data,
field_data= ma_store_length(field_data,
(uint) (column - share->columndef));
field_count++;
log_parts->str= (char*) oldrec + column->offset;
......@@ -4027,7 +4129,7 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
continue; /* Both are empty; skip */
/* Store null length column */
field_data= ma_store_length(field_data,
field_data= ma_store_length(field_data,
(uint) (column - share->columndef));
field_data= ma_store_length(field_data, 0);
field_count++;
......@@ -4098,7 +4200,7 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
if (new_column_is_empty || new_column_length != old_column_length ||
memcmp(old_column_pos, new_column_pos, new_column_length))
{
field_data= ma_store_length(field_data,
field_data= ma_store_length(field_data,
(uint) (column - share->columndef));
field_data= ma_store_length(field_data, old_column_length);
field_count++;
......@@ -4133,7 +4235,7 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
SYNOPSIS
_ma_apply_redo_insert_row_head_or_tail()
info Maria handler
lsn LSN to put on page
lsn LSN to put on page
page_type HEAD_PAGE or TAIL_PAGE
header Header (without FILEID)
data Data to be put on page
......@@ -4157,11 +4259,15 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint rec_offset;
uchar *buff= info->keyread_buff, *dir;
DBUG_ENTER("_ma_apply_redo_insert_row_head_or_tail");
info->keyread_buff_used= 1;
page= page_korr(header);
rownr= dirpos_korr(header+PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u data_length: %u",
(ulong) ma_recordpos(page, rownr),
(ulong) page, rownr, (uint) data_length));
if (((page + 1) * info->s->block_size) > info->state->data_file_length)
{
/*
......@@ -4222,7 +4328,7 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
dir= (buff + block_size - DIR_ENTRY_SIZE * (rownr + 1) -
PAGE_SUFFIX_SIZE);
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
if (max_entry <= rownr)
{
/* Add directory entry first in directory and data last on page */
......@@ -4248,39 +4354,15 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
}
else
{
uint length;
/*
reuse old entry. This is empty if the command was an insert and
Reuse old entry. This is empty if the command was an insert and
possible used if the command was an update.
*/
uchar *end_data;
uint rec_end;
/* Add back space if we are reusing entry */
empty_space+= uint2korr(dir+2);
/* Find first possible position where to put new data */
end_data= (buff + block_size - PAGE_SUFFIX_SIZE -
DIR_ENTRY_SIZE * max_entry);
rec_offset= end_of_previous_entry(dir, end_data);
if (rownr != max_entry -1)
rec_end= start_of_next_entry(dir);
else
rec_end= (uint) (buff - end_data);
DBUG_ASSERT(rec_end > rec_offset);
if ((uint) (rec_end - rec_offset) < data_length)
{
uint length;
/* Not enough continues space, compact page to get more */
int2store(dir, rec_offset);
compact_page(buff, block_size, rownr, 1);
rec_offset= uint2korr(dir);
length= uint2korr(dir+2);
DBUG_ASSERT(length >= data_length);
if (length < data_length)
goto err;
empty_space= length;
}
if (extend_area_on_page(buff, dir, rownr, block_size,
data_length, &empty_space,
&rec_offset, &length))
goto err;
}
}
}
......@@ -4317,7 +4399,7 @@ err:
SYNOPSIS
_ma_apply_redo_purge_row_head_or_tail()
info Maria handler
lsn LSN to put on page
lsn LSN to put on page
page_type HEAD_PAGE or TAIL_PAGE
header Header (without FILEID)
......@@ -4339,7 +4421,7 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint block_size= share->block_size;
uchar *buff= info->keyread_buff;
DBUG_ENTER("_ma_apply_redo_purge_row_head_or_tail");
info->keyread_buff_used= 1;
page= page_korr(header);
record_number= dirpos_korr(header+PAGE_STORE_SIZE);
......@@ -4405,7 +4487,7 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
uchar *buff= info->keyread_buff;
DBUG_ENTER("_ma_apply_redo_purge_blocks");
info->keyread_buff_used= 1;
info->keyread_buff_used= 1;
ranges= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE;
......
......@@ -600,7 +600,9 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
if (share->base.born_transactional)
{
share->page_type= PAGECACHE_LSN_PAGE;
#ifdef ENABLE_WHEN_WE_HAVE_TRANS_ROW_ID /* QQ */
share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE;
#endif
if (unlikely((share->state.create_rename_lsn == (LSN)ULONGLONG_MAX) &&
(open_flags & HA_OPEN_FROM_SQL_LAYER)))
{
......
......@@ -633,7 +633,7 @@ static struct my_option my_long_options[] =
0, 0, 0, 0, 0, 0},
{"unique", 'C', "Undocumented", (uchar**) &opt_unique,
(uchar**) &opt_unique, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"update-rows", 'u', "Undocumented", (uchar**) &update_count,
{"update-rows", 'u', "Max number of rows to update", (uchar**) &update_count,
(uchar**) &update_count, 0, GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0},
{"verbose", 'v', "Be more verbose", (uchar**) &verbose,
(uchar**) &verbose, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
......
......@@ -895,7 +895,11 @@ end:
if (maria_commit(file))
goto err;
if (maria_close(file))
{
file= 0;
goto err;
}
file= 0;
maria_panic(HA_PANIC_CLOSE); /* Should close log */
if (!silent)
{
......@@ -937,7 +941,11 @@ reads: %10lu\n",
err:
printf("got error: %d when using MARIA-database\n",my_errno);
if (file)
{
if (maria_commit(file))
goto err;
VOID(maria_close(file));
}
maria_end();
return(1);
} /* main */
......
......@@ -143,6 +143,10 @@ run_repair_tests()
$maria_path/maria_chk$suffix -se test2
$maria_path/maria_chk$suffix -s --parallel-recover --quick test2
$maria_path/maria_chk$suffix -se test2
$maria_path/ma_test2$suffix $silent -c $row_type
$maria_path/maria_chk$suffix -se test2
$maria_path/maria_chk$suffix -sr test2
$maria_path/maria_chk$suffix -se test2
}
run_pack_tests()
......
......@@ -21,7 +21,7 @@ echo "MARIA RECOVERY TESTS - success is if exit code is 0"
# identical to the saved original.
# Does not test the index file as we don't have logging for it yet.
for prog in "$maria_path/ma_test1 $silent -M -T --skip-update -c" "$maria_path/ma_test2 $silent -L -K -W -P -M -T -g -c"
for prog in "$maria_path/ma_test1 $silent -M -T -c" "$maria_path/ma_test2 $silent -L -K -W -P -M -T -c" "$maria_path/ma_test2 $silent -M -T -c -b"
do
rm -f maria_log.* maria_log_control
echo "TEST WITH $prog"
......@@ -36,6 +36,8 @@ do
$maria_path/maria_read_log -a > $tmp/maria_read_log_$table.txt
$maria_path/maria_chk -dvv $table > $tmp/maria_chk_message.txt 2>&1
cmp $table.MAD $tmp/$table.MAD.good
# QQ: Remove the following line when we also can recovert the index file
$maria_path/maria_chk -s -r $table
......@@ -46,7 +48,7 @@ do
echo "checksum differs for $table before and after recovery"
exit 1;
fi
# cmp $table.MAD $tmp/$table.MAD.good
# When "recovery of the table's state" is ready, we can test it like this:
# diff $tmp/maria_chk_message.good.txt $tmp/maria_chk_message.txt > $tmp/maria_chk_diff.txt || true
# if [ -s $tmp/maria_chk_diff.txt ]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment