Commit 361fe1a5 authored by marko's avatar marko

branches/zip: Allow dirty compressed-only blocks to exist in the buffer pool

and to be flushed to disk.

buf_LRU_free_block(): Enable the freeing of uncompressed pages of
compressed tablespaces.

trx_doublewrite->buf_block_arr[]: Change the type from buf_block_t*
to buf_page_t*.

buf_flush_ready_for_flush(): Add debug assertion.

buf_flush_buffered_writes(), buf_flush_try_page(): Support blocks of type
BUF_BLOCK_ZIP_DIRTY.

buf_flush_post_to_doublewrite_buf(), buf_flush_write_block_low():
Change the type of the parameter from buf_block_t* to buf_page_t*.

buf_flush_init_for_writing(): Allow page to be NULL if page_zip_ is non-NULL.
parent e7b0c68a
......@@ -200,6 +200,8 @@ buf_flush_ready_for_flush(
if (bpage->oldest_modification != 0
&& buf_page_get_io_fix(bpage) == BUF_IO_NONE) {
ut_ad(bpage->in_flush_list);
if (flush_type != BUF_FLUSH_LRU) {
return(TRUE);
......@@ -307,7 +309,6 @@ void
buf_flush_buffered_writes(void)
/*===========================*/
{
buf_block_t* block;
byte* write_buf;
ulint len;
ulint len2;
......@@ -334,10 +335,12 @@ buf_flush_buffered_writes(void)
for (i = 0; i < trx_doublewrite->first_free; i++) {
block = trx_doublewrite->buf_block_arr[i];
ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
const buf_block_t* block;
if (UNIV_LIKELY_NULL(block->page.zip.data)) {
block = (buf_block_t*) trx_doublewrite->buf_block_arr[i];
if (buf_block_get_state(block) != BUF_BLOCK_FILE_PAGE
|| block->page.zip.data) {
/* No simple validate for compressed pages exists. */
continue;
}
......@@ -401,8 +404,12 @@ corrupted_page:
for (len2 = 0; len2 + UNIV_PAGE_SIZE <= len;
len2 += UNIV_PAGE_SIZE, i++) {
block = trx_doublewrite->buf_block_arr[i];
const buf_block_t* block = (buf_block_t*)
trx_doublewrite->buf_block_arr[i];
if (UNIV_LIKELY(!block->page.zip.data)
&& UNIV_LIKELY(buf_block_get_state(block)
== BUF_BLOCK_FILE_PAGE)
&& UNIV_UNLIKELY
(memcmp(write_buf + len2 + (FIL_PAGE_LSN + 4),
write_buf + len2
......@@ -434,8 +441,12 @@ corrupted_page:
for (len2 = 0; len2 + UNIV_PAGE_SIZE <= len;
len2 += UNIV_PAGE_SIZE, i++) {
block = trx_doublewrite->buf_block_arr[i];
const buf_block_t* block = (buf_block_t*)
trx_doublewrite->buf_block_arr[i];
if (UNIV_LIKELY(!block->page.zip.data)
&& UNIV_LIKELY(buf_block_get_state(block)
== BUF_BLOCK_FILE_PAGE)
&& UNIV_UNLIKELY
(memcmp(write_buf + len2 + (FIL_PAGE_LSN + 4),
write_buf + len2
......@@ -461,23 +472,28 @@ flush:
blocks. Next do the writes to the intended positions. */
for (i = 0; i < trx_doublewrite->first_free; i++) {
block = trx_doublewrite->buf_block_arr[i];
ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
if (UNIV_UNLIKELY(buf_block_get_zip_size(block))) {
const buf_block_t* block = (buf_block_t*)
trx_doublewrite->buf_block_arr[i];
ut_a(buf_page_in_file(&block->page));
if (UNIV_LIKELY_NULL(block->page.zip.data)) {
fil_io(OS_FILE_WRITE | OS_AIO_SIMULATED_WAKE_LATER,
FALSE, buf_block_get_space(block),
buf_block_get_zip_size(block),
buf_block_get_page_no(block), 0,
buf_block_get_zip_size(block),
FALSE, buf_page_get_space(&block->page),
buf_page_get_zip_size(&block->page),
buf_page_get_page_no(&block->page), 0,
buf_page_get_zip_size(&block->page),
(void*)block->page.zip.data,
(void*)block);
continue;
} else if (UNIV_UNLIKELY
(memcmp(block->frame + (FIL_PAGE_LSN + 4),
block->frame
+ (UNIV_PAGE_SIZE
- FIL_PAGE_END_LSN_OLD_CHKSUM + 4),
4))) {
}
ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
if (UNIV_UNLIKELY(memcmp(block->frame + (FIL_PAGE_LSN + 4),
block->frame
+ (UNIV_PAGE_SIZE
- FIL_PAGE_END_LSN_OLD_CHKSUM + 4),
4))) {
ut_print_timestamp(stderr);
fprintf(stderr,
" InnoDB: ERROR: The page to be written"
......@@ -528,13 +544,13 @@ static
void
buf_flush_post_to_doublewrite_buf(
/*==============================*/
buf_block_t* block) /* in: buffer block to write */
buf_page_t* bpage) /* in: buffer block to write */
{
ulint zip_size;
try_again:
mutex_enter(&(trx_doublewrite->mutex));
ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
ut_a(buf_page_in_file(bpage));
if (trx_doublewrite->first_free
>= 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
......@@ -545,23 +561,25 @@ try_again:
goto try_again;
}
zip_size = buf_block_get_zip_size(block);
zip_size = buf_page_get_zip_size(bpage);
if (UNIV_UNLIKELY(zip_size)) {
/* Copy the compressed page and clear the rest. */
memcpy(trx_doublewrite->write_buf
+ UNIV_PAGE_SIZE * trx_doublewrite->first_free,
block->page.zip.data, zip_size);
bpage->zip.data, zip_size);
memset(trx_doublewrite->write_buf
+ UNIV_PAGE_SIZE * trx_doublewrite->first_free
+ zip_size, 0, UNIV_PAGE_SIZE - zip_size);
} else {
ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
memcpy(trx_doublewrite->write_buf
+ UNIV_PAGE_SIZE * trx_doublewrite->first_free,
block->frame, UNIV_PAGE_SIZE);
((buf_block_t*) bpage)->frame, UNIV_PAGE_SIZE);
}
trx_doublewrite->buf_block_arr[trx_doublewrite->first_free] = block;
trx_doublewrite->buf_block_arr[trx_doublewrite->first_free] = bpage;
trx_doublewrite->first_free++;
......@@ -583,7 +601,8 @@ Initializes a page for writing to the tablespace. */
void
buf_flush_init_for_writing(
/*=======================*/
byte* page, /* in/out: page */
byte* page, /* in/out: page, may be NULL
if page_zip_ is non-NULL */
void* page_zip_, /* in/out: compressed page, or NULL */
ib_uint64_t newest_lsn) /* in: newest modification lsn
to the page */
......@@ -595,14 +614,18 @@ buf_flush_init_for_writing(
ut_ad(ut_is_2pow(zip_size));
ut_ad(zip_size <= UNIV_PAGE_SIZE);
switch (UNIV_EXPECT(fil_page_get_type(page), FIL_PAGE_INDEX)) {
switch (UNIV_EXPECT(fil_page_get_type(page
? page : page_zip->data),
FIL_PAGE_INDEX)) {
case FIL_PAGE_TYPE_ALLOCATED:
case FIL_PAGE_INODE:
case FIL_PAGE_IBUF_BITMAP:
case FIL_PAGE_TYPE_FSP_HDR:
case FIL_PAGE_TYPE_XDES:
/* These are essentially uncompressed pages. */
memcpy(page_zip->data, page, zip_size);
if (page) {
memcpy(page_zip->data, page, zip_size);
}
/* fall through */
case FIL_PAGE_TYPE_ZBLOB:
case FIL_PAGE_INDEX:
......@@ -621,6 +644,8 @@ buf_flush_init_for_writing(
ut_error;
}
ut_ad(page);
/* Write the newest modification lsn to the page header and trailer */
mach_write_ull(page + FIL_PAGE_LSN, newest_lsn);
......@@ -653,18 +678,20 @@ static
void
buf_flush_write_block_low(
/*======================*/
buf_block_t* block) /* in: buffer block to write */
buf_page_t* bpage) /* in: buffer block to write */
{
ulint zip_size = buf_page_get_zip_size(bpage);
page_t* frame = NULL;
#ifdef UNIV_LOG_DEBUG
static ibool univ_log_debug_warned;
#endif /* UNIV_LOG_DEBUG */
ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
ut_ad(buf_page_in_file(bpage));
#ifdef UNIV_IBUF_DEBUG
ut_a(ibuf_count_get(buf_block_get_space(block),
buf_block_get_page_no(block)) == 0);
ut_a(ibuf_count_get(bpage->space, bpage->offset) == 0);
#endif
ut_ad(block->page.newest_modification != 0);
ut_ad(bpage->newest_modification != 0);
#ifdef UNIV_LOG_DEBUG
if (!univ_log_debug_warned) {
......@@ -676,22 +703,42 @@ buf_flush_write_block_low(
}
#else
/* Force the log to the disk before writing the modified block */
log_write_up_to(block->page.newest_modification,
LOG_WAIT_ALL_GROUPS, TRUE);
log_write_up_to(bpage->newest_modification, LOG_WAIT_ALL_GROUPS, TRUE);
#endif
buf_flush_init_for_writing(block->frame,
buf_block_get_page_zip(block),
block->page.newest_modification);
if (!srv_use_doublewrite_buf || !trx_doublewrite) {
ulint zip_size = buf_block_get_zip_size(block);
switch (buf_page_get_state(bpage)) {
case BUF_BLOCK_ZIP_FREE:
case BUF_BLOCK_ZIP_PAGE: /* The page should be dirty. */
case BUF_BLOCK_NOT_USED:
case BUF_BLOCK_READY_FOR_USE:
case BUF_BLOCK_MEMORY:
case BUF_BLOCK_REMOVE_HASH:
ut_error;
break;
case BUF_BLOCK_ZIP_DIRTY:
if (UNIV_LIKELY(srv_use_checksums)) {
ut_a(mach_read_from_4(bpage->zip.data
+ FIL_PAGE_SPACE_OR_CHKSUM)
== page_zip_calc_checksum(bpage->zip.data,
zip_size));
}
break;
case BUF_BLOCK_FILE_PAGE:
frame = ((buf_block_t*) bpage)->frame;
break;
}
buf_flush_init_for_writing(frame,
bpage->zip.data ? &bpage->zip : NULL,
bpage->newest_modification);
if (!srv_use_doublewrite_buf || !trx_doublewrite) {
fil_io(OS_FILE_WRITE | OS_AIO_SIMULATED_WAKE_LATER,
FALSE, buf_block_get_space(block), zip_size,
buf_block_get_page_no(block), 0,
FALSE, buf_page_get_space(bpage), zip_size,
buf_page_get_page_no(bpage), 0,
zip_size ? zip_size : UNIV_PAGE_SIZE,
(void*)block->frame, (void*)block);
frame, bpage);
} else {
buf_flush_post_to_doublewrite_buf(block);
buf_flush_post_to_doublewrite_buf(bpage);
}
}
......@@ -711,7 +758,8 @@ buf_flush_try_page(
enum buf_flush flush_type) /* in: BUF_FLUSH_LRU, BUF_FLUSH_LIST,
or BUF_FLUSH_SINGLE_PAGE */
{
buf_block_t* block;
buf_page_t* bpage;
mutex_t* block_mutex;
ibool locked;
ut_ad(flush_type == BUF_FLUSH_LRU || flush_type == BUF_FLUSH_LIST
......@@ -719,23 +767,29 @@ buf_flush_try_page(
mutex_enter(&(buf_pool->mutex));
block = (buf_block_t*) buf_page_hash_get(space, offset);
bpage = buf_page_hash_get(space, offset);
if (!block) {
if (!bpage) {
mutex_exit(&(buf_pool->mutex));
return(0);
}
ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE); /* TODO */
ut_a(buf_page_in_file(bpage));
block_mutex = buf_page_get_mutex(bpage);
mutex_enter(&block->mutex);
mutex_enter(block_mutex);
if (flush_type == BUF_FLUSH_LIST
&& buf_flush_ready_for_flush(&block->page, flush_type)) {
if (!buf_flush_ready_for_flush(bpage, flush_type)) {
mutex_exit(block_mutex);
mutex_exit(&buf_pool->mutex);
return(0);
}
buf_block_set_io_fix(block, BUF_IO_WRITE);
switch (flush_type) {
case BUF_FLUSH_LIST:
buf_page_set_io_fix(bpage, BUF_IO_WRITE);
buf_page_set_flush_type(&block->page, flush_type);
buf_page_set_flush_type(bpage, flush_type);
if (buf_pool->n_flush[flush_type] == 0) {
......@@ -744,43 +798,32 @@ buf_flush_try_page(
buf_pool->n_flush[flush_type]++;
locked = FALSE;
/* If the simulated aio thread is not running, we must
not wait for any latch, as we may end up in a deadlock:
if buf_fix_count == 0, then we know we need not wait */
if (block->page.buf_fix_count == 0) {
rw_lock_s_lock_gen(&(block->lock), BUF_IO_WRITE);
locked = TRUE;
locked = bpage->buf_fix_count == 0;
if (locked
&& buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
rw_lock_s_lock_gen(&((buf_block_t*) bpage)->lock,
BUF_IO_WRITE);
}
mutex_exit(&block->mutex);
mutex_exit(block_mutex);
mutex_exit(&(buf_pool->mutex));
if (!locked) {
buf_flush_buffered_writes();
rw_lock_s_lock_gen(&(block->lock), BUF_IO_WRITE);
if (buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
rw_lock_s_lock_gen(&((buf_block_t*) bpage)
->lock, BUF_IO_WRITE);
}
}
#ifdef UNIV_DEBUG
if (buf_debug_prints) {
fprintf(stderr,
"Flushing page space %lu, page no %lu \n",
(ulong) buf_block_get_space(block),
(ulong) buf_block_get_page_no(block));
}
#endif /* UNIV_DEBUG */
buf_flush_write_block_low(block);
return(1);
} else if (flush_type == BUF_FLUSH_LRU
&& buf_flush_ready_for_flush(&block->page, flush_type)) {
break;
case BUF_FLUSH_LRU:
/* VERY IMPORTANT:
Because any thread may call the LRU flush, even when owning
locks on pages, to avoid deadlocks, we must make sure that the
......@@ -789,9 +832,9 @@ buf_flush_try_page(
the page not to be bufferfixed (in function
..._ready_for_flush). */
buf_block_set_io_fix(block, BUF_IO_WRITE);
buf_page_set_io_fix(bpage, BUF_IO_WRITE);
buf_page_set_flush_type(&block->page, flush_type);
buf_page_set_flush_type(bpage, flush_type);
if (buf_pool->n_flush[flush_type] == 0) {
......@@ -800,25 +843,23 @@ buf_flush_try_page(
buf_pool->n_flush[flush_type]++;
rw_lock_s_lock_gen(&(block->lock), BUF_IO_WRITE);
if (buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
rw_lock_s_lock_gen(&((buf_block_t*) bpage)->lock,
BUF_IO_WRITE);
}
/* Note that the s-latch is acquired before releasing the
buf_pool mutex: this ensures that the latch is acquired
immediately. */
mutex_exit(&block->mutex);
mutex_exit(block_mutex);
mutex_exit(&(buf_pool->mutex));
break;
buf_flush_write_block_low(block);
return(1);
} else if (flush_type == BUF_FLUSH_SINGLE_PAGE
&& buf_flush_ready_for_flush(&block->page, flush_type)) {
buf_block_set_io_fix(block, BUF_IO_WRITE);
case BUF_FLUSH_SINGLE_PAGE:
buf_page_set_io_fix(bpage, BUF_IO_WRITE);
buf_page_set_flush_type(&block->page, flush_type);
buf_page_set_flush_type(bpage, flush_type);
if (buf_pool->n_flush[flush_type] == 0) {
......@@ -827,30 +868,29 @@ buf_flush_try_page(
buf_pool->n_flush[flush_type]++;
mutex_exit(&block->mutex);
mutex_exit(block_mutex);
mutex_exit(&(buf_pool->mutex));
rw_lock_s_lock_gen(&(block->lock), BUF_IO_WRITE);
#ifdef UNIV_DEBUG
if (buf_debug_prints) {
fprintf(stderr,
"Flushing single page space %lu,"
" page no %lu \n",
(ulong) buf_block_get_space(block),
(ulong) buf_block_get_page_no(block));
if (buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
rw_lock_s_lock_gen(&((buf_block_t*) bpage)->lock,
BUF_IO_WRITE);
}
#endif /* UNIV_DEBUG */
buf_flush_write_block_low(block);
break;
return(1);
default:
ut_error;
}
mutex_exit(&block->mutex);
mutex_exit(&(buf_pool->mutex));
#ifdef UNIV_DEBUG
if (buf_debug_prints) {
fprintf(stderr,
"Flushing %u space %u page %u\n",
flush_type, bpage->space, bpage->offset);
}
#endif /* UNIV_DEBUG */
buf_flush_write_block_low(bpage);
return(0);
return(1);
}
/***************************************************************
......
......@@ -929,7 +929,7 @@ buf_LRU_free_block(
return(FALSE);
}
// b = buf_buddy_alloc(sizeof *b, FALSE); // TODO: enable this
b = buf_buddy_alloc(sizeof *b, FALSE);
if (!b) {
return(FALSE);
......
......@@ -48,7 +48,8 @@ Initializes a page for writing to the tablespace. */
void
buf_flush_init_for_writing(
/*=======================*/
byte* page, /* in/out: page */
byte* page, /* in/out: page, may be NULL
if page_zip_ is non-NULL */
void* page_zip_, /* in/out: compressed page, or NULL */
ib_uint64_t newest_lsn); /* in: newest modification lsn
to the page */
......
......@@ -408,7 +408,7 @@ struct trx_doublewrite_struct{
address divisible by UNIV_PAGE_SIZE
(which is required by Windows aio) */
byte* write_buf_unaligned; /* pointer to write_buf, but unaligned */
buf_block_t**
buf_page_t**
buf_block_arr; /* array to store pointers to the buffer
blocks which have been cached to write_buf */
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment