Commit 26f98859 authored by Michael Widenius's avatar Michael Widenius

Aria fixes:

- Don't delete pages without flushing that has had a tail or head information in pagecache_delete()
  This fixes a case where REPAIR could find old deleted rows.


storage/maria/ha_maria.cc:
  Remove calls to depricated function ha_statistic_increment
storage/maria/ma_blockrec.c:
  Don't delete pages without flushing that has had a tail or head information in pagecache_delete()
storage/maria/ma_pagecache.c:
  Added possibility to mark pages to not be deleted by pagecache_delete() without beeing flushed.
storage/maria/ma_pagecache.h:
  Added new prototype
parent fdaaf48c
...@@ -1059,8 +1059,6 @@ int ha_maria::close(void) ...@@ -1059,8 +1059,6 @@ int ha_maria::close(void)
int ha_maria::write_row(uchar * buf) int ha_maria::write_row(uchar * buf)
{ {
ha_statistic_increment(&SSV::ha_write_count);
/* If we have a timestamp column, update it to the current time */ /* If we have a timestamp column, update it to the current time */
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time(); table->timestamp_field->set_time();
...@@ -2131,7 +2129,6 @@ bool ha_maria::is_crashed() const ...@@ -2131,7 +2129,6 @@ bool ha_maria::is_crashed() const
int ha_maria::update_row(const uchar * old_data, uchar * new_data) int ha_maria::update_row(const uchar * old_data, uchar * new_data)
{ {
CHECK_UNTIL_WE_FULLY_IMPLEMENTED_VERSIONING("UPDATE in WRITE CONCURRENT"); CHECK_UNTIL_WE_FULLY_IMPLEMENTED_VERSIONING("UPDATE in WRITE CONCURRENT");
ha_statistic_increment(&SSV::ha_update_count);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
table->timestamp_field->set_time(); table->timestamp_field->set_time();
return maria_update(file, old_data, new_data); return maria_update(file, old_data, new_data);
...@@ -2141,7 +2138,6 @@ int ha_maria::update_row(const uchar * old_data, uchar * new_data) ...@@ -2141,7 +2138,6 @@ int ha_maria::update_row(const uchar * old_data, uchar * new_data)
int ha_maria::delete_row(const uchar * buf) int ha_maria::delete_row(const uchar * buf)
{ {
CHECK_UNTIL_WE_FULLY_IMPLEMENTED_VERSIONING("DELETE in WRITE CONCURRENT"); CHECK_UNTIL_WE_FULLY_IMPLEMENTED_VERSIONING("DELETE in WRITE CONCURRENT");
ha_statistic_increment(&SSV::ha_delete_count);
return maria_delete(file, buf); return maria_delete(file, buf);
} }
...@@ -2151,7 +2147,6 @@ int ha_maria::index_read_map(uchar * buf, const uchar * key, ...@@ -2151,7 +2147,6 @@ int ha_maria::index_read_map(uchar * buf, const uchar * key,
enum ha_rkey_function find_flag) enum ha_rkey_function find_flag)
{ {
DBUG_ASSERT(inited == INDEX); DBUG_ASSERT(inited == INDEX);
ha_statistic_increment(&SSV::ha_read_key_count);
int error= maria_rkey(file, buf, active_index, key, keypart_map, find_flag); int error= maria_rkey(file, buf, active_index, key, keypart_map, find_flag);
table->status= error ? STATUS_NOT_FOUND : 0; table->status= error ? STATUS_NOT_FOUND : 0;
return error; return error;
...@@ -2162,7 +2157,6 @@ int ha_maria::index_read_idx_map(uchar * buf, uint index, const uchar * key, ...@@ -2162,7 +2157,6 @@ int ha_maria::index_read_idx_map(uchar * buf, uint index, const uchar * key,
key_part_map keypart_map, key_part_map keypart_map,
enum ha_rkey_function find_flag) enum ha_rkey_function find_flag)
{ {
ha_statistic_increment(&SSV::ha_read_key_count);
int error= maria_rkey(file, buf, index, key, keypart_map, find_flag); int error= maria_rkey(file, buf, index, key, keypart_map, find_flag);
table->status= error ? STATUS_NOT_FOUND : 0; table->status= error ? STATUS_NOT_FOUND : 0;
return error; return error;
...@@ -2174,7 +2168,6 @@ int ha_maria::index_read_last_map(uchar * buf, const uchar * key, ...@@ -2174,7 +2168,6 @@ int ha_maria::index_read_last_map(uchar * buf, const uchar * key,
{ {
DBUG_ENTER("ha_maria::index_read_last_map"); DBUG_ENTER("ha_maria::index_read_last_map");
DBUG_ASSERT(inited == INDEX); DBUG_ASSERT(inited == INDEX);
ha_statistic_increment(&SSV::ha_read_key_count);
int error= maria_rkey(file, buf, active_index, key, keypart_map, int error= maria_rkey(file, buf, active_index, key, keypart_map,
HA_READ_PREFIX_LAST); HA_READ_PREFIX_LAST);
table->status= error ? STATUS_NOT_FOUND : 0; table->status= error ? STATUS_NOT_FOUND : 0;
...@@ -2185,7 +2178,6 @@ int ha_maria::index_read_last_map(uchar * buf, const uchar * key, ...@@ -2185,7 +2178,6 @@ int ha_maria::index_read_last_map(uchar * buf, const uchar * key,
int ha_maria::index_next(uchar * buf) int ha_maria::index_next(uchar * buf)
{ {
DBUG_ASSERT(inited == INDEX); DBUG_ASSERT(inited == INDEX);
ha_statistic_increment(&SSV::ha_read_next_count);
int error= maria_rnext(file, buf, active_index); int error= maria_rnext(file, buf, active_index);
table->status= error ? STATUS_NOT_FOUND : 0; table->status= error ? STATUS_NOT_FOUND : 0;
return error; return error;
...@@ -2195,7 +2187,6 @@ int ha_maria::index_next(uchar * buf) ...@@ -2195,7 +2187,6 @@ int ha_maria::index_next(uchar * buf)
int ha_maria::index_prev(uchar * buf) int ha_maria::index_prev(uchar * buf)
{ {
DBUG_ASSERT(inited == INDEX); DBUG_ASSERT(inited == INDEX);
ha_statistic_increment(&SSV::ha_read_prev_count);
int error= maria_rprev(file, buf, active_index); int error= maria_rprev(file, buf, active_index);
table->status= error ? STATUS_NOT_FOUND : 0; table->status= error ? STATUS_NOT_FOUND : 0;
return error; return error;
...@@ -2205,7 +2196,6 @@ int ha_maria::index_prev(uchar * buf) ...@@ -2205,7 +2196,6 @@ int ha_maria::index_prev(uchar * buf)
int ha_maria::index_first(uchar * buf) int ha_maria::index_first(uchar * buf)
{ {
DBUG_ASSERT(inited == INDEX); DBUG_ASSERT(inited == INDEX);
ha_statistic_increment(&SSV::ha_read_first_count);
int error= maria_rfirst(file, buf, active_index); int error= maria_rfirst(file, buf, active_index);
table->status= error ? STATUS_NOT_FOUND : 0; table->status= error ? STATUS_NOT_FOUND : 0;
return error; return error;
...@@ -2215,7 +2205,6 @@ int ha_maria::index_first(uchar * buf) ...@@ -2215,7 +2205,6 @@ int ha_maria::index_first(uchar * buf)
int ha_maria::index_last(uchar * buf) int ha_maria::index_last(uchar * buf)
{ {
DBUG_ASSERT(inited == INDEX); DBUG_ASSERT(inited == INDEX);
ha_statistic_increment(&SSV::ha_read_last_count);
int error= maria_rlast(file, buf, active_index); int error= maria_rlast(file, buf, active_index);
table->status= error ? STATUS_NOT_FOUND : 0; table->status= error ? STATUS_NOT_FOUND : 0;
return error; return error;
...@@ -2228,7 +2217,6 @@ int ha_maria::index_next_same(uchar * buf, ...@@ -2228,7 +2217,6 @@ int ha_maria::index_next_same(uchar * buf,
{ {
int error; int error;
DBUG_ASSERT(inited == INDEX); DBUG_ASSERT(inited == INDEX);
ha_statistic_increment(&SSV::ha_read_next_count);
/* /*
TODO: Delete this loop in Maria 1.5 as versioning will ensure this never TODO: Delete this loop in Maria 1.5 as versioning will ensure this never
happens happens
...@@ -2260,7 +2248,6 @@ int ha_maria::rnd_end() ...@@ -2260,7 +2248,6 @@ int ha_maria::rnd_end()
int ha_maria::rnd_next(uchar *buf) int ha_maria::rnd_next(uchar *buf)
{ {
ha_statistic_increment(&SSV::ha_read_rnd_next_count);
int error= maria_scan(file, buf); int error= maria_scan(file, buf);
table->status= error ? STATUS_NOT_FOUND : 0; table->status= error ? STATUS_NOT_FOUND : 0;
return error; return error;
...@@ -2282,7 +2269,6 @@ int ha_maria::restart_rnd_next(uchar *buf) ...@@ -2282,7 +2269,6 @@ int ha_maria::restart_rnd_next(uchar *buf)
int ha_maria::rnd_pos(uchar *buf, uchar *pos) int ha_maria::rnd_pos(uchar *buf, uchar *pos)
{ {
ha_statistic_increment(&SSV::ha_read_rnd_count);
int error= maria_rrnd(file, buf, my_get_ptr(pos, ref_length)); int error= maria_rrnd(file, buf, my_get_ptr(pos, ref_length));
table->status= error ? STATUS_NOT_FOUND : 0; table->status= error ? STATUS_NOT_FOUND : 0;
return error; return error;
......
...@@ -4181,6 +4181,13 @@ static my_bool delete_head_or_tail(MARIA_HA *info, ...@@ -4181,6 +4181,13 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
log_data, NULL)) log_data, NULL))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
/*
Mark that this page must be written to disk by page cache, even
if we could call pagecache_delete() on it.
This is needed to ensure that repair finds the empty page on disk
and not old data.
*/
pagecache_set_write_on_delete_by_link(page_link.link);
DBUG_ASSERT(empty_space >= share->bitmap.sizes[0]); DBUG_ASSERT(empty_space >= share->bitmap.sizes[0]);
} }
......
...@@ -158,6 +158,7 @@ struct st_pagecache_hash_link ...@@ -158,6 +158,7 @@ struct st_pagecache_hash_link
#define PCBLOCK_IN_FLUSH 16 /* block is in flush operation */ #define PCBLOCK_IN_FLUSH 16 /* block is in flush operation */
#define PCBLOCK_CHANGED 32 /* block buffer contains a dirty page */ #define PCBLOCK_CHANGED 32 /* block buffer contains a dirty page */
#define PCBLOCK_DIRECT_W 64 /* possible direct write to the block */ #define PCBLOCK_DIRECT_W 64 /* possible direct write to the block */
#define PCBLOCK_DEL_WRITE 128 /* should be written on delete */
/* page status, returned by find_block */ /* page status, returned by find_block */
#define PAGE_READ 0 #define PAGE_READ 0
...@@ -1215,7 +1216,7 @@ static void link_to_file_list(PAGECACHE *pagecache, ...@@ -1215,7 +1216,7 @@ static void link_to_file_list(PAGECACHE *pagecache,
link_changed(block, &pagecache->file_blocks[FILE_HASH(*file)]); link_changed(block, &pagecache->file_blocks[FILE_HASH(*file)]);
if (block->status & PCBLOCK_CHANGED) if (block->status & PCBLOCK_CHANGED)
{ {
block->status&= ~PCBLOCK_CHANGED; block->status&= ~(PCBLOCK_CHANGED | PCBLOCK_DEL_WRITE);
block->rec_lsn= LSN_MAX; block->rec_lsn= LSN_MAX;
pagecache->blocks_changed--; pagecache->blocks_changed--;
pagecache->global_blocks_changed--; pagecache->global_blocks_changed--;
...@@ -3472,6 +3473,31 @@ uchar *pagecache_read(PAGECACHE *pagecache, ...@@ -3472,6 +3473,31 @@ uchar *pagecache_read(PAGECACHE *pagecache,
} }
/*
@brief Set/reset flag that page always should be flushed on delete
@param pagecache pointer to a page cache data structure
@param link direct link to page (returned by read or write)
@param write write on delete flag value
*/
void pagecache_set_write_on_delete_by_link(PAGECACHE_BLOCK_LINK *block)
{
DBUG_ENTER("pagecache_set_write_on_delete_by_link");
DBUG_PRINT("enter", ("fd: %d block 0x%lx %d -> TRUE",
block->hash_link->file.file,
(ulong) block,
(int) block->status & PCBLOCK_DEL_WRITE));
DBUG_ASSERT(block->pins); /* should be pinned */
DBUG_ASSERT(block->wlocks); /* should be write locked */
block->status|= PCBLOCK_DEL_WRITE;
DBUG_VOID_RETURN;
}
/* /*
@brief Delete page from the buffer (common part for link and file/page) @brief Delete page from the buffer (common part for link and file/page)
...@@ -3501,6 +3527,7 @@ static my_bool pagecache_delete_internal(PAGECACHE *pagecache, ...@@ -3501,6 +3527,7 @@ static my_bool pagecache_delete_internal(PAGECACHE *pagecache,
} }
if (block->status & PCBLOCK_CHANGED) if (block->status & PCBLOCK_CHANGED)
{ {
flush= (flush || (block->status & PCBLOCK_DEL_WRITE));
if (flush) if (flush)
{ {
/* The block contains a dirty page - push it out of the cache */ /* The block contains a dirty page - push it out of the cache */
......
...@@ -251,6 +251,7 @@ extern void pagecache_unpin(PAGECACHE *pagecache, ...@@ -251,6 +251,7 @@ extern void pagecache_unpin(PAGECACHE *pagecache,
extern void pagecache_unpin_by_link(PAGECACHE *pagecache, extern void pagecache_unpin_by_link(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK *link, PAGECACHE_BLOCK_LINK *link,
LSN lsn); LSN lsn);
extern void pagecache_set_write_on_delete_by_link(PAGECACHE_BLOCK_LINK *block);
/* Results of flush operation (bit field in fact) */ /* Results of flush operation (bit field in fact) */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment