Commit 5ac6f321 authored by marko's avatar marko

branches/zip: buf_page_init_for_read(): Do not allocate an uncompressed page

frame when reading compressed pages, unless crash recovery is in progress.

buf_page_read_low(): Adapt for buf_page_init_for_read() returning
buf_page_t* instead of buf_block_t*.
parent 7315b062
...@@ -2325,7 +2325,7 @@ Sets the io_fix flag to BUF_IO_READ and sets a non-recursive exclusive lock ...@@ -2325,7 +2325,7 @@ Sets the io_fix flag to BUF_IO_READ and sets a non-recursive exclusive lock
on the buffer frame. The io-handler must take care that the flag is cleared on the buffer frame. The io-handler must take care that the flag is cleared
and the lock released later. */ and the lock released later. */
buf_block_t* buf_page_t*
buf_page_init_for_read( buf_page_init_for_read(
/*===================*/ /*===================*/
/* out: pointer to the block or NULL */ /* out: pointer to the block or NULL */
...@@ -2339,6 +2339,7 @@ buf_page_init_for_read( ...@@ -2339,6 +2339,7 @@ buf_page_init_for_read(
ulint offset) /* in: page number */ ulint offset) /* in: page number */
{ {
buf_block_t* block; buf_block_t* block;
buf_page_t* bpage;
mtr_t mtr; mtr_t mtr;
ut_ad(buf_pool); ut_ad(buf_pool);
...@@ -2363,21 +2364,47 @@ buf_page_init_for_read( ...@@ -2363,21 +2364,47 @@ buf_page_init_for_read(
ut_ad(mode == BUF_READ_ANY_PAGE); ut_ad(mode == BUF_READ_ANY_PAGE);
} }
block = buf_LRU_get_free_block(0); if (zip_size && UNIV_LIKELY(!recv_recovery_is_on())) {
void* data;
mutex_enter(&buf_pool->mutex);
ut_a(block); /* This must be allocated before bpage, in order to
avoid the invocation of buf_buddy_relocate_block()
on uninitialized data. */
data = buf_buddy_alloc(zip_size, TRUE);
mutex_enter(&(buf_pool->mutex)); bpage = buf_buddy_alloc(sizeof *bpage, TRUE);
mutex_enter(&block->mutex); page_zip_des_init(&bpage->zip);
page_zip_set_size(&bpage->zip, zip_size);
bpage->zip.data = data;
block = NULL;
mutex_enter(&buf_pool->zip_mutex);
} else {
block = buf_LRU_get_free_block(0);
ut_ad(block);
bpage = &block->page;
mutex_enter(&buf_pool->mutex);
mutex_enter(&block->mutex);
}
if (buf_page_hash_get(space, offset)) { if (buf_page_hash_get(space, offset)) {
/* The page is already in the buffer pool. */ /* The page is already in the buffer pool. */
err_exit: err_exit:
mutex_exit(&block->mutex); if (block) {
mutex_exit(&buf_pool->mutex); buf_LRU_block_free_non_file_page(block);
mutex_exit(&buf_pool->mutex);
mutex_exit(&block->mutex);
} else {
void* data = bpage->zip.data;
bpage->zip.data = NULL;
buf_block_free(block); mutex_exit(&buf_pool->zip_mutex);
buf_buddy_free(data, zip_size);
buf_buddy_free(bpage, sizeof *bpage);
mutex_exit(&buf_pool->mutex);
}
if (mode == BUF_READ_IBUF_PAGES_ONLY) { if (mode == BUF_READ_IBUF_PAGES_ONLY) {
...@@ -2396,51 +2423,87 @@ buf_page_init_for_read( ...@@ -2396,51 +2423,87 @@ buf_page_init_for_read(
goto err_exit; goto err_exit;
} }
ut_ad(block); if (block) {
buf_page_init(space, offset, (buf_block_t*) bpage);
buf_page_init(space, offset, block);
/* The block must be put to the LRU list, to the old blocks */ /* The block must be put to the LRU list, to the old blocks */
buf_LRU_add_block(bpage, TRUE/* to old blocks */);
buf_LRU_add_block(&block->page, TRUE/* to old blocks */); /* We set a pass-type x-lock on the frame because then
the same thread which called for the read operation
(and is running now at this point of code) can wait
for the read to complete by waiting for the x-lock on
the frame; if the x-lock were recursive, the same
thread would illegally get the x-lock before the page
read is completed. The x-lock is cleared by the
io-handler thread. */
buf_page_set_io_fix(&block->page, BUF_IO_READ); rw_lock_x_lock_gen(&((buf_block_t*) bpage)->lock, BUF_IO_READ);
buf_pool->n_pend_reads++; if (UNIV_UNLIKELY(zip_size)) {
void* data;
page_zip_set_size(&block->page.zip, zip_size);
mutex_exit(&block->mutex);
/* buf_pool->mutex may be released and
reacquired by buf_buddy_alloc(). Thus, we
must release block->mutex in order not to
break the latching order in the reacquisition
of buf_pool->mutex. We also must defer this
operation until after the block descriptor has
been added to buf_pool->LRU and
buf_pool->page_hash. */
data = buf_buddy_alloc(zip_size, TRUE);
mutex_enter(&block->mutex);
block->page.zip.data = data;
}
/* We set a pass-type x-lock on the frame because then the same buf_page_set_io_fix(bpage, BUF_IO_READ);
thread which called for the read operation (and is running now at
this point of code) can wait for the read to complete by waiting
for the x-lock on the frame; if the x-lock were recursive, the
same thread would illegally get the x-lock before the page read
is completed. The x-lock is cleared by the io-handler thread. */
rw_lock_x_lock_gen(&(block->lock), BUF_IO_READ); buf_pool->n_pend_reads++;
if (zip_size) {
void* data;
page_zip_set_size(&block->page.zip, zip_size);
mutex_exit(&block->mutex); mutex_exit(&block->mutex);
/* buf_pool->mutex may be released and reacquired by mutex_exit(&buf_pool->mutex);
buf_buddy_alloc(). Thus, we must release block->mutex } else {
in order not to break the latching order in UNIV_MEM_DESC(bpage->zip.data,
the reacquisition of buf_pool->mutex. We also must page_zip_get_size(&bpage->zip), bpage);
defer this operation until after the block descriptor buf_page_init_low(bpage);
has been added to buf_pool->LRU and buf_pool->page_hash. */ bpage->state = BUF_BLOCK_ZIP_PAGE;
data = buf_buddy_alloc(zip_size, TRUE); bpage->space = space;
mutex_enter(&block->mutex); bpage->offset = offset;
block->page.zip.data = data; #ifdef UNIV_DEBUG_FILE_ACCESSES
} bpage->file_page_was_freed = FALSE;
#endif /* UNIV_DEBUG_FILE_ACCESSES */
mutex_exit(&block->mutex); #ifdef UNIV_DEBUG
mutex_exit(&(buf_pool->mutex)); bpage->in_page_hash = FALSE;
bpage->in_zip_hash = FALSE;
bpage->in_flush_list = FALSE;
bpage->in_free_list = FALSE;
bpage->in_LRU_list = FALSE;
#endif /* UNIV_DEBUG */
ut_d(bpage->in_page_hash = TRUE);
HASH_INSERT(buf_page_t, hash, buf_pool->page_hash,
buf_page_address_fold(space, offset), bpage);
/* The block must be put to the LRU list, to the old blocks */
buf_LRU_add_block(bpage, TRUE/* to old blocks */);
buf_LRU_insert_zip_clean(bpage);
buf_page_set_io_fix(bpage, BUF_IO_READ);
buf_pool->n_pend_reads++;
mutex_exit(&buf_pool->zip_mutex);
mutex_exit(&buf_pool->mutex);
}
if (mode == BUF_READ_IBUF_PAGES_ONLY) { if (mode == BUF_READ_IBUF_PAGES_ONLY) {
mtr_commit(&mtr); mtr_commit(&mtr);
} }
return(block); return(bpage);
} }
/************************************************************************ /************************************************************************
......
...@@ -73,7 +73,7 @@ buf_read_page_low( ...@@ -73,7 +73,7 @@ buf_read_page_low(
which we have DISCARDed + IMPORTed back */ which we have DISCARDed + IMPORTed back */
ulint offset) /* in: page number */ ulint offset) /* in: page number */
{ {
buf_block_t* block; buf_page_t* bpage;
ulint wake_later; ulint wake_later;
*err = DB_SUCCESS; *err = DB_SUCCESS;
...@@ -113,9 +113,9 @@ buf_read_page_low( ...@@ -113,9 +113,9 @@ buf_read_page_low(
or is being dropped; if we succeed in initing the page in the buffer or is being dropped; if we succeed in initing the page in the buffer
pool for read, then DISCARD cannot proceed until the read has pool for read, then DISCARD cannot proceed until the read has
completed */ completed */
block = buf_page_init_for_read(err, mode, space, zip_size, bpage = buf_page_init_for_read(err, mode, space, zip_size,
tablespace_version, offset); tablespace_version, offset);
if (block == NULL) { if (bpage == NULL) {
return(0); return(0);
} }
...@@ -129,23 +129,25 @@ buf_read_page_low( ...@@ -129,23 +129,25 @@ buf_read_page_low(
} }
#endif #endif
ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE); ut_ad(buf_page_in_file(bpage));
if (zip_size) { if (zip_size) {
*err = fil_io(OS_FILE_READ | wake_later, *err = fil_io(OS_FILE_READ | wake_later,
sync, space, zip_size, offset, 0, zip_size, sync, space, zip_size, offset, 0, zip_size,
(void*) block->page.zip.data, (void*) block); bpage->zip.data, bpage);
} else { } else {
ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
*err = fil_io(OS_FILE_READ | wake_later, *err = fil_io(OS_FILE_READ | wake_later,
sync, space, 0, offset, 0, UNIV_PAGE_SIZE, sync, space, 0, offset, 0, UNIV_PAGE_SIZE,
(void*) block->frame, (void*) block); ((buf_block_t*) bpage)->frame, bpage);
} }
ut_a(*err == DB_SUCCESS); ut_a(*err == DB_SUCCESS);
if (sync) { if (sync) {
/* The i/o is already completed when we arrive from /* The i/o is already completed when we arrive from
fil_read */ fil_read */
buf_page_io_complete(&block->page); buf_page_io_complete(bpage);
} }
return(1); return(1);
......
...@@ -869,7 +869,7 @@ Sets the io_fix flag to BUF_IO_READ and sets a non-recursive exclusive lock ...@@ -869,7 +869,7 @@ Sets the io_fix flag to BUF_IO_READ and sets a non-recursive exclusive lock
on the buffer frame. The io-handler must take care that the flag is cleared on the buffer frame. The io-handler must take care that the flag is cleared
and the lock released later. */ and the lock released later. */
buf_block_t* buf_page_t*
buf_page_init_for_read( buf_page_init_for_read(
/*===================*/ /*===================*/
/* out: pointer to the block or NULL */ /* out: pointer to the block or NULL */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment