Commit 0a2220b6 authored by unknown's avatar unknown

Merge desktop.sanja.is.com.ua:/home/bell/mysql/bk/mysql-maria

into  desktop.sanja.is.com.ua:/home/bell/mysql/bk/work-maria-purge


storage/maria/ma_loghandler.c:
  Auto merged
storage/maria/ma_loghandler.h:
  Auto merged
parents 7c273b82 bd02914a
......@@ -3045,3 +3045,6 @@ win/vs71cache.txt
win/vs8cache.txt
zlib/*.ds?
zlib/*.vcproj
maria_log_control
storage/maria/tmp/*
storage/maria/tmp/*
......@@ -300,7 +300,8 @@ extern ha_rows maria_records_in_range(struct st_maria_info *info, int inx,
extern int maria_is_changed(struct st_maria_info *info);
extern int maria_delete_all_rows(struct st_maria_info *info);
extern uint maria_get_pointer_length(ulonglong file_length, uint def);
extern int maria_commit(struct st_maria_info *info);
extern int maria_begin(struct st_maria_info *info);
/* this is used to pass to mysql_mariachk_table */
......
......@@ -22,6 +22,16 @@
/* My memory re allocator */
/**
@brief wrapper around realloc()
@param oldpoint pointer to currently allocated area
@param size new size requested, must be >0
@param my_flags flags
@note if size==0 realloc() may return NULL; my_realloc() treats this as an
error which is not the intention of realloc()
*/
void* my_realloc(void* oldpoint, size_t size, myf my_flags)
{
void *point;
......@@ -29,6 +39,7 @@ void* my_realloc(void* oldpoint, size_t size, myf my_flags)
DBUG_PRINT("my",("ptr: 0x%lx size: %lu my_flags: %d", (long) oldpoint,
(ulong) size, my_flags));
DBUG_ASSERT(size > 0);
if (!oldpoint && (my_flags & MY_ALLOW_ZERO_PTR))
DBUG_RETURN(my_malloc(size,my_flags));
#ifdef USE_HALLOC
......
......@@ -3418,6 +3418,17 @@ server.");
using_update_log=1;
}
/* call ha_init_key_cache() on all key caches to init them */
process_key_caches(&ha_init_key_cache);
/*
Maria's pagecache needs to be ready before Maria engine (Recovery uses
pagecache, and Checkpoint may happen at startup). Maria engine is taken up
in plugin_init().
*/
#ifdef WITH_MARIA_STORAGE_ENGINE
process_pagecaches(&ha_init_pagecache);
#endif /* WITH_MARIA_STORAGE_ENGINE */
/* Allow storage engine to give real error messages */
if (ha_init_errors())
DBUG_RETURN(1);
......@@ -3588,12 +3599,6 @@ server.");
if (opt_myisam_log)
(void) mi_log(1);
/* call ha_init_key_cache() on all key caches to init them */
process_key_caches(&ha_init_key_cache);
#ifdef WITH_MARIA_STORAGE_ENGINE
process_pagecaches(&ha_init_pagecache);
#endif /* WITH_MARIA_STORAGE_ENGINE */
#if defined(HAVE_MLOCKALL) && defined(MCL_CURRENT) && !defined(EMBEDDED_LIBRARY)
if (locked_in_memory && !getuid())
{
......
......@@ -27,11 +27,16 @@
#include "ha_maria.h"
#include "trnman_public.h"
C_MODE_START
#include "maria_def.h"
#include "ma_rt_index.h"
#include "ma_blockrec.h"
#include "ma_commit.h"
C_MODE_END
/*
Note that in future versions, only *transactional* Maria tables can
rollback, so this flag should be up or down conditionally.
*/
#define MARIA_CANNOT_ROLLBACK HA_NO_TRANSACTIONS
#ifdef MARIA_CANNOT_ROLLBACK
#define trans_register_ha(A, B, C) do { /* nothing */ } while(0)
......@@ -2384,7 +2389,7 @@ static int ha_maria_init(void *p)
maria_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES;
bzero(maria_log_pagecache, sizeof(*maria_log_pagecache));
maria_data_root= mysql_real_data_home;
res= maria_init() || ma_control_file_create_or_open(TRUE) ||
res= maria_init() || ma_control_file_create_or_open() ||
(init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) ||
......
......@@ -512,15 +512,19 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap,
ulonglong page)
{
my_off_t position= page * bitmap->block_size;
my_off_t end_of_page= (page + 1) * bitmap->block_size;
my_bool res;
DBUG_ENTER("_ma_read_bitmap_page");
DBUG_ASSERT(page % bitmap->pages_covered == 0);
bitmap->page= page;
if (position >= share->state.state.data_file_length)
if (end_of_page > share->state.state.data_file_length)
{
share->state.state.data_file_length= position + bitmap->block_size;
/*
Inexistent or half-created page (could be crash in the middle of
_ma_bitmap_create_first(), before appending maria_bitmap_marker).
*/
share->state.state.data_file_length= end_of_page;
bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
......@@ -2047,7 +2051,8 @@ int _ma_bitmap_create_first(MARIA_SHARE *share)
{
uint block_size= share->bitmap.block_size;
File file= share->bitmap.file.file;
if (my_chsize(file, block_size, 0, MYF(MY_WME)) ||
if (my_chsize(file, block_size - sizeof(maria_bitmap_marker),
0, MYF(MY_WME)) ||
my_pwrite(file, maria_bitmap_marker, sizeof(maria_bitmap_marker),
block_size - sizeof(maria_bitmap_marker),
MYF(MY_NABP | MY_WME)))
......
......@@ -504,11 +504,12 @@ void _ma_end_block_record(MARIA_HA *info)
****************************************************************************/
/*
Return the next used uchar on the page after a directory entry.
Return the next unused position on the page after a directory entry.
SYNOPSIS
start_of_next_entry()
dir Directory entry to be used
dir Directory entry to be used. This can not be the
last entry on the page!
RETURN
# Position in page where next entry starts.
......@@ -530,6 +531,129 @@ static inline uint start_of_next_entry(uchar *dir)
}
/*
  Find the page offset at which the nearest preceding used entry ends

  SYNOPSIS
    end_of_previous_entry()
    dir                 Address of the current directory entry
    end                 Address at which the directory scan stops (entries
                        are examined while their address is below 'end')

  NOTES
    The scan starts at the entry stored just above 'dir' in memory
    (dir + DIR_ENTRY_SIZE) and moves upwards one entry at a time.
    Entries whose stored offset is 0 are skipped.

  RETURN
    #   offset + length of the first entry found with a non-zero offset,
        or PAGE_HEADER_SIZE if no such entry exists.
        Everything between # and the current entry is free to be used.
*/

static inline uint end_of_previous_entry(uchar *dir, uchar *end)
{
  uchar *entry= dir + DIR_ENTRY_SIZE;

  while (entry < end)
  {
    uint rec_start= uint2korr(entry);
    if (rec_start)
      return rec_start + uint2korr(entry+2);
    entry+= DIR_ENTRY_SIZE;
  }
  return PAGE_HEADER_SIZE;
}
/**
@brief Extend a record area to fit a given size block
@fn extend_area_on_page()
@param buff Page buffer
@param dir Pointer to dir entry in buffer
@param rownr Row number we are working on (index of [dir] in directory)
@param block_size Block size of buffer
@param request_length How much data we want to put at [dir]
@param[in,out] empty_space Total empty space in buffer. On entry the
caller's current value; the length currently
stored in [dir] is added to it. If the page had
to be compacted it is instead set to the length
of the area found.
@param[out] ret_offset Offset on page of the area found
@param[out] ret_length Length of the area found
IMPLEMENTATION
The logic is as follows (same as in _ma_update_block_record())
- If new data fits in old block, use old block.
- Extend block with empty space before block. If enough, use it.
- Extend block with empty space after block. If enough, use it.
- Use compact_page() to get all empty space at dir.
@note On success the offset part of [dir] is updated to the offset of the
area found. The length part of [dir] (dir + 2) is not written by this
function itself.
@note ret_offset and ret_length are only written on success.
@return Operation status
@retval 0 ok
@retval 1 error (wrong info in block)
*/
static my_bool extend_area_on_page(uchar *buff, uchar *dir,
uint rownr, uint block_size,
uint request_length,
uint *empty_space, uint *ret_offset,
uint *ret_length)
{
uint rec_offset, length;
DBUG_ENTER("extend_area_on_page");
/* Current position and size of the entry, as stored in the directory */
rec_offset= uint2korr(dir);
length= uint2korr(dir + 2);
DBUG_PRINT("enter", ("rec_offset: %u length: %u request_length: %u",
rec_offset, length, request_length));
/* Count the area currently held by the entry as available space */
*empty_space+= length;
if (length < request_length)
{
/* Number of directory entries on the page */
uint max_entry= (uint) ((uchar*) buff)[DIR_COUNT_OFFSET];
uint old_rec_offset;
/*
New data did not fit in old position.
Find first possible position where to put new data.
*/
old_rec_offset= rec_offset;
rec_offset= end_of_previous_entry(dir, buff + block_size -
PAGE_SUFFIX_SIZE);
/*
Add the gap between the end of the previous entry and the old start.
If old_rec_offset is 0 this unsigned subtraction wraps around; the
result is not used in that case because the test below then always
takes the branch that recomputes 'length'.
*/
length+= (uint) (old_rec_offset - rec_offset);
/*
old_rec_offset is 0 if we are doing an insert into a not allocated block.
This can only happen during REDO of INSERT
*/
if (!old_rec_offset || length < request_length)
{
/*
Did not fit in current block + empty space. Extend with
empty space after block.
*/
if (rownr == max_entry - 1)
{
/* Last entry; Everything is free between this and directory */
length= ((block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE * max_entry) -
rec_offset);
}
else
length= start_of_next_entry(dir) - rec_offset;
DBUG_ASSERT((int) length > 0);
if (length < request_length)
{
/* Not enough contiguous space, compact page to get more */
/* Store the new start first; offset and length are re-read after compaction */
int2store(dir, rec_offset);
compact_page(buff, block_size, rownr, 1);
rec_offset= uint2korr(dir);
length= uint2korr(dir+2);
if (length < request_length)
DBUG_RETURN(1); /* Error in block */
*empty_space= length; /* All space is here */
}
}
}
/* Record the (possibly moved) start of the area in the directory entry */
int2store(dir, rec_offset);
*ret_offset= rec_offset;
*ret_length= length;
DBUG_RETURN(0);
}
/*
Check that a region is all zero
......@@ -1438,7 +1562,7 @@ static my_bool free_full_pages(MARIA_HA *info, MARIA_ROW *row)
log_data))
DBUG_RETURN(1);
DBUG_RETURN (_ma_bitmap_free_full_pages(info, row->extents,
DBUG_RETURN(_ma_bitmap_free_full_pages(info, row->extents,
row->extents_count));
}
......@@ -1457,6 +1581,7 @@ static my_bool free_full_pages(MARIA_HA *info, MARIA_ROW *row)
static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
{
my_bool res= 0;
DBUG_ENTER("free_full_page_range");
if (pagecache_delete_pages(info->s->pagecache, &info->dfile,
page, count, PAGECACHE_LOCK_WRITE, 0))
......@@ -1490,7 +1615,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
count))
res= 1;
pthread_mutex_unlock(&info->s->bitmap.bitmap_lock);
return res;
DBUG_RETURN(res);
}
......@@ -2431,7 +2556,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
if ((org_empty_size + cur_row->head_length) >= new_row->total_length)
{
uint empty, offset, length;
uint rec_offset, length;
MARIA_BITMAP_BLOCK block;
/*
......@@ -2440,27 +2565,18 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
*/
block.org_bitmap_value= _ma_free_size_to_head_pattern(&share->bitmap,
org_empty_size);
offset= uint2korr(dir);
length= uint2korr(dir + 2);
empty= 0;
if (new_row->total_length > length)
{
/* See if there is empty space after */
if (rownr != (uint) ((uchar *) buff)[DIR_COUNT_OFFSET] - 1)
empty= start_of_next_entry(dir) - (offset + length);
if (new_row->total_length > length + empty)
{
compact_page(buff, share->block_size, rownr, 1);
org_empty_size= 0;
length= uint2korr(dir + 2);
}
}
if (extend_area_on_page(buff, dir, rownr, share->block_size,
new_row->total_length, &org_empty_size,
&rec_offset, &length))
DBUG_RETURN(1);
row_pos.buff= buff;
row_pos.rownr= rownr;
row_pos.empty_space= org_empty_size + length;
row_pos.empty_space= org_empty_size;
row_pos.dir= dir;
row_pos.data= buff + uint2korr(dir);
row_pos.length= length + empty;
row_pos.data= buff + rec_offset;
row_pos.length= length;
blocks->block= &block;
blocks->count= 1;
block.page= page;
......@@ -2470,7 +2586,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
/* Update cur_row, if someone calls update at once again */
cur_row->head_length= new_row->total_length;
if (free_full_pages(info, cur_row))
if (cur_row->extents_count && free_full_pages(info, cur_row))
goto err;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks,
1, &row_pos));
......@@ -2492,9 +2608,9 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
}
/* Delete old row */
if (delete_tails(info, cur_row->tail_positions))
if (*cur_row->tail_positions && delete_tails(info, cur_row->tail_positions))
goto err;
if (free_full_pages(info, cur_row))
if (cur_row->extents_count && free_full_pages(info, cur_row))
goto err;
if (_ma_bitmap_find_new_place(info, new_row, page, head_length, blocks))
goto err;
......@@ -2535,7 +2651,7 @@ static int delete_dir_entry(uchar *buff, uint block_size, uint record_number,
{
uint number_of_records= (uint) ((uchar *) buff)[DIR_COUNT_OFFSET];
uint length, empty_space;
uchar *dir;
uchar *dir, *org_dir;
DBUG_ENTER("delete_dir_entry");
#ifdef SANITY_CHECKS
......@@ -2551,9 +2667,8 @@ static int delete_dir_entry(uchar *buff, uint block_size, uint record_number,
#endif
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
dir= (buff + block_size - DIR_ENTRY_SIZE * record_number -
org_dir= dir= (buff + block_size - DIR_ENTRY_SIZE * record_number -
DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
dir[0]= dir[1]= 0; /* Delete entry */
length= uint2korr(dir + 2);
if (record_number == number_of_records - 1)
......@@ -2566,21 +2681,24 @@ static int delete_dir_entry(uchar *buff, uint block_size, uint record_number,
dir+= DIR_ENTRY_SIZE;
empty_space+= DIR_ENTRY_SIZE;
} while (dir < end && dir[0] == 0 && dir[1] == 0);
if (number_of_records == 0)
{
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
*empty_space_res= block_size;
DBUG_RETURN(1);
}
buff[DIR_COUNT_OFFSET]= (uchar) number_of_records;
}
empty_space+= length;
if (number_of_records != 0)
{
/* Update directory */
org_dir[0]= org_dir[1]= 0; org_dir[2]= org_dir[3]= 0; /* Delete entry */
int2store(buff + EMPTY_SPACE_OFFSET, empty_space);
buff[PAGE_TYPE_OFFSET]|= (uchar) PAGE_CAN_BE_COMPACTED;
*empty_space_res= empty_space;
DBUG_RETURN(0);
}
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
*empty_space_res= block_size;
DBUG_RETURN(1);
}
......@@ -4146,6 +4264,10 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
page= page_korr(header);
rownr= dirpos_korr(header+PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u data_length: %u",
(ulong) ma_recordpos(page, rownr),
(ulong) page, rownr, (uint) data_length));
if (((page + 1) * info->s->block_size) > info->state->data_file_length)
{
/*
......@@ -4162,9 +4284,6 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
empty_space= (block_size - PAGE_OVERHEAD_SIZE);
rec_offset= PAGE_HEADER_SIZE;
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
/* Update that file is extended */
info->state->data_file_length= (page + 1) * info->s->block_size;
}
else
{
......@@ -4207,7 +4326,7 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
PAGE_SUFFIX_SIZE);
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
if (max_entry >= rownr)
if (max_entry <= rownr)
{
/* Add directory entry first in directory and data last on page */
DBUG_ASSERT(max_entry == rownr);
......@@ -4231,53 +4350,16 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
empty_space-= DIR_ENTRY_SIZE;
}
else
{
/* reuse old empty entry */
uchar *pos, *end, *end_data;
DBUG_ASSERT(uint2korr(dir) == 0);
if (uint2korr(dir))
goto err; /* Should have been empty */
/* Find start of where we can put data */
end= (buff + block_size - DIR_ENTRY_SIZE * max_entry -
PAGE_SUFFIX_SIZE);
for (pos= dir ; pos >= end ; pos-= DIR_ENTRY_SIZE)
{
if ((rec_offset= uint2korr(pos)))
{
rec_offset+= uint2korr(pos+2);
break;
}
}
DBUG_ASSERT(pos >= end);
if (pos < end) /* Wrong directory */
goto err;
/* find end data */
end_data= end; /* Start of directory */
end= (buff + block_size - PAGE_SUFFIX_SIZE);
for (pos= dir ; pos < end ; pos+= DIR_ENTRY_SIZE)
{
uint offset;
if ((offset= uint2korr(pos)))
{
end_data= buff + offset;
break;
}
}
if ((uint) (end_data - (buff + rec_offset)) < data_length)
{
uint length;
/* Not enough continues space, compact page to get more */
int2store(dir, rec_offset);
compact_page(buff, block_size, rownr, 1);
rec_offset= uint2korr(dir);
length= uint2korr(dir+2);
DBUG_ASSERT(length >= data_length);
if (length < data_length)
/*
Reuse old entry. This is empty if the command was an insert and
possibly in use if the command was an update.
*/
if (extend_area_on_page(buff, dir, rownr, block_size,
data_length, &empty_space,
&rec_offset, &length))
goto err;
empty_space= length;
}
}
}
}
......@@ -4301,6 +4383,16 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno);
/*
Data page and bitmap page are in place, we can update data_file_length in
case we extended the file. We could not do it earlier: bitmap code tests
data_file_length to know if it has to create a new page or not.
*/
{
my_off_t end_of_page= (page + 1) * info->s->block_size;
set_if_bigger(info->state->data_file_length, end_of_page);
}
DBUG_RETURN(0);
err:
......@@ -4397,25 +4489,28 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
{
MARIA_SHARE *share= info->s;
ulonglong page;
uint page_range;
uint res;
uint page_range, ranges;
uint res= 0;
uchar *buff= info->keyread_buff;
uint block_size= share->block_size;
DBUG_ENTER("_ma_apply_redo_purge_blocks");
info->keyread_buff_used= 1;
page_range= pagerange_korr(header);
/* works only for a one-page range for now */
DBUG_ASSERT(page_range == 1); // for now
ranges= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE;
while (ranges--)
{
uint i;
page= page_korr(header);
header+= PAGE_STORE_SIZE;
page_range= pagerange_korr(header);
DBUG_ASSERT(page_range == 1); // for now
header+= PAGERANGE_STORE_SIZE;
for (i= 0; i < page_range ; i++)
{
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
page, 0,
page+i, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
DBUG_RETURN(my_errno);
......@@ -4423,44 +4518,24 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
if (lsn_korr(buff) >= lsn)
{
/* Already applied */
goto mark_free_in_bitmap;
continue;
}
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
/*
Strictly speaking, we don't need to zero the last directory entry of this
page; setting the directory's count to zero is enough (it makes the last
directory entry invisible, irrelevant).
But as the "runtime" code (delete_head_or_tail()) called
delete_dir_entry() which zeroed the entry, if we don't do it here, we get
a difference between runtime and log-applying. Irrelevant, but it's
time-consuming to differentiate irrelevant differences from relevant
ones. So we remove the difference by zeroing the entry.
*/
{
uint rownr= ((uint) ((uchar *) buff)[DIR_COUNT_OFFSET]) - 1;
uchar *dir= (buff + block_size - DIR_ENTRY_SIZE * rownr -
DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
dir[0]= dir[1]= 0; /* Delete entry */
}
buff[DIR_COUNT_OFFSET]= 0;
lsn_store(buff, lsn);
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
&info->dfile, page+i, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno);
mark_free_in_bitmap:
}
/** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_reset_full_page_bits(info, &share->bitmap, page, 1);
res= _ma_reset_full_page_bits(info, &share->bitmap, page, page_range);
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res)
DBUG_RETURN(res);
}
DBUG_RETURN(0);
}
......@@ -2046,15 +2046,8 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
goto err;
}
_ma_reset_status(sort_info.new_info);
#ifdef ASK_MONTY /* cf maria_create() */
/**
@todo ASK_MONTY
without this call, a REPAIR on an empty table leaves the data file of
size 0, which sounds reasonable.
*/
if (_ma_initialize_data_file(sort_info.new_info->s, new_file))
goto err;
#endif
block_record= 1;
}
}
......
......@@ -28,8 +28,13 @@
int ma_commit(TRN *trn)
{
int res;
LSN commit_lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS];
DBUG_ENTER("ma_commit");
if (trn->undo_lsn == 0) /* no work done, rollback (cheaper than commit) */
return trnman_rollback_trn(trn);
DBUG_RETURN(trnman_rollback_trn(trn));
/*
- if COMMIT record is written before trnman_commit_trn():
if Checkpoint comes in the middle it will see trn is not committed,
......@@ -45,27 +50,75 @@ int ma_commit(TRN *trn)
issue (transaction's updates were made visible to other transactions).
So we need to go the first way.
*/
/**
@todo RECOVERY share's state is written to disk only in
maria_lock_database(), so COMMIT record is not the last record of the
transaction! It is probably an issue. Recovery of the state is a problem
not yet solved.
*/
LSN commit_lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS];
/*
We do not store "thd->transaction.xid_state.xid" for now, it will be
needed only when we support XA.
*/
return
translog_write_record(&commit_lsn, LOGREC_COMMIT,
res= (translog_write_record(&commit_lsn, LOGREC_COMMIT,
trn, NULL, 0,
sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL) ||
translog_flush(commit_lsn) || trnman_commit_trn(trn);
translog_flush(commit_lsn) ||
trnman_commit_trn(trn));
/*
Note: if trnman_commit_trn() fails above, we have already
written the COMMIT record, so Checkpoint and Recovery will see the
transaction as committed.
*/
DBUG_RETURN(res);
}
/**
  @brief Commits the transaction associated with a Maria handler

  @param  info            Maria handler

  @note If the table is not currently transactional
  (info->s->now_transactional is false) nothing is done and 0 is returned;
  otherwise the work is delegated to ma_commit() on info->trn.

  @return Operation status
    @retval 0      ok
    @retval #      error, as returned by ma_commit()
*/

int maria_commit(MARIA_HA *info)
{
  if (!info->s->now_transactional)
    return 0;
  return ma_commit(info->trn);
}
/**
  @brief Starts a transaction on a file handle

  @param  info            Maria handler

  @note Does nothing (and returns 0) when the table is not currently
  transactional. Otherwise a new transaction object is obtained from
  trnman_new_trn() and stored in info->trn.

  @return Operation status
    @retval 0                    ok
    @retval HA_ERR_OUT_OF_MEM    trnman_new_trn() returned no transaction
*/

int maria_begin(MARIA_HA *info)
{
  DBUG_ENTER("maria_begin");

  if (!info->s->now_transactional)
    DBUG_RETURN(0);

  {
    struct st_my_thread_var *mysys_var= my_thread_var;
    /*
      The third argument is derived from the address of the local variable
      itself (not from the structure it points to).
    */
    TRN *new_trn= trnman_new_trn(&mysys_var->mutex,
                                 &mysys_var->suspend,
                                 (char*) &mysys_var +
                                 STACK_DIRECTION *1024*128);
    if (unlikely(!new_trn))
      DBUG_RETURN(HA_ERR_OUT_OF_MEM);

    DBUG_PRINT("info", ("TRN set to 0x%lx", (ulong) new_trn));
    info->trn= new_trn;
  }
  DBUG_RETURN(0);
}
......@@ -41,6 +41,10 @@
#define CONTROL_FILE_SIZE (CONTROL_FILE_FILENO_OFFSET + CONTROL_FILE_FILENO_SIZE)
/* This module owns these two vars. */
/**
This LSN serves for the two-checkpoint rule, and also to find the
checkpoint record when doing a recovery.
*/
LSN last_checkpoint_lsn= LSN_IMPOSSIBLE;
uint32 last_logno= FILENO_IMPOSSIBLE;
......@@ -68,8 +72,6 @@ static int control_file_fd= -1;
the last_checkpoint_lsn and last_logno global variables.
Called at engine's start.
@param create_if_missing
@note
The format of the control file is:
4 bytes: magic string
......@@ -78,11 +80,13 @@ static int control_file_fd= -1;
4 bytes: offset in log where last checkpoint is
4 bytes: number of last log
@note If in recovery, file is not created
@return Operation status
@retval 0 OK
@retval 1 Error (in which case the file is left closed)
*/
CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing)
CONTROL_FILE_ERROR ma_control_file_create_or_open()
{
char buffer[CONTROL_FILE_SIZE];
char name[FN_REFLEN];
......@@ -111,7 +115,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing)
if (create_file)
{
if (!create_if_missing)
/* in a recovery, we expect to find a control file */
if (maria_in_recovery)
DBUG_RETURN(CONTROL_FILE_MISSING);
if ((control_file_fd= my_create(name, 0,
open_flags, MYF(MY_SYNC_DIR))) < 0)
......
......@@ -61,7 +61,7 @@ extern "C" {
If present, reads it to find out last checkpoint's LSN and last log.
Called at engine's start.
*/
CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool);
CONTROL_FILE_ERROR ma_control_file_create_or_open();
/*
Write information durably to the control file.
Called when we have created a new log (after syncing this log's creation)
......
......@@ -664,6 +664,14 @@ int maria_create(const char *name, enum data_file_type datafile_type,
share.base.keystart = share.state.state.key_file_length=
MY_ALIGN(info_length, maria_block_size);
if (share.data_file_type == BLOCK_RECORD)
{
/*
we are going to create a first bitmap page, set data_file_length
to reflect this, before the state goes to disk
*/
share.state.state.data_file_length= maria_block_size;
}
share.base.max_key_block_length= maria_block_size;
share.base.max_key_length=ALIGN_SIZE(max_key_length+4);
share.base.records=ci->max_rows;
......@@ -1041,36 +1049,8 @@ int maria_create(const char *name, enum data_file_type datafile_type,
goto err;
errpos=3;
/**
@todo ASK_MONTY
QQ: this sets data_file_length from 0 to 8192, but we wrote the state
already to the index file (because:
- log record is built from index header so state must be written before
log record
- data file must be created after log record, so that "missing log
record" implies "unusable table").
When we wrote the state, we hadn't called ma_initialize_data_file(), so
the data_file_length is 0!
Thus, we below create a 8192-byte data file, but its recorded size is 0,
so next time we read the bitmap (a maria_write() for example) we'll
overwrite the bitmap we just created below.
It's not very efficient.
It also makes maria_chk_size() print
Size of datafile is: 8192 Should be: 0
on a freshly created table (run "check.test" with a Maria table).
Why do we absolutely want to create a 8192-byte page for a freshly
created, empty table? Why don't we leave the data file empty?
Removing the call below at least removes the maria_chk_size() issue.
Monty wrote on IRC, about a size of 0:
"This basically ok; The first block is a bitmap that may or may not
exists", but later he asked that the first block always exists.???
*/
#ifdef ASK_MONTY
if (_ma_initialize_data_file(&share, dfile))
goto err;
#endif
}
/* Enlarge files */
......
......@@ -159,7 +159,7 @@ struct st_translog_descriptor
LSN flushed;
/* Last LSN sent to the disk (but maybe not written yet) */
LSN sent_to_file;
/* All what is after this addess is not sent to disk yet */
/* All what is after this address is not sent to disk yet */
TRANSLOG_ADDRESS in_buffers_only;
pthread_mutex_t sent_to_file_lock;
......@@ -306,13 +306,9 @@ static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_TAIL=
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_row_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
/* QQQ: TODO: variable and fixed size??? */
static LOG_DESC INIT_LOGREC_REDO_PURGE_BLOCKS=
{LOGRECTYPE_VARIABLE_LENGTH,
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE,
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE,
{LOGRECTYPE_VARIABLE_LENGTH, 0,
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_blocks", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
......@@ -682,7 +678,7 @@ static my_bool translog_max_lsn_to_header(File file, LSN lsn)
typedef struct st_loghandler_file_info
{
/*
LSN_IPOSSIBLE for current file and max LSN which parts stored in the
LSN_IMPOSSIBLE for current file and max LSN which parts stored in the
file for all other (finished) files.
*/
LSN max_lsn;
......@@ -824,7 +820,7 @@ static void translog_mark_file_unfinished(uint32 file)
goto end;
}
for (place= log_descriptor.unfinished_files.elements;
for (place= log_descriptor.unfinished_files.elements - 1;
place >= 0;
place--)
{
......@@ -5301,6 +5297,7 @@ translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner)
@param buff Buffer to be filled with header data
@param scanner If present should be moved to the header page if
it differ from LSN page
@return Length of header or operation status
@retval RECHEADER_READ_ERROR error
@retval # number of bytes in
......@@ -5323,7 +5320,6 @@ int translog_variable_length_header(uchar *page, translog_size_t page_offset,
uint16 buffer_length= length;
uint16 body_len;
TRANSLOG_SCANNER_DATA internal_scanner;
DBUG_ENTER("translog_variable_length_header");
buff->record_length= translog_variable_record_1group_decode_len(&src);
......@@ -6174,7 +6170,7 @@ static my_bool write_hook_for_redo(enum translog_record_type type
non-transactional log records (REPAIR, CREATE, RENAME, DROP) should not
call this hook; we trust them but verify ;)
*/
DBUG_ASSERT(!(maria_multi_threaded && (trn->trid == 0)));
DBUG_ASSERT(trn->trid != 0);
/*
If the hook stays so simple, it would be faster to pass
!trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn
......@@ -6203,7 +6199,7 @@ static my_bool write_hook_for_undo(enum translog_record_type type
struct st_translog_parts *parts
__attribute__ ((unused)))
{
DBUG_ASSERT(!(maria_multi_threaded && (trn->trid == 0)));
DBUG_ASSERT(trn->trid != 0);
trn->undo_lsn= *lsn;
if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0))
trn->first_undo_lsn=
......@@ -6316,6 +6312,17 @@ void translog_deassign_id_from_share(MARIA_SHARE *share)
}
/**
  @brief Gives a share a caller-chosen id and registers it in id_to_share

  @param  share           table's share; must be a BLOCK_RECORD table which
                          has no id yet (share->id == 0)
  @param  id              id to assign; its slot in id_to_share must be free

  @note The preconditions are only checked by debug assertions, including
  that we are in recovery and not multi-threaded.
*/

void translog_assign_id_to_share_from_recovery(MARIA_SHARE *share,
                                               uint16 id)
{
  DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);
  DBUG_ASSERT(share->data_file_type == BLOCK_RECORD);
  DBUG_ASSERT(share->id == 0);
  DBUG_ASSERT(id_to_share[id] == NULL);
  share->id= id;
  id_to_share[share->id]= share;
}
/**
@brief check if such log file exists
......
......@@ -260,6 +260,9 @@ extern TRANSLOG_ADDRESS translog_get_horizon();
extern int translog_assign_id_to_share(struct st_maria_share *share,
struct st_transaction *trn);
extern void translog_deassign_id_from_share(struct st_maria_share *share);
extern void
translog_assign_id_to_share_from_recovery(struct st_maria_share *share,
uint16 id);
extern my_bool translog_inited;
/*
......
......@@ -84,6 +84,9 @@ typedef LSN LSN_WITH_FLAGS;
/* following LSN also is impossible */
#define LSN_ERROR 1
/** @brief some impossible LSN serve as markers */
#define LSN_REPAIRED_BY_MARIA_CHK ((LSN)1)
/**
@brief the maximum valid LSN.
Unlike ULONGLONG_MAX, it can be safely used in comparison with valid LSNs
......
......@@ -171,7 +171,8 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
share->delay_key_write=1;
info.state= &share->state.state; /* Change global values by default */
info.trn= &dummy_transaction_object;
if (!share->base.born_transactional) /* but for transactional ones ... */
info.trn= &dummy_transaction_object; /* ... force crash if no trn given */
pthread_mutex_unlock(&share->intern_lock);
/* Allocate buffer for one record */
......@@ -600,18 +601,35 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
if (share->base.born_transactional)
{
share->page_type= PAGECACHE_LSN_PAGE;
#ifdef ENABLE_WHEN_WE_HAVE_TRANS_ROW_ID /* QQ */
share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE;
if (unlikely((share->state.create_rename_lsn == (LSN)ULONGLONG_MAX) &&
(open_flags & HA_OPEN_FROM_SQL_LAYER)))
#endif
if (share->state.create_rename_lsn == LSN_REPAIRED_BY_MARIA_CHK)
{
/*
This table was repaired with maria_chk. Past log records should be
ignored, future log records should not: we define the present.
Was repaired with maria_chk, maybe later maria_pack-ed. Some sort of
import into the server. It starts its existence (from the point of
view of the server, including server's recovery) now.
*/
if ((open_flags & HA_OPEN_FROM_SQL_LAYER) || maria_in_recovery)
{
share->state.create_rename_lsn= translog_get_horizon();
_ma_update_create_rename_lsn_on_disk(share, TRUE);
}
}
else if (!LSN_VALID(share->state.create_rename_lsn) &&
!(open_flags & HA_OPEN_FOR_REPAIR))
{
/*
If in Recovery, it will not work. If LSN is invalid and not
LSN_REPAIRED_BY_MARIA_CHK, header must be corrupted.
In both cases, must repair.
*/
my_errno=((share->state.changed & STATE_CRASHED_ON_REPAIR) ?
HA_ERR_CRASHED_ON_REPAIR : HA_ERR_CRASHED_ON_USAGE);
goto err;
}
}
else
share->page_type= PAGECACHE_PLAIN_PAGE;
share->now_transactional= share->base.born_transactional;
......@@ -699,6 +717,14 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
{
share->lock.get_status=_ma_get_status;
share->lock.copy_status=_ma_copy_status;
/**
@todo RECOVERY
INSERT DELAYED and concurrent inserts are currently disabled for
transactional tables; when enabled again, we should re-evaluate
what problems the call to _ma_update_status() by
thr_reschedule_write_lock() can do (it may hurt Checkpoint as it
would be without intern_lock, and it modifies the state).
*/
share->lock.update_status=_ma_update_status;
share->lock.restore_status=_ma_restore_status;
share->lock.check_status=_ma_check_status;
......@@ -958,6 +984,7 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite)
uchar buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE];
uchar *ptr=buff;
uint i, keys= (uint) state->header.keys;
size_t res;
DBUG_ENTER("_ma_state_info_write");
memcpy_fixed(ptr,&state->header,sizeof(state->header));
......@@ -1013,11 +1040,12 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite)
}
}
if (pWrite & 1)
DBUG_RETURN(my_pwrite(file, buff, (size_t) (ptr-buff), 0L,
MYF(MY_NABP | MY_THREADSAFE)) != 0);
DBUG_RETURN(my_write(file, buff, (size_t) (ptr-buff),
MYF(MY_NABP)) != 0);
res= (pWrite & 1) ?
my_pwrite(file, buff, (size_t) (ptr-buff), 0L,
MYF(MY_NABP | MY_THREADSAFE)) :
my_write(file, buff, (size_t) (ptr-buff),
MYF(MY_NABP));
DBUG_RETURN(res != 0);
}
......@@ -1072,6 +1100,16 @@ uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state)
}
/**
@brief Fills the state by reading its copy on disk.
@note Does nothing in single user mode.
@param file file to read from
@param state state which will be filled
@param pRead if true, use my_pread(), otherwise my_read()
*/
uint _ma_state_info_read_dsk(File file, MARIA_STATE_INFO *state, my_bool pRead)
{
char buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE];
......
......@@ -23,25 +23,39 @@
#include "maria_def.h"
#include "ma_recovery.h"
#include "ma_blockrec.h"
#include "trnman.h"
struct TRN_FOR_RECOVERY
struct st_trn_for_recovery /* used only in the REDO phase */
{
LSN group_start_lsn, undo_lsn;
LSN group_start_lsn, undo_lsn, first_undo_lsn;
TrID long_trid;
};
/*
  Description of one dirty page, as handed to new_page() together with a
  file id, a page number and a rec_lsn.
  NOTE(review): presumably these entries live in dirty_pages_pool and are
  looked up through the all_dirty_pages hash - confirm in
  parse_checkpoint_record()/new_page().
*/
struct st_dirty_page /* used only in the REDO phase */
{
/* NOTE(review): looks like file id and page number combined into one key - confirm in new_page() */
uint64 file_and_page_id;
LSN rec_lsn;
};
/*
  Element of the all_tables array, which is indexed by the table's short
  (uint16) id. An element whose 'info' is NULL means that no table is
  currently open under that id.
*/
struct st_table_for_recovery /* used in the REDO and UNDO phase */
{
/* open handler, stored by new_table(); reset to NULL when the table is closed */
MARIA_HA *info;
File org_kfile, org_dfile; /**< OS descriptors when Checkpoint saw table */
};
/* Variables used by all functions of this module. Ok as single-threaded */
static struct TRN_FOR_RECOVERY *all_active_trans;
static MARIA_HA **all_tables;
static LSN current_group_end_lsn;
FILE *tracef; /**< trace file for debugging */
static struct st_trn_for_recovery *all_active_trans;
static struct st_table_for_recovery *all_tables;
static HASH all_dirty_pages;
static struct st_dirty_page *dirty_pages_pool;
static LSN current_group_end_lsn,
checkpoint_start= LSN_IMPOSSIBLE;
static FILE *tracef; /**< trace file for debugging */
#define prototype_exec_hook(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
#define prototype_exec_hook_dummy(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
__attribute ((unused)))
prototype_exec_hook(LONG_TRANSACTION_ID);
#ifdef MARIA_CHECKPOINT
prototype_exec_hook(CHECKPOINT);
#endif
prototype_exec_hook_dummy(CHECKPOINT);
prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(REDO_DROP_TABLE);
prototype_exec_hook(FILE_ID);
......@@ -53,9 +67,12 @@ prototype_exec_hook(REDO_PURGE_BLOCKS);
prototype_exec_hook(REDO_DELETE_ALL);
prototype_exec_hook(UNDO_ROW_INSERT);
prototype_exec_hook(UNDO_ROW_DELETE);
prototype_exec_hook(UNDO_ROW_UPDATE);
prototype_exec_hook(UNDO_ROW_PURGE);
prototype_exec_hook(COMMIT);
static int end_of_redo_phase();
static int run_redo_phase(LSN lsn, my_bool apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
static int run_undo_phase(uint unfinished);
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number);
......@@ -65,83 +82,57 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
TRANSLOG_HEADER_BUFFER *rec);
static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
TRANSLOG_HEADER_BUFFER *rec);
static int close_recovered_table(MARIA_HA *info);
static void prepare_table_for_close(MARIA_HA *info, LSN at_lsn);
static int parse_checkpoint_record(LSN lsn);
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
LSN first_undo_lsn);
static int new_table(uint16 sid, const char *name,
File org_kfile, File org_dfile, LSN lsn);
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
struct st_dirty_page *dirty_page);
static int close_all_tables();
/** @brief global [out] buffer for translog_read_record(); never shrinks */
static LEX_STRING log_record_buffer;
#define enlarge_buffer(rec) \
if (log_record_buffer.length < rec->record_length) \
if (log_record_buffer.length < (rec)->record_length) \
{ \
log_record_buffer.length= rec->record_length; \
log_record_buffer.length= (rec)->record_length; \
log_record_buffer.str= my_realloc(log_record_buffer.str, \
rec->record_length, MYF(MY_WME)); \
(rec)->record_length, MYF(MY_WME)); \
}
/* Marks a situation that deserves attention; currently just DBUG_ASSERT(0) */
#define ALERT_USER() DBUG_ASSERT(0)
/*
  Expands to TWO comma-separated arguments (file number, then offset), both
  cast to ulong; meant to feed a "(%lu,0x%lx)" pair of a printf-style format.
  Because of the embedded comma it can only be used inside an argument list.
*/
#define LSN_IN_HEX(L) (ulong)LSN_FILE_NO(L),(ulong)LSN_OFFSET(L)
/**
@brief Recovers from the last checkpoint
@brief Recovers from the last checkpoint.
Runs the REDO phase using special structures, then sets up the playground
of runtime: recreates transactions inside trnman, open tables with their
two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all
tables.
*/
int maria_recover()
{
my_bool res= TRUE;
LSN from_lsn;
int res= 1;
FILE *trace_file;
DBUG_ENTER("maria_recover");
DBUG_ASSERT(!maria_in_recovery);
maria_in_recovery= TRUE;
if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
{
from_lsn= translog_first_theoretical_lsn();
/*
as long as we do not have any checkpoint yet, the very first
log file should be present.
*/
DBUG_ASSERT(from_lsn != LSN_IMPOSSIBLE);
/*
@todo process error of getting checkpoint
if (from_lsn == ERROR_LSN)
...
*/
}
else
{
DBUG_ASSERT(0); /* not yet implemented */
/**
@todo read the checkpoint record, fill structures
and use the minimum of checkpoint_start_lsn, rec_lsn of trns, rec_lsn
of dirty pages.
*/
//from_lsn= something;
}
/*
mysqld has not yet initialized any page cache. Let's create a dedicated
one for recovery.
*/
if ((trace_file= fopen("maria_recovery.trace", "w")))
{
fprintf(trace_file, "TRACE of the last MARIA recovery from mysqld\n");
res= (init_pagecache(maria_pagecache,
/** @todo what size? */
1024*1024,
0, 0,
maria_block_size) == 0) ||
maria_apply_log(from_lsn, TRUE, trace_file);
end_pagecache(maria_pagecache, TRUE);
DBUG_ASSERT(maria_pagecache->inited);
res= maria_apply_log(LSN_IMPOSSIBLE, TRUE, trace_file, TRUE);
if (!res)
fprintf(trace_file, "SUCCESS\n");
fclose(trace_file);
}
/**
@todo take checkpoint if log applying did some work.
Be sure to not checkpoint if no work.
*/
maria_in_recovery= FALSE;
DBUG_RETURN(res);
}
......@@ -150,7 +141,8 @@ int maria_recover()
/**
@brief Displays and/or applies the log
@param lsn LSN from which log reading/applying should start
@param from_lsn LSN from which log reading/applying should start;
LSN_IMPOSSIBLE means "use last checkpoint"
@param apply if log records should be applied or not
@param trace_file trace file where progress/debug messages will go
......@@ -163,189 +155,90 @@ int maria_recover()
@retval !=0 Error
*/
/*
  Entry point of log display/apply: runs the REDO phase from 'from_lsn' (or
  from the start point derived from the last checkpoint when from_lsn is
  LSN_IMPOSSIBLE), then either runs the UNDO phase or only warns about
  unfinished transactions, and finally closes all tables.

  Every failure goes through the "err" label so that the module-wide
  structures are always released and DBUG_ENTER is always matched by
  DBUG_RETURN. Tables are deliberately NOT closed on error.
*/
int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
                    my_bool should_run_undo_phase)
{
  int error= 0;
  uint unfinished_trans;
  DBUG_ENTER("maria_apply_log");

  /* the UNDO phase makes sense only if records are really applied */
  DBUG_ASSERT(apply || !should_run_undo_phase);
  DBUG_ASSERT(!maria_multi_threaded);

  /* must be set before any "goto err": the error path prints to tracef */
  tracef= trace_file;

  all_active_trans= (struct st_trn_for_recovery *)
    my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
              MYF(MY_ZEROFILL));
  all_tables= (struct st_table_for_recovery *)
    my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
              MYF(MY_ZEROFILL));
  if (!all_active_trans || !all_tables)
    goto err;

  if (from_lsn == LSN_IMPOSSIBLE)
  {
    if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
    {
      from_lsn= translog_first_theoretical_lsn();
      /*
        as long as we do not have any checkpoint yet, the very first
        log file should be present.
      */
      if (unlikely((from_lsn == LSN_IMPOSSIBLE) ||
                   (from_lsn == LSN_ERROR)))
        goto err;
    }
    else
    {
      DBUG_ASSERT(0); /* not yet implemented */
      from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
      if (from_lsn == LSN_IMPOSSIBLE)
        goto err;
    }
  }

  if (run_redo_phase(from_lsn, apply))
    goto err;

  /* (uint)-1 is end_of_redo_phase()'s error value */
  unfinished_trans= end_of_redo_phase(should_run_undo_phase);
  if (unfinished_trans == (uint)-1)
    goto err;
  if (should_run_undo_phase)
  {
    /*
      A failed rollback must take the common error path: returning directly
      would skip DBUG_RETURN and leak the structures freed under "end".
    */
    if (run_undo_phase(unfinished_trans))
      goto err;
  }
  else if (unfinished_trans > 0)
    fprintf(tracef, "WARNING: %u unfinished transactions; some tables may be"
            " left inconsistent!\n", unfinished_trans);

  /*
    we don't use maria_panic() because it would maria_end(), and Recovery does
    not want that (we want to keep modules initialized for runtime).
  */
  if (close_all_tables())
    goto err;

  /*
    At this stage, end of recovery, trnman is left initialized. This is for
    the future, when we have an online UNDO phase or prepared transactions.
  */
  goto end;
err:
  error= 1;
  fprintf(tracef, "Recovery of tables with transaction logs FAILED\n");
end:
  hash_free(&all_dirty_pages);
  bzero(&all_dirty_pages, sizeof(all_dirty_pages));
  my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
  dirty_pages_pool= NULL;
  my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
  all_tables= NULL;
  my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
  all_active_trans= NULL;
  my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR));
  log_record_buffer.str= NULL;
  log_record_buffer.length= 0;
  /* we don't cleanly close tables if we hit some error (may corrupt them) */
  DBUG_RETURN(error);
}
......@@ -360,8 +253,7 @@ static void display_record_position(const LOG_DESC *log_desc,
form a group, so we indent below the group's end record
*/
fprintf(tracef, "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n",
number ? "" : " ", number,
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn),
number ? "" : " ", number, LSN_IN_HEX(rec->lsn),
rec->short_trid, log_desc->name, rec->type,
(ulong)rec->record_length);
}
......@@ -389,11 +281,10 @@ prototype_exec_hook(LONG_TRANSACTION_ID)
TrID long_trid= all_active_trans[sid].long_trid;
/* abort group of this trn (must be of before a crash) */
LSN gslsn= all_active_trans[sid].group_start_lsn;
char llbuf[22];
if (gslsn != LSN_IMPOSSIBLE)
{
fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
LSN_IN_HEX(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (long_trid != 0)
......@@ -401,18 +292,17 @@ prototype_exec_hook(LONG_TRANSACTION_ID)
LSN ulsn= all_active_trans[sid].undo_lsn;
if (ulsn != LSN_IMPOSSIBLE)
{
char llbuf[22];
llstr(long_trid, llbuf);
fprintf(tracef, "Found an old transaction long_trid %s short_trid %u"
" with same short id as this new transaction, and has neither"
" committed nor rollback (undo_lsn: (%lu,0x%lx))\n", llbuf,
sid, (ulong) LSN_FILE_NO(ulsn), (ulong) LSN_OFFSET(ulsn));
sid, LSN_IN_HEX(ulsn));
goto err;
}
}
long_trid= uint6korr(rec->header);
all_active_trans[sid].long_trid= long_trid;
llstr(long_trid, llbuf);
fprintf(tracef, "Transaction long_trid %s short_trid %u starts\n", llbuf, sid);
new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
goto end;
err:
ALERT_USER();
......@@ -422,13 +312,24 @@ end:
}
#ifdef MARIA_CHECKPOINT
prototype_exec_hook(CHECKPOINT)
/**
  @brief Stores a transaction in the table of active transactions and
  writes a "starts" line to the trace file.

  @param sid             short id of the transaction (index in
                         all_active_trans)
  @param long_id         long id of the transaction
  @param undo_lsn        value stored as the transaction's undo_lsn
  @param first_undo_lsn  value stored as the transaction's first_undo_lsn
*/
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
                            LSN first_undo_lsn)
{
  struct st_trn_for_recovery *slot= &all_active_trans[sid];
  char long_id_str[22];

  slot->long_trid= long_id;
  slot->undo_lsn= undo_lsn;
  slot->first_undo_lsn= first_undo_lsn;

  llstr(long_id, long_id_str);
  fprintf(tracef, "Transaction long_trid %s short_trid %u starts\n",
          long_id_str, sid);
}
/*
  REDO-phase hook for CHECKPOINT records: does nothing and reports success.
  Declared through prototype_exec_hook_dummy because 'rec' is unused.
*/
prototype_exec_hook_dummy(CHECKPOINT)
{
/* the only checkpoint we care about was found via control file, ignore */
return 0;
}
#endif
prototype_exec_hook(REDO_CREATE_TABLE)
......@@ -473,9 +374,9 @@ prototype_exec_hook(REDO_CREATE_TABLE)
}
if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record",
(ulong) LSN_FILE_NO(rec->lsn),
(ulong) LSN_OFFSET(rec->lsn));
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
" record, ignoring",
LSN_IN_HEX(share->state.create_rename_lsn));
error= 0;
goto end;
}
......@@ -488,7 +389,7 @@ prototype_exec_hook(REDO_CREATE_TABLE)
info= NULL;
}
/* if does not exist, is older, or its header is corrupted, overwrite it */
// TODO symlinks
/** @todo symlinks */
ptr= name + strlen(name) + 1;
if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
fprintf(tracef, ", we will only touch index file");
......@@ -512,9 +413,6 @@ prototype_exec_hook(REDO_CREATE_TABLE)
ptr+= 2;
/* set create_rename_lsn (for maria_read_log to be idempotent) */
lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn);
/* we also set is_of_lsn, like maria_create() does */
lsn_store(ptr + sizeof(info->s->state.header) + 2 + LSN_STORE_SIZE,
rec->lsn);
if (my_pwrite(kfile, ptr,
kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
my_chsize(kfile, keystart, 0, MYF(MY_WME)))
......@@ -593,9 +491,9 @@ prototype_exec_hook(REDO_DROP_TABLE)
}
if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record",
(ulong) LSN_FILE_NO(rec->lsn),
(ulong) LSN_OFFSET(rec->lsn));
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
" record, ignoring",
LSN_IN_HEX(share->state.create_rename_lsn));
error= 0;
goto end;
}
......@@ -634,9 +532,15 @@ prototype_exec_hook(FILE_ID)
{
uint16 sid;
int error= 1;
char *name, *buff;
MARIA_HA *info= NULL;
MARIA_SHARE *share;
const char *name;
MARIA_HA *info;
if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
{
fprintf(tracef, "ignoring because before checkpoint\n");
return 0;
}
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
......@@ -646,21 +550,40 @@ prototype_exec_hook(FILE_ID)
fprintf(tracef, "Failed to read record\n");
goto end;
}
buff= log_record_buffer.str;
sid= fileid_korr(buff);
name= buff + FILEID_STORE_SIZE;
info= all_tables[sid];
sid= fileid_korr(log_record_buffer.str);
info= all_tables[sid].info;
if (info != NULL)
{
all_tables[sid]= NULL;
if (close_recovered_table(info))
fprintf(tracef, " Closing table '%s'\n", info->s->open_file_name);
prepare_table_for_close(info, rec->lsn);
if (maria_close(info))
{
fprintf(tracef, "Failed to close table\n");
goto end;
}
all_tables[sid].info= NULL;
}
name= log_record_buffer.str + FILEID_STORE_SIZE;
if (new_table(sid, name, -1, -1, rec->lsn))
goto end;
error= 0;
end:
return error;
}
static int new_table(uint16 sid, const char *name,
File org_kfile, File org_dfile, LSN lsn)
{
/*
-1 (skip table): close table and return 0;
1 (error): close table and return 1;
0 (success): leave table open and return 0.
*/
int error= 1;
fprintf(tracef, "Table '%s', id %u", name, sid);
info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
MARIA_HA *info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
if (info == NULL)
{
fprintf(tracef, ", is absent (must have been dropped later?)"
......@@ -678,7 +601,7 @@ prototype_exec_hook(FILE_ID)
execute them, we should not reject the crashed table here.
*/
}
share= info->s;
MARIA_SHARE *share= info->s;
/* check that we're not already using it */
DBUG_ASSERT(share->reopen == 1);
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
......@@ -686,10 +609,17 @@ prototype_exec_hook(FILE_ID)
{
fprintf(tracef, ", is not transactional\n");
ALERT_USER();
error= 0;
error= -1;
goto end;
}
if (cmp_translog_addr(lsn, share->state.create_rename_lsn) <= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
" record, ignoring",
LSN_IN_HEX(share->state.create_rename_lsn));
error= -1;
goto end;
}
all_tables[sid]= info;
/* don't log any records for this work */
_ma_tmp_disable_logging_for_table(share);
/* execution of some REDO records relies on data_file_length */
......@@ -703,17 +633,25 @@ prototype_exec_hook(FILE_ID)
}
share->state.state.data_file_length= dfile_len;
share->state.state.key_file_length= kfile_len;
if ((dfile_len == 0) || ((dfile_len % share->block_size) > 0))
if ((dfile_len % share->block_size) > 0)
{
fprintf(tracef, ", has too short last page\n");
/* Recovery will fix this, no error */
ALERT_USER();
}
all_tables[sid].info= info;
all_tables[sid].org_kfile= org_kfile;
all_tables[sid].org_dfile= org_dfile;
fprintf(tracef, ", opened\n");
error= 0;
end:
if (error && info != NULL)
error|= maria_close(info);
if (error)
{
if (info != NULL)
maria_close(info);
if (error == -1)
error= 0;
}
return error;
}
......@@ -766,7 +704,7 @@ end:
prototype_exec_hook(REDO_INSERT_ROW_TAIL)
{
int error= 1;
uchar *buff= NULL;
uchar *buff;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
......@@ -834,11 +772,24 @@ end:
prototype_exec_hook(REDO_PURGE_BLOCKS)
{
int error= 1;
uchar *buff;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
goto end;
}
buff= log_record_buffer.str;
if (_ma_apply_redo_purge_blocks(info, current_group_end_lsn,
rec->header + FILEID_STORE_SIZE))
buff + FILEID_STORE_SIZE))
goto end;
error= 0;
end:
......@@ -862,17 +813,18 @@ end:
}
/*
  Records L as the latest UNDO LSN of the active transaction of short id I;
  if that transaction had no first UNDO LSN yet (LSN_IMPOSSIBLE), L becomes
  its first UNDO LSN too.
  Both arguments are parenthesized and evaluated exactly once, so passing
  expressions with side effects or low-precedence operators is safe.
*/
#define set_undo_lsn_for_active_trans(I, L) do {                       \
    struct st_trn_for_recovery *trn_slot_= &all_active_trans[(I)];     \
    const LSN new_undo_lsn_= (L);                                      \
    trn_slot_->undo_lsn= new_undo_lsn_;                                \
    if (trn_slot_->first_undo_lsn == LSN_IMPOSSIBLE)                   \
      trn_slot_->first_undo_lsn= new_undo_lsn_;                        \
  } while (0)
prototype_exec_hook(UNDO_ROW_INSERT)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
/*
in an upcoming patch ("recovery of the state"), we introduce
state.is_of_lsn. For now, we just assume the state is old (true when we
......@@ -881,6 +833,13 @@ prototype_exec_hook(UNDO_ROW_INSERT)
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records++;
/** @todo RECOVERY BUG Also update the table's checksum */
/**
@todo some bits below will rather be set when executing UNDOs related
to keys
*/
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
......@@ -895,14 +854,12 @@ prototype_exec_hook(UNDO_ROW_DELETE)
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records--;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
......@@ -911,6 +868,23 @@ end:
}
/*
  REDO-phase hook for an UNDO_ROW_UPDATE record: remembers the record's LSN
  as the transaction's UNDO LSN and flags the table's state as changed.
  Returns 1 if no table could be obtained for the record, 0 otherwise.
*/
prototype_exec_hook(UNDO_ROW_UPDATE)
{
  MARIA_HA *tbl= get_MARIA_HA_from_UNDO_record(rec);

  if (tbl == NULL)
    return 1;

  set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
  tbl->s->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
                           STATE_NOT_OPTIMIZED_KEYS |
                           STATE_NOT_SORTED_PAGES);
  return 0;
}
prototype_exec_hook(UNDO_ROW_PURGE)
{
int error= 1;
......@@ -918,14 +892,12 @@ prototype_exec_hook(UNDO_ROW_PURGE)
if (info == NULL)
goto end;
/* this is a bit broken, but this log record type will be deleted soon */
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records--;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
......@@ -973,77 +945,285 @@ prototype_exec_hook(COMMIT)
}
/* Just to inform about any aborted groups or unfinished transactions */
static int end_of_redo_phase()
/**
  @brief Scans the log forward from 'lsn' and executes complete groups of
  records through the per-type REDO-phase hooks.

  First installs the exec_LOGREC_* hooks in log_record_type_descriptor[].
  Then, for each record read by the main scanner:
  - if the record is a group by itself or the last of a group, and the
    transaction (short_trid) has a pending group start: a single-record group
    discards the pending group; an end-of-group record triggers a second scan
    from the group's start which executes every record having the same
    short_trid; then the current record itself is executed;
  - otherwise the record's LSN is remembered as the group's start if the
    transaction has no pending group.
  Hooks are executed only when 'apply' is true; records are always displayed.

  @param lsn    LSN of the first record to read
  @param apply  whether records should be executed or only displayed

  @return 0 when the end of the log was reached, 1 on error

  NOTE(review): the early "return 1" paths do not call
  translog_free_record_header() on rec/rec2 - confirm whether that leaks.
*/
static int run_redo_phase(LSN lsn, my_bool apply)
{
/* install hooks for execution */
#define install_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
exec_LOGREC_ ## R;
install_exec_hook(LONG_TRANSACTION_ID);
install_exec_hook(CHECKPOINT);
install_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(REDO_DROP_TABLE);
install_exec_hook(FILE_ID);
install_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(REDO_INSERT_ROW_TAIL);
install_exec_hook(REDO_PURGE_ROW_HEAD);
install_exec_hook(REDO_PURGE_ROW_TAIL);
install_exec_hook(REDO_PURGE_BLOCKS);
install_exec_hook(REDO_DELETE_ALL);
install_exec_hook(UNDO_ROW_INSERT);
install_exec_hook(UNDO_ROW_DELETE);
install_exec_hook(UNDO_ROW_UPDATE);
install_exec_hook(UNDO_ROW_PURGE);
install_exec_hook(COMMIT);
/* no group is being executed yet */
current_group_end_lsn= LSN_IMPOSSIBLE;
TRANSLOG_HEADER_BUFFER rec;
/*
instead of this block below we will soon use
translog_first_lsn_in_log()...
*/
int len= translog_read_record_header(lsn, &rec);
/** @todo EOF should be detected */
if (len == RECHEADER_READ_ERROR)
{
fprintf(tracef, "Cannot find a first record\n");
return 1;
}
/* main scanner: walks all records from 'lsn' to the end of the log */
struct st_translog_scanner_data scanner;
if (translog_init_scanner(lsn, 1, &scanner))
{
fprintf(tracef, "Scanner init failed\n");
return 1;
}
/* i is only the record's ordinal number shown in the trace */
uint i;
for (i= 1;;i++)
{
uint16 sid= rec.short_trid;
const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
display_record_position(log_desc, &rec, i);
/*
A complete group is a set of log records with an "end mark" record
(e.g. a set of REDOs for an operation, terminated by an UNDO for this
operation); if there is no "end mark" record the group is incomplete
and won't be executed.
*/
if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
(log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
{
/* this transaction has a group which was started but not yet ended */
if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
{
if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
{
/*
can happen if the transaction got a table write error, then
unlocked tables thus wrote a COMMIT record.
*/
fprintf(tracef, "\nDiscarding unfinished group before this record\n");
ALERT_USER();
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
{
/*
There is a complete group for this transaction, containing more
than this event.
*/
fprintf(tracef, " ends a group:\n");
/* secondary scanner: re-reads the log from the group's first record */
struct st_translog_scanner_data scanner2;
TRANSLOG_HEADER_BUFFER rec2;
len=
translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2);
if (len < 0) /* EOF or error */
{
fprintf(tracef, "Cannot find record where it should be\n");
return 1;
}
if (translog_init_scanner(rec2.lsn, 1, &scanner2))
{
fprintf(tracef, "Scanner2 init failed\n");
return 1;
}
/* hooks executed for the group's records can read the group's end LSN here */
current_group_end_lsn= rec.lsn;
do
{
if (rec2.short_trid == sid) /* it's in our group */
{
const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
display_record_position(log_desc2, &rec2, 0);
if (apply && display_and_apply_record(log_desc2, &rec2))
return 1;
}
len= translog_read_next_record_header(&scanner2, &rec2);
if (len < 0) /* EOF or error */
{
fprintf(tracef, "Cannot find record where it should be\n");
return 1;
}
}
/* stop when the secondary scanner reaches the group's end record */
while (rec2.lsn < rec.lsn);
translog_free_record_header(&rec2);
/* group finished */
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
display_record_position(log_desc, &rec, 0);
}
}
/* execute the group-ending (or stand-alone) record itself */
if (apply && display_and_apply_record(log_desc, &rec))
return 1;
}
else /* record does not end group */
{
/* just record the fact, can't know if can execute yet */
if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
{
/* group not yet started */
all_active_trans[sid].group_start_lsn= rec.lsn;
}
}
len= translog_read_next_record_header(&scanner, &rec);
if (len < 0)
{
switch (len)
{
case RECHEADER_READ_EOF:
fprintf(tracef, "EOF on the log\n");
break;
case RECHEADER_READ_ERROR:
/* NOTE(review): this message goes to stderr, all others go to tracef */
fprintf(stderr, "Error reading log\n");
return 1;
}
/* end of log (or another negative value): leave the scanning loop */
break;
}
}
translog_free_record_header(&rec);
return 0;
}
/**
@brief Informs about any aborted groups or unfinished transactions,
prepares for the UNDO phase if needed.
@param prepare_for_undo_phase
@note Observe that it may init trnman.
*/
static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
{
uint sid, unfinished= 0, error= 0;
uint sid, unfinished= 0;
hash_free(&all_dirty_pages);
/*
hash_free() can probably be called multiple times, but be safe if that
changes
*/
bzero(&all_dirty_pages, sizeof(all_dirty_pages));
my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
dirty_pages_pool= NULL;
if (prepare_for_undo_phase && trnman_init())
return -1;
for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
{
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
TRN *trn;
if (gslsn != LSN_IMPOSSIBLE)
{
fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
ALERT_USER();
}
if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
{
char llbuf[22];
llstr(long_trid, llbuf);
fprintf(tracef, "Transaction long_trid %s short_trid %u unfinished\n",
llbuf, sid);
unfinished++;
}
if (gslsn != LSN_IMPOSSIBLE)
/* dummy_transaction_object serves only for DDLs */
DBUG_ASSERT(long_trid != 0);
if (prepare_for_undo_phase)
{
fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
ALERT_USER();
if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
return -1;
trn->undo_lsn= all_active_trans[sid].undo_lsn;
}
/* otherwise we will just warn about it */
unfinished++;
}
/* If real recovery: roll back unfinished transaction */
#ifdef MARIA_VERSIONING
/*
If real recovery: transaction was committed, move it to some separate
list for soon purging. Create TRNs.
If real recovery: if transaction was committed, move it to some separate
list for soon purging.
*/
#endif
}
my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
all_active_trans= NULL;
/*
We don't close tables if there are some unfinished transactions, because
closing tables normally requires that all unfinished transactions on them
be rolled back. Unfinished transactions are symptom of a crash, we
reproduce the crash.
For example, closing will soon write the state to disk and when doing that
it will think this is a committed state, but it may not be.
The UNDO phase uses some normal run-time code of ROLLBACK: generates log
records, etc; prepare tables for that
*/
if (unfinished > 0)
fprintf(tracef, "WARNING: %u unfinished transactions; some tables may be"
" left inconsistent!\n", unfinished);
LSN addr= translog_get_horizon();
for (sid= 0; sid <= SHARE_ID_MAX; sid++)
{
MARIA_HA *info= all_tables[sid];
MARIA_HA *info= all_tables[sid].info;
if (info != NULL)
{
/* if error, still close other tables */
error|= close_recovered_table(info);
prepare_table_for_close(info, addr);
/*
But we don't close it; we leave it available for the UNDO phase;
it's likely that the UNDO phase will need it.
*/
if (prepare_for_undo_phase)
translog_assign_id_to_share_from_recovery(info->s, sid);
}
}
return error;
/* we don't need all_tables anymore, maria_open_list is enough */
my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
all_tables= NULL;
/*
We could take a checkpoint here, in case of a crash during the UNDO
phase. The drawback is that a page which got a REDO (thus, flushed
by this would-be checkpoint) is likely to have an UNDO executed on it
soon. And so, the flush was probably lost time.
So for now we prefer to do recovery with maximum speed and take a
checkpoint only at the end of the UNDO phase.
*/
return unfinished;
}
static int close_recovered_table(MARIA_HA *info)
/**
  @brief Rolls back, one after the other, the transactions which the REDO
  phase found unfinished.

  @param unfinished  number of transactions to roll back

  @return 0 if all rollbacks succeeded (or there was nothing to do),
          1 as soon as one rollback fails
*/
static int run_undo_phase(uint unfinished)
{
  uint remaining;

  if (unfinished == 0)
    return 0;

  fprintf(tracef, "%u transactions will be rolled back\n", unfinished);
  for (remaining= unfinished; remaining > 0; remaining--)
  {
    char trid_str[22];
    TRN *trn= trnman_get_any_trn();
    DBUG_ASSERT(trn != NULL);
    llstr(trn->trid, trid_str);
    fprintf(tracef, "Rolling back transaction of long id %s\n", trid_str);
    /* execution of the UNDO records themselves is still missing here */
    if (trnman_rollback_trn(trn))
      return 1;
    /* We may want to spawn a few threads (4?) instead of using only 1 */
    /* In the future, we want to have this phase *online* */
  }
  return 0;
}
static void prepare_table_for_close(MARIA_HA *info,
LSN at_lsn __attribute__ ((unused)))
{
int error;
MARIA_SHARE *share= info->s;
fprintf(tracef, " Closing table '%s'\n", share->open_file_name);
/* we will soon use at_lsn here */
_ma_reenable_logging_for_table(share);
/*
Recovery normally corrected problems, don't scare user with "table was not
closed properly" in CHECK TABLE and don't automatically check table at
next open (when we have --maria-recover).
*/
share->state.open_count= share->global_changed ? 1 : 0;
/* this var is set only by non-recovery operations (mi_write() etc) */
DBUG_ASSERT(!share->global_changed);
if ((error= maria_close(info)))
fprintf(tracef, "Failed to close table\n");
return error;
}
......@@ -1051,16 +1231,22 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
TRANSLOG_HEADER_BUFFER *rec)
{
uint16 sid;
ulonglong page;
pgcache_page_no_t page;
MARIA_HA *info;
char llbuf[22];
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
/* BUG not correct for REDO_PURGE_BLOCKS, page is not at this pos */
/**
@todo RECOVERY BUG
- for REDO_PURGE_BLOCKS, page is not at this pos
- for DELETE_ALL, record ends here! buffer overrun!
Solution: caller should pass a param enum { i_am_about_data_file,
i_am_about_index_file, none }.
*/
llstr(page, llbuf);
fprintf(tracef, " For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
info= all_tables[sid].info;
if (info == NULL)
{
fprintf(tracef, ", table skipped, so skipping record\n");
......@@ -1069,23 +1255,31 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
fprintf(tracef, ", '%s'", info->s->open_file_name);
/* detect if an open instance of a dropped table (internal bug) */
DBUG_ASSERT(info->s->last_version != 0);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
/**
@todo RECOVERY BUG always assuming this is REDO for data file, but it
could soon be index file
*/
uint64 file_and_page_id=
(((uint64)all_tables[sid].org_dfile) << 32) | page;
struct st_dirty_page *dirty_page= (struct st_dirty_page *)
hash_search(&all_dirty_pages,
(uchar *)&file_and_page_id, sizeof(file_and_page_id));
if ((dirty_page == NULL) ||
cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0)
{
fprintf(tracef, ", ignoring because of dirty_pages list\n");
return NULL;
}
fprintf(tracef, ", applying record\n");
return info;
}
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint). Btw rec_lsn and bitmap's recovery is
an unsolved problem (rec_lsn is to ignore a REDO without reading the data
page and to do so we need to be sure the corresponding bitmap page does
not need a _ma_bitmap_set()).
So we are going to read the page, and if its LSN is older than the
record's we will modify the page
*/
fprintf(tracef, ", applying record\n");
return info;
}
......@@ -1097,7 +1291,7 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
sid= fileid_korr(rec->header + LSN_STORE_SIZE);
fprintf(tracef, " For table of short id %u", sid);
info= all_tables[sid];
info= all_tables[sid].info;
if (info == NULL)
{
fprintf(tracef, ", table skipped, so skipping record\n");
......@@ -1105,24 +1299,177 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
}
fprintf(tracef, ", '%s'", info->s->open_file_name);
DBUG_ASSERT(info->s->last_version != 0);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
return NULL;
}
fprintf(tracef, ", applying record\n");
return info;
}
/**
  @brief Reads a checkpoint record and loads its content into the
  structures used by the rest of this file.

  The record is parsed sequentially, in this order:
  - the checkpoint's start LSN (stored into checkpoint_start),
  - the active transactions (count, minimum rec_lsn, then one entry per
    transaction, each passed to new_transaction()),
  - the committed transactions (count, then entries which are skipped),
  - the open tables (count, then one entry per table, each passed to
    new_table()),
  - the dirty pages (count, then one entry per page, each inserted into the
    all_dirty_pages hash through new_page()).
  On the way, checkpoint_start is lowered to the minimum rec_lsn of active
  transactions stored in the record and to the smallest rec_lsn found among
  the dirty pages.

  @param  lsn  address of the checkpoint record in the log

  @return Operation status
    @retval 0  record parsed and its end matched the end of the buffer
    @retval 1  read failure, allocation/hash failure, new_table()/new_page()
               failure, or parsed length does not match the record's length
*/
static int parse_checkpoint_record(LSN lsn)
{
  uint i;
  TRANSLOG_HEADER_BUFFER rec;

  fprintf(tracef, "Loading data from checkpoint record\n");
  int len= translog_read_record_header(lsn, &rec);

  /** @todo EOF should be detected */
  if (len == RECHEADER_READ_ERROR)
  {
    fprintf(tracef, "Cannot find checkpoint record where it should be\n");
    return 1;
  }

  enlarge_buffer(&rec);
  if (log_record_buffer.str == NULL ||
      translog_read_record(rec.lsn, 0, rec.record_length,
                           log_record_buffer.str, NULL) !=
      rec.record_length)
  {
    fprintf(tracef, "Failed to read record\n");
    return 1;
  }

  char *ptr= log_record_buffer.str;
  checkpoint_start= lsn_korr(ptr);
  ptr+= LSN_STORE_SIZE;

  /* transactions */
  uint nb_active_transactions= uint2korr(ptr);
  ptr+= 2;
  fprintf(tracef, "%u active transactions\n", nb_active_transactions);
  LSN minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
  ptr+= LSN_STORE_SIZE;

  /*
    Lower checkpoint_start to the minimum rec_lsn of active transactions if
    that one is smaller.
    how much brain juice and discussions there was to come to writing this
    line
  */
  set_if_smaller(checkpoint_start, minimum_rec_lsn_of_active_transactions);

  for (i= 0; i < nb_active_transactions; i++)
  {
    uint16 sid= uint2korr(ptr);
    ptr+= 2;
    TrID long_id= uint6korr(ptr);
    ptr+= 6;
    DBUG_ASSERT(sid > 0 && long_id > 0);
    LSN undo_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    LSN first_undo_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
  }
  uint nb_committed_transactions= uint4korr(ptr);
  ptr+= 4;
  fprintf(tracef, "%lu committed transactions\n",
          (ulong)nb_committed_transactions);
  /* no purging => committed transactions are not important */
  ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;

  /* tables */
  uint nb_tables= uint4korr(ptr);
  /* step over the 4-byte count, as done for every other count above/below */
  ptr+= 4;
  fprintf(tracef, "%u open tables\n", nb_tables);
  for (i= 0; i< nb_tables; i++)
  {
    char name[FN_REFLEN];
    uint16 sid= uint2korr(ptr);
    ptr+= 2;
    DBUG_ASSERT(sid > 0);
    File kfile= uint4korr(ptr);
    ptr+= 4;
    File dfile= uint4korr(ptr);
    ptr+= 4;
    LSN first_log_write_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    uint name_len= strlen(ptr) + 1;
    /*
      Copy the name BEFORE stepping over it (ptr must still point at its
      first byte). At most sizeof(name)-1 bytes are copied and the last byte
      is forced to '\0' so that name is terminated even if the stored name
      is too long for the buffer.
    */
    strnmov(name, ptr, sizeof(name) - 1);
    name[sizeof(name) - 1]= '\0';
    ptr+= name_len;
    if (new_table(sid, name, kfile, dfile, first_log_write_lsn))
      return 1;
  }

  /* dirty pages */
  uint nb_dirty_pages= uint4korr(ptr);
  ptr+= 4;
  if (hash_init(&all_dirty_pages, &my_charset_bin, nb_dirty_pages,
                offsetof(struct st_dirty_page, file_and_page_id),
                sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
                NULL, NULL, 0))
    return 1;
  /* one pool element per dirty page; hash entries point into this pool */
  dirty_pages_pool=
    (struct st_dirty_page *)my_malloc(nb_dirty_pages *
                                      sizeof(struct st_dirty_page),
                                      MYF(MY_WME));
  if (unlikely(dirty_pages_pool == NULL))
    return 1;
  struct st_dirty_page *next_dirty_page_in_pool= dirty_pages_pool;
  LSN minimum_rec_lsn_of_dirty_pages= LSN_MAX;
  for (i= 0; i < nb_dirty_pages ; i++)
  {
    File fileid= uint4korr(ptr);
    ptr+= 4;
    pgcache_page_no_t pageid= uint4korr(ptr);
    ptr+= 4;
    LSN rec_lsn= lsn_korr(ptr);
    ptr+= LSN_STORE_SIZE;
    if (new_page(fileid, pageid, rec_lsn, next_dirty_page_in_pool++))
      return 1;
    set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
  }
  /* after that, there will be no insert/delete into the hash */
  /*
    sanity check on record (did we screw up with all those "ptr+=", did the
    checkpoint write code and checkpoint read code go out of sync?).
  */
  /**
    @todo This probably presently and hopefully detects that
    first_log_write_lsn is not written by the checkpoint record; we need
    to add MARIA_SHARE::first_log_write_lsn, fill it with a inwrite-hook of
    LOGREC_FILE_ID (note that when we write this record we hold intern_lock,
    so Checkpoint will read the LSN correctly), and store it in the
    checkpoint record.
  */
  if (ptr != (log_record_buffer.str + log_record_buffer.length))
  {
    fprintf(tracef, "checkpoint record corrupted\n");
    return 1;
  }
  set_if_smaller(checkpoint_start, minimum_rec_lsn_of_dirty_pages);

  return 0;
}
/**
  @brief Fills one dirty-page element and inserts it into all_dirty_pages.

  @param  fileid      file identifier, placed in the upper 32 bits of the key
  @param  pageid      page number, OR-ed into the key
  @param  rec_lsn     rec_lsn to remember for this page
  @param  dirty_page  element to fill in and insert into the hash

  @return whatever my_hash_insert() returns
*/
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
                    struct st_dirty_page *dirty_page)
{
  /* combined file/page identifier; serves as hash key */
  const uint64 hash_key= (((uint64)fileid) << 32) | pageid;

  dirty_page->rec_lsn= rec_lsn;
  dirty_page->file_and_page_id= hash_key;
  return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page);
}
/**
  @brief Closes every table found in maria_open_list.

  The list is walked under THR_LOCK_maria; the mutex is released around each
  maria_close() call and re-acquired afterwards. The next element is
  remembered before the current one is closed.

  @return OR of the values returned by all maria_close() calls (0 if the
          list was empty)
*/
static int close_all_tables()
{
  int result= 0;
  LIST *current, *following;

  pthread_mutex_lock(&THR_LOCK_maria);
  if (maria_open_list != NULL)
  {
    fprintf(tracef, "Closing all tables\n");
    current= maria_open_list;
    while (current)
    {
      MARIA_HA *table;
      /* pick up successor and payload before the element gets closed */
      following= current->next;
      table= (MARIA_HA*)current->data;
      pthread_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */
      result|= maria_close(table);
      pthread_mutex_lock(&THR_LOCK_maria);
      current= following;
    }
  }
  pthread_mutex_unlock(&THR_LOCK_maria);
  return result;
}
/* some comments and pseudo-code which we keep for later */
#if 0
......
......@@ -25,5 +25,6 @@
C_MODE_START
int maria_recover();
int maria_apply_log(LSN lsn, my_bool applyn, FILE *trace_file);
int maria_apply_log(LSN lsn, my_bool apply, FILE *trace_file,
my_bool execute_undo_phase);
C_MODE_END
......@@ -66,7 +66,8 @@ int main(int argc,char *argv[])
TRANSLOG_PAGE_SIZE) == 0) ||
translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
0, 0, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS))
TRANSLOG_DEFAULT_FLAGS) ||
(transactional && trnman_init()))
{
fprintf(stderr, "Error in initialization");
exit(1);
......@@ -180,6 +181,8 @@ static int run_test(const char *filename)
if (!silent)
printf("- Writing key:s\n");
if (maria_begin(file))
goto err;
my_errno=0;
row_count=deleted=0;
for (i=49 ; i>=1 ; i-=2 )
......@@ -266,8 +269,14 @@ static int run_test(const char *filename)
if (!silent)
printf("- Reopening file\n");
if (maria_close(file)) goto err;
if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED))) goto err;
if (maria_commit(file))
goto err;
if (maria_close(file))
goto err;
if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED)))
goto err;
if (maria_begin(file))
goto err;
if (!skip_delete)
{
if (!silent)
......@@ -354,6 +363,8 @@ static int run_test(const char *filename)
i-1,error,my_errno,read_record+1);
}
}
if (maria_commit(file))
goto err;
if (maria_close(file))
goto err;
maria_end();
......@@ -622,7 +633,7 @@ static struct my_option my_long_options[] =
0, 0, 0, 0, 0, 0},
{"unique", 'C', "Undocumented", (uchar**) &opt_unique,
(uchar**) &opt_unique, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"update-rows", 'u', "Undocumented", (uchar**) &update_count,
{"update-rows", 'u', "Max number of rows to update", (uchar**) &update_count,
(uchar**) &update_count, 0, GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0},
{"verbose", 'v', "Be more verbose", (uchar**) &verbose,
(uchar**) &verbose, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
......
......@@ -244,13 +244,15 @@ int main(int argc, char *argv[])
if (opt_quick_mode)
maria_extra(file,HA_EXTRA_QUICK,0);
maria_begin(file);
for (i=0 ; i < recant ; i++)
{
ulong blob_length;
#if 0
/*
Starting from i==72, there was a difference between runtime and
log-appplying. This is now fixed, by not using non_header_data_len in
log-applying. This is now fixed, by not using non_header_data_len in
log-applying.
*/
if (i == 72) goto end;
......@@ -890,8 +892,14 @@ int main(int argc, char *argv[])
goto err;
}
end:
if (maria_commit(file))
goto err;
if (maria_close(file))
{
file= 0;
goto err;
}
file= 0;
maria_panic(HA_PANIC_CLOSE); /* Should close log */
if (!silent)
{
......@@ -933,7 +941,11 @@ reads: %10lu\n",
err:
printf("got error: %d when using MARIA-database\n",my_errno);
if (file)
{
if (maria_commit(file))
goto err;
VOID(maria_close(file));
}
maria_end();
return(1);
} /* main */
......
......@@ -143,6 +143,10 @@ run_repair_tests()
$maria_path/maria_chk$suffix -se test2
$maria_path/maria_chk$suffix -s --parallel-recover --quick test2
$maria_path/maria_chk$suffix -se test2
$maria_path/ma_test2$suffix $silent -c $row_type
$maria_path/maria_chk$suffix -se test2
$maria_path/maria_chk$suffix -sr test2
$maria_path/maria_chk$suffix -se test2
}
run_pack_tests()
......
#!/bin/sh
set -e
silent="-s"
if [ -z "$maria_path" ]
......@@ -5,6 +7,13 @@ then
maria_path="."
fi
tmp=$maria_path/tmp
if test '!' -d $tmp
then
mkdir $tmp
fi
echo "MARIA RECOVERY TESTS - success is if exit code is 0"
# runs a program inserting/deleting rows, then moves the resulting table
......@@ -12,30 +21,44 @@ echo "MARIA RECOVERY TESTS - success is if exit code is 0"
# identical to the saved original.
# Does not test the index file as we don't have logging for it yet.
for prog in "$maria_path/ma_test1 $silent -M -T --skip-update -c" "$maria_path/ma_test2 $silent -L -K -W -P -M -T -g -c"
for prog in "$maria_path/ma_test1 $silent -M -T -c" "$maria_path/ma_test2 $silent -L -K -W -P -M -T -c" "$maria_path/ma_test2 $silent -M -T -c -b"
do
rm -f maria_log*
rm -f maria_log.* maria_log_control
echo "TEST WITH $prog"
$prog
# derive table's name from program's name
table=`echo $prog | sed -e 's;.*ma_\(test[0-9]\).*;\1;' `
$maria_path/maria_chk -dvv $table > maria_chk_message.good.txt 2>&1
mv -f $table.MAD $table.MAD.good
$maria_path/maria_chk -dvv $table > $tmp/maria_chk_message.good.txt 2>&1
checksum=`$maria_path/maria_chk -dss $table`
mv -f $table.MAD $tmp/$table.MAD.good
rm $table.MAI
echo "applying log"
$maria_path/maria_read_log -a > maria_read_log_$table.txt
cmp $table.MAD $table.MAD.good
$maria_path/maria_chk -dvv $table > maria_chk_message.txt 2>&1
$maria_path/maria_read_log -a > $tmp/maria_read_log_$table.txt
$maria_path/maria_chk -dvv $table > $tmp/maria_chk_message.txt 2>&1
cmp $table.MAD $tmp/$table.MAD.good
# QQ: Remove the following line when we can also recover the index file
$maria_path/maria_chk -s -r $table
$maria_path/maria_chk -s -e $table
checksum2=`$maria_path/maria_chk -dss $table`
if test "$checksum" != "$checksum2"
then
echo "checksum differs for $table before and after recovery"
exit 1;
fi
# When "recovery of the table's state" is ready, we can test it like this:
# diff maria_chk_message.good.txt maria_chk_message.txt >maria_chk_diff.txt || true
# if [ -s maria_chk_diff.txt ]
# diff $tmp/maria_chk_message.good.txt $tmp/maria_chk_message.txt > $tmp/maria_chk_diff.txt || true
# if [ -s $tmp/maria_chk_diff.txt ]
# then
# echo "Differences in maria_chk -dvv, recovery not yet perfect !"
# echo "========DIFF START======="
# cat maria_chk_diff.txt
# cat $tmp/maria_chk_diff.txt
# echo "========DIFF END======="
# fi
rm -f $table.* maria_chk_*.txt maria_read_log_$table.txt
rm -f $table.* $tmp/maria_chk_*.txt $tmp/maria_read_log_$table.txt
done
echo "ALL RECOVERY TESTS OK"
......@@ -115,7 +115,7 @@ int main(int argc, char **argv)
(!(check_param.testflag & (T_REP | T_REP_BY_SORT | T_SORT_RECORDS |
T_SORT_INDEX))))
{
uint old_testflag=check_param.testflag;
ulonglong old_testflag=check_param.testflag;
if (!(check_param.testflag & T_REP))
check_param.testflag|= T_REP_BY_SORT;
check_param.testflag&= ~T_EXTEND; /* Don't needed */
......@@ -126,7 +126,8 @@ int main(int argc, char **argv)
}
else
error|=new_error;
if (argc && (!(check_param.testflag & T_SILENT) || check_param.testflag & T_INFO))
if (argc && (!(check_param.testflag & T_SILENT) ||
check_param.testflag & T_INFO))
{
puts("\n---------\n");
VOID(fflush(stdout));
......@@ -1034,7 +1035,7 @@ static int maria_chk(HA_CHECK *param, char *filename)
that it will have to find and store it.
*/
if (share->base.born_transactional)
share->state.create_rename_lsn= (LSN)ULONGLONG_MAX;
share->state.create_rename_lsn= LSN_REPAIRED_BY_MARIA_CHK;
if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) &&
(maria_is_any_key_active(share->state.key_map) ||
(rep_quick && !param->keys_in_use && !recreate)) &&
......@@ -1236,6 +1237,16 @@ static void descript(HA_CHECK *param, register MARIA_HA *info, char *name)
char llbuff[22],llbuff2[22];
DBUG_ENTER("describe");
if (param->testflag & T_VERY_SILENT)
{
longlong checksum= info->state->checksum;
if (!(share->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD)))
checksum= 0;
printf("%s %s %s\n", name, llstr(info->state->records,llbuff),
llstr(checksum, llbuff2));
DBUG_VOID_RETURN;
}
printf("\nMARIA file: %s\n",name);
printf("Record format: %s\n", record_formats[share->data_file_type]);
printf("Character set: %s (%d)\n",
......
......@@ -894,6 +894,7 @@ void _ma_restore_status(void *param);
void _ma_copy_status(void *to, void *from);
my_bool _ma_check_status(void *param);
void _ma_reset_status(MARIA_HA *maria);
#include "ma_commit.h"
extern MARIA_HA *_ma_test_if_reopen(char *filename);
my_bool _ma_check_table_is_closed(const char *name, const char *where);
......
......@@ -51,7 +51,7 @@ int main(int argc, char **argv)
goto err;
}
/* we don't want to create a control file, it MUST exist */
if (ma_control_file_create_or_open(FALSE))
if (ma_control_file_create_or_open())
{
fprintf(stderr, "Can't open control file (%d)\n", errno);
goto err;
......@@ -93,7 +93,8 @@ int main(int argc, char **argv)
*/
fprintf(stdout, "TRACE of the last maria_read_log\n");
if (maria_apply_log(lsn, opt_display_and_apply, stdout))
/* Until we have UNDO records, no UNDO phase */
if (maria_apply_log(lsn, opt_display_and_apply, stdout, FALSE))
goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname);
......@@ -113,6 +114,8 @@ end:
static struct my_option my_long_options[] =
{
{"help", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"only-display", 'o', "display brief info about records's header",
(uchar **) &opt_only_display, (uchar **) &opt_only_display, 0, GET_BOOL,
NO_ARG,0, 0, 0, 0, 0, 0},
......@@ -161,6 +164,9 @@ get_one_option(int optid __attribute__((unused)),
char *argument __attribute__((unused)))
{
switch (optid) {
case '?':
usage();
exit(0);
#ifndef DBUG_OFF
case '#':
DBUG_SET_INITIAL(argument ? argument : default_dbug_option);
......
......@@ -18,6 +18,7 @@
#include <my_sys.h>
#include <m_string.h>
#include "trnman.h"
#include "ma_control_file.h"
/*
status variables:
......@@ -708,3 +709,29 @@ end:
pthread_mutex_unlock(&LOCK_trn_list);
DBUG_RETURN(error);
}
/**
  @brief Creates a transaction object carrying the given short and long ids.

  A transaction is obtained from trnman_new_trn(); global_trid_generator is
  then put back to its previous value (raised to longid if longid is
  bigger), the short-id slot picked by trnman_new_trn() is cleared, and the
  transaction is registered under shortid instead.

  @param  shortid  short id the transaction must have
  @param  longid   long id (trid) the transaction must have

  @return the transaction, or NULL if trnman_new_trn() failed
*/
TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid)
{
  TRN *recreated;
  const TrID saved_trid_generator= global_trid_generator;

  DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);

  recreated= trnman_new_trn(NULL, NULL, NULL);
  if (unlikely(recreated == NULL))
    return NULL;

  /* deallocate excessive allocations of trnman_new_trn() */
  global_trid_generator= saved_trid_generator;
  set_if_bigger(global_trid_generator, longid);
  /* free the slot chosen by trnman_new_trn() before claiming ours */
  short_trid_to_active_trn[recreated->short_id]= 0;
  DBUG_ASSERT(short_trid_to_active_trn[shortid] == NULL);
  short_trid_to_active_trn[shortid]= recreated;

  recreated->short_id= shortid;
  recreated->trid= longid;
  return recreated;
}
/**
  @brief Returns the element following active_list_min.

  @return that element, or NULL when it is active_list_max itself
*/
TRN *trnman_get_any_trn()
{
  TRN *candidate= active_list_min.next;

  if (candidate == &active_list_max)
    return NULL;
  return candidate;
}
......@@ -53,6 +53,8 @@ uint trnman_increment_locked_tables(TRN *trn);
uint trnman_decrement_locked_tables(TRN *trn);
my_bool trnman_has_locked_tables(TRN *trn);
void trnman_reset_locked_tables(TRN *trn);
TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid);
TRN *trnman_get_any_trn();
C_MODE_END
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment