Commit d9a71d5c authored by Annamalai Gurusami's avatar Annamalai Gurusami

Bug #16417635 INNODB FAILS TO MERGE UNDER-FILLED PAGES DEPENDING

ON DELETION ORDER

Problem:

When a InnoDB index page is under-filled, we will merge it with either
the left sibling node or the right sibling node.  But this checking is
incorrect.  When the left sibling node is available, even if merging
is not possible with left sibling node, we do not check for the 
possibility of merging with the right sibling node.  

Solution:

If left sibling node is available, and merging with left sibling node
is not possible, then check if merge with right sibling node is
possible.

rb#2506 approved by jimmy & ima.
parent c8fc3db1
......@@ -42,7 +42,21 @@ Created 6/2/1994 Heikki Tuuri
#include "ibuf0ibuf.h"
#include "trx0trx.h"
/**************************************************************//**
Checks if the page in the cursor can be merged with given page.
If necessary, re-organize the merge_page.
@return TRUE if possible to merge. */
UNIV_INTERN
ibool
btr_can_merge_with_page(
/*====================*/
btr_cur_t* cursor, /*!< in: cursor on the page to merge */
ulint page_no, /*!< in: a sibling page */
buf_block_t** merge_block, /*!< out: the merge block */
mtr_t* mtr); /*!< in: mini-transaction */
#endif /* UNIV_HOTBACKUP */
/**************************************************************//**
Report that an index page is corrupted. */
UNIV_INTERN
......@@ -3252,7 +3266,7 @@ btr_compress(
ulint left_page_no;
ulint right_page_no;
buf_block_t* merge_block;
page_t* merge_page;
page_t* merge_page = NULL;
page_zip_des_t* merge_page_zip;
ibool is_left;
buf_block_t* block;
......@@ -3260,11 +3274,8 @@ btr_compress(
btr_cur_t father_cursor;
mem_heap_t* heap;
ulint* offsets;
ulint data_size;
ulint n_recs;
ulint nth_rec = 0; /* remove bogus warning */
ulint max_ins_size;
ulint max_ins_size_reorg;
DBUG_ENTER("btr_compress");
block = btr_cur_get_block(cursor);
page = btr_cur_get_page(cursor);
......@@ -3281,10 +3292,13 @@ btr_compress(
left_page_no = btr_page_get_prev(page, mtr);
right_page_no = btr_page_get_next(page, mtr);
#if 0
fprintf(stderr, "Merge left page %lu right %lu \n",
left_page_no, right_page_no);
#endif
#ifdef UNIV_DEBUG
if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
page_rec_get_next(page_get_infimum_rec(page)),
page_is_comp(page)));
}
#endif /* UNIV_DEBUG */
heap = mem_heap_create(100);
offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
......@@ -3295,30 +3309,7 @@ btr_compress(
ut_ad(nth_rec > 0);
}
/* Decide the page to which we try to merge and which will inherit
the locks */
is_left = left_page_no != FIL_NULL;
if (is_left) {
merge_block = btr_block_get(space, zip_size, left_page_no,
RW_X_LATCH, index, mtr);
merge_page = buf_block_get_frame(merge_block);
#ifdef UNIV_BTR_DEBUG
ut_a(btr_page_get_next(merge_page, mtr)
== buf_block_get_page_no(block));
#endif /* UNIV_BTR_DEBUG */
} else if (right_page_no != FIL_NULL) {
merge_block = btr_block_get(space, zip_size, right_page_no,
RW_X_LATCH, index, mtr);
merge_page = buf_block_get_frame(merge_block);
#ifdef UNIV_BTR_DEBUG
ut_a(btr_page_get_prev(merge_page, mtr)
== buf_block_get_page_no(block));
#endif /* UNIV_BTR_DEBUG */
} else {
if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
/* The page is the only one on the level, lift the records
to the father */
......@@ -3326,56 +3317,33 @@ btr_compress(
goto func_exit;
}
n_recs = page_get_n_recs(page);
data_size = page_get_data_size(page);
#ifdef UNIV_BTR_DEBUG
ut_a(page_is_comp(merge_page) == page_is_comp(page));
#endif /* UNIV_BTR_DEBUG */
/* Decide the page to which we try to merge and which will inherit
the locks */
max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
merge_page, n_recs);
if (data_size > max_ins_size_reorg) {
is_left = btr_can_merge_with_page(cursor, left_page_no,
&merge_block, mtr);
/* No space for merge */
err_exit:
/* We play it safe and reset the free bits. */
if (zip_size
&& page_is_leaf(merge_page)
&& !dict_index_is_clust(index)) {
ibuf_reset_free_bits(merge_block);
}
DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
mem_heap_free(heap);
return(FALSE);
if(!is_left
&& !btr_can_merge_with_page(cursor, right_page_no, &merge_block,
mtr)) {
goto err_exit;
}
ut_ad(page_validate(merge_page, index));
max_ins_size = page_get_max_insert_size(merge_page, n_recs);
if (UNIV_UNLIKELY(data_size > max_ins_size)) {
/* We have to reorganize merge_page */
if (UNIV_UNLIKELY(!btr_page_reorganize(merge_block,
index, mtr))) {
goto err_exit;
}
max_ins_size = page_get_max_insert_size(merge_page, n_recs);
ut_ad(page_validate(merge_page, index));
ut_ad(max_ins_size == max_ins_size_reorg);
if (UNIV_UNLIKELY(data_size > max_ins_size)) {
/* Add fault tolerance, though this should
never happen */
merge_page = buf_block_get_frame(merge_block);
goto err_exit;
}
#ifdef UNIV_BTR_DEBUG
if (is_left) {
ut_a(btr_page_get_next(merge_page, mtr)
== buf_block_get_page_no(block));
} else {
ut_a(btr_page_get_prev(merge_page, mtr)
== buf_block_get_page_no(block));
}
#endif /* UNIV_BTR_DEBUG */
ut_ad(page_validate(merge_page, index));
merge_page_zip = buf_block_get_page_zip(merge_block);
#ifdef UNIV_ZIP_DEBUG
......@@ -3411,11 +3379,18 @@ btr_compress(
}
} else {
rec_t* orig_succ;
ibool compressed;
ulint err;
btr_cur_t cursor2;
/* father cursor pointing to node ptr
of the right sibling */
#ifdef UNIV_BTR_DEBUG
byte fil_page_prev[4];
#endif /* UNIV_BTR_DEBUG */
if (UNIV_LIKELY_NULL(merge_page_zip)) {
btr_page_get_father(index, merge_block, mtr, &cursor2);
if (merge_page_zip && left_page_no == FIL_NULL) {
/* The function page_zip_compress(), which will be
invoked by page_copy_rec_list_end() below,
requires that FIL_PAGE_PREV be FIL_NULL.
......@@ -3436,9 +3411,12 @@ btr_compress(
if (UNIV_UNLIKELY(!orig_succ)) {
ut_a(merge_page_zip);
#ifdef UNIV_BTR_DEBUG
/* FIL_PAGE_PREV was restored from merge_page_zip. */
ut_a(!memcmp(fil_page_prev,
merge_page + FIL_PAGE_PREV, 4));
if (left_page_no == FIL_NULL) {
/* FIL_PAGE_PREV was restored from
merge_page_zip. */
ut_a(!memcmp(fil_page_prev,
merge_page + FIL_PAGE_PREV, 4));
}
#endif /* UNIV_BTR_DEBUG */
goto err_exit;
}
......@@ -3446,7 +3424,7 @@ btr_compress(
btr_search_drop_page_hash_index(block);
#ifdef UNIV_BTR_DEBUG
if (UNIV_LIKELY_NULL(merge_page_zip)) {
if (merge_page_zip && left_page_no == FIL_NULL) {
/* Restore FIL_PAGE_PREV in order to avoid an assertion
failure in btr_level_list_remove(), which will set
the field again to FIL_NULL. Even though this makes
......@@ -3462,12 +3440,18 @@ btr_compress(
/* Replace the address of the old child node (= page) with the
address of the merge page to the right */
btr_node_ptr_set_child_page_no(
btr_cur_get_rec(&father_cursor),
btr_cur_get_page_zip(&father_cursor),
offsets, right_page_no, mtr);
btr_node_ptr_delete(index, merge_block, mtr);
compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor2,
RB_NONE, mtr);
ut_a(err == DB_SUCCESS);
if (!compressed) {
btr_cur_compress_if_useful(&cursor2, FALSE, mtr);
}
lock_update_merge_right(merge_block, orig_succ, block);
}
......@@ -3535,8 +3519,19 @@ btr_compress(
page_rec_get_nth(merge_block->frame, nth_rec),
merge_block, cursor);
}
DBUG_RETURN(TRUE);
return(TRUE);
err_exit:
/* We play it safe and reset the free bits. */
if (zip_size
&& merge_page
&& page_is_leaf(merge_page)
&& !dict_index_is_clust(index)) {
ibuf_reset_free_bits(merge_block);
}
mem_heap_free(heap);
DBUG_RETURN(FALSE);
}
/*************************************************************//**
......@@ -4532,4 +4527,86 @@ btr_validate_index(
return(TRUE);
}
/**************************************************************//**
Checks if the page in the cursor can be merged with given page.
If necessary, re-organize the merge_page.
@return TRUE if possible to merge. */
UNIV_INTERN
ibool
btr_can_merge_with_page(
/*====================*/
btr_cur_t* cursor, /*!< in: cursor on the page to merge */
ulint page_no, /*!< in: a sibling page */
buf_block_t** merge_block, /*!< out: the merge block */
mtr_t* mtr) /*!< in: mini-transaction */
{
dict_index_t* index;
page_t* page;
ulint space;
ulint zip_size;
ulint n_recs;
ulint data_size;
ulint max_ins_size_reorg;
ulint max_ins_size;
buf_block_t* mblock;
page_t* mpage;
DBUG_ENTER("btr_can_merge_with_page");
if (page_no == FIL_NULL) {
goto error;
}
index = btr_cur_get_index(cursor);
page = btr_cur_get_page(cursor);
space = dict_index_get_space(index);
zip_size = dict_table_zip_size(index->table);
mblock = btr_block_get(space, zip_size, page_no, RW_X_LATCH, index,
mtr);
mpage = buf_block_get_frame(mblock);
n_recs = page_get_n_recs(page);
data_size = page_get_data_size(page);
max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
mpage, n_recs);
if (data_size > max_ins_size_reorg) {
goto error;
}
max_ins_size = page_get_max_insert_size(mpage, n_recs);
if (data_size > max_ins_size) {
/* We have to reorganize mpage */
if (!btr_page_reorganize(mblock, index, mtr)) {
goto error;
}
max_ins_size = page_get_max_insert_size(mpage, n_recs);
ut_ad(page_validate(mpage, index));
ut_ad(max_ins_size == max_ins_size_reorg);
if (data_size > max_ins_size) {
/* Add fault tolerance, though this should
never happen */
goto error;
}
}
*merge_block = mblock;
DBUG_RETURN(TRUE);
error:
*merge_block = NULL;
DBUG_RETURN(FALSE);
}
#endif /* !UNIV_HOTBACKUP */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment