ibuf0ibuf.cc 141 KB
Newer Older
1 2
/*****************************************************************************

Sergei Golubchik's avatar
Sergei Golubchik committed
3
Copyright (c) 1997, 2016, Oracle and/or its affiliates. All Rights Reserved.
4
Copyright (c) 2016, 2017, MariaDB Corporation.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file ibuf/ibuf0ibuf.cc
Insert buffer

Created 7/19/1997 Heikki Tuuri
*******************************************************/

27 28
#include "ha_prototypes.h"

29
#include "ibuf0ibuf.h"
30 31
#include "sync0sync.h"
#include "btr0sea.h"
32 33

#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
34
my_bool	srv_ibuf_disable_background_merge;
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */

/** Number of bits describing a single page */
#define IBUF_BITS_PER_PAGE	4
#if IBUF_BITS_PER_PAGE % 2
# error "IBUF_BITS_PER_PAGE must be an even number!"
#endif
/** The start address for an insert buffer bitmap page bitmap */
#define IBUF_BITMAP		PAGE_DATA

#include "buf0buf.h"
#include "buf0rea.h"
#include "fsp0fsp.h"
#include "trx0sys.h"
#include "fil0fil.h"
#include "rem0rec.h"
#include "btr0cur.h"
#include "btr0pcur.h"
#include "btr0btr.h"
#include "row0upd.h"
#include "dict0boot.h"
#include "fut0lst.h"
#include "lock0lock.h"
#include "log0recv.h"
#include "que0que.h"
#include "srv0start.h" /* srv_shutdown_state */
61
#include "fsp0sysspace.h"
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
#include "rem0cmp.h"

/*	STRUCTURE OF AN INSERT BUFFER RECORD

In versions < 4.1.x:

1. The first field is the page number.
2. The second field is an array which stores type info for each subsequent
   field. We store the information which affects the ordering of records, and
   also the physical storage size of an SQL NULL value. E.g., for CHAR(10) it
   is 10 bytes.
3. Next we have the fields of the actual index record.

In versions >= 4.1.x:

Note that contrary to what we planned in the 1990's, there will only be one
insert buffer tree, and that is in the system tablespace of InnoDB.

1. The first field is the space id.
2. The second field is a one-byte marker (0) which differentiates records from
   the < 4.1.x storage format.
3. The third field is the page number.
4. The fourth field contains the type info, where we have also added 2 bytes to
   store the charset. In the compressed table format of 5.0.x we must add more
   information here so that we can build a dummy 'index' struct which 5.0.x
   can use in the binary search on the index page in the ibuf merge phase.
5. The rest of the fields contain the fields of the actual index record.

In versions >= 5.0.3:

The first byte of the fourth field is an additional marker (0) if the record
is in the compact format.  The presence of this marker can be detected by
looking at the length of the field modulo DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE.

The high-order bit of the character set field in the type info is the
"nullable" flag for the field.

In versions >= 5.5:

The optional marker byte at the start of the fourth field is replaced by
mandatory 3 fields, totaling 4 bytes:

 1. 2 bytes: Counter field, used to sort records within a (space id, page
    no) in the order they were added. This is needed so that for example the
    sequence of operations "INSERT x, DEL MARK x, INSERT x" is handled
    correctly.

 2. 1 byte: Operation type (see ibuf_op_t).

 3. 1 byte: Flags. Currently only one flag exists, IBUF_REC_COMPACT.

To ensure older records, which do not have counters to enforce correct
sorting, are merged before any new records, ibuf_insert checks if we're
trying to insert to a position that contains old-style records, and if so,
refuses the insert. Thus, ibuf pages are gradually converted to the new
format as their corresponding buffer pool pages are read into memory.
*/


/*	PREVENTING DEADLOCKS IN THE INSERT BUFFER SYSTEM

If an OS thread performs any operation that brings in disk pages from
non-system tablespaces into the buffer pool, or creates such a page there,
then the operation may have as a side effect an insert buffer index tree
compression. Thus, the tree latch of the insert buffer tree may be acquired
in the x-mode, and also the file space latch of the system tablespace may
be acquired in the x-mode.

Also, an insert to an index in a non-system tablespace can have the same
effect. How do we know this cannot lead to a deadlock of OS threads? There
is a problem with the i/o-handler threads: they break the latching order
because they own x-latches to pages which are on a lower level than the
insert buffer tree latch, its page latches, and the tablespace latch an
insert buffer operation can reserve.

The solution is the following: Let all the tree and page latches connected
with the insert buffer be later in the latching order than the fsp latch and
fsp page latches.

Insert buffer pages must be such that the insert buffer is never invoked
when these pages are accessed as this would result in a recursion violating
the latching order. We let a special i/o-handler thread take care of i/o to
the insert buffer pages and the ibuf bitmap pages, as well as the fsp bitmap
pages and the first inode page, which contains the inode of the ibuf tree: let
us call all these ibuf pages. To prevent deadlocks, we do not let a read-ahead
access both non-ibuf and ibuf pages.

Then an i/o-handler for the insert buffer never needs to access recursively the
insert buffer tree and thus obeys the latching order. On the other hand, other
i/o-handlers for other tablespaces may require access to the insert buffer,
but because all kinds of latches they need to access there are later in the
latching order, no violation of the latching order occurs in this case,
either.

A problem is how to grow and contract an insert buffer tree. As it is later
in the latching order than the fsp management, we have to reserve the fsp
latch first, before adding or removing pages from the insert buffer tree.
We let the insert buffer tree have its own file space management: a free
list of pages linked to the tree root. To prevent recursive using of the
insert buffer when adding pages to the tree, we must first load these pages
to memory, obtaining a latch on them, and only after that add them to the
free list of the insert buffer tree. More difficult is removing of pages
from the free list. If there is an excess of pages in the free list of the
ibuf tree, they might be needed if some thread reserves the fsp latch,
intending to allocate more file space. So we do the following: if a thread
reserves the fsp latch, we check the writer count field of the latch. If
this field has value 1, it means that the thread did not own the latch
before entering the fsp system, and the mtr of the thread contains no
modifications to the fsp pages. Now we are free to reserve the ibuf latch,
and check if there is an excess of pages in the free list. We can then, in a
separate mini-transaction, take them out of the free list and free them to
the fsp system.

To avoid deadlocks in the ibuf system, we divide file pages into three levels:

(1) non-ibuf pages,
(2) ibuf tree pages and the pages in the ibuf tree free list, and
(3) ibuf bitmap pages.

No OS thread is allowed to access higher level pages if it has latches to
lower level pages; even if the thread owns a B-tree latch it must not access
the B-tree non-leaf pages if it has latches on lower level pages. Read-ahead
is only allowed for level 1 and 2 pages. Dedicated i/o-handler threads handle
exclusively level 1 i/o. A dedicated i/o handler thread handles exclusively
level 2 i/o. However, if an OS thread does the i/o handling for itself, i.e.,
it uses synchronous aio, it can access any pages, as long as it obeys the
access order rules. */

/** Operations that can currently be buffered. */
191
ibuf_use_t	ibuf_use		= IBUF_USE_ALL;
192 193 194

#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
/** Flag to control insert buffer debugging. */
195
uint	ibuf_debug;
196 197 198
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */

/** The insert buffer control structure */
199
ibuf_t*	ibuf			= NULL;
200 201 202 203 204 205 206 207 208 209

#ifdef UNIV_IBUF_COUNT_DEBUG
/** Number of tablespaces in the ibuf_counts array */
#define IBUF_COUNT_N_SPACES	4
/** Number of pages within each tablespace in the ibuf_counts array */
#define IBUF_COUNT_N_PAGES	130000

/** Buffered entry counts for file pages, used in debugging */
static ulint	ibuf_counts[IBUF_COUNT_N_SPACES][IBUF_COUNT_N_PAGES];

210 211
/** Checks that the indexes to ibuf_counts[][] are within limits.
@param[in]	page_id	page id */
212 213 214
UNIV_INLINE
void
ibuf_count_check(
215
	const page_id_t&	page_id)
216
{
217 218
	if (page_id.space() < IBUF_COUNT_N_SPACES
	    && page_id.page_no() < IBUF_COUNT_N_PAGES) {
219 220 221
		return;
	}

222 223 224 225 226
	ib::fatal() << "UNIV_IBUF_COUNT_DEBUG limits space_id and page_no"
		" and breaks crash recovery. space_id=" << page_id.space()
		<< ", should be 0<=space_id<" << IBUF_COUNT_N_SPACES
		<< ". page_no=" << page_id.page_no()
		<< ", should be 0<=page_no<" << IBUF_COUNT_N_PAGES;
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
}
#endif

/** @name Offsets to the per-page bits in the insert buffer bitmap */
/* @{ */
#define	IBUF_BITMAP_FREE	0	/*!< Bits indicating the
					amount of free space */
#define IBUF_BITMAP_BUFFERED	2	/*!< TRUE if there are buffered
					changes for the page */
#define IBUF_BITMAP_IBUF	3	/*!< TRUE if page is a part of
					the ibuf tree, excluding the
					root page, or is in the free
					list of the ibuf */
/* @} */

#define IBUF_REC_FIELD_SPACE	0	/*!< in the pre-4.1 format,
					the page number. later, the space_id */
#define IBUF_REC_FIELD_MARKER	1	/*!< starting with 4.1, a marker
					consisting of 1 byte that is 0 */
#define IBUF_REC_FIELD_PAGE	2	/*!< starting with 4.1, the
					page number */
#define IBUF_REC_FIELD_METADATA	3	/* the metadata field */
#define IBUF_REC_FIELD_USER	4	/* first user field */

/* Various constants for checking the type of an ibuf record and extracting
data from it. For details, see the description of the record format at the
top of this file. */

/** @name Format of the IBUF_REC_FIELD_METADATA of an insert buffer record
The fourth column in the MySQL 5.5 format contains an operation
type, counter, and some flags. */
/* @{ */
#define IBUF_REC_INFO_SIZE	4	/*!< Combined size of info fields at
					the beginning of the fourth field */
#if IBUF_REC_INFO_SIZE >= DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE
# error "IBUF_REC_INFO_SIZE >= DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE"
#endif

/* Offsets for the fields at the beginning of the fourth field */
#define IBUF_REC_OFFSET_COUNTER	0	/*!< Operation counter */
#define IBUF_REC_OFFSET_TYPE	2	/*!< Type of operation */
#define IBUF_REC_OFFSET_FLAGS	3	/*!< Additional flags */

/* Record flag masks */
#define IBUF_REC_COMPACT	0x1	/*!< Set in
					IBUF_REC_OFFSET_FLAGS if the
					user index is in COMPACT
					format or later */


/** The mutex used to block pessimistic inserts to ibuf trees */
static ib_mutex_t	ibuf_pessimistic_insert_mutex;

/** The mutex protecting the insert buffer structs */
static ib_mutex_t	ibuf_mutex;

/** The mutex protecting the insert buffer bitmaps */
static ib_mutex_t	ibuf_bitmap_mutex;

/** The area in pages from which contract looks for page numbers for merge */
287
const ulint		IBUF_MERGE_AREA = 8;
288 289 290 291

/** Inside the merge area, pages which have at most 1 per this number less
buffered entries compared to maximum volume that can be buffered for a single
page are merged along with the page whose buffer became full */
292
const ulint		IBUF_MERGE_THRESHOLD = 4;
293 294 295

/** In ibuf_contract at most this number of pages is read to memory in one
batch, in order to merge the entries for them in the insert buffer */
296
const ulint		IBUF_MAX_N_PAGES_MERGED = IBUF_MERGE_AREA;
297 298 299 300

/** If the combined size of the ibuf trees exceeds ibuf->max_size by this
many pages, we start to contract it in connection to inserts there, using
non-synchronous contract */
301
const ulint		IBUF_CONTRACT_ON_INSERT_NON_SYNC = 0;
302 303 304 305

/** If the combined size of the ibuf trees exceeds ibuf->max_size by this
many pages, we start to contract it in connection to inserts there, using
synchronous contract */
306
const ulint		IBUF_CONTRACT_ON_INSERT_SYNC = 5;
307 308 309 310

/** If the combined size of the ibuf trees exceeds ibuf->max_size by
this many pages, we start to contract it using synchronous contract, but do
not insert */
311
const ulint		IBUF_CONTRACT_DO_NOT_INSERT = 10;
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327

/* TODO: how to cope with drop table if there are records in the insert
buffer for the indexes of the table? Is there actually any problem,
because ibuf merge is done to a page when it is read in, and it is
still physically like the index page even if the index would have been
dropped! So, there seems to be no problem. */

/** Flags the mini-transaction as being inside an insert buffer routine.
The ut_ad() check expects that the flag is not already set.
@param[in,out]	mtr	mini-transaction */
UNIV_INLINE
void
ibuf_enter(
	mtr_t*	mtr)
{
	ut_ad(!mtr->is_inside_ibuf());

	mtr->enter_ibuf();
}

/** Clears the mini-transaction flag that marks it as being inside an
insert buffer routine. The ut_ad() check expects that the flag is set.
@param[in,out]	mtr	mini-transaction */
UNIV_INLINE
void
ibuf_exit(
	mtr_t*	mtr)
{
	ut_ad(mtr->is_inside_ibuf());

	mtr->exit_ibuf();
}

/** Commits an insert buffer mini-transaction through
btr_pcur_commit_specify_mtr(), which (per the original description) sets
the persistent cursor latch mode to BTR_NO_LATCHES, i.e. detaches the
cursor. Under ut_d() the "inside ibuf" flag of the mini-transaction is
cleared first via ibuf_exit().
@param[in,out]	pcur	persistent cursor
@param[in,out]	mtr	mini-transaction */
UNIV_INLINE
void
ibuf_btr_pcur_commit_specify_mtr(
	btr_pcur_t*	pcur,
	mtr_t*		mtr)
{
	/* ibuf_exit() is wrapped in ut_d(), so it only runs when ut_d()
	expands to its argument. */
	ut_d(ibuf_exit(mtr));

	btr_pcur_commit_specify_mtr(pcur, mtr);
}

/******************************************************************//**
Gets the ibuf header page and x-latches it.
361
@return insert buffer header page */
362 363 364 365 366 367 368 369 370
static
page_t*
ibuf_header_page_get(
/*=================*/
	mtr_t*	mtr)	/*!< in/out: mini-transaction */
{
	buf_block_t*	block;

	ut_ad(!ibuf_inside(mtr));
371
	page_t* page = NULL;
372 373

	block = buf_page_get(
374 375 376
		page_id_t(IBUF_SPACE_ID, FSP_IBUF_HEADER_PAGE_NO),
		univ_page_size, RW_X_LATCH, mtr);

377

378 379 380 381 382 383 384
	if (!block->page.encrypted) {
		buf_block_dbg_add_level(block, SYNC_IBUF_HEADER);

		page = buf_block_get_frame(block);
	}

	return page;
385 386 387
}

/******************************************************************//**
388 389
Gets the root page and sx-latches it.
@return insert buffer tree root page */
390 391 392 393 394 395 396 397 398 399 400 401
static
page_t*
ibuf_tree_root_get(
/*===============*/
	mtr_t*		mtr)	/*!< in: mtr */
{
	buf_block_t*	block;
	page_t*		root;

	ut_ad(ibuf_inside(mtr));
	ut_ad(mutex_own(&ibuf_mutex));

402
	mtr_sx_lock(dict_index_get_lock(ibuf->index), mtr);
403

404
	/* only segment list access is exclusive each other */
405
	block = buf_page_get(
406 407
		page_id_t(IBUF_SPACE_ID, FSP_IBUF_TREE_ROOT_PAGE_NO),
		univ_page_size, RW_SX_LATCH, mtr);
408 409 410 411 412 413 414 415 416 417 418 419 420

	buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

	root = buf_block_get_frame(block);

	ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
	ut_ad(page_get_page_no(root) == FSP_IBUF_TREE_ROOT_PAGE_NO);
	ut_ad(ibuf->empty == page_is_empty(root));

	return(root);
}

#ifdef UNIV_IBUF_COUNT_DEBUG
421 422 423

/** Looks up the debug counter of buffered entries for a page.
@param[in]	page_id	page id; validated with ibuf_count_check()
@return the value stored in ibuf_counts[][] for this page, i.e. the number
of entries currently buffered in the insert buffer for it */
ulint
ibuf_count_get(
	const page_id_t&	page_id)
{
	ibuf_count_check(page_id);

	const ulint*	space_counts = ibuf_counts[page_id.space()];

	return(space_counts[page_id.page_no()]);
}

435 436 437
/** Sets the ibuf count for a given page.
@param[in]	page_id	page id
@param[in]	val	value to set */
438 439 440
static
void
ibuf_count_set(
441 442
	const page_id_t&	page_id,
	ulint			val)
443
{
444
	ibuf_count_check(page_id);
445 446
	ut_a(val < UNIV_PAGE_SIZE);

447
	ibuf_counts[page_id.space()][page_id.page_no()] = val;
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
}
#endif

/******************************************************************//**
Closes insert buffer and frees the data structures. */
void
ibuf_close(void)
/*============*/
{
	mutex_free(&ibuf_pessimistic_insert_mutex);

	mutex_free(&ibuf_mutex);

	mutex_free(&ibuf_bitmap_mutex);

463 464 465 466 467 468
	dict_table_t*	ibuf_table = ibuf->index->table;
	rw_lock_free(&ibuf->index->lock);
	dict_mem_index_free(ibuf->index);
	dict_mem_table_free(ibuf_table);

	ut_free(ibuf);
469 470 471 472 473 474 475 476 477 478
	ibuf = NULL;
}

/******************************************************************//**
Updates the size information of the ibuf, assuming the segment size has not
changed. */
static
void
ibuf_size_update(
/*=============*/
479
	const page_t*	root)	/*!< in: ibuf tree root */
480 481 482 483
{
	ut_ad(mutex_own(&ibuf_mutex));

	ibuf->free_list_len = flst_get_len(root + PAGE_HEADER
484
					   + PAGE_BTR_IBUF_FREE_LIST);
485

486
	ibuf->height = 1 + btr_page_get_level_low(root);
487 488 489 490 491 492 493

	/* the '1 +' is the ibuf header page */
	ibuf->size = ibuf->seg_size - (1 + ibuf->free_list_len);
}

/** Creates the insert buffer data structure at database startup and
initializes it from the ibuf header page and the ibuf tree root page.
On failure the ibuf_mutex is released and the mini-transaction is
committed before returning; the allocated ibuf structure and the created
mutexes are left in place.
@return DB_SUCCESS, or DB_DECRYPTION_FAILED if the ibuf header page could
not be obtained */
dberr_t
ibuf_init_at_db_start(void)
{
	page_t*		root;
	mtr_t		mtr;
	ulint		n_used;
	page_t*		header_page;

	ibuf = static_cast<ibuf_t*>(ut_zalloc_nokey(sizeof(ibuf_t)));

	/* At startup we initialize ibuf to have a maximum of
	CHANGE_BUFFER_DEFAULT_SIZE in terms of percentage of the
	buffer pool size. Once ibuf struct is initialized this
	value is updated with the user supplied size by calling
	ibuf_max_size_update(). */
	ibuf->max_size = ((buf_pool_get_curr_size() / UNIV_PAGE_SIZE)
			  * CHANGE_BUFFER_DEFAULT_SIZE) / 100;

	mutex_create(LATCH_ID_IBUF, &ibuf_mutex);

	mutex_create(LATCH_ID_IBUF_BITMAP, &ibuf_bitmap_mutex);

	mutex_create(LATCH_ID_IBUF_PESSIMISTIC_INSERT,
		     &ibuf_pessimistic_insert_mutex);

	mtr_start(&mtr);

	mtr_x_lock_space(IBUF_SPACE_ID, &mtr);

	mutex_enter(&ibuf_mutex);

	header_page = ibuf_header_page_get(&mtr);

	if (!header_page) {
		/* Do not leave ibuf_mutex held or the mini-transaction
		(with its latches) open on the failure path. */
		mutex_exit(&ibuf_mutex);
		mtr.commit();

		return(DB_DECRYPTION_FAILED);
	}

	fseg_n_reserved_pages(header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER,
			      &n_used, &mtr);

	ut_ad(n_used >= 2);

	ibuf->seg_size = n_used;

	{
		buf_block_t*	block;

		block = buf_page_get(
			page_id_t(IBUF_SPACE_ID, FSP_IBUF_TREE_ROOT_PAGE_NO),
			univ_page_size, RW_X_LATCH, &mtr);

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);

		root = buf_block_get_frame(block);
	}

	ibuf_size_update(root);
	mutex_exit(&ibuf_mutex);

	/* The root page is still x-latched by mtr here. */
	ibuf->empty = page_is_empty(root);
	mtr.commit();

	/* Build the in-memory index and table objects that describe the
	insert buffer tree. */
	ibuf->index = dict_mem_index_create(
		"innodb_change_buffer", "CLUST_IND",
		IBUF_SPACE_ID, DICT_CLUSTERED | DICT_UNIVERSAL | DICT_IBUF, 1);
	ibuf->index->id = DICT_IBUF_ID_MIN + IBUF_SPACE_ID;
	ibuf->index->table = dict_mem_table_create(
		"innodb_change_buffer", IBUF_SPACE_ID, 1, 0, 0, 0);
	ibuf->index->n_uniq = REC_MAX_N_FIELDS;
	rw_lock_create(index_tree_rw_lock_key, &ibuf->index->lock,
		       SYNC_IBUF_INDEX_TREE);
#ifdef BTR_CUR_ADAPT
	ibuf->index->search_info = btr_search_info_create(ibuf->index->heap);
#endif /* BTR_CUR_ADAPT */
	ibuf->index->page = FSP_IBUF_TREE_ROOT_PAGE_NO;
	ut_d(ibuf->index->cached = TRUE);

	return(DB_SUCCESS);
}

/** Updates ibuf->max_size under the protection of ibuf_mutex.
@param[in]	new_val	new value, as a percentage of the buffer pool
			size */
void
ibuf_max_size_update(
	ulint	new_val)
{
	/* Current buffer pool size expressed in pages */
	const ulint	pool_pages
		= buf_pool_get_curr_size() / UNIV_PAGE_SIZE;
	const ulint	new_size = (pool_pages * new_val) / 100;

	mutex_enter(&ibuf_mutex);
	ibuf->max_size = new_size;
	mutex_exit(&ibuf_mutex);
}


/** Initializes an ibuf bitmap page: sets the page type, zero-fills the
bitmap and writes an MLOG_IBUF_BITMAP_INIT initial log record.
@param[in]	block	bitmap page
@param[in]	mtr	mini-transaction */
void
ibuf_bitmap_page_init(
	buf_block_t*	block,
	mtr_t*		mtr)
{
	page_t*	page = buf_block_get_frame(block);

	fil_page_set_type(page, FIL_PAGE_IBUF_BITMAP);

	/* Write all zeros to the bitmap, whose length in bytes follows
	from IBUF_BITS_PER_PAGE bits for each of physical() pages. */
	const ulint	bitmap_bytes = UT_BITS_IN_BYTES(
		block->page.size.physical() * IBUF_BITS_PER_PAGE);

	memset(page + IBUF_BITMAP, 0, bitmap_bytes);

	/* The remaining area (up to the page trailer) is uninitialized. */
	mlog_write_initial_log_record(page, MLOG_IBUF_BITMAP_INIT, mtr);
}

/*********************************************************************//**
Parses a redo log record of an ibuf bitmap page init.
620
@return end of log record or NULL */
621 622 623 624
byte*
ibuf_parse_bitmap_init(
/*===================*/
	byte*		ptr,	/*!< in: buffer */
Sergei Golubchik's avatar
Sergei Golubchik committed
625
	byte*		end_ptr MY_ATTRIBUTE((unused)), /*!< in: buffer end */
626 627 628
	buf_block_t*	block,	/*!< in: block or NULL */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
{
629 630
	ut_ad(ptr != NULL);
	ut_ad(end_ptr != NULL);
631 632 633 634 635 636 637

	if (block) {
		ibuf_bitmap_page_init(block, mtr);
	}

	return(ptr);
}
638

639 640
/** Gets the desired bits for a given page from a bitmap page.
With UNIV_DEBUG the latch type MTR_MEMO_PAGE_X_FIX and the mini-transaction
are forwarded to ibuf_bitmap_page_get_bits_low(); otherwise the mtr
argument is dropped.
@param[in]	page		bitmap page
@param[in]	page_id		page id whose bits to get
@param[in]	page_size	page size
@param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
@param[in,out]	mtr		mini-transaction holding an x-latch on the
bitmap page
@return value of bits */
# ifdef UNIV_DEBUG
#  define ibuf_bitmap_page_get_bits(page, page_id, page_size, bit, mtr)	\
	ibuf_bitmap_page_get_bits_low(page, page_id, page_size,		\
				      MTR_MEMO_PAGE_X_FIX, mtr, bit)
# else /* UNIV_DEBUG */
#  define ibuf_bitmap_page_get_bits(page, page_id, page_size, bit, mtr)	\
	ibuf_bitmap_page_get_bits_low(page, page_id, page_size, bit)
# endif /* UNIV_DEBUG */

664 665 666 667 668 669 670 671 672
/** Gets the desired bits for a given page from a bitmap page.
@param[in]	page		bitmap page
@param[in]	page_id		page id whose bits to get
@param[in]	page_size	page size
@param[in]	latch_type	MTR_MEMO_PAGE_X_FIX, MTR_MEMO_BUF_FIX, ...
@param[in,out]	mtr		mini-transaction holding latch_type on the
bitmap page
@param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
@return value of bits */
673 674 675
UNIV_INLINE
ulint
ibuf_bitmap_page_get_bits_low(
676 677 678
	const page_t*		page,
	const page_id_t&	page_id,
	const page_size_t&	page_size,
679
#ifdef UNIV_DEBUG
680 681
	ulint			latch_type,
	mtr_t*			mtr,
682
#endif /* UNIV_DEBUG */
683
	ulint			bit)
684 685 686 687 688 689 690 691 692 693 694 695
{
	ulint	byte_offset;
	ulint	bit_offset;
	ulint	map_byte;
	ulint	value;

	ut_ad(bit < IBUF_BITS_PER_PAGE);
#if IBUF_BITS_PER_PAGE % 2
# error "IBUF_BITS_PER_PAGE % 2 != 0"
#endif
	ut_ad(mtr_memo_contains_page(mtr, page, latch_type));

696 697
	bit_offset = (page_id.page_no() % page_size.physical())
		* IBUF_BITS_PER_PAGE + bit;
698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716

	byte_offset = bit_offset / 8;
	bit_offset = bit_offset % 8;

	ut_ad(byte_offset + IBUF_BITMAP < UNIV_PAGE_SIZE);

	map_byte = mach_read_from_1(page + IBUF_BITMAP + byte_offset);

	value = ut_bit_get_nth(map_byte, bit_offset);

	if (bit == IBUF_BITMAP_FREE) {
		ut_ad(bit_offset + 1 < 8);

		value = value * 2 + ut_bit_get_nth(map_byte, bit_offset + 1);
	}

	return(value);
}

717 718 719 720 721 722 723
/** Sets the desired bit for a given page in a bitmap page.
@param[in,out]	page		bitmap page
@param[in]	page_id		page id whose bits to set
@param[in]	page_size	page size
@param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
@param[in]	val		value to set
@param[in,out]	mtr		mtr containing an x-latch to the bitmap page */
724 725 726
static
void
ibuf_bitmap_page_set_bits(
727 728 729 730 731 732
	page_t*			page,
	const page_id_t&	page_id,
	const page_size_t&	page_size,
	ulint			bit,
	ulint			val,
	mtr_t*			mtr)
733 734 735 736 737 738 739 740 741 742
{
	ulint	byte_offset;
	ulint	bit_offset;
	ulint	map_byte;

	ut_ad(bit < IBUF_BITS_PER_PAGE);
#if IBUF_BITS_PER_PAGE % 2
# error "IBUF_BITS_PER_PAGE % 2 != 0"
#endif
	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
743
	ut_ad(mtr->is_named_space(page_id.space()));
744 745
#ifdef UNIV_IBUF_COUNT_DEBUG
	ut_a((bit != IBUF_BITMAP_BUFFERED) || (val != FALSE)
746
	     || (0 == ibuf_count_get(page_id)));
747
#endif
748 749 750

	bit_offset = (page_id.page_no() % page_size.physical())
		* IBUF_BITS_PER_PAGE + bit;
751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773

	byte_offset = bit_offset / 8;
	bit_offset = bit_offset % 8;

	ut_ad(byte_offset + IBUF_BITMAP < UNIV_PAGE_SIZE);

	map_byte = mach_read_from_1(page + IBUF_BITMAP + byte_offset);

	if (bit == IBUF_BITMAP_FREE) {
		ut_ad(bit_offset + 1 < 8);
		ut_ad(val <= 3);

		map_byte = ut_bit_set_nth(map_byte, bit_offset, val / 2);
		map_byte = ut_bit_set_nth(map_byte, bit_offset + 1, val % 2);
	} else {
		ut_ad(val <= 1);
		map_byte = ut_bit_set_nth(map_byte, bit_offset, val);
	}

	mlog_write_ulint(page + IBUF_BITMAP + byte_offset, map_byte,
			 MLOG_1BYTE, mtr);
}

774 775 776 777
/** Calculates the bitmap page number for a given page number.
@param[in]	page_id		page id
@param[in]	page_size	page size
@return the bitmap page id where the file page is mapped */
778
UNIV_INLINE
779
const page_id_t
780
ibuf_bitmap_page_no_calc(
781 782
	const page_id_t&	page_id,
	const page_size_t&	page_size)
783
{
784
	ulint	bitmap_page_no;
785

786 787 788 789
	bitmap_page_no = FSP_IBUF_BITMAP_OFFSET
		+ (page_id.page_no() & ~(page_size.physical() - 1));

	return(page_id_t(page_id.space(), bitmap_page_no));
790 791
}

792
/** Fetches, with an x-latch, the ibuf bitmap page that holds the
descriptor bits of a given file page.
@param[in]	page_id		page id of the file page
@param[in]	page_size	page size of the file page
@param[in]	file		file name of the caller
@param[in]	line		line where called
@param[in,out]	mtr		mini-transaction
@return bitmap page where the file page is mapped, x-latched, or NULL if
buf_page_get_gen() reported an error */
static
page_t*
ibuf_bitmap_get_map_page_func(
	const page_id_t&	page_id,
	const page_size_t&	page_size,
	const char*		file,
	unsigned		line,
	mtr_t*			mtr)
{
	dberr_t		err = DB_SUCCESS;

	buf_block_t*	bitmap_block = buf_page_get_gen(
		ibuf_bitmap_page_no_calc(page_id, page_size),
		page_size, RW_X_LATCH, NULL, BUF_GET,
		file, line, mtr, &err);

	if (err != DB_SUCCESS) {
		return NULL;
	}

	buf_block_dbg_add_level(bitmap_block, SYNC_IBUF_BITMAP);

	return(buf_block_get_frame(bitmap_block));
}

828
/** Fetches, with an x-latch, the ibuf bitmap page that holds the
descriptor bits of a given file page. Wrapper that passes the __FILE__
and __LINE__ of the call site to ibuf_bitmap_get_map_page_func().
@param[in]	page_id		page id of the file page
@param[in]	page_size	page size of the file page
@param[in,out]	mtr		mini-transaction
@return bitmap page where the file page is mapped, x-latched; NULL if
ibuf_bitmap_get_map_page_func() failed to get the page */
#define ibuf_bitmap_get_map_page(page_id, page_size, mtr)	\
	ibuf_bitmap_get_map_page_func(page_id, page_size, \
				      __FILE__, __LINE__, mtr)

/** Sets the free bits of an index page in the ibuf bitmap, using the
mini-transaction supplied by the caller. Nothing is done when block is
NULL, when the block has no frame, or when the page is not a leaf page.
@param[in]	block	index page; free bits are set if the index is
			non-clustered and page level is 0
@param[in]	val	value to set: < 4
@param[in,out]	mtr	mini-transaction */
UNIV_INLINE
void
ibuf_set_free_bits_low(
	const buf_block_t*	block,
	ulint			val,
	mtr_t*			mtr)
{
	if (!block) {
		return;
	}

	/* This check dereferences block, so it must come after the
	NULL test above. */
	ut_ad(mtr->is_named_space(block->page.id.space()));

	buf_frame_t*	frame = buf_block_get_frame(block);

	if (!frame || !page_is_leaf(frame)) {
		return;
	}

	/* NOTE(review): ibuf_bitmap_get_map_page() returns NULL when the
	bitmap page cannot be fetched; the result is used unchecked here,
	as in the original code. Confirm whether that case can occur. */
	page_t*	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
						       block->page.size, mtr);

#ifdef UNIV_IBUF_DEBUG
	ut_a(val <= ibuf_index_page_calc_free(block));
#endif /* UNIV_IBUF_DEBUG */

	ibuf_bitmap_page_set_bits(
		bitmap_page, block->page.id, block->page.size,
		IBUF_BITMAP_FREE, val, mtr);
}

/************************************************************************//**
Sets the free bits of a leaf page in the ibuf bitmap. This is done in a
separate mini-transaction that is started and committed inside this
function, hence this operation does not restrict further work to only
ibuf bitmap operations, which would result if the latch to the bitmap page
were kept. Nothing is done for a non-leaf page. */
void
ibuf_set_free_bits_func(
/*====================*/
	buf_block_t*	block,	/*!< in: index page of a non-clustered index;
				free bit is reset if page level is 0 */
#ifdef UNIV_IBUF_DEBUG
	ulint		max_val,/*!< in: ULINT_UNDEFINED or a maximum
				value which the bits must have before
				setting; this is for debugging */
#endif /* UNIV_IBUF_DEBUG */
	ulint		val)	/*!< in: value to set: < 4 */
{
	mtr_t	mtr;
	page_t*	page;
	page_t*	bitmap_page;

	page = buf_block_get_frame(block);

	if (!page_is_leaf(page)) {

		return;
	}

	mtr_start(&mtr);
	const fil_space_t* space = mtr.set_named_space(block->page.id.space());

	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
					       block->page.size, &mtr);

	/* Decide whether the bitmap change is redo-logged. */
	switch (space->purpose) {
	case FIL_TYPE_LOG:
		ut_ad(0);
		break;
	case FIL_TYPE_TABLESPACE:
		/* Avoid logging while fixing up truncate of table. */
		if (!srv_is_tablespace_truncated(block->page.id.space())) {
			break;
		}
		/* fall through */
	case FIL_TYPE_TEMPORARY:
	case FIL_TYPE_IMPORT:
		mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
	}

#ifdef UNIV_IBUF_DEBUG
	if (max_val != ULINT_UNDEFINED) {
		/* Pass the page size too, as the
		ibuf_bitmap_page_get_bits() call in ibuf_page_low()
		does. */
		const ulint	old_val = ibuf_bitmap_page_get_bits(
			bitmap_page, block->page.id, block->page.size,
			IBUF_BITMAP_FREE, &mtr);

		ut_a(old_val <= max_val);
	}

	/* The bitmap must never advertise more free space than the
	page really has. */
	ut_a(val <= ibuf_index_page_calc_free(block));
#endif /* UNIV_IBUF_DEBUG */

	ibuf_bitmap_page_set_bits(
		bitmap_page, block->page.id, block->page.size,
		IBUF_BITMAP_FREE, val, &mtr);

	mtr_commit(&mtr);
}

/************************************************************************//**
Resets the free bits of the page in the ibuf bitmap to 0. This is done in a
separate mini-transaction, hence this operation does not restrict
further work to only ibuf bitmap operations, which would result if the
latch to the bitmap page were kept.  NOTE: The free bits in the insert
buffer bitmap must never exceed the free space on a page.  It is safe
to decrement or reset the bits in the bitmap in a mini-transaction
that is committed before the mini-transaction that affects the free
space. */
void
ibuf_reset_free_bits(
/*=================*/
	buf_block_t*	block)	/*!< in: index page; free bits are set to 0
				if the index is non-clustered and
				non-unique, and page level is 0 */
{
	/* val = 0. ULINT_UNDEFINED is the max_val argument, which
	ibuf_set_free_bits_func() only has (and only checks) in
	UNIV_IBUF_DEBUG builds; ULINT_UNDEFINED disables that check. */
	ibuf_set_free_bits(block, 0, ULINT_UNDEFINED);
}

/**********************************************************************//**
Updates the free bits for an uncompressed page to reflect the present
state.  Does this in the mtr given, which means that the latching
order rules virtually prevent any further operations for this OS
thread until mtr is committed.  NOTE: The free bits in the insert
buffer bitmap must never exceed the free space on a page.  It is safe
to set the free bits in the same mini-transaction that updated the
page. */
void
ibuf_update_free_bits_low(
/*======================*/
	const buf_block_t*	block,		/*!< in: index page */
	ulint			max_ins_size,	/*!< in: value of
						maximum insert size
						with reorganize before
						the latest operation
						performed to the page */
	mtr_t*			mtr)		/*!< in/out: mtr */
{
	ulint	before;
	ulint	after;

	ut_a(!buf_block_get_page_zip(block));
1007
	ut_ad(mtr->is_named_space(block->page.id.space()));
1008

1009 1010
	before = ibuf_index_page_calc_free_bits(block->page.size.logical(),
						max_ins_size);
1011

1012
	after = ibuf_index_page_calc_free(block);
1013 1014 1015 1016 1017 1018

	/* This approach cannot be used on compressed pages, since the
	computed value of "before" often does not match the current
	state of the bitmap.  This is because the free space may
	increase or decrease when a compressed page is reorganized. */
	if (before != after) {
1019
		ibuf_set_free_bits_low(block, after, mtr);
1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039
	}
}

/** Brings the ibuf bitmap free bits of a compressed page up to date.
The work is done in the mtr given, which means that the latching order
rules virtually prevent any further operations for this OS thread until
mtr is committed.  NOTE: The free bits in the insert buffer bitmap must
never exceed the free space on a page.  It is safe to set the free bits
in the same mini-transaction that updated the page.
@param[in,out]	block	index page; must be a compressed leaf page
@param[in,out]	mtr	mini-transaction */
void
ibuf_update_free_bits_zip(
	buf_block_t*	block,
	mtr_t*		mtr)
{
	ut_a(block);

	buf_frame_t*	frame = buf_block_get_frame(block);

	ut_a(frame);
	ut_a(page_is_leaf(frame));
	ut_a(block->page.size.is_compressed());

	page_t*	bitmap_page = ibuf_bitmap_get_map_page(
		block->page.id, block->page.size, mtr);

	const ulint	free_bits = ibuf_index_page_calc_free_zip(block);

	if (free_bits == 0) {
		/* We move the page to the front of the buffer pool LRU list:
		the purpose of this is to prevent those pages to which we
		cannot make inserts using the insert buffer from slipping
		out of the buffer pool */
		buf_page_make_young(&block->page);
	}

	ibuf_bitmap_page_set_bits(
		bitmap_page, block->page.id, block->page.size,
		IBUF_BITMAP_FREE, free_bits, mtr);
}

/** Brings the ibuf bitmap free bits of two pages up to date.
The work is done in the mtr given, which means that the latching order
rules virtually prevent any further operations until mtr is committed.
NOTE: The free bits in the insert buffer bitmap must never exceed the
free space on a page.  It is safe to set the free bits in the same
mini-transaction that updated the pages.
@param[in]	block1	index page
@param[in]	block2	index page in the same tablespace as block1
@param[in,out]	mtr	mini-transaction */
void
ibuf_update_free_bits_for_two_pages_low(
	buf_block_t*	block1,
	buf_block_t*	block2,
	mtr_t*		mtr)
{
	ut_ad(mtr->is_named_space(block1->page.id.space()));
	ut_ad(block1->page.id.space() == block2->page.id.space());

	/* As we have to x-latch two random bitmap pages, we have to acquire
	the bitmap mutex to prevent a deadlock with a similar operation
	performed by another OS thread. */
	mutex_enter(&ibuf_bitmap_mutex);

	const ulint	free_bits1 = ibuf_index_page_calc_free(block1);
	ibuf_set_free_bits_low(block1, free_bits1, mtr);

	const ulint	free_bits2 = ibuf_index_page_calc_free(block2);
	ibuf_set_free_bits_low(block2, free_bits2, mtr);

	mutex_exit(&ibuf_bitmap_mutex);
}

1101 1102 1103 1104
/** Returns TRUE if the page is one of the fixed address ibuf pages.
@param[in]	page_id		page id
@param[in]	page_size	page size
@return TRUE if a fixed address ibuf i/o page */
1105 1106 1107
UNIV_INLINE
ibool
ibuf_fixed_addr_page(
1108 1109
	const page_id_t&	page_id,
	const page_size_t&	page_size)
1110
{
1111 1112 1113
	return((page_id.space() == IBUF_SPACE_ID
		&& page_id.page_no() == IBUF_TREE_ROOT_PAGE_NO)
	       || ibuf_bitmap_page(page_id, page_size));
1114 1115
}

1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127
/** Checks if a page is a level 2 or 3 page in the ibuf hierarchy of pages.
Must not be called when recv_no_ibuf_operations==true.
@param[in]	page_id		page id
@param[in]	page_size	page size
@param[in]	x_latch		FALSE if relaxed check (avoid latching the
bitmap page)
@param[in]	file		file name
@param[in]	line		line where called
@param[in,out]	mtr		mtr which will contain an x-latch to the
bitmap page if the page is not one of the fixed address ibuf pages, or NULL,
in which case a new transaction is created.
@return TRUE if level 2 or level 3 page */
1128 1129
ibool
ibuf_page_low(
1130 1131
	const page_id_t&	page_id,
	const page_size_t&	page_size,
1132
#ifdef UNIV_DEBUG
1133
	ibool			x_latch,
1134
#endif /* UNIV_DEBUG */
1135
	const char*		file,
1136
	unsigned		line,
1137
	mtr_t*			mtr)
1138 1139 1140 1141 1142 1143 1144 1145
{
	ibool	ret;
	mtr_t	local_mtr;
	page_t*	bitmap_page;

	ut_ad(!recv_no_ibuf_operations);
	ut_ad(x_latch || mtr == NULL);

1146
	if (ibuf_fixed_addr_page(page_id, page_size)) {
1147 1148

		return(TRUE);
1149
	} else if (page_id.space() != IBUF_SPACE_ID) {
1150 1151 1152 1153

		return(FALSE);
	}

1154
	ut_ad(fil_space_get_type(IBUF_SPACE_ID) == FIL_TYPE_TABLESPACE);
1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169

#ifdef UNIV_DEBUG
	if (!x_latch) {
		mtr_start(&local_mtr);

		/* Get the bitmap page without a page latch, so that
		we will not be violating the latching order when
		another bitmap page has already been latched by this
		thread. The page will be buffer-fixed, and thus it
		cannot be removed or relocated while we are looking at
		it. The contents of the page could change, but the
		IBUF_BITMAP_IBUF bit that we are interested in should
		not be modified by any other thread. Nobody should be
		calling ibuf_add_free_page() or ibuf_remove_free_page()
		while the page is linked to the insert buffer b-tree. */
1170 1171 1172 1173 1174 1175
		dberr_t err = DB_SUCCESS;

		buf_block_t* block = buf_page_get_gen(
				ibuf_bitmap_page_no_calc(page_id, page_size),
				page_size, RW_NO_LATCH, NULL, BUF_GET_NO_LATCH,
				file, line, &local_mtr, &err);
1176

1177
		bitmap_page = buf_block_get_frame(block);
1178 1179

		ret = ibuf_bitmap_page_get_bits_low(
1180
			bitmap_page, page_id, page_size,
1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192
			MTR_MEMO_BUF_FIX, &local_mtr, IBUF_BITMAP_IBUF);

		mtr_commit(&local_mtr);
		return(ret);
	}
#endif /* UNIV_DEBUG */

	if (mtr == NULL) {
		mtr = &local_mtr;
		mtr_start(mtr);
	}

1193
	bitmap_page = ibuf_bitmap_get_map_page_func(page_id, page_size,
1194 1195
						    file, line, mtr);

1196
	ret = ibuf_bitmap_page_get_bits(bitmap_page, page_id, page_size,
1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213
					IBUF_BITMAP_IBUF, mtr);

	if (mtr == &local_mtr) {
		mtr_commit(mtr);
	}

	return(ret);
}

#ifdef UNIV_DEBUG
# define ibuf_rec_get_page_no(mtr,rec) ibuf_rec_get_page_no_func(mtr,rec)
#else /* UNIV_DEBUG */
# define ibuf_rec_get_page_no(mtr,rec) ibuf_rec_get_page_no_func(rec)
#endif /* UNIV_DEBUG */

/********************************************************************//**
Returns the page number field of an ibuf record.
1214
@return page number */
1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226
static
ulint
ibuf_rec_get_page_no_func(
/*======================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
#endif /* UNIV_DEBUG */
	const rec_t*	rec)	/*!< in: ibuf record */
{
	const byte*	field;
	ulint		len;

1227 1228 1229
	ut_ad(mtr_memo_contains_page_flagged(mtr, rec,
					     MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252
	ut_ad(ibuf_inside(mtr));
	ut_ad(rec_get_n_fields_old(rec) > 2);

	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);

	ut_a(len == 1);

	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_PAGE, &len);

	ut_a(len == 4);

	return(mach_read_from_4(field));
}

#ifdef UNIV_DEBUG
# define ibuf_rec_get_space(mtr,rec) ibuf_rec_get_space_func(mtr,rec)
#else /* UNIV_DEBUG */
# define ibuf_rec_get_space(mtr,rec) ibuf_rec_get_space_func(rec)
#endif /* UNIV_DEBUG */

/********************************************************************//**
Returns the space id field of an ibuf record. For < 4.1.x format records
returns 0.
1253
@return space id */
1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265
static
ulint
ibuf_rec_get_space_func(
/*====================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
#endif /* UNIV_DEBUG */
	const rec_t*	rec)	/*!< in: ibuf record */
{
	const byte*	field;
	ulint		len;

1266 1267
	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315
	ut_ad(ibuf_inside(mtr));
	ut_ad(rec_get_n_fields_old(rec) > 2);

	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);

	ut_a(len == 1);

	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);

	ut_a(len == 4);

	return(mach_read_from_4(field));
}

#ifdef UNIV_DEBUG
# define ibuf_rec_get_info(mtr,rec,op,comp,info_len,counter)	\
	ibuf_rec_get_info_func(mtr,rec,op,comp,info_len,counter)
#else /* UNIV_DEBUG */
# define ibuf_rec_get_info(mtr,rec,op,comp,info_len,counter)	\
	ibuf_rec_get_info_func(rec,op,comp,info_len,counter)
#endif
/****************************************************************//**
Get various information about an ibuf record in >= 4.1.x format. */
static
void
ibuf_rec_get_info_func(
/*===================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
#endif /* UNIV_DEBUG */
	const rec_t*	rec,		/*!< in: ibuf record */
	ibuf_op_t*	op,		/*!< out: operation type, or NULL */
	ibool*		comp,		/*!< out: compact flag, or NULL */
	ulint*		info_len,	/*!< out: length of info fields at the
					start of the fourth field, or
					NULL */
	ulint*		counter)	/*!< in: counter value, or NULL */
{
	const byte*	types;
	ulint		fields;
	ulint		len;

	/* Local variables to shadow arguments. */
	ibuf_op_t	op_local;
	ibool		comp_local;
	ulint		info_len_local;
	ulint		counter_local;

1316 1317
	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375
	ut_ad(ibuf_inside(mtr));
	fields = rec_get_n_fields_old(rec);
	ut_a(fields > IBUF_REC_FIELD_USER);

	types = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);

	info_len_local = len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;

	switch (info_len_local) {
	case 0:
	case 1:
		op_local = IBUF_OP_INSERT;
		comp_local = info_len_local;
		ut_ad(!counter);
		counter_local = ULINT_UNDEFINED;
		break;

	case IBUF_REC_INFO_SIZE:
		op_local = (ibuf_op_t) types[IBUF_REC_OFFSET_TYPE];
		comp_local = types[IBUF_REC_OFFSET_FLAGS] & IBUF_REC_COMPACT;
		counter_local = mach_read_from_2(
			types + IBUF_REC_OFFSET_COUNTER);
		break;

	default:
		ut_error;
	}

	ut_a(op_local < IBUF_OP_COUNT);
	ut_a((len - info_len_local) ==
	     (fields - IBUF_REC_FIELD_USER)
	     * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);

	if (op) {
		*op = op_local;
	}

	if (comp) {
		*comp = comp_local;
	}

	if (info_len) {
		*info_len = info_len_local;
	}

	if (counter) {
		*counter = counter_local;
	}
}

#ifdef UNIV_DEBUG
# define ibuf_rec_get_op_type(mtr,rec) ibuf_rec_get_op_type_func(mtr,rec)
#else /* UNIV_DEBUG */
# define ibuf_rec_get_op_type(mtr,rec) ibuf_rec_get_op_type_func(rec)
#endif

/****************************************************************//**
Returns the operation type field of an ibuf record.
1376
@return operation type */
1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387
static
ibuf_op_t
ibuf_rec_get_op_type_func(
/*======================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
#endif /* UNIV_DEBUG */
	const rec_t*	rec)	/*!< in: ibuf record */
{
	ulint		len;

1388 1389
	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450
	ut_ad(ibuf_inside(mtr));
	ut_ad(rec_get_n_fields_old(rec) > 2);

	(void) rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);

	if (len > 1) {
		/* This is a < 4.1.x format record */

		return(IBUF_OP_INSERT);
	} else {
		ibuf_op_t	op;

		ibuf_rec_get_info(mtr, rec, &op, NULL, NULL, NULL);

		return(op);
	}
}

/** Reads the first two bytes from a record's fourth field (counter field
in new records; something else in older records).
@param[in]	rec	ibuf record
@return "counter" field, or ULINT_UNDEFINED if the record has no fourth
field or that field is shorter than two bytes */
ulint
ibuf_rec_get_counter(
	const rec_t*	rec)
{
	if (rec_get_n_fields_old(rec) > IBUF_REC_FIELD_METADATA) {
		ulint		meta_len;
		const byte*	meta = rec_get_nth_field_old(
			rec, IBUF_REC_FIELD_METADATA, &meta_len);

		if (meta_len >= 2) {
			return(mach_read_from_2(meta));
		}
	}

	return(ULINT_UNDEFINED);
}

/** Adds accumulated operation counts to a permanent array, one atomic
addition per element. Both arrays must be of size IBUF_OP_COUNT.
@param[in,out]	arr	array to modify
@param[in]	ops	operation counts */
static
void
ibuf_add_ops(
	ulint*		arr,
	const ulint*	ops)
{
	for (ulint op = 0; op < IBUF_OP_COUNT; op++) {
		my_atomic_addlint(arr + op, ops[op]);
	}
}

/****************************************************************//**
Print operation counts. The array must be of size IBUF_OP_COUNT. */
static
void
ibuf_print_ops(
/*===========*/
	const ulint*	ops,	/*!< in: operation counts */
	FILE*		file)	/*!< in: file where to print */
{
	static const char* op_names[] = {
		"insert",
		"delete mark",
		"delete"
	};
	ulint	i;

	ut_a(UT_ARR_SIZE(op_names) == IBUF_OP_COUNT);

	for (i = 0; i < IBUF_OP_COUNT; i++) {
1474 1475
		fprintf(file, "%s " ULINTPF "%s", op_names[i],
			ops[i], (i < (IBUF_OP_COUNT - 1)) ? ", " : "");
1476 1477 1478 1479 1480 1481 1482
	}

	putc('\n', file);
}

/********************************************************************//**
Creates a dummy index for inserting a record to a non-clustered index.
1483
@return dummy index */
1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494
static
dict_index_t*
ibuf_dummy_index_create(
/*====================*/
	ulint		n,	/*!< in: number of fields */
	ibool		comp)	/*!< in: TRUE=use compact record format */
{
	dict_table_t*	table;
	dict_index_t*	index;

	table = dict_mem_table_create("IBUF_DUMMY",
1495
				      DICT_HDR_SPACE, n, 0,
1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586
				      comp ? DICT_TF_COMPACT : 0, 0);

	index = dict_mem_index_create("IBUF_DUMMY", "IBUF_DUMMY",
				      DICT_HDR_SPACE, 0, n);

	index->table = table;

	/* avoid ut_ad(index->cached) in dict_index_get_n_unique_in_tree */
	index->cached = TRUE;

	return(index);
}
/** Adds a column to the dummy table of a dummy index, and then to the
dummy index itself.
@param[in,out]	index	dummy index
@param[in]	type	the data type of the column
@param[in]	len	length of the column */
static
void
ibuf_dummy_index_add_col(
	dict_index_t*	index,
	const dtype_t*	type,
	ulint		len)
{
	dict_table_t*	table = index->table;

	/* Remember the position the new column will get before the
	table's column count changes. */
	const ulint	col_no = table->n_def;

	dict_mem_table_add_col(table, NULL, NULL,
			       dtype_get_mtype(type),
			       dtype_get_prtype(type),
			       dtype_get_len(type));

	dict_index_add_col(index, table,
			   dict_table_get_nth_col(table, col_no), len);
}
/** Deallocates a dummy index for inserting a record to a non-clustered
index, along with the dummy table it points to.
@param[in,own]	index	dummy index */
static
void
ibuf_dummy_index_free(
	dict_index_t*	index)
{
	/* Fetch the table pointer first: it cannot be read from the
	index once the index has been freed. */
	dict_table_t*	dummy_table = index->table;

	dict_mem_index_free(index);
	dict_mem_table_free(dummy_table);
}

/* The mtr argument is only passed on in debug builds, where it is used
by assertions. */
#ifdef UNIV_DEBUG
# define ibuf_build_entry_from_ibuf_rec(mtr,ibuf_rec,heap,pindex)	\
	ibuf_build_entry_from_ibuf_rec_func(mtr,ibuf_rec,heap,pindex)
#else /* UNIV_DEBUG */
# define ibuf_build_entry_from_ibuf_rec(mtr,ibuf_rec,heap,pindex)	\
	ibuf_build_entry_from_ibuf_rec_func(ibuf_rec,heap,pindex)
#endif

/** Builds the entry used to

1) IBUF_OP_INSERT: insert into a non-clustered index

2) IBUF_OP_DELETE_MARK: find the record whose delete-mark flag we need to
   activate

3) IBUF_OP_DELETE: find the record we need to delete

when we have the corresponding record in an ibuf index.

NOTE that as we copy pointers to fields in ibuf_rec, the caller must
hold a latch to the ibuf_rec page as long as the entry is used!

@param[in]	mtr		mini-transaction owning rec (debug builds
				only)
@param[in]	ibuf_rec	record in an insert buffer
@param[in]	heap		heap where built
@param[out,own]	pindex		dummy index that describes the entry
@return own: entry to insert to a non-clustered index */
static
dtuple_t*
ibuf_build_entry_from_ibuf_rec_func(
#ifdef UNIV_DEBUG
	mtr_t*		mtr,
#endif /* UNIV_DEBUG */
	const rec_t*	ibuf_rec,
	mem_heap_t*	heap,
	dict_index_t**	pindex)
{
	ulint	len;
	ulint	info_len;
	ulint	comp;

	ut_ad(mtr_memo_contains_page_flagged(mtr, ibuf_rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
	ut_ad(ibuf_inside(mtr));

	const byte*	marker = rec_get_nth_field_old(
		ibuf_rec, IBUF_REC_FIELD_MARKER, &len);

	ut_a(len == 1);
	ut_a(*marker == 0);
	ut_a(rec_get_n_fields_old(ibuf_rec) > IBUF_REC_FIELD_USER);

	/* Everything from IBUF_REC_FIELD_USER onwards is a user field. */
	const ulint	n_fields = rec_get_n_fields_old(ibuf_rec)
		- IBUF_REC_FIELD_USER;

	dtuple_t*	tuple = dtuple_create(heap, n_fields);

	const byte*	types = rec_get_nth_field_old(
		ibuf_rec, IBUF_REC_FIELD_METADATA, &len);

	ibuf_rec_get_info(mtr, ibuf_rec, NULL, &comp, &info_len, NULL);

	dict_index_t*	index = ibuf_dummy_index_create(n_fields, comp);

	/* Skip the info prefix; what remains must be exactly one type
	descriptor per user field. */
	len -= info_len;
	types += info_len;

	ut_a(len == n_fields * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);

	for (ulint i = 0; i < n_fields;
	     i++, types += DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE) {
		ulint		field_len;
		dfield_t*	field = dtuple_get_nth_field(tuple, i);

		const byte*	data = rec_get_nth_field_old(
			ibuf_rec, i + IBUF_REC_FIELD_USER, &field_len);

		/* Only the pointer is stored; the data is not copied. */
		dfield_set_data(field, data, field_len);

		dtype_new_read_for_order_and_null_size(
			dfield_get_type(field), types);

		ibuf_dummy_index_add_col(index, dfield_get_type(field),
					 field_len);
	}

	/* Prevent an ut_ad() failure in page_zip_write_rec() by
	adding system columns to the dummy table pointed to by the
	dummy secondary index.  The insert buffer is only used for
	secondary indexes, whose records never contain any system
	columns, such as DB_TRX_ID. */
	ut_d(dict_table_add_system_columns(index->table, index->table->heap));

	*pindex = index;

	return(tuple);
}

/******************************************************************//**
Get the data size.
1641
@return size of fields */
1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708
UNIV_INLINE
ulint
ibuf_rec_get_size(
/*==============*/
	const rec_t*	rec,			/*!< in: ibuf record */
	const byte*	types,			/*!< in: fields */
	ulint		n_fields,		/*!< in: number of fields */
	ulint		comp)			/*!< in: 0=ROW_FORMAT=REDUNDANT,
						nonzero=ROW_FORMAT=COMPACT */
{
	ulint	i;
	ulint	field_offset;
	ulint	types_offset;
	ulint	size = 0;

	field_offset = IBUF_REC_FIELD_USER;
	types_offset = DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;

	for (i = 0; i < n_fields; i++) {
		ulint		len;
		dtype_t		dtype;

		rec_get_nth_field_offs_old(rec, i + field_offset, &len);

		if (len != UNIV_SQL_NULL) {
			size += len;
		} else {
			dtype_new_read_for_order_and_null_size(&dtype, types);

			size += dtype_get_sql_null_size(&dtype, comp);
		}

		types += types_offset;
	}

	return(size);
}

/* The mtr argument is only passed on in debug builds, where it is used
by assertions. */
#ifdef UNIV_DEBUG
# define ibuf_rec_get_volume(mtr,rec) ibuf_rec_get_volume_func(mtr,rec)
#else /* UNIV_DEBUG */
# define ibuf_rec_get_volume(mtr,rec) ibuf_rec_get_volume_func(rec)
#endif

/** Computes the space taken by a stored non-clustered index entry if
converted to an index record.
@param[in]	mtr		mini-transaction owning rec (debug builds
				only)
@param[in]	ibuf_rec	ibuf record
@return size of index record in bytes + an upper limit of the space
taken in the page directory; 0 for delete-mark and delete operations */
static
ulint
ibuf_rec_get_volume_func(
#ifdef UNIV_DEBUG
	mtr_t*		mtr,
#endif /* UNIV_DEBUG */
	const rec_t*	ibuf_rec)
{
	ulint		len;
	ulint		comp;
	ulint		info_len;
	ibuf_op_t	op;

	ut_ad(mtr_memo_contains_page_flagged(mtr, ibuf_rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
	ut_ad(ibuf_inside(mtr));
	ut_ad(rec_get_n_fields_old(ibuf_rec) > 2);

	const byte*	marker = rec_get_nth_field_old(
		ibuf_rec, IBUF_REC_FIELD_MARKER, &len);
	ut_a(len == 1);
	ut_a(*marker == 0);

	const byte*	types = rec_get_nth_field_old(
		ibuf_rec, IBUF_REC_FIELD_METADATA, &len);

	ibuf_rec_get_info(mtr, ibuf_rec, &op, &comp, &info_len, NULL);

	if (op == IBUF_OP_DELETE_MARK || op == IBUF_OP_DELETE) {
		/* Delete-marking a record doesn't take any
		additional space, and while deleting a record
		actually frees up space, we have to play it safe and
		pretend it takes no additional space (the record
		might not exist, etc.).  */
		return(0);
	}

	if (comp) {
		/* For the compact format, build the entry and ask
		for its converted size. */
		mem_heap_t*	heap = mem_heap_create(500);
		dict_index_t*	dummy_index;

		dtuple_t*	entry = ibuf_build_entry_from_ibuf_rec(
			mtr, ibuf_rec, heap, &dummy_index);

		const ulint	volume = rec_get_converted_size(
			dummy_index, entry, 0);

		ibuf_dummy_index_free(dummy_index);
		mem_heap_free(heap);

		return(volume + page_dir_calc_reserved_space(1));
	}

	const ulint	n_fields = rec_get_n_fields_old(ibuf_rec)
		- IBUF_REC_FIELD_USER;

	/* The type descriptors start after the info prefix. */
	const ulint	data_size = ibuf_rec_get_size(
		ibuf_rec, types + info_len, n_fields, comp);

	return(data_size + rec_get_converted_extra_size(data_size, n_fields, 0)
	       + page_dir_calc_reserved_space(1));
}

/*********************************************************************//**
Builds the tuple to insert to an ibuf tree when we have an entry for a
non-clustered index.

NOTE that the original entry must be kept because we copy pointers to
its fields.

@return own: entry to insert into an ibuf index tree */
static
dtuple_t*
ibuf_entry_build(
/*=============*/
	ibuf_op_t	op,	/*!< in: operation type */
	dict_index_t*	index,	/*!< in: non-clustered index */
	const dtuple_t*	entry,	/*!< in: entry for a non-clustered index */
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number where entry should
				be inserted */
	ulint		counter,/*!< in: counter value;
				ULINT_UNDEFINED=not used */
	mem_heap_t*	heap)	/*!< in: heap into which to build */
{
	dtuple_t*	tuple;
	dfield_t*	field;
	const dfield_t*	entry_field;
	ulint		n_fields;
	byte*		buf;
	byte*		ti;
	byte*		type_info;
	ulint		i;

	/* Only IBUF_OP_INSERT may be built without a counter, and a
	counter must fit in the two bytes it is stored in. */
	ut_ad(counter != ULINT_UNDEFINED || op == IBUF_OP_INSERT);
	ut_ad(counter == ULINT_UNDEFINED || counter <= 0xFFFF);
	ut_ad(op < IBUF_OP_COUNT);

	/* We have to build a tuple with the following fields:

	1-4) These are described at the top of this file.

	5) The rest of the fields are copied from the entry.

	All fields in the tuple are ordered like the type binary in our
	insert buffer tree. */

	n_fields = dtuple_get_n_fields(entry);

	tuple = dtuple_create(heap, n_fields + IBUF_REC_FIELD_USER);

	/* 1) Space Id */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_SPACE);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));

	mach_write_to_4(buf, space);

	dfield_set_data(field, buf, 4);

	/* 2) Marker byte */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_MARKER);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 1));

	/* We set the marker byte zero */

	mach_write_to_1(buf, 0);

	dfield_set_data(field, buf, 1);

	/* 3) Page number */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_PAGE);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));

	mach_write_to_4(buf, page_no);

	dfield_set_data(field, buf, 4);

	/* 4) Type info, part #1 */

	/* Here i temporarily holds the size of the info prefix that
	precedes the per-field type descriptors: 0 or 1 byte when no
	counter is stored, IBUF_REC_INFO_SIZE otherwise. */
	if (counter == ULINT_UNDEFINED) {
		i = dict_table_is_comp(index->table) ? 1 : 0;
	} else {
		ut_ad(counter <= 0xFFFF);
		i = IBUF_REC_INFO_SIZE;
	}

	/* One buffer holds the info prefix followed by one type
	descriptor per field; ti is the write cursor into it. */
	ti = type_info = static_cast<byte*>(
		mem_heap_alloc(
			heap,
			i + n_fields * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE));

	switch (i) {
	default:
		ut_error;
		break;
	case 1:
		/* set the flag for ROW_FORMAT=COMPACT */
		*ti++ = 0;
		/* fall through */
	case 0:
		/* the old format does not allow delete buffering */
		ut_ad(op == IBUF_OP_INSERT);
		break;
	case IBUF_REC_INFO_SIZE:
		mach_write_to_2(ti + IBUF_REC_OFFSET_COUNTER, counter);

		ti[IBUF_REC_OFFSET_TYPE] = (byte) op;
		ti[IBUF_REC_OFFSET_FLAGS] = dict_table_is_comp(index->table)
			? IBUF_REC_COMPACT : 0;
		ti += IBUF_REC_INFO_SIZE;
		break;
	}

	/* 5+) Fields from the entry */

	for (i = 0; i < n_fields; i++) {
		ulint			fixed_len;
		const dict_field_t*	ifield;

		field = dtuple_get_nth_field(tuple, i + IBUF_REC_FIELD_USER);
		entry_field = dtuple_get_nth_field(entry, i);
		dfield_copy(field, entry_field);

		ifield = dict_index_get_nth_field(index, i);
		/* Prefix index columns of fixed-length columns are of
		fixed length.  However, in the function call below,
		dfield_get_type(entry_field) contains the fixed length
		of the column in the clustered index.  Replace it with
		the fixed length of the secondary index column. */
		fixed_len = ifield->fixed_len;

#ifdef UNIV_DEBUG
		if (fixed_len) {
			/* dict_index_add_col() should guarantee these */
			ut_ad(fixed_len <= (ulint)
			      dfield_get_type(entry_field)->len);
			if (ifield->prefix_len) {
				ut_ad(ifield->prefix_len == fixed_len);
			} else {
				ut_ad(fixed_len == (ulint)
				      dfield_get_type(entry_field)->len);
			}
		}
#endif /* UNIV_DEBUG */

		/* Append this field's type descriptor and advance
		the write cursor past it. */
		dtype_new_store_for_order_and_null_size(
			ti, dfield_get_type(entry_field), fixed_len);
		ti += DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
	}

	/* 4) Type info, part #2 */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_METADATA);

	/* ti now points just past the last byte written, so the
	difference is the total length of the metadata field. */
	dfield_set_data(field, type_info, ti - type_info);

	/* Set all the types in the new tuple binary */

	dtuple_set_types_binary(tuple, n_fields + IBUF_REC_FIELD_USER);

	return(tuple);
}

/*********************************************************************//**
Builds a search tuple used to search buffered inserts for an index page.
This is for >= 4.1.x format records.
1927
@return own: search tuple */
1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979
static
dtuple_t*
ibuf_search_tuple_build(
/*====================*/
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number */
	mem_heap_t*	heap)	/*!< in: heap into which to build */
{
	dtuple_t*	tuple;
	dfield_t*	field;
	byte*		buf;

	tuple = dtuple_create(heap, IBUF_REC_FIELD_METADATA);

	/* Store the space id in tuple */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_SPACE);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));

	mach_write_to_4(buf, space);

	dfield_set_data(field, buf, 4);

	/* Store the new format record marker byte */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_MARKER);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 1));

	mach_write_to_1(buf, 0);

	dfield_set_data(field, buf, 1);

	/* Store the page number in tuple */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_PAGE);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));

	mach_write_to_4(buf, page_no);

	dfield_set_data(field, buf, 4);

	dtuple_set_types_binary(tuple, IBUF_REC_FIELD_METADATA);

	return(tuple);
}

/*********************************************************************//**
Checks if there are enough pages in the free list of the ibuf tree that we
dare to start a pessimistic insert to the insert buffer.
1980
@return TRUE if enough free pages in list */
1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999
UNIV_INLINE
ibool
ibuf_data_enough_free_for_insert(void)
/*==================================*/
{
	ut_ad(mutex_own(&ibuf_mutex));

	/* We want a big margin of free pages, because a B-tree can sometimes
	grow in size also if records are deleted from it, as the node pointers
	can change, and we must make sure that we are able to delete the
	inserts buffered for pages that we read to the buffer pool, without
	any risk of running out of free space in the insert buffer. */

	return(ibuf->free_list_len >= (ibuf->size / 2) + 3 * ibuf->height);
}

/*********************************************************************//**
Checks if there are enough pages in the free list of the ibuf tree that we
should remove them and free to the file space management.
2000
@return TRUE if enough free pages in list */
2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013
UNIV_INLINE
ibool
ibuf_data_too_much_free(void)
/*=========================*/
{
	ut_ad(mutex_own(&ibuf_mutex));

	return(ibuf->free_list_len >= 3 + (ibuf->size / 2) + 3 * ibuf->height);
}

/*********************************************************************//**
Allocates a new page from the ibuf file segment and adds it to the free
list.
2014
@return TRUE on success, FALSE if no space left */
2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027
static
ibool
ibuf_add_free_page(void)
/*====================*/
{
	mtr_t		mtr;
	page_t*		header_page;
	buf_block_t*	block;
	page_t*		page;
	page_t*		root;
	page_t*		bitmap_page;

	mtr_start(&mtr);
2028
	fil_space_t* space = mtr.set_sys_modified();
2029 2030 2031

	/* Acquire the fsp latch before the ibuf header, obeying the latching
	order */
2032
	mtr_x_lock(&space->latch, &mtr);
2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076
	header_page = ibuf_header_page_get(&mtr);

	/* Allocate a new page: NOTE that if the page has been a part of a
	non-clustered index which has subsequently been dropped, then the
	page may have buffered inserts in the insert buffer, and these
	should be deleted from there. These get deleted when the page
	allocation creates the page in buffer. Thus the call below may end
	up calling the insert buffer routines and, as we yet have no latches
	to insert buffer tree pages, these routines can run without a risk
	of a deadlock. This is the reason why we created a special ibuf
	header page apart from the ibuf tree. */

	block = fseg_alloc_free_page(
		header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER, 0, FSP_UP,
		&mtr);

	if (block == NULL) {
		mtr_commit(&mtr);

		return(FALSE);
	}

	ut_ad(rw_lock_get_x_lock_count(&block->lock) == 1);
	ibuf_enter(&mtr);
	mutex_enter(&ibuf_mutex);
	root = ibuf_tree_root_get(&mtr);

	buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);
	page = buf_block_get_frame(block);

	/* Add the page to the free list and update the ibuf size data */

	flst_add_last(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		      page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, &mtr);

	mlog_write_ulint(page + FIL_PAGE_TYPE, FIL_PAGE_IBUF_FREE_LIST,
			 MLOG_2BYTES, &mtr);

	ibuf->seg_size++;
	ibuf->free_list_len++;

	/* Set the bit indicating that this page is now an ibuf tree page
	(level 2 page) */

2077 2078 2079 2080
	const page_id_t		page_id(IBUF_SPACE_ID, block->page.id.page_no());
	const page_size_t	page_size(space->flags);

	bitmap_page = ibuf_bitmap_get_map_page(page_id, page_size, &mtr);
2081 2082 2083

	mutex_exit(&ibuf_mutex);

2084 2085
	ibuf_bitmap_page_set_bits(bitmap_page, page_id, page_size,
				  IBUF_BITMAP_IBUF, TRUE, &mtr);
2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106

	ibuf_mtr_commit(&mtr);

	return(TRUE);
}

/*********************************************************************//**
Removes a page from the free list and frees it to the fsp system. */
static
void
ibuf_remove_free_page(void)
/*=======================*/
{
	mtr_t	mtr;
	mtr_t	mtr2;
	page_t*	header_page;
	ulint	page_no;
	page_t*	page;
	page_t*	root;
	page_t*	bitmap_page;

2107 2108
	log_free_check();

2109
	mtr_start(&mtr);
2110 2111
	fil_space_t*		space = mtr.set_sys_modified();
	const page_size_t	page_size(space->flags);
2112 2113 2114 2115

	/* Acquire the fsp latch before the ibuf header, obeying the latching
	order */

2116
	mtr_x_lock(&space->latch, &mtr);
2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156
	header_page = ibuf_header_page_get(&mtr);

	/* Prevent pessimistic inserts to insert buffer trees for a while */
	ibuf_enter(&mtr);
	mutex_enter(&ibuf_pessimistic_insert_mutex);
	mutex_enter(&ibuf_mutex);

	if (!ibuf_data_too_much_free()) {

		mutex_exit(&ibuf_mutex);
		mutex_exit(&ibuf_pessimistic_insert_mutex);

		ibuf_mtr_commit(&mtr);

		return;
	}

	ibuf_mtr_start(&mtr2);

	root = ibuf_tree_root_get(&mtr2);

	mutex_exit(&ibuf_mutex);

	page_no = flst_get_last(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
				&mtr2).page;

	/* NOTE that we must release the latch on the ibuf tree root
	because in fseg_free_page we access level 1 pages, and the root
	is a level 2 page. */

	ibuf_mtr_commit(&mtr2);
	ibuf_exit(&mtr);

	/* Since pessimistic inserts were prevented, we know that the
	page is still in the free list. NOTE that also deletes may take
	pages from the free list, but they take them from the start, and
	the free list was so long that they cannot have taken the last
	page from it. */

	fseg_free_page(header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER,
2157 2158 2159
		       IBUF_SPACE_ID, page_no, false, &mtr);

	const page_id_t	page_id(IBUF_SPACE_ID, page_no);
2160

2161
	ut_d(buf_page_reset_file_page_was_freed(page_id));
2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174

	ibuf_enter(&mtr);

	mutex_enter(&ibuf_mutex);

	root = ibuf_tree_root_get(&mtr);

	ut_ad(page_no == flst_get_last(root + PAGE_HEADER
				       + PAGE_BTR_IBUF_FREE_LIST, &mtr).page);

	{
		buf_block_t*	block;

2175
		block = buf_page_get(page_id, univ_page_size, RW_X_LATCH, &mtr);
2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);

		page = buf_block_get_frame(block);
	}

	/* Remove the page from the free list and update the ibuf size data */

	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		    page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, &mtr);

	mutex_exit(&ibuf_pessimistic_insert_mutex);

	ibuf->seg_size--;
	ibuf->free_list_len--;

	/* Set the bit indicating that this page is no more an ibuf tree page
	(level 2 page) */

2195
	bitmap_page = ibuf_bitmap_get_map_page(page_id, page_size, &mtr);
2196 2197 2198 2199

	mutex_exit(&ibuf_mutex);

	ibuf_bitmap_page_set_bits(
2200 2201 2202 2203
		bitmap_page, page_id, page_size, IBUF_BITMAP_IBUF, FALSE,
		&mtr);

	ut_d(buf_page_set_file_page_was_freed(page_id));
2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215

	ibuf_mtr_commit(&mtr);
}

/***********************************************************************//**
Frees excess pages from the ibuf free list. This function is called when an OS
thread calls fsp services to allocate a new file segment, or a new page to a
file segment, and the thread did not own the fsp latch before this call. */
void
ibuf_free_excess_pages(void)
/*========================*/
{
2216
	if (srv_force_recovery >= SRV_FORCE_NO_IBUF_MERGE) {
2217 2218 2219 2220 2221 2222
		return;
	}

	/* Free at most a few pages at a time, so that we do not delay the
	requested service too much */

2223
	for (ulint i = 0; i < 4; i++) {
2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239

		ibool	too_much_free;

		mutex_enter(&ibuf_mutex);
		too_much_free = ibuf_data_too_much_free();
		mutex_exit(&ibuf_mutex);

		if (!too_much_free) {
			return;
		}

		ibuf_remove_free_page();
	}
}

/* ibuf_get_merge_page_nos() forwards to ibuf_get_merge_page_nos_func().
The mini-transaction argument is passed on only in debug builds, because
the function declares its mtr parameter only when UNIV_DEBUG is defined. */
#ifndef UNIV_DEBUG
# define ibuf_get_merge_page_nos(contract,rec,mtr,ids,pages,n_stored) \
	ibuf_get_merge_page_nos_func(contract,rec,ids,pages,n_stored)
#else /* !UNIV_DEBUG */
# define ibuf_get_merge_page_nos(contract,rec,mtr,ids,pages,n_stored) \
	ibuf_get_merge_page_nos_func(contract,rec,mtr,ids,pages,n_stored)
#endif /* !UNIV_DEBUG */

/** Read page numbers from a leaf in an ibuf tree.
Starting from rec, the function first walks backwards to the start of the
'merge area' (or the page start, or until the limit of storable pages is
reached), and then walks forwards collecting the (space id, page number)
pairs whose buffered changes should be merged.
@param[in]	contract	TRUE if this function is called to contract
				the tree, FALSE if this is called when a
				single page becomes full and we look if it
				pays to read also nearby pages
@param[in]	rec		insert buffer record
@param[in]	mtr		mini-transaction holding rec
				(parameter exists in debug builds only)
@param[in,out]	space_ids	space id's of the pages
@param[in,out]	page_nos	buffer for at least IBUF_MAX_N_PAGES_MERGED
				many page numbers; the page numbers are in
				an ascending order
@param[out]	n_stored	number of page numbers stored to page_nos
				in this function
@return a lower limit for the combined volume of records which will be
merged */
static
ulint
ibuf_get_merge_page_nos_func(
	ibool		contract,
	const rec_t*	rec,
#ifdef UNIV_DEBUG
	mtr_t*		mtr,
#endif /* UNIV_DEBUG */
	ulint*		space_ids,
	ulint*		page_nos,
	ulint*		n_stored)
{
	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
	ut_ad(ibuf_inside(mtr));

	*n_stored = 0;

	const ulint	limit = ut_min(IBUF_MAX_N_PAGES_MERGED,
				       buf_pool_get_curr_size() / 4);

	/* Step off the supremum and infimum records. If we end up on the
	supremum again, there is nothing to collect from this page. */

	if (page_rec_is_supremum(rec)) {
		rec = page_rec_get_prev_const(rec);
	}

	if (page_rec_is_infimum(rec)) {
		rec = page_rec_get_next_const(rec);
	}

	if (page_rec_is_supremum(rec)) {
		return(0);
	}

	const ulint	first_page_no = ibuf_rec_get_page_no(mtr, rec);
	const ulint	first_space_id = ibuf_rec_get_space(mtr, rec);

	ulint	n_pages = 0;
	ulint	prev_page_no = 0;
	ulint	prev_space_id = 0;
	ulint	rec_page_no;
	ulint	rec_space_id;

	/* Go backwards from the first rec until we reach the border of the
	'merge area', or the page start or the limit of storeable pages is
	reached */

	while (!page_rec_is_infimum(rec) && UNIV_LIKELY(n_pages < limit)) {

		rec_page_no = ibuf_rec_get_page_no(mtr, rec);
		rec_space_id = ibuf_rec_get_space(mtr, rec);

		if (rec_space_id != first_space_id
		    || (rec_page_no / IBUF_MERGE_AREA)
		    != (first_page_no / IBUF_MERGE_AREA)) {
			/* Left the merge area */
			break;
		}

		if (rec_space_id != prev_space_id
		    || rec_page_no != prev_page_no) {
			/* A page that was not counted yet */
			n_pages++;
		}

		prev_space_id = rec_space_id;
		prev_page_no = rec_page_no;

		rec = page_rec_get_prev_const(rec);
	}

	/* The backward scan stopped one record before the start position
	of the forward scan. */
	rec = page_rec_get_next_const(rec);

	/* At the loop start there is no prev page; we mark this with a pair
	of space id, page no (0, 0) for which there can never be entries in
	the insert buffer */

	prev_page_no = 0;
	prev_space_id = 0;

	ulint	sum_volumes = 0;
	ulint	volume_for_page = 0;

	while (*n_stored < limit) {
		if (!page_rec_is_supremum(rec)) {
			rec_page_no = ibuf_rec_get_page_no(mtr, rec);
			rec_space_id = ibuf_rec_get_space(mtr, rec);
			/* In the system tablespace the smallest
			possible secondary index leaf page number is
			bigger than FSP_DICT_HDR_PAGE_NO (7).
			In all tablespaces, pages 0 and 1 are reserved
			for the allocation bitmap and the change
			buffer bitmap. In file-per-table tablespaces,
			a file segment inode page will be created at
			page 2 and the clustered index tree is created
			at page 3.  So for file-per-table tablespaces,
			page 4 is the smallest possible secondary
			index leaf page. CREATE TABLESPACE also initially
			uses pages 2 and 3 for the first created table,
			but that table may be dropped, allowing page 2
			to be reused for a secondary index leaf page.
			To keep this assertion simple, just
			make sure the page is >= 2. */
			ut_ad(rec_page_no >= FSP_FIRST_INODE_PAGE_NO);
		} else {
			/* When no more records available, mark this with
			another 'impossible' pair of space id, page no */
			rec_page_no = 1;
			rec_space_id = 0;
		}

#ifdef UNIV_IBUF_DEBUG
		ut_a(*n_stored < IBUF_MAX_N_PAGES_MERGED);
#endif
		/* When the (space id, page no) pair changes and a previous
		page exists, decide whether the previous page is stored. */
		if ((rec_space_id != prev_space_id
		     || rec_page_no != prev_page_no)
		    && (prev_space_id != 0 || prev_page_no != 0)) {

			if (contract
			    || (prev_page_no == first_page_no
				&& prev_space_id == first_space_id)
			    || (volume_for_page
				> ((IBUF_MERGE_THRESHOLD - 1)
				   * 4 * UNIV_PAGE_SIZE
				   / IBUF_PAGE_SIZE_PER_FREE_SPACE)
				/ IBUF_MERGE_THRESHOLD)) {

				space_ids[*n_stored] = prev_space_id;
				page_nos[*n_stored] = prev_page_no;

				(*n_stored)++;

				sum_volumes += volume_for_page;
			}

			if (rec_space_id != first_space_id
			    || rec_page_no / IBUF_MERGE_AREA
			    != first_page_no / IBUF_MERGE_AREA) {
				/* Left the merge area */
				break;
			}

			volume_for_page = 0;
		}

		if (rec_page_no == 1 && rec_space_id == 0) {
			/* Supremum record */

			break;
		}

		volume_for_page += ibuf_rec_get_volume(mtr, rec);

		prev_page_no = rec_page_no;
		prev_space_id = rec_space_id;

		rec = page_rec_get_next_const(rec);
	}

#ifdef UNIV_IBUF_DEBUG
	ut_a(*n_stored <= IBUF_MAX_N_PAGES_MERGED);
#endif
#if 0
	fprintf(stderr, "Ibuf merge batch %lu pages %lu volume\n",
		*n_stored, sum_volumes);
#endif
	return(sum_volumes);
}

/*******************************************************************//**
Get the matching records for space id.
2440
@return current rec or NULL */
Sergei Golubchik's avatar
Sergei Golubchik committed
2441
static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462
const rec_t*
ibuf_get_user_rec(
/*===============*/
	btr_pcur_t*	pcur,		/*!< in: the current cursor */
	mtr_t*		mtr)		/*!< in: mini transaction */
{
	do {
		const rec_t* rec = btr_pcur_get_rec(pcur);

		if (page_rec_is_user_rec(rec)) {
			return(rec);
		}
	} while (btr_pcur_move_to_next(pcur, mtr));

	return(NULL);
}

/*********************************************************************//**
Reads page numbers for a space id from an ibuf tree.
@return a lower limit for the combined volume of records which will be
merged */
Sergei Golubchik's avatar
Sergei Golubchik committed
2463
static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527
ulint
ibuf_get_merge_pages(
/*=================*/
	btr_pcur_t*	pcur,	/*!< in/out: cursor */
	ulint		space,	/*!< in: space for which to merge */
	ulint		limit,	/*!< in: max page numbers to read */
	ulint*		pages,	/*!< out: pages read */
	ulint*		spaces,	/*!< out: spaces read */
	ulint*		n_pages,/*!< out: number of pages read */
	mtr_t*		mtr)	/*!< in: mini transaction */
{
	const rec_t*	rec;
	ulint		volume = 0;

	ut_a(space != ULINT_UNDEFINED);

	*n_pages = 0;

	while ((rec = ibuf_get_user_rec(pcur, mtr)) != 0
	       && ibuf_rec_get_space(mtr, rec) == space
	       && *n_pages < limit) {

		ulint	page_no = ibuf_rec_get_page_no(mtr, rec);

		if (*n_pages == 0 || pages[*n_pages - 1] != page_no) {
			spaces[*n_pages] = space;
			pages[*n_pages] = page_no;
			++*n_pages;
		}

		volume += ibuf_rec_get_volume(mtr, rec);

		btr_pcur_move_to_next(pcur, mtr);
	}

	return(volume);
}

/** Contract insert buffer trees by reading pages to the buffer pool.
@param[out]	n_pages	number of pages to which merged
@param[in]	sync	true if the caller wants to wait for the issued
			read with the highest tablespace address to complete
@return a lower limit for the combined size in bytes of entries which
will be merged from ibuf trees to the pages read, 0 if ibuf is
empty */
static
ulint
ibuf_merge_pages(
	ulint*	n_pages,
	bool	sync)
{
	mtr_t		mtr;
	btr_pcur_t	pcur;
	ulint		page_nos[IBUF_MAX_N_PAGES_MERGED];
	ulint		space_ids[IBUF_MAX_N_PAGES_MERGED];

	*n_pages = 0;

	ibuf_mtr_start(&mtr);

	/* Open a cursor to a randomly chosen leaf of the tree, at a random
	position within the leaf */
	const bool	available = btr_pcur_open_at_rnd_pos(
		ibuf->index, BTR_SEARCH_LEAF, &pcur, &mtr);

	/* No one should make this index unavailable when server is running */
	ut_a(available);

	const page_t*	leaf = btr_pcur_get_page(&pcur);

	ut_ad(page_validate(leaf, ibuf->index));

	if (page_is_empty(leaf)) {
		/* If a B-tree page is empty, it must be the root page
		and the whole B-tree must be empty. InnoDB does not
		allow empty B-tree pages other than the root. */
		ut_ad(ibuf->empty);
		ut_ad(page_get_space_id(leaf) == IBUF_SPACE_ID);
		ut_ad(page_get_page_no(leaf) == FSP_IBUF_TREE_ROOT_PAGE_NO);

		ibuf_mtr_commit(&mtr);
		btr_pcur_close(&pcur);

		return(0);
	}

	const ulint	sum_sizes = ibuf_get_merge_page_nos(
		TRUE, btr_pcur_get_rec(&pcur), &mtr,
		space_ids, page_nos, n_pages);
#if 0 /* defined UNIV_IBUF_DEBUG */
	fprintf(stderr, "Ibuf contract sync %lu pages %lu volume %lu\n",
		sync, *n_pages, sum_sizes);
#endif
	ibuf_mtr_commit(&mtr);
	btr_pcur_close(&pcur);

	buf_read_ibuf_merge_pages(
		sync, space_ids, page_nos, *n_pages);

	/* The result is nonzero whenever the tree was not empty */
	return(sum_sizes + 1);
}

/*********************************************************************//**
Sergei Golubchik's avatar
Sergei Golubchik committed
2571 2572 2573
Contracts insert buffer trees by reading pages referring to space_id
to the buffer pool.
@returns number of pages merged.*/
2574 2575 2576
ulint
ibuf_merge_space(
/*=============*/
Sergei Golubchik's avatar
Sergei Golubchik committed
2577
	ulint		space)	/*!< in: tablespace id to merge */
2578 2579 2580 2581 2582
{
	mtr_t		mtr;
	btr_pcur_t	pcur;
	mem_heap_t*	heap = mem_heap_create(512);
	dtuple_t*	tuple = ibuf_search_tuple_build(space, 0, heap);
Sergei Golubchik's avatar
Sergei Golubchik committed
2583 2584 2585
	ulint		n_pages = 0;

	ut_ad(space < SRV_LOG_SPACE_FIRST_ID);
2586

2587 2588
	ut_ad(space < SRV_LOG_SPACE_FIRST_ID);

2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617
	ibuf_mtr_start(&mtr);

	/* Position the cursor on the first matching record. */

	btr_pcur_open(
		ibuf->index, tuple, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur,
		&mtr);

	mem_heap_free(heap);

	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));

	ulint		sum_sizes = 0;
	ulint		pages[IBUF_MAX_N_PAGES_MERGED];
	ulint		spaces[IBUF_MAX_N_PAGES_MERGED];

	if (page_is_empty(btr_pcur_get_page(&pcur))) {
		/* If a B-tree page is empty, it must be the root page
		and the whole B-tree must be empty. InnoDB does not
		allow empty B-tree pages other than the root. */
		ut_ad(ibuf->empty);
		ut_ad(page_get_space_id(btr_pcur_get_page(&pcur))
		      == IBUF_SPACE_ID);
		ut_ad(page_get_page_no(btr_pcur_get_page(&pcur))
		      == FSP_IBUF_TREE_ROOT_PAGE_NO);

	} else {

		sum_sizes = ibuf_get_merge_pages(
2618 2619 2620
			&pcur, space, IBUF_MAX_N_PAGES_MERGED,
			&pages[0], &spaces[0], &n_pages,
			&mtr);
2621
		ib::info() << "Size of pages merged " << sum_sizes;
2622 2623 2624 2625 2626 2627
	}

	ibuf_mtr_commit(&mtr);

	btr_pcur_close(&pcur);

Sergei Golubchik's avatar
Sergei Golubchik committed
2628 2629
	if (n_pages > 0) {
		ut_ad(n_pages <= UT_ARR_SIZE(pages));
2630

2631
#ifdef UNIV_DEBUG
Sergei Golubchik's avatar
Sergei Golubchik committed
2632
		for (ulint i = 0; i < n_pages; ++i) {
2633 2634 2635 2636 2637
			ut_ad(spaces[i] == space);
		}
#endif /* UNIV_DEBUG */

		buf_read_ibuf_merge_pages(
2638
			true, spaces, pages, n_pages);
2639 2640
	}

Sergei Golubchik's avatar
Sergei Golubchik committed
2641
	return(n_pages);
2642 2643
}

Sergei Golubchik's avatar
Sergei Golubchik committed
2644 2645 2646 2647
/** Contract the change buffer by reading pages to the buffer pool.
@param[out]	n_pages		number of pages merged
@param[in]	sync		whether the caller waits for
the issued reads to complete
2648 2649 2650
@return a lower limit for the combined size in bytes of entries which
will be merged from ibuf trees to the pages read, 0 if ibuf is
empty */
2651
static MY_ATTRIBUTE((warn_unused_result))
2652 2653
ulint
ibuf_merge(
2654
	ulint*		n_pages,
2655
	bool		sync)
2656 2657 2658 2659 2660 2661 2662 2663 2664 2665
{
	*n_pages = 0;

	/* We perform a dirty read of ibuf->empty, without latching
	the insert buffer root page. We trust this dirty read except
	when a slow shutdown is being executed. During a slow
	shutdown, the insert buffer merge must be completed. */

	if (ibuf->empty && !srv_shutdown_state) {
		return(0);
Sergei Golubchik's avatar
Sergei Golubchik committed
2666 2667 2668 2669
#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
	} else if (ibuf_debug) {
		return(0);
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
2670
	} else {
2671
		return(ibuf_merge_pages(n_pages, sync));
2672 2673 2674
	}
}

Sergei Golubchik's avatar
Sergei Golubchik committed
2675 2676 2677
/** Contract the change buffer by reading pages to the buffer pool.
@param[in]	sync	whether the caller waits for
the issued reads to complete
2678
@return a lower limit for the combined size in bytes of entries which
Sergei Golubchik's avatar
Sergei Golubchik committed
2679
will be merged from ibuf trees to the pages read, 0 if ibuf is empty */
2680 2681 2682
static
ulint
ibuf_contract(
2683
	bool	sync)
2684 2685 2686
{
	ulint	n_pages;

Sergei Golubchik's avatar
Sergei Golubchik committed
2687
	return(ibuf_merge_pages(&n_pages, sync));
2688 2689
}

Sergei Golubchik's avatar
Sergei Golubchik committed
2690 2691 2692 2693
/** Contract the change buffer by reading pages to the buffer pool.
@param[in]	full		If true, do a full contraction based
on PCT_IO(100). If false, the size of contract batch is determined
based on the current size of the change buffer.
2694 2695 2696 2697
@return a lower limit for the combined size in bytes of entries which
will be merged from ibuf trees to the pages read, 0 if ibuf is
empty */
ulint
Sergei Golubchik's avatar
Sergei Golubchik committed
2698
ibuf_merge_in_background(
2699
	bool	full)
2700 2701 2702 2703 2704 2705 2706
{
	ulint	sum_bytes	= 0;
	ulint	sum_pages	= 0;
	ulint	n_pag2;
	ulint	n_pages;

#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
2707
	if (srv_ibuf_disable_background_merge) {
2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732
		return(0);
	}
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */

	if (full) {
		/* Caller has requested a full batch */
		n_pages = PCT_IO(100);
	} else {
		/* By default we do a batch of 5% of the io_capacity */
		n_pages = PCT_IO(5);

		mutex_enter(&ibuf_mutex);

		/* If the ibuf->size is more than half the max_size
		then we make more agreesive contraction.
		+1 is to avoid division by zero. */
		if (ibuf->size > ibuf->max_size / 2) {
			ulint diff = ibuf->size - ibuf->max_size / 2;
			n_pages += PCT_IO((diff * 100)
					   / (ibuf->max_size + 1));
		}

		mutex_exit(&ibuf_mutex);
	}

2733 2734 2735 2736 2737 2738
#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
	if (ibuf_debug) {
		return(0);
	}
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */

2739 2740 2741
	while (sum_pages < n_pages) {
		ulint	n_bytes;

2742
		n_bytes = ibuf_merge(&n_pag2, false);
2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797

		if (n_bytes == 0) {
			return(sum_bytes);
		}

		sum_bytes += n_bytes;
		sum_pages += n_pag2;
	}

	return(sum_bytes);
}

/** Contract insert buffer trees after insert if they are too big.
@param[in]	entry_size	size of a record which was inserted
				into an ibuf tree */
UNIV_INLINE
void
ibuf_contract_after_insert(
	ulint	entry_size)
{
	/* Perform dirty reads of ibuf->size and ibuf->max_size, to
	reduce ibuf_mutex contention. ibuf->max_size remains constant
	after ibuf_init_at_db_start(), but ibuf->size should be
	protected by ibuf_mutex. Given that ibuf->size fits in a
	machine word, this should be OK; at worst we are doing some
	excessive ibuf_contract() or occasionally skipping a
	ibuf_contract(). */
	const ulint	size = ibuf->size;
	const ulint	max_size = ibuf->max_size;

	if (size < max_size + IBUF_CONTRACT_ON_INSERT_NON_SYNC) {
		/* Not big enough to need contraction */
		return;
	}

	const bool	sync = (size >= max_size
				+ IBUF_CONTRACT_ON_INSERT_SYNC);

	/* Contract at least entry_size many bytes, or until
	ibuf_contract() reports 0 */
	ulint	sum_sizes = 0;
	ulint	contracted;

	do {
		contracted = ibuf_contract(sync);
		sum_sizes += contracted;
	} while (contracted > 0 && sum_sizes < entry_size);
}

/*********************************************************************//**
Determine if an insert buffer record has been encountered already.
2798
@return TRUE if a new record, FALSE if possible duplicate */
2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820
static
ibool
ibuf_get_volume_buffered_hash(
/*==========================*/
	const rec_t*	rec,	/*!< in: ibuf record in post-4.1 format */
	const byte*	types,	/*!< in: fields */
	const byte*	data,	/*!< in: start of user record data */
	ulint		comp,	/*!< in: 0=ROW_FORMAT=REDUNDANT,
				nonzero=ROW_FORMAT=COMPACT */
	ulint*		hash,	/*!< in/out: hash array */
	ulint		size)	/*!< in: number of elements in hash array */
{
	ulint		len;
	ulint		fold;
	ulint		bitmask;

	len = ibuf_rec_get_size(
		rec, types,
		rec_get_n_fields_old(rec) - IBUF_REC_FIELD_USER, comp);
	fold = ut_fold_binary(data, len);

	hash += (fold / (CHAR_BIT * sizeof *hash)) % size;
2821
	bitmask = static_cast<ulint>(1) << (fold % (CHAR_BIT * sizeof(*hash)));
2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839

	if (*hash & bitmask) {

		return(FALSE);
	}

	/* We have not seen this record yet.  Insert it. */
	*hash |= bitmask;

	return(TRUE);
}

/* ibuf_get_volume_buffered_count() forwards to
ibuf_get_volume_buffered_count_func(). The mini-transaction argument is
passed on only in debug builds, because the function declares its mtr
parameter only when UNIV_DEBUG is defined. */
#ifndef UNIV_DEBUG
# define ibuf_get_volume_buffered_count(mtr,rec,hash,size,n_recs)	\
	ibuf_get_volume_buffered_count_func(rec,hash,size,n_recs)
#else /* !UNIV_DEBUG */
# define ibuf_get_volume_buffered_count(mtr,rec,hash,size,n_recs)	\
	ibuf_get_volume_buffered_count_func(mtr,rec,hash,size,n_recs)
#endif /* !UNIV_DEBUG */

2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864
/*********************************************************************//**
Update the estimate of the number of records on a page, and
get the space taken by merging the buffered record to the index page.
@return size of index record in bytes + an upper limit of the space
taken in the page directory */
static
ulint
ibuf_get_volume_buffered_count_func(
/*================================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
#endif /* UNIV_DEBUG */
	const rec_t*	rec,	/*!< in: insert buffer record */
	ulint*		hash,	/*!< in/out: hash array */
	ulint		size,	/*!< in: number of elements in hash array */
	lint*		n_recs)	/*!< in/out: estimated number of records
				on the page that rec points to */
{
	ulint		len;
	ibuf_op_t	ibuf_op;
	const byte*	types;
	ulint		n_fields;

2865 2866
	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051
	ut_ad(ibuf_inside(mtr));

	n_fields = rec_get_n_fields_old(rec);
	ut_ad(n_fields > IBUF_REC_FIELD_USER);
	n_fields -= IBUF_REC_FIELD_USER;

	rec_get_nth_field_offs_old(rec, 1, &len);
	/* This function is only invoked when buffering new
	operations.  All pre-4.1 records should have been merged
	when the database was started up. */
	ut_a(len == 1);

	if (rec_get_deleted_flag(rec, 0)) {
		/* This record has been merged already,
		but apparently the system crashed before
		the change was discarded from the buffer.
		Pretend that the record does not exist. */
		return(0);
	}

	types = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);

	switch (UNIV_EXPECT(len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE,
			    IBUF_REC_INFO_SIZE)) {
	default:
		ut_error;
	case 0:
		/* This ROW_TYPE=REDUNDANT record does not include an
		operation counter.  Exclude it from the *n_recs,
		because deletes cannot be buffered if there are
		old-style inserts buffered for the page. */

		len = ibuf_rec_get_size(rec, types, n_fields, 0);

		return(len
		       + rec_get_converted_extra_size(len, n_fields, 0)
		       + page_dir_calc_reserved_space(1));
	case 1:
		/* This ROW_TYPE=COMPACT record does not include an
		operation counter.  Exclude it from the *n_recs,
		because deletes cannot be buffered if there are
		old-style inserts buffered for the page. */
		goto get_volume_comp;

	case IBUF_REC_INFO_SIZE:
		ibuf_op = (ibuf_op_t) types[IBUF_REC_OFFSET_TYPE];
		break;
	}

	switch (ibuf_op) {
	case IBUF_OP_INSERT:
		/* Inserts can be done by updating a delete-marked record.
		Because delete-mark and insert operations can be pointing to
		the same records, we must not count duplicates. */
	case IBUF_OP_DELETE_MARK:
		/* There must be a record to delete-mark.
		See if this record has been already buffered. */
		if (n_recs && ibuf_get_volume_buffered_hash(
			    rec, types + IBUF_REC_INFO_SIZE,
			    types + len,
			    types[IBUF_REC_OFFSET_FLAGS] & IBUF_REC_COMPACT,
			    hash, size)) {
			(*n_recs)++;
		}

		if (ibuf_op == IBUF_OP_DELETE_MARK) {
			/* Setting the delete-mark flag does not
			affect the available space on the page. */
			return(0);
		}
		break;
	case IBUF_OP_DELETE:
		/* A record will be removed from the page. */
		if (n_recs) {
			(*n_recs)--;
		}
		/* While deleting a record actually frees up space,
		we have to play it safe and pretend that it takes no
		additional space (the record might not exist, etc.). */
		return(0);
	default:
		ut_error;
	}

	ut_ad(ibuf_op == IBUF_OP_INSERT);

get_volume_comp:
	{
		dtuple_t*	entry;
		ulint		volume;
		dict_index_t*	dummy_index;
		mem_heap_t*	heap = mem_heap_create(500);

		entry = ibuf_build_entry_from_ibuf_rec(
			mtr, rec, heap, &dummy_index);

		volume = rec_get_converted_size(dummy_index, entry, 0);

		ibuf_dummy_index_free(dummy_index);
		mem_heap_free(heap);

		return(volume + page_dir_calc_reserved_space(1));
	}
}

/*********************************************************************//**
Gets an upper limit for the combined size of entries buffered in the insert
buffer for a given page.  The scan walks backwards and then forwards from
pcur, looking at most at one neighbouring ibuf leaf page in each direction.
@return upper limit for the volume of buffered inserts for the index
page, in bytes; UNIV_PAGE_SIZE, if the entries for the index page span
several pages in the insert buffer */
static
ulint
ibuf_get_volume_buffered(
/*=====================*/
	const btr_pcur_t*pcur,	/*!< in: pcur positioned at a place in an
				insert buffer tree where we would insert an
				entry for the index page whose number is
				page_no, latch mode has to be BTR_MODIFY_PREV
				or BTR_MODIFY_TREE */
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: page number of an index page */
	lint*		n_recs,	/*!< in/out: minimum number of records on the
				page after the buffered changes have been
				applied, or NULL to disable the counting */
	mtr_t*		mtr)	/*!< in: mini-transaction of pcur */
{
	ulint		volume;
	const rec_t*	rec;
	const page_t*	page;
	ulint		prev_page_no;
	const page_t*	prev_page;
	ulint		next_page_no;
	const page_t*	next_page;
	/* bitmap of buffered recs */
	ulint		hash_bitmap[128 / sizeof(ulint)];

	ut_ad((pcur->latch_mode == BTR_MODIFY_PREV)
	      || (pcur->latch_mode == BTR_MODIFY_TREE));

	/* Count the volume of inserts earlier in the alphabetical order than
	pcur */

	volume = 0;

	/* The bitmap is only consulted when records are being counted. */
	if (n_recs) {
		memset(hash_bitmap, 0, sizeof hash_bitmap);
	}

	rec = btr_pcur_get_rec(pcur);
	page = page_align(rec);
	ut_ad(page_validate(page, ibuf->index));

	if (page_rec_is_supremum(rec)) {
		rec = page_rec_get_prev_const(rec);
	}

	for (; !page_rec_is_infimum(rec);
	     rec = page_rec_get_prev_const(rec)) {
		ut_ad(page_align(rec) == page);

		if (page_no != ibuf_rec_get_page_no(mtr, rec)
		    || space != ibuf_rec_get_space(mtr, rec)) {

			goto count_later;
		}

		volume += ibuf_get_volume_buffered_count(
			mtr, rec,
			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
	}

	/* Look at the previous page */

	prev_page_no = btr_page_get_prev(page, mtr);

	if (prev_page_no == FIL_NULL) {

		goto count_later;
	}

	{
		buf_block_t*	block;

		block = buf_page_get(
			page_id_t(IBUF_SPACE_ID, prev_page_no),
			univ_page_size, RW_X_LATCH, mtr);

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);

		prev_page = buf_block_get_frame(block);
		ut_ad(page_validate(prev_page, ibuf->index));
	}

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_page_get_next(prev_page, mtr) == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

	rec = page_get_supremum_rec(prev_page);
	rec = page_rec_get_prev_const(rec);

	for (;; rec = page_rec_get_prev_const(rec)) {
		ut_ad(page_align(rec) == prev_page);

		if (page_rec_is_infimum(rec)) {

			/* We cannot go to yet a previous page, because we
			do not have the x-latch on it, and cannot acquire one
			because of the latching order: we have to give up */

			return(UNIV_PAGE_SIZE);
		}

		if (page_no != ibuf_rec_get_page_no(mtr, rec)
		    || space != ibuf_rec_get_space(mtr, rec)) {

			goto count_later;
		}

		volume += ibuf_get_volume_buffered_count(
			mtr, rec,
			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
	}

count_later:
	/* Count the volume of inserts later in the alphabetical order
	than pcur, starting from the record after the cursor position. */
	rec = btr_pcur_get_rec(pcur);

	if (!page_rec_is_supremum(rec)) {
		rec = page_rec_get_next_const(rec);
	}

	for (; !page_rec_is_supremum(rec);
	     rec = page_rec_get_next_const(rec)) {
		if (page_no != ibuf_rec_get_page_no(mtr, rec)
		    || space != ibuf_rec_get_space(mtr, rec)) {

			return(volume);
		}

		volume += ibuf_get_volume_buffered_count(
			mtr, rec,
			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
	}

	/* Look at the next page */

	next_page_no = btr_page_get_next(page, mtr);

	if (next_page_no == FIL_NULL) {

		return(volume);
	}

	{
		buf_block_t*	block;

		block = buf_page_get(
			page_id_t(IBUF_SPACE_ID, next_page_no),
			univ_page_size, RW_X_LATCH, mtr);

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);

		next_page = buf_block_get_frame(block);
		ut_ad(page_validate(next_page, ibuf->index));
	}

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_page_get_prev(next_page, mtr) == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

	rec = page_get_infimum_rec(next_page);
	rec = page_rec_get_next_const(rec);

	for (;; rec = page_rec_get_next_const(rec)) {
		ut_ad(page_align(rec) == next_page);

		if (page_rec_is_supremum(rec)) {

			/* The entries continue past the end of the next
			page, which we do not follow: we give up */

			return(UNIV_PAGE_SIZE);
		}

		if (page_no != ibuf_rec_get_page_no(mtr, rec)
		    || space != ibuf_rec_get_space(mtr, rec)) {

			return(volume);
		}

		volume += ibuf_get_volume_buffered_count(
			mtr, rec,
			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
	}
}

/*********************************************************************//**
Looks up the last record of the insert buffer tree, reads the tablespace
id stored in it (0 if the tree has no records), and passes that value to
fil_set_max_space_id_if_bigger(). */
void
ibuf_update_max_tablespace_id(void)
/*===============================*/
{
	btr_pcur_t	pcur;
	mtr_t		mtr;
	/* Stays 0 when the tree turns out to be empty. */
	ulint		max_space_id = 0;

	ut_a(!dict_table_is_comp(ibuf->index->table));

	ibuf_mtr_start(&mtr);

	/* Open the cursor at the high end of the tree and step back
	one position from there. */
	btr_pcur_open_at_index_side(
		false, ibuf->index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);

	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));

	btr_pcur_move_to_prev(&pcur, &mtr);

	if (!btr_pcur_is_before_first_on_page(&pcur)) {
		/* The cursor is on a record: read its space id field. */
		ulint		len;
		const rec_t*	rec = btr_pcur_get_rec(&pcur);
		const byte*	field = rec_get_nth_field_old(
			rec, IBUF_REC_FIELD_SPACE, &len);

		ut_a(len == 4);

		max_space_id = mach_read_from_4(field);
	}

	ibuf_mtr_commit(&mtr);

	fil_set_max_space_id_if_bigger(max_space_id);
}

#ifdef UNIV_DEBUG
# define ibuf_get_entry_counter_low(mtr,rec,space,page_no)	\
	ibuf_get_entry_counter_low_func(mtr,rec,space,page_no)
#else /* UNIV_DEBUG */
# define ibuf_get_entry_counter_low(mtr,rec,space,page_no)	\
	ibuf_get_entry_counter_low_func(rec,space,page_no)
#endif
/****************************************************************//**
Helper function for ibuf_get_entry_counter_func. Checks if rec is for
(space, page_no), and if so, reads counter value from it and returns
that + 1.  Callers should use the ibuf_get_entry_counter_low() wrapper
macro, which passes the mini-transaction only in UNIV_DEBUG builds.
@retval ULINT_UNDEFINED if the record does not contain any counter
@retval 0 if the record is not for (space, page_no)
@retval 1 + previous counter value, otherwise */
static
ulint
ibuf_get_entry_counter_low_func(
/*============================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,		/*!< in: mini-transaction of rec */
#endif /* UNIV_DEBUG */
	const rec_t*	rec,		/*!< in: insert buffer record */
	ulint		space,		/*!< in: space id */
	ulint		page_no)	/*!< in: page number */
{
	ulint		counter;
	const byte*	field;
	ulint		len;

	ut_ad(ibuf_inside(mtr));
	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
	ut_ad(rec_get_n_fields_old(rec) > 2);

	/* Only the length of the marker field is checked here. */
	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);

	ut_a(len == 1);

	/* Check the tablespace identifier. */
	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);

	ut_a(len == 4);

	if (mach_read_from_4(field) != space) {

		return(0);
	}

	/* Check the page offset. */
	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_PAGE, &len);
	ut_a(len == 4);

	if (mach_read_from_4(field) != page_no) {

		return(0);
	}

	/* Check if the record contains a counter field. */
	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);

	switch (len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE) {
	default:
		ut_error;
	case 0: /* ROW_FORMAT=REDUNDANT */
	case 1: /* ROW_FORMAT=COMPACT */
		return(ULINT_UNDEFINED);

	case IBUF_REC_INFO_SIZE:
		counter = mach_read_from_2(field + IBUF_REC_OFFSET_COUNTER);
		/* 0xFFFF is not accepted as a stored counter value, so
		counter + 1 still fits in the 2-byte counter field. */
		ut_a(counter < 0xFFFF);
		return(counter + 1);
	}
}

#ifdef UNIV_DEBUG
# define ibuf_get_entry_counter(space,page_no,rec,mtr,exact_leaf) \
	ibuf_get_entry_counter_func(space,page_no,rec,mtr,exact_leaf)
#else /* UNIV_DEBUG */
# define ibuf_get_entry_counter(space,page_no,rec,mtr,exact_leaf) \
	ibuf_get_entry_counter_func(space,page_no,rec,exact_leaf)
3288
#endif /* UNIV_DEBUG */
3289 3290 3291 3292

/****************************************************************//**
Calculate the counter field for an entry based on the current
last record in ibuf for (space, page_no).
3293
@return the counter field, or ULINT_UNDEFINED
3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338
if we should abort this insertion to ibuf */
static
ulint
ibuf_get_entry_counter_func(
/*========================*/
	ulint		space,		/*!< in: space id of entry */
	ulint		page_no,	/*!< in: page number of entry */
	const rec_t*	rec,		/*!< in: the record preceding the
					insertion point */
#ifdef UNIV_DEBUG
	mtr_t*		mtr,		/*!< in: mini-transaction */
#endif /* UNIV_DEBUG */
	ibool		only_leaf)	/*!< in: TRUE if this is the only
					leaf page that can contain entries
					for (space,page_no), that is, there
					was no exact match for (space,page_no)
					in the node pointer */
{
	ut_ad(ibuf_inside(mtr));
	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));
	ut_ad(page_validate(page_align(rec), ibuf->index));

	if (page_rec_is_supremum(rec)) {
		/* This is just for safety. The record should be a
		page infimum or a user record. */
		ut_ad(0);
		return(ULINT_UNDEFINED);
	} else if (!page_rec_is_infimum(rec)) {
		return(ibuf_get_entry_counter_low(mtr, rec, space, page_no));
	} else if (only_leaf
		   || fil_page_get_prev(page_align(rec)) == FIL_NULL) {
		/* The parent node pointer did not contain the
		searched for (space, page_no), which means that the
		search ended on the correct page regardless of the
		counter value, and since we're at the infimum record,
		there are no existing records. */
		return(0);
	} else {
		/* We used to read the previous page here. It would
		break the latching order, because the caller has
		buffer-fixed an insert buffer bitmap page. */
		return(ULINT_UNDEFINED);
	}
}

3339
/** Buffer an operation in the insert/delete buffer, instead of doing it
3340
directly to the disk page, if this is possible.
3341 3342 3343 3344 3345 3346 3347 3348 3349 3350 3351 3352
@param[in]	mode		BTR_MODIFY_PREV or BTR_MODIFY_TREE
@param[in]	op		operation type
@param[in]	no_counter	TRUE=use 5.0.3 format; FALSE=allow delete
buffering
@param[in]	entry		index entry to insert
@param[in]	entry_size	rec_get_converted_size(index, entry)
@param[in,out]	index		index where to insert; must not be unique
or clustered
@param[in]	page_id		page id where to insert
@param[in]	page_size	page size
@param[in,out]	thr		query thread
@return DB_SUCCESS, DB_STRONG_FAIL or other error */
3353
static MY_ATTRIBUTE((warn_unused_result))
3354 3355
dberr_t
ibuf_insert_low(
3356 3357 3358 3359 3360 3361 3362 3363 3364
	ulint			mode,
	ibuf_op_t		op,
	ibool			no_counter,
	const dtuple_t*		entry,
	ulint			entry_size,
	dict_index_t*		index,
	const page_id_t&	page_id,
	const page_size_t&	page_size,
	que_thr_t*		thr)
3365 3366 3367 3368 3369 3370 3371 3372 3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384 3385 3386 3387 3388
{
	big_rec_t*	dummy_big_rec;
	btr_pcur_t	pcur;
	btr_cur_t*	cursor;
	dtuple_t*	ibuf_entry;
	mem_heap_t*	offsets_heap	= NULL;
	mem_heap_t*	heap;
	ulint*		offsets		= NULL;
	ulint		buffered;
	lint		min_n_recs;
	rec_t*		ins_rec;
	ibool		old_bit_value;
	page_t*		bitmap_page;
	buf_block_t*	block;
	page_t*		root;
	dberr_t		err;
	ibool		do_merge;
	ulint		space_ids[IBUF_MAX_N_PAGES_MERGED];
	ulint		page_nos[IBUF_MAX_N_PAGES_MERGED];
	ulint		n_stored;
	mtr_t		mtr;
	mtr_t		bitmap_mtr;

	ut_a(!dict_index_is_clust(index));
3389
	ut_ad(!dict_index_is_spatial(index));
3390 3391 3392 3393 3394 3395 3396 3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412
	ut_ad(dtuple_check_typed(entry));
	ut_ad(!no_counter || op == IBUF_OP_INSERT);
	ut_a(op < IBUF_OP_COUNT);

	do_merge = FALSE;

	/* Perform dirty reads of ibuf->size and ibuf->max_size, to
	reduce ibuf_mutex contention. Given that ibuf->max_size and
	ibuf->size fit in a machine word, this should be OK; at worst
	we are doing some excessive ibuf_contract() or occasionally
	skipping an ibuf_contract(). */
	if (ibuf->max_size == 0) {
		return(DB_STRONG_FAIL);
	}

	if (ibuf->size >= ibuf->max_size + IBUF_CONTRACT_DO_NOT_INSERT) {
		/* Insert buffer is now too big, contract it but do not try
		to insert */


#ifdef UNIV_IBUF_DEBUG
		fputs("Ibuf too big\n", stderr);
#endif
Sergei Golubchik's avatar
Sergei Golubchik committed
3413
		ibuf_contract(true);
3414 3415 3416 3417 3418 3419 3420 3421 3422 3423 3424 3425 3426 3427 3428

		return(DB_STRONG_FAIL);
	}

	heap = mem_heap_create(1024);

	/* Build the entry which contains the space id and the page number
	as the first fields and the type information for other fields, and
	which will be inserted to the insert buffer. Using a counter value
	of 0xFFFF we find the last record for (space, page_no), from which
	we can then read the counter value N and use N + 1 in the record we
	insert. (We patch the ibuf_entry's counter field to the correct
	value just before actually inserting the entry.) */

	ibuf_entry = ibuf_entry_build(
3429
		op, index, entry, page_id.space(), page_id.page_no(),
3430 3431 3432 3433 3434 3435
		no_counter ? ULINT_UNDEFINED : 0xFFFF, heap);

	/* Open a cursor to the insert buffer tree to calculate if we can add
	the new entry to it without exceeding the free space limit for the
	page. */

3436
	if (BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE) {
3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448
		for (;;) {
			mutex_enter(&ibuf_pessimistic_insert_mutex);
			mutex_enter(&ibuf_mutex);

			if (UNIV_LIKELY(ibuf_data_enough_free_for_insert())) {

				break;
			}

			mutex_exit(&ibuf_mutex);
			mutex_exit(&ibuf_pessimistic_insert_mutex);

3449
			if (!ibuf_add_free_page()) {
3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464

				mem_heap_free(heap);
				return(DB_STRONG_FAIL);
			}
		}
	}

	ibuf_mtr_start(&mtr);

	btr_pcur_open(ibuf->index, ibuf_entry, PAGE_CUR_LE, mode, &pcur, &mtr);
	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));

	/* Find out the volume of already buffered inserts for the same index
	page */
	min_n_recs = 0;
3465 3466 3467
	buffered = ibuf_get_volume_buffered(&pcur,
					    page_id.space(),
					    page_id.page_no(),
3468 3469 3470 3471 3472
					    op == IBUF_OP_DELETE
					    ? &min_n_recs
					    : NULL, &mtr);

	if (op == IBUF_OP_DELETE
3473
	    && (min_n_recs < 2 || buf_pool_watch_occurred(page_id))) {
3474 3475 3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491
		/* The page could become empty after the record is
		deleted, or the page has been read in to the buffer
		pool.  Refuse to buffer the operation. */

		/* The buffer pool watch is needed for IBUF_OP_DELETE
		because of latching order considerations.  We can
		check buf_pool_watch_occurred() only after latching
		the insert buffer B-tree pages that contain buffered
		changes for the page.  We never buffer IBUF_OP_DELETE,
		unless some IBUF_OP_INSERT or IBUF_OP_DELETE_MARK have
		been previously buffered for the page.  Because there
		are buffered operations for the page, the insert
		buffer B-tree page latches held by mtr will guarantee
		that no changes for the user page will be merged
		before mtr_commit(&mtr).  We must not mtr_commit(&mtr)
		until after the IBUF_OP_DELETE has been buffered. */

fail_exit:
3492
		if (BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE) {
3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510
			mutex_exit(&ibuf_mutex);
			mutex_exit(&ibuf_pessimistic_insert_mutex);
		}

		err = DB_STRONG_FAIL;
		goto func_exit;
	}

	/* After this point, the page could still be loaded to the
	buffer pool, but we do not have to care about it, since we are
	holding a latch on the insert buffer leaf page that contains
	buffered changes for (space, page_no).  If the page enters the
	buffer pool, buf_page_io_complete() for (space, page_no) will
	have to acquire a latch on the same insert buffer leaf page,
	which it cannot do until we have buffered the IBUF_OP_DELETE
	and done mtr_commit(&mtr) to release the latch. */

#ifdef UNIV_IBUF_COUNT_DEBUG
3511
	ut_a((buffered == 0) || ibuf_count_get(page_id));
3512 3513
#endif
	ibuf_mtr_start(&bitmap_mtr);
3514
	bitmap_mtr.set_named_space(page_id.space());
3515

3516 3517
	bitmap_page = ibuf_bitmap_get_map_page(page_id, page_size,
					       &bitmap_mtr);
3518 3519 3520

	/* We check if the index page is suitable for buffered entries */

3521 3522 3523
	if (buf_page_peek(page_id)
	    || lock_rec_expl_exist_on_page(page_id.space(),
					   page_id.page_no())) {
3524 3525 3526 3527 3528 3529 3530

		ibuf_mtr_commit(&bitmap_mtr);
		goto fail_exit;
	}

	if (op == IBUF_OP_INSERT) {
		ulint	bits = ibuf_bitmap_page_get_bits(
3531
			bitmap_page, page_id, page_size, IBUF_BITMAP_FREE,
3532 3533 3534
			&bitmap_mtr);

		if (buffered + entry_size + page_dir_calc_reserved_space(1)
3535
		    > ibuf_index_page_calc_free_from_bits(page_size, bits)) {
3536 3537 3538 3539 3540 3541 3542 3543
			/* Release the bitmap page latch early. */
			ibuf_mtr_commit(&bitmap_mtr);

			/* It may not fit */
			do_merge = TRUE;

			ibuf_get_merge_page_nos(FALSE,
						btr_pcur_get_rec(&pcur), &mtr,
3544
						space_ids,
3545
						page_nos, &n_stored);
3546 3547 3548 3549 3550 3551 3552 3553 3554 3555

			goto fail_exit;
		}
	}

	if (!no_counter) {
		/* Patch correct counter value to the entry to
		insert. This can change the insert position, which can
		result in the need to abort in some cases. */
		ulint		counter = ibuf_get_entry_counter(
3556 3557
			page_id.space(), page_id.page_no(),
			btr_pcur_get_rec(&pcur), &mtr,
3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577
			btr_pcur_get_btr_cur(&pcur)->low_match
			< IBUF_REC_FIELD_METADATA);
		dfield_t*	field;

		if (counter == ULINT_UNDEFINED) {
			ibuf_mtr_commit(&bitmap_mtr);
			goto fail_exit;
		}

		field = dtuple_get_nth_field(
			ibuf_entry, IBUF_REC_FIELD_METADATA);
		mach_write_to_2(
			(byte*) dfield_get_data(field)
			+ IBUF_REC_OFFSET_COUNTER, counter);
	}

	/* Set the bitmap bit denoting that the insert buffer contains
	buffered entries for this index page, if the bit is not set yet */

	old_bit_value = ibuf_bitmap_page_get_bits(
3578
		bitmap_page, page_id, page_size,
3579 3580 3581
		IBUF_BITMAP_BUFFERED, &bitmap_mtr);

	if (!old_bit_value) {
3582
		ibuf_bitmap_page_set_bits(bitmap_page, page_id, page_size,
3583 3584 3585 3586 3587 3588 3589 3590 3591 3592
					  IBUF_BITMAP_BUFFERED, TRUE,
					  &bitmap_mtr);
	}

	ibuf_mtr_commit(&bitmap_mtr);

	cursor = btr_pcur_get_btr_cur(&pcur);

	if (mode == BTR_MODIFY_PREV) {
		err = btr_cur_optimistic_insert(
3593
			BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG,
3594 3595 3596 3597
			cursor, &offsets, &offsets_heap,
			ibuf_entry, &ins_rec,
			&dummy_big_rec, 0, thr, &mtr);
		block = btr_cur_get_block(cursor);
3598
		ut_ad(block->page.id.space() == IBUF_SPACE_ID);
3599 3600

		/* If this is the root page, update ibuf->empty. */
3601
		if (block->page.id.page_no() == FSP_IBUF_TREE_ROOT_PAGE_NO) {
3602 3603 3604 3605 3606 3607 3608 3609 3610
			const page_t*	root = buf_block_get_frame(block);

			ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
			ut_ad(page_get_page_no(root)
			      == FSP_IBUF_TREE_ROOT_PAGE_NO);

			ibuf->empty = page_is_empty(root);
		}
	} else {
3611 3612
		ut_ad(BTR_LATCH_MODE_WITHOUT_INTENTION(mode)
		      == BTR_MODIFY_TREE);
3613

3614
		/* We acquire an sx-latch to the root page before the insert,
3615
		because a pessimistic insert releases the tree x-latch,
3616
		which would cause the sx-latching of the root after that to
3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635
		break the latching order. */

		root = ibuf_tree_root_get(&mtr);

		err = btr_cur_optimistic_insert(
			BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG,
			cursor, &offsets, &offsets_heap,
			ibuf_entry, &ins_rec,
			&dummy_big_rec, 0, thr, &mtr);

		if (err == DB_FAIL) {
			err = btr_cur_pessimistic_insert(
				BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG,
				cursor, &offsets, &offsets_heap,
				ibuf_entry, &ins_rec,
				&dummy_big_rec, 0, thr, &mtr);
		}

		mutex_exit(&ibuf_pessimistic_insert_mutex);
3636
		ibuf_size_update(root);
3637 3638 3639 3640
		mutex_exit(&ibuf_mutex);
		ibuf->empty = page_is_empty(root);

		block = btr_cur_get_block(cursor);
3641
		ut_ad(block->page.id.space() == IBUF_SPACE_ID);
3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657
	}

	if (offsets_heap) {
		mem_heap_free(offsets_heap);
	}

	if (err == DB_SUCCESS && op != IBUF_OP_DELETE) {
		/* Update the page max trx id field */
		page_update_max_trx_id(block, NULL,
				       thr_get_trx(thr)->id, &mtr);
	}

func_exit:
#ifdef UNIV_IBUF_COUNT_DEBUG
	if (err == DB_SUCCESS) {

3658 3659 3660 3661 3662
		ib::info() << "Incrementing ibuf count of page " << page_id
			<< " from " << ibuf_count_get(space, page_no)
			<< " by 1";

		ibuf_count_set(page_id, ibuf_count_get(page_id) + 1);
3663 3664 3665 3666 3667 3668 3669 3670
	}
#endif

	ibuf_mtr_commit(&mtr);
	btr_pcur_close(&pcur);

	mem_heap_free(heap);

3671 3672
	if (err == DB_SUCCESS
	    && BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE) {
3673 3674 3675 3676 3677 3678 3679
		ibuf_contract_after_insert(entry_size);
	}

	if (do_merge) {
#ifdef UNIV_IBUF_DEBUG
		ut_a(n_stored <= IBUF_MAX_N_PAGES_MERGED);
#endif
3680
		buf_read_ibuf_merge_pages(false, space_ids,
3681 3682 3683 3684 3685 3686
					  page_nos, n_stored);
	}

	return(err);
}

3687
/** Buffer an operation in the insert/delete buffer, instead of doing it
3688 3689
directly to the disk page, if this is possible. Does not do it if the index
is clustered or unique.
3690 3691 3692 3693 3694 3695 3696
@param[in]	op		operation type
@param[in]	entry		index entry to insert
@param[in,out]	index		index where to insert
@param[in]	page_id		page id where to insert
@param[in]	page_size	page size
@param[in,out]	thr		query thread
@return TRUE if success */
3697 3698
ibool
ibuf_insert(
3699 3700 3701 3702 3703 3704
	ibuf_op_t		op,
	const dtuple_t*		entry,
	dict_index_t*		index,
	const page_id_t&	page_id,
	const page_size_t&	page_size,
	que_thr_t*		thr)
3705 3706 3707 3708 3709 3710 3711 3712 3713
{
	dberr_t		err;
	ulint		entry_size;
	ibool		no_counter;
	/* Read the settable global variable ibuf_use only once in
	this function, so that we will have a consistent view of it. */
	ibuf_use_t	use		= ibuf_use;
	DBUG_ENTER("ibuf_insert");

3714 3715
	DBUG_PRINT("ibuf", ("op: %d, space: " UINT32PF ", page_no: " UINT32PF,
			    op, page_id.space(), page_id.page_no()));
3716 3717

	ut_ad(dtuple_check_typed(entry));
3718
	ut_ad(page_id.space() != SRV_TMP_SPACE_ID);
3719 3720

	ut_a(!dict_index_is_clust(index));
3721
	ut_ad(!dict_table_is_temporary(index->table));
3722 3723 3724 3725 3726 3727 3728 3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785 3786 3787 3788 3789 3790

	no_counter = use <= IBUF_USE_INSERT;

	switch (op) {
	case IBUF_OP_INSERT:
		switch (use) {
		case IBUF_USE_NONE:
		case IBUF_USE_DELETE:
		case IBUF_USE_DELETE_MARK:
			DBUG_RETURN(FALSE);
		case IBUF_USE_INSERT:
		case IBUF_USE_INSERT_DELETE_MARK:
		case IBUF_USE_ALL:
			goto check_watch;
		case IBUF_USE_COUNT:
			break;
		}
		break;
	case IBUF_OP_DELETE_MARK:
		switch (use) {
		case IBUF_USE_NONE:
		case IBUF_USE_INSERT:
			DBUG_RETURN(FALSE);
		case IBUF_USE_DELETE_MARK:
		case IBUF_USE_DELETE:
		case IBUF_USE_INSERT_DELETE_MARK:
		case IBUF_USE_ALL:
			ut_ad(!no_counter);
			goto check_watch;
		case IBUF_USE_COUNT:
			break;
		}
		break;
	case IBUF_OP_DELETE:
		switch (use) {
		case IBUF_USE_NONE:
		case IBUF_USE_INSERT:
		case IBUF_USE_INSERT_DELETE_MARK:
			DBUG_RETURN(FALSE);
		case IBUF_USE_DELETE_MARK:
		case IBUF_USE_DELETE:
		case IBUF_USE_ALL:
			ut_ad(!no_counter);
			goto skip_watch;
		case IBUF_USE_COUNT:
			break;
		}
		break;
	case IBUF_OP_COUNT:
		break;
	}

	/* unknown op or use */
	ut_error;

check_watch:
	/* If a thread attempts to buffer an insert on a page while a
	purge is in progress on the same page, the purge must not be
	buffered, because it could remove a record that was
	re-inserted later.  For simplicity, we block the buffering of
	all operations on a page that has a purge pending.

	We do not check this in the IBUF_OP_DELETE case, because that
	would always trigger the buffer pool watch during purge and
	thus prevent the buffering of delete operations.  We assume
	that the issuer of IBUF_OP_DELETE has called
	buf_pool_watch_set(space, page_no). */

	{
3791 3792 3793
		buf_pool_t*	buf_pool = buf_pool_get(page_id);
		buf_page_t*	bpage
			= buf_page_get_also_watch(buf_pool, page_id);
3794

3795
		if (bpage != NULL) {
3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815 3816 3817
			/* A buffer pool watch has been set or the
			page has been read into the buffer pool.
			Do not buffer the request.  If a purge operation
			is being buffered, have this request executed
			directly on the page in the buffer pool after the
			buffered entries for this page have been merged. */
			DBUG_RETURN(FALSE);
		}
	}

skip_watch:
	entry_size = rec_get_converted_size(index, entry, 0);

	if (entry_size
	    >= page_get_free_space_of_empty(dict_table_is_comp(index->table))
	    / 2) {

		DBUG_RETURN(FALSE);
	}

	err = ibuf_insert_low(BTR_MODIFY_PREV, op, no_counter,
			      entry, entry_size,
3818
			      index, page_id, page_size, thr);
3819
	if (err == DB_FAIL) {
3820 3821 3822
		err = ibuf_insert_low(BTR_MODIFY_TREE | BTR_LATCH_FOR_INSERT,
				      op, no_counter, entry, entry_size,
				      index, page_id, page_size, thr);
3823 3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840
	}

	if (err == DB_SUCCESS) {
#ifdef UNIV_IBUF_DEBUG
		/* fprintf(stderr, "Ibuf insert for page no %lu of index %s\n",
		page_no, index->name); */
#endif
		DBUG_RETURN(TRUE);

	} else {
		ut_a(err == DB_STRONG_FAIL || err == DB_TOO_BIG_RECORD);

		DBUG_RETURN(FALSE);
	}
}

/********************************************************************//**
During merge, inserts to an index page a secondary index entry extracted
Sergei Golubchik's avatar
Sergei Golubchik committed
3841
from the insert buffer.
3842
@return	newly inserted record */
Sergei Golubchik's avatar
Sergei Golubchik committed
3843
static MY_ATTRIBUTE((nonnull))
3844 3845 3846 3847 3848 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865 3866 3867 3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888
rec_t*
ibuf_insert_to_index_page_low(
/*==========================*/
	const dtuple_t*	entry,	/*!< in: buffered entry to insert */
	buf_block_t*	block,	/*!< in/out: index page where the buffered
				entry should be placed */
	dict_index_t*	index,	/*!< in: record descriptor */
	ulint**		offsets,/*!< out: offsets on *rec */
	mem_heap_t*	heap,	/*!< in/out: memory heap */
	mtr_t*		mtr,	/*!< in/out: mtr */
	page_cur_t*	page_cur)/*!< in/out: cursor positioned on the record
				after which to insert the buffered entry */
{
	const page_t*	page;
	const page_t*	bitmap_page;
	ulint		old_bits;
	rec_t*		rec;
	DBUG_ENTER("ibuf_insert_to_index_page_low");

	rec = page_cur_tuple_insert(page_cur, entry, index,
				    offsets, &heap, 0, mtr);
	if (rec != NULL) {
		DBUG_RETURN(rec);
	}

	/* Page reorganization or recompression should already have
	been attempted by page_cur_tuple_insert(). Besides, per
	ibuf_index_page_calc_free_zip() the page should not have been
	recompressed or reorganized. */
	ut_ad(!buf_block_get_page_zip(block));

	/* If the record did not fit, reorganize */

	btr_page_reorganize(page_cur, index, mtr);

	/* This time the record must fit */

	rec = page_cur_tuple_insert(page_cur, entry, index,
				    offsets, &heap, 0, mtr);
	if (rec != NULL) {
		DBUG_RETURN(rec);
	}

	page = buf_block_get_frame(block);

3889 3890 3891
	ib::error() << "Insert buffer insert fails; page free "
		<< page_get_max_insert_size(page, 1) << ", dtuple size "
		<< rec_get_converted_size(index, entry, 0);
3892 3893 3894 3895 3896 3897 3898

	fputs("InnoDB: Cannot insert index record ", stderr);
	dtuple_print(stderr, entry);
	fputs("\nInnoDB: The table where this index record belongs\n"
	      "InnoDB: is now probably corrupt. Please run CHECK TABLE on\n"
	      "InnoDB: that table.\n", stderr);

3899 3900 3901 3902 3903
	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
					       block->page.size, mtr);
	old_bits = ibuf_bitmap_page_get_bits(
		bitmap_page, block->page.id, block->page.size,
		IBUF_BITMAP_FREE, mtr);
3904

3905 3906
	ib::error() << "page " << block->page.id << ", size "
		<< block->page.size.physical() << ", bitmap bits " << old_bits;
3907

3908
	ib::error() << BUG_REPORT_MSG;
3909 3910 3911 3912 3913 3914 3915 3916 3917 3918 3919 3920 3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 3931 3932 3933 3934 3935

	ut_ad(0);
	DBUG_RETURN(NULL);
}

/************************************************************************
During merge, inserts to an index page a secondary index entry extracted
from the insert buffer. */
static
void
ibuf_insert_to_index_page(
/*======================*/
	const dtuple_t*	entry,	/*!< in: buffered entry to insert */
	buf_block_t*	block,	/*!< in/out: index page where the buffered entry
				should be placed */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_cur_t	page_cur;
	ulint		low_match;
	page_t*		page		= buf_block_get_frame(block);
	rec_t*		rec;
	ulint*		offsets;
	mem_heap_t*	heap;

	DBUG_ENTER("ibuf_insert_to_index_page");

3936 3937 3938
	DBUG_PRINT("ibuf", ("page " UINT32PF ":" UINT32PF,
			    block->page.id.space(),
			    block->page.id.page_no()));
3939

3940
	ut_ad(!dict_index_is_online_ddl(index));// this is an ibuf_dummy index
3941 3942
	ut_ad(ibuf_inside(mtr));
	ut_ad(dtuple_check_typed(entry));
3943
#ifdef BTR_CUR_HASH_ADAPT
3944 3945 3946
	/* A change buffer merge must occur before users are granted
	any access to the page. No adaptive hash index entries may
	point to a freshly read page. */
3947
	ut_ad(!block->index);
3948
	assert_block_ahi_empty(block);
3949
#endif /* BTR_CUR_HASH_ADAPT */
3950
	ut_ad(mtr->is_named_space(block->page.id.space()));
3951 3952 3953

	if (UNIV_UNLIKELY(dict_table_is_comp(index->table)
			  != (ibool)!!page_is_comp(page))) {
3954 3955 3956
		ib::warn() << "Trying to insert a record from the insert"
			" buffer to an index page but the 'compact' flag does"
			" not match!";
3957 3958 3959 3960 3961 3962
		goto dump;
	}

	rec = page_rec_get_next(page_get_infimum_rec(page));

	if (page_rec_is_supremum(rec)) {
3963 3964 3965
		ib::warn() << "Trying to insert a record from the insert"
			" buffer to an index page but the index page"
			" is empty!";
3966 3967 3968
		goto dump;
	}

3969 3970 3971 3972 3973
	if (!rec_n_fields_is_sane(index, rec, entry)) {
		ib::warn() << "Trying to insert a record from the insert"
			" buffer to an index page but the number of fields"
			" does not match!";
		rec_print(stderr, rec, index);
3974 3975 3976 3977
dump:
		dtuple_print(stderr, entry);
		ut_ad(0);

3978 3979 3980
		ib::warn() << "The table where this index record belongs"
			" is now probably corrupt. Please run CHECK TABLE on"
			" your tables. " << BUG_REPORT_MSG;
3981 3982 3983 3984

		DBUG_VOID_RETURN;
	}

3985
	low_match = page_cur_search(block, index, entry, &page_cur);
3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025 4026 4027 4028 4029 4030 4031 4032 4033 4034 4035

	heap = mem_heap_create(
		sizeof(upd_t)
		+ REC_OFFS_HEADER_SIZE * sizeof(*offsets)
		+ dtuple_get_n_fields(entry)
		* (sizeof(upd_field_t) + sizeof *offsets));

	if (UNIV_UNLIKELY(low_match == dtuple_get_n_fields(entry))) {
		upd_t*		update;
		page_zip_des_t*	page_zip;

		rec = page_cur_get_rec(&page_cur);

		/* This is based on
		row_ins_sec_index_entry_by_modify(BTR_MODIFY_LEAF). */
		ut_ad(rec_get_deleted_flag(rec, page_is_comp(page)));

		offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED,
					  &heap);
		update = row_upd_build_sec_rec_difference_binary(
			rec, index, offsets, entry, heap);

		page_zip = buf_block_get_page_zip(block);

		if (update->n_fields == 0) {
			/* The records only differ in the delete-mark.
			Clear the delete-mark, like we did before
			Bug #56680 was fixed. */
			btr_cur_set_deleted_flag_for_ibuf(
				rec, page_zip, FALSE, mtr);
			goto updated_in_place;
		}

		/* Copy the info bits. Clear the delete-mark. */
		update->info_bits = rec_get_info_bits(rec, page_is_comp(page));
		update->info_bits &= ~REC_INFO_DELETED_FLAG;

		/* We cannot invoke btr_cur_optimistic_update() here,
		because we do not have a btr_cur_t or que_thr_t,
		as the insert buffer merge occurs at a very low level. */
		if (!row_upd_changes_field_size_or_external(index, offsets,
							    update)
		    && (!page_zip || btr_cur_update_alloc_zip(
				page_zip, &page_cur, index, offsets,
				rec_offs_size(offsets), false, mtr))) {
			/* This is the easy case. Do something similar
			to btr_cur_update_in_place(). */
			rec = page_cur_get_rec(&page_cur);
			row_upd_rec_in_place(rec, index, offsets,
					     update, page_zip);
Sergei Golubchik's avatar
Sergei Golubchik committed
4036 4037 4038 4039 4040 4041 4042

			/* Log the update in place operation. During recovery
			MLOG_COMP_REC_UPDATE_IN_PLACE/MLOG_REC_UPDATE_IN_PLACE
			expects trx_id, roll_ptr for secondary indexes. So we
			just write dummy trx_id(0), roll_ptr(0) */
			btr_cur_update_in_place_log(BTR_KEEP_SYS_FLAG, rec,
						    index, update, 0, 0, mtr);
4043

Sergei Golubchik's avatar
Sergei Golubchik committed
4044 4045 4046
			DBUG_EXECUTE_IF(
				"crash_after_log_ibuf_upd_inplace",
				log_buffer_flush_to_disk();
4047 4048
				ib::info() << "Wrote log record for ibuf"
					" update in place operation";
Sergei Golubchik's avatar
Sergei Golubchik committed
4049 4050 4051
				DBUG_SUICIDE();
			);

4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115
			goto updated_in_place;
		}

		/* btr_cur_update_alloc_zip() may have changed this */
		rec = page_cur_get_rec(&page_cur);

		/* A collation may identify values that differ in
		storage length.
		Some examples (1 or 2 bytes):
		utf8_turkish_ci: I = U+0131 LATIN SMALL LETTER DOTLESS I
		utf8_general_ci: S = U+00DF LATIN SMALL LETTER SHARP S
		utf8_general_ci: A = U+00E4 LATIN SMALL LETTER A WITH DIAERESIS

		latin1_german2_ci: SS = U+00DF LATIN SMALL LETTER SHARP S

		Examples of a character (3-byte UTF-8 sequence)
		identified with 2 or 4 characters (1-byte UTF-8 sequences):

		utf8_unicode_ci: 'II' = U+2171 SMALL ROMAN NUMERAL TWO
		utf8_unicode_ci: '(10)' = U+247D PARENTHESIZED NUMBER TEN
		*/

		/* Delete the different-length record, and insert the
		buffered one. */

		lock_rec_store_on_page_infimum(block, rec);
		page_cur_delete_rec(&page_cur, index, offsets, mtr);
		page_cur_move_to_prev(&page_cur);
		rec = ibuf_insert_to_index_page_low(entry, block, index,
				      		    &offsets, heap, mtr,
						    &page_cur);

		ut_ad(!cmp_dtuple_rec(entry, rec, offsets));
		lock_rec_restore_from_page_infimum(block, rec, block);
	} else {
		offsets = NULL;
		ibuf_insert_to_index_page_low(entry, block, index,
					      &offsets, heap, mtr,
					      &page_cur);
	}
updated_in_place:
	mem_heap_free(heap);

	DBUG_VOID_RETURN;
}

/****************************************************************//**
During merge, sets the delete mark on a record for a secondary index
entry. */
static
void
ibuf_set_del_mark(
/*==============*/
	const dtuple_t*		entry,	/*!< in: entry */
	buf_block_t*		block,	/*!< in/out: block */
	const dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*			mtr)	/*!< in: mtr */
{
	page_cur_t	page_cur;
	ulint		low_match;

	ut_ad(ibuf_inside(mtr));
	ut_ad(dtuple_check_typed(entry));

4116
	low_match = page_cur_search(block, index, entry, &page_cur);
4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142

	if (low_match == dtuple_get_n_fields(entry)) {
		rec_t*		rec;
		page_zip_des_t*	page_zip;

		rec = page_cur_get_rec(&page_cur);
		page_zip = page_cur_get_page_zip(&page_cur);

		/* Delete mark the old index record. According to a
		comment in row_upd_sec_index_entry(), it can already
		have been delete marked if a lock wait occurred in
		row_ins_sec_index_entry() in a previous invocation of
		row_upd_sec_index_entry(). */

		if (UNIV_LIKELY
		    (!rec_get_deleted_flag(
			    rec, dict_table_is_comp(index->table)))) {
			btr_cur_set_deleted_flag_for_ibuf(rec, page_zip,
							  TRUE, mtr);
		}
	} else {
		const page_t*		page
			= page_cur_get_page(&page_cur);
		const buf_block_t*	block
			= page_cur_get_block(&page_cur);

4143
		ib::error() << "Unable to find a record to delete-mark";
4144 4145 4146 4147 4148
		fputs("InnoDB: tuple ", stderr);
		dtuple_print(stderr, entry);
		fputs("\n"
		      "InnoDB: record ", stderr);
		rec_print(stderr, page_cur_get_rec(&page_cur), index);
4149 4150 4151 4152 4153 4154

		ib::error() << "page " << block->page.id << " ("
			<< page_get_n_recs(page) << " records, index id "
			<< btr_page_get_index_id(page) << ").";

		ib::error() << BUG_REPORT_MSG;
4155 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175
		ut_ad(0);
	}
}

/****************************************************************//**
During merge, delete a record for a secondary index entry. */
static
void
ibuf_delete(
/*========*/
	const dtuple_t*	entry,	/*!< in: entry */
	buf_block_t*	block,	/*!< in/out: block */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in/out: mtr; must be committed
				before latching any further pages */
{
	page_cur_t	page_cur;
	ulint		low_match;

	ut_ad(ibuf_inside(mtr));
	ut_ad(dtuple_check_typed(entry));
4176
	ut_ad(!dict_index_is_spatial(index));
4177

4178
	low_match = page_cur_search(block, index, entry, &page_cur);
4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199 4200 4201 4202

	if (low_match == dtuple_get_n_fields(entry)) {
		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
		page_t*		page	= buf_block_get_frame(block);
		rec_t*		rec	= page_cur_get_rec(&page_cur);

		/* TODO: the below should probably be a separate function,
		it's a bastardized version of btr_cur_optimistic_delete. */

		ulint		offsets_[REC_OFFS_NORMAL_SIZE];
		ulint*		offsets	= offsets_;
		mem_heap_t*	heap = NULL;
		ulint		max_ins_size = 0;

		rec_offs_init(offsets_);

		offsets = rec_get_offsets(
			rec, index, offsets, ULINT_UNDEFINED, &heap);

		if (page_get_n_recs(page) <= 1
		    || !(REC_INFO_DELETED_FLAG
			 & rec_get_info_bits(rec, page_is_comp(page)))) {
			/* Refuse to purge the last record or a
			record that has not been marked for deletion. */
4203
			ib::error() << "Unable to purge a record";
4204 4205 4206 4207 4208
			fputs("InnoDB: tuple ", stderr);
			dtuple_print(stderr, entry);
			fputs("\n"
			      "InnoDB: record ", stderr);
			rec_print_new(stderr, rec, offsets);
4209
			fprintf(stderr, "\nspace " UINT32PF " offset " UINT32PF
4210 4211 4212
				" (%u records, index id %llu)\n"
				"InnoDB: Submit a detailed bug report"
				" to http://bugs.mysql.com\n",
4213 4214
				block->page.id.space(),
				block->page.id.page_no(),
4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241 4242 4243 4244 4245 4246 4247 4248 4249 4250 4251 4252
				(unsigned) page_get_n_recs(page),
				(ulonglong) btr_page_get_index_id(page));

			ut_ad(0);
			return;
		}

		lock_update_delete(block, rec);

		if (!page_zip) {
			max_ins_size
				= page_get_max_insert_size_after_reorganize(
					page, 1);
		}
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
		page_cur_delete_rec(&page_cur, index, offsets, mtr);
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

		if (page_zip) {
			ibuf_update_free_bits_zip(block, mtr);
		} else {
			ibuf_update_free_bits_low(block, max_ins_size, mtr);
		}

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	} else {
		/* The record must have been purged already. */
	}
}

/*********************************************************************//**
Restores insert buffer tree cursor position
4253
@return TRUE if the position was restored; FALSE if not */
Sergei Golubchik's avatar
Sergei Golubchik committed
4254
static MY_ATTRIBUTE((nonnull))
4255 4256 4257 4258 4259 4260 4261 4262 4263 4264 4265 4266 4267
ibool
ibuf_restore_pos(
/*=============*/
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number where the record
				should belong */
	const dtuple_t*	search_tuple,
				/*!< in: search tuple for entries of page_no */
	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE */
	btr_pcur_t*	pcur,	/*!< in/out: persistent cursor whose
				position is to be restored */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
4268 4269
	ut_ad(mode == BTR_MODIFY_LEAF
	      || BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE);
4270 4271 4272 4273 4274 4275 4276 4277 4278 4279 4280 4281

	if (btr_pcur_restore_position(mode, pcur, mtr)) {

		return(TRUE);
	}

	if (fil_space_get_flags(space) == ULINT_UNDEFINED) {
		/* The tablespace has been dropped.  It is possible
		that another thread has deleted the insert buffer
		entry.  Do not complain. */
		ibuf_btr_pcur_commit_specify_mtr(pcur, mtr);
	} else {
4282 4283 4284 4285 4286
		ib::error() << "ibuf cursor restoration fails!."
			" ibuf record inserted to page "
			<< space << ":" << page_no;

		ib::error() << BUG_REPORT_MSG;
4287 4288 4289 4290 4291 4292 4293 4294

		rec_print_old(stderr, btr_pcur_get_rec(pcur));
		rec_print_old(stderr, pcur->old_rec);
		dtuple_print(stderr, search_tuple);

		rec_print_old(stderr,
			      page_rec_get_next(btr_pcur_get_rec(pcur)));

4295
		ib::fatal() << "Failed to restore ibuf position.";
4296 4297 4298 4299 4300 4301 4302 4303 4304
	}

	return(FALSE);
}

/*********************************************************************//**
Deletes from ibuf the record on which pcur is positioned. If we have to
resort to a pessimistic delete, this function commits mtr and closes
the cursor.
4305
@return TRUE if mtr was committed and pcur closed in this operation */
Sergei Golubchik's avatar
Sergei Golubchik committed
4306
static MY_ATTRIBUTE((warn_unused_result))
4307 4308 4309 4310 4311 4312 4313 4314 4315 4316 4317 4318 4319 4320 4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335 4336 4337
ibool
ibuf_delete_rec(
/*============*/
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number that the record
				should belong to */
	btr_pcur_t*	pcur,	/*!< in: pcur positioned on the record to
				delete, having latch mode BTR_MODIFY_LEAF */
	const dtuple_t*	search_tuple,
				/*!< in: search tuple for entries of page_no */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ibool		success;
	page_t*		root;
	dberr_t		err;

	ut_ad(ibuf_inside(mtr));
	ut_ad(page_rec_is_user_rec(btr_pcur_get_rec(pcur)));
	ut_ad(ibuf_rec_get_page_no(mtr, btr_pcur_get_rec(pcur)) == page_no);
	ut_ad(ibuf_rec_get_space(mtr, btr_pcur_get_rec(pcur)) == space);

#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
	if (ibuf_debug == 2) {
		/* Inject a fault (crash). We do this before trying
		optimistic delete, because a pessimistic delete in the
		change buffer would require a larger test case. */

		/* Flag the buffered record as processed, to avoid
		an assertion failure after crash recovery. */
		btr_cur_set_deleted_flag_for_ibuf(
			btr_pcur_get_rec(pcur), NULL, TRUE, mtr);
4338

4339
		ibuf_mtr_commit(mtr);
4340
		log_write_up_to(LSN_MAX, true);
4341 4342 4343 4344 4345 4346 4347
		DBUG_SUICIDE();
	}
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */

	success = btr_cur_optimistic_delete(btr_pcur_get_btr_cur(pcur),
					    0, mtr);

4348 4349
	const page_id_t	page_id(space, page_no);

4350 4351 4352 4353 4354 4355 4356 4357 4358 4359 4360 4361 4362 4363 4364 4365 4366 4367
	if (success) {
		if (page_is_empty(btr_pcur_get_page(pcur))) {
			/* If a B-tree page is empty, it must be the root page
			and the whole B-tree must be empty. InnoDB does not
			allow empty B-tree pages other than the root. */
			root = btr_pcur_get_page(pcur);

			ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
			ut_ad(page_get_page_no(root)
			      == FSP_IBUF_TREE_ROOT_PAGE_NO);

			/* ibuf->empty is protected by the root page latch.
			Before the deletion, it had to be FALSE. */
			ut_ad(!ibuf->empty);
			ibuf->empty = true;
		}

#ifdef UNIV_IBUF_COUNT_DEBUG
4368 4369 4370 4371 4372 4373 4374
		ib::info() << "Decrementing ibuf count of space " << space
			<< " page " << page_no << " from "
			<< ibuf_count_get(page_id) << " by 1";

		ibuf_count_set(page_id, ibuf_count_get(page_id) - 1);
#endif /* UNIV_IBUF_COUNT_DEBUG */

4375 4376 4377 4378 4379 4380 4381 4382 4383 4384 4385 4386 4387 4388 4389 4390 4391 4392 4393 4394 4395
		return(FALSE);
	}

	ut_ad(page_rec_is_user_rec(btr_pcur_get_rec(pcur)));
	ut_ad(ibuf_rec_get_page_no(mtr, btr_pcur_get_rec(pcur)) == page_no);
	ut_ad(ibuf_rec_get_space(mtr, btr_pcur_get_rec(pcur)) == space);

	/* We have to resort to a pessimistic delete from ibuf.
	Delete-mark the record so that it will not be applied again,
	in case the server crashes before the pessimistic delete is
	made persistent. */
	btr_cur_set_deleted_flag_for_ibuf(
		btr_pcur_get_rec(pcur), NULL, TRUE, mtr);

	btr_pcur_store_position(pcur, mtr);
	ibuf_btr_pcur_commit_specify_mtr(pcur, mtr);

	ibuf_mtr_start(mtr);
	mutex_enter(&ibuf_mutex);

	if (!ibuf_restore_pos(space, page_no, search_tuple,
4396 4397
			      BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
			      pcur, mtr)) {
4398 4399

		mutex_exit(&ibuf_mutex);
4400
		ut_ad(mtr->has_committed());
4401 4402 4403 4404 4405 4406
		goto func_exit;
	}

	root = ibuf_tree_root_get(mtr);

	btr_cur_pessimistic_delete(&err, TRUE, btr_pcur_get_btr_cur(pcur), 0,
4407
				   false, mtr);
4408 4409 4410
	ut_a(err == DB_SUCCESS);

#ifdef UNIV_IBUF_COUNT_DEBUG
4411 4412 4413 4414
	ibuf_count_set(page_id, ibuf_count_get(page_id) - 1);
#endif /* UNIV_IBUF_COUNT_DEBUG */

	ibuf_size_update(root);
4415 4416 4417 4418 4419 4420
	mutex_exit(&ibuf_mutex);

	ibuf->empty = page_is_empty(root);
	ibuf_btr_pcur_commit_specify_mtr(pcur, mtr);

func_exit:
4421
	ut_ad(mtr->has_committed());
4422 4423 4424 4425 4426
	btr_pcur_close(pcur);

	return(TRUE);
}

4427
/** When an index page is read from a disk to the buffer pool, this function
4428 4429 4430 4431
applies any buffered operations to the page and deletes the entries from the
insert buffer. If the page is not read, but created in the buffer pool, this
function deletes its buffered entries from the insert buffer; there can
exist entries for such a page if the page belonged to an index which
4432 4433 4434 4435 4436 4437 4438
subsequently was dropped.
@param[in,out]	block			if page has been read from disk,
pointer to the page x-latched, else NULL
@param[in]	page_id			page id of the index page
@param[in]	update_ibuf_bitmap	normally this is set to TRUE, but
if we have deleted or are deleting the tablespace, then we naturally do not
want to update a non-existent bitmap page */
4439 4440
void
ibuf_merge_or_delete_for_page(
4441 4442 4443 4444
	buf_block_t*		block,
	const page_id_t&	page_id,
	const page_size_t*	page_size,
	ibool			update_ibuf_bitmap)
4445 4446 4447 4448 4449 4450
{
	mem_heap_t*	heap;
	btr_pcur_t	pcur;
	dtuple_t*	search_tuple;
#ifdef UNIV_IBUF_DEBUG
	ulint		volume			= 0;
4451
#endif /* UNIV_IBUF_DEBUG */
4452
	page_zip_des_t*	page_zip		= NULL;
4453
	bool		corruption_noticed	= false;
4454 4455 4456 4457 4458 4459
	mtr_t		mtr;

	/* Counts for merged & discarded operations. */
	ulint		mops[IBUF_OP_COUNT];
	ulint		dops[IBUF_OP_COUNT];

4460 4461
	ut_ad(block == NULL || page_id.equals_to(block->page.id));
	ut_ad(block == NULL || buf_block_get_io_fix(block) == BUF_IO_READ);
4462 4463

	if (srv_force_recovery >= SRV_FORCE_NO_IBUF_MERGE
4464 4465
	    || trx_sys_hdr_page(page_id)
	    || fsp_is_system_temporary(page_id.space())) {
4466 4467 4468
		return;
	}

4469 4470 4471 4472 4473 4474
	/* We cannot refer to page_size in the following, because it is passed
	as NULL (it is unknown) when buf_read_ibuf_merge_pages() is merging
	(discarding) changes for a dropped tablespace. When block != NULL or
	update_ibuf_bitmap is specified, then page_size must be known.
	That is why we will repeat the check below, with page_size in
	place of univ_page_size. Passing univ_page_size assumes that the
4475 4476 4477
	uncompressed page size always is a power-of-2 multiple of the
	compressed page size. */

4478 4479
	if (ibuf_fixed_addr_page(page_id, univ_page_size)
	    || fsp_descr_page(page_id, univ_page_size)) {
4480 4481 4482
		return;
	}

4483 4484
	fil_space_t*	space;

4485 4486 4487
	if (update_ibuf_bitmap) {

		ut_ad(page_size != NULL);
4488

4489 4490
		if (ibuf_fixed_addr_page(page_id, *page_size)
		    || fsp_descr_page(page_id, *page_size)) {
4491 4492 4493
			return;
		}

4494
		space = fil_space_acquire(page_id.space());
4495

4496
		if (UNIV_UNLIKELY(!space)) {
4497 4498
			/* Do not try to read the bitmap page from the
			non-existent tablespace, delete the ibuf records */
4499 4500 4501
			block = NULL;
			update_ibuf_bitmap = FALSE;
		} else {
4502 4503
			page_t*	bitmap_page = NULL;
			ulint	bitmap_bits = 0;
4504 4505 4506 4507

			ibuf_mtr_start(&mtr);

			bitmap_page = ibuf_bitmap_get_map_page(
4508
				page_id, *page_size, &mtr);
4509 4510 4511 4512

			if (bitmap_page &&
			    fil_page_get_type(bitmap_page) != FIL_PAGE_TYPE_ALLOCATED) {
				bitmap_bits = ibuf_bitmap_page_get_bits(
4513
				bitmap_page, page_id, *page_size,
4514 4515
					IBUF_BITMAP_BUFFERED, &mtr);
			}
4516 4517 4518 4519 4520 4521

			ibuf_mtr_commit(&mtr);

			if (!bitmap_bits) {
				/* No inserts buffered for this page */

4522
				fil_space_release(space);
4523 4524 4525
				return;
			}
		}
4526 4527 4528
	} else if (block != NULL
		   && (ibuf_fixed_addr_page(page_id, *page_size)
		       || fsp_descr_page(page_id, *page_size))) {
4529 4530

		return;
4531 4532
	} else {
		space = NULL;
4533 4534 4535 4536
	}

	heap = mem_heap_create(512);

4537 4538
	search_tuple = ibuf_search_tuple_build(
		page_id.space(), page_id.page_no(), heap);
4539

4540
	if (block != NULL) {
4541 4542 4543 4544 4545 4546 4547 4548
		/* Move the ownership of the x-latch on the page to this OS
		thread, so that we can acquire a second x-latch on it. This
		is needed for the insert operations to the index page to pass
		the debug checks. */

		rw_lock_x_lock_move_ownership(&(block->lock));
		page_zip = buf_block_get_page_zip(block);

4549 4550
		if (!fil_page_index_page_check(block->frame)
		    || !page_is_leaf(block->frame)) {
4551

4552
			corruption_noticed = true;
4553

4554 4555 4556 4557 4558 4559 4560 4561 4562
			ib::error() << "Corruption in the tablespace. Bitmap"
				" shows insert buffer records to page "
				<< page_id << " though the page type is "
				<< fil_page_get_type(block->frame)
				<< ", which is not an index leaf page. We try"
				" to resolve the problem by skipping the"
				" insert buffer merge for this page. Please"
				" run CHECK TABLE on your tables to determine"
				" if they are corrupt after this.";
4563 4564 4565 4566 4567 4568 4569 4570 4571 4572 4573 4574 4575 4576 4577 4578
			ut_ad(0);
		}
	}

	memset(mops, 0, sizeof(mops));
	memset(dops, 0, sizeof(dops));

loop:
	ibuf_mtr_start(&mtr);

	/* Position pcur in the insert buffer at the first entry for this
	index page */
	btr_pcur_open_on_user_rec(
		ibuf->index, search_tuple, PAGE_CUR_GE, BTR_MODIFY_LEAF,
		&pcur, &mtr);

4579
	if (block != NULL) {
4580 4581
		ibool success;

4582 4583
		mtr.set_named_space(page_id.space());

4584 4585 4586 4587 4588 4589 4590 4591 4592 4593 4594 4595 4596
		success = buf_page_get_known_nowait(
			RW_X_LATCH, block,
			BUF_KEEP_OLD, __FILE__, __LINE__, &mtr);

		ut_a(success);

		/* This is a user page (secondary index leaf page),
		but we pretend that it is a change buffer page in
		order to obey the latching order. This should be OK,
		because buffered changes are applied immediately while
		the block is io-fixed. Other threads must not try to
		latch an io-fixed block. */
		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);
4597 4598
	} else if (update_ibuf_bitmap) {
		mtr.set_named_space(page_id.space());
4599 4600 4601 4602 4603 4604 4605 4606 4607 4608 4609 4610 4611 4612 4613 4614
	}

	if (!btr_pcur_is_on_user_rec(&pcur)) {
		ut_ad(btr_pcur_is_after_last_in_tree(&pcur, &mtr));

		goto reset_bit;
	}

	for (;;) {
		rec_t*	rec;

		ut_ad(btr_pcur_is_on_user_rec(&pcur));

		rec = btr_pcur_get_rec(&pcur);

		/* Check if the entry is for this index page */
4615 4616
		if (ibuf_rec_get_page_no(&mtr, rec) != page_id.page_no()
		    || ibuf_rec_get_space(&mtr, rec) != page_id.space()) {
4617

4618
			if (block != NULL) {
4619 4620 4621 4622 4623 4624 4625
				page_header_reset_last_insert(
					block->frame, page_zip, &mtr);
			}

			goto reset_bit;
		}

4626
		if (corruption_noticed) {
4627 4628 4629
			fputs("InnoDB: Discarding record\n ", stderr);
			rec_print_old(stderr, rec);
			fputs("\nInnoDB: from the insert buffer!\n\n", stderr);
4630
		} else if (block != NULL && !rec_get_deleted_flag(rec, 0)) {
4631 4632 4633 4634 4635 4636 4637 4638 4639 4640 4641 4642 4643 4644 4645 4646 4647 4648 4649 4650 4651 4652 4653 4654 4655 4656 4657 4658 4659 4660 4661 4662 4663 4664 4665 4666 4667 4668 4669 4670 4671 4672 4673 4674 4675 4676 4677 4678 4679 4680 4681
			/* Now we have at pcur a record which should be
			applied on the index page; NOTE that the call below
			copies pointers to fields in rec, and we must
			keep the latch to the rec page until the
			insertion is finished! */
			dtuple_t*	entry;
			trx_id_t	max_trx_id;
			dict_index_t*	dummy_index;
			ibuf_op_t	op = ibuf_rec_get_op_type(&mtr, rec);

			max_trx_id = page_get_max_trx_id(page_align(rec));
			page_update_max_trx_id(block, page_zip, max_trx_id,
					       &mtr);

			ut_ad(page_validate(page_align(rec), ibuf->index));

			entry = ibuf_build_entry_from_ibuf_rec(
				&mtr, rec, heap, &dummy_index);

			ut_ad(page_validate(block->frame, dummy_index));

			switch (op) {
				ibool	success;
			case IBUF_OP_INSERT:
#ifdef UNIV_IBUF_DEBUG
				volume += rec_get_converted_size(
					dummy_index, entry, 0);

				volume += page_dir_calc_reserved_space(1);

				ut_a(volume <= 4 * UNIV_PAGE_SIZE
					/ IBUF_PAGE_SIZE_PER_FREE_SPACE);
#endif
				ibuf_insert_to_index_page(
					entry, block, dummy_index, &mtr);
				break;

			case IBUF_OP_DELETE_MARK:
				ibuf_set_del_mark(
					entry, block, dummy_index, &mtr);
				break;

			case IBUF_OP_DELETE:
				ibuf_delete(entry, block, dummy_index, &mtr);
				/* Because ibuf_delete() will latch an
				insert buffer bitmap page, commit mtr
				before latching any further pages.
				Store and restore the cursor position. */
				ut_ad(rec == btr_pcur_get_rec(&pcur));
				ut_ad(page_rec_is_user_rec(rec));
				ut_ad(ibuf_rec_get_page_no(&mtr, rec)
4682 4683 4684
				      == page_id.page_no());
				ut_ad(ibuf_rec_get_space(&mtr, rec)
				      == page_id.space());
4685 4686 4687 4688 4689 4690 4691 4692 4693 4694 4695 4696 4697 4698 4699

				/* Mark the change buffer record processed,
				so that it will not be merged again in case
				the server crashes between the following
				mtr_commit() and the subsequent mtr_commit()
				of deleting the change buffer record. */

				btr_cur_set_deleted_flag_for_ibuf(
					btr_pcur_get_rec(&pcur), NULL,
					TRUE, &mtr);

				btr_pcur_store_position(&pcur, &mtr);
				ibuf_btr_pcur_commit_specify_mtr(&pcur, &mtr);

				ibuf_mtr_start(&mtr);
4700
				mtr.set_named_space(page_id.space());
4701 4702 4703 4704 4705 4706 4707 4708 4709 4710 4711 4712 4713 4714

				success = buf_page_get_known_nowait(
					RW_X_LATCH, block,
					BUF_KEEP_OLD,
					__FILE__, __LINE__, &mtr);
				ut_a(success);

				/* This is a user page (secondary
				index leaf page), but it should be OK
				to use too low latching order for it,
				as the block is io-fixed. */
				buf_block_dbg_add_level(
					block, SYNC_IBUF_TREE_NODE);

4715 4716
				if (!ibuf_restore_pos(page_id.space(),
						      page_id.page_no(),
4717 4718 4719 4720
						      search_tuple,
						      BTR_MODIFY_LEAF,
						      &pcur, &mtr)) {

4721
					ut_ad(mtr.has_committed());
4722 4723 4724 4725 4726 4727 4728 4729 4730 4731 4732 4733 4734 4735 4736 4737 4738 4739
					mops[op]++;
					ibuf_dummy_index_free(dummy_index);
					goto loop;
				}

				break;
			default:
				ut_error;
			}

			mops[op]++;

			ibuf_dummy_index_free(dummy_index);
		} else {
			dops[ibuf_rec_get_op_type(&mtr, rec)]++;
		}

		/* Delete the record from ibuf */
4740 4741
		if (ibuf_delete_rec(page_id.space(), page_id.page_no(),
				    &pcur, search_tuple, &mtr)) {
4742 4743 4744
			/* Deletion was pessimistic and mtr was committed:
			we start from the beginning again */

4745
			ut_ad(mtr.has_committed());
4746 4747 4748 4749 4750 4751 4752 4753 4754 4755
			goto loop;
		} else if (btr_pcur_is_after_last_on_page(&pcur)) {
			ibuf_mtr_commit(&mtr);
			btr_pcur_close(&pcur);

			goto loop;
		}
	}

reset_bit:
4756
	if (update_ibuf_bitmap) {
4757 4758
		page_t*	bitmap_page;

4759 4760
		bitmap_page = ibuf_bitmap_get_map_page(page_id, *page_size,
						       &mtr);
4761 4762

		ibuf_bitmap_page_set_bits(
4763
			bitmap_page, page_id, *page_size,
4764 4765
			IBUF_BITMAP_BUFFERED, FALSE, &mtr);

4766
		if (block != NULL) {
4767
			ulint old_bits = ibuf_bitmap_page_get_bits(
4768
				bitmap_page, page_id, *page_size,
4769 4770
				IBUF_BITMAP_FREE, &mtr);

4771
			ulint new_bits = ibuf_index_page_calc_free(block);
4772 4773 4774

			if (old_bits != new_bits) {
				ibuf_bitmap_page_set_bits(
4775
					bitmap_page, page_id, *page_size,
4776 4777 4778 4779 4780 4781
					IBUF_BITMAP_FREE, new_bits, &mtr);
			}
		}
	}

	ibuf_mtr_commit(&mtr);
4782 4783 4784 4785 4786

	if (space) {
		fil_space_release(space);
	}

4787 4788 4789
	btr_pcur_close(&pcur);
	mem_heap_free(heap);

4790
	my_atomic_addlint(&ibuf->n_merges, 1);
4791 4792 4793 4794
	ibuf_add_ops(ibuf->n_merged_ops, mops);
	ibuf_add_ops(ibuf->n_discarded_ops, dops);

#ifdef UNIV_IBUF_COUNT_DEBUG
4795
	ut_a(ibuf_count_get(page_id) == 0);
4796 4797 4798 4799 4800
#endif
}

/*********************************************************************//**
Removes every insert buffer record that belongs to the given tablespace id.
Used by DISCARD TABLESPACE, IMPORT TABLESPACE and TRUNCATE TABLESPACE.
NOTE: the page free bitmaps of the space are not updated here. The space
becomes CORRUPT once this function has been called! */
void
ibuf_delete_for_discarded_space(
/*============================*/
	ulint	space)	/*!< in: space id */
{
	mtr_t		mtr;
	btr_pcur_t	pcur;

	/* Per-operation-type tally of the records thrown away. */
	ulint		dops[IBUF_OP_COUNT];

	memset(dops, 0, sizeof(dops));

	mem_heap_t*	heap = mem_heap_create(512);

	/* The tuple is built with page number 0 so that a PAGE_CUR_GE
	search lands on the first entry stored for this space id. */
	dtuple_t*	search_tuple = ibuf_search_tuple_build(space, 0, heap);

	/* Each pass of the outer loop opens a fresh mini-transaction and
	repositions the cursor. A pass ends either because the scan must be
	restarted (all_done stays false) or because no more records of this
	space remain (all_done is set, or the cursor is not on a user
	record). In the latter cases the mini-transaction is still open and
	is committed after the loop. */
	for (bool all_done = false; !all_done; ) {

		ibuf_mtr_start(&mtr);

		btr_pcur_open_on_user_rec(
			ibuf->index, search_tuple, PAGE_CUR_GE,
			BTR_MODIFY_LEAF, &pcur, &mtr);

		if (!btr_pcur_is_on_user_rec(&pcur)) {
			ut_ad(btr_pcur_is_after_last_in_tree(&pcur, &mtr));

			break;
		}

		for (;;) {
			ut_ad(btr_pcur_is_on_user_rec(&pcur));

			const rec_t*	ibuf_rec = btr_pcur_get_rec(&pcur);

			if (ibuf_rec_get_space(&mtr, ibuf_rec) != space) {
				/* The cursor has moved past the last entry
				of this space. */
				all_done = true;

				break;
			}

			const ulint	page_no
				= ibuf_rec_get_page_no(&mtr, ibuf_rec);

			dops[ibuf_rec_get_op_type(&mtr, ibuf_rec)]++;

			if (ibuf_delete_rec(space, page_no, &pcur,
					    search_tuple, &mtr)) {
				/* The delete was pessimistic and has
				already committed mtr: restart the scan. */
				ut_ad(mtr.has_committed());

				break;
			}

			if (btr_pcur_is_after_last_on_page(&pcur)) {
				/* The page is exhausted: end this
				mini-transaction and restart the scan. */
				ibuf_mtr_commit(&mtr);
				btr_pcur_close(&pcur);

				break;
			}
		}
	}

	ibuf_mtr_commit(&mtr);
	btr_pcur_close(&pcur);

	ibuf_add_ops(ibuf->n_discarded_ops, dops);

	mem_heap_free(heap);
}

/******************************************************************//**
Looks if the insert buffer is empty.
4886
@return true if empty */
4887 4888 4889 4890 4891 4892 4893 4894 4895 4896 4897 4898 4899 4900 4901 4902 4903 4904 4905 4906 4907 4908 4909 4910 4911 4912 4913 4914 4915 4916 4917 4918 4919 4920 4921 4922
bool
ibuf_is_empty(void)
/*===============*/
{
	bool		is_empty;
	const page_t*	root;
	mtr_t		mtr;

	ibuf_mtr_start(&mtr);

	mutex_enter(&ibuf_mutex);
	root = ibuf_tree_root_get(&mtr);
	mutex_exit(&ibuf_mutex);

	is_empty = page_is_empty(root);
	ut_a(is_empty == ibuf->empty);
	ibuf_mtr_commit(&mtr);

	return(is_empty);
}

/******************************************************************//**
Prints info of ibuf. */
void
ibuf_print(
/*=======*/
	FILE*	file)	/*!< in: file where to print */
{
#ifdef UNIV_IBUF_COUNT_DEBUG
	ulint		i;
	ulint		j;
#endif

	mutex_enter(&ibuf_mutex);

	fprintf(file,
4923 4924 4925 4926 4927 4928
		"Ibuf: size " ULINTPF ", free list len " ULINTPF ","
		" seg size " ULINTPF ", " ULINTPF " merges\n",
		ibuf->size,
		ibuf->free_list_len,
		ibuf->seg_size,
		ibuf->n_merges);
4929 4930 4931 4932 4933 4934 4935 4936 4937 4938

	fputs("merged operations:\n ", file);
	ibuf_print_ops(ibuf->n_merged_ops, file);

	fputs("discarded operations:\n ", file);
	ibuf_print_ops(ibuf->n_discarded_ops, file);

#ifdef UNIV_IBUF_COUNT_DEBUG
	for (i = 0; i < IBUF_COUNT_N_SPACES; i++) {
		for (j = 0; j < IBUF_COUNT_N_PAGES; j++) {
4939
			ulint	count = ibuf_count_get(page_id_t(i, j, 0));
4940 4941 4942

			if (count > 0) {
				fprintf(stderr,
4943 4944 4945 4946
					"Ibuf count for page "
					ULINTPF ":" ULINTPF ""
					" is " ULINTPF "\n",
					i, j, count);
4947 4948 4949 4950 4951 4952 4953 4954 4955 4956 4957 4958 4959 4960 4961 4962 4963 4964 4965 4966 4967 4968 4969
			}
		}
	}
#endif /* UNIV_IBUF_COUNT_DEBUG */

	mutex_exit(&ibuf_mutex);
}

/******************************************************************//**
Checks the insert buffer bitmaps on IMPORT TABLESPACE.
Walks every ibuf bitmap page of the tablespace while holding ibuf_mutex.
A page flagged IBUF_BITMAP_IBUF is reported as an error and aborts the
check; a page flagged IBUF_BITMAP_BUFFERED is reported as a warning and the
flag is cleared. All-zero bitmap pages are skipped.
@return DB_SUCCESS, or DB_TABLE_NOT_FOUND if the space is unknown or has
size 0, DB_INTERRUPTED if the transaction was interrupted, DB_CORRUPTION
if a page is flagged as belonging to the insert buffer */
dberr_t
ibuf_check_bitmap_on_import(
/*========================*/
	const trx_t*	trx,		/*!< in: transaction */
	ulint		space_id)	/*!< in: tablespace identifier */
{
	ut_ad(space_id);
	ut_ad(trx->mysql_thd);

	bool			found;
	const page_size_t&	page_size
		= fil_space_get_page_size(space_id, &found);

	if (!found) {
		return(DB_TABLE_NOT_FOUND);
	}

	const ulint	size = fil_space_get_size(space_id);

	if (size == 0) {
		return(DB_TABLE_NOT_FOUND);
	}

	/* The two bitmap pages (allocation bitmap and ibuf bitmap) repeat
	every page_size pages. For example if page_size is 16 KiB, then the
	two bitmap pages repeat every 16 KiB * 16384 = 256 MiB. In the loop
	below page_no is measured in number of pages since the beginning of
	the space, as usual. */
	const ulint	bitmap_period = page_size.physical();

	mutex_enter(&ibuf_mutex);

	for (ulint page_no = 0; page_no < size; page_no += bitmap_period) {
		mtr_t	mtr;

		if (trx_is_interrupted(trx)) {
			mutex_exit(&ibuf_mutex);
			return(DB_INTERRUPTED);
		}

		mtr_start(&mtr);

		mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);

		ibuf_enter(&mtr);

		page_t*	bitmap_page = ibuf_bitmap_get_map_page(
			page_id_t(space_id, page_no), page_size, &mtr);

		if (buf_page_is_zeroes(bitmap_page, page_size)) {
			/* An all-zero page was found where an ibuf bitmap
			page was expected. The pages that follow it should
			be all-zero as well. */
#ifdef UNIV_DEBUG
			/* NOTE(review): the upper bound does not include
			page_no, so this loop verifies nothing once
			page_no > 0 -- confirm whether that is intended. */
			for (ulint curr_page = page_no + 1;
			     curr_page < bitmap_period; curr_page++) {

				buf_block_t*	block = buf_page_get(
					page_id_t(space_id, curr_page),
					page_size,
					RW_S_LATCH, &mtr);
				page_t*	page = buf_block_get_frame(block);
				ut_ad(buf_page_is_zeroes(page, page_size));
			}
#endif /* UNIV_DEBUG */
			ibuf_exit(&mtr);
			mtr_commit(&mtr);
			continue;
		}

		for (ulint i = FSP_IBUF_BITMAP_OFFSET + 1;
		     i < bitmap_period;
		     i++) {

			const ulint	offset = page_no + i;
			const page_id_t	cur_page_id(space_id, offset);

			if (ibuf_bitmap_page_get_bits(
					bitmap_page, cur_page_id, page_size,
					IBUF_BITMAP_IBUF, &mtr)) {

				/* Release everything before reporting the
				error and giving up. */
				mutex_exit(&ibuf_mutex);
				ibuf_exit(&mtr);
				mtr_commit(&mtr);

				ib_errf(trx->mysql_thd,
					IB_LOG_LEVEL_ERROR,
					 ER_INNODB_INDEX_CORRUPT,
					 "Space %u page %u"
					 " is wrongly flagged to belong to the"
					 " insert buffer",
					 (unsigned) space_id,
					 (unsigned) offset);

				return(DB_CORRUPTION);
			}

			if (!ibuf_bitmap_page_get_bits(
				    bitmap_page, cur_page_id, page_size,
				    IBUF_BITMAP_BUFFERED, &mtr)) {
				/* Nothing is buffered for this page. */
				continue;
			}

			ib_errf(trx->mysql_thd,
				IB_LOG_LEVEL_WARN,
				ER_INNODB_INDEX_CORRUPT,
				"Buffered changes"
				" for space %u page %u are lost",
				(unsigned) space_id,
				(unsigned) offset);

			/* Tolerate this error, so that slightly corrupted
			tables can be imported and dumped. Clear the bit. */
			ibuf_bitmap_page_set_bits(
				bitmap_page, cur_page_id, page_size,
				IBUF_BITMAP_BUFFERED, FALSE, &mtr);
		}

		ibuf_exit(&mtr);
		mtr_commit(&mtr);
	}

	mutex_exit(&ibuf_mutex);
	return(DB_SUCCESS);
}
5088 5089 5090 5091 5092 5093 5094 5095 5096 5097 5098 5099 5100 5101 5102 5103 5104 5105 5106 5107 5108 5109 5110 5111 5112 5113 5114 5115 5116 5117 5118 5119 5120 5121

/** Updates free bits and buffered bits for bulk loaded page.
In a single mini-transaction, writes the IBUF_BITMAP_FREE bits of the page
(0 when reset is set, otherwise the value from ibuf_index_page_calc_free())
and clears its IBUF_BITMAP_BUFFERED bit. The page must be a leaf page.
@param[in]	block	index page
@param[in]	reset	flag if reset free val */
void
ibuf_set_bitmap_for_bulk_load(
	buf_block_t*	block,
	bool		reset)
{
	ut_a(page_is_leaf(buf_block_get_frame(block)));

	mtr_t	mtr;

	mtr_start(&mtr);
	mtr.set_named_space(block->page.id.space());

	page_t*	bitmap_page = ibuf_bitmap_get_map_page(
		block->page.id, block->page.size, &mtr);

	/* The free value is computed only here, and only when it will be
	used. An earlier unconditional computation was overwritten before
	being read, so it has been dropped (assumes
	ibuf_index_page_calc_free() has no side effects). */
	const ulint	free_val
		= reset ? 0 : ibuf_index_page_calc_free(block);

	ibuf_bitmap_page_set_bits(
		bitmap_page, block->page.id, block->page.size,
		IBUF_BITMAP_FREE, free_val, &mtr);

	ibuf_bitmap_page_set_bits(
		bitmap_page, block->page.id, block->page.size,
		IBUF_BITMAP_BUFFERED, FALSE, &mtr);

	mtr_commit(&mtr);
}