Commit 85da5133 authored by unknown's avatar unknown

Implement applying of REDO entries for

- LOGREC_REDO_INSERT_ROW_HEAD
- LOGREC_REDO_INSERT_ROW_TAIL
- LOGREC_REDO_PURGE_ROW_HEAD
- LOGREC_REDO_PURGE_ROW_TAIL


sql/sql_yacc.yy:
  Fixed typo in previous push
storage/maria/ma_bitmap.c:
  Ensure we flush the new bitmap on close
storage/maria/ma_blockrec.c:
  Implement applying of REDO entries for
  - LOGREC_REDO_INSERT_ROW_HEAD
  - LOGREC_REDO_INSERT_ROW_TAIL
  - LOGREC_REDO_PURGE_ROW_HEAD
  - LOGREC_REDO_PURGE_ROW_TAIL
  Split some functions into subfunctions to be able to reuse code
storage/maria/ma_blockrec.h:
  Added prototypes for REDO applying functions
storage/maria/ma_loghandler.h:
  Safety fix
storage/maria/ma_loghandler_lsn.h:
  Avoid compiler warnings
storage/maria/maria_read_log.c:
  Added hocks for:
  - REDO_INSERT_ROW_HEAD
  - REDO_INSERT_ROW_TAIL
  - REDO_PURGE_ROW_HEAD
  - REDO_PURGE_ROW_TAIL
  
  Added dummy hooks for:
  - UNDO_ROW_INSERT
  - UNDO_ROW_DELETE
  
  Changed to use maria_pagecache instead of own pagecache (fixed problem with unitialized share->pagecache)
  Use maria_panic() at end to ensure that all files are closed properly.
  Fixed option handling for --debug
parent ef7a757b
...@@ -4280,7 +4280,7 @@ row_types: ...@@ -4280,7 +4280,7 @@ row_types:
| DYNAMIC_SYM { $$= ROW_TYPE_DYNAMIC; } | DYNAMIC_SYM { $$= ROW_TYPE_DYNAMIC; }
| COMPRESSED_SYM { $$= ROW_TYPE_COMPRESSED; } | COMPRESSED_SYM { $$= ROW_TYPE_COMPRESSED; }
| REDUNDANT_SYM { $$= ROW_TYPE_REDUNDANT; } | REDUNDANT_SYM { $$= ROW_TYPE_REDUNDANT; }
| COMPACT_SYM { $$= ROW_TYPE_COMPACT; }; | COMPACT_SYM { $$= ROW_TYPE_COMPACT; }
| PAGE_SYM { $$= ROW_TYPE_PAGE; }; | PAGE_SYM { $$= ROW_TYPE_PAGE; };
merge_insert_types: merge_insert_types:
......
...@@ -296,7 +296,7 @@ void _ma_bitmap_delete_all(MARIA_SHARE *share) ...@@ -296,7 +296,7 @@ void _ma_bitmap_delete_all(MARIA_SHARE *share)
{ {
bzero(bitmap->map, share->block_size); bzero(bitmap->map, share->block_size);
memcpy(bitmap->map + share->block_size - 2, maria_bitmap_marker, 2); memcpy(bitmap->map + share->block_size - 2, maria_bitmap_marker, 2);
bitmap->changed= 0; bitmap->changed= 1;
bitmap->page= 0; bitmap->page= 0;
bitmap->used_size= bitmap->total_size; bitmap->used_size= bitmap->total_size;
} }
......
...@@ -868,7 +868,7 @@ static void calc_record_size(MARIA_HA *info, const byte *record, ...@@ -868,7 +868,7 @@ static void calc_record_size(MARIA_HA *info, const byte *record,
compact_page() compact_page()
buff Page to compact buff Page to compact
block_size Size of page block_size Size of page
recnr Put empty data after this row rownr Put empty data after this row
extend_block If 1, extend the block at 'rownr' to cover the extend_block If 1, extend the block at 'rownr' to cover the
whole block. whole block.
*/ */
...@@ -980,6 +980,13 @@ static void compact_page(byte *buff, uint block_size, uint rownr, ...@@ -980,6 +980,13 @@ static void compact_page(byte *buff, uint block_size, uint rownr,
uint length= (uint) (dir - buff) - start_of_found_block; uint length= (uint) (dir - buff) - start_of_found_block;
int2store(dir+2, length); int2store(dir+2, length);
} }
else
{
/*
TODO:
Update (buff + EMPTY_SPACE_OFFSET) if we remove transid from rows
*/
}
buff[PAGE_TYPE_OFFSET]&= ~(byte) PAGE_CAN_BE_COMPACTED; buff[PAGE_TYPE_OFFSET]&= ~(byte) PAGE_CAN_BE_COMPACTED;
} }
DBUG_EXECUTE("directory", _ma_print_directory(buff, block_size);); DBUG_EXECUTE("directory", _ma_print_directory(buff, block_size););
...@@ -987,6 +994,37 @@ static void compact_page(byte *buff, uint block_size, uint rownr, ...@@ -987,6 +994,37 @@ static void compact_page(byte *buff, uint block_size, uint rownr,
} }
/*
Create an empty tail or head page
SYNOPSIS
make_empty_page()
buff Page buffer
block_size Block size
page_type HEAD_PAGE or TAIL_PAGE
NOTES
EMPTY_SPACE is not updated
*/
static void make_empty_page(byte *buff, uint block_size, uint page_type)
{
bzero(buff, PAGE_HEADER_SIZE);
/*
We zero the rest of the block to avoid getting old memory information
to disk and to allow the file to be compressed better if archived.
The rest of the code does not assume the block is zeroed above
PAGE_OVERHEAD_SIZE
*/
bzero(buff+ PAGE_HEADER_SIZE, block_size - PAGE_HEADER_SIZE);
buff[PAGE_TYPE_OFFSET]= (byte) page_type;
buff[DIR_COUNT_OFFSET]= 1;
/* Store position to the first row */
int2store(buff + block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE,
PAGE_HEADER_SIZE);
}
/* /*
Read or initialize new head or tail page Read or initialize new head or tail page
...@@ -1019,6 +1057,7 @@ struct st_row_pos_info ...@@ -1019,6 +1057,7 @@ struct st_row_pos_info
uint empty_space; /* Space left on page */ uint empty_space; /* Space left on page */
}; };
static my_bool get_head_or_tail_page(MARIA_HA *info, static my_bool get_head_or_tail_page(MARIA_HA *info,
MARIA_BITMAP_BLOCK *block, MARIA_BITMAP_BLOCK *block,
byte *buff, uint length, uint page_type, byte *buff, uint length, uint page_type,
...@@ -1035,25 +1074,12 @@ static my_bool get_head_or_tail_page(MARIA_HA *info, ...@@ -1035,25 +1074,12 @@ static my_bool get_head_or_tail_page(MARIA_HA *info,
if (block->org_bitmap_value == 0) /* Empty block */ if (block->org_bitmap_value == 0) /* Empty block */
{ {
/* New page */ /* New page */
bzero(buff, PAGE_HEADER_SIZE); make_empty_page(buff, block_size, page_type);
/*
We zero the rest of the block to avoid getting old memory information
to disk and to allow the file to be compressed better if archived.
The rest of the code does not assume the block is zeroed above
PAGE_OVERHEAD_SIZE
*/
bzero(buff+ PAGE_HEADER_SIZE, block_size - PAGE_HEADER_SIZE);
buff[PAGE_TYPE_OFFSET]= (byte) page_type;
buff[DIR_COUNT_OFFSET]= 1;
res->buff= buff; res->buff= buff;
res->empty_space= res->length= (block_size - PAGE_OVERHEAD_SIZE); res->empty_space= res->length= (block_size - PAGE_OVERHEAD_SIZE);
res->data= (buff + PAGE_HEADER_SIZE); res->data= (buff + PAGE_HEADER_SIZE);
res->dir= res->data + res->length; res->dir= res->data + res->length;
res->rownr= 0; res->rownr= 0;
/* Store position to the first row */
int2store(res->dir, PAGE_HEADER_SIZE);
DBUG_ASSERT(length <= res->length); DBUG_ASSERT(length <= res->length);
} }
else else
...@@ -1710,8 +1736,12 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -1710,8 +1736,12 @@ static my_bool write_block_record(MARIA_HA *info,
uint length= (uint) (data - row_pos->data); uint length= (uint) (data - row_pos->data);
DBUG_PRINT("info", ("head length: %u", length)); DBUG_PRINT("info", ("head length: %u", length));
if (length < info->s->base.min_row_length) if (length < info->s->base.min_row_length)
{
uint diff_length= info->s->base.min_row_length - length;
bzero(data, diff_length);
data+= diff_length;
length= info->s->base.min_row_length; length= info->s->base.min_row_length;
}
int2store(row_pos->dir + 2, length); int2store(row_pos->dir + 2, length);
/* update empty space at start of block */ /* update empty space at start of block */
row_pos->empty_space-= length; row_pos->empty_space-= length;
...@@ -2471,6 +2501,76 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, ...@@ -2471,6 +2501,76 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
} }
/*
Delete a directory entry
SYNOPSIS
delete_dir_entry()
buff Page buffer
block_size Block size
record_number Record number to delete
empty_space Empty space on page after delete
RETURN
-1 Error on page
0 ok
1 Page is now empty
*/
static int delete_dir_entry(byte *buff, uint block_size, uint record_number,
uint *empty_space_res)
{
uint number_of_records= (uint) ((uchar *) buff)[DIR_COUNT_OFFSET];
uint length, empty_space;
byte *dir;
DBUG_ENTER("delete_dir_entry");
#ifdef SANITY_CHECKS
if (record_number >= number_of_records ||
record_number > ((block_size - LSN_SIZE - PAGE_TYPE_SIZE - 1 -
PAGE_SUFFIX_SIZE) / DIR_ENTRY_SIZE))
{
DBUG_PRINT("error", ("record_number: %u number_of_records: %u",
record_number, number_of_records));
DBUG_RETURN(-1);
}
#endif
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
dir= (buff + block_size - DIR_ENTRY_SIZE * record_number -
DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
dir[0]= dir[1]= 0; /* Delete entry */
length= uint2korr(dir + 2);
if (record_number == number_of_records - 1)
{
/* Delete this entry and all following empty directory entries */
byte *end= buff + block_size - PAGE_SUFFIX_SIZE;
do
{
number_of_records--;
dir+= DIR_ENTRY_SIZE;
empty_space+= DIR_ENTRY_SIZE;
} while (dir < end && dir[0] == 0 && dir[1] == 0);
buff[DIR_COUNT_OFFSET]= (byte) (uchar) number_of_records;
}
empty_space+= length;
if (number_of_records != 0)
{
/* Update directory */
int2store(buff + EMPTY_SPACE_OFFSET, empty_space);
buff[PAGE_TYPE_OFFSET]|= (byte) PAGE_CAN_BE_COMPACTED;
*empty_space_res= empty_space;
DBUG_RETURN(0);
}
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
*empty_space_res= block_size;
DBUG_RETURN(1);
}
/* /*
Delete a head a tail part Delete a head a tail part
...@@ -2493,11 +2593,12 @@ static my_bool delete_head_or_tail(MARIA_HA *info, ...@@ -2493,11 +2593,12 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
my_bool head) my_bool head)
{ {
MARIA_SHARE *share= info->s; MARIA_SHARE *share= info->s;
uint number_of_records, empty_space, length; uint empty_space;
uint block_size= share->block_size; uint block_size= share->block_size;
byte *buff, *dir; byte *buff;
LSN lsn; LSN lsn;
MARIA_PINNED_PAGE page_link; MARIA_PINNED_PAGE page_link;
int res;
DBUG_ENTER("delete_head_or_tail"); DBUG_ENTER("delete_head_or_tail");
info->keyread_buff_used= 1; info->keyread_buff_used= 1;
...@@ -2511,60 +2612,30 @@ static my_bool delete_head_or_tail(MARIA_HA *info, ...@@ -2511,60 +2612,30 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK; page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link); push_dynamic(&info->pinned_pages, (void*) &page_link);
number_of_records= (uint) ((uchar *) buff)[DIR_COUNT_OFFSET]; res= delete_dir_entry(buff, block_size, record_number, &empty_space);
#ifdef SANITY_CHECKS if (res < 0)
if (record_number >= number_of_records ||
record_number > ((block_size - LSN_SIZE - PAGE_TYPE_SIZE - 1 -
PAGE_SUFFIX_SIZE) / DIR_ENTRY_SIZE))
{
DBUG_PRINT("error", ("record_number: %u number_of_records: %u",
record_number, number_of_records));
DBUG_RETURN(1); DBUG_RETURN(1);
} if (res == 0)
#endif
dir= (buff + block_size - DIR_ENTRY_SIZE * record_number -
DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
dir[0]= dir[1]= 0; /* Delete entry */
length= uint2korr(dir + 2);
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
if (record_number == number_of_records - 1)
{
/* Delete this entry and all following empty directory entries */
byte *end= buff + block_size - PAGE_SUFFIX_SIZE;
do
{
number_of_records--;
dir+= DIR_ENTRY_SIZE;
empty_space+= DIR_ENTRY_SIZE;
} while (dir < end && dir[0] == 0 && dir[1] == 0);
buff[DIR_COUNT_OFFSET]= (byte) (uchar) number_of_records;
}
empty_space+= length;
if (number_of_records != 0)
{ {
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE]; uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
/* Update directory */ if (info->s->base.transactional)
int2store(buff + EMPTY_SPACE_OFFSET, empty_space); {
buff[PAGE_TYPE_OFFSET]|= (byte) PAGE_CAN_BE_COMPACTED; /* Log REDO data */
DBUG_ASSERT(share->pagecache->block_size == block_size); page_store(log_data+ FILEID_STORE_SIZE, page);
dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE,
/* Log REDO data */
page_store(log_data+ FILEID_STORE_SIZE, page);
dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE,
record_number); record_number);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, (head ? LOGREC_REDO_PURGE_ROW_HEAD : if (translog_write_record(&lsn, (head ? LOGREC_REDO_PURGE_ROW_HEAD :
LOGREC_REDO_PURGE_ROW_TAIL), LOGREC_REDO_PURGE_ROW_TAIL),
info->trn, share, sizeof(log_data), info->trn, share, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array, TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data)) log_data))
DBUG_RETURN(1); DBUG_RETURN(1);
}
if (pagecache_write(share->pagecache, if (pagecache_write(share->pagecache,
&info->dfile, page, 0, &info->dfile, page, 0,
buff, share->page_type, buff, share->page_type,
...@@ -2579,20 +2650,21 @@ static my_bool delete_head_or_tail(MARIA_HA *info, ...@@ -2579,20 +2650,21 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE]; PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
pagerange_store(log_data + FILEID_STORE_SIZE, 1); if (info->s->base.transactional)
page_store(log_data+ FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, page); {
pagerange_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + pagerange_store(log_data + FILEID_STORE_SIZE, 1);
PAGE_STORE_SIZE, 1); page_store(log_data+ FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, page);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; pagerange_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); PAGE_STORE_SIZE, 1);
if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS, log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
info->trn, share, sizeof(log_data), log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
TRANSLOG_INTERNAL_PARTS + 1, log_array, if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS,
log_data)) info->trn, share, sizeof(log_data),
DBUG_RETURN(1); TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data))
DBUG_RETURN(1);
}
/* Write the empty page (needed only for REPAIR to work) */ /* Write the empty page (needed only for REPAIR to work) */
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
if (pagecache_write(share->pagecache, if (pagecache_write(share->pagecache,
&info->dfile, page, 0, &info->dfile, page, 0,
buff, share->page_type, buff, share->page_type,
...@@ -4024,3 +4096,268 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const byte *oldrec, ...@@ -4024,3 +4096,268 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const byte *oldrec,
row_length+= start_log_parts->length; row_length+= start_log_parts->length;
DBUG_RETURN(row_length); DBUG_RETURN(row_length);
} }
/***************************************************************************
Applying of REDO log records
***************************************************************************/
/*
Apply LOGREC_REDO_INSERT_ROW_HEAD & LOGREC_REDO_INSERT_ROW_TAIL
SYNOPSIS
_ma_apply_redo_insert_row_head_or_tail()
info Maria handler
lsn LSN to put on page
page_type HEAD_PAGE or TAIL_PAGE
header Header (without FILEID)
data Data to be put on page
data_length Length of data
RETURN
0 ok
# Error number
*/
uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint page_type,
const byte *header,
const byte *data,
size_t data_length)
{
MARIA_SHARE *share= info->s;
ulonglong page;
uint rownr, empty_space;
uint block_size= share->block_size;
uint rec_offset;
byte *buff= info->keyread_buff, *dir;
DBUG_ENTER("_ma_apply_redo_insert_row_head");
info->keyread_buff_used= 1;
page= page_korr(header);
rownr= dirpos_korr(header+PAGE_STORE_SIZE);
if (page * info->s->block_size > info->state->data_file_length)
{
/* New page at end of file */
DBUG_ASSERT(rownr == 0);
if (rownr != 0)
goto err;
make_empty_page(buff, block_size, page_type);
empty_space= (block_size - PAGE_OVERHEAD_SIZE);
rec_offset= PAGE_HEADER_SIZE;
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
/* Update that file is extended */
info->state->data_file_length= page * info->s->block_size;
}
else
{
uint max_entry;
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
DBUG_RETURN(my_errno);
if (lsn_korr(buff) >= lsn)
{
/* Already applied */
/* Fix bitmap, just in case */
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno);
DBUG_RETURN(0);
}
max_entry= (uint) ((uchar*) buff)[DIR_COUNT_OFFSET];
if (((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != page_type))
{
/*
This is a page that has been freed before and now should be
changed to new type.
*/
if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != BLOB_PAGE &&
(buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != UNALLOCATED_PAGE)
goto err;
make_empty_page(buff, block_size, page_type);
empty_space= (block_size - PAGE_OVERHEAD_SIZE);
rec_offset= PAGE_HEADER_SIZE;
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
}
else
{
dir= (buff + block_size - DIR_ENTRY_SIZE * (rownr + 1) -
PAGE_SUFFIX_SIZE);
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
if (max_entry >= rownr)
{
/* Add directory entry first in directory and data last on page */
DBUG_ASSERT(max_entry == rownr);
if (max_entry != rownr)
goto err;
rec_offset= (uint2korr(dir + DIR_ENTRY_SIZE) +
uint2korr(dir + DIR_ENTRY_SIZE +2));
if ((uint) (dir - buff) < rec_offset + data_length)
{
/* Create place for directory & data */
compact_page(buff, block_size, max_entry - 1, 0);
rec_offset= (uint2korr(dir + DIR_ENTRY_SIZE) +
uint2korr(dir + DIR_ENTRY_SIZE +2));
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
DBUG_ASSERT(!((uint) (dir - buff) < rec_offset + data_length));
if ((uint) (dir - buff) < rec_offset + data_length)
goto err;
}
buff[DIR_COUNT_OFFSET]= (byte) (uchar) max_entry+1;
int2store(dir, rec_offset);
empty_space-= DIR_ENTRY_SIZE;
}
else
{
/* reuse old empty entry */
byte *pos, *end, *end_data;
DBUG_ASSERT(uint2korr(dir) == 0);
if (uint2korr(dir))
goto err; /* Should have been empty */
/* Find start of where we can put data */
end= (buff + block_size - DIR_ENTRY_SIZE * max_entry -
PAGE_SUFFIX_SIZE);
for (pos= dir ; pos >= end ; pos-= DIR_ENTRY_SIZE)
{
if ((rec_offset= uint2korr(pos)))
{
rec_offset+= uint2korr(pos+2);
break;
}
}
DBUG_ASSERT(pos >= end);
if (pos < end) /* Wrong directory */
goto err;
/* find end data */
end_data= end; /* Start of directory */
end= (buff + block_size - PAGE_SUFFIX_SIZE);
for (pos= dir ; pos < end ; pos+= DIR_ENTRY_SIZE)
{
uint offset;
if ((offset= uint2korr(pos)))
{
end_data= buff + offset;
break;
}
}
if ((uint) (end_data - (buff + rec_offset)) < data_length)
{
uint length;
/* Not enough continues space, compact page to get more */
int2store(dir, rec_offset);
compact_page(buff, block_size, rownr, 1);
rec_offset= uint2korr(dir);
length= uint2korr(dir+2);
DBUG_ASSERT(length >= data_length);
if (length < data_length)
goto err;
empty_space= length;
}
}
}
}
/* Copy data */
int2store(dir+2, data_length);
memcpy(buff + rec_offset, data, data_length);
empty_space-= data_length;
int2store(buff + EMPTY_SPACE_OFFSET, empty_space);
/* Write modified page */
lsn_store(buff, lsn);
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno);
/* Fix bitmap */
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno);
DBUG_RETURN(0);
err:
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
}
/*
Apply LOGREC_REDO_PURGE_ROW_HEAD & LOGREC_REDO_PURGE_ROW_TAIL
SYNOPSIS
_ma_apply_redo_purge_row_head_or_tail()
info Maria handler
lsn LSN to put on page
page_type HEAD_PAGE or TAIL_PAGE
header Header (without FILEID)
data Data to be put on page
data_length Length of data
NOTES
This function is very similar to delete_head_or_tail()
RETURN
0 ok
# Error number
*/
uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint page_type,
const byte *header)
{
MARIA_SHARE *share= info->s;
ulonglong page;
uint record_number, empty_space;
uint block_size= share->block_size;
byte *buff= info->keyread_buff;
DBUG_ENTER("_ma_apply_redo_purge_row_head_or_tail");
info->keyread_buff_used= 1;
page= page_korr(header);
record_number= dirpos_korr(header+PAGE_STORE_SIZE);
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
DBUG_RETURN(my_errno);
DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == (byte) page_type);
if (lsn_korr(buff) >= lsn)
{
/* Already applied */
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno);
DBUG_RETURN(0);
}
if (delete_dir_entry(buff, block_size, record_number, &empty_space) < 0)
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno);
/* This will work even if the page was marked as UNALLOCATED_PAGE */
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno);
DBUG_RETURN(0);
}
...@@ -178,3 +178,11 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info, ...@@ -178,3 +178,11 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info,
ulonglong page, ulonglong page,
uint *bitmap_pattern); uint *bitmap_pattern);
void _ma_bitmap_delete_all(MARIA_SHARE *share); void _ma_bitmap_delete_all(MARIA_SHARE *share);
uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint page_type,
const byte *header,
const byte *data,
size_t data_length);
uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint page_type,
const byte *header);
...@@ -62,7 +62,7 @@ struct st_maria_share; ...@@ -62,7 +62,7 @@ struct st_maria_share;
#define pagerange_store(T,A) int2store(T,A) #define pagerange_store(T,A) int2store(T,A)
#define fileid_korr(P) uint2korr(P) #define fileid_korr(P) uint2korr(P)
#define page_korr(P) uint5korr(P) #define page_korr(P) uint5korr(P)
#define dirpos_korr(P) (P[0]) #define dirpos_korr(P) ((P)[0])
#define pagerange_korr(P) uint2korr(P) #define pagerange_korr(P) uint2korr(P)
/* /*
......
...@@ -45,7 +45,7 @@ typedef TRANSLOG_ADDRESS LSN; ...@@ -45,7 +45,7 @@ typedef TRANSLOG_ADDRESS LSN;
#define LSN_OFFSET(L) ((L) & 0xFFFFFFFFL) #define LSN_OFFSET(L) ((L) & 0xFFFFFFFFL)
/* Makes lsn/log address from file number and record offset */ /* Makes lsn/log address from file number and record offset */
#define MAKE_LSN(F,S) ((((uint64)(F)) << 32) | (S)) #define MAKE_LSN(F,S) ((LSN) ((((uint64)(F)) << 32) | (S)))
/* checks LSN */ /* checks LSN */
#define LSN_VALID(L) \ #define LSN_VALID(L) \
......
...@@ -14,20 +14,22 @@ ...@@ -14,20 +14,22 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "maria_def.h" #include "maria_def.h"
#include <ma_blockrec.h>
#include <my_getopt.h> #include <my_getopt.h>
#define PCACHE_SIZE (1024*1024*10) #define PCACHE_SIZE (1024*1024*10)
#define LOG_FLAGS 0 #define LOG_FLAGS 0
#define LOG_FILE_SIZE (1024L*1024L) #define LOG_FILE_SIZE (1024L*1024L)
static PAGECACHE pagecache;
static const char *load_default_groups[]= { "maria_read_log",0 }; static const char *load_default_groups[]= { "maria_read_log",0 };
static void get_options(int *argc,char * * *argv); static void get_options(int *argc,char * * *argv);
#ifndef DBUG_OFF #ifndef DBUG_OFF
static const char *default_dbug_option; #if defined(__WIN__)
const char *default_dbug_option= "d:t:i:O,\\maria_read_log.trace";
#else
const char *default_dbug_option= "d:t:i:o,/tmp/maria_read_log.trace";
#endif #endif
#endif /* DBUG_OFF */
static my_bool opt_only_display, opt_display_and_apply; static my_bool opt_only_display, opt_display_and_apply;
struct TRN_FOR_RECOVERY struct TRN_FOR_RECOVERY
...@@ -55,7 +57,25 @@ prototype_exec_hook(CHECKPOINT); ...@@ -55,7 +57,25 @@ prototype_exec_hook(CHECKPOINT);
prototype_exec_hook(REDO_CREATE_TABLE); prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(FILE_ID); prototype_exec_hook(FILE_ID);
prototype_exec_hook(REDO_INSERT_ROW_HEAD); prototype_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_exec_hook(UNDO_ROW_INSERT);
prototype_exec_hook(UNDO_ROW_DELETE);
prototype_exec_hook(COMMIT); prototype_exec_hook(COMMIT);
/*
TODO: Avoid mallocs in exec.
Proposed fix:
Add either a context/buffer argument to all exec_hook functions
or add 'record_buffer' and 'record_buffer_length' to
TRANSLOG_HEADER_BUFFER.
With this we could use my_realloc() instead of my_malloc() to
allocate data and save some mallocs.
*/
/* /*
To implement REDO_DROP_TABLE and REDO_RENAME_TABLE, we would need to go To implement REDO_DROP_TABLE and REDO_RENAME_TABLE, we would need to go
through the all_tables[] array, find all open instances of the through the all_tables[] array, find all open instances of the
...@@ -78,19 +98,6 @@ int main(int argc, char **argv) ...@@ -78,19 +98,6 @@ int main(int argc, char **argv)
maria_data_root= "."; maria_data_root= ".";
#ifndef DBUG_OFF
#if defined(__WIN__)
default_dbug_option= "d:t:i:O,\\maria_read_log.trace";
#else
default_dbug_option= "d:t:i:o,/tmp/maria_read_log.trace";
#endif
if (argc > 1)
{
DBUG_SET(default_dbug_option);
DBUG_SET_INITIAL(default_dbug_option);
}
#endif
if (maria_init()) if (maria_init())
{ {
fprintf(stderr, "Can't init Maria engine (%d)\n", errno); fprintf(stderr, "Can't init Maria engine (%d)\n", errno);
...@@ -107,7 +114,7 @@ int main(int argc, char **argv) ...@@ -107,7 +114,7 @@ int main(int argc, char **argv)
fprintf(stderr, "Can't find any log\n"); fprintf(stderr, "Can't find any log\n");
goto err; goto err;
} }
if (init_pagecache(&pagecache, PCACHE_SIZE, 0, 0, if (init_pagecache(maria_pagecache, PCACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) TRANSLOG_PAGE_SIZE) == 0)
{ {
fprintf(stderr, "Got error in init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error in init_pagecache() (errno: %d)\n", errno);
...@@ -119,7 +126,7 @@ int main(int argc, char **argv) ...@@ -119,7 +126,7 @@ int main(int argc, char **argv)
But if it finds a log and this log was crashed, it will create a new log, But if it finds a log and this log was crashed, it will create a new log,
which is useless. TODO: start log handler in read-only mode. which is useless. TODO: start log handler in read-only mode.
*/ */
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, if (translog_init(".", LOG_FILE_SIZE, 50112, 0, maria_pagecache,
TRANSLOG_DEFAULT_FLAGS)) TRANSLOG_DEFAULT_FLAGS))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
...@@ -137,6 +144,11 @@ int main(int argc, char **argv) ...@@ -137,6 +144,11 @@ int main(int argc, char **argv)
install_exec_hook(REDO_CREATE_TABLE); install_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(FILE_ID); install_exec_hook(FILE_ID);
install_exec_hook(REDO_INSERT_ROW_HEAD); install_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(REDO_INSERT_ROW_TAIL);
install_exec_hook(REDO_PURGE_ROW_HEAD);
install_exec_hook(REDO_PURGE_ROW_TAIL);
install_exec_hook(UNDO_ROW_INSERT);
install_exec_hook(UNDO_ROW_DELETE);
install_exec_hook(COMMIT); install_exec_hook(COMMIT);
if (opt_only_display) if (opt_only_display)
...@@ -261,7 +273,7 @@ int main(int argc, char **argv) ...@@ -261,7 +273,7 @@ int main(int argc, char **argv)
/* don't touch anything more, in case we hit a bug */ /* don't touch anything more, in case we hit a bug */
exit(1); exit(1);
end: end:
maria_end(); maria_panic(HA_PANIC_CLOSE);
free_defaults(default_argv); free_defaults(default_argv);
my_end(0); my_end(0);
exit(0); exit(0);
...@@ -318,7 +330,13 @@ get_one_option(int optid __attribute__((unused)), ...@@ -318,7 +330,13 @@ get_one_option(int optid __attribute__((unused)),
const struct my_option *opt __attribute__((unused)), const struct my_option *opt __attribute__((unused)),
char *argument __attribute__((unused))) char *argument __attribute__((unused)))
{ {
/* for now there is nothing special with our options */ switch (optid) {
#ifndef DBUG_OFF
case '#':
DBUG_SET_INITIAL(argument ? argument : default_dbug_option);
break;
}
#endif
return 0; return 0;
} }
...@@ -619,6 +637,140 @@ prototype_exec_hook(REDO_INSERT_ROW_HEAD) ...@@ -619,6 +637,140 @@ prototype_exec_hook(REDO_INSERT_ROW_HEAD)
ulonglong page; ulonglong page;
MARIA_HA *info; MARIA_HA *info;
char llbuf[22]; char llbuf[22];
byte *buff= 0;
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
llstr(page, llbuf);
printf("For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
if (info == NULL)
{
printf(", table skipped, so skipping record\n");
goto end;
}
printf(", '%s'", info->s->open_file_name);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
goto end;
}
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
*/
printf(", applying record\n");
/*
If REDO's LSN is > page's LSN (read from disk), we are going to modify the
page and change its LSN. The normal runtime code stores the UNDO's LSN
into the page. Here storing the REDO's LSN (rec->lsn) would work
(we are not writing to the log here, so don't have to "flush up to UNDO's
LSN"). But in a test scenario where we do updates at runtime, then remove
tables, apply the log and check that this results in the same table as at
runtime, putting the same LSN as runtime had done will decrease
differences. So we use the UNDO's LSN which is current_group_end_lsn.
*/
if ((!(buff= (byte*) my_malloc(rec->record_length, MYF(MY_WME)))) ||
(translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) !=
rec->record_length))
{
fprintf(stderr, "Failed to read record\n");
goto end;
}
if (_ma_apply_redo_insert_row_head_or_tail(info, rec->lsn, HEAD_PAGE,
rec->header + FILEID_STORE_SIZE,
buff + (rec->record_length -
rec->non_header_data_len),
rec->non_header_data_len))
goto end;
my_free(buff, MYF(0));
return 0;
end:
/* as we don't have apply working: */
my_free(buff, MYF(MY_ALLOW_ZERO_PTR));
return 1;
}
prototype_exec_hook(REDO_INSERT_ROW_TAIL)
{
uint16 sid;
ulonglong page;
MARIA_HA *info;
char llbuf[22];
byte *buff= 0;
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
llstr(page, llbuf);
printf("For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
if (info == NULL)
{
printf(", table skipped, so skipping record\n");
goto end;
}
printf(", '%s'", info->s->open_file_name);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
goto end;
}
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
*/
printf(", applying record\n");
/*
If REDO's LSN is > page's LSN (read from disk), we are going to modify the
page and change its LSN. The normal runtime code stores the UNDO's LSN
into the page. Here storing the REDO's LSN (rec->lsn) would work
(we are not writing to the log here, so don't have to "flush up to UNDO's
LSN"). But in a test scenario where we do updates at runtime, then remove
tables, apply the log and check that this results in the same table as at
runtime, putting the same LSN as runtime had done will decrease
differences. So we use the UNDO's LSN which is current_group_end_lsn.
*/
if ((!(buff= (byte*) my_malloc(rec->record_length, MYF(MY_WME)))) ||
(translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) !=
rec->record_length))
{
fprintf(stderr, "Failed to read record\n");
goto end;
}
if (_ma_apply_redo_insert_row_head_or_tail(info, rec->lsn, TAIL_PAGE,
rec->header + FILEID_STORE_SIZE,
buff + (rec->record_length -
rec->non_header_data_len),
rec->non_header_data_len))
goto end;
my_free(buff, MYF(0));
return 0;
end:
/* as we don't have apply working: */
my_free(buff, MYF(MY_ALLOW_ZERO_PTR));
return 1;
}
prototype_exec_hook(REDO_PURGE_ROW_HEAD)
{
uint16 sid;
ulonglong page;
MARIA_HA *info;
char llbuf[22];
sid= fileid_korr(rec->header); sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE); page= page_korr(rec->header + FILEID_STORE_SIZE);
llstr(page, llbuf); llstr(page, llbuf);
...@@ -653,13 +805,89 @@ prototype_exec_hook(REDO_INSERT_ROW_HEAD) ...@@ -653,13 +805,89 @@ prototype_exec_hook(REDO_INSERT_ROW_HEAD)
runtime, putting the same LSN as runtime had done will decrease runtime, putting the same LSN as runtime had done will decrease
differences. So we use the UNDO's LSN which is current_group_end_lsn. differences. So we use the UNDO's LSN which is current_group_end_lsn.
*/ */
DBUG_ASSERT("Monty" == "this is the place");
if (_ma_apply_redo_purge_row_head_or_tail(info, rec->lsn, HEAD_PAGE,
rec->header + FILEID_STORE_SIZE))
goto end;
return 0;
end: end:
/* as we don't have apply working: */ /* as we don't have apply working: */
return 1; return 1;
} }
prototype_exec_hook(REDO_PURGE_ROW_TAIL)
{
uint16 sid;
ulonglong page;
MARIA_HA *info;
char llbuf[22];
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
llstr(page, llbuf);
printf("For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
if (info == NULL)
{
printf(", table skipped, so skipping record\n");
goto end;
}
printf(", '%s'", info->s->open_file_name);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
goto end;
}
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
*/
printf(", applying record\n");
/*
If REDO's LSN is > page's LSN (read from disk), we are going to modify the
page and change its LSN. The normal runtime code stores the UNDO's LSN
into the page. Here storing the REDO's LSN (rec->lsn) would work
(we are not writing to the log here, so don't have to "flush up to UNDO's
LSN"). But in a test scenario where we do updates at runtime, then remove
tables, apply the log and check that this results in the same table as at
runtime, putting the same LSN as runtime had done will decrease
differences. So we use the UNDO's LSN which is current_group_end_lsn.
*/
if (_ma_apply_redo_purge_row_head_or_tail(info, rec->lsn, TAIL_PAGE,
rec->header + FILEID_STORE_SIZE))
goto end;
return 0;
end:
/* as we don't have apply working: */
return 1;
}
static int exec_LOGREC_UNDO_ROW_INSERT(const TRANSLOG_HEADER_BUFFER *rec
__attribute__((unused)))
{
/* Ignore this during the redo phase */
return 0;
}
static int exec_LOGREC_UNDO_ROW_DELETE(const TRANSLOG_HEADER_BUFFER *rec
__attribute__((unused)))
{
/* Ignore this during the redo phase */
return 0;
}
prototype_exec_hook(COMMIT) prototype_exec_hook(COMMIT)
{ {
uint16 sid= rec->short_trid; uint16 sid= rec->short_trid;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment