Commit efd79c7a authored by marko

branches/zip: Enable the insert buffer on compressed tablespaces.

page_zip_max_ins_size(): New function.

btr_cur_optimistic_insert(), btr_cur_optimistic_delete(),
btr_page_split_and_insert(), btr_compress(): Do not update the
ibuf free bits for non-leaf pages or pages belonging to a clustered index.
The insert buffer only covers operations on leaf pages of secondary indexes.
For pages covered by the insert buffer, limit the max_ins_size to
page_zip_max_ins_size().
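
A minimal sketch of the capping described above, using the variable names from the hunks below; the helper ibuf_capped_max_ins_size() is made up for this illustration and is not InnoDB source. page_zip_max_ins_size() returns a lint that may be zero or negative when the compressed page is nearly full, so this sketch clamps negative values to zero rather than relying on the unsigned cast used in the hunks.

typedef long		lint;
typedef unsigned long	ulint;

/* Reduce the worst-case insert size used for the ibuf free-bits
bookkeeping to what the compressed page can accept without
recompression.  zip_max_ins would come from page_zip_max_ins_size(). */
static ulint
ibuf_capped_max_ins_size(ulint max_ins_size, lint zip_max_ins)
{
	if (zip_max_ins < 0) {
		/* The compressed page is effectively full. */
		return(0);
	}

	if (max_ins_size > (ulint) zip_max_ins) {
		max_ins_size = (ulint) zip_max_ins;
	}

	return(max_ins_size);
}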

buf_page_get_gen(): Merge the insert buffer after decompressing the page.

buf_page_io_complete(): Relax the assertion about ibuf_count.  For
compressed-only pages, the insert buffer merge takes place
in buf_page_get_gen().

ibuf_index_page_calc_free_bits(), ibuf_index_page_calc_free_from_bits(),
ibuf_index_page_calc_free(), ibuf_update_free_bits_if_full(),
ibuf_update_free_bits_low(), ibuf_update_free_bits_for_two_pages_low(),
ibuf_set_free_bits_low(): Add the parameter zip_size.  Limit the maximum
insert size to page_zip_max_ins_size().
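
As a standalone illustration of the new zip_size parameter (not InnoDB source): the divisor in the free-bits calculation becomes the compressed page size when one is given. The constants below (a 16K UNIV_PAGE_SIZE, IBUF_PAGE_SIZE_PER_FREE_SPACE = 32) and the upper clamp beyond the visible n == 3 case are assumptions.

#include <stdio.h>

typedef unsigned long	ulint;

#define UNIV_PAGE_SIZE			16384UL
#define IBUF_PAGE_SIZE_PER_FREE_SPACE	32UL

/* Map a worst-case insert size to a 2-bit free-space bucket,
using the compressed page size as the base when zip_size != 0. */
static ulint
calc_free_bits(ulint zip_size, ulint max_ins_size)
{
	ulint	page_size = zip_size ? zip_size : UNIV_PAGE_SIZE;
	ulint	n = max_ins_size
		/ (page_size / IBUF_PAGE_SIZE_PER_FREE_SPACE);

	if (n == 3) {
		n = 2;	/* mirrors the n == 3 special case in the hunk */
	} else if (n > 3) {
		n = 3;	/* assumed clamp to the 2-bit range */
	}

	return(n);
}

int
main(void)
{
	/* 600 bytes of reclaimable space: a larger bucket on an 8K
	compressed page (divisor 256) than on a 16K page (divisor 512). */
	printf("zip 8K: %lu, uncompressed: %lu\n",
	       calc_free_bits(8192, 600), calc_free_bits(0, 600));
	return(0);
}
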
parent ea9a89da
......@@ -1949,17 +1949,8 @@ btr_page_split_and_insert(
#endif /* UNIV_ZIP_DEBUG */
if (UNIV_LIKELY(rec != NULL)) {
/* Insert fit on the page: update the free bits for the
left and right pages in the same mtr */
ibuf_update_free_bits_for_two_pages_low(cursor->index,
left_block,
right_block, mtr);
/* fprintf(stderr, "Split and insert done %lu %lu\n",
page_get_page_no(buf_block_get_frame(left_block)),
page_get_page_no(buf_block_get_frame(right_block))); */
mem_heap_free(heap);
return(rec);
goto func_exit;
}
/* 8. If insert did not fit, try page reorganization */
......@@ -1993,11 +1984,17 @@ btr_page_split_and_insert(
goto func_start;
}
func_exit:
/* Insert fit on the page: update the free bits for the
left and right pages in the same mtr */
ibuf_update_free_bits_for_two_pages_low(cursor->index, left_block,
right_block, mtr);
if (!dict_index_is_clust(cursor->index) && page_is_leaf(page)) {
ibuf_update_free_bits_for_two_pages_low(
cursor->index,
buf_block_get_zip_size(left_block),
left_block, right_block, mtr);
}
#if 0
fprintf(stderr, "Split and insert done %lu %lu\n",
buf_block_get_page_no(left_block),
......@@ -2510,9 +2507,12 @@ btr_compress(
mem_heap_free(heap);
/* We have added new records to merge_page: update its free bits */
ibuf_update_free_bits_if_full(index, merge_block,
UNIV_PAGE_SIZE, ULINT_UNDEFINED);
if (!dict_index_is_clust(index) && page_is_leaf(merge_page)) {
/* We have added new records to merge_page:
update its free bits */
ibuf_update_free_bits_if_full(index, zip_size, merge_block,
UNIV_PAGE_SIZE, ULINT_UNDEFINED);
}
ut_ad(page_validate(merge_page, index));
......
......@@ -1058,6 +1058,7 @@ btr_cur_optimistic_insert(
ulint level;
ibool reorg;
ibool inherit;
ulint zip_size;
ulint rec_size;
mem_heap_t* heap = NULL;
ulint err;
......@@ -1067,6 +1068,7 @@ btr_cur_optimistic_insert(
block = btr_cur_get_block(cursor);
page = buf_block_get_frame(block);
index = cursor->index;
zip_size = buf_block_get_zip_size(block);
if (!dtuple_check_typed_no_assert(entry)) {
fputs("InnoDB: Error in a tuple to insert into ", stderr);
......@@ -1086,8 +1088,7 @@ btr_cur_optimistic_insert(
/* Calculate the record size when entry is converted to a record */
rec_size = rec_get_converted_size(index, entry, ext, n_ext);
if (page_zip_rec_needs_ext(rec_size, page_is_comp(page),
buf_block_get_zip_size(block))) {
if (page_zip_rec_needs_ext(rec_size, page_is_comp(page), zip_size)) {
/* The record is so big that we have to store some fields
externally on separate database pages */
......@@ -1149,6 +1150,19 @@ btr_cur_optimistic_insert(
/* Now, try the insert */
if (zip_size
&& !dict_index_is_clust(index) && UNIV_LIKELY(0 == level)) {
/* Compute the reduced max_size for the insert buffer
before inserting the record. */
lint zip_max_ins = page_zip_max_ins_size(
buf_block_get_page_zip(block), FALSE);
if (UNIV_LIKELY(max_size > (ulint) zip_max_ins)) {
max_size = (ulint) zip_max_ins;
}
}
*rec = page_cur_tuple_insert(page_cursor, entry, index,
ext, n_ext, mtr);
if (UNIV_UNLIKELY(!(*rec))) {
......@@ -1159,7 +1173,9 @@ btr_cur_optimistic_insert(
goto fail;
}
ut_ad(page_get_max_insert_size(page, 1) == max_size);
ut_ad(page_get_max_insert_size(page, 1) <= max_size);
ut_ad(zip_size
|| page_get_max_insert_size(page, 1) == max_size);
reorg = TRUE;
......@@ -1207,9 +1223,10 @@ btr_cur_optimistic_insert(
buf_block_get_page_no(block), max_size,
rec_size + PAGE_DIR_SLOT_SIZE, index->type);
#endif
if (!dict_index_is_clust(index)) {
if (!dict_index_is_clust(index) && UNIV_LIKELY(0 == level)) {
/* We have added a record to page: update its free bits */
ibuf_update_free_bits_if_full(cursor->index, block, max_size,
ibuf_update_free_bits_if_full(cursor->index, zip_size,
block, max_size,
rec_size + PAGE_DIR_SLOT_SIZE);
}
......@@ -2671,9 +2688,8 @@ btr_cur_optimistic_delete(
if (no_compress_needed) {
page_t* page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
page_zip_des_t* page_zip= buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
ulint zip_size= buf_block_get_zip_size(block);
lock_update_delete(block, rec);
......@@ -2681,6 +2697,14 @@ btr_cur_optimistic_delete(
max_ins_size = page_get_max_insert_size_after_reorganize(
page, 1);
if (zip_size) {
lint zip_max_ins = page_zip_max_ins_size(
page_zip, FALSE/* not clustered */);
if (UNIV_LIKELY(max_ins_size > (ulint) zip_max_ins)) {
max_ins_size = (ulint) zip_max_ins;
}
}
#ifdef UNIV_ZIP_DEBUG
ut_a(!page_zip || page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
......@@ -2690,8 +2714,8 @@ btr_cur_optimistic_delete(
ut_a(!page_zip || page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
ibuf_update_free_bits_low(cursor->index, block, max_ins_size,
mtr);
ibuf_update_free_bits_low(cursor->index, zip_size,
block, max_ins_size, mtr);
}
if (UNIV_LIKELY_NULL(heap)) {
......
......@@ -1873,9 +1873,11 @@ buf_page_get_gen(
mutex_exit(&buf_pool->mutex);
/* Decompress the page while not holding
buf_pool->mutex or block->mutex. */
/* Decompress the page and apply buffered operations
while not holding buf_pool->mutex or block->mutex. */
buf_zip_decompress(block, srv_use_checksums);
ibuf_merge_or_delete_for_page(block, space, offset,
zip_size, TRUE);
/* Unfix and unlatch the block. */
mutex_enter(&buf_pool->mutex);
......@@ -2807,7 +2809,12 @@ buf_page_io_complete(
mutex_enter(buf_page_get_mutex(bpage));
#ifdef UNIV_IBUF_DEBUG
ut_a(ibuf_count_get(bpage->space, bpage->offset) == 0);
if (io_type == BUF_IO_WRITE || uncompressed) {
/* For BUF_IO_READ of compressed-only blocks, the
buffered operations will be merged by buf_page_get_gen()
after the block has been uncompressed. */
ut_a(ibuf_count_get(bpage->space, bpage->offset) == 0);
}
#endif
/* Because this thread which does the unlocking is not the same that
did the locking, we use a pass value != 0 in unlock, which simply
......
......@@ -792,6 +792,8 @@ void
ibuf_set_free_bits_low(
/*===================*/
ulint type, /* in: index type */
ulint zip_size,/* in: compressed page size in bytes;
0 for uncompressed pages */
buf_block_t* block, /* in: index page; free bits are set if
the index is non-clustered and page
level is 0 */
......@@ -801,7 +803,6 @@ ibuf_set_free_bits_low(
page_t* bitmap_page;
ulint space;
ulint page_no;
ulint zip_size;
if (type & DICT_CLUSTERED) {
......@@ -815,17 +816,16 @@ ibuf_set_free_bits_low(
space = buf_block_get_space(block);
page_no = buf_block_get_page_no(block);
zip_size = buf_block_get_zip_size(block);
bitmap_page = ibuf_bitmap_get_map_page(space, page_no, zip_size, mtr);
#ifdef UNIV_IBUF_DEBUG
# if 0
fprintf(stderr,
"Setting page no %lu free bits to %lu should be %lu\n",
page_get_page_no(page), val,
ibuf_index_page_calc_free(buf_block_get_frame(block)));
"Setting space %lu page %lu free bits to %lu should be %lu\n",
space, page_no, val,
ibuf_index_page_calc_free(zip_size, block));
# endif
ut_a(val <= ibuf_index_page_calc_free(buf_block_get_frame(block)));
ut_a(val <= ibuf_index_page_calc_free(zip_size, block));
#endif /* UNIV_IBUF_DEBUG */
ibuf_bitmap_page_set_bits(bitmap_page, page_no, zip_size,
IBUF_BITMAP_FREE, val, mtr);
......@@ -898,10 +898,10 @@ ibuf_set_free_bits(
# if 0
fprintf(stderr, "Setting page no %lu free bits to %lu should be %lu\n",
page_get_page_no(page), val,
ibuf_index_page_calc_free(page));
ibuf_index_page_calc_free(zip_size, block));
# endif
ut_a(val <= ibuf_index_page_calc_free(page));
ut_a(val <= ibuf_index_page_calc_free(zip_size, block));
#endif
ibuf_bitmap_page_set_bits(bitmap_page, page_no, zip_size,
IBUF_BITMAP_FREE, val, &mtr);
......@@ -934,6 +934,8 @@ void
ibuf_update_free_bits_low(
/*======================*/
dict_index_t* index, /* in: index */
ulint zip_size, /* in: compressed page size in bytes;
0 for uncompressed pages */
buf_block_t* block, /* in: index page */
ulint max_ins_size, /* in: value of maximum insert size
with reorganize before the latest
......@@ -943,12 +945,13 @@ ibuf_update_free_bits_low(
ulint before;
ulint after;
before = ibuf_index_page_calc_free_bits(max_ins_size);
before = ibuf_index_page_calc_free_bits(zip_size, max_ins_size);
after = ibuf_index_page_calc_free(buf_block_get_frame(block));
after = ibuf_index_page_calc_free(zip_size, block);
if (before != after) {
ibuf_set_free_bits_low(index->type, block, after, mtr);
ibuf_set_free_bits_low(index->type, zip_size,
block, after, mtr);
}
}
......@@ -961,6 +964,8 @@ void
ibuf_update_free_bits_for_two_pages_low(
/*====================================*/
dict_index_t* index, /* in: index */
ulint zip_size,/* in: compressed page size in bytes;
0 for uncompressed pages */
buf_block_t* block1, /* in: index page */
buf_block_t* block2, /* in: index page */
mtr_t* mtr) /* in: mtr */
......@@ -973,13 +978,13 @@ ibuf_update_free_bits_for_two_pages_low(
mutex_enter(&ibuf_bitmap_mutex);
state = ibuf_index_page_calc_free(buf_block_get_frame(block1));
state = ibuf_index_page_calc_free(zip_size, block1);
ibuf_set_free_bits_low(index->type, block1, state, mtr);
ibuf_set_free_bits_low(index->type, zip_size, block1, state, mtr);
state = ibuf_index_page_calc_free(buf_block_get_frame(block2));
state = ibuf_index_page_calc_free(zip_size, block2);
ibuf_set_free_bits_low(index->type, block2, state, mtr);
ibuf_set_free_bits_low(index->type, zip_size, block2, state, mtr);
mutex_exit(&ibuf_bitmap_mutex);
}
......@@ -1303,6 +1308,9 @@ ibuf_build_entry_from_ibuf_rec(
ibuf_dummy_index_add_col(index, dfield_get_type(field), len);
}
/* Fix an ut_ad() failure in page_zip_write_rec(). */
ut_d(dict_table_add_system_columns(index->table, index->table->heap));
*pindex = index;
return(tuple);
}
......@@ -2691,7 +2699,7 @@ ibuf_insert_low(
IBUF_BITMAP_FREE, &bitmap_mtr);
if (buffered + entry_size + page_dir_calc_reserved_space(1)
> ibuf_index_page_calc_free_from_bits(bits)) {
> ibuf_index_page_calc_free_from_bits(zip_size, bits)) {
mtr_commit(&bitmap_mtr);
/* It may not fit */
......@@ -2827,7 +2835,6 @@ ibuf_insert(
ut_ad(dtuple_check_typed(entry));
ut_a(!dict_index_is_clust(index));
ut_a(!dict_table_zip_size(index->table));
if (rec_get_converted_size(index, entry, NULL, 0)
>= (page_get_free_space_of_empty(dict_table_is_comp(index->table))
......@@ -3374,11 +3381,11 @@ ibuf_merge_or_delete_for_page(
ibuf_bitmap_page_set_bits(bitmap_page, page_no, zip_size,
IBUF_BITMAP_BUFFERED, FALSE, &mtr);
if (block) {
page_t* page = block->frame;
ulint old_bits = ibuf_bitmap_page_get_bits(
bitmap_page, page_no, zip_size,
IBUF_BITMAP_FREE, &mtr);
ulint new_bits = ibuf_index_page_calc_free(page);
ulint new_bits = ibuf_index_page_calc_free(
zip_size, block);
#if 0 /* defined UNIV_IBUF_DEBUG */
fprintf(stderr, "Old bits %lu new bits %lu"
" max size %lu\n",
......
......@@ -77,6 +77,8 @@ void
ibuf_update_free_bits_if_full(
/*==========================*/
dict_index_t* index, /* in: index */
ulint zip_size,/* in: compressed page size in bytes;
0 for uncompressed pages */
buf_block_t* block, /* in: index page to which we have added new
records; the free bits are updated if the
index is non-clustered and non-unique and
......@@ -97,6 +99,8 @@ void
ibuf_update_free_bits_low(
/*======================*/
dict_index_t* index, /* in: index */
ulint zip_size, /* in: compressed page size in bytes;
0 for uncompressed pages */
buf_block_t* block, /* in: index page */
ulint max_ins_size, /* in: value of maximum insert size
with reorganize before the latest
......@@ -111,6 +115,8 @@ void
ibuf_update_free_bits_for_two_pages_low(
/*====================================*/
dict_index_t* index, /* in: index */
ulint zip_size,/* in: compressed page size in bytes;
0 for uncompressed pages */
buf_block_t* block1, /* in: index page */
buf_block_t* block2, /* in: index page */
mtr_t* mtr); /* in: mtr */
......
......@@ -8,6 +8,7 @@ Created 7/19/1997 Heikki Tuuri
#include "buf0lru.h"
#include "page0page.h"
#include "page0zip.h"
extern ulint ibuf_flush_count;
......@@ -81,7 +82,6 @@ ibuf_should_try(
decide */
{
if (!dict_index_is_clust(index)
&& !dict_table_zip_size(index->table)
&& (ignore_sec_unique || !(index->type & DICT_UNIQUE))) {
ibuf_flush_count++;
......@@ -126,12 +126,23 @@ ulint
ibuf_index_page_calc_free_bits(
/*===========================*/
/* out: value for ibuf bitmap bits */
ulint zip_size, /* in: compressed page size in bytes;
0 for uncompressed pages */
ulint max_ins_size) /* in: maximum insert size after reorganize
for the page */
{
ulint n;
ut_ad(ut_is_2pow(zip_size));
ut_ad(!zip_size || zip_size > IBUF_PAGE_SIZE_PER_FREE_SPACE);
ut_ad(zip_size <= UNIV_PAGE_SIZE);
n = max_ins_size / (UNIV_PAGE_SIZE / IBUF_PAGE_SIZE_PER_FREE_SPACE);
if (zip_size) {
n = max_ins_size
/ (zip_size / IBUF_PAGE_SIZE_PER_FREE_SPACE);
} else {
n = max_ins_size
/ (UNIV_PAGE_SIZE / IBUF_PAGE_SIZE_PER_FREE_SPACE);
}
if (n == 3) {
n = 2;
......@@ -152,15 +163,28 @@ ibuf_index_page_calc_free_from_bits(
/*================================*/
/* out: maximum insert size after reorganize for the
page */
ulint zip_size,/* in: compressed page size in bytes;
0 for uncompressed pages */
ulint bits) /* in: value for ibuf bitmap bits */
{
ut_ad(bits < 4);
ut_ad(ut_is_2pow(zip_size));
ut_ad(!zip_size || zip_size > IBUF_PAGE_SIZE_PER_FREE_SPACE);
ut_ad(zip_size <= UNIV_PAGE_SIZE);
if (zip_size) {
if (bits == 3) {
return(4 * zip_size / IBUF_PAGE_SIZE_PER_FREE_SPACE);
}
return(bits * zip_size / IBUF_PAGE_SIZE_PER_FREE_SPACE);
}
if (bits == 3) {
return(4 * UNIV_PAGE_SIZE / IBUF_PAGE_SIZE_PER_FREE_SPACE);
}
return(bits * UNIV_PAGE_SIZE / IBUF_PAGE_SIZE_PER_FREE_SPACE);
return(bits * (UNIV_PAGE_SIZE / IBUF_PAGE_SIZE_PER_FREE_SPACE));
}
/*************************************************************************
......@@ -169,11 +193,31 @@ UNIV_INLINE
ulint
ibuf_index_page_calc_free(
/*======================*/
/* out: value for ibuf bitmap bits */
page_t* page) /* in: non-unique secondary index page */
/* out: value for ibuf bitmap bits */
ulint zip_size,/* in: compressed page size in bytes;
0 for uncompressed pages */
buf_block_t* block) /* in: buffer block */
{
return(ibuf_index_page_calc_free_bits(
page_get_max_insert_size_after_reorganize(page, 1)));
ulint max_ins_size;
ut_ad(zip_size == buf_block_get_zip_size(block));
max_ins_size = page_get_max_insert_size_after_reorganize(
buf_block_get_frame(block), 1);
if (!zip_size) {
return(ibuf_index_page_calc_free_bits(0, max_ins_size));
} else {
page_zip_des_t* page_zip = buf_block_get_page_zip(block);
lint zip_max_ins = page_zip_max_ins_size(
page_zip, FALSE/* not clustered */);
if (UNIV_LIKELY(max_ins_size > (ulint) zip_max_ins)) {
max_ins_size = (ulint) zip_max_ins;
}
return(ibuf_index_page_calc_free_bits(zip_size, max_ins_size));
}
}
/****************************************************************************
......@@ -186,6 +230,8 @@ void
ibuf_update_free_bits_if_full(
/*==========================*/
dict_index_t* index, /* in: index */
ulint zip_size,/* in: compressed page size in bytes;
0 for uncompressed pages */
buf_block_t* block, /* in: index page to which we have added new
records; the free bits are updated if the
index is non-clustered and non-unique and
......@@ -201,20 +247,19 @@ ibuf_update_free_bits_if_full(
ulint before;
ulint after;
before = ibuf_index_page_calc_free_bits(max_ins_size);
before = ibuf_index_page_calc_free_bits(zip_size, max_ins_size);
if (max_ins_size >= increase) {
#if ULINT32_UNDEFINED <= UNIV_PAGE_SIZE
# error "ULINT32_UNDEFINED <= UNIV_PAGE_SIZE"
#endif
after = ibuf_index_page_calc_free_bits(max_ins_size
after = ibuf_index_page_calc_free_bits(zip_size, max_ins_size
- increase);
#ifdef UNIV_IBUF_DEBUG
ut_a(after <= ibuf_index_page_calc_free(buf_block_get_frame(
block)));
ut_a(after <= ibuf_index_page_calc_free(zip_size, block));
#endif
} else {
after = ibuf_index_page_calc_free(buf_block_get_frame(block));
after = ibuf_index_page_calc_free(zip_size, block);
}
if (after == 0) {
......
......@@ -134,8 +134,19 @@ page_zip_validate(
#endif /* UNIV_ZIP_DEBUG */
/**************************************************************************
Determine if enough space is available for a page_zip_write_rec() call
in the modification log. */
Determine how big a record can be inserted without recompressing the page. */
UNIV_INLINE
lint
page_zip_max_ins_size(
/*==================*/
/* out: maximum insert size in bytes
without recompressing the page;
may be zero or negative */
const page_zip_des_t* page_zip,/* in: compressed page */
ibool is_clust)/* in: TRUE if clustered index */
__attribute__((warn_unused_result, nonnull, pure));
/**************************************************************************
Determine if enough space is available in the modification log. */
UNIV_INLINE
ibool
page_zip_available(
......
......@@ -237,6 +237,37 @@ page_zip_get_trailer_len(
+ page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE);
}
/**************************************************************************
Determine how big a record can be inserted without recompressing the page. */
UNIV_INLINE
lint
page_zip_max_ins_size(
/*==================*/
/* out: maximum insert size in bytes
without recompressing the page;
may be zero or negative */
const page_zip_des_t* page_zip,/* in: compressed page */
ibool is_clust)/* in: TRUE if clustered index */
{
ulint uncompressed_size;
ulint trailer_len;
trailer_len = page_zip_get_trailer_len(page_zip, is_clust,
&uncompressed_size);
/* When a record is created, a pointer may be added to
the dense directory.
Likewise, space for the columns that will not be
compressed will be allocated from the page trailer.
Also the BLOB pointers will be allocated from there, but
we may as well count them in the length of the record. */
trailer_len += uncompressed_size;
return((lint) page_zip_get_size(page_zip)
- trailer_len - page_zip->m_end
- (REC_N_NEW_EXTRA_BYTES - 2));
}
/**************************************************************************
Determine if enough space is available in the modification log. */
UNIV_INLINE
......