Commit 16f3d271 authored by marko's avatar marko

branches/zip: Fix various bugs.

btr_root_raise_and_insert(): Because btr_page_set_level() must not
change level from non-zero to zero on compressed pages, invoke
btr_page_set_level() with page_zip==NULL and compress the entire
root page after creating it from the scratch.

btr_attach_half_pages(): Simplify the computation of lower_page_zip
and upper_page_zip.  Invoke btr_node_ptr_set_child_page_no() with
the correct page_zip.

page0page.h: Add __attribute__((const)) to functions testing for
infimum or supremum.

page_zip_dir_delete(): Note that the third parameter may be NULL.
Correct offset errors.

page_zip_available(): Use n_heap instead of n_recs.

page_zip_dir_find(), page_zip_dir_find_free(): Fix off-by-one error.

page_zip_fields_encode(), page_zip_fields_decode(): Encode and decode
index->n_nullable for non-leaf pages.

page_zip_apply_log(): Write REC_NEW_HEAP_NO before calling
rec_offs_make_valid().

page_zip_write_node_ptr(): Correct off-by-one error.

page_cur_search_with_match(): Make use of page_is_leaf().

page_dir_add_slots(): Replaced with page_dir_add_slot().  Use memmove().
parent eeedd93a
......@@ -1064,17 +1064,14 @@ btr_root_raise_and_insert(
moving the root records to the new page, emptying the root, putting
a node pointer to the new page, and then splitting the new page. */
new_page = btr_page_alloc(tree, 0, FSP_NO_DIR,
btr_page_get_level(root, mtr), mtr);
level = btr_page_get_level(root, mtr);
btr_page_create(new_page, tree, mtr);
new_page = btr_page_alloc(tree, 0, FSP_NO_DIR, level, mtr);
root_page_zip = buf_block_get_page_zip(buf_block_align(root));
level = btr_page_get_level(root, mtr);
btr_page_create(new_page, tree, mtr);
/* Set the levels of the new index page and root page */
/* Set the level of the new index page */
btr_page_set_level(new_page, NULL, level, mtr);
btr_page_set_level(root, root_page_zip, level + 1, mtr);
/* Set the next node and previous node fields of new page */
btr_page_set_next(new_page, NULL, FIL_NULL, mtr);
......@@ -1084,9 +1081,11 @@ btr_root_raise_and_insert(
new_page_zip = buf_block_get_page_zip(buf_block_align(new_page));
if (UNIV_UNLIKELY(!page_move_rec_list_end(new_page, new_page_zip,
page_get_infimum_rec(root), root_page_zip,
cursor->index, mtr))) {
if (UNIV_UNLIKELY(!page_copy_rec_list_end(new_page, new_page_zip,
page_get_infimum_rec(root), cursor->index, mtr))) {
/* This should always succeed, as new_page
is created from the scratch and receives
the records in sorted order. */
ut_error;
}
......@@ -1108,13 +1107,11 @@ btr_root_raise_and_insert(
node_ptr = dict_tree_build_node_ptr(tree, rec, new_page_no, heap,
level);
/* Reorganize the root to get free space */
if (!btr_page_reorganize_low(FALSE, root, root_page_zip,
cursor->index, mtr)) {
/* The page should be empty at this point.
Thus, the operation should succeed. */
ut_error;
}
/* Rebuild the root page to get free space */
root_page_zip = buf_block_get_page_zip(buf_block_align(root));
btr_page_set_level(root, NULL, level + 1, mtr);
page_create(root, root_page_zip, mtr, cursor->index);
buf_block_align(root)->check_index_page_at_flush = TRUE;
page_cursor = btr_cur_get_page_cur(cursor);
......@@ -1552,6 +1549,9 @@ btr_attach_half_pages(
upper_page_no = buf_frame_get_page_no(page);
lower_page = new_page;
upper_page = page;
lower_page_zip = buf_block_get_page_zip(
buf_block_align(new_page));
upper_page_zip = page_zip;
/* Look from the tree for the node pointer to page */
node_ptr = btr_page_get_father_node_ptr(tree, page, mtr);
......@@ -1559,7 +1559,8 @@ btr_attach_half_pages(
/* Replace the address of the old child node (= page) with the
address of the new lower half */
btr_node_ptr_set_child_page_no(node_ptr, page_zip,
btr_node_ptr_set_child_page_no(node_ptr,
buf_block_get_page_zip(buf_block_align(node_ptr)),
rec_get_offsets(node_ptr,
UT_LIST_GET_FIRST(tree->tree_indexes),
NULL, ULINT_UNDEFINED, &heap),
......@@ -1570,11 +1571,11 @@ btr_attach_half_pages(
upper_page_no = buf_frame_get_page_no(new_page);
lower_page = page;
upper_page = new_page;
lower_page_zip = page_zip;
upper_page_zip = buf_block_get_page_zip(
buf_block_align(new_page));
}
lower_page_zip = buf_block_get_page_zip(buf_block_align(lower_page));
upper_page_zip = buf_block_get_page_zip(buf_block_align(upper_page));
/* Get the level of the split pages */
level = btr_page_get_level(page, mtr);
......
......@@ -447,7 +447,8 @@ ibool
page_rec_is_user_rec_low(
/*=====================*/
/* out: TRUE if a user record */
ulint offset);/* in: record offset on page */
ulint offset) /* in: record offset on page */
__attribute__((const));
/****************************************************************
TRUE if the record is the supremum record on a page. */
UNIV_INLINE
......@@ -455,7 +456,8 @@ ibool
page_rec_is_supremum_low(
/*=====================*/
/* out: TRUE if the supremum record */
ulint offset);/* in: record offset on page */
ulint offset) /* in: record offset on page */
__attribute__((const));
/****************************************************************
TRUE if the record is the infimum record on a page. */
UNIV_INLINE
......@@ -463,7 +465,8 @@ ibool
page_rec_is_infimum_low(
/*====================*/
/* out: TRUE if the infimum record */
ulint offset);/* in: record offset on page */
ulint offset) /* in: record offset on page */
__attribute__((const));
/****************************************************************
TRUE if the record is a user record on the page. */
......@@ -472,7 +475,8 @@ ibool
page_rec_is_user_rec(
/*=================*/
/* out: TRUE if a user record */
const rec_t* rec); /* in: record */
const rec_t* rec) /* in: record */
__attribute__((const));
/****************************************************************
TRUE if the record is the supremum record on a page. */
UNIV_INLINE
......@@ -480,7 +484,9 @@ ibool
page_rec_is_supremum(
/*=================*/
/* out: TRUE if the supremum record */
const rec_t* rec); /* in: record */
const rec_t* rec) /* in: record */
__attribute__((const));
/****************************************************************
TRUE if the record is the infimum record on a page. */
UNIV_INLINE
......@@ -488,7 +494,8 @@ ibool
page_rec_is_infimum(
/*================*/
/* out: TRUE if the infimum record */
const rec_t* rec); /* in: record */
const rec_t* rec) /* in: record */
__attribute__((const));
/*******************************************************************
Looks for the record which owns the given record. */
UNIV_INLINE
......
......@@ -112,7 +112,7 @@ page_zip_dir_delete(
page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* rec, /* in: deleted record */
const byte* free) /* in: previous start of the free list */
__attribute__((nonnull));
__attribute__((nonnull(1,2)));
/**************************************************************************
Add a slot to the dense page directory. */
......
......@@ -228,7 +228,7 @@ page_zip_dir_delete(
page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* rec, /* in: deleted record */
const byte* free) /* in: previous start of the free list */
__attribute__((nonnull));
__attribute__((nonnull(1,2)));
/**************************************************************************
Add a slot to the dense page directory. */
......
......@@ -210,7 +210,7 @@ page_zip_available(
+ REC_NODE_PTR_SIZE;
}
trailer_len = page_get_n_recs((page_t*) page_zip->data)
trailer_len = (page_dir_get_n_heap((page_t*) page_zip->data) - 2)
* uncompressed_size
+ page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
......
......@@ -254,7 +254,7 @@ page_cur_search_with_match(
page_check_dir(page);
#ifdef PAGE_CUR_ADAPT
if ((page_header_get_field(page, PAGE_LEVEL) == 0)
if (page_is_leaf(page)
&& (mode == PAGE_CUR_LE)
&& (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
&& (page_header_get_ptr(page, PAGE_LAST_INSERT))
......
......@@ -1175,39 +1175,27 @@ in the added slots or update n_owned values: this is the responsibility
of the caller. */
UNIV_INLINE
void
page_dir_add_slots(
/*===============*/
page_dir_add_slot(
/*==============*/
page_t* page, /* in/out: the index page */
page_zip_des_t* page_zip,/* in/out: comprssed page, or NULL */
ulint start, /* in: the slot above which the new slots
ulint start) /* in: the slot above which the new slots
are added */
ulint n) /* in: number of slots to add
(currently only n == 1 allowed) */
{
page_dir_slot_t* slot;
ulint n_slots;
ulint i;
rec_t* rec;
ut_ad(n == 1);
n_slots = page_dir_get_n_slots(page);
ut_ad(start < n_slots - 1);
/* Update the page header */
page_dir_set_n_slots(page, page_zip, n_slots + n);
page_dir_set_n_slots(page, page_zip, n_slots + 1);
/* Move slots up */
for (i = n_slots - 1; i > start; i--) {
slot = page_dir_get_nth_slot(page, i);
rec = page_dir_slot_get_rec(slot);
slot = page_dir_get_nth_slot(page, i + n);
page_dir_slot_set_rec(slot, rec);
}
slot = page_dir_get_nth_slot(page, n_slots);
memmove(slot, slot + PAGE_DIR_SLOT_SIZE,
(n_slots - 1 - start) * PAGE_DIR_SLOT_SIZE);
}
/********************************************************************
......@@ -1252,7 +1240,7 @@ page_dir_split_slot(
/* 2. We add one directory slot immediately below the slot to be
split. */
page_dir_add_slots(page, page_zip, slot_no - 1, 1);
page_dir_add_slot(page, page_zip, slot_no - 1);
/* The added slot is now number slot_no, and the old slot is
now number slot_no + 1 */
......
......@@ -95,7 +95,7 @@ page_zip_dir_find(
ut_ad(page_zip_simple_validate(page_zip));
end = page_zip->data + page_zip->size - PAGE_ZIP_DIR_SLOT_SIZE;
end = page_zip->data + page_zip->size;
slot = end - page_zip_dir_user_size(page_zip);
for (; slot < end; slot += PAGE_ZIP_DIR_SLOT_SIZE) {
......@@ -126,7 +126,7 @@ page_zip_dir_find_free(
slot = end = page_zip->data + page_zip->size;
slot -= page_zip_dir_size(page_zip);
end -= PAGE_ZIP_DIR_SLOT_SIZE + page_zip_dir_user_size(page_zip);
end -= page_zip_dir_user_size(page_zip);
for (; slot < end; slot += PAGE_ZIP_DIR_SLOT_SIZE) {
if ((mach_read_from_2(slot) & PAGE_ZIP_DIR_SLOT_MASK)
......@@ -287,15 +287,20 @@ page_zip_fields_encode(
if (trx_id_pos != ULINT_UNDEFINED) {
/* Write out the position of the trx_id column */
if (trx_id_col < 128) {
*buf++ = trx_id_col;
} else {
*buf++ = 0x80 | trx_id_col >> 8;
*buf++ = 0xff & trx_id_col;
}
i = trx_id_col;
} else {
/* Write out the number of nullable fields */
i = index->n_nullable;
}
ut_ad((ulint) (buf - buf_start) <= (n + 1) * 2);
if (i < 128) {
*buf++ = i;
} else {
*buf++ = 0x80 | i >> 8;
*buf++ = 0xff & i;
}
ut_ad((ulint) (buf - buf_start) <= (n + 2) * 2);
return((ulint) (buf - buf_start));
}
......@@ -821,6 +826,7 @@ page_zip_fields_decode(
const byte* b;
ulint n;
ulint i;
ulint val;
dict_table_t* table;
dict_index_t* index;
......@@ -831,16 +837,14 @@ page_zip_fields_decode(
}
}
n--; /* n_nullable or trx_id */
if (UNIV_UNLIKELY(n > REC_MAX_N_FIELDS)
|| UNIV_UNLIKELY(b > end)) {
return(NULL);
}
if (trx_id_col) {
n--;
}
table = dict_mem_table_create("ZIP_DUMMY", DICT_HDR_SPACE, n, TRUE);
index = dict_mem_index_create("ZIP_DUMMY", "ZIP_DUMMY",
DICT_HDR_SPACE, 0, n);
......@@ -851,10 +855,11 @@ page_zip_fields_decode(
/* Initialize the fields. */
for (b = buf, i = 0; i < n; i++) {
ulint val = *b++;
ulint mtype;
ulint len;
val = *b++;
if (UNIV_UNLIKELY(val & 0x80)) {
/* fixed length > 62 bytes */
val = (val & 0x7f) << 8 | *b++;
......@@ -880,13 +885,13 @@ page_zip_fields_decode(
dict_table_get_nth_col(table, i), 0);
}
val = *b++;
if (UNIV_UNLIKELY(val & 0x80)) {
val = (val & 0x7f) << 8 | *b++;
}
/* Decode the position of the trx_id column. */
if (trx_id_col) {
ulint val = *b++;
if (UNIV_UNLIKELY(val & 0x80)) {
val = (val & 0x7f) << 8 | *b++;
}
if (UNIV_UNLIKELY(val >= n)) {
page_zip_fields_free(index);
index = NULL;
......@@ -897,6 +902,14 @@ page_zip_fields_decode(
}
*trx_id_col = val;
} else {
/* Decode the number of nullable fields. */
if (UNIV_UNLIKELY(index->n_nullable > val)) {
page_zip_fields_free(index);
index = NULL;
} else {
index->n_nullable = val;
}
}
ut_ad(b == end);
......@@ -1132,13 +1145,15 @@ page_zip_apply_log(
heap_status += 1 << REC_HEAP_NO_SHIFT;
}
mach_write_to_2(rec - REC_NEW_HEAP_NO, hs);
#if REC_STATUS_NODE_PTR != TRUE
# error "REC_STATUS_NODE_PTR != TRUE"
#endif
rec_get_offsets_reverse(data, index,
hs & REC_STATUS_NODE_PTR,
offsets);
rec_offs_make_valid(rec, index, offsets);
mach_write_to_2(rec - REC_NEW_HEAP_NO, hs);
/* Copy the extra bytes (backwards). */
{
byte* start = rec_get_start(rec, offsets);
......@@ -1963,7 +1978,7 @@ page_zip_write_node_ptr(
storage = page_zip->data + page_zip->size
- (page_dir_get_n_heap(page) - 2)
* PAGE_ZIP_DIR_SLOT_SIZE
- (rec_get_heap_no_new(rec) - 2) * REC_NODE_PTR_SIZE;
- (rec_get_heap_no_new(rec) - 1) * REC_NODE_PTR_SIZE;
field = rec + size - REC_NODE_PTR_SIZE;
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
......@@ -2200,28 +2215,30 @@ page_zip_dir_delete(
byte* slot_rec;
byte* slot_free;
ut_ad(rec);
slot_rec = page_zip_dir_find(page_zip,
ut_align_offset(rec, UNIV_PAGE_SIZE));
slot_free = page_zip_dir_find_free(page_zip,
ut_align_offset(free, UNIV_PAGE_SIZE));
ut_a(slot_rec);
if (UNIV_UNLIKELY(!slot_free)) {
if (UNIV_UNLIKELY(!free)) {
/* Make the last slot the start of the free list. */
slot_free = page_zip->data + page_zip->size
- PAGE_ZIP_DIR_SLOT_SIZE
* page_dir_get_n_heap(page_zip->data);
* (page_dir_get_n_heap(page_zip->data) - 2);
} else {
slot_free = page_zip_dir_find_free(page_zip,
ut_align_offset(free, UNIV_PAGE_SIZE));
ut_a(slot_free < slot_rec);
/* Grow the free list by one slot by moving the start. */
slot_free += PAGE_ZIP_DIR_SLOT_SIZE;
}
if (UNIV_LIKELY(slot_free < slot_rec)) {
if (UNIV_LIKELY(slot_rec > slot_free)) {
memmove(slot_free + PAGE_ZIP_DIR_SLOT_SIZE,
slot_free,
slot_rec - slot_free - PAGE_ZIP_DIR_SLOT_SIZE);
slot_rec - slot_free);
}
/* Write the entry for the deleted record.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment