Commit c9c58163 authored by unknown's avatar unknown

Remove SAFE_MODE for opt_range as it disables UPDATE to use keys

REDO optimization (Bascily avoid moving blocks from/to pagecache)
More command line arguments to maria_read_log
Fixed recovery bug when recreating table


sql/opt_range.cc:
  Remove SAFE_MODE for opt_range as it disables UPDATE to use keys
storage/maria/ma_blockrec.c:
  REDO optimization
  Use new interface for pagecache_reads to avoid copying page buffers
storage/maria/ma_loghandler.c:
  Patch from Sanja:
  - Added new parameter to translog_get_page to use direct links to pagecache
  - Changed scanner to be able to use direct links
  
  This avoids a lot of calls to bmove512() in page cache.
storage/maria/ma_loghandler.h:
  Added direct link to pagecache objects
storage/maria/ma_open.c:
  Added const to parameter
  Added missing braces
storage/maria/ma_pagecache.c:
  From Sanja:
  - Added direct links to pagecache (from pagecache_read())
    Dirrect link means that on pagecache_read we get back a pointer to the pagecache buffer
  
  
  From Monty:
  - Fixed arguments to init_page_cache to handle big page caches
  - Fixed compiler warnings
  - Replaced PAGECACHE_PAGE_LINK with PAGECACHE_BLOCK_LINK * to catch errors
storage/maria/ma_pagecache.h:
  Changed block numbers from int to long to be able to handle big page caches
  Changed some PAGECACHE_PAGE_LINK to PAGECACHE_BLOCK_LINK
storage/maria/ma_recovery.c:
  Fixed recovery bug when recreating table (table was kept open)
  Moved some variables to function start (portability)
  Added space to some print messages
storage/maria/maria_chk.c:
  key_buffer_size -> page_buffer_size
storage/maria/maria_def.h:
  Changed default page_buffer_size to 10M
storage/maria/maria_read_log.c:
  Added more startup options:
  --version
  --undo (apply undo)
  --page_cache_size (to run with big cache sizes)
  --silent (to not get any output from --apply)
storage/maria/unittest/ma_control_file-t.c:
  Fixed compiler warning
storage/maria/unittest/ma_test_loghandler-t.c:
  Added new argument to translog_init_scanner()
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
  Added new argument to translog_init_scanner()
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  Added new argument to translog_init_scanner()
parent 8b5dddbc
...@@ -2130,9 +2130,6 @@ int SQL_SELECT::test_quick_select(THD *thd, key_map keys_to_use, ...@@ -2130,9 +2130,6 @@ int SQL_SELECT::test_quick_select(THD *thd, key_map keys_to_use,
quick=0; quick=0;
needed_reg.clear_all(); needed_reg.clear_all();
quick_keys.clear_all(); quick_keys.clear_all();
if ((specialflag & SPECIAL_SAFE_MODE) && ! force_quick_range ||
!limit)
DBUG_RETURN(0); /* purecov: inspected */
if (keys_to_use.is_clear_all()) if (keys_to_use.is_clear_all())
DBUG_RETURN(0); DBUG_RETURN(0);
records= head->file->stats.records; records= head->file->stats.records;
......
...@@ -4609,6 +4609,9 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4609,6 +4609,9 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint block_size= share->block_size; uint block_size= share->block_size;
uint rec_offset; uint rec_offset;
uchar *buff= info->keyread_buff, *dir; uchar *buff= info->keyread_buff, *dir;
MARIA_PINNED_PAGE page_link;
enum pagecache_page_lock unlock_method;
enum pagecache_page_pin unpin_method;
DBUG_ENTER("_ma_apply_redo_insert_row_head_or_tail"); DBUG_ENTER("_ma_apply_redo_insert_row_head_or_tail");
info->keyread_buff_used= 1; info->keyread_buff_used= 1;
...@@ -4635,26 +4638,31 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4635,26 +4638,31 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
empty_space= (block_size - PAGE_OVERHEAD_SIZE); empty_space= (block_size - PAGE_OVERHEAD_SIZE);
rec_offset= PAGE_HEADER_SIZE; rec_offset= PAGE_HEADER_SIZE;
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE; dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
unlock_method= PAGECACHE_LOCK_LEFT_UNLOCKED;
unpin_method= PAGECACHE_PIN_LEFT_UNPINNED;
} }
else else
{ {
uint max_entry; uint max_entry;
if (!(buff= pagecache_read(share->pagecache, if (!(buff= pagecache_read(share->pagecache, &info->dfile,
&info->dfile, page, 0, 0,
page, 0, PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
buff, PAGECACHE_PLAIN_PAGE, &page_link.link)))
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
if (lsn_korr(buff) >= lsn) if (lsn_korr(buff) >= lsn) /* Test if already applied */
{ {
/* Already applied */ pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
/* Fix bitmap, just in case */ /* Fix bitmap, just in case */
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET); empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space)) if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
unlock_method= PAGECACHE_LOCK_WRITE_UNLOCK;
unpin_method= PAGECACHE_UNPIN;
max_entry= (uint) ((uchar*) buff)[DIR_COUNT_OFFSET]; max_entry= (uint) ((uchar*) buff)[DIR_COUNT_OFFSET];
if (((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != page_type)) if (((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != page_type))
...@@ -4725,8 +4733,7 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4725,8 +4733,7 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
if (pagecache_write(share->pagecache, if (pagecache_write(share->pagecache,
&info->dfile, page, 0, &info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE, buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, unlock_method, unpin_method,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0)) PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
...@@ -4747,6 +4754,11 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4747,6 +4754,11 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
DBUG_RETURN(0); DBUG_RETURN(0);
err: err:
if (unlock_method == PAGECACHE_LOCK_WRITE_UNLOCK)
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
} }
...@@ -4778,6 +4790,8 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4778,6 +4790,8 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint rownr, empty_space; uint rownr, empty_space;
uint block_size= share->block_size; uint block_size= share->block_size;
uchar *buff= info->keyread_buff; uchar *buff= info->keyread_buff;
int result;
MARIA_PINNED_PAGE page_link;
DBUG_ENTER("_ma_apply_redo_purge_row_head_or_tail"); DBUG_ENTER("_ma_apply_redo_purge_row_head_or_tail");
page= page_korr(header); page= page_korr(header);
...@@ -4788,11 +4802,10 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4788,11 +4802,10 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
info->keyread_buff_used= 1; info->keyread_buff_used= 1;
if (!(buff= pagecache_read(share->pagecache, if (!(buff= pagecache_read(share->pagecache, &info->dfile,
&info->dfile, page, 0, 0,
page, 0, PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
buff, PAGECACHE_PLAIN_PAGE, &page_link.link)))
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
if (lsn_korr(buff) >= lsn) if (lsn_korr(buff) >= lsn)
...@@ -4802,6 +4815,11 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4802,6 +4815,11 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
Note that in case the page is not anymore a head or tail page Note that in case the page is not anymore a head or tail page
a future redo will fix the bitmap. a future redo will fix the bitmap.
*/ */
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type) if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type)
{ {
empty_space= uint2korr(buff+EMPTY_SPACE_OFFSET); empty_space= uint2korr(buff+EMPTY_SPACE_OFFSET);
...@@ -4815,22 +4833,30 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4815,22 +4833,30 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == (uchar) page_type); DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == (uchar) page_type);
if (delete_dir_entry(buff, block_size, rownr, &empty_space) < 0) if (delete_dir_entry(buff, block_size, rownr, &empty_space) < 0)
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); goto err;
lsn_store(buff, lsn); lsn_store(buff, lsn);
result= 0;
if (pagecache_write(share->pagecache, if (pagecache_write(share->pagecache,
&info->dfile, page, 0, &info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE, buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_LOCK_WRITE_UNLOCK, PAGECACHE_UNPIN,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0)) PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno); result= my_errno;
/* This will work even if the page was marked as UNALLOCATED_PAGE */ /* This will work even if the page was marked as UNALLOCATED_PAGE */
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space)) if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno); result= my_errno;
DBUG_RETURN(result);
err:
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
DBUG_RETURN(0);
} }
...@@ -4872,16 +4898,21 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info, ...@@ -4872,16 +4898,21 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
for (i= 0; i < page_range ; i++) for (i= 0; i < page_range ; i++)
{ {
MARIA_PINNED_PAGE page_link;
if (!(buff= pagecache_read(share->pagecache, if (!(buff= pagecache_read(share->pagecache,
&info->dfile, &info->dfile,
page+i, 0, page+i, 0,
buff, PAGECACHE_PLAIN_PAGE, buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) PAGECACHE_LOCK_WRITE, &page_link.link)))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
if (lsn_korr(buff) >= lsn) if (lsn_korr(buff) >= lsn)
{ {
/* Already applied */ /* Already applied */
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
continue; continue;
} }
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE; buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
...@@ -4889,8 +4920,7 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info, ...@@ -4889,8 +4920,7 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
if (pagecache_write(share->pagecache, if (pagecache_write(share->pagecache,
&info->dfile, page+i, 0, &info->dfile, page+i, 0,
buff, PAGECACHE_PLAIN_PAGE, buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_LOCK_WRITE_UNLOCK, PAGECACHE_UNPIN,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0)) PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} }
......
...@@ -2296,21 +2296,21 @@ my_bool translog_unlock() ...@@ -2296,21 +2296,21 @@ my_bool translog_unlock()
} }
/* /**
Get log page by file number and offset of the beginning of the page @brief Get log page by file number and offset of the beginning of the page
SYNOPSIS @param data validator data, which contains the page address
translog_get_page() @param buffer buffer for page placing
data validator data, which contains the page address
buffer buffer for page placing
(might not be used in some cache implementations) (might not be used in some cache implementations)
@param direct_link if it is not NULL then caller can accept direct
link to the page cache
RETURN @retval NULL Error
NULL - Error @retval # pointer to the page cache which should be used to read this page
# pointer to the page cache which should be used to read this page
*/ */
static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer) static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
PAGECACHE_PAGE_LINK *direct_link)
{ {
TRANSLOG_ADDRESS addr= *(data->addr), in_buffers; TRANSLOG_ADDRESS addr= *(data->addr), in_buffers;
uint cache_index; uint cache_index;
...@@ -2324,6 +2324,9 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer) ...@@ -2324,6 +2324,9 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
/* it is really page address */ /* it is really page address */
DBUG_ASSERT(LSN_OFFSET(addr) % TRANSLOG_PAGE_SIZE == 0); DBUG_ASSERT(LSN_OFFSET(addr) % TRANSLOG_PAGE_SIZE == 0);
if (direct_link)
*direct_link= NULL;
in_buffers= translog_only_in_buffers(); in_buffers= translog_only_in_buffers();
DBUG_PRINT("info", ("in_buffers: (%lu,0x%lx)", DBUG_PRINT("info", ("in_buffers: (%lu,0x%lx)",
LSN_IN_PARTS(in_buffers))); LSN_IN_PARTS(in_buffers)));
...@@ -2336,7 +2339,9 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer) ...@@ -2336,7 +2339,9 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
if (cmp_translog_addr(addr, in_buffers) >= 0) if (cmp_translog_addr(addr, in_buffers) >= 0)
{ {
uint16 buffer_no= log_descriptor.bc.buffer_no; uint16 buffer_no= log_descriptor.bc.buffer_no;
#ifndef DBUG_OFF
uint16 buffer_start= buffer_no; uint16 buffer_start= buffer_no;
#endif
struct st_translog_buffer *buffer_unlock= log_descriptor.bc.buffer; struct st_translog_buffer *buffer_unlock= log_descriptor.bc.buffer;
struct st_translog_buffer *curr_buffer= log_descriptor.bc.buffer; struct st_translog_buffer *curr_buffer= log_descriptor.bc.buffer;
for (;;) for (;;)
...@@ -2437,13 +2442,23 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer) ...@@ -2437,13 +2442,23 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
} }
file.file= log_descriptor.log_file_num[cache_index]; file.file= log_descriptor.log_file_num[cache_index];
buffer= (uchar*) buffer=
pagecache_valid_read(log_descriptor.pagecache, &file, (uchar*) (direct_link ?
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE, pagecache_valid_read(log_descriptor.pagecache, &file,
3, (char*) buffer, LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
PAGECACHE_PLAIN_PAGE, 3, NULL,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0, PAGECACHE_PLAIN_PAGE,
&translog_page_validator, (uchar*) data); PAGECACHE_LOCK_READ, direct_link,
&translog_page_validator, (uchar*) data) :
pagecache_valid_read(log_descriptor.pagecache, &file,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, (char*) buffer,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, direct_link,
&translog_page_validator, (uchar*) data));
DBUG_PRINT("info", ("Direct link is assigned to : 0x%lx * 0x%lx",
(ulong) direct_link,
(ulong)(direct_link ? *direct_link : NULL)));
} }
else else
{ {
...@@ -2468,6 +2483,24 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer) ...@@ -2468,6 +2483,24 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
DBUG_RETURN(buffer); DBUG_RETURN(buffer);
} }
/**
@brief free direct log page link
@param direct_link the direct log page link to be freed
*/
static void translog_free_link(PAGECACHE_PAGE_LINK *direct_link)
{
DBUG_ENTER("translog_free_link");
DBUG_PRINT("info", ("Direct link: 0x%lx",
(ulong) direct_link));
if (direct_link)
pagecache_unlock_by_link(log_descriptor.pagecache, direct_link,
PAGECACHE_LOCK_READ_UNLOCK, PAGECACHE_UNPIN,
LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
DBUG_VOID_RETURN;
}
/* /*
Finds last page of the given log file Finds last page of the given log file
...@@ -2796,7 +2829,7 @@ my_bool translog_init(const char *directory, ...@@ -2796,7 +2829,7 @@ my_bool translog_init(const char *directory,
TRANSLOG_VALIDATOR_DATA data; TRANSLOG_VALIDATOR_DATA data;
uchar buffer[TRANSLOG_PAGE_SIZE], *page; uchar buffer[TRANSLOG_PAGE_SIZE], *page;
data.addr= &current_page; data.addr= &current_page;
if ((page= translog_get_page(&data, buffer)) == NULL) if ((page= translog_get_page(&data, buffer, NULL)) == NULL)
DBUG_RETURN(1); DBUG_RETURN(1);
if (data.was_recovered) if (data.was_recovered)
{ {
...@@ -2848,7 +2881,7 @@ my_bool translog_init(const char *directory, ...@@ -2848,7 +2881,7 @@ my_bool translog_init(const char *directory,
/* continue old log */ /* continue old log */
DBUG_ASSERT(LSN_FILE_NO(last_valid_page)== DBUG_ASSERT(LSN_FILE_NO(last_valid_page)==
LSN_FILE_NO(log_descriptor.horizon)); LSN_FILE_NO(log_descriptor.horizon));
if ((page= translog_get_page(&data, buffer)) == NULL || if ((page= translog_get_page(&data, buffer, NULL)) == NULL ||
(chunk_offset= translog_get_first_chunk_offset(page)) == 0) (chunk_offset= translog_get_first_chunk_offset(page)) == 0)
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -5153,24 +5186,54 @@ static my_bool translog_scanner_set_last_page(TRANSLOG_SCANNER_DATA ...@@ -5153,24 +5186,54 @@ static my_bool translog_scanner_set_last_page(TRANSLOG_SCANNER_DATA
} }
/* /**
Initialize reader scanner @brief Get page from page cache according to requested method
SYNOPSIS @param scanner The scanner data
translog_init_scanner()
lsn LSN with which it have to be inited @return operation status
fixed_horizon true if it is OK do not read records which was written @retval 0 OK
@retval 1 Error
*/
static my_bool
translog_scanner_get_page(TRANSLOG_SCANNER_DATA *scanner)
{
TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_scanner_get_page");
data.addr= &scanner->page_addr;
data.was_recovered= 0;
DBUG_RETURN((scanner->page=
translog_get_page(&data, scanner->buffer,
(scanner->use_direct_link ?
&scanner->direct_link :
NULL))) ==
NULL);
}
/**
@brief Initialize reader scanner.
@param lsn LSN with which it have to be inited
@param fixed_horizon true if it is OK do not read records which was written
after scanning beginning after scanning beginning
scanner scanner which have to be inited @param scanner scanner which have to be inited
@param use_direct prefer using direct lings from page handler
where it is possible.
RETURN @note If direct link was used translog_destroy_scanner should be
0 OK called after it using
1 Error
@return status of the operation
@retval 0 OK
@retval 1 Error
*/ */
my_bool translog_init_scanner(LSN lsn, my_bool translog_init_scanner(LSN lsn,
my_bool fixed_horizon, my_bool fixed_horizon,
struct st_translog_scanner_data *scanner) TRANSLOG_SCANNER_DATA *scanner,
my_bool use_direct)
{ {
TRANSLOG_VALIDATOR_DATA data; TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_init_scanner"); DBUG_ENTER("translog_init_scanner");
...@@ -5184,6 +5247,8 @@ my_bool translog_init_scanner(LSN lsn, ...@@ -5184,6 +5247,8 @@ my_bool translog_init_scanner(LSN lsn,
scanner->page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE; scanner->page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE;
scanner->fixed_horizon= fixed_horizon; scanner->fixed_horizon= fixed_horizon;
scanner->use_direct_link= use_direct;
scanner->direct_link= NULL;
scanner->horizon= translog_get_horizon(); scanner->horizon= translog_get_horizon();
DBUG_PRINT("info", ("horizon: (0x%lu,0x%lx)", DBUG_PRINT("info", ("horizon: (0x%lu,0x%lx)",
...@@ -5198,12 +5263,24 @@ my_bool translog_init_scanner(LSN lsn, ...@@ -5198,12 +5263,24 @@ my_bool translog_init_scanner(LSN lsn,
if (translog_scanner_set_last_page(scanner)) if (translog_scanner_set_last_page(scanner))
DBUG_RETURN(1); DBUG_RETURN(1);
if ((scanner->page= translog_get_page(&data, scanner->buffer)) == NULL) if (translog_scanner_get_page(scanner))
DBUG_RETURN(1); DBUG_RETURN(1);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/**
@brief Destroy scanner object;
@param scanner The scanner object to destroy
*/
void translog_destroy_scanner(TRANSLOG_SCANNER_DATA *scanner)
{
translog_free_link(scanner->direct_link);
}
/* /*
Checks End of the Log Checks End of the Log
...@@ -5298,7 +5375,6 @@ static my_bool translog_scanner_eof(TRANSLOG_SCANNER_DATA *scanner) ...@@ -5298,7 +5375,6 @@ static my_bool translog_scanner_eof(TRANSLOG_SCANNER_DATA *scanner)
scanner->last_file_page); scanner->last_file_page);
} }
/* /*
Move scanner to the next chunk Move scanner to the next chunk
...@@ -5315,7 +5391,6 @@ static my_bool ...@@ -5315,7 +5391,6 @@ static my_bool
translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner) translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner)
{ {
uint16 len; uint16 len;
TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_get_next_chunk"); DBUG_ENTER("translog_get_next_chunk");
if ((len= translog_get_total_chunk_length(scanner->page, if ((len= translog_get_total_chunk_length(scanner->page,
...@@ -5331,6 +5406,8 @@ translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner) ...@@ -5331,6 +5406,8 @@ translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner)
} }
if (translog_scanner_eop(scanner)) if (translog_scanner_eop(scanner))
{ {
/* before reading next page we should unpin current one if it was pinned */
translog_free_link(scanner->direct_link);
if (translog_scanner_eof(scanner)) if (translog_scanner_eof(scanner))
{ {
DBUG_PRINT("info", ("horizon: (%lu,0x%lx) pageaddr: (%lu,0x%lx)", DBUG_PRINT("info", ("horizon: (%lu,0x%lx) pageaddr: (%lu,0x%lx)",
...@@ -5350,9 +5427,7 @@ translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner) ...@@ -5350,9 +5427,7 @@ translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner)
scanner->page_addr+= TRANSLOG_PAGE_SIZE; /* offset increased */ scanner->page_addr+= TRANSLOG_PAGE_SIZE; /* offset increased */
} }
data.addr= &scanner->page_addr; if (translog_scanner_get_page(scanner))
data.was_recovered= 0;
if ((scanner->page= translog_get_page(&data, scanner->buffer)) == NULL)
DBUG_RETURN(1); DBUG_RETURN(1);
scanner->page_offset= translog_get_first_chunk_offset(scanner->page); scanner->page_offset= translog_get_first_chunk_offset(scanner->page);
...@@ -5482,7 +5557,7 @@ translog_variable_length_header(uchar *page, translog_size_t page_offset, ...@@ -5482,7 +5557,7 @@ translog_variable_length_header(uchar *page, translog_size_t page_offset,
{ {
DBUG_PRINT("info", ("use internal scanner for header reading")); DBUG_PRINT("info", ("use internal scanner for header reading"));
scanner= &internal_scanner; scanner= &internal_scanner;
if (translog_init_scanner(buff->lsn, 1, scanner)) if (translog_init_scanner(buff->lsn, 1, scanner, 0))
DBUG_RETURN(RECHEADER_READ_ERROR); DBUG_RETURN(RECHEADER_READ_ERROR);
} }
if (translog_get_next_chunk(scanner)) if (translog_get_next_chunk(scanner))
...@@ -5502,13 +5577,15 @@ translog_variable_length_header(uchar *page, translog_size_t page_offset, ...@@ -5502,13 +5577,15 @@ translog_variable_length_header(uchar *page, translog_size_t page_offset,
} }
base_lsn= buff->groups[0].addr; base_lsn= buff->groups[0].addr;
translog_init_scanner(base_lsn, 1, scanner); translog_init_scanner(base_lsn, 1, scanner, scanner == &internal_scanner);
/* first group chunk is always chunk type 2 */ /* first group chunk is always chunk type 2 */
page= scanner->page; page= scanner->page;
page_offset= scanner->page_offset; page_offset= scanner->page_offset;
src= page + page_offset + 1; src= page + page_offset + 1;
page_rest= TRANSLOG_PAGE_SIZE - (src - page); page_rest= TRANSLOG_PAGE_SIZE - (src - page);
body_len= page_rest; body_len= page_rest;
if (scanner == &internal_scanner)
translog_destroy_scanner(scanner);
} }
if (lsns) if (lsns)
{ {
...@@ -5615,6 +5692,7 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff) ...@@ -5615,6 +5692,7 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff)
{ {
uchar buffer[TRANSLOG_PAGE_SIZE], *page; uchar buffer[TRANSLOG_PAGE_SIZE], *page;
translog_size_t res, page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE; translog_size_t res, page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE;
PAGECACHE_PAGE_LINK direct_link;
TRANSLOG_ADDRESS addr; TRANSLOG_ADDRESS addr;
TRANSLOG_VALIDATOR_DATA data; TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_read_record_header"); DBUG_ENTER("translog_read_record_header");
...@@ -5628,8 +5706,10 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff) ...@@ -5628,8 +5706,10 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff)
data.was_recovered= 0; data.was_recovered= 0;
addr= lsn; addr= lsn;
addr-= page_offset; /* offset decreasing */ addr-= page_offset; /* offset decreasing */
res= (!(page= translog_get_page(&data, buffer))) ? RECHEADER_READ_ERROR : res= (!(page= translog_get_page(&data, buffer, &direct_link))) ?
RECHEADER_READ_ERROR :
translog_read_record_header_from_buffer(page, page_offset, buff, 0); translog_read_record_header_from_buffer(page, page_offset, buff, 0);
translog_free_link(direct_link);
DBUG_RETURN(res); DBUG_RETURN(res);
} }
...@@ -5774,8 +5854,9 @@ static my_bool translog_record_read_next_chunk(struct st_translog_reader_data ...@@ -5774,8 +5854,9 @@ static my_bool translog_record_read_next_chunk(struct st_translog_reader_data
data->current_group++; data->current_group++;
data->current_chunk= 0; data->current_chunk= 0;
DBUG_PRINT("info", ("skip to group: #%u", data->current_group)); DBUG_PRINT("info", ("skip to group: #%u", data->current_group));
translog_destroy_scanner(&data->scanner);
translog_init_scanner(data->header.groups[data->current_group].addr, translog_init_scanner(data->header.groups[data->current_group].addr,
1, &data->scanner); 1, &data->scanner, 1);
} }
else else
{ {
...@@ -5794,7 +5875,8 @@ static my_bool translog_record_read_next_chunk(struct st_translog_reader_data ...@@ -5794,7 +5875,8 @@ static my_bool translog_record_read_next_chunk(struct st_translog_reader_data
DBUG_ASSERT(data->header.groups_no - 1 == data->current_group); DBUG_ASSERT(data->header.groups_no - 1 == data->current_group);
DBUG_ASSERT(data->header.lsn == DBUG_ASSERT(data->header.lsn ==
data->scanner.page_addr + data->scanner.page_offset); data->scanner.page_addr + data->scanner.page_offset);
translog_init_scanner(data->header.chunk0_data_addr, 1, &data->scanner); translog_destroy_scanner(&data->scanner);
translog_init_scanner(data->header.chunk0_data_addr, 1, &data->scanner, 1);
data->chunk_size= data->header.chunk0_data_len; data->chunk_size= data->header.chunk0_data_len;
data->body_offset= data->scanner.page_offset; data->body_offset= data->scanner.page_offset;
data->current_offset= new_current_offset; data->current_offset= new_current_offset;
...@@ -5844,7 +5926,7 @@ static my_bool translog_init_reader_data(LSN lsn, ...@@ -5844,7 +5926,7 @@ static my_bool translog_init_reader_data(LSN lsn,
{ {
int read_header; int read_header;
DBUG_ENTER("translog_init_reader_data"); DBUG_ENTER("translog_init_reader_data");
if (translog_init_scanner(lsn, 1, &data->scanner) || if (translog_init_scanner(lsn, 1, &data->scanner, 1) ||
((read_header= ((read_header=
translog_read_record_header_scan(&data->scanner, &data->header, 1)) translog_read_record_header_scan(&data->scanner, &data->header, 1))
== RECHEADER_READ_ERROR)) == RECHEADER_READ_ERROR))
...@@ -5865,6 +5947,16 @@ static my_bool translog_init_reader_data(LSN lsn, ...@@ -5865,6 +5947,16 @@ static my_bool translog_init_reader_data(LSN lsn,
} }
/**
@brief Destroy reader data object
*/
static void translog_destroy_reader_data(struct st_translog_reader_data *data)
{
translog_destroy_scanner(&data->scanner);
}
/* /*
Read a part of the record. Read a part of the record.
...@@ -5924,7 +6016,10 @@ translog_size_t translog_read_record(LSN lsn, ...@@ -5924,7 +6016,10 @@ translog_size_t translog_read_record(LSN lsn,
memcpy(buffer, data->header.header + offset, len); memcpy(buffer, data->header.header + offset, len);
length-= len; length-= len;
if (length == 0) if (length == 0)
{
translog_destroy_reader_data(data);
DBUG_RETURN(requested_length); DBUG_RETURN(requested_length);
}
offset+= len; offset+= len;
buffer+= len; buffer+= len;
DBUG_PRINT("info", DBUG_PRINT("info",
...@@ -5952,7 +6047,10 @@ translog_size_t translog_read_record(LSN lsn, ...@@ -5952,7 +6047,10 @@ translog_size_t translog_read_record(LSN lsn,
(offset - data->current_offset), len); (offset - data->current_offset), len);
length-= len; length-= len;
if (length == 0) if (length == 0)
{
translog_destroy_reader_data(data);
DBUG_RETURN(requested_length); DBUG_RETURN(requested_length);
}
offset+= len; offset+= len;
buffer+= len; buffer+= len;
DBUG_PRINT("info", DBUG_PRINT("info",
...@@ -5961,7 +6059,10 @@ translog_size_t translog_read_record(LSN lsn, ...@@ -5961,7 +6059,10 @@ translog_size_t translog_read_record(LSN lsn,
(ulong) length)); (ulong) length));
} }
if (translog_record_read_next_chunk(data)) if (translog_record_read_next_chunk(data))
{
translog_destroy_reader_data(data);
DBUG_RETURN(requested_length - length); DBUG_RETURN(requested_length - length);
}
} }
} }
...@@ -6624,6 +6725,7 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon) ...@@ -6624,6 +6725,7 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
{ {
uint chunk_type; uint chunk_type;
TRANSLOG_SCANNER_DATA scanner; TRANSLOG_SCANNER_DATA scanner;
LSN result;
DBUG_ENTER("translog_next_LSN"); DBUG_ENTER("translog_next_LSN");
if (horizon == LSN_IMPOSSIBLE) if (horizon == LSN_IMPOSSIBLE)
...@@ -6632,7 +6734,7 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon) ...@@ -6632,7 +6734,7 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
if (addr == horizon) if (addr == horizon)
DBUG_RETURN(LSN_IMPOSSIBLE); DBUG_RETURN(LSN_IMPOSSIBLE);
translog_init_scanner(addr, 0, &scanner); translog_init_scanner(addr, 0, &scanner, 1);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE; chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type, DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
...@@ -6647,9 +6749,13 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon) ...@@ -6647,9 +6749,13 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type, DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset])); (uint) scanner.page[scanner.page_offset]));
} }
if (scanner.page[scanner.page_offset] == 0) if (scanner.page[scanner.page_offset] == 0)
DBUG_RETURN(LSN_IMPOSSIBLE); /* reached page filler */ result= LSN_IMPOSSIBLE; /* reached page filler */
DBUG_RETURN(scanner.page_addr + scanner.page_offset); else
result= scanner.page_addr + scanner.page_offset;
translog_destroy_scanner(&scanner);
DBUG_RETURN(result);
} }
/** /**
...@@ -6681,7 +6787,7 @@ LSN translog_first_lsn_in_log() ...@@ -6681,7 +6787,7 @@ LSN translog_first_lsn_in_log()
data.addr= &addr; data.addr= &addr;
{ {
uchar buffer[TRANSLOG_PAGE_SIZE]; uchar buffer[TRANSLOG_PAGE_SIZE];
if ((page= translog_get_page(&data, buffer)) == NULL || if ((page= translog_get_page(&data, buffer, NULL)) == NULL ||
(chunk_offset= translog_get_first_chunk_offset(page)) == 0) (chunk_offset= translog_get_first_chunk_offset(page)) == 0)
DBUG_RETURN(LSN_ERROR); DBUG_RETURN(LSN_ERROR);
} }
...@@ -6719,7 +6825,7 @@ LSN translog_first_theoretical_lsn() ...@@ -6719,7 +6825,7 @@ LSN translog_first_theoretical_lsn()
addr= MAKE_LSN(1, TRANSLOG_PAGE_SIZE); /* the first page of the file */ addr= MAKE_LSN(1, TRANSLOG_PAGE_SIZE); /* the first page of the file */
data.addr= &addr; data.addr= &addr;
if ((page= translog_get_page(&data, buffer)) == NULL) if ((page= translog_get_page(&data, buffer, NULL)) == NULL)
DBUG_RETURN(LSN_ERROR); DBUG_RETURN(LSN_ERROR);
DBUG_RETURN(MAKE_LSN(1, TRANSLOG_PAGE_SIZE + DBUG_RETURN(MAKE_LSN(1, TRANSLOG_PAGE_SIZE +
......
...@@ -182,10 +182,14 @@ typedef struct st_translog_scanner_data ...@@ -182,10 +182,14 @@ typedef struct st_translog_scanner_data
TRANSLOG_ADDRESS horizon; TRANSLOG_ADDRESS horizon;
TRANSLOG_ADDRESS last_file_page; /* Last page on in this file */ TRANSLOG_ADDRESS last_file_page; /* Last page on in this file */
uchar *page; /* page content pointer */ uchar *page; /* page content pointer */
/* direct link on the current page or NULL if it is not supported/requested */
PAGECACHE_PAGE_LINK direct_link;
/* offset of the chunk in the page */ /* offset of the chunk in the page */
translog_size_t page_offset; translog_size_t page_offset;
/* set horizon only once at init */ /* set horizon only once at init */
my_bool fixed_horizon; my_bool fixed_horizon;
/* try to get direct link on the page if it is possible */
my_bool use_direct_link;
} TRANSLOG_SCANNER_DATA; } TRANSLOG_SCANNER_DATA;
...@@ -245,7 +249,9 @@ extern my_bool translog_flush(LSN lsn); ...@@ -245,7 +249,9 @@ extern my_bool translog_flush(LSN lsn);
extern my_bool translog_init_scanner(LSN lsn, extern my_bool translog_init_scanner(LSN lsn,
my_bool fixed_horizon, my_bool fixed_horizon,
struct st_translog_scanner_data *scanner); struct st_translog_scanner_data *scanner,
my_bool use_direct_link);
extern void translog_destroy_scanner(TRANSLOG_SCANNER_DATA *scanner);
extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner, extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
TRANSLOG_HEADER_BUFFER *buff); TRANSLOG_HEADER_BUFFER *buff);
......
...@@ -58,7 +58,7 @@ if (pos > end_pos) \ ...@@ -58,7 +58,7 @@ if (pos > end_pos) \
** In MySQL the server will handle version issues. ** In MySQL the server will handle version issues.
******************************************************************************/ ******************************************************************************/
MARIA_HA *_ma_test_if_reopen(char *filename) MARIA_HA *_ma_test_if_reopen(const char *filename)
{ {
LIST *pos; LIST *pos;
...@@ -1001,7 +1001,9 @@ uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite) ...@@ -1001,7 +1001,9 @@ uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite)
if (pWrite & 4) if (pWrite & 4)
pthread_mutex_lock(&share->intern_lock); pthread_mutex_lock(&share->intern_lock);
else if (maria_multi_threaded) else if (maria_multi_threaded)
{
safe_mutex_assert_owner(&share->intern_lock); safe_mutex_assert_owner(&share->intern_lock);
}
if (share->base.born_transactional && translog_inited && if (share->base.born_transactional && translog_inited &&
!maria_in_recovery) !maria_in_recovery)
{ {
......
...@@ -96,7 +96,7 @@ ...@@ -96,7 +96,7 @@
#define PCBLOCK_INFO(B) \ #define PCBLOCK_INFO(B) \
DBUG_PRINT("info", \ DBUG_PRINT("info", \
("block: 0x%lx file: %lu page: %lu s: %0x hshL: 0x%lx req: %u/%u " \ ("block: 0x%lx file: %lu page: %lu s: %0x hshL: 0x%lx req: %u/%u " \
"wrlocks: %u", \ "wrlocks: %u pins: %u", \
(ulong)(B), \ (ulong)(B), \
(ulong)((B)->hash_link ? \ (ulong)((B)->hash_link ? \
(B)->hash_link->file.file : \ (B)->hash_link->file.file : \
...@@ -110,7 +110,8 @@ ...@@ -110,7 +110,8 @@
(uint)((B)->hash_link ? \ (uint)((B)->hash_link ? \
(B)->hash_link->requests : \ (B)->hash_link->requests : \
0), \ 0), \
block->wlocks)) block->wlocks, \
(uint)(B)->pins))
/* TODO: put it to my_static.c */ /* TODO: put it to my_static.c */
my_bool my_disable_flush_pagecache_blocks= 0; my_bool my_disable_flush_pagecache_blocks= 0;
...@@ -457,8 +458,10 @@ static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block, ...@@ -457,8 +458,10 @@ static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
#define FLUSH_CACHE 2000 /* sort this many blocks at once */ #define FLUSH_CACHE 2000 /* sort this many blocks at once */
static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block); static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block);
#ifndef DBUG_OFF
static void test_key_cache(PAGECACHE *pagecache, static void test_key_cache(PAGECACHE *pagecache,
const char *where, my_bool lock); const char *where, my_bool lock);
#endif
#define PAGECACHE_HASH(p, f, pos) (((ulong) (pos) + \ #define PAGECACHE_HASH(p, f, pos) (((ulong) (pos) + \
(ulong) (f).file) & (p->hash_entries-1)) (ulong) (f).file) & (p->hash_entries-1))
...@@ -655,11 +658,11 @@ static inline uint next_power(uint value) ...@@ -655,11 +658,11 @@ static inline uint next_power(uint value)
*/ */
int init_pagecache(PAGECACHE *pagecache, size_t use_mem, ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem,
uint division_limit, uint age_threshold, uint division_limit, uint age_threshold,
uint block_size) uint block_size)
{ {
uint blocks, hash_links, length; ulong blocks, hash_links, length;
int error; int error;
DBUG_ENTER("init_pagecache"); DBUG_ENTER("init_pagecache");
DBUG_ASSERT(block_size >= 512); DBUG_ASSERT(block_size >= 512);
...@@ -689,10 +692,10 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem, ...@@ -689,10 +692,10 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
block_size)); block_size));
DBUG_ASSERT(((uint)(1 << pagecache->shift)) == block_size); DBUG_ASSERT(((uint)(1 << pagecache->shift)) == block_size);
blocks= (int) (use_mem / (sizeof(PAGECACHE_BLOCK_LINK) + blocks= (ulong) (use_mem / (sizeof(PAGECACHE_BLOCK_LINK) +
2 * sizeof(PAGECACHE_HASH_LINK) + 2 * sizeof(PAGECACHE_HASH_LINK) +
sizeof(PAGECACHE_HASH_LINK*) * sizeof(PAGECACHE_HASH_LINK*) *
5/4 + block_size)); 5/4 + block_size));
/* /*
We need to support page cache with just one block to be able to do We need to support page cache with just one block to be able to do
scanning of rows-in-block files scanning of rows-in-block files
...@@ -714,7 +717,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem, ...@@ -714,7 +717,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
ALIGN_SIZE(hash_links * sizeof(PAGECACHE_HASH_LINK)) + ALIGN_SIZE(hash_links * sizeof(PAGECACHE_HASH_LINK)) +
ALIGN_SIZE(sizeof(PAGECACHE_HASH_LINK*) * ALIGN_SIZE(sizeof(PAGECACHE_HASH_LINK*) *
pagecache->hash_entries))) + pagecache->hash_entries))) +
(((ulong) blocks) << pagecache->shift) > use_mem) (blocks << pagecache->shift) > use_mem)
blocks--; blocks--;
/* Allocate memory for cache page buffers */ /* Allocate memory for cache page buffers */
if ((pagecache->block_mem= if ((pagecache->block_mem=
...@@ -726,8 +729,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem, ...@@ -726,8 +729,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
For each block 2 hash links are allocated For each block 2 hash links are allocated
*/ */
if ((pagecache->block_root= if ((pagecache->block_root=
(PAGECACHE_BLOCK_LINK*) my_malloc((uint) length, (PAGECACHE_BLOCK_LINK*) my_malloc((size_t) length, MYF(0))))
MYF(0))))
break; break;
my_large_free(pagecache->block_mem, MYF(0)); my_large_free(pagecache->block_mem, MYF(0));
pagecache->block_mem= 0; pagecache->block_mem= 0;
...@@ -739,8 +741,8 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem, ...@@ -739,8 +741,8 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
} }
blocks= blocks / 4*3; blocks= blocks / 4*3;
} }
pagecache->blocks_unused= (ulong) blocks; pagecache->blocks_unused= blocks;
pagecache->disk_blocks= (int) blocks; pagecache->disk_blocks= (long) blocks;
pagecache->hash_links= hash_links; pagecache->hash_links= hash_links;
pagecache->hash_root= pagecache->hash_root=
(PAGECACHE_HASH_LINK**) ((char*) pagecache->block_root + (PAGECACHE_HASH_LINK**) ((char*) pagecache->block_root +
...@@ -782,8 +784,8 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem, ...@@ -782,8 +784,8 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
pagecache->waiting_for_hash_link.last_thread= NULL; pagecache->waiting_for_hash_link.last_thread= NULL;
pagecache->waiting_for_block.last_thread= NULL; pagecache->waiting_for_block.last_thread= NULL;
DBUG_PRINT("exit", DBUG_PRINT("exit",
("disk_blocks: %d block_root: 0x%lx hash_entries: %d\ ("disk_blocks: %ld block_root: 0x%lx hash_entries: %ld\
hash_root: 0x%lx hash_links: %d hash_link_root: 0x%lx", hash_root: 0x%lx hash_links: %ld hash_link_root: 0x%lx",
pagecache->disk_blocks, (long) pagecache->block_root, pagecache->disk_blocks, (long) pagecache->block_root,
pagecache->hash_entries, (long) pagecache->hash_root, pagecache->hash_entries, (long) pagecache->hash_root,
pagecache->hash_links, (long) pagecache->hash_link_root)); pagecache->hash_links, (long) pagecache->hash_link_root));
...@@ -796,7 +798,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem, ...@@ -796,7 +798,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
} }
pagecache->blocks= pagecache->disk_blocks > 0 ? pagecache->disk_blocks : 0; pagecache->blocks= pagecache->disk_blocks > 0 ? pagecache->disk_blocks : 0;
DBUG_RETURN((int) pagecache->disk_blocks); DBUG_RETURN((ulong) pagecache->disk_blocks);
err: err:
error= my_errno; error= my_errno;
...@@ -887,11 +889,11 @@ static int flush_all_key_blocks(PAGECACHE *pagecache) ...@@ -887,11 +889,11 @@ static int flush_all_key_blocks(PAGECACHE *pagecache)
So we disable it for now. So we disable it for now.
*/ */
#if NOT_USED /* keep disabled until code is fixed see above !! */ #if NOT_USED /* keep disabled until code is fixed see above !! */
int resize_pagecache(PAGECACHE *pagecache, ulong resize_pagecache(PAGECACHE *pagecache,
size_t use_mem, uint division_limit, size_t use_mem, uint division_limit,
uint age_threshold) uint age_threshold)
{ {
int blocks; ulong blocks;
#ifdef THREAD #ifdef THREAD
struct st_my_thread_var *thread; struct st_my_thread_var *thread;
WQUEUE *wqueue; WQUEUE *wqueue;
...@@ -1282,8 +1284,10 @@ static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) ...@@ -1282,8 +1284,10 @@ static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
DBUG_ENTER("unlink_block"); DBUG_ENTER("unlink_block");
DBUG_PRINT("unlink_block", ("unlink 0x%lx", (ulong)block)); DBUG_PRINT("unlink_block", ("unlink 0x%lx", (ulong)block));
if (block->next_used == block) if (block->next_used == block)
{
/* The list contains only one member */ /* The list contains only one member */
pagecache->used_last= pagecache->used_ins= NULL; pagecache->used_last= pagecache->used_ins= NULL;
}
else else
{ {
block->next_used->prev_used= block->prev_used; block->next_used->prev_used= block->prev_used;
...@@ -2661,13 +2665,12 @@ void pagecache_unpin(PAGECACHE *pagecache, ...@@ -2661,13 +2665,12 @@ void pagecache_unpin(PAGECACHE *pagecache,
*/ */
void pagecache_unlock_by_link(PAGECACHE *pagecache, void pagecache_unlock_by_link(PAGECACHE *pagecache,
PAGECACHE_PAGE_LINK *link, PAGECACHE_BLOCK_LINK *block,
enum pagecache_page_lock lock, enum pagecache_page_lock lock,
enum pagecache_page_pin pin, enum pagecache_page_pin pin,
LSN first_REDO_LSN_for_page, LSN first_REDO_LSN_for_page,
LSN lsn) LSN lsn)
{ {
PAGECACHE_BLOCK_LINK *block= (PAGECACHE_BLOCK_LINK *)link;
DBUG_ENTER("pagecache_unlock_by_link"); DBUG_ENTER("pagecache_unlock_by_link");
DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu %s %s", DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu %s %s",
(ulong) block, (ulong) block,
...@@ -2820,16 +2823,28 @@ void pagecache_unpin_by_link(PAGECACHE *pagecache, ...@@ -2820,16 +2823,28 @@ void pagecache_unpin_by_link(PAGECACHE *pagecache,
Pin will be chosen according to lock parameter (see lock_to_pin) Pin will be chosen according to lock parameter (see lock_to_pin)
*/ */
static enum pagecache_page_pin lock_to_pin[]= static enum pagecache_page_pin lock_to_pin[2][8]=
{ {
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/, {
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/, PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/, PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ*/, PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
PAGECACHE_PIN /*PAGECACHE_LOCK_WRITE*/, PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/, PAGECACHE_PIN /*PAGECACHE_LOCK_WRITE*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_UNLOCK*/, PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_TO_READ*/ PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_TO_READ*/
},
{
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
PAGECACHE_PIN /*PAGECACHE_LOCK_READ*/,
PAGECACHE_PIN /*PAGECACHE_LOCK_WRITE*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_WRITE_TO_READ*/
}
}; };
uchar *pagecache_valid_read(PAGECACHE *pagecache, uchar *pagecache_valid_read(PAGECACHE *pagecache,
...@@ -2839,24 +2854,27 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache, ...@@ -2839,24 +2854,27 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
uchar *buff, uchar *buff,
enum pagecache_page_type type, enum pagecache_page_type type,
enum pagecache_page_lock lock, enum pagecache_page_lock lock,
PAGECACHE_PAGE_LINK *link, PAGECACHE_BLOCK_LINK **link,
pagecache_disk_read_validator validator, pagecache_disk_read_validator validator,
uchar* validator_data) uchar* validator_data)
{ {
int error= 0; int error= 0;
enum pagecache_page_pin pin= lock_to_pin[lock]; enum pagecache_page_pin pin= lock_to_pin[test(buff==0)][lock];
PAGECACHE_PAGE_LINK fake_link; PAGECACHE_BLOCK_LINK *fake_link;
DBUG_ENTER("pagecache_valid_read"); DBUG_ENTER("pagecache_valid_read");
DBUG_PRINT("enter", ("fd: %u page: %lu level: %u t:%s %s %s", DBUG_PRINT("enter", ("fd: %u page: %lu buffer: 0x%lx level: %u "
(uint) file->file, (ulong) pageno, level, "t:%s %s %s",
(uint) file->file, (ulong) pageno,
(ulong) buff, level,
page_cache_page_type_str[type], page_cache_page_type_str[type],
page_cache_page_lock_str[lock], page_cache_page_lock_str[lock],
page_cache_page_pin_str[pin])); page_cache_page_pin_str[pin]));
DBUG_ASSERT(buff != 0 || (buff == 0 && (pin == PAGECACHE_PIN ||
pin == PAGECACHE_PIN_LEFT_PINNED)));
if (!link) if (!link)
link= &fake_link; link= &fake_link;
else *link= 0; /* Catch errors */
*link= 0;
restart: restart:
...@@ -2910,19 +2928,25 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache, ...@@ -2910,19 +2928,25 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
goto restart; goto restart;
} }
if (! ((status= block->status) & PCBLOCK_ERROR)) status= block->status;
if (!buff)
buff= block->buffer;
else
{ {
if (!(status & PCBLOCK_ERROR))
{
#if !defined(SERIALIZED_READ_FROM_CACHE) #if !defined(SERIALIZED_READ_FROM_CACHE)
pagecache_pthread_mutex_unlock(&pagecache->cache_lock); pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
#endif #endif
DBUG_ASSERT((pagecache->block_size & 511) == 0); DBUG_ASSERT((pagecache->block_size & 511) == 0);
/* Copy data from the cache buffer */ /* Copy data from the cache buffer */
bmove512(buff, block->buffer, pagecache->block_size); bmove512(buff, block->buffer, pagecache->block_size);
#if !defined(SERIALIZED_READ_FROM_CACHE) #if !defined(SERIALIZED_READ_FROM_CACHE)
pagecache_pthread_mutex_lock(&pagecache->cache_lock); pagecache_pthread_mutex_lock(&pagecache->cache_lock);
#endif #endif
}
} }
remove_reader(block); remove_reader(block);
...@@ -2934,7 +2958,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache, ...@@ -2934,7 +2958,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN) if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN)
unreg_request(pagecache, block, 1); unreg_request(pagecache, block, 1);
else else
*link= (PAGECACHE_PAGE_LINK)block; *link= block;
dec_counter_for_resize_op(pagecache); dec_counter_for_resize_op(pagecache);
...@@ -2983,7 +3007,7 @@ my_bool pagecache_delete(PAGECACHE *pagecache, ...@@ -2983,7 +3007,7 @@ my_bool pagecache_delete(PAGECACHE *pagecache,
my_bool flush) my_bool flush)
{ {
int error= 0; int error= 0;
enum pagecache_page_pin pin= lock_to_pin[lock]; enum pagecache_page_pin pin= lock_to_pin[0][lock];
DBUG_ENTER("pagecache_delete"); DBUG_ENTER("pagecache_delete");
DBUG_PRINT("enter", ("fd: %u page: %lu %s %s", DBUG_PRINT("enter", ("fd: %u page: %lu %s %s",
(uint) file->file, (ulong) pageno, (uint) file->file, (ulong) pageno,
...@@ -3830,7 +3854,8 @@ int flush_pagecache_blocks(PAGECACHE *pagecache, ...@@ -3830,7 +3854,8 @@ int flush_pagecache_blocks(PAGECACHE *pagecache,
0 on success (always because it can't fail) 0 on success (always because it can't fail)
*/ */
int reset_pagecache_counters(const char *name, PAGECACHE *pagecache) int reset_pagecache_counters(const char *name __attribute__((unused)),
PAGECACHE *pagecache)
{ {
DBUG_ENTER("reset_pagecache_counters"); DBUG_ENTER("reset_pagecache_counters");
if (!pagecache->inited) if (!pagecache->inited)
......
...@@ -73,8 +73,6 @@ enum pagecache_write_mode ...@@ -73,8 +73,6 @@ enum pagecache_write_mode
PAGECACHE_WRITE_DONE PAGECACHE_WRITE_DONE
}; };
typedef void *PAGECACHE_PAGE_LINK;
/* file descriptor for Maria */ /* file descriptor for Maria */
typedef struct st_pagecache_file typedef struct st_pagecache_file
{ {
...@@ -93,6 +91,8 @@ typedef struct st_pagecache_page PAGECACHE_PAGE; ...@@ -93,6 +91,8 @@ typedef struct st_pagecache_page PAGECACHE_PAGE;
struct st_pagecache_hash_link; struct st_pagecache_hash_link;
typedef struct st_pagecache_hash_link PAGECACHE_HASH_LINK; typedef struct st_pagecache_hash_link PAGECACHE_HASH_LINK;
typedef PAGECACHE_BLOCK_LINK * PAGECACHE_PAGE_LINK; /* To be removed */
#include <wqueue.h> #include <wqueue.h>
typedef my_bool (*pagecache_disk_read_validator)(uchar *page, uchar *data); typedef my_bool (*pagecache_disk_read_validator)(uchar *page, uchar *data);
...@@ -106,25 +106,22 @@ typedef my_bool (*pagecache_disk_read_validator)(uchar *page, uchar *data); ...@@ -106,25 +106,22 @@ typedef my_bool (*pagecache_disk_read_validator)(uchar *page, uchar *data);
typedef struct st_pagecache typedef struct st_pagecache
{ {
my_bool inited; size_t mem_size; /* specified size of the cache memory */
my_bool resize_in_flush; /* true during flush of resize operation */
my_bool can_be_used; /* usage of cache for read/write is allowed */
uint shift; /* block size = 2 ^ shift */
size_t mem_size; /* specified size of the cache memory */
uint32 block_size; /* size of the page buffer of a cache block */
ulong min_warm_blocks; /* min number of warm blocks; */ ulong min_warm_blocks; /* min number of warm blocks; */
ulong age_threshold; /* age threshold for hot blocks */ ulong age_threshold; /* age threshold for hot blocks */
ulonglong time; /* total number of block link operations */ ulonglong time; /* total number of block link operations */
uint hash_entries; /* max number of entries in the hash table */ ulong hash_entries; /* max number of entries in the hash table */
int hash_links; /* max number of hash links */ long hash_links; /* max number of hash links */
int hash_links_used; /* number of hash links taken from free links pool */ long hash_links_used; /* number of hash links taken from free links pool */
int disk_blocks; /* max number of blocks in the cache */ long disk_blocks; /* max number of blocks in the cache */
ulong blocks_used; /* maximum number of concurrently used blocks */ ulong blocks_used; /* maximum number of concurrently used blocks */
ulong blocks_unused; /* number of currently unused blocks */ ulong blocks_unused; /* number of currently unused blocks */
ulong blocks_changed; /* number of currently dirty blocks */ ulong blocks_changed; /* number of currently dirty blocks */
ulong warm_blocks; /* number of blocks in warm sub-chain */ ulong warm_blocks; /* number of blocks in warm sub-chain */
ulong cnt_for_resize_op; /* counter to block resize operation */ ulong cnt_for_resize_op; /* counter to block resize operation */
ulong blocks_available; /* number of blocks available in the LRU chain */ ulong blocks_available; /* number of blocks available in the LRU chain */
long blocks; /* max number of blocks in the cache */
uint32 block_size; /* size of the page buffer of a cache block */
PAGECACHE_HASH_LINK **hash_root;/* arr. of entries into hash table buckets */ PAGECACHE_HASH_LINK **hash_root;/* arr. of entries into hash table buckets */
PAGECACHE_HASH_LINK *hash_link_root;/* memory for hash table links */ PAGECACHE_HASH_LINK *hash_link_root;/* memory for hash table links */
PAGECACHE_HASH_LINK *free_hash_list;/* list of free hash links */ PAGECACHE_HASH_LINK *free_hash_list;/* list of free hash links */
...@@ -159,19 +156,22 @@ typedef struct st_pagecache ...@@ -159,19 +156,22 @@ typedef struct st_pagecache
ulonglong global_cache_r_requests;/* number of read requests (read hits) */ ulonglong global_cache_r_requests;/* number of read requests (read hits) */
ulonglong global_cache_read; /* number of reads from files to cache */ ulonglong global_cache_read; /* number of reads from files to cache */
int blocks; /* max number of blocks in the cache */ uint shift; /* block size = 2 ^ shift */
my_bool inited;
my_bool resize_in_flush; /* true during flush of resize operation */
my_bool can_be_used; /* usage of cache for read/write is allowed */
my_bool in_init; /* Set to 1 in MySQL during init/resize */ my_bool in_init; /* Set to 1 in MySQL during init/resize */
} PAGECACHE; } PAGECACHE;
/* The default key cache */ /* The default key cache */
extern PAGECACHE dflt_pagecache_var, *dflt_pagecache; extern PAGECACHE dflt_pagecache_var, *dflt_pagecache;
extern int init_pagecache(PAGECACHE *pagecache, size_t use_mem, extern ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem,
uint division_limit, uint age_threshold, uint division_limit, uint age_threshold,
uint block_size); uint block_size);
extern int resize_pagecache(PAGECACHE *pagecache, extern ulong resize_pagecache(PAGECACHE *pagecache,
size_t use_mem, uint division_limit, size_t use_mem, uint division_limit,
uint age_threshold); uint age_threshold);
extern void change_pagecache_param(PAGECACHE *pagecache, uint division_limit, extern void change_pagecache_param(PAGECACHE *pagecache, uint division_limit,
uint age_threshold); uint age_threshold);
...@@ -185,7 +185,7 @@ extern uchar *pagecache_valid_read(PAGECACHE *pagecache, ...@@ -185,7 +185,7 @@ extern uchar *pagecache_valid_read(PAGECACHE *pagecache,
uchar *buff, uchar *buff,
enum pagecache_page_type type, enum pagecache_page_type type,
enum pagecache_page_lock lock, enum pagecache_page_lock lock,
PAGECACHE_PAGE_LINK *link, PAGECACHE_BLOCK_LINK **link,
pagecache_disk_read_validator validator, pagecache_disk_read_validator validator,
uchar* validator_data); uchar* validator_data);
...@@ -218,7 +218,7 @@ extern void pagecache_unlock(PAGECACHE *pagecache, ...@@ -218,7 +218,7 @@ extern void pagecache_unlock(PAGECACHE *pagecache,
LSN first_REDO_LSN_for_page, LSN first_REDO_LSN_for_page,
LSN lsn); LSN lsn);
extern void pagecache_unlock_by_link(PAGECACHE *pagecache, extern void pagecache_unlock_by_link(PAGECACHE *pagecache,
PAGECACHE_PAGE_LINK *link, PAGECACHE_BLOCK_LINK *block,
enum pagecache_page_lock lock, enum pagecache_page_lock lock,
enum pagecache_page_pin pin, enum pagecache_page_pin pin,
LSN first_REDO_LSN_for_page, LSN first_REDO_LSN_for_page,
......
...@@ -105,6 +105,7 @@ static int new_table(uint16 sid, const char *name, ...@@ -105,6 +105,7 @@ static int new_table(uint16 sid, const char *name,
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn, static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
struct st_dirty_page *dirty_page); struct st_dirty_page *dirty_page);
static int close_all_tables(void); static int close_all_tables(void);
static my_bool close_one_table(const char *name, LSN addr);
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr); static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);
/** @brief global [out] buffer for translog_read_record(); never shrinks */ /** @brief global [out] buffer for translog_read_record(); never shrinks */
...@@ -415,6 +416,8 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE) ...@@ -415,6 +416,8 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
uint flags; uint flags;
int error= 1, create_mode= O_RDWR | O_TRUNC; int error= 1, create_mode= O_RDWR | O_TRUNC;
MARIA_HA *info= NULL; MARIA_HA *info= NULL;
uint kfile_size_before_extension, keystart;
if (skip_DDLs) if (skip_DDLs)
{ {
tprint(tracef, "we skip DDLs\n"); tprint(tracef, "we skip DDLs\n");
...@@ -431,6 +434,12 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE) ...@@ -431,6 +434,12 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
} }
name= log_record_buffer.str; name= log_record_buffer.str;
tprint(tracef, "Table '%s'", name); tprint(tracef, "Table '%s'", name);
if (close_one_table(name, rec->lsn))
{
tprint(tracef, " got error %d on close\n", my_errno);
ALERT_USER();
goto end;
}
/* we try hard to get create_rename_lsn, to avoid mistakes if possible */ /* we try hard to get create_rename_lsn, to avoid mistakes if possible */
info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
if (info) if (info)
...@@ -474,7 +483,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE) ...@@ -474,7 +483,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
info= NULL; info= NULL;
} }
else /* one or two files absent, or header corrupted... */ else /* one or two files absent, or header corrupted... */
tprint(tracef, "can't be opened, probably does not exist"); tprint(tracef, " can't be opened, probably does not exist");
/* if does not exist, or is older, overwrite it */ /* if does not exist, or is older, overwrite it */
/** @todo symlinks */ /** @todo symlinks */
ptr= name + strlen(name) + 1; ptr= name + strlen(name) + 1;
...@@ -490,13 +499,13 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE) ...@@ -490,13 +499,13 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode, if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME|create_flag))) < 0) MYF(MY_WME|create_flag))) < 0)
{ {
tprint(tracef, "Failed to create index file\n"); tprint(tracef, " Failed to create index file\n");
goto end; goto end;
} }
ptr++; ptr++;
uint kfile_size_before_extension= uint2korr(ptr); kfile_size_before_extension= uint2korr(ptr);
ptr+= 2; ptr+= 2;
uint keystart= uint2korr(ptr); keystart= uint2korr(ptr);
ptr+= 2; ptr+= 2;
/* set create_rename_lsn (for maria_read_log to be idempotent) */ /* set create_rename_lsn (for maria_read_log to be idempotent) */
lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn); lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn);
...@@ -507,7 +516,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE) ...@@ -507,7 +516,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) || kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
my_chsize(kfile, keystart, 0, MYF(MY_WME))) my_chsize(kfile, keystart, 0, MYF(MY_WME)))
{ {
tprint(tracef, "Failed to write to index file\n"); tprint(tracef, " Failed to write to index file\n");
goto end; goto end;
} }
if (!(flags & HA_DONT_TOUCH_DATA)) if (!(flags & HA_DONT_TOUCH_DATA))
...@@ -521,7 +530,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE) ...@@ -521,7 +530,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
MYF(MY_WME | create_flag))) < 0) || MYF(MY_WME | create_flag))) < 0) ||
my_close(dfile, MYF(MY_WME))) my_close(dfile, MYF(MY_WME)))
{ {
tprint(tracef, "Failed to create data file\n"); tprint(tracef, " Failed to create data file\n");
goto end; goto end;
} }
/* /*
...@@ -533,7 +542,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE) ...@@ -533,7 +542,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
if (((info= maria_open(name, O_RDONLY, 0)) == NULL) || if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
_ma_initialize_data_file(info->s, info->dfile.file)) _ma_initialize_data_file(info->s, info->dfile.file))
{ {
tprint(tracef, "Failed to open new table or write to data file\n"); tprint(tracef, " Failed to open new table or write to data file\n");
goto end; goto end;
} }
} }
...@@ -1436,6 +1445,11 @@ prototype_undo_exec_hook(UNDO_ROW_UPDATE) ...@@ -1436,6 +1445,11 @@ prototype_undo_exec_hook(UNDO_ROW_UPDATE)
static int run_redo_phase(LSN lsn, my_bool apply) static int run_redo_phase(LSN lsn, my_bool apply)
{ {
TRANSLOG_HEADER_BUFFER rec;
struct st_translog_scanner_data scanner;
int len;
uint i;
/* install hooks for execution */ /* install hooks for execution */
#define install_redo_exec_hook(R) \ #define install_redo_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \ log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
...@@ -1467,8 +1481,6 @@ static int run_redo_phase(LSN lsn, my_bool apply) ...@@ -1467,8 +1481,6 @@ static int run_redo_phase(LSN lsn, my_bool apply)
current_group_end_lsn= LSN_IMPOSSIBLE; current_group_end_lsn= LSN_IMPOSSIBLE;
TRANSLOG_HEADER_BUFFER rec;
if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon())) if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon()))
{ {
tprint(tracef, "checkpoint address refers to the log end log or " tprint(tracef, "checkpoint address refers to the log end log or "
...@@ -1476,7 +1488,7 @@ static int run_redo_phase(LSN lsn, my_bool apply) ...@@ -1476,7 +1488,7 @@ static int run_redo_phase(LSN lsn, my_bool apply)
return 0; return 0;
} }
int len= translog_read_record_header(lsn, &rec); len= translog_read_record_header(lsn, &rec);
/** @todo EOF should be detected */ /** @todo EOF should be detected */
if (len == RECHEADER_READ_ERROR) if (len == RECHEADER_READ_ERROR)
...@@ -1484,13 +1496,11 @@ static int run_redo_phase(LSN lsn, my_bool apply) ...@@ -1484,13 +1496,11 @@ static int run_redo_phase(LSN lsn, my_bool apply)
tprint(tracef, "Failed to read header of the first record.\n"); tprint(tracef, "Failed to read header of the first record.\n");
return 1; return 1;
} }
struct st_translog_scanner_data scanner; if (translog_init_scanner(lsn, 1, &scanner, 0))
if (translog_init_scanner(lsn, 1, &scanner))
{ {
tprint(tracef, "Scanner init failed\n"); tprint(tracef, "Scanner init failed\n");
return 1; return 1;
} }
uint i;
for (i= 1;;i++) for (i= 1;;i++)
{ {
uint16 sid= rec.short_trid; uint16 sid= rec.short_trid;
...@@ -1533,7 +1543,7 @@ static int run_redo_phase(LSN lsn, my_bool apply) ...@@ -1533,7 +1543,7 @@ static int run_redo_phase(LSN lsn, my_bool apply)
tprint(tracef, "Cannot find record where it should be\n"); tprint(tracef, "Cannot find record where it should be\n");
return 1; return 1;
} }
if (translog_init_scanner(rec2.lsn, 1, &scanner2)) if (translog_init_scanner(rec2.lsn, 1, &scanner2, 0))
{ {
tprint(tracef, "Scanner2 init failed\n"); tprint(tracef, "Scanner2 init failed\n");
return 1; return 1;
...@@ -1607,6 +1617,7 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase) ...@@ -1607,6 +1617,7 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
{ {
uint sid, unfinished= 0; uint sid, unfinished= 0;
char llbuf[22]; char llbuf[22];
LSN addr;
hash_free(&all_dirty_pages); hash_free(&all_dirty_pages);
/* /*
...@@ -1667,7 +1678,7 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase) ...@@ -1667,7 +1678,7 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
The UNDO phase uses some normal run-time code of ROLLBACK: generates log The UNDO phase uses some normal run-time code of ROLLBACK: generates log
records, etc; prepare tables for that records, etc; prepare tables for that
*/ */
LSN addr= translog_get_horizon(); addr= translog_get_horizon();
for (sid= 0; sid <= SHARE_ID_MAX; sid++) for (sid= 0; sid <= SHARE_ID_MAX; sid++)
{ {
MARIA_HA *info= all_tables[sid].info; MARIA_HA *info= all_tables[sid].info;
...@@ -2070,6 +2081,42 @@ static int close_all_tables(void) ...@@ -2070,6 +2081,42 @@ static int close_all_tables(void)
return error; return error;
} }
/* Close one table during redo phase */
static my_bool close_one_table(const char *open_name, LSN addr)
{
my_bool res= 0;
LIST *pos;
/* There are no other threads using the tables, so we don't need any locks */
for (pos=maria_open_list ; pos ;)
{
MARIA_HA *info= (MARIA_HA*) pos->data;
MARIA_SHARE *share= info->s;
pos= pos->next;
if (!strcmp(share->open_file_name, open_name))
{
struct st_table_for_recovery *internal_table, *end;
for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1;
internal_table < end ;
internal_table++)
{
if (internal_table->info == info)
{
internal_table->info= 0;
break;
}
}
prepare_table_for_close(info, addr);
if (maria_close(info))
res= 1;
}
}
return res;
}
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr) static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
{ {
static int end_logno= FILENO_IMPOSSIBLE, end_offset, percentage_printed= 0; static int end_logno= FILENO_IMPOSSIBLE, end_offset, percentage_printed= 0;
......
...@@ -154,7 +154,7 @@ int main(int argc, char **argv) ...@@ -154,7 +154,7 @@ int main(int argc, char **argv)
enum options_mc { enum options_mc {
OPT_CHARSETS_DIR=256, OPT_SET_COLLATION,OPT_START_CHECK_POS, OPT_CHARSETS_DIR=256, OPT_SET_COLLATION,OPT_START_CHECK_POS,
OPT_CORRECT_CHECKSUM, OPT_KEY_BUFFER_SIZE, OPT_CORRECT_CHECKSUM, OPT_PAGE_BUFFER_SIZE,
OPT_KEY_CACHE_BLOCK_SIZE, OPT_MARIA_BLOCK_SIZE, OPT_KEY_CACHE_BLOCK_SIZE, OPT_MARIA_BLOCK_SIZE,
OPT_READ_BUFFER_SIZE, OPT_WRITE_BUFFER_SIZE, OPT_SORT_BUFFER_SIZE, OPT_READ_BUFFER_SIZE, OPT_WRITE_BUFFER_SIZE, OPT_SORT_BUFFER_SIZE,
OPT_SORT_KEY_BLOCKS, OPT_DECODE_BITS, OPT_FT_MIN_WORD_LEN, OPT_SORT_KEY_BLOCKS, OPT_DECODE_BITS, OPT_FT_MIN_WORD_LEN,
...@@ -296,7 +296,7 @@ static struct my_option my_long_options[] = ...@@ -296,7 +296,7 @@ static struct my_option my_long_options[] =
{"wait", 'w', {"wait", 'w',
"Wait if table is locked.", "Wait if table is locked.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{ "key_buffer_size", OPT_KEY_BUFFER_SIZE, "", { "page_buffer_size", OPT_PAGE_BUFFER_SIZE, "",
(uchar**) &check_param.use_buffers, (uchar**) &check_param.use_buffers, 0, (uchar**) &check_param.use_buffers, (uchar**) &check_param.use_buffers, 0,
GET_ULONG, REQUIRED_ARG, (long) USE_BUFFER_INIT, (long) MALLOC_OVERHEAD, GET_ULONG, REQUIRED_ARG, (long) USE_BUFFER_INIT, (long) MALLOC_OVERHEAD,
(long) ~0L, (long) MALLOC_OVERHEAD, (long) IO_SIZE, 0}, (long) ~0L, (long) MALLOC_OVERHEAD, (long) IO_SIZE, 0},
......
...@@ -842,7 +842,7 @@ typedef struct st_maria_block_info ...@@ -842,7 +842,7 @@ typedef struct st_maria_block_info
#define UPDATE_AUTO_INC 8 #define UPDATE_AUTO_INC 8
#define UPDATE_OPEN_COUNT 16 #define UPDATE_OPEN_COUNT 16
#define USE_BUFFER_INIT (((1024L*512L-MALLOC_OVERHEAD)/IO_SIZE)*IO_SIZE) #define USE_BUFFER_INIT (((1024L*1024L*10-MALLOC_OVERHEAD)/8192)*8192)
#define READ_BUFFER_INIT (1024L*256L-MALLOC_OVERHEAD) #define READ_BUFFER_INIT (1024L*256L-MALLOC_OVERHEAD)
#define SORT_BUFFER_INIT (2048L*1024L-MALLOC_OVERHEAD) #define SORT_BUFFER_INIT (2048L*1024L-MALLOC_OVERHEAD)
#define MIN_SORT_BUFFER (4096-MALLOC_OVERHEAD) #define MIN_SORT_BUFFER (4096-MALLOC_OVERHEAD)
...@@ -906,7 +906,7 @@ my_bool _ma_check_status(void *param); ...@@ -906,7 +906,7 @@ my_bool _ma_check_status(void *param);
void _ma_reset_status(MARIA_HA *maria); void _ma_reset_status(MARIA_HA *maria);
#include "ma_commit.h" #include "ma_commit.h"
extern MARIA_HA *_ma_test_if_reopen(char *filename); extern MARIA_HA *_ma_test_if_reopen(const char *filename);
my_bool _ma_check_table_is_closed(const char *name, const char *where); my_bool _ma_check_table_is_closed(const char *name, const char *where);
int _ma_open_datafile(MARIA_HA *info, MARIA_SHARE *share, File file_to_dup); int _ma_open_datafile(MARIA_HA *info, MARIA_SHARE *share, File file_to_dup);
int _ma_open_keyfile(MARIA_SHARE *share); int _ma_open_keyfile(MARIA_SHARE *share);
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
#include "ma_recovery.h" #include "ma_recovery.h"
#include <my_getopt.h> #include <my_getopt.h>
#define PCACHE_SIZE (1024*1024*10)
#define LOG_FLAGS 0 #define LOG_FLAGS 0
#define LOG_FILE_SIZE (1024L*1024L) #define LOG_FILE_SIZE (1024L*1024L)
...@@ -30,7 +29,8 @@ const char *default_dbug_option= "d:t:i:O,\\maria_read_log.trace"; ...@@ -30,7 +29,8 @@ const char *default_dbug_option= "d:t:i:O,\\maria_read_log.trace";
const char *default_dbug_option= "d:t:i:o,/tmp/maria_read_log.trace"; const char *default_dbug_option= "d:t:i:o,/tmp/maria_read_log.trace";
#endif #endif
#endif /* DBUG_OFF */ #endif /* DBUG_OFF */
static my_bool opt_only_display, opt_display_and_apply; static my_bool opt_only_display, opt_apply, opt_apply_undo, opt_silent;
static ulong opt_page_buffer_size;
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
...@@ -63,7 +63,7 @@ int main(int argc, char **argv) ...@@ -63,7 +63,7 @@ int main(int argc, char **argv)
} }
/* same page cache for log and data; assumes same page size... */ /* same page cache for log and data; assumes same page size... */
DBUG_ASSERT(maria_block_size == TRANSLOG_PAGE_SIZE); DBUG_ASSERT(maria_block_size == TRANSLOG_PAGE_SIZE);
if (init_pagecache(maria_pagecache, PCACHE_SIZE, 0, 0, if (init_pagecache(maria_pagecache, opt_page_buffer_size, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) TRANSLOG_PAGE_SIZE) == 0)
{ {
fprintf(stderr, "Got error in init_pagecache() (errno: %d)\n", errno); fprintf(stderr, "Got error in init_pagecache() (errno: %d)\n", errno);
...@@ -100,8 +100,8 @@ int main(int argc, char **argv) ...@@ -100,8 +100,8 @@ int main(int argc, char **argv)
LSN_IN_PARTS(lsn)); LSN_IN_PARTS(lsn));
fprintf(stdout, "TRACE of the last maria_read_log\n"); fprintf(stdout, "TRACE of the last maria_read_log\n");
if (maria_apply_log(lsn, opt_display_and_apply, stdout, if (maria_apply_log(lsn, opt_apply, opt_silent ? NULL : stdout,
opt_display_and_apply, FALSE)) opt_apply_undo, FALSE))
goto err; goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname); fprintf(stdout, "%s: SUCCESS\n", my_progname);
...@@ -121,19 +121,32 @@ int main(int argc, char **argv) ...@@ -121,19 +121,32 @@ int main(int argc, char **argv)
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
{"apply", 'a',
"Apply log to tables. Will display a lot of information if not run with --silent",
(uchar **) &opt_apply, (uchar **) &opt_apply, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DBUG_OFF
{"debug", '#', "Output debug log. Often the argument is 'd:t:o,filename'.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
#endif
{"help", '?', "Display this help and exit.", {"help", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"only-display", 'o', "display brief info about records's header", {"only-display", 'o', "display brief info about records's header",
(uchar **) &opt_only_display, (uchar **) &opt_only_display, 0, GET_BOOL, (uchar **) &opt_only_display, (uchar **) &opt_only_display, 0, GET_BOOL,
NO_ARG,0, 0, 0, 0, 0, 0}, NO_ARG,0, 0, 0, 0, 0, 0},
{"display-and-apply", 'a', { "page_buffer_size", 'P', "",
"like --only-display but displays more info and modifies tables", (uchar**) &opt_page_buffer_size, (uchar**) &opt_page_buffer_size, 0,
(uchar **) &opt_display_and_apply, (uchar **) &opt_display_and_apply, 0, GET_ULONG, REQUIRED_ARG, (long) USE_BUFFER_INIT,
(long) MALLOC_OVERHEAD, (long) ~(ulong) 0, (long) MALLOC_OVERHEAD,
(long) IO_SIZE, 0},
{"silent", 's', "Print less information during apply/undo phase",
(uchar **) &opt_silent, (uchar **) &opt_silent, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DBUG_OFF {"undo", 'u', "Apply undos to tables. (disable with --disable-undo)",
{"debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", (uchar **) &opt_apply_undo, (uchar **) &opt_apply_undo, 0,
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0},
#endif {"version", 'V', "Print version and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
}; };
...@@ -141,7 +154,7 @@ static struct my_option my_long_options[] = ...@@ -141,7 +154,7 @@ static struct my_option my_long_options[] =
static void print_version(void) static void print_version(void)
{ {
VOID(printf("%s Ver 1.0 for %s on %s\n", VOID(printf("%s Ver 1.1 for %s on %s\n",
my_progname, SYSTEM_TYPE, MACHINE_TYPE)); my_progname, SYSTEM_TYPE, MACHINE_TYPE));
NETWARE_SET_SCREEN_MODE(1); NETWARE_SET_SCREEN_MODE(1);
} }
...@@ -174,6 +187,9 @@ get_one_option(int optid __attribute__((unused)), ...@@ -174,6 +187,9 @@ get_one_option(int optid __attribute__((unused)),
case '?': case '?':
usage(); usage();
exit(0); exit(0);
case 'V':
print_version();
exit(0);
#ifndef DBUG_OFF #ifndef DBUG_OFF
case '#': case '#':
DBUG_SET_INITIAL(argument ? argument : default_dbug_option); DBUG_SET_INITIAL(argument ? argument : default_dbug_option);
...@@ -192,7 +208,10 @@ static void get_options(int *argc,char ***argv) ...@@ -192,7 +208,10 @@ static void get_options(int *argc,char ***argv)
if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option))) if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option)))
exit(ho_error); exit(ho_error);
if ((opt_only_display + opt_display_and_apply) != 1) if (opt_apply_undo)
opt_apply= 1;
if ((opt_only_display + opt_apply) != 1)
{ {
usage(); usage();
exit(1); exit(1);
......
...@@ -407,7 +407,7 @@ static void version() ...@@ -407,7 +407,7 @@ static void version()
static my_bool static my_bool
get_one_option(int optid, const struct my_option *opt __attribute__((unused)), get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
char *argument) char *argument __attribute__((unused)))
{ {
switch(optid) { switch(optid) {
case 'V': case 'V':
......
...@@ -363,7 +363,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -363,7 +363,7 @@ int main(int argc __attribute__((unused)), char *argv[])
read_ok(&rec); read_ok(&rec);
translog_free_record_header(&rec); translog_free_record_header(&rec);
lsn= first_lsn; lsn= first_lsn;
if (translog_init_scanner(first_lsn, 1, &scanner)) if (translog_init_scanner(first_lsn, 1, &scanner, 0))
{ {
fprintf(stderr, "scanner init failed\n"); fprintf(stderr, "scanner init failed\n");
goto err; goto err;
......
...@@ -378,7 +378,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -378,7 +378,7 @@ int main(int argc __attribute__((unused)), char *argv[])
ok(1, "read record"); ok(1, "read record");
translog_free_record_header(&rec); translog_free_record_header(&rec);
lsn= first_lsn; lsn= first_lsn;
if (translog_init_scanner(first_lsn, 1, &scanner)) if (translog_init_scanner(first_lsn, 1, &scanner, 0))
{ {
fprintf(stderr, "scanner init failed\n"); fprintf(stderr, "scanner init failed\n");
goto err; goto err;
......
...@@ -373,7 +373,7 @@ int main(int argc __attribute__((unused)), ...@@ -373,7 +373,7 @@ int main(int argc __attribute__((unused)),
bzero(indeces, sizeof(indeces)); bzero(indeces, sizeof(indeces));
if (translog_init_scanner(first_lsn, 1, &scanner)) if (translog_init_scanner(first_lsn, 1, &scanner, 0))
{ {
fprintf(stderr, "scanner init failed\n"); fprintf(stderr, "scanner init failed\n");
goto err; goto err;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment