page0zip.c 114 KB
Newer Older
marko's avatar
marko committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/******************************************************
Compressed page interface

(c) 2005 Innobase Oy

Created June 2005 by Marko Makela
*******************************************************/

#define THIS_MODULE
#include "page0zip.h"
#ifdef UNIV_NONINL
# include "page0zip.ic"
#endif
#undef THIS_MODULE
#include "page0page.h"
#include "mtr0log.h"
17
#include "ut0sort.h"
18
#include "dict0boot.h"
19
#include "dict0dict.h"
20
#include "btr0sea.h"
21 22
#include "btr0cur.h"
#include "page0types.h"
23
#include "lock0lock.h"
24
#include "log0recv.h"
marko's avatar
marko committed
25 26
#include "zlib.h"

27 28 29 30 31 32 33
/* Global compression/decompression statistics.  Indexed by
page_zip_des_t::ssize (the compressed page size code, 0..7).
page_zip_compress_count is incremented in page_zip_compress();
NOTE(review): no locking is visible here — presumably these are
best-effort counters updated without synchronization; confirm. */
/** Number of page compressions, indexed by page_zip_des_t::ssize */
ulint	page_zip_compress_count[8];
/** Number of successful page compressions, indexed by page_zip_des_t::ssize */
ulint	page_zip_compress_ok[8];
/** Number of page decompressions, indexed by page_zip_des_t::ssize */
ulint	page_zip_decompress_count[8];

34 35 36
/* Please refer to ../include/page0zip.ic for a description of the
compressed page format. */

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
/* The infimum and supremum records are omitted from the compressed page.
On compress, we compare that the records are there, and on uncompress we
restore the records.  These byte patterns are the canonical extra bytes
and data bytes of the two pseudo-records; page_zip_compress() memcmp()s
the page against them before compressing. */
static const byte infimum_extra[] = {
	0x01,			/* info_bits=0, n_owned=1 */
	0x00, 0x02		/* heap_no=0, status=2 */
	/* ?, ?	*/		/* next=(first user rec, or supremum) */
};
static const byte infimum_data[] = {
	0x69, 0x6e, 0x66, 0x69,
	0x6d, 0x75, 0x6d, 0x00	/* "infimum\0" */
};
static const byte supremum_extra_data[] = {
	/* 0x0?, */		/* info_bits=0, n_owned=1..8 */
	0x00, 0x0b,		/* heap_no=1, status=3 */
	0x00, 0x00,		/* next=0 */
	0x73, 0x75, 0x70, 0x72,
	0x65, 0x6d, 0x75, 0x6d	/* "supremum" */
};

marko's avatar
marko committed
57 58 59 60 61
#ifdef UNIV_DEBUG
/* Array of zeros, used for debug assertions
(NOTE(review): presumably compared against cleared BLOB pointer fields
of BTR_EXTERN_FIELD_REF_SIZE bytes — usage is outside this chunk). */
static const byte zero[BTR_EXTERN_FIELD_REF_SIZE] = { 0, };
#endif

62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
/**************************************************************************
Determine the guaranteed free space on an empty page. */

ulint
page_zip_empty_size(
/*================*/
				/* out: minimum payload size on the page */
	ulint	n_fields,	/* in: number of columns in the index */
	ulint	zip_size)	/* in: compressed page size in bytes */
{
	/* Fixed overhead: the page header plus the longest
	uncompressed data needed for a single record. */
	lint	overhead = PAGE_DATA
		+ PAGE_ZIP_DIR_SLOT_SIZE
		+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN
		+ 1	/* encoded heap_no==2 in page_zip_write_rec() */
		+ 1	/* end of modification log */
		- REC_N_NEW_EXTRA_BYTES;	/* omitted bytes */
	/* Also subtract the worst-case space needed for the output of
	page_zip_fields_encode(), as bounded by zlib's compressBound(). */
	lint	payload = (lint) zip_size - overhead
		- compressBound(2 * (n_fields + 1));

	if (payload > 0) {
		return((ulint) payload);
	}

	return(0);
}

86 87 88 89 90 91 92 93 94 95 96 97 98
/*****************************************************************
Gets the size of the compressed page trailer (the dense page directory),
including deleted records (the free list). */
UNIV_INLINE
ulint
page_zip_dir_size(
/*==============*/
						/* out: length of dense page
						directory, in bytes */
	const page_zip_des_t*	page_zip)	/* in: compressed page */
{
	/* The infimum and supremum records have no slots in the
	dense directory; exclude them from the heap count. */
	ulint	n_slots = page_dir_get_n_heap(page_zip->data)
		- PAGE_HEAP_NO_USER_LOW;

	return(n_slots * PAGE_ZIP_DIR_SLOT_SIZE);
}

/*****************************************************************
Gets the size of the compressed page trailer (the dense page directory),
only including user records (excluding the free list). */
UNIV_INLINE
ulint
page_zip_dir_user_size(
/*===================*/
						/* out: length of dense page
						directory comprising existing
						records, in bytes */
	const page_zip_des_t*	page_zip)	/* in: compressed page */
{
	ulint	n_user	= page_get_n_recs(page_zip->data);
	ulint	size	= n_user * PAGE_ZIP_DIR_SLOT_SIZE;

	/* The user-record slots are a subset of all slots. */
	ut_ad(size <= page_zip_dir_size(page_zip));

	return(size);
}

/*****************************************************************
123
Find the slot of the given record in the dense page directory. */
124 125
UNIV_INLINE
byte*
126 127 128 129 130 131 132
page_zip_dir_find_low(
/*==================*/
					/* out: dense directory slot,
					or NULL if record not found */
	byte*	slot,			/* in: start of records */
	byte*	end,			/* in: end of records */
	ulint	offset)			/* in: offset of user record */
133
{
134
	ut_ad(slot <= end);
135 136 137

	for (; slot < end; slot += PAGE_ZIP_DIR_SLOT_SIZE) {
		if ((mach_read_from_2(slot) & PAGE_ZIP_DIR_SLOT_MASK)
138
		    == offset) {
139 140 141 142 143 144 145 146
			return(slot);
		}
	}

	return(NULL);
}

/*****************************************************************
147
Find the slot of the given non-free record in the dense page directory. */
148 149
UNIV_INLINE
byte*
150 151 152 153 154 155
page_zip_dir_find(
/*==============*/
						/* out: dense directory slot,
						or NULL if record not found */
	page_zip_des_t*	page_zip,		/* in: compressed page */
	ulint		offset)			/* in: offset of user record */
156
{
157
	byte*	end	= page_zip->data + page_zip_get_size(page_zip);
158

159
	ut_ad(page_zip_simple_validate(page_zip));
160

161 162 163
	return(page_zip_dir_find_low(end - page_zip_dir_user_size(page_zip),
				     end,
				     offset));
164 165
}

166 167 168 169 170 171 172 173 174 175 176
/*****************************************************************
Find the slot of the given free record in the dense page directory. */
UNIV_INLINE
byte*
page_zip_dir_find_free(
/*===================*/
						/* out: dense directory slot,
						or NULL if record not found */
	page_zip_des_t*	page_zip,		/* in: compressed page */
	ulint		offset)			/* in: offset of user record */
{
	byte*	end;

	ut_ad(page_zip_simple_validate(page_zip));

	end = page_zip->data + page_zip_get_size(page_zip);

	/* The free-list slots lie between the start of the full dense
	directory and the start of the user-record slots. */
	return(page_zip_dir_find_low(end - page_zip_dir_size(page_zip),
				     end - page_zip_dir_user_size(page_zip),
				     offset));
}

186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
/*****************************************************************
Read a given slot in the dense page directory. */
UNIV_INLINE
ulint
page_zip_dir_get(
/*=============*/
						/* out: record offset
						on the uncompressed page,
						possibly ORed with
						PAGE_ZIP_DIR_SLOT_DEL or
						PAGE_ZIP_DIR_SLOT_OWNED */
	const page_zip_des_t*	page_zip,	/* in: compressed page */
	ulint			slot)		/* in: slot
						(0=first user record) */
{
	const byte*	dir_end;

	ut_ad(page_zip_simple_validate(page_zip));
	ut_ad(slot < page_zip_dir_size(page_zip) / PAGE_ZIP_DIR_SLOT_SIZE);

	/* The directory grows backwards from the end of the page;
	slot 0 occupies the last PAGE_ZIP_DIR_SLOT_SIZE bytes. */
	dir_end = page_zip->data + page_zip_get_size(page_zip);

	return(mach_read_from_2(dir_end
				- PAGE_ZIP_DIR_SLOT_SIZE * (slot + 1)));
}

207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
/**************************************************************************
Write a redo log record (MLOG_ZIP_PAGE_COMPRESS) describing the
compression of an index page.  The record consists of the initial log
header, the length of the compressed stream, the length of the
uncompressed trailer, and then the raw page bytes themselves. */
static
void
page_zip_compress_write_log(
/*========================*/
	const page_zip_des_t*	page_zip,/* in: compressed page */
	const page_t*		page,	/* in: uncompressed page */
	dict_index_t*		index,	/* in: index of the B-tree node */
	mtr_t*			mtr)	/* in: mini-transaction */
{
	byte*	log_ptr;
	ulint	trailer_size;

	/* Reserve room for the initial log record plus the two 2-byte
	length fields written below. */
	log_ptr = mlog_open(mtr, 11 + 2 + 2);

	if (!log_ptr) {
		/* mlog_open() can return NULL (e.g. when logging is
		not to be done); nothing to write in that case. */
		return;
	}

	/* Read the number of user records. */
	trailer_size = page_dir_get_n_heap(page_zip->data)
		- PAGE_HEAP_NO_USER_LOW;
	/* Multiply by uncompressed of size stored per record */
	if (!page_is_leaf(page)) {
		/* node-pointer page: slot + child page number */
		trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE + REC_NODE_PTR_SIZE;
	} else if (dict_index_is_clust(index)) {
		/* clustered leaf: slot + uncompressed trx_id, roll_ptr */
		trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE
			+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
	} else {
		/* secondary leaf: only the directory slot */
		trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE;
	}
	/* Add the space occupied by BLOB pointers. */
	trailer_size += page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
	ut_a(page_zip->m_end > PAGE_DATA);
#if FIL_PAGE_DATA > PAGE_DATA
# error "FIL_PAGE_DATA > PAGE_DATA"
#endif
	/* The modification log and the trailer must not overlap. */
	ut_a(page_zip->m_end + trailer_size <= page_zip_get_size(page_zip));

	log_ptr = mlog_write_initial_log_record_fast((page_t*) page,
						     MLOG_ZIP_PAGE_COMPRESS,
						     log_ptr, mtr);
	/* Length of the compressed data (from FIL_PAGE_TYPE to m_end). */
	mach_write_to_2(log_ptr, page_zip->m_end - FIL_PAGE_TYPE);
	log_ptr += 2;
	/* Length of the uncompressed trailer computed above. */
	mach_write_to_2(log_ptr, trailer_size);
	log_ptr += 2;
	mlog_close(mtr, log_ptr);

	/* Write FIL_PAGE_PREV and FIL_PAGE_NEXT */
	mlog_catenate_string(mtr, page_zip->data + FIL_PAGE_PREV, 4);
	mlog_catenate_string(mtr, page_zip->data + FIL_PAGE_NEXT, 4);
	/* Write most of the page header, the compressed stream and
	the modification log. */
	mlog_catenate_string(mtr, page_zip->data + FIL_PAGE_TYPE,
			     page_zip->m_end - FIL_PAGE_TYPE);
	/* Write the uncompressed trailer of the compressed page. */
	mlog_catenate_string(mtr, page_zip->data + page_zip_get_size(page_zip)
			     - trailer_size, trailer_size);
}

269 270
/**********************************************************
Determine how many externally stored columns are contained
271
in existing records with smaller heap_no than rec. */
272
static
273 274 275
ulint
page_zip_get_n_prev_extern(
/*=======================*/
276 277 278 279 280
	const page_zip_des_t*	page_zip,/* in: dense page directory on
					compressed page */
	const rec_t*		rec,	/* in: compact physical record
					on a B-tree leaf page */
	dict_index_t*		index)	/* in: record descriptor */
281
{
282
	const page_t*	page	= page_align(rec);
283 284 285 286 287
	ulint		n_ext	= 0;
	ulint		i;
	ulint		left;
	ulint		heap_no;
	ulint		n_recs	= page_get_n_recs(page_zip->data);
288 289 290 291

	ut_ad(page_is_leaf(page));
	ut_ad(page_is_comp(page));
	ut_ad(dict_table_is_comp(index->table));
292
	ut_ad(dict_index_is_clust(index));
293

294
	heap_no = rec_get_heap_no_new(rec);
295 296
	ut_ad(heap_no >= PAGE_HEAP_NO_USER_LOW);
	left = heap_no - PAGE_HEAP_NO_USER_LOW;
297 298 299
	if (UNIV_UNLIKELY(!left)) {
		return(0);
	}
300

301
	for (i = 0; i < n_recs; i++) {
302 303
		const rec_t*	r	= page + (page_zip_dir_get(page_zip, i)
						  & PAGE_ZIP_DIR_SLOT_MASK);
304

305
		if (rec_get_heap_no_new(r) < heap_no) {
306 307
			n_ext += rec_get_n_extern_new(r, index,
						      ULINT_UNDEFINED);
308 309 310
			if (!--left) {
				break;
			}
311
		}
312
	}
313

314
	return(n_ext);
315 316
}

317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
/**************************************************************************
Encode the length of a fixed-length column. */
static
byte*
page_zip_fixed_field_encode(
/*========================*/
			/* out: buf + length of encoded val */
	byte*	buf,	/* in: pointer to buffer where to write */
	ulint	val)	/* in: value to write */
{
	ut_ad(val >= 2);

	if (UNIV_UNLIKELY(val >= 126)) {
		/* Two-byte encoding: high bit set in the first byte. */
		*buf++ = 0x80 | val >> 8;
		*buf++ = 0xff & val;
	} else {
		/*
		Single byte; the codes below are reserved for
		variable-length fields:
		0 = nullable variable field of at most 255 bytes length;
		1 = not null variable field of at most 255 bytes length;
		126 = nullable variable field with maximum length >255;
		127 = not null variable field with maximum length >255
		*/
		*buf++ = val;
	}

	return(buf);
}

/**************************************************************************
Write the index information for the compressed page.  Each field of the
index is encoded into buf: variable-length fields get a one-byte code,
runs of adjacent fixed-length non-nullable fields are merged and written
as a single length (split at DICT_MAX_INDEX_COL_LEN and at the trx_id
column), and a trailing entry records either the position of the trx_id
column (leaf pages of the clustered index) or the number of nullable
fields (other pages).
NOTE(review): the caller passes a buffer described as (n + 1) * 2 bytes,
while the final assertion allows up to (n + 2) * 2 bytes of output —
confirm the allocation against the worst case. */
static
ulint
page_zip_fields_encode(
/*===================*/
				/* out: used size of buf */
	ulint		n,	/* in: number of fields to compress */
	dict_index_t*	index,	/* in: index comprising at least n fields */
	ulint		trx_id_pos,/* in: position of the trx_id column
				in the index, or ULINT_UNDEFINED if
				this is a non-leaf page */
	byte*		buf)	/* out: buffer of (n + 1) * 2 bytes */
{
	const byte*	buf_start	= buf;
	ulint		i;
	ulint		col;
	ulint		trx_id_col	= 0;
	/* sum of lengths of preceding non-nullable fixed fields, or 0 */
	ulint		fixed_sum	= 0;

	ut_ad(trx_id_pos == ULINT_UNDEFINED || trx_id_pos < n);

	for (i = col = 0; i < n; i++) {
		dict_field_t*	field = dict_index_get_nth_field(index, i);
		ulint		val;

		if (dict_field_get_col(field)->prtype & DATA_NOT_NULL) {
			val = 1; /* set the "not nullable" flag */
		} else {
			val = 0; /* nullable field */
		}

		if (!field->fixed_len) {
			/* variable-length field */
			const dict_col_t*	column
				= dict_field_get_col(field);

			if (UNIV_UNLIKELY(column->len > 255)
			    || UNIV_UNLIKELY(column->mtype == DATA_BLOB)) {
				val |= 0x7e; /* max > 255 bytes */
			}

			if (fixed_sum) {
				/* write out the length of any
				preceding non-nullable fields */
				buf = page_zip_fixed_field_encode(
					buf, fixed_sum << 1 | 1);
				fixed_sum = 0;
				col++;
			}

			*buf++ = val;
			col++;
		} else if (val) {
			/* fixed-length non-nullable field */

			if (fixed_sum && UNIV_UNLIKELY
			    (fixed_sum + field->fixed_len
			     > DICT_MAX_INDEX_COL_LEN)) {
				/* Write out the length of the
				preceding non-nullable fields,
				to avoid exceeding the maximum
				length of a fixed-length column. */
				buf = page_zip_fixed_field_encode(
					buf, fixed_sum << 1 | 1);
				fixed_sum = 0;
				col++;
			}

			if (i && UNIV_UNLIKELY(i == trx_id_pos)) {
				if (fixed_sum) {
					/* Write out the length of any
					preceding non-nullable fields,
					and start a new trx_id column. */
					buf = page_zip_fixed_field_encode(
						buf, fixed_sum << 1 | 1);
					col++;
				}

				trx_id_col = col;
				/* fixed_sum is restarted, not reset:
				the trx_id field begins a new run */
				fixed_sum = field->fixed_len;
			} else {
				/* add to the sum */
				fixed_sum += field->fixed_len;
			}
		} else {
			/* fixed-length nullable field */

			if (fixed_sum) {
				/* write out the length of any
				preceding non-nullable fields */
				buf = page_zip_fixed_field_encode(
					buf, fixed_sum << 1 | 1);
				fixed_sum = 0;
				col++;
			}

			/* low bit 0 marks the nullable encoding */
			buf = page_zip_fixed_field_encode(
				buf, field->fixed_len << 1);
			col++;
		}
	}

	if (fixed_sum) {
		/* Write out the lengths of last fixed-length columns. */
		buf = page_zip_fixed_field_encode(buf, fixed_sum << 1 | 1);
	}

	if (trx_id_pos != ULINT_UNDEFINED) {
		/* Write out the position of the trx_id column */
		i = trx_id_col;
	} else {
		/* Write out the number of nullable fields */
		i = index->n_nullable;
	}

	if (i < 128) {
		*buf++ = i;
	} else {
		/* two-byte encoding, high bit set (cf.
		page_zip_fixed_field_encode()) */
		*buf++ = 0x80 | i >> 8;
		*buf++ = 0xff & i;
	}

	ut_ad((ulint) (buf - buf_start) <= (n + 2) * 2);
	return((ulint) (buf - buf_start));
}

473 474 475 476 477 478 479
/**************************************************************************
Populate the dense page directory from the sparse directory. */
static
void
page_zip_dir_encode(
/*================*/
	const page_t*	page,	/* in: compact page */
480 481 482
	byte*		buf,	/* in: pointer to dense page directory[-1];
				out: dense directory on compressed page */
	const rec_t**	recs)	/* in: pointer to an array of 0, or NULL;
483 484
				out: dense page directory sorted by ascending
				address (and heap_no) */
485
{
486 487 488 489 490 491 492
	const byte*	rec;
	ulint		status;
	ulint		min_mark;
	ulint		heap_no;
	ulint		i;
	ulint		n_heap;
	ulint		offs;
493 494 495

	min_mark = 0;

496 497 498
	if (page_is_leaf(page)) {
		status = REC_STATUS_ORDINARY;
	} else {
499
		status = REC_STATUS_NODE_PTR;
500
		if (UNIV_UNLIKELY
501
		    (mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL)) {
502 503 504 505
			min_mark = REC_INFO_MIN_REC_FLAG;
		}
	}

506
	n_heap = page_dir_get_n_heap(page);
507 508 509 510

	/* Traverse the list of stored records in the collation order,
	starting from the first user record. */

511
	rec = page + PAGE_NEW_INFIMUM, TRUE;
512 513 514 515 516 517 518 519 520

	i = 0;

	for (;;) {
		ulint	info_bits;
		offs = rec_get_next_offs(rec, TRUE);
		if (UNIV_UNLIKELY(offs == PAGE_NEW_SUPREMUM)) {
			break;
		}
521
		rec = page + offs;
522
		heap_no = rec_get_heap_no_new(rec);
523
		ut_a(heap_no >= PAGE_HEAP_NO_USER_LOW);
524
		ut_a(heap_no < n_heap);
525 526 527 528 529
		ut_a(offs < UNIV_PAGE_SIZE - PAGE_DIR);
		ut_a(offs >= PAGE_ZIP_START);
#if PAGE_ZIP_DIR_SLOT_MASK & UNIV_PAGE_SIZE
# error "PAGE_ZIP_DIR_SLOT_MASK & UNIV_PAGE_SIZE"
#endif
530 531 532 533 534 535 536 537 538 539 540 541 542 543
		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec))) {
			offs |= PAGE_ZIP_DIR_SLOT_OWNED;
		}

		info_bits = rec_get_info_bits(rec, TRUE);
		if (UNIV_UNLIKELY(info_bits & REC_INFO_DELETED_FLAG)) {
			info_bits &= ~REC_INFO_DELETED_FLAG;
			offs |= PAGE_ZIP_DIR_SLOT_DEL;
		}
		ut_a(info_bits == min_mark);
		/* Only the smallest user record can have
		REC_INFO_MIN_REC_FLAG set. */
		min_mark = 0;

544
		mach_write_to_2(buf - PAGE_ZIP_DIR_SLOT_SIZE * ++i, offs);
545

546 547
		if (UNIV_LIKELY_NULL(recs)) {
			/* Ensure that each heap_no occurs at most once. */
548
			ut_a(!recs[heap_no - PAGE_HEAP_NO_USER_LOW]);
549
			/* exclude infimum and supremum */
550
			recs[heap_no - PAGE_HEAP_NO_USER_LOW] = rec;
551
		}
552 553 554 555

		ut_a(rec_get_status(rec) == status);
	}

556
	offs = page_header_get_field(page, PAGE_FREE);
557 558 559 560

	/* Traverse the free list (of deleted records). */
	while (offs) {
		ut_ad(!(offs & ~PAGE_ZIP_DIR_SLOT_MASK));
561
		rec = page + offs;
562 563

		heap_no = rec_get_heap_no_new(rec);
564
		ut_a(heap_no >= PAGE_HEAP_NO_USER_LOW);
565 566 567 568 569
		ut_a(heap_no < n_heap);

		ut_a(!rec[-REC_N_NEW_EXTRA_BYTES]); /* info_bits and n_owned */
		ut_a(rec_get_status(rec) == status);

570
		mach_write_to_2(buf - PAGE_ZIP_DIR_SLOT_SIZE * ++i, offs);
571

572 573
		if (UNIV_LIKELY_NULL(recs)) {
			/* Ensure that each heap_no occurs at most once. */
574
			ut_a(!recs[heap_no - PAGE_HEAP_NO_USER_LOW]);
575
			/* exclude infimum and supremum */
576
			recs[heap_no - PAGE_HEAP_NO_USER_LOW] = rec;
577
		}
578 579 580 581 582

		offs = rec_get_next_offs(rec, TRUE);
	}

	/* Ensure that each heap no occurs at least once. */
583
	ut_a(i + PAGE_HEAP_NO_USER_LOW == n_heap);
584 585
}

586 587 588 589
/**************************************************************************
Allocate memory for zlib. */
static
void*
590 591
page_zip_malloc(
/*============*/
592
	void*	opaque,
593 594 595
	uInt	items,
	uInt	size)
{
596
	return(mem_heap_alloc(opaque, items * size));
597 598 599 600 601 602 603 604
}

/**************************************************************************
Deallocate memory for zlib.  Deliberately a no-op: all allocations come
from a mem_heap_t (see page_zip_malloc()), which the owner of the heap
frees in bulk. */
static
void
page_zip_free(
/*==========*/
	void*	opaque __attribute__((unused)),	/* in: memory heap */
	void*	address __attribute__((unused)))/* in: object to free */
{
}

610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625
/**************************************************************************
Configure the zlib allocator to use the given memory heap.  All zlib
allocations for this stream will then come from heap; page_zip_free()
is a no-op, so memory is only reclaimed when the heap is freed. */

void
page_zip_set_alloc(
/*===============*/
	void*		stream,		/* in/out: zlib stream (z_stream*) */
	mem_heap_t*	heap)		/* in: memory heap to use */
{
	z_stream*	strm = stream;

	strm->zalloc = page_zip_malloc;
	strm->zfree = page_zip_free;
	strm->opaque = heap;
}

626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
/* Set this variable in a debugger to enable
excessive logging in page_zip_compress(). */
ibool	page_zip_compress_dbg;

/**************************************************************************
Wrapper for deflate().  Log the operation if page_zip_compress_dbg is set.
BUG FIX: the return type was ibool (unsigned in InnoDB); returning
deflate()'s int status through it would mangle negative zlib error codes
(Z_BUF_ERROR etc.).  Callers assign the result to an int and compare it
against Z_OK, so return int directly. */
static
int
page_zip_compress_deflate(
/*======================*/
				/* out: deflate() status: Z_OK,
				Z_STREAM_END, or a zlib error code */
	z_streamp	strm,	/* in/out: compressed stream for deflate() */
	int		flush)	/* in: deflate() flushing method */
{
	int	status;
	if (UNIV_UNLIKELY(page_zip_compress_dbg)) {
		ut_print_buf(stderr, strm->next_in, strm->avail_in);
	}
	status = deflate(strm, flush);
	if (UNIV_UNLIKELY(page_zip_compress_dbg)) {
		fprintf(stderr, " -> %d\n", status);
	}
	return(status);
}

/* Redefine deflate(). */
# undef deflate
# define deflate page_zip_compress_deflate
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674
/**************************************************************************
Compress the records of a node pointer page.  The node pointer of each
record is excluded from the compressed stream and stored uncompressed in
the trailer (indexed by heap_no), so it can be updated without
recompressing the page. */
static
int
page_zip_compress_node_ptrs(
/*========================*/
					/* out: Z_OK, or a zlib error code */
	z_stream*	c_stream,	/* in/out: compressed page stream */
	const rec_t**	recs,		/* in: dense page directory
					sorted by address */
	ulint		n_dense,	/* in: size of recs[] */
	dict_index_t*	index,		/* in: the index of the page */
	byte*		storage,	/* in: end of dense page directory */
	mem_heap_t*	heap)		/* in: temporary memory heap */
{
	int	err	= Z_OK;
	ulint*	offsets = NULL;

	do {
		const rec_t*	rec = *recs++;

		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		/* Only leaf nodes may contain externally stored columns. */
		ut_ad(!rec_offs_any_extern(offsets));

		UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
		UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
				   rec_offs_extra_size(offsets));

		/* Compress the extra bytes. */
		c_stream->avail_in = rec - REC_N_NEW_EXTRA_BYTES
			- c_stream->next_in;

		if (c_stream->avail_in) {
			err = deflate(c_stream, Z_NO_FLUSH);
			if (UNIV_UNLIKELY(err != Z_OK)) {
				break;
			}
		}
		ut_ad(!c_stream->avail_in);

		/* Compress the data bytes, except node_ptr. */
		c_stream->next_in = (byte*) rec;
		c_stream->avail_in = rec_offs_data_size(offsets)
			- REC_NODE_PTR_SIZE;
		ut_ad(c_stream->avail_in);

		err = deflate(c_stream, Z_NO_FLUSH);
		if (UNIV_UNLIKELY(err != Z_OK)) {
			break;
		}

		ut_ad(!c_stream->avail_in);

		/* Copy the node pointer, uncompressed, to the trailer;
		slot 0 of the trailer belongs to heap_no 2
		(PAGE_HEAP_NO_USER_LOW), hence heap_no - 1 here counts
		backwards from storage. */
		memcpy(storage - REC_NODE_PTR_SIZE
		       * (rec_get_heap_no_new(rec) - 1),
		       c_stream->next_in, REC_NODE_PTR_SIZE);
		c_stream->next_in += REC_NODE_PTR_SIZE;
	} while (--n_dense);

	return(err);
}

/**************************************************************************
Compress the records of a leaf node of a secondary index.  Secondary
index records carry no trx_id/roll_ptr and no BLOB pointers, so only the
REC_N_NEW_EXTRA_BYTES of each record are skipped. */
static
int
page_zip_compress_sec(
/*==================*/
					/* out: Z_OK, or a zlib error code */
	z_stream*	c_stream,	/* in/out: compressed page stream */
	const rec_t**	recs,		/* in: dense page directory
					sorted by address */
	ulint		n_dense)	/* in: size of recs[] */
{
	int	err = Z_OK;

	ut_ad(n_dense > 0);

	while (n_dense--) {
		const rec_t*	rec = *recs++;

		/* Compress everything preceding this record's extra
		bytes; the extra bytes themselves are omitted. */
		c_stream->avail_in = rec - REC_N_NEW_EXTRA_BYTES
			- c_stream->next_in;

		if (UNIV_LIKELY(c_stream->avail_in)) {
			UNIV_MEM_ASSERT_RW(c_stream->next_in,
					   c_stream->avail_in);
			err = deflate(c_stream, Z_NO_FLUSH);
			if (UNIV_UNLIKELY(err != Z_OK)) {
				break;
			}
		}

		ut_ad(!c_stream->avail_in);
		ut_ad(c_stream->next_in == rec - REC_N_NEW_EXTRA_BYTES);

		/* Skip the REC_N_NEW_EXTRA_BYTES. */
		c_stream->next_in = (byte*) rec;
	}

	return(err);
}

763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785
/**************************************************************************
Compress a record of a leaf node of a clustered index that contains
externally stored columns.  trx_id/roll_ptr and the
BTR_EXTERN_FIELD_REF of each externally stored column are copied to
uncompressed storage areas instead of the compressed stream. */
static
int
page_zip_compress_clust_ext(
/*========================*/
					/* out: Z_OK, or a zlib error code */
	z_stream*	c_stream,	/* in/out: compressed page stream */
	const rec_t*	rec,		/* in: record */
	const ulint*	offsets,	/* in: rec_get_offsets(rec) */
	ulint		trx_id_col,	/* in: position of of DB_TRX_ID */
	byte*		deleted,	/* in: dense directory entry pointing
					to the head of the free list */
	byte*		storage,	/* in: end of dense page directory */
	byte**		externs,	/* in/out: pointer to the next
					available BLOB pointer */
	ulint*		n_blobs)	/* in/out: number of
					externally stored columns */
{
	int	err;
	ulint	i;

	UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
	UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
			   rec_offs_extra_size(offsets));

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		ulint		len;
		const byte*	src;

		if (UNIV_UNLIKELY(i == trx_id_col)) {
			ut_ad(!rec_offs_nth_extern(offsets, i));
			/* Store trx_id and roll_ptr
			in uncompressed form. */
			src = rec_get_nth_field(rec, offsets, i, &len);
			ut_ad(src + DATA_TRX_ID_LEN
			      == rec_get_nth_field(rec, offsets,
						   i + 1, &len));
			ut_ad(len == DATA_ROLL_PTR_LEN);

			/* Compress any preceding bytes. */
			c_stream->avail_in
				= src - c_stream->next_in;

			if (c_stream->avail_in) {
				err = deflate(c_stream, Z_NO_FLUSH);
				if (UNIV_UNLIKELY(err != Z_OK)) {

					return(err);
				}
			}

			ut_ad(!c_stream->avail_in);
			ut_ad(c_stream->next_in == src);

			/* Copy trx_id and roll_ptr to the trailer,
			indexed by heap_no. */
			memcpy(storage
			       - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
			       * (rec_get_heap_no_new(rec) - 1),
			       c_stream->next_in,
			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

			c_stream->next_in
				+= DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;

			/* Skip also roll_ptr */
			i++;
		} else if (rec_offs_nth_extern(offsets, i)) {
			/* The BLOB pointer occupies the last
			BTR_EXTERN_FIELD_REF_SIZE bytes of the field. */
			src = rec_get_nth_field(rec, offsets, i, &len);
			ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
			src += len - BTR_EXTERN_FIELD_REF_SIZE;

			c_stream->avail_in = src
				- c_stream->next_in;
			if (UNIV_LIKELY(c_stream->avail_in)) {
				err = deflate(c_stream, Z_NO_FLUSH);
				if (UNIV_UNLIKELY(err != Z_OK)) {

					return(err);
				}
			}

			ut_ad(!c_stream->avail_in);
			ut_ad(c_stream->next_in == src);

			/* Reserve space for the data at
			the end of the space reserved for
			the compressed data and the page
			modification log. */

			if (UNIV_UNLIKELY
			    (c_stream->avail_out
			     <= BTR_EXTERN_FIELD_REF_SIZE)) {
				/* out of space */
				return(Z_BUF_ERROR);
			}

			ut_ad(*externs == c_stream->next_out
			      + c_stream->avail_out
			      + 1/* end of modif. log */);

			c_stream->next_in
				+= BTR_EXTERN_FIELD_REF_SIZE;

			/* Skip deleted records: their BLOB pointers
			are not counted or copied. */
			if (UNIV_LIKELY_NULL
			    (page_zip_dir_find_low(
				    storage, deleted,
				    page_offset(rec)))) {
				continue;
			}

			(*n_blobs)++;
			c_stream->avail_out
				-= BTR_EXTERN_FIELD_REF_SIZE;
			*externs -= BTR_EXTERN_FIELD_REF_SIZE;

			/* Copy the BLOB pointer */
			memcpy(*externs, c_stream->next_in
			       - BTR_EXTERN_FIELD_REF_SIZE,
			       BTR_EXTERN_FIELD_REF_SIZE);
		}
	}

	return(Z_OK);
}

890 891 892 893 894 895 896 897 898 899 900 901
/**************************************************************************
Compress the records of a leaf node of a clustered index.  The
trx_id/roll_ptr of each record (and any BLOB pointers) are stored
uncompressed in the page trailer; everything else goes through deflate().
CONSISTENCY FIX: one error path used return(err) while all others used
goto func_exit; now every error path exits through func_exit (behavior
is identical, as func_exit only returns err). */
static
int
page_zip_compress_clust(
/*====================*/
					/* out: Z_OK, or a zlib error code */
	z_stream*	c_stream,	/* in/out: compressed page stream */
	const rec_t**	recs,		/* in: dense page directory
					sorted by address */
	ulint		n_dense,	/* in: size of recs[] */
	dict_index_t*	index,		/* in: the index of the page */
	ulint*		n_blobs,	/* in: 0; out: number of
					externally stored columns */
	ulint		trx_id_col,	/* index of the trx_id column */
	byte*		deleted,	/* in: dense directory entry pointing
					to the head of the free list */
	byte*		storage,	/* in: end of dense page directory */
	mem_heap_t*	heap)		/* in: temporary memory heap */
{
	int	err		= Z_OK;
	ulint*	offsets		= NULL;
	/* BTR_EXTERN_FIELD_REF storage grows downwards from the
	trx_id/roll_ptr storage area. */
	byte*	externs		= storage - n_dense
		* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

	ut_ad(*n_blobs == 0);

	do {
		const rec_t*	rec = *recs++;

		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		ut_ad(rec_offs_n_fields(offsets)
		      == dict_index_get_n_fields(index));
		UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
		UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
				   rec_offs_extra_size(offsets));

		/* Compress the extra bytes. */
		c_stream->avail_in = rec - REC_N_NEW_EXTRA_BYTES
			- c_stream->next_in;

		if (c_stream->avail_in) {
			err = deflate(c_stream, Z_NO_FLUSH);
			if (UNIV_UNLIKELY(err != Z_OK)) {

				goto func_exit;
			}
		}
		ut_ad(!c_stream->avail_in);
		ut_ad(c_stream->next_in == rec - REC_N_NEW_EXTRA_BYTES);

		/* Compress the data bytes. */

		c_stream->next_in = (byte*) rec;

		/* Check if there are any externally stored columns.
		For each externally stored column, store the
		BTR_EXTERN_FIELD_REF separately. */
		if (UNIV_UNLIKELY(rec_offs_any_extern(offsets))) {
			ut_ad(dict_index_is_clust(index));

			err = page_zip_compress_clust_ext(
				c_stream, rec, offsets, trx_id_col,
				deleted, storage, &externs, n_blobs);

			if (UNIV_UNLIKELY(err != Z_OK)) {

				goto func_exit;
			}
		} else {
			ulint		len;
			const byte*	src;

			/* Store trx_id and roll_ptr in uncompressed form. */
			src = rec_get_nth_field(rec, offsets,
						trx_id_col, &len);
			ut_ad(src + DATA_TRX_ID_LEN
			      == rec_get_nth_field(rec, offsets,
						   trx_id_col + 1, &len));
			ut_ad(len == DATA_ROLL_PTR_LEN);
			UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
			UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
					   rec_offs_extra_size(offsets));

			/* Compress any preceding bytes. */
			c_stream->avail_in = src - c_stream->next_in;

			if (c_stream->avail_in) {
				err = deflate(c_stream, Z_NO_FLUSH);
				if (UNIV_UNLIKELY(err != Z_OK)) {
					/* was: return(err); use the
					common exit path like the other
					error branches */
					goto func_exit;
				}
			}

			ut_ad(!c_stream->avail_in);
			ut_ad(c_stream->next_in == src);

			/* Copy trx_id and roll_ptr to the trailer,
			indexed by heap_no. */
			memcpy(storage
			       - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
			       * (rec_get_heap_no_new(rec) - 1),
			       c_stream->next_in,
			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

			c_stream->next_in
				+= DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;

			/* Skip also roll_ptr */
			ut_ad(trx_id_col + 1 < rec_offs_n_fields(offsets));
		}

		/* Compress the last bytes of the record. */
		c_stream->avail_in = rec + rec_offs_data_size(offsets)
			- c_stream->next_in;

		if (c_stream->avail_in) {
			err = deflate(c_stream, Z_NO_FLUSH);
			if (UNIV_UNLIKELY(err != Z_OK)) {

				goto func_exit;
			}
		}
		ut_ad(!c_stream->avail_in);
	} while (--n_dense);

func_exit:
	return(err);
}

marko's avatar
marko committed
1021 1022 1023 1024 1025 1026 1027 1028
/**************************************************************************
Compress a page. */

ibool
page_zip_compress(
/*==============*/
				/* out: TRUE on success, FALSE on failure;
				page_zip will be left intact on failure. */
1029
	page_zip_des_t*	page_zip,/* in: size; out: data, n_blobs,
1030
				m_start, m_end, m_nonempty */
1031
	const page_t*	page,	/* in: uncompressed page */
1032 1033
	dict_index_t*	index,	/* in: index of the B-tree node */
	mtr_t*		mtr)	/* in: mini-transaction, or NULL */
marko's avatar
marko committed
1034 1035 1036
{
	z_stream	c_stream;
	int		err;
1037 1038 1039
	ulint		n_fields;/* number of index fields needed */
	byte*		fields;	/* index field information */
	byte*		buf;	/* compressed payload of the page */
1040
	byte*		buf_end;/* end of buf */
1041
	ulint		n_dense;
1042
	ulint		slot_size;/* amount of uncompressed bytes per record */
1043
	const rec_t**	recs;	/* dense page directory, sorted by address */
1044
	mem_heap_t*	heap;
1045 1046 1047 1048
	ulint		trx_id_col;
	ulint*		offsets	= NULL;
	ulint		n_blobs	= 0;
	byte*		storage;/* storage of uncompressed columns */
1049

1050 1051
	ut_a(page_is_comp(page));
	ut_a(fil_page_get_type(page) == FIL_PAGE_INDEX);
1052
	ut_ad(page_simple_validate_new((page_t*) page));
1053
	ut_ad(page_zip_simple_validate(page_zip));
1054

1055 1056
	UNIV_MEM_ASSERT_RW(page, UNIV_PAGE_SIZE);

1057 1058 1059 1060 1061 1062
	/* Check the data that will be omitted. */
	ut_a(!memcmp(page + (PAGE_NEW_INFIMUM - REC_N_NEW_EXTRA_BYTES),
		     infimum_extra, sizeof infimum_extra));
	ut_a(!memcmp(page + PAGE_NEW_INFIMUM,
		     infimum_data, sizeof infimum_data));
	ut_a(page[PAGE_NEW_SUPREMUM - REC_N_NEW_EXTRA_BYTES]
1063 1064
	     /* info_bits == 0, n_owned <= max */
	     <= PAGE_DIR_SLOT_MAX_N_OWNED);
1065 1066
	ut_a(!memcmp(page + (PAGE_NEW_SUPREMUM - REC_N_NEW_EXTRA_BYTES + 1),
		     supremum_extra_data, sizeof supremum_extra_data));
1067

1068 1069
	if (UNIV_UNLIKELY(!page_get_n_recs(page))) {
		ut_a(rec_get_next_offs(page + PAGE_NEW_INFIMUM, TRUE)
1070
		     == PAGE_NEW_SUPREMUM);
1071
	}
1072

1073 1074 1075 1076 1077 1078
	if (page_is_leaf(page)) {
		n_fields = dict_index_get_n_fields(index);
	} else {
		n_fields = dict_index_get_n_unique_in_tree(index);
	}

1079
	/* The dense directory excludes the infimum and supremum records. */
1080
	n_dense = page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW;
1081 1082 1083 1084 1085 1086 1087 1088
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
	if (UNIV_UNLIKELY(page_zip_compress_dbg)) {
		fprintf(stderr, "compress %p %p %lu %lu %lu\n",
			(void*) page_zip, (void*) page,
			page_is_leaf(page),
			n_fields, n_dense);
	}
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
1089 1090
	page_zip_compress_count[page_zip->ssize]++;

1091
	if (UNIV_UNLIKELY(n_dense * PAGE_ZIP_DIR_SLOT_SIZE
1092
			  >= page_zip_get_size(page_zip))) {
1093 1094
		return(FALSE);
	}
1095

1096
	heap = mem_heap_create(page_zip_get_size(page_zip)
1097 1098
			       + n_fields * (2 + sizeof *offsets)
			       + n_dense * ((sizeof *recs)
1099 1100 1101
					    - PAGE_ZIP_DIR_SLOT_SIZE)
			       + UNIV_PAGE_SIZE * 4
			       + (512 << MAX_MEM_LEVEL));
1102

1103
	recs = mem_heap_zalloc(heap, n_dense * sizeof *recs);
1104

1105 1106
	fields = mem_heap_alloc(heap, (n_fields + 1) * 2);

1107 1108
	buf = mem_heap_alloc(heap, page_zip_get_size(page_zip) - PAGE_DATA);
	buf_end = buf + page_zip_get_size(page_zip) - PAGE_DATA;
1109

marko's avatar
marko committed
1110
	/* Compress the data payload. */
1111
	page_zip_set_alloc(&c_stream, heap);
marko's avatar
marko committed
1112

1113 1114 1115
	err = deflateInit2(&c_stream, Z_DEFAULT_COMPRESSION,
			   Z_DEFLATED, UNIV_PAGE_SIZE_SHIFT,
			   MAX_MEM_LEVEL, Z_DEFAULT_STRATEGY);
marko's avatar
marko committed
1116 1117 1118
	ut_a(err == Z_OK);

	c_stream.next_out = buf;
1119
	/* Subtract the space reserved for uncompressed data. */
1120 1121
	/* Page header and the end marker of the modification log */
	c_stream.avail_out = buf_end - buf - 1;
1122 1123
	/* Dense page directory and uncompressed columns, if any */
	if (page_is_leaf(page)) {
1124
		if (dict_index_is_clust(index)) {
1125 1126
			trx_id_col = dict_index_get_sys_col_pos(
				index, DATA_TRX_ID);
1127 1128 1129
			ut_ad(trx_id_col > 0);
			ut_ad(trx_id_col != ULINT_UNDEFINED);

1130
			slot_size = PAGE_ZIP_DIR_SLOT_SIZE
1131
				+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
1132
		} else {
1133 1134
			/* Signal the absence of trx_id
			in page_zip_fields_encode() */
1135 1136
			ut_ad(dict_index_get_sys_col_pos(index, DATA_TRX_ID)
			      == ULINT_UNDEFINED);
1137
			trx_id_col = 0;
1138
			slot_size = PAGE_ZIP_DIR_SLOT_SIZE;
1139
		}
1140
	} else {
1141
		slot_size = PAGE_ZIP_DIR_SLOT_SIZE + REC_NODE_PTR_SIZE;
1142
		trx_id_col = ULINT_UNDEFINED;
1143 1144
	}

1145 1146
	if (UNIV_UNLIKELY(c_stream.avail_out <= n_dense * slot_size
			  + 6/* sizeof(zlib header and footer) */)) {
1147 1148 1149 1150
		goto zlib_error;
	}

	c_stream.avail_out -= n_dense * slot_size;
1151 1152
	c_stream.avail_in = page_zip_fields_encode(n_fields, index,
						   trx_id_col, fields);
1153
	c_stream.next_in = fields;
1154
	if (UNIV_LIKELY(!trx_id_col)) {
1155 1156
		trx_id_col = ULINT_UNDEFINED;
	}
1157

1158
	UNIV_MEM_ASSERT_RW(c_stream.next_in, c_stream.avail_in);
1159 1160 1161 1162 1163
	err = deflate(&c_stream, Z_FULL_FLUSH);
	if (err != Z_OK) {
		goto zlib_error;
	}

1164 1165
	ut_ad(!c_stream.avail_in);

1166
	page_zip_dir_encode(page, buf_end, recs);
1167 1168 1169

	c_stream.next_in = (byte*) page + PAGE_ZIP_START;

1170
	storage = buf_end - n_dense * PAGE_ZIP_DIR_SLOT_SIZE;
1171

1172
	/* Compress the records in heap_no order. */
1173
	if (UNIV_UNLIKELY(!n_dense)) {
1174 1175
	} else if (!page_is_leaf(page)) {
		/* This is a node pointer page. */
1176 1177 1178 1179 1180
		err = page_zip_compress_node_ptrs(&c_stream, recs, n_dense,
						  index, storage, heap);
		if (UNIV_UNLIKELY(err != Z_OK)) {
			goto zlib_error;
		}
1181
	} else if (UNIV_LIKELY(trx_id_col == ULINT_UNDEFINED)) {
1182 1183 1184 1185 1186
		/* This is a leaf page in a secondary index. */
		err = page_zip_compress_sec(&c_stream, recs, n_dense);
		if (UNIV_UNLIKELY(err != Z_OK)) {
			goto zlib_error;
		}
1187 1188
	} else {
		/* This is a leaf page in a clustered index. */
1189 1190
		err = page_zip_compress_clust(&c_stream, recs, n_dense,
					      index, &n_blobs, trx_id_col,
1191
					      buf_end - PAGE_ZIP_DIR_SLOT_SIZE
1192
					      * page_get_n_recs(page),
1193 1194 1195 1196
					      storage, heap);
		if (UNIV_UNLIKELY(err != Z_OK)) {
			goto zlib_error;
		}
1197 1198
	}

1199 1200
	/* Finish the compression. */
	ut_ad(!c_stream.avail_in);
1201
	/* Compress any trailing garbage, in case the last record was
1202 1203
	allocated from an originally longer space on the free list,
	or the data of the last record from page_zip_compress_sec(). */
1204
	c_stream.avail_in
1205
		= page_header_get_field(page, PAGE_HEAP_TOP)
1206
		- (c_stream.next_in - page);
1207
	ut_a(c_stream.avail_in <= UNIV_PAGE_SIZE - PAGE_ZIP_START - PAGE_DIR);
marko's avatar
marko committed
1208

1209
	UNIV_MEM_ASSERT_RW(c_stream.next_in, c_stream.avail_in);
marko's avatar
marko committed
1210
	err = deflate(&c_stream, Z_FINISH);
1211

1212
	if (UNIV_UNLIKELY(err != Z_STREAM_END)) {
1213
zlib_error:
marko's avatar
marko committed
1214
		deflateEnd(&c_stream);
1215
		mem_heap_free(heap);
marko's avatar
marko committed
1216 1217 1218 1219 1220 1221
		return(FALSE);
	}

	err = deflateEnd(&c_stream);
	ut_a(err == Z_OK);

1222
	ut_ad(buf + c_stream.total_out == c_stream.next_out);
1223 1224
	ut_ad((ulint) (storage - c_stream.next_out) >= c_stream.avail_out);

1225 1226 1227 1228
	/* Valgrind believes that zlib does not initialize some bits
	in the last 7 or 8 bytes of the stream.  Make Valgrind happy. */
	UNIV_MEM_VALID(buf, c_stream.total_out);

1229 1230 1231 1232
	/* Zero out the area reserved for the modification log.
	Space for the end marker of the modification log is not
	included in avail_out. */
	memset(c_stream.next_out, 0, c_stream.avail_out + 1/* end marker */);
1233

1234 1235 1236 1237 1238
#ifdef UNIV_DEBUG
	page_zip->m_start =
#endif /* UNIV_DEBUG */
		page_zip->m_end = PAGE_DATA + c_stream.total_out;
	page_zip->m_nonempty = FALSE;
1239
	page_zip->n_blobs = n_blobs;
1240 1241 1242
	/* Copy those header fields that will not be written
	in buf_flush_init_for_writing() */
	memcpy(page_zip->data + FIL_PAGE_PREV, page + FIL_PAGE_PREV,
1243
	       FIL_PAGE_LSN - FIL_PAGE_PREV);
1244 1245
	memcpy(page_zip->data + FIL_PAGE_TYPE, page + FIL_PAGE_TYPE, 2);
	memcpy(page_zip->data + FIL_PAGE_DATA, page + FIL_PAGE_DATA,
1246
	       PAGE_DATA - FIL_PAGE_DATA);
1247
	/* Copy the rest of the compressed page */
1248 1249
	memcpy(page_zip->data + PAGE_DATA, buf,
	       page_zip_get_size(page_zip) - PAGE_DATA);
1250
	mem_heap_free(heap);
1251
#ifdef UNIV_ZIP_DEBUG
1252
	ut_a(page_zip_validate(page_zip, page));
1253
#endif /* UNIV_ZIP_DEBUG */
marko's avatar
marko committed
1254

1255 1256 1257 1258
	if (mtr) {
		page_zip_compress_write_log(page_zip, page, index, mtr);
	}

1259 1260
	page_zip_compress_ok[page_zip->ssize]++;

1261
	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
1262

1263
	return(TRUE);
marko's avatar
marko committed
1264 1265
}

1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286
/**************************************************************************
Compare two page directory entries by address.  On a compressed page the
records in the dense directory are ordered by their address within the
page, which corresponds to ascending heap_no. */
UNIV_INLINE
ibool
page_zip_dir_cmp(
/*=============*/
				/* out: positive if rec1 > rec2 */
	const rec_t*	rec1,	/* in: rec1 */
	const rec_t*	rec2)	/* in: rec2 */
{
	return((ibool) (rec2 < rec1));
}

/**************************************************************************
Sort the dense page directory by address (and thus by heap_no).
The actual merge sort implementation is generated by the
UT_SORT_FUNCTION_BODY() macro, which recursively invokes this
function by name; aux_arr provides the scratch space it needs. */
static
void
page_zip_dir_sort(
/*==============*/
	rec_t**	arr,	/* in/out: dense page directory */
	rec_t**	aux_arr,/* in/out: work area */
	ulint	low,	/* in: lower bound of the sorting area, inclusive */
	ulint	high)	/* in: upper bound of the sorting area, exclusive */
{
	UT_SORT_FUNCTION_BODY(page_zip_dir_sort, arr, aux_arr, low, high,
			      page_zip_dir_cmp);
}

/**************************************************************************
Deallocate the index information initialized by page_zip_fields_decode().
Safe to call with index == NULL (no-op in that case). */
static
void
page_zip_fields_free(
/*=================*/
	dict_index_t*	index)	/* in: dummy index to be freed */
{
	dict_table_t*	table;

	if (!index) {

		return;
	}

	/* Fetch the table pointer before releasing the index heap;
	the index object itself may live in index->heap. */
	table = index->table;
	mem_heap_free(index->heap);
	mutex_free(&(table->autoinc_mutex));
	mem_heap_free(table->heap);
}

/**************************************************************************
Read the index information for the compressed page and build a dummy
index object describing it.  The encoding is the one produced by
page_zip_fields_encode(): one or two bytes per field (high bit of the
first byte selects the two-byte form), followed by one trailing value
that holds either the trx_id position (leaf pages of a clustered index)
or the number of nullable fields (other pages). */
static
dict_index_t*
page_zip_fields_decode(
/*===================*/
				/* out,own: dummy index describing the page,
				or NULL on error; free the result with
				page_zip_fields_free() */
	const byte*	buf,	/* in: index information */
	const byte*	end,	/* in: end of buf */
	ulint*		trx_id_col)/* in: NULL for non-leaf pages;
				for leaf pages, pointer to where to store
				the position of the trx_id column */
{
	const byte*	b;
	ulint		n;
	ulint		i;
	ulint		val;
	dict_table_t*	table;
	dict_index_t*	index;

	/* Determine the number of encoded values by scanning the buffer. */
	for (b = buf, n = 0; b < end; n++) {
		if (*b++ & 0x80) {
			b++; /* skip the second byte */
		}
	}

	n--; /* the last value is n_nullable or trx_id, not a field */

	/* Reject a corrupt encoding: too many fields, or a two-byte
	value that ran past the end of the buffer. */
	if (UNIV_UNLIKELY(n > REC_MAX_N_FIELDS)
	    || UNIV_UNLIKELY(b > end)) {

		return(NULL);
	}

	table = dict_mem_table_create("ZIP_DUMMY", DICT_HDR_SPACE, n,
				      DICT_TF_COMPACT);
	index = dict_mem_index_create("ZIP_DUMMY", "ZIP_DUMMY",
				      DICT_HDR_SPACE, 0, n);
	index->table = table;
	index->n_uniq = n;
	/* avoid ut_ad(index->cached) in dict_index_get_n_unique_in_tree */
	index->cached = TRUE;

	/* Initialize the fields.  The low bit of each value is the
	NOT NULL flag; the remaining bits encode the length class. */
	for (b = buf, i = 0; i < n; i++) {
		ulint	mtype;
		ulint	len;

		val = *b++;

		if (UNIV_UNLIKELY(val & 0x80)) {
			/* fixed length > 62 bytes */
			val = (val & 0x7f) << 8 | *b++;
			len = val >> 1;
			mtype = DATA_FIXBINARY;
		} else if (UNIV_UNLIKELY(val >= 126)) {
			/* variable length with max > 255 bytes */
			len = 0x7fff;
			mtype = DATA_BINARY;
		} else if (val <= 1) {
			/* variable length with max <= 255 bytes */
			len = 0;
			mtype = DATA_BINARY;
		} else {
			/* fixed length < 62 bytes */
			len = val >> 1;
			mtype = DATA_FIXBINARY;
		}

		dict_mem_table_add_col(table, NULL, NULL, mtype,
				       val & 1 ? DATA_NOT_NULL : 0, len);
		dict_index_add_col(index, table,
				   dict_table_get_nth_col(table, i), 0);
	}

	/* Read the trailing value (one or two bytes). */
	val = *b++;
	if (UNIV_UNLIKELY(val & 0x80)) {
		val = (val & 0x7f) << 8 | *b++;
	}

	/* Decode the position of the trx_id column. */
	if (trx_id_col) {
		if (!val) {
			/* 0 signals a leaf page of a secondary index:
			no trx_id column is stored. */
			val = ULINT_UNDEFINED;
		} else if (UNIV_UNLIKELY(val >= n)) {
			/* Corrupt: trx_id position out of range. */
			page_zip_fields_free(index);
			index = NULL;
		} else {
			index->type = DICT_CLUSTERED;
		}

		*trx_id_col = val;
	} else {
		/* Decode the number of nullable fields. */
		if (UNIV_UNLIKELY(index->n_nullable > val)) {
			page_zip_fields_free(index);
			index = NULL;
		} else {
			index->n_nullable = val;
		}
	}

	ut_ad(b == end);

	return(index);
}

/**************************************************************************
Populate the sparse page directory from the dense directory. */
static
ibool
page_zip_dir_decode(
/*================*/
					/* out: TRUE on success,
					FALSE on failure */
	const page_zip_des_t*	page_zip,/* in: dense page directory on
					compressed page */
	page_t*			page,	/* in: compact page with valid header;
					out: trailer and sparse page directory
					filled in */
	rec_t**			recs,	/* out: dense page directory sorted by
					ascending address (and heap_no) */
	rec_t**			recs_aux,/* in/out: scratch area */
1435
	ulint			n_dense)/* in: number of user records, and
1436 1437 1438 1439 1440 1441 1442 1443
					size of recs[] and recs_aux[] */
{
	ulint	i;
	ulint	n_recs;
	byte*	slot;

	n_recs = page_get_n_recs(page);

1444 1445 1446 1447
	if (UNIV_UNLIKELY(n_recs > n_dense)) {
		return(FALSE);
	}

1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470
	/* Traverse the list of stored records in the sorting order,
	starting from the first user record. */

	slot = page + (UNIV_PAGE_SIZE - PAGE_DIR - PAGE_DIR_SLOT_SIZE);
	UNIV_PREFETCH_RW(slot);

	/* Zero out the page trailer. */
	memset(slot + PAGE_DIR_SLOT_SIZE, 0, PAGE_DIR);

	mach_write_to_2(slot, PAGE_NEW_INFIMUM);
	slot -= PAGE_DIR_SLOT_SIZE;
	UNIV_PREFETCH_RW(slot);

	/* Initialize the sparse directory and copy the dense directory. */
	for (i = 0; i < n_recs; i++) {
		ulint	offs = page_zip_dir_get(page_zip, i);

		if (offs & PAGE_ZIP_DIR_SLOT_OWNED) {
			mach_write_to_2(slot, offs & PAGE_ZIP_DIR_SLOT_MASK);
			slot -= PAGE_DIR_SLOT_SIZE;
			UNIV_PREFETCH_RW(slot);
		}

1471
		if (UNIV_UNLIKELY((offs & PAGE_ZIP_DIR_SLOT_MASK)
1472
				  < PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES)) {
1473 1474
			return(FALSE);
		}
1475

1476 1477 1478 1479
		recs[i] = page + (offs & PAGE_ZIP_DIR_SLOT_MASK);
	}

	mach_write_to_2(slot, PAGE_NEW_SUPREMUM);
1480 1481 1482
	if (UNIV_UNLIKELY
	    (slot != page_dir_get_nth_slot(page,
					   page_dir_get_n_slots(page) - 1))) {
1483 1484 1485 1486
		return(FALSE);
	}

	/* Copy the rest of the dense directory. */
1487
	for (; i < n_dense; i++) {
1488 1489 1490 1491 1492 1493 1494 1495 1496
		ulint	offs = page_zip_dir_get(page_zip, i);

		if (UNIV_UNLIKELY(offs & ~PAGE_ZIP_DIR_SLOT_MASK)) {
			return(FALSE);
		}

		recs[i] = page + offs;
	}

1497
	if (UNIV_LIKELY(n_dense > 1)) {
1498
		page_zip_dir_sort(recs, recs_aux, 0, n_dense);
1499 1500 1501 1502
	}
	return(TRUE);
}

1503 1504
/**************************************************************************
Initialize the REC_N_NEW_EXTRA_BYTES of each record. */
1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536
static
ibool
page_zip_set_extra_bytes(
/*=====================*/
					/* out: TRUE on success,
					FALSE on failure */
	const page_zip_des_t*	page_zip,/* in: compressed page */
	page_t*			page,	/* in/out: uncompressed page */
	ulint			info_bits)/* in: REC_INFO_MIN_REC_FLAG or 0 */
{
	ulint	n;
	ulint	i;
	ulint	n_owned = 1;
	ulint	offs;
	rec_t*	rec;

	n = page_get_n_recs(page);
	rec = page + PAGE_NEW_INFIMUM;

	for (i = 0; i < n; i++) {
		offs = page_zip_dir_get(page_zip, i);

		if (UNIV_UNLIKELY(offs & PAGE_ZIP_DIR_SLOT_DEL)) {
			info_bits |= REC_INFO_DELETED_FLAG;
		}
		if (UNIV_UNLIKELY(offs & PAGE_ZIP_DIR_SLOT_OWNED)) {
			info_bits |= n_owned;
			n_owned = 1;
		} else {
			n_owned++;
		}
		offs &= PAGE_ZIP_DIR_SLOT_MASK;
1537
		if (UNIV_UNLIKELY(offs < PAGE_ZIP_START
1538
				  + REC_N_NEW_EXTRA_BYTES)) {
1539 1540
			return(FALSE);
		}
1541

1542
		rec_set_next_offs_new(rec, offs);
1543
		rec = page + offs;
1544 1545
		rec[-REC_N_NEW_EXTRA_BYTES] = info_bits;
		info_bits = 0;
1546 1547 1548
	}

	/* Set the next pointer of the last user record. */
1549
	rec_set_next_offs_new(rec, PAGE_NEW_SUPREMUM);
1550 1551 1552 1553

	/* Set n_owned of the supremum record. */
	page[PAGE_NEW_SUPREMUM - REC_N_NEW_EXTRA_BYTES] = n_owned;

1554
	/* The dense directory excludes the infimum and supremum records. */
1555
	n = page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW;
1556 1557

	if (i >= n) {
1558

1559
		return(UNIV_LIKELY(i == n));
1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573
	}

	offs = page_zip_dir_get(page_zip, i);

	/* Set the extra bytes of deleted records on the free list. */
	for (;;) {
		if (UNIV_UNLIKELY(!offs)
		    || UNIV_UNLIKELY(offs & ~PAGE_ZIP_DIR_SLOT_MASK)) {
			return(FALSE);
		}

		rec = page + offs;
		rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */

1574 1575 1576 1577 1578
		if (++i == n) {
			break;
		}

		offs = page_zip_dir_get(page_zip, i);
1579
		rec_set_next_offs_new(rec, offs);
1580 1581 1582 1583
	}

	/* Terminate the free list. */
	rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */
1584
	rec_set_next_offs_new(rec, 0);
1585

1586
	return(TRUE);
1587 1588
}

1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663
/**************************************************************************
Apply the modification log to a record containing externally stored
columns.  Do not copy the fields that are stored separately: the
BTR_EXTERN_FIELD_REF of each externally stored column and the
DB_TRX_ID/DB_ROLL_PTR pair are kept outside the compressed stream and
are skipped over here.  Returns the advanced log pointer, or NULL if
the log would be overrun or the record layout is inconsistent. */
static
const byte*
page_zip_apply_log_ext(
/*===================*/
					/* out: pointer to modification log,
					or NULL on failure */
	rec_t*		rec,		/* in/out: record */
	const ulint*	offsets,	/* in: rec_get_offsets(rec) */
	ulint		trx_id_col,	/* in: position of DB_TRX_ID */
	const byte*	data,		/* in: modification log */
	const byte*	end)		/* in: end of modification log */
{
	ulint	i;
	ulint	len;
	byte*	next_out = rec;	/* write position within the record */

	/* Check if there are any externally stored columns.
	For each externally stored column, skip the
	BTR_EXTERN_FIELD_REF. */

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		byte*	dst;

		if (UNIV_UNLIKELY(i == trx_id_col)) {
			/* Skip trx_id and roll_ptr */
			dst = rec_get_nth_field(rec, offsets,
						i, &len);
			/* Fail if the preceding bytes would overrun the
			log, the field is too short to hold the system
			columns, or it is (invalidly) stored externally. */
			if (UNIV_UNLIKELY(dst - next_out
					  >= end - data)
			    || UNIV_UNLIKELY
			    (len < (DATA_TRX_ID_LEN
				    + DATA_ROLL_PTR_LEN))
			    || rec_offs_nth_extern(offsets,
						   i)) {
				return(NULL);
			}

			/* Copy the log bytes up to this field, then jump
			the write position past DB_TRX_ID and DB_ROLL_PTR,
			which are restored separately. */
			memcpy(next_out, data, dst - next_out);
			data += dst - next_out;
			next_out = dst + (DATA_TRX_ID_LEN
					  + DATA_ROLL_PTR_LEN);
		} else if (rec_offs_nth_extern(offsets, i)) {
			dst = rec_get_nth_field(rec, offsets,
						i, &len);
			ut_ad(len
			      >= BTR_EXTERN_FIELD_REF_SIZE);

			/* Copy everything up to, but excluding, the
			externally stored column pointer at the end
			of this field. */
			len += dst - next_out
				- BTR_EXTERN_FIELD_REF_SIZE;

			if (UNIV_UNLIKELY(data + len >= end)) {
				return(NULL);
			}

			memcpy(next_out, data, len);
			data += len;
			next_out += len
				+ BTR_EXTERN_FIELD_REF_SIZE;
		}
	}

	/* Copy the last bytes of the record. */
	len = rec_get_end(rec, offsets) - next_out;
	if (UNIV_UNLIKELY(data + len >= end)) {
		return(NULL);
	}
	memcpy(next_out, data, len);
	data += len;

	return(data);
}

1664
/**************************************************************************
1665 1666
Apply the modification log to an uncompressed page.
Do not copy the fields that are stored separately. */
1667 1668 1669 1670 1671 1672 1673
static
const byte*
page_zip_apply_log(
/*===============*/
				/* out: pointer to end of modification log,
				or NULL on failure */
	const byte*	data,	/* in: modification log */
1674
	ulint		size,	/* in: maximum length of the log, in bytes */
1675
	rec_t**		recs,	/* in: dense page directory,
1676 1677
				sorted by address (indexed by
				heap_no - PAGE_HEAP_NO_USER_LOW) */
1678
	ulint		n_dense,/* in: size of recs[] */
1679 1680
	ulint		trx_id_col,/* in: column number of trx_id in the index,
				or ULINT_UNDEFINED if none */
1681 1682 1683 1684 1685 1686
	ulint		heap_status,
				/* in: heap_no and status bits for
				the next record to uncompress */
	dict_index_t*	index,	/* in: index of the page */
	ulint*		offsets)/* in/out: work area for
				rec_get_offsets_reverse() */
1687
{
1688 1689
	const byte* const end = data + size;

1690
	for (;;) {
1691
		ulint	val;
1692 1693 1694 1695
		rec_t*	rec;
		ulint	len;
		ulint	hs;

1696 1697
		val = *data++;
		if (UNIV_UNLIKELY(!val)) {
1698
			return(data - 1);
1699
		}
1700
		if (val & 0x80) {
1701
			val = (val & 0x7f) << 8 | *data++;
1702 1703 1704
			if (UNIV_UNLIKELY(!val)) {
				return(NULL);
			}
1705 1706
		}
		if (UNIV_UNLIKELY(data >= end)) {
1707 1708
			return(NULL);
		}
1709
		if (UNIV_UNLIKELY((val >> 1) > n_dense)) {
1710 1711
			return(NULL);
		}
1712 1713

		/* Determine the heap number and status bits of the record. */
1714 1715 1716 1717 1718 1719 1720
		rec = recs[(val >> 1) - 1];

		if (val & 1) {
			/* Clear the data bytes of the record. */
			mem_heap_t*	heap	= NULL;
			ulint*		offs;
			offs = rec_get_offsets(rec, index, offsets,
1721
					       ULINT_UNDEFINED, &heap);
1722 1723 1724 1725 1726 1727 1728 1729 1730
			memset(rec, 0, rec_offs_data_size(offs));

			if (UNIV_LIKELY_NULL(heap)) {
				mem_heap_free(heap);
			}
			continue;
		}

		hs = ((val >> 1) + 1) << REC_HEAP_NO_SHIFT;
1731 1732 1733 1734 1735 1736 1737 1738 1739 1740
		hs |= heap_status & ((1 << REC_HEAP_NO_SHIFT) - 1);

		/* This may either be an old record that is being
		overwritten (updated in place, or allocated from
		the free list), or a new record, with the next
		available_heap_no. */
		if (UNIV_UNLIKELY(hs > heap_status)) {
			return(NULL);
		} else if (hs == heap_status) {
			/* A new record was allocated from the heap. */
marko's avatar
marko committed
1741
			heap_status += 1 << REC_HEAP_NO_SHIFT;
1742 1743
		}

marko's avatar
marko committed
1744 1745 1746 1747
		mach_write_to_2(rec - REC_NEW_HEAP_NO, hs);
#if REC_STATUS_NODE_PTR != TRUE
# error "REC_STATUS_NODE_PTR != TRUE"
#endif
1748
		rec_get_offsets_reverse(data, index,
1749 1750
					hs & REC_STATUS_NODE_PTR,
					offsets);
marko's avatar
marko committed
1751
		rec_offs_make_valid(rec, index, offsets);
1752 1753 1754

		/* Copy the extra bytes (backwards). */
		{
1755 1756 1757 1758
			byte*	start	= rec_get_start(rec, offsets);
			byte*	b	= rec - REC_N_NEW_EXTRA_BYTES;
			while (b != start) {
				*--b = *data++;
1759 1760 1761 1762
			}
		}

		/* Copy the data bytes. */
1763
		if (UNIV_UNLIKELY(rec_offs_any_extern(offsets))) {
1764 1765
			/* Non-leaf nodes should not contain any
			externally stored columns. */
1766
			if (UNIV_UNLIKELY(hs & REC_STATUS_NODE_PTR)) {
1767 1768
				return(NULL);
			}
1769

1770 1771 1772 1773 1774 1775 1776
			data = page_zip_apply_log_ext(
				rec, offsets, trx_id_col, data, end);

			if (UNIV_UNLIKELY(!data)) {
				return(NULL);
			}
		} else if (UNIV_UNLIKELY(hs & REC_STATUS_NODE_PTR)) {
1777
			len = rec_offs_data_size(offsets)
1778
				- REC_NODE_PTR_SIZE;
1779 1780 1781 1782 1783 1784
			/* Copy the data bytes, except node_ptr. */
			if (UNIV_UNLIKELY(data + len >= end)) {
				return(NULL);
			}
			memcpy(rec, data, len);
			data += len;
1785 1786
		} else if (UNIV_LIKELY(trx_id_col == ULINT_UNDEFINED)) {
			len = rec_offs_data_size(offsets);
1787

1788 1789 1790 1791 1792
			/* Copy all data bytes of
			a record in a secondary index. */
			if (UNIV_UNLIKELY(data + len >= end)) {
				return(NULL);
			}
1793

1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805
			memcpy(rec, data, len);
			data += len;
		} else {
			/* Skip DB_TRX_ID and DB_ROLL_PTR. */
			ulint	l = rec_get_nth_field_offs(offsets,
							   trx_id_col, &len);
			byte*	b;

			if (UNIV_UNLIKELY(data + l >= end)
			    || UNIV_UNLIKELY(len < (DATA_TRX_ID_LEN
						    + DATA_ROLL_PTR_LEN))) {
				return(NULL);
1806 1807
			}

1808 1809 1810 1811 1812 1813 1814
			/* Copy any preceding data bytes. */
			memcpy(rec, data, l);
			data += l;

			/* Copy any bytes following DB_TRX_ID, DB_ROLL_PTR. */
			b = rec + l + (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
			len = rec_get_end(rec, offsets) - b;
1815 1816 1817
			if (UNIV_UNLIKELY(data + len >= end)) {
				return(NULL);
			}
1818
			memcpy(b, data, len);
1819 1820
			data += len;
		}
1821 1822 1823
	}
}

marko's avatar
marko committed
1824
/**************************************************************************
1825 1826
Decompress the records of a node pointer page. */
static
marko's avatar
marko committed
1827
ibool
1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839
page_zip_decompress_node_ptrs(
/*==========================*/
					/* out: TRUE on success,
					FALSE on failure */
	page_zip_des_t*	page_zip,	/* in/out: compressed page */
	z_stream*	d_stream,	/* in/out: compressed page stream */
	rec_t**		recs,		/* in: dense page directory
					sorted by address */
	ulint		n_dense,	/* in: size of recs[] */
	dict_index_t*	index,		/* in: the index of the page */
	ulint*		offsets,	/* in/out: temporary offsets */
	mem_heap_t*	heap)		/* in: temporary memory heap */
marko's avatar
marko committed
1840
{
1841
	ulint		heap_status = REC_STATUS_NODE_PTR
1842
		| PAGE_HEAP_NO_USER_LOW << REC_HEAP_NO_SHIFT;
1843
	ulint		slot;
1844
	const byte*	storage;
marko's avatar
marko committed
1845

1846 1847 1848
	/* Subtract the space reserved for uncompressed data. */
	d_stream->avail_in -= n_dense
		* (PAGE_ZIP_DIR_SLOT_SIZE + REC_NODE_PTR_SIZE);
marko's avatar
marko committed
1849

1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912
	/* Decompress the records in heap_no order. */
	for (slot = 0; slot < n_dense; slot++) {
		rec_t*	rec = recs[slot];

		d_stream->avail_out = rec - REC_N_NEW_EXTRA_BYTES
			- d_stream->next_out;

		ut_ad(d_stream->avail_out < UNIV_PAGE_SIZE
		      - PAGE_ZIP_START - PAGE_DIR);
		switch (inflate(d_stream, Z_SYNC_FLUSH)) {
		case Z_STREAM_END:
			/* Apparently, n_dense has grown
			since the time the page was last compressed. */
			goto zlib_done;
		case Z_OK:
		case Z_BUF_ERROR:
			if (!d_stream->avail_out) {
				break;
			}
			/* fall through */
		default:
			goto zlib_error;
		}

		ut_ad(d_stream->next_out == rec - REC_N_NEW_EXTRA_BYTES);
		/* Prepare to decompress the data bytes. */
		d_stream->next_out = rec;
		/* Set heap_no and the status bits. */
		mach_write_to_2(rec - REC_NEW_HEAP_NO, heap_status);
		heap_status += 1 << REC_HEAP_NO_SHIFT;

		/* Read the offsets. The status bits are needed here. */
		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);

		/* Non-leaf nodes should not have any externally
		stored columns. */
		ut_ad(!rec_offs_any_extern(offsets));

		/* Decompress the data bytes, except node_ptr. */
		d_stream->avail_out = rec_offs_data_size(offsets)
			- REC_NODE_PTR_SIZE;

		switch (inflate(d_stream, Z_SYNC_FLUSH)) {
		case Z_STREAM_END:
			goto zlib_done;
		case Z_OK:
		case Z_BUF_ERROR:
			if (!d_stream->avail_out) {
				break;
			}
			/* fall through */
		default:
			goto zlib_error;
		}

		/* Clear the node pointer in case the record
		will be deleted and the space will be reallocated
		to a smaller record. */
		memset(d_stream->next_out, 0, REC_NODE_PTR_SIZE);
		d_stream->next_out += REC_NODE_PTR_SIZE;

		ut_ad(d_stream->next_out == rec_get_end(rec, offsets));
1913
	}
1914

1915 1916 1917 1918 1919 1920 1921
	/* Decompress any trailing garbage, in case the last record was
	allocated from an originally longer space on the free list. */
	d_stream->avail_out = page_header_get_field(page_zip->data,
						    PAGE_HEAP_TOP)
		- page_offset(d_stream->next_out);
	if (UNIV_UNLIKELY(d_stream->avail_out > UNIV_PAGE_SIZE
			  - PAGE_ZIP_START - PAGE_DIR)) {
1922

1923 1924
		goto zlib_error;
	}
1925

1926 1927 1928
	if (UNIV_UNLIKELY(inflate(d_stream, Z_FINISH) != Z_STREAM_END)) {
zlib_error:
		inflateEnd(d_stream);
1929 1930 1931
		return(FALSE);
	}

1932 1933 1934 1935 1936 1937
	/* Note that d_stream->avail_out > 0 may hold here
	if the modification log is nonempty. */

zlib_done:
	if (UNIV_UNLIKELY(inflateEnd(d_stream) != Z_OK)) {
		ut_error;
1938 1939
	}

1940 1941
	{
		page_t*	page = page_align(d_stream->next_out);
marko's avatar
marko committed
1942

1943 1944 1945 1946 1947
		/* Clear the unused heap space on the uncompressed page. */
		memset(d_stream->next_out, 0,
		       page_dir_get_nth_slot(page,
					     page_dir_get_n_slots(page) - 1)
		       - d_stream->next_out);
marko's avatar
marko committed
1948
	}
marko's avatar
marko committed
1949

1950
#ifdef UNIV_DEBUG
1951
	page_zip->m_start = PAGE_DATA + d_stream->total_in;
1952
#endif /* UNIV_DEBUG */
1953

1954 1955 1956
	/* Apply the modification log. */
	{
		const byte*	mod_log_ptr;
1957
		mod_log_ptr = page_zip_apply_log(d_stream->next_in,
1958 1959 1960 1961
						 d_stream->avail_in + 1,
						 recs, n_dense,
						 ULINT_UNDEFINED, heap_status,
						 index, offsets);
1962

1963 1964 1965 1966
		if (UNIV_UNLIKELY(!mod_log_ptr)) {
			return(FALSE);
		}
		page_zip->m_end = mod_log_ptr - page_zip->data;
1967
		page_zip->m_nonempty = mod_log_ptr != d_stream->next_in;
1968 1969
		ut_a(page_zip_get_trailer_len(page_zip,
					      dict_index_is_clust(index), NULL)
1970
		     + page_zip->m_end < page_zip_get_size(page_zip));
1971
	}
1972

1973
	/* Restore the uncompressed columns in heap_no order. */
1974
	storage	= page_zip->data + page_zip_get_size(page_zip)
1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988
		- n_dense * PAGE_ZIP_DIR_SLOT_SIZE;

	for (slot = 0; slot < n_dense; slot++) {
		rec_t*		rec	= recs[slot];

		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		/* Non-leaf nodes should not have any externally
		stored columns. */
		ut_ad(!rec_offs_any_extern(offsets));
		storage -= REC_NODE_PTR_SIZE;

		memcpy(rec_get_end(rec, offsets) - REC_NODE_PTR_SIZE,
		       storage, REC_NODE_PTR_SIZE);
1989 1990
	}

1991 1992
	return(TRUE);
}
1993

1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009
/**************************************************************************
Decompress the records of a leaf node of a secondary index.  Only the
record extra bytes are skipped per record; the data bytes are part of
one continuous compressed stream, so a single Z_FINISH at the end
decompresses the remaining data and any trailing garbage. */
static
ibool
page_zip_decompress_sec(
/*====================*/
					/* out: TRUE on success,
					FALSE on failure */
	page_zip_des_t*	page_zip,	/* in/out: compressed page */
	z_stream*	d_stream,	/* in/out: compressed page stream */
	rec_t**		recs,		/* in: dense page directory
					sorted by address */
	ulint		n_dense,	/* in: size of recs[] */
	dict_index_t*	index,		/* in: the index of the page */
	ulint*		offsets)	/* in/out: temporary offsets */
{
	ulint	heap_status	= REC_STATUS_ORDINARY
		| PAGE_HEAP_NO_USER_LOW << REC_HEAP_NO_SHIFT;
	ulint	slot;

	ut_a(!dict_index_is_clust(index));

	/* Subtract the space reserved for uncompressed data. */
	d_stream->avail_in -= n_dense * PAGE_ZIP_DIR_SLOT_SIZE;

	for (slot = 0; slot < n_dense; slot++) {
		rec_t*	rec = recs[slot];

		/* Decompress everything up to this record. */
		d_stream->avail_out = rec - REC_N_NEW_EXTRA_BYTES
			- d_stream->next_out;

		if (UNIV_LIKELY(d_stream->avail_out)) {
			switch (inflate(d_stream, Z_SYNC_FLUSH)) {
			case Z_STREAM_END:
				/* Apparently, n_dense has grown
				since the time the page was last compressed. */
				goto zlib_done;
			case Z_OK:
			case Z_BUF_ERROR:
				if (!d_stream->avail_out) {
					break;
				}
				/* fall through */
			default:
				goto zlib_error;
			}
		}

		ut_ad(d_stream->next_out == rec - REC_N_NEW_EXTRA_BYTES);

		/* Skip the REC_N_NEW_EXTRA_BYTES. */

		d_stream->next_out = rec;

		/* Set heap_no and the status bits. */
		mach_write_to_2(rec - REC_NEW_HEAP_NO, heap_status);
		heap_status += 1 << REC_HEAP_NO_SHIFT;
	}

	/* Decompress the data of the last record and any trailing garbage,
	in case the last record was allocated from an originally longer space
	on the free list. */
	d_stream->avail_out = page_header_get_field(page_zip->data,
						    PAGE_HEAP_TOP)
		- page_offset(d_stream->next_out);
	if (UNIV_UNLIKELY(d_stream->avail_out > UNIV_PAGE_SIZE
			  - PAGE_ZIP_START - PAGE_DIR)) {

		goto zlib_error;
	}

	if (UNIV_UNLIKELY(inflate(d_stream, Z_FINISH) != Z_STREAM_END)) {
zlib_error:
		inflateEnd(d_stream);
		return(FALSE);
	}

	/* Note that d_stream->avail_out > 0 may hold here
	if the modification log is nonempty. */

zlib_done:
	if (UNIV_UNLIKELY(inflateEnd(d_stream) != Z_OK)) {
		ut_error;
	}

	{
		page_t*	page = page_align(d_stream->next_out);

		/* Clear the unused heap space on the uncompressed page. */
		memset(d_stream->next_out, 0,
		       page_dir_get_nth_slot(page,
					     page_dir_get_n_slots(page) - 1)
		       - d_stream->next_out);
	}

#ifdef UNIV_DEBUG
	page_zip->m_start = PAGE_DATA + d_stream->total_in;
#endif /* UNIV_DEBUG */

	/* Apply the modification log. */
	{
		const byte*	mod_log_ptr;
		mod_log_ptr = page_zip_apply_log(d_stream->next_in,
						 d_stream->avail_in + 1,
						 recs, n_dense,
						 ULINT_UNDEFINED, heap_status,
						 index, offsets);

		if (UNIV_UNLIKELY(!mod_log_ptr)) {
			return(FALSE);
		}
		page_zip->m_end = mod_log_ptr - page_zip->data;
		page_zip->m_nonempty = mod_log_ptr != d_stream->next_in;
		ut_a(page_zip_get_trailer_len(page_zip, FALSE, NULL)
		     + page_zip->m_end < page_zip_get_size(page_zip));
	}

	/* There are no uncompressed columns on leaf pages of
	secondary indexes. */

	return(TRUE);
}

/**************************************************************************
Decompress a record of a leaf node of a clustered index that contains
externally stored columns.  The compressed stream is inflated only up to
each uncompressed field (DB_TRX_ID, DB_ROLL_PTR and the BLOB pointers);
space for those fields is reserved in the output and filled in later
from the uncompressed area of the compressed page. */
static
ibool
page_zip_decompress_clust_ext(
/*==========================*/
					/* out: TRUE on success */
	z_stream*	d_stream,	/* in/out: compressed page stream */
	rec_t*		rec,		/* in/out: record */
	const ulint*	offsets,	/* in: rec_get_offsets(rec) */
	ulint		trx_id_col)	/* in: position of DB_TRX_ID */
{
	ulint	i;

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		ulint	len;
		byte*	dst;

		if (UNIV_UNLIKELY(i == trx_id_col)) {
			/* Skip trx_id and roll_ptr */
			dst = rec_get_nth_field(rec, offsets, i, &len);
			if (UNIV_UNLIKELY(len < DATA_TRX_ID_LEN
					  + DATA_ROLL_PTR_LEN)
			    || rec_offs_nth_extern(offsets, i)) {
				/* The field is too short to hold the system
				columns, or it is flagged as externally
				stored: the page must be corrupt. */
				return(FALSE);
			}

			/* Inflate only up to the start of DB_TRX_ID. */
			d_stream->avail_out = dst - d_stream->next_out;

			switch (inflate(d_stream, Z_SYNC_FLUSH)) {
			case Z_STREAM_END:
			case Z_OK:
			case Z_BUF_ERROR:
				/* Success only if the output buffer was
				filled exactly to the field start. */
				if (!d_stream->avail_out) {
					break;
				}
				/* fall through */
			default:
				return(FALSE);
			}

			ut_ad(d_stream->next_out == dst);

			/* The system columns are not compressed;
			leave room for them in the output. */
			d_stream->next_out += DATA_TRX_ID_LEN
				+ DATA_ROLL_PTR_LEN;
		} else if (rec_offs_nth_extern(offsets, i)) {
			dst = rec_get_nth_field(rec, offsets, i, &len);
			ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
			/* The BLOB pointer is stored in the last
			BTR_EXTERN_FIELD_REF_SIZE bytes of the field. */
			dst += len - BTR_EXTERN_FIELD_REF_SIZE;

			/* Inflate only up to the BLOB pointer. */
			d_stream->avail_out = dst - d_stream->next_out;
			switch (inflate(d_stream,
					Z_SYNC_FLUSH)) {
			case Z_STREAM_END:
			case Z_OK:
			case Z_BUF_ERROR:
				if (!d_stream->avail_out) {
					break;
				}
				/* fall through */
			default:
				return(FALSE);
			}

			ut_ad(d_stream->next_out == dst);

			/* Reserve space for the data at
			the end of the space reserved for
			the compressed data and the
			page modification log. */

			if (UNIV_UNLIKELY
			    (d_stream->avail_in
			     <= BTR_EXTERN_FIELD_REF_SIZE)) {
				/* out of space */
				return(FALSE);
			}

			/* Clear the BLOB pointer in case
			the record will be deleted and the
			space will not be reused.  Note that
			the final initialization of the BLOB
			pointers (copying from "externs"
			or clearing) will have to take place
			only after the page modification log
			has been applied.  Otherwise, we
			could end up with an uninitialized
			BLOB pointer when a record is deleted,
			reallocated and deleted. */
			memset(d_stream->next_out, 0,
			       BTR_EXTERN_FIELD_REF_SIZE);
			d_stream->next_out
				+= BTR_EXTERN_FIELD_REF_SIZE;
		}
	}

	return(TRUE);
}

2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239
/**************************************************************************
Compress the records of a leaf node of a clustered index. */
static
ibool
page_zip_decompress_clust(
/*======================*/
					/* out: TRUE on success,
					FALSE on failure */
	page_zip_des_t*	page_zip,	/* in/out: compressed page */
	z_stream*	d_stream,	/* in/out: compressed page stream */
	rec_t**		recs,		/* in: dense page directory
					sorted by address */
	ulint		n_dense,	/* in: size of recs[] */
	dict_index_t*	index,		/* in: the index of the page */
	ulint		trx_id_col,	/* index of the trx_id column */
	ulint*		offsets,	/* in/out: temporary offsets */
	mem_heap_t*	heap)		/* in: temporary memory heap */
{
	int		err;
	ulint		slot;
	ulint		heap_status	= REC_STATUS_ORDINARY
2240
		| PAGE_HEAP_NO_USER_LOW << REC_HEAP_NO_SHIFT;
2241 2242 2243
	const byte*	storage;
	const byte*	externs;

2244 2245
	ut_a(dict_index_is_clust(index));

2246 2247 2248 2249 2250
	/* Subtract the space reserved for uncompressed data. */
	d_stream->avail_in -= n_dense * (PAGE_ZIP_DIR_SLOT_SIZE
					 + DATA_TRX_ID_LEN
					 + DATA_ROLL_PTR_LEN);

2251
	/* Decompress the records in heap_no order. */
2252
	for (slot = 0; slot < n_dense; slot++) {
2253
		rec_t*	rec	= recs[slot];
2254

2255 2256
		d_stream->avail_out = rec - REC_N_NEW_EXTRA_BYTES
			- d_stream->next_out;
2257

2258
		ut_ad(d_stream->avail_out < UNIV_PAGE_SIZE
2259
		      - PAGE_ZIP_START - PAGE_DIR);
2260 2261
		err = inflate(d_stream, Z_SYNC_FLUSH);
		switch (err) {
2262 2263
		case Z_STREAM_END:
			/* Apparently, n_dense has grown
2264
			since the time the page was last compressed. */
2265
			goto zlib_done;
2266
		case Z_OK:
2267
		case Z_BUF_ERROR:
2268
			if (UNIV_LIKELY(!d_stream->avail_out)) {
2269 2270
				break;
			}
2271
			/* fall through */
2272 2273 2274 2275
		default:
			goto zlib_error;
		}

2276
		ut_ad(d_stream->next_out == rec - REC_N_NEW_EXTRA_BYTES);
2277
		/* Prepare to decompress the data bytes. */
2278
		d_stream->next_out = rec;
2279 2280 2281
		/* Set heap_no and the status bits. */
		mach_write_to_2(rec - REC_NEW_HEAP_NO, heap_status);
		heap_status += 1 << REC_HEAP_NO_SHIFT;
2282

2283 2284
		/* Read the offsets. The status bits are needed here. */
		offsets = rec_get_offsets(rec, index, offsets,
2285
					  ULINT_UNDEFINED, &heap);
2286

2287
		/* This is a leaf page in a clustered index. */
2288

2289 2290 2291
		/* Check if there are any externally stored columns.
		For each externally stored column, restore the
		BTR_EXTERN_FIELD_REF separately. */
2292

2293 2294 2295 2296
		if (UNIV_UNLIKELY(rec_offs_any_extern(offsets))) {
			if (UNIV_UNLIKELY
			    (!page_zip_decompress_clust_ext(
				    d_stream, rec, offsets, trx_id_col))) {
2297

2298 2299 2300 2301 2302 2303 2304 2305 2306
				goto zlib_error;
			}
		} else {
			/* Skip trx_id and roll_ptr */
			ulint	len;
			byte*	dst = rec_get_nth_field(rec, offsets,
							trx_id_col, &len);
			if (UNIV_UNLIKELY(len < DATA_TRX_ID_LEN
					  + DATA_ROLL_PTR_LEN)) {
2307

2308 2309
				goto zlib_error;
			}
2310

2311
			d_stream->avail_out = dst - d_stream->next_out;
2312

2313 2314 2315 2316 2317 2318
			switch (inflate(d_stream, Z_SYNC_FLUSH)) {
			case Z_STREAM_END:
			case Z_OK:
			case Z_BUF_ERROR:
				if (!d_stream->avail_out) {
					break;
2319
				}
2320 2321 2322 2323
				/* fall through */
			default:
				goto zlib_error;
			}
2324

2325
			ut_ad(d_stream->next_out == dst);
2326

2327 2328
			d_stream->next_out += DATA_TRX_ID_LEN
				+ DATA_ROLL_PTR_LEN;
2329
		}
marko's avatar
marko committed
2330

2331 2332 2333
		/* Decompress the last bytes of the record. */
		d_stream->avail_out = rec_get_end(rec, offsets)
			- d_stream->next_out;
2334

2335 2336 2337 2338 2339 2340
		switch (inflate(d_stream, Z_SYNC_FLUSH)) {
		case Z_STREAM_END:
		case Z_OK:
		case Z_BUF_ERROR:
			if (!d_stream->avail_out) {
				break;
2341
			}
2342 2343 2344
			/* fall through */
		default:
			goto zlib_error;
2345
		}
2346 2347
	}

2348 2349
	/* Decompress any trailing garbage, in case the last record was
	allocated from an originally longer space on the free list. */
2350 2351 2352 2353
	d_stream->avail_out = page_header_get_field(page_zip->data,
						    PAGE_HEAP_TOP)
		- page_offset(d_stream->next_out);
	if (UNIV_UNLIKELY(d_stream->avail_out > UNIV_PAGE_SIZE
2354
			  - PAGE_ZIP_START - PAGE_DIR)) {
2355 2356 2357 2358

		goto zlib_error;
	}

2359
	if (UNIV_UNLIKELY(inflate(d_stream, Z_FINISH) != Z_STREAM_END)) {
2360
zlib_error:
2361 2362
		inflateEnd(d_stream);
		return(FALSE);
2363 2364
	}

2365
	/* Note that d_stream->avail_out > 0 may hold here
2366
	if the modification log is nonempty. */
2367 2368

zlib_done:
2369
	if (UNIV_UNLIKELY(inflateEnd(d_stream) != Z_OK)) {
marko's avatar
marko committed
2370 2371
		ut_error;
	}
2372

2373 2374
	{
		page_t*	page = page_align(d_stream->next_out);
2375

2376 2377 2378 2379 2380 2381
		/* Clear the unused heap space on the uncompressed page. */
		memset(d_stream->next_out, 0,
		       page_dir_get_nth_slot(page,
					     page_dir_get_n_slots(page) - 1)
		       - d_stream->next_out);
	}
2382

2383
#ifdef UNIV_DEBUG
2384
	page_zip->m_start = PAGE_DATA + d_stream->total_in;
2385
#endif /* UNIV_DEBUG */
marko's avatar
marko committed
2386 2387

	/* Apply the modification log. */
2388 2389
	{
		const byte*	mod_log_ptr;
2390
		mod_log_ptr = page_zip_apply_log(d_stream->next_in,
2391
						 d_stream->avail_in + 1,
2392 2393 2394
						 recs, n_dense,
						 trx_id_col, heap_status,
						 index, offsets);
2395

2396
		if (UNIV_UNLIKELY(!mod_log_ptr)) {
2397
			return(FALSE);
marko's avatar
marko committed
2398
		}
2399
		page_zip->m_end = mod_log_ptr - page_zip->data;
2400
		page_zip->m_nonempty = mod_log_ptr != d_stream->next_in;
2401
		ut_a(page_zip_get_trailer_len(page_zip, TRUE, NULL)
2402
		     + page_zip->m_end < page_zip_get_size(page_zip));
marko's avatar
marko committed
2403 2404
	}

2405
	storage = page_zip->data + page_zip_get_size(page_zip)
2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433
		- n_dense * PAGE_ZIP_DIR_SLOT_SIZE;

	externs = storage - n_dense
		* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

	/* Restore the uncompressed columns in heap_no order. */

	for (slot = 0; slot < n_dense; slot++) {
		ulint	i;
		ulint	len;
		byte*	dst;
		rec_t*	rec	= recs[slot];
		ibool	exists	= !page_zip_dir_find_free(
			page_zip, page_offset(rec));
		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);

		dst = rec_get_nth_field(rec, offsets,
					trx_id_col, &len);
		ut_ad(len >= DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
		storage -= DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
		memcpy(dst, storage,
		       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

		/* Check if there are any externally stored
		columns in this record.  For each externally
		stored column, restore or clear the
		BTR_EXTERN_FIELD_REF. */
2434 2435 2436
		if (!rec_offs_any_extern(offsets)) {
			continue;
		}
2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476

		for (i = 0; i < rec_offs_n_fields(offsets); i++) {
			if (!rec_offs_nth_extern(offsets, i)) {
				continue;
			}
			dst = rec_get_nth_field(rec, offsets, i, &len);
			ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
			dst += len - BTR_EXTERN_FIELD_REF_SIZE;

			if (UNIV_LIKELY(exists)) {
				/* Existing record:
				restore the BLOB pointer */
				externs -= BTR_EXTERN_FIELD_REF_SIZE;

				memcpy(dst, externs,
				       BTR_EXTERN_FIELD_REF_SIZE);

				page_zip->n_blobs++;
			} else {
				/* Deleted record:
				clear the BLOB pointer */
				memset(dst, 0,
				       BTR_EXTERN_FIELD_REF_SIZE);
			}
		}
	}

	return(TRUE);
}

/**************************************************************************
Decompress a page.  This function should tolerate errors on the compressed
page.  Instead of letting assertions fail, it will return FALSE if an
inconsistency is detected. */

ibool
page_zip_decompress(
/*================*/
				/* out: TRUE on success, FALSE on failure */
	page_zip_des_t*	page_zip,/* in: data, size;
2477
				out: m_start, m_end, m_nonempty, n_blobs */
2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488
	page_t*		page)	/* out: uncompressed page, may be trashed */
{
	z_stream	d_stream;
	dict_index_t*	index	= NULL;
	rec_t**		recs;	/* dense page directory, sorted by address */
	ulint		n_dense;/* number of user records on the page */
	ulint		trx_id_col = ULINT_UNDEFINED;
	mem_heap_t*	heap;
	ulint*		offsets;

	ut_ad(page_zip_simple_validate(page_zip));
2489 2490
	UNIV_MEM_ASSERT_W(page, UNIV_PAGE_SIZE);
	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
2491 2492

	/* The dense directory excludes the infimum and supremum records. */
2493
	n_dense = page_dir_get_n_heap(page_zip->data) - PAGE_HEAP_NO_USER_LOW;
2494
	if (UNIV_UNLIKELY(n_dense * PAGE_ZIP_DIR_SLOT_SIZE
2495
			  >= page_zip_get_size(page_zip))) {
2496 2497 2498
		return(FALSE);
	}

2499
	heap = mem_heap_create(n_dense * (3 * sizeof *recs) + UNIV_PAGE_SIZE);
2500 2501 2502 2503 2504 2505
	recs = mem_heap_alloc(heap, n_dense * (2 * sizeof *recs));

#ifdef UNIV_ZIP_DEBUG
	/* Clear the page. */
	memset(page, 0x55, UNIV_PAGE_SIZE);
#endif /* UNIV_ZIP_DEBUG */
2506
	UNIV_MEM_INVALID(page, UNIV_PAGE_SIZE);
2507 2508 2509 2510 2511 2512 2513
	/* Copy the page header. */
	memcpy(page, page_zip->data, PAGE_DATA);

	/* Copy the page directory. */
	if (UNIV_UNLIKELY(!page_zip_dir_decode(page_zip, page, recs,
					       recs + n_dense, n_dense))) {
zlib_error:
2514 2515 2516 2517
		mem_heap_free(heap);
		return(FALSE);
	}

2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531
	/* Copy the infimum and supremum records. */
	memcpy(page + (PAGE_NEW_INFIMUM - REC_N_NEW_EXTRA_BYTES),
	       infimum_extra, sizeof infimum_extra);
	if (UNIV_UNLIKELY(!page_get_n_recs(page))) {
		rec_set_next_offs_new(page + PAGE_NEW_INFIMUM,
				      PAGE_NEW_SUPREMUM);
	} else {
		rec_set_next_offs_new(page + PAGE_NEW_INFIMUM,
				      page_zip_dir_get(page_zip, 0)
				      & PAGE_ZIP_DIR_SLOT_MASK);
	}
	memcpy(page + PAGE_NEW_INFIMUM, infimum_data, sizeof infimum_data);
	memcpy(page + (PAGE_NEW_SUPREMUM - REC_N_NEW_EXTRA_BYTES + 1),
	       supremum_extra_data, sizeof supremum_extra_data);
2532

2533
	page_zip_set_alloc(&d_stream, heap);
2534

2535 2536
	if (UNIV_UNLIKELY(inflateInit2(&d_stream, UNIV_PAGE_SIZE_SHIFT)
			  != Z_OK)) {
2537
		ut_error;
2538
	}
2539

2540 2541 2542
	d_stream.next_in = page_zip->data + PAGE_DATA;
	/* Subtract the space reserved for
	the page header and the end marker of the modification log. */
2543
	d_stream.avail_in = page_zip_get_size(page_zip) - (PAGE_DATA + 1);
2544

2545 2546
	d_stream.next_out = page + PAGE_ZIP_START;
	d_stream.avail_out = UNIV_PAGE_SIZE - PAGE_ZIP_START;
2547

2548 2549 2550
	/* Decode the zlib header and the index information. */
	if (UNIV_UNLIKELY(inflate(&d_stream, Z_BLOCK) != Z_OK)
	    || UNIV_UNLIKELY(inflate(&d_stream, Z_BLOCK) != Z_OK)) {
2551

2552 2553
		goto zlib_error;
	}
2554

2555 2556 2557
	index = page_zip_fields_decode(
		page + PAGE_ZIP_START, d_stream.next_out,
		page_is_leaf(page) ? &trx_id_col : NULL);
2558

2559
	if (UNIV_UNLIKELY(!index)) {
2560

2561 2562
		goto zlib_error;
	}
2563

2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601
	/* Decompress the user records. */
	page_zip->n_blobs = 0;
	d_stream.next_out = page + PAGE_ZIP_START;

	{
		/* Pre-allocate the offsets for rec_get_offsets_reverse(). */
		ulint	n = 1 + 1/* node ptr */ + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		offsets = mem_heap_alloc(heap, n * sizeof(ulint));
		*offsets = n;
	}

	/* Decompress the records in heap_no order. */
	if (!page_is_leaf(page)) {
		/* This is a node pointer page. */
		ulint	info_bits;

		if (UNIV_UNLIKELY
		    (!page_zip_decompress_node_ptrs(page_zip, &d_stream,
						    recs, n_dense, index,
						    offsets, heap))) {
			goto err_exit;
		}

		info_bits = mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL
			? REC_INFO_MIN_REC_FLAG : 0;

		if (UNIV_UNLIKELY(!page_zip_set_extra_bytes(page_zip, page,
							    info_bits))) {
			goto err_exit;
		}
	} else if (UNIV_LIKELY(trx_id_col == ULINT_UNDEFINED)) {
		/* This is a leaf page in a secondary index. */
		if (UNIV_UNLIKELY(!page_zip_decompress_sec(page_zip, &d_stream,
							   recs, n_dense,
							   index, offsets))) {
			goto err_exit;
		}
2602

2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618
		if (UNIV_UNLIKELY(!page_zip_set_extra_bytes(page_zip,
							    page, 0))) {
err_exit:
			page_zip_fields_free(index);
			mem_heap_free(heap);
			return(FALSE);
		}
	} else {
		/* This is a leaf page in a clustered index. */
		if (UNIV_UNLIKELY(!page_zip_decompress_clust(page_zip,
							     &d_stream, recs,
							     n_dense, index,
							     trx_id_col,
							     offsets, heap))) {
			goto err_exit;
		}
2619

2620 2621 2622 2623
		if (UNIV_UNLIKELY(!page_zip_set_extra_bytes(page_zip,
							    page, 0))) {
			goto err_exit;
		}
2624 2625
	}

marko's avatar
marko committed
2626 2627
	ut_a(page_is_comp(page));

2628 2629
	page_zip_fields_free(index);
	mem_heap_free(heap);
2630
	page_zip_decompress_count[page_zip->ssize]++;
2631

marko's avatar
marko committed
2632 2633 2634
	return(TRUE);
}

#ifdef UNIV_ZIP_DEBUG
/* Flag: make page_zip_validate() compare page headers only */
ibool	page_zip_validate_header_only = FALSE;

/**************************************************************************
Check that the compressed and decompressed pages match.  Decompresses
page_zip into a temporary buffer and compares the result, along with the
bookkeeping fields of the page_zip descriptor, against the given
uncompressed page. */

ibool
page_zip_validate(
/*==============*/
					/* out: TRUE if valid, FALSE if not */
	const page_zip_des_t*	page_zip,/* in: compressed page */
	const page_t*		page)	/* in: uncompressed page */
{
	page_zip_des_t	temp_page_zip = *page_zip;
	byte*		temp_page_buf;
	page_t*		temp_page;
	ibool		valid;

	/* Compare those page header fields that are not covered by
	the byte-for-byte content comparison below. */
	if (memcmp(page_zip->data + FIL_PAGE_PREV, page + FIL_PAGE_PREV,
		   FIL_PAGE_LSN - FIL_PAGE_PREV)
	    || memcmp(page_zip->data + FIL_PAGE_TYPE, page + FIL_PAGE_TYPE, 2)
	    || memcmp(page_zip->data + FIL_PAGE_DATA, page + FIL_PAGE_DATA,
		      PAGE_DATA - FIL_PAGE_DATA)) {
		fputs("page_zip_validate(): page header mismatch\n", stderr);
		return(FALSE);
	}

	ut_a(page_is_comp(page));

	if (page_zip_validate_header_only) {
		return(TRUE);
	}

	/* page_zip_decompress() expects the uncompressed page to be
	UNIV_PAGE_SIZE aligned. */
	temp_page_buf = ut_malloc(2 * UNIV_PAGE_SIZE);
	temp_page = ut_align(temp_page_buf, UNIV_PAGE_SIZE);

	valid = page_zip_decompress(&temp_page_zip, temp_page);
	if (!valid) {
		fputs("page_zip_validate(): failed to decompress\n", stderr);
		goto func_exit;
	}
	/* NOTE(review): the %d conversions below are applied to fields that
	appear to be of unsigned integer types; confirm the format matches
	the field types. */
	if (page_zip->n_blobs != temp_page_zip.n_blobs) {
		fprintf(stderr,
			"page_zip_validate(): n_blobs mismatch: %d!=%d\n",
			page_zip->n_blobs, temp_page_zip.n_blobs);
		valid = FALSE;
	}
#ifdef UNIV_DEBUG
	if (page_zip->m_start != temp_page_zip.m_start) {
		fprintf(stderr,
			"page_zip_validate(): m_start mismatch: %d!=%d\n",
			page_zip->m_start, temp_page_zip.m_start);
		valid = FALSE;
	}
#endif /* UNIV_DEBUG */
	if (page_zip->m_end != temp_page_zip.m_end) {
		fprintf(stderr,
			"page_zip_validate(): m_end mismatch: %d!=%d\n",
			page_zip->m_end, temp_page_zip.m_end);
		valid = FALSE;
	}
	if (page_zip->m_nonempty != temp_page_zip.m_nonempty) {
		fprintf(stderr,
			"page_zip_validate(): m_nonempty mismatch: %d!=%d\n",
			page_zip->m_nonempty, temp_page_zip.m_nonempty);
		valid = FALSE;
	}
	if (memcmp(page + PAGE_HEADER, temp_page + PAGE_HEADER,
		   UNIV_PAGE_SIZE - PAGE_HEADER - FIL_PAGE_DATA_END)) {
		fputs("page_zip_validate(): content mismatch\n", stderr);
		valid = FALSE;
	}

func_exit:
	ut_free(temp_page_buf);
	return(valid);
}
#endif /* UNIV_ZIP_DEBUG */

#ifdef UNIV_DEBUG
/**************************************************************************
Assert that the compressed and uncompressed page headers match.
This is a debug helper: it fires ut_ad() assertions on mismatch and
always returns TRUE, so that it can be used inside ut_ad(). */
static
ibool
page_zip_header_cmp(
/*================*/
					/* out: TRUE */
	const page_zip_des_t*	page_zip,/* in: compressed page */
	const byte*		page)	/* in: uncompressed page */
{
	ut_ad(!memcmp(page_zip->data + FIL_PAGE_PREV, page + FIL_PAGE_PREV,
		      FIL_PAGE_LSN - FIL_PAGE_PREV));
	ut_ad(!memcmp(page_zip->data + FIL_PAGE_TYPE, page + FIL_PAGE_TYPE,
		      2));
	ut_ad(!memcmp(page_zip->data + FIL_PAGE_DATA, page + FIL_PAGE_DATA,
		      PAGE_DATA - FIL_PAGE_DATA));

	return(TRUE);
}
#endif /* UNIV_DEBUG */

2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762
/**************************************************************************
Write a record on the compressed page that contains externally stored
columns.  The data must already have been written to the uncompressed page. */
static
byte*
page_zip_write_rec_ext(
/*===================*/
					/* out: end of modification log */
	page_zip_des_t*	page_zip,	/* in/out: compressed page */
	const page_t*	page,		/* in: page containing rec */
	const byte*	rec,		/* in: record being written */
	dict_index_t*	index,		/* in: record descriptor */
	const ulint*	offsets,	/* in: rec_get_offsets(rec, index) */
	ulint		create,		/* in: nonzero=insert, zero=update */
	ulint		trx_id_col,	/* in: position of DB_TRX_ID */
	ulint		heap_no,	/* in: heap number of rec */
	byte*		storage,	/* in: end of dense page directory */
	byte*		data)		/* in: end of modification log */
{
	const byte*	start	= rec;
	ulint		i;
	ulint		len;
	byte*		externs	= storage;
	ulint		n_ext	= rec_offs_n_extern(offsets);

	ut_ad(rec_offs_validate(rec, index, offsets));
2763 2764 2765
	UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
	UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
			   rec_offs_extra_size(offsets));
2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864

	externs -= (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
		* (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW);

	/* Note that this will not take into account
	the BLOB columns of rec if create==TRUE. */
	ut_ad(data + rec_offs_data_size(offsets)
	      - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
	      - n_ext * BTR_EXTERN_FIELD_REF_SIZE
	      < externs - BTR_EXTERN_FIELD_REF_SIZE * page_zip->n_blobs);

	{
		ulint	blob_no = page_zip_get_n_prev_extern(
			page_zip, rec, index);
		byte*	ext_end = externs - page_zip->n_blobs
			* BTR_EXTERN_FIELD_REF_SIZE;
		ut_ad(blob_no <= page_zip->n_blobs);
		externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;

		if (create) {
			page_zip->n_blobs += n_ext;
			ut_ad(!memcmp
			      (ext_end - n_ext
			       * BTR_EXTERN_FIELD_REF_SIZE,
			       zero,
			       BTR_EXTERN_FIELD_REF_SIZE));
			memmove(ext_end - n_ext
				* BTR_EXTERN_FIELD_REF_SIZE,
				ext_end,
				externs - ext_end);
		}

		ut_a(blob_no + n_ext <= page_zip->n_blobs);
	}

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		const byte*	src;

		if (UNIV_UNLIKELY(i == trx_id_col)) {
			ut_ad(!rec_offs_nth_extern(offsets,
						   i));
			ut_ad(!rec_offs_nth_extern(offsets,
						   i + 1));
			/* Locate trx_id and roll_ptr. */
			src = rec_get_nth_field(rec, offsets,
						i, &len);
			ut_ad(len == DATA_TRX_ID_LEN);
			ut_ad(src + DATA_TRX_ID_LEN
			      == rec_get_nth_field(
				      rec, offsets,
				      i + 1, &len));
			ut_ad(len == DATA_ROLL_PTR_LEN);

			/* Log the preceding fields. */
			ut_ad(!memcmp(data, zero,
				      ut_min(src - start,
					     sizeof zero)));
			memcpy(data, start, src - start);
			data += src - start;
			start = src + (DATA_TRX_ID_LEN
				       + DATA_ROLL_PTR_LEN);

			/* Store trx_id and roll_ptr. */
			memcpy(storage - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
			       * (heap_no - 1),
			       src, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
			i++; /* skip also roll_ptr */
		} else if (rec_offs_nth_extern(offsets, i)) {
			src = rec_get_nth_field(rec, offsets,
						i, &len);

			ut_ad(dict_index_is_clust(index));
			ut_ad(len
			      >= BTR_EXTERN_FIELD_REF_SIZE);
			src += len - BTR_EXTERN_FIELD_REF_SIZE;

			ut_ad(!memcmp(data, zero,
				      ut_min(src - start, sizeof zero)));
			memcpy(data, start, src - start);
			data += src - start;
			start = src + BTR_EXTERN_FIELD_REF_SIZE;

			/* Store the BLOB pointer. */
			externs -= BTR_EXTERN_FIELD_REF_SIZE;
			ut_ad(data < externs);
			memcpy(externs, src, BTR_EXTERN_FIELD_REF_SIZE);
		}
	}

	/* Log the last bytes of the record. */
	len = rec_offs_data_size(offsets) - (start - rec);

	ut_ad(!memcmp(data, zero, ut_min(len, sizeof zero)));
	memcpy(data, start, len);
	data += len;

	return(data);
}

marko's avatar
marko committed
2865
/**************************************************************************
2866
Write an entire record on the compressed page.  The data must already
marko's avatar
marko committed
2867 2868 2869
have been written to the uncompressed page. */

void
2870 2871
page_zip_write_rec(
/*===============*/
marko's avatar
marko committed
2872
	page_zip_des_t*	page_zip,/* in/out: compressed page */
2873
	const byte*	rec,	/* in: record being written */
2874
	dict_index_t*	index,	/* in: the index the record belongs to */
2875 2876
	const ulint*	offsets,/* in: rec_get_offsets(rec, index) */
	ulint		create)	/* in: nonzero=insert, zero=update */
marko's avatar
marko committed
2877
{
2878 2879 2880 2881 2882
	const page_t*	page;
	byte*		data;
	byte*		storage;
	ulint		heap_no;
	byte*		slot;
marko's avatar
marko committed
2883

2884
	ut_ad(buf_frame_get_page_zip(rec) == page_zip);
marko's avatar
marko committed
2885
	ut_ad(page_zip_simple_validate(page_zip));
2886 2887
	ut_ad(page_zip_get_size(page_zip)
	      > PAGE_DATA + page_zip_dir_size(page_zip));
2888
	ut_ad(rec_offs_comp(offsets));
2889
	ut_ad(rec_offs_validate(rec, index, offsets));
marko's avatar
marko committed
2890

2891
	ut_ad(page_zip->m_start >= PAGE_DATA);
marko's avatar
marko committed
2892

2893
	page = page_align(rec);
2894 2895

	ut_ad(page_zip_header_cmp(page_zip, page));
2896
	ut_ad(page_simple_validate_new((page_t*) page));
2897

2898
	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
2899 2900 2901
	UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
	UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
			   rec_offs_extra_size(offsets));
2902

2903
	slot = page_zip_dir_find(page_zip, page_offset(rec));
marko's avatar
marko committed
2904
	ut_a(slot);
2905
	/* Copy the delete mark. */
2906
	if (rec_get_deleted_flag(rec, TRUE)) {
2907 2908 2909 2910
		*slot |= PAGE_ZIP_DIR_SLOT_DEL >> 8;
	} else {
		*slot &= ~(PAGE_ZIP_DIR_SLOT_DEL >> 8);
	}
marko's avatar
marko committed
2911

2912 2913
	ut_ad(rec_get_start((rec_t*) rec, offsets) >= page + PAGE_ZIP_START);
	ut_ad(rec_get_end((rec_t*) rec, offsets) <= page + UNIV_PAGE_SIZE
2914 2915
	      - PAGE_DIR - PAGE_DIR_SLOT_SIZE
	      * page_dir_get_n_slots(page));
marko's avatar
marko committed
2916

2917
	heap_no = rec_get_heap_no_new(rec);
2918
	ut_ad(heap_no >= PAGE_HEAP_NO_USER_LOW); /* not infimum or supremum */
2919 2920
	ut_ad(heap_no < page_dir_get_n_heap(page));

marko's avatar
marko committed
2921
	/* Append to the modification log. */
2922
	data = page_zip->data + page_zip->m_end;
2923 2924 2925 2926 2927
	ut_ad(!*data);

	/* Identify the record by writing its heap number - 1.
	0 is reserved to indicate the end of the modification log. */

2928 2929
	if (UNIV_UNLIKELY(heap_no - 1 >= 64)) {
		*data++ = 0x80 | (heap_no - 1) >> 7;
marko's avatar
marko committed
2930
		ut_ad(!*data);
2931
	}
2932
	*data++ = (heap_no - 1) << 1;
marko's avatar
marko committed
2933
	ut_ad(!*data);
2934 2935

	{
2936
		const byte*	start	= rec - rec_offs_extra_size(offsets);
2937 2938 2939 2940 2941 2942 2943 2944 2945
		const byte*	b	= rec - REC_N_NEW_EXTRA_BYTES;

		/* Write the extra bytes backwards, so that
		rec_offs_extra_size() can be easily computed in
		page_zip_apply_log() by invoking
		rec_get_offsets_reverse(). */

		while (b != start) {
			*data++ = *--b;
marko's avatar
marko committed
2946
			ut_ad(!*data);
2947 2948 2949 2950
		}
	}

	/* Write the data bytes.  Store the uncompressed bytes separately. */
2951
	storage = page_zip->data + page_zip_get_size(page_zip)
2952
		- (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW)
2953
		* PAGE_ZIP_DIR_SLOT_SIZE;
2954 2955 2956

	if (page_is_leaf(page)) {
		ulint		len;
2957 2958

		if (dict_index_is_clust(index)) {
2959 2960
			ulint		trx_id_col;

2961 2962
			trx_id_col = dict_index_get_sys_col_pos(index,
								DATA_TRX_ID);
2963
			ut_ad(trx_id_col != ULINT_UNDEFINED);
2964

2965 2966
			/* Store separately trx_id, roll_ptr and
			the BTR_EXTERN_FIELD_REF of each BLOB column. */
2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006
			if (rec_offs_any_extern(offsets)) {
				data = page_zip_write_rec_ext(
					page_zip, page,
					rec, index, offsets, create,
					trx_id_col, heap_no, storage, data);
			} else {
				/* Locate trx_id and roll_ptr. */
				const byte*	src
					= rec_get_nth_field(rec, offsets,
							    trx_id_col, &len);
				ut_ad(len == DATA_TRX_ID_LEN);
				ut_ad(src + DATA_TRX_ID_LEN
				      == rec_get_nth_field(
					      rec, offsets,
					      trx_id_col + 1, &len));
				ut_ad(len == DATA_ROLL_PTR_LEN);

				/* Log the preceding fields. */
				ut_ad(!memcmp(data, zero,
					      ut_min(src - rec, sizeof zero)));
				memcpy(data, rec, src - rec);
				data += src - rec;

				/* Store trx_id and roll_ptr. */
				memcpy(storage
				       - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
				       * (heap_no - 1),
				       src,
				       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

				src += DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;

				/* Log the last bytes of the record. */
				len = rec_offs_data_size(offsets)
					- (src - rec);

				ut_ad(!memcmp(data, zero,
					      ut_min(len, sizeof zero)));
				memcpy(data, src, len);
				data += len;
3007
			}
3008 3009 3010 3011
		} else {
			/* Leaf page of a secondary index:
			no externally stored columns */
			ut_ad(dict_index_get_sys_col_pos(index, DATA_TRX_ID)
3012
			      == ULINT_UNDEFINED);
3013
			ut_ad(!rec_offs_any_extern(offsets));
3014

3015 3016
			/* Log the entire record. */
			len = rec_offs_data_size(offsets);
3017

3018 3019 3020 3021
			ut_ad(!memcmp(data, zero, ut_min(len, sizeof zero)));
			memcpy(data, rec, len);
			data += len;
		}
3022 3023 3024 3025 3026 3027 3028 3029 3030 3031
	} else {
		/* This is a node pointer page. */
		ulint	len;

		/* Non-leaf nodes should not have any externally
		stored columns. */
		ut_ad(!rec_offs_any_extern(offsets));

		/* Copy the data bytes, except node_ptr. */
		len = rec_offs_data_size(offsets) - REC_NODE_PTR_SIZE;
3032
		ut_ad(data + len < storage - REC_NODE_PTR_SIZE
3033
		      * (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW));
marko's avatar
marko committed
3034
		ut_ad(!memcmp(data, zero, ut_min(len, sizeof zero)));
3035 3036 3037 3038 3039
		memcpy(data, rec, len);
		data += len;

		/* Copy the node pointer to the uncompressed area. */
		memcpy(storage - REC_NODE_PTR_SIZE
3040 3041 3042
		       * (heap_no - 1),
		       rec + len,
		       REC_NODE_PTR_SIZE);
3043 3044
	}

3045
	ut_a(!*data);
3046
	ut_ad((ulint) (data - page_zip->data) < page_zip_get_size(page_zip));
3047
	page_zip->m_end = data - page_zip->data;
3048
	page_zip->m_nonempty = TRUE;
3049

3050
#ifdef UNIV_ZIP_DEBUG
3051
	ut_a(page_zip_validate(page_zip, page_align(rec)));
3052
#endif /* UNIV_ZIP_DEBUG */
3053 3054
}

3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071
/***************************************************************
Parses a log record of writing a BLOB pointer of a record.
Redo record layout: 2-byte offset of the field on the uncompressed
page, 2-byte offset in the compressed page, followed by the
BTR_EXTERN_FIELD_REF_SIZE bytes of the BLOB pointer. */

byte*
page_zip_parse_write_blob_ptr(
/*==========================*/
				/* out: end of log record or NULL */
	byte*		ptr,	/* in: redo log buffer */
	byte*		end_ptr,/* in: redo log buffer end */
	page_t*		page,	/* in/out: uncompressed page */
	page_zip_des_t*	page_zip)/* in/out: compressed page */
{
	ulint	offset;
	ulint	z_offset;

	/* Either both or neither of the page copies must be supplied. */
	ut_ad(!page == !page_zip);

	if (UNIV_UNLIKELY
	    (end_ptr < ptr + (2 + 2 + BTR_EXTERN_FIELD_REF_SIZE))) {
		/* The record is truncated in the log buffer. */
		return(NULL);
	}

	offset = mach_read_from_2(ptr);
	z_offset = mach_read_from_2(ptr + 2);

	if (UNIV_UNLIKELY(offset < PAGE_ZIP_START)
	    || UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)
	    || UNIV_UNLIKELY(z_offset >= UNIV_PAGE_SIZE)) {
corrupt:
		recv_sys->found_corrupt_log = TRUE;

		return(NULL);
	}

	if (page) {
		/* BLOB pointers only exist on leaf pages of a
		clustered index; reject anything else. */
		if (UNIV_UNLIKELY(!page_zip)
		    || UNIV_UNLIKELY(!page_is_leaf(page))) {

			goto corrupt;
		}

#ifdef UNIV_ZIP_DEBUG
		ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */

		/* Apply the change to both the uncompressed page and
		the uncompressed trailer of the compressed page. */
		memcpy(page + offset,
		       ptr + 4, BTR_EXTERN_FIELD_REF_SIZE);
		memcpy(page_zip->data + z_offset,
		       ptr + 4, BTR_EXTERN_FIELD_REF_SIZE);

#ifdef UNIV_ZIP_DEBUG
		ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
	}

	return(ptr + (2 + 2 + BTR_EXTERN_FIELD_REF_SIZE));
}

3114
/**************************************************************************
Write a BLOB pointer of a record on the leaf page of a clustered index.
The information must already have been updated on the uncompressed page. */

void
page_zip_write_blob_ptr(
/*====================*/
	page_zip_des_t*	page_zip,/* in/out: compressed page */
	const byte*	rec,	/* in/out: record whose data is being
				written */
	dict_index_t*	index,	/* in: index of the page */
	const ulint*	offsets,/* in: rec_get_offsets(rec, index) */
	ulint		n,	/* in: column index */
	mtr_t*		mtr)	/* in: mini-transaction handle,
				or NULL if no logging is needed */
{
	const byte*	field;
	byte*		externs;
	const page_t*	page	= page_align(rec);
	ulint		blob_no;
	ulint		len;

	ut_ad(buf_frame_get_page_zip(rec) == page_zip);
	ut_ad(page_simple_validate_new((page_t*) page));
	ut_ad(page_zip_simple_validate(page_zip));
	ut_ad(page_zip_get_size(page_zip)
	      > PAGE_DATA + page_zip_dir_size(page_zip));
	ut_ad(rec_offs_comp(offsets));
	ut_ad(rec_offs_validate(rec, NULL, offsets));
	ut_ad(rec_offs_any_extern(offsets));
	ut_ad(rec_offs_nth_extern(offsets, n));

	ut_ad(page_zip->m_start >= PAGE_DATA);
	ut_ad(page_zip_header_cmp(page_zip, page));

	ut_ad(page_is_leaf(page));
	ut_ad(dict_index_is_clust(index));

	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
	UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
	UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
			   rec_offs_extra_size(offsets));

	/* Position of this BLOB pointer in the per-page array:
	pointers of all preceding records, plus the externally stored
	columns preceding column n within this record. */
	blob_no = page_zip_get_n_prev_extern(page_zip, rec, index)
		+ rec_get_n_extern_new(rec, index, n);
	ut_a(blob_no < page_zip->n_blobs);

	/* The BLOB pointer array is stored at the end of the
	compressed page, below the dense directory slots and the
	trx_id/roll_ptr columns (one group per user record). */
	externs = page_zip->data + page_zip_get_size(page_zip)
		- (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW)
		* (PAGE_ZIP_DIR_SLOT_SIZE
		   + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

	field = rec_get_nth_field(rec, offsets, n, &len);

	externs -= (blob_no + 1) * BTR_EXTERN_FIELD_REF_SIZE;
	/* The external reference occupies the last
	BTR_EXTERN_FIELD_REF_SIZE bytes of the column value. */
	field += len - BTR_EXTERN_FIELD_REF_SIZE;

	memcpy(externs, field, BTR_EXTERN_FIELD_REF_SIZE);

#ifdef UNIV_ZIP_DEBUG
	ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */

	if (mtr) {
		/* 11 = maximum size of the initial log record header */
		byte*	log_ptr	= mlog_open(
			mtr, 11 + 2 + 2 + BTR_EXTERN_FIELD_REF_SIZE);
		if (UNIV_UNLIKELY(!log_ptr)) {
			return;
		}

		log_ptr = mlog_write_initial_log_record_fast(
			(byte*) field, MLOG_ZIP_WRITE_BLOB_PTR, log_ptr, mtr);
		mach_write_to_2(log_ptr, page_offset(field));
		log_ptr += 2;
		mach_write_to_2(log_ptr, externs - page_zip->data);
		log_ptr += 2;
		memcpy(log_ptr, externs, BTR_EXTERN_FIELD_REF_SIZE);
		log_ptr += BTR_EXTERN_FIELD_REF_SIZE;
		mlog_close(mtr, log_ptr);
	}
}

3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212
/***************************************************************
Parses a log record of writing the node pointer of a record.
Redo record layout: 2-byte offset of the field on the uncompressed
page, 2-byte offset in the compressed page, followed by the
REC_NODE_PTR_SIZE bytes of the node pointer. */

byte*
page_zip_parse_write_node_ptr(
/*==========================*/
				/* out: end of log record or NULL */
	byte*		ptr,	/* in: redo log buffer */
	byte*		end_ptr,/* in: redo log buffer end */
	page_t*		page,	/* in/out: uncompressed page */
	page_zip_des_t*	page_zip)/* in/out: compressed page */
{
	ulint	offset;
	ulint	z_offset;

	ut_ad(!page == !page_zip);

	if (UNIV_UNLIKELY(end_ptr < ptr + (2 + 2 + REC_NODE_PTR_SIZE))) {
		/* The record is truncated in the log buffer. */
		return(NULL);
	}

	offset = mach_read_from_2(ptr);
	z_offset = mach_read_from_2(ptr + 2);

	if (UNIV_UNLIKELY(offset < PAGE_ZIP_START)
	    || UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)
	    || UNIV_UNLIKELY(z_offset >= UNIV_PAGE_SIZE)) {
corrupt:
		recv_sys->found_corrupt_log = TRUE;

		return(NULL);
	}

	if (page) {
		byte*	storage_end;
		byte*	field;
		byte*	storage;
		ulint	heap_no;

		/* Node pointers only exist on non-leaf pages. */
		if (UNIV_UNLIKELY(!page_zip)
		    || UNIV_UNLIKELY(page_is_leaf(page))) {

			goto corrupt;
		}

#ifdef UNIV_ZIP_DEBUG
		ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */

		field = page + offset;
		storage = page_zip->data + z_offset;

		/* The node pointer array grows down from just below
		the dense directory; recompute the heap number that
		z_offset implies and sanity-check it. */
		storage_end = page_zip->data + page_zip_get_size(page_zip)
			- (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW)
			* PAGE_ZIP_DIR_SLOT_SIZE;

		heap_no = 1 + (storage_end - storage) / REC_NODE_PTR_SIZE;

		if (UNIV_UNLIKELY((storage_end - storage) % REC_NODE_PTR_SIZE)
		    || UNIV_UNLIKELY(heap_no < PAGE_HEAP_NO_USER_LOW)
		    || UNIV_UNLIKELY(heap_no >= page_dir_get_n_heap(page))) {

			goto corrupt;
		}

		/* Apply the change to both the uncompressed page and
		the uncompressed trailer of the compressed page. */
		memcpy(field, ptr + 4, REC_NODE_PTR_SIZE);
		memcpy(storage, ptr + 4, REC_NODE_PTR_SIZE);

#ifdef UNIV_ZIP_DEBUG
		ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
	}

	return(ptr + (2 + 2 + REC_NODE_PTR_SIZE));
}

3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285 3286
/**************************************************************************
Write the node pointer of a record on a non-leaf compressed page. */

void
page_zip_write_node_ptr(
/*====================*/
	page_zip_des_t*	page_zip,/* in/out: compressed page */
	byte*		rec,	/* in/out: record */
	ulint		size,	/* in: data size of rec */
	ulint		ptr,	/* in: node pointer */
	mtr_t*		mtr)	/* in: mini-transaction, or NULL */
{
	byte*	field;
	byte*	storage;
	page_t*	page	= page_align(rec);

	ut_ad(buf_frame_get_page_zip(rec) == page_zip);
	ut_ad(page_simple_validate_new(page));
	ut_ad(page_zip_simple_validate(page_zip));
	ut_ad(page_zip_get_size(page_zip)
	      > PAGE_DATA + page_zip_dir_size(page_zip));
	ut_ad(page_rec_is_comp(rec));

	ut_ad(page_zip->m_start >= PAGE_DATA);
	ut_ad(page_zip_header_cmp(page_zip, page));

	ut_ad(!page_is_leaf(page));

	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
	UNIV_MEM_ASSERT_RW(rec, size);

	/* Locate this record's slot in the node pointer array, which
	grows down from just below the dense directory at the end of
	the compressed page. */
	storage = page_zip->data + page_zip_get_size(page_zip)
		- (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW)
		* PAGE_ZIP_DIR_SLOT_SIZE
		- (rec_get_heap_no_new(rec) - 1) * REC_NODE_PTR_SIZE;
	/* The node pointer occupies the last bytes of the record. */
	field = rec + size - REC_NODE_PTR_SIZE;

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
	/* The stored copy must be in sync before the update. */
	ut_a(!memcmp(storage, field, REC_NODE_PTR_SIZE));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
#if REC_NODE_PTR_SIZE != 4
# error "REC_NODE_PTR_SIZE != 4"
#endif
	mach_write_to_4(field, ptr);
	memcpy(storage, field, REC_NODE_PTR_SIZE);

	if (mtr) {
		/* 11 = maximum size of the initial log record header */
		byte*	log_ptr	= mlog_open(mtr,
					    11 + 2 + 2 + REC_NODE_PTR_SIZE);
		if (UNIV_UNLIKELY(!log_ptr)) {
			return;
		}

		log_ptr = mlog_write_initial_log_record_fast(
			field, MLOG_ZIP_WRITE_NODE_PTR, log_ptr, mtr);
		mach_write_to_2(log_ptr, page_offset(field));
		log_ptr += 2;
		mach_write_to_2(log_ptr, storage - page_zip->data);
		log_ptr += 2;
		memcpy(log_ptr, field, REC_NODE_PTR_SIZE);
		log_ptr += REC_NODE_PTR_SIZE;
		mlog_close(mtr, log_ptr);
	}
}

/**************************************************************************
Write the trx_id and roll_ptr of a record on a B-tree leaf node page.
This function is not redo-logged; note the absence of an mtr parameter. */

void
page_zip_write_trx_id_and_roll_ptr(
/*===============================*/
	page_zip_des_t*	page_zip,/* in/out: compressed page */
	byte*		rec,	/* in/out: record */
	const ulint*	offsets,/* in: rec_get_offsets(rec, index) */
	ulint		trx_id_col,/* in: column number of TRX_ID in rec */
	dulint		trx_id,	/* in: transaction identifier */
	dulint		roll_ptr)/* in: roll_ptr */
{
	byte*	field;
	byte*	storage;
	page_t*	page	= page_align(rec);
	ulint	len;

	ut_ad(buf_frame_get_page_zip(rec) == page_zip);
	ut_ad(page_simple_validate_new(page));
	ut_ad(page_zip_simple_validate(page_zip));
	ut_ad(page_zip_get_size(page_zip)
	      > PAGE_DATA + page_zip_dir_size(page_zip));
	ut_ad(rec_offs_validate(rec, NULL, offsets));
	ut_ad(rec_offs_comp(offsets));

	ut_ad(page_zip->m_start >= PAGE_DATA);
	ut_ad(page_zip_header_cmp(page_zip, page));

	ut_ad(page_is_leaf(page));

	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));

	/* The trx_id/roll_ptr columns are stored uncompressed in an
	array below the dense directory, one group per user record,
	indexed by heap number. */
	storage = page_zip->data + page_zip_get_size(page_zip)
		- (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW)
		* PAGE_ZIP_DIR_SLOT_SIZE
		- (rec_get_heap_no_new(rec) - 1)
		* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

#if DATA_TRX_ID + 1 != DATA_ROLL_PTR
# error "DATA_TRX_ID + 1 != DATA_ROLL_PTR"
#endif
	field = rec_get_nth_field(rec, offsets, trx_id_col, &len);
	ut_ad(len == DATA_TRX_ID_LEN);
	/* The roll_ptr column must immediately follow trx_id. */
	ut_ad(field + DATA_TRX_ID_LEN
	      == rec_get_nth_field(rec, offsets, trx_id_col + 1, &len));
	ut_ad(len == DATA_ROLL_PTR_LEN);
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
	/* The stored copy must be in sync before the update. */
	ut_a(!memcmp(storage, field, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
#if DATA_TRX_ID_LEN != 6
# error "DATA_TRX_ID_LEN != 6"
#endif
	mach_write_to_6(field, trx_id);
#if DATA_ROLL_PTR_LEN != 7
# error "DATA_ROLL_PTR_LEN != 7"
#endif
	mach_write_to_7(field + DATA_TRX_ID_LEN, roll_ptr);
	memcpy(storage, field, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

	UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
	UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
			   rec_offs_extra_size(offsets));
	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
}

3404 3405 3406
#ifdef UNIV_ZIP_DEBUG
/* Set this variable in a debugger to disable page_zip_clear_rec().
The only observable effect should be the compression ratio due to
3407 3408 3409
deleted records not being zeroed out.  In rare cases, there can be
page_zip_validate() failures on the node_ptr, trx_id and roll_ptr
columns if the space is reallocated for a smaller record. */
3410 3411 3412
ibool	page_zip_clear_rec_disable;
#endif /* UNIV_ZIP_DEBUG */

3413 3414
/**************************************************************************
Clear an area on the uncompressed and compressed page, if possible.
If there is enough space in the modification log, the record's data
bytes are zeroed out and the clearing is logged; otherwise, on a
clustered leaf page, only the BLOB pointers are cleared so that
page_zip_validate() remains consistent. */
static
void
page_zip_clear_rec(
/*===============*/
	page_zip_des_t*	page_zip,/* in/out: compressed page */
	byte*		rec,	/* in: record to clear */
	dict_index_t*	index,	/* in: index of rec */
	const ulint*	offsets)/* in: rec_get_offsets(rec, index) */
{
	ulint	heap_no;
	page_t*	page	= page_align(rec);
	/* page_zip_validate() would fail here if a record
	containing externally stored columns is being deleted. */
	ut_ad(rec_offs_validate(rec, index, offsets));
	/* The record must already be on the free list, not in the
	in-use dense directory. */
	ut_ad(!page_zip_dir_find(page_zip, page_offset(rec)));
	ut_ad(page_zip_dir_find_free(page_zip, page_offset(rec)));
	ut_ad(page_zip_header_cmp(page_zip, page));

	heap_no = rec_get_heap_no_new(rec);
	ut_ad(heap_no >= PAGE_HEAP_NO_USER_LOW);

	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
	UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
	UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
			   rec_offs_extra_size(offsets));

	if (
#ifdef UNIV_ZIP_DEBUG
	    !page_zip_clear_rec_disable &&
#endif /* UNIV_ZIP_DEBUG */
	    page_zip->m_end
	    + 1 + ((heap_no - 1) >= 64)/* size of the log entry */
	    + page_zip_get_trailer_len(page_zip,
				       dict_index_is_clust(index), NULL)
	    < page_zip_get_size(page_zip)) {
		byte*	data;

		/* Clear only the data bytes, because the allocator and
		the decompressor depend on the extra bytes. */
		memset(rec, 0, rec_offs_data_size(offsets));

		if (!page_is_leaf(page)) {
			/* Clear node_ptr on the compressed page. */
			byte*	storage	= page_zip->data
				+ page_zip_get_size(page_zip)
				- (page_dir_get_n_heap(page)
				   - PAGE_HEAP_NO_USER_LOW)
				* PAGE_ZIP_DIR_SLOT_SIZE;

			memset(storage - (heap_no - 1) * REC_NODE_PTR_SIZE,
			       0, REC_NODE_PTR_SIZE);
		} else if (dict_index_is_clust(index)) {
			/* Clear trx_id and roll_ptr on the compressed page. */
			byte*	storage	= page_zip->data
				+ page_zip_get_size(page_zip)
				- (page_dir_get_n_heap(page)
				   - PAGE_HEAP_NO_USER_LOW)
				* PAGE_ZIP_DIR_SLOT_SIZE;

			memset(storage - (heap_no - 1)
			       * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN),
			       0, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
		}

		/* Log that the data was zeroed out.  The entry is one
		byte for heap_no - 1 < 64, otherwise two bytes with the
		high bits of heap_no - 1 in a 0x80-flagged first byte. */
		data = page_zip->data + page_zip->m_end;
		ut_ad(!*data);
		if (UNIV_UNLIKELY(heap_no - 1 >= 64)) {
			*data++ = 0x80 | (heap_no - 1) >> 7;
			ut_ad(!*data);
		}
		*data++ = (heap_no - 1) << 1 | 1;
		ut_ad(!*data);
		ut_ad((ulint) (data - page_zip->data)
		      < page_zip_get_size(page_zip));
		page_zip->m_end = data - page_zip->data;
		page_zip->m_nonempty = TRUE;
	} else if (page_is_leaf(page) && dict_index_is_clust(index)) {
		/* Do not clear the record, because there is not enough space
		to log the operation. */

		if (rec_offs_any_extern(offsets)) {
			ulint	i;

			for (i = rec_offs_n_fields(offsets); i--; ) {
				/* Clear all BLOB pointers in order to make
				page_zip_validate() pass. */
				if (rec_offs_nth_extern(offsets, i)) {
					ulint	len;
					byte*	field = rec_get_nth_field(
						rec, offsets, i, &len);
					memset(field + len
					       - BTR_EXTERN_FIELD_REF_SIZE,
					       0, BTR_EXTERN_FIELD_REF_SIZE);
				}
			}
		}
	}

#ifdef UNIV_ZIP_DEBUG
	ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
}

/**************************************************************************
Write the "deleted" flag of a record on a compressed page.  The flag must
already have been written on the uncompressed page. */

void
page_zip_rec_set_deleted(
/*=====================*/
	page_zip_des_t*	page_zip,/* in/out: compressed page */
	const byte*	rec,	/* in: record on the uncompressed page */
	ulint		flag)	/* in: the deleted flag (nonzero=TRUE) */
{
	/* The flag lives in the high byte of the record's dense
	directory slot. */
	byte*	dir_slot = page_zip_dir_find(page_zip, page_offset(rec));

	ut_a(dir_slot);
	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));

	if (!flag) {
		/* Clear the "deleted" bit. */
		*dir_slot &= ~(PAGE_ZIP_DIR_SLOT_DEL >> 8);
	} else {
		/* Set the "deleted" bit. */
		*dir_slot |= (PAGE_ZIP_DIR_SLOT_DEL >> 8);
	}
}

/**************************************************************************
Write the "owned" flag of a record on a compressed page.  The n_owned field
must already have been written on the uncompressed page. */

void
page_zip_rec_set_owned(
/*===================*/
	page_zip_des_t*	page_zip,/* in/out: compressed page */
	const byte*	rec,	/* in: record on the uncompressed page */
	ulint		flag)	/* in: the owned flag (nonzero=TRUE) */
{
	/* The flag lives in the high byte of the record's dense
	directory slot. */
	byte*	dir_slot = page_zip_dir_find(page_zip, page_offset(rec));

	ut_a(dir_slot);
	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));

	if (!flag) {
		/* Clear the "owned" bit. */
		*dir_slot &= ~(PAGE_ZIP_DIR_SLOT_OWNED >> 8);
	} else {
		/* Set the "owned" bit. */
		*dir_slot |= (PAGE_ZIP_DIR_SLOT_OWNED >> 8);
	}
}

3561 3562 3563 3564 3565 3566 3567 3568 3569 3570
/**************************************************************************
Insert a record to the dense page directory. */

void
page_zip_dir_insert(
/*================*/
	page_zip_des_t*	page_zip,/* in/out: compressed page */
	const byte*	prev_rec,/* in: record after which to insert */
	const byte*	free_rec,/* in: record from which rec was
				allocated, or NULL */
	byte*		rec)	/* in: record to insert */
{
	ulint	n_dense;
	byte*	slot_rec;
	byte*	slot_free;

	ut_ad(prev_rec != rec);
	ut_ad(page_rec_get_next((rec_t*) prev_rec) == rec);
	ut_ad(page_zip_simple_validate(page_zip));

	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));

	if (page_rec_is_infimum(prev_rec)) {
		/* Use the first slot. */
		slot_rec = page_zip->data + page_zip_get_size(page_zip);
	} else {
		byte*	end	= page_zip->data + page_zip_get_size(page_zip);
		byte*	start	= end - page_zip_dir_user_size(page_zip);

		if (UNIV_LIKELY(!free_rec)) {
			/* PAGE_N_RECS was already incremented
			in page_cur_insert_rec_zip(), but the
			dense directory slot at that position
			contains garbage.  Skip it. */
			start += PAGE_ZIP_DIR_SLOT_SIZE;
		}

		slot_rec = page_zip_dir_find_low(start, end,
						 page_offset(prev_rec));
		ut_a(slot_rec);
	}

	/* Read the old n_dense (n_heap may have been incremented). */
	n_dense = page_dir_get_n_heap(page_zip->data)
		- (PAGE_HEAP_NO_USER_LOW + 1);

	if (UNIV_LIKELY_NULL(free_rec)) {
		/* The record was allocated from the free list.
		Shift the dense directory only up to that slot.
		Note that in this case, n_dense is actually
		off by one, because page_cur_insert_rec_zip()
		did not increment n_heap. */
		ut_ad(rec_get_heap_no_new(rec) < n_dense + 1
		      + PAGE_HEAP_NO_USER_LOW);
		ut_ad(rec >= free_rec);
		slot_free = page_zip_dir_find(page_zip, page_offset(free_rec));
		ut_ad(slot_free);
		slot_free += PAGE_ZIP_DIR_SLOT_SIZE;
	} else {
		/* The record was allocated from the heap.
		Shift the entire dense directory. */
		ut_ad(rec_get_heap_no_new(rec) == n_dense
		      + PAGE_HEAP_NO_USER_LOW);

		/* Shift to the end of the dense page directory. */
		slot_free = page_zip->data + page_zip_get_size(page_zip)
			- PAGE_ZIP_DIR_SLOT_SIZE * n_dense;
	}

	/* Shift the dense directory to allocate place for rec. */
	memmove(slot_free - PAGE_ZIP_DIR_SLOT_SIZE, slot_free,
		slot_rec - slot_free);

	/* Write the entry for the inserted record.
	The "owned" and "deleted" flags must be zero. */
	mach_write_to_2(slot_rec - PAGE_ZIP_DIR_SLOT_SIZE, page_offset(rec));
}

3639
/**************************************************************************
Shift the dense page directory and the array of BLOB pointers
when a record is deleted. */

void
page_zip_dir_delete(
/*================*/
	page_zip_des_t*	page_zip,/* in/out: compressed page */
	byte*		rec,	/* in: record to delete */
	dict_index_t*	index,	/* in: index of rec */
	const ulint*	offsets,/* in: rec_get_offsets(rec) */
	const byte*	free)	/* in: previous start of the free list */
{
	byte*	slot_rec;
	byte*	slot_free;
	ulint	n_ext;
	page_t*	page	= page_align(rec);

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(rec_offs_comp(offsets));

	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
	UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
	UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
			   rec_offs_extra_size(offsets));

	slot_rec = page_zip_dir_find(page_zip, page_offset(rec));

	ut_a(slot_rec);

	/* This could not be done before page_zip_dir_find(). */
	page_header_set_field(page, page_zip, PAGE_N_RECS,
			      (ulint)(page_get_n_recs(page) - 1));

	if (UNIV_UNLIKELY(!free)) {
		/* Make the last slot the start of the free list. */
		slot_free = page_zip->data + page_zip_get_size(page_zip)
			- PAGE_ZIP_DIR_SLOT_SIZE
			* (page_dir_get_n_heap(page_zip->data)
			   - PAGE_HEAP_NO_USER_LOW);
	} else {
		slot_free = page_zip_dir_find_free(page_zip,
						   page_offset(free));
		ut_a(slot_free < slot_rec);
		/* Grow the free list by one slot by moving the start. */
		slot_free += PAGE_ZIP_DIR_SLOT_SIZE;
	}

	/* Move the deleted record's slot to the start of the free list. */
	if (UNIV_LIKELY(slot_rec > slot_free)) {
		memmove(slot_free + PAGE_ZIP_DIR_SLOT_SIZE,
			slot_free,
			slot_rec - slot_free);
	}

	/* Write the entry for the deleted record.
	The "owned" and "deleted" flags will be cleared. */
	mach_write_to_2(slot_free, page_offset(rec));

	/* Only clustered-index leaf pages can contain BLOB pointers. */
	if (!page_is_leaf(page) || !dict_index_is_clust(index)) {
		ut_ad(!rec_offs_any_extern(offsets));
		goto skip_blobs;
	}

	n_ext = rec_offs_n_extern(offsets);
	if (UNIV_UNLIKELY(n_ext)) {
		/* Shift and zero fill the array of BLOB pointers. */
		ulint	blob_no;
		byte*	externs;
		byte*	ext_end;

		blob_no = page_zip_get_n_prev_extern(page_zip, rec, index);
		ut_a(blob_no + n_ext <= page_zip->n_blobs);

		externs = page_zip->data + page_zip_get_size(page_zip)
			- (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW)
			* (PAGE_ZIP_DIR_SLOT_SIZE
			   + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);

		ext_end = externs - page_zip->n_blobs
			* BTR_EXTERN_FIELD_REF_SIZE;
		externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;

		page_zip->n_blobs -= n_ext;
		/* Shift and zero fill the array. */
		memmove(ext_end + n_ext * BTR_EXTERN_FIELD_REF_SIZE, ext_end,
			(page_zip->n_blobs - blob_no)
			* BTR_EXTERN_FIELD_REF_SIZE);
		memset(ext_end, 0, n_ext * BTR_EXTERN_FIELD_REF_SIZE);
	}

skip_blobs:
	/* The compression algorithm expects info_bits and n_owned
	to be 0 for deleted records. */
	rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */

	page_zip_clear_rec(page_zip, rec, index, offsets);
}
3736

3737 3738 3739 3740 3741 3742
/**************************************************************************
Add a slot to the dense page directory.  n_heap must already have been
incremented by the caller; this function moves the uncompressed trailer
(node pointers or trx_id/roll_ptr columns and BLOB pointers) down to
make room for one more directory slot. */

void
page_zip_dir_add_slot(
/*==================*/
	page_zip_des_t*	page_zip,	/* in/out: compressed page */
	ulint		is_clustered)	/* in: nonzero for clustered index,
					zero for others */
{
	ulint	n_dense;
	byte*	dir;
	byte*	stored;

	ut_ad(page_is_comp(page_zip->data));
	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));

	/* Read the old n_dense (n_heap has already been incremented). */
	n_dense = page_dir_get_n_heap(page_zip->data)
		- (PAGE_HEAP_NO_USER_LOW + 1);

	/* Start of the dense directory (it grows down from page end). */
	dir = page_zip->data + page_zip_get_size(page_zip)
		- PAGE_ZIP_DIR_SLOT_SIZE * n_dense;

	if (!page_is_leaf(page_zip->data)) {
		ut_ad(!page_zip->n_blobs);
		/* Non-leaf pages store a node pointer per record. */
		stored = dir - n_dense * REC_NODE_PTR_SIZE;
	} else if (UNIV_UNLIKELY(is_clustered)) {
		/* Move the BLOB pointer array backwards to make space for the
		roll_ptr and trx_id columns and the dense directory slot. */
		byte*	externs;

		stored = dir - n_dense
			* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
		externs = stored
			- page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
		ut_ad(!memcmp(zero, externs
			      - (PAGE_ZIP_DIR_SLOT_SIZE
				 + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN),
			      PAGE_ZIP_DIR_SLOT_SIZE
			      + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
		memmove(externs - (PAGE_ZIP_DIR_SLOT_SIZE
				   + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN),
			externs, stored - externs);
	} else {
		/* Secondary-index leaf page: only BLOB pointers
		precede the directory. */
		stored = dir
			- page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
		ut_ad(!memcmp(zero, stored - PAGE_ZIP_DIR_SLOT_SIZE,
			      PAGE_ZIP_DIR_SLOT_SIZE));
	}

	/* Move the uncompressed area backwards to make space
	for one directory slot. */
	memmove(stored - PAGE_ZIP_DIR_SLOT_SIZE, stored, dir - stored);
}

3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807
/***************************************************************
Parses a log record of writing to the header of a page.
Redo record layout: 1-byte offset within the page header,
1-byte length, followed by the data bytes. */

byte*
page_zip_parse_write_header(
/*========================*/
				/* out: end of log record or NULL */
	byte*		ptr,	/* in: redo log buffer */
	byte*		end_ptr,/* in: redo log buffer end */
	page_t*		page,	/* in/out: uncompressed page */
	page_zip_des_t*	page_zip)/* in/out: compressed page */
{
	ulint	offset;
	ulint	len;

	ut_ad(ptr && end_ptr);
	ut_ad(!page == !page_zip);

	if (UNIV_UNLIKELY(end_ptr < ptr + (1 + 1))) {
		/* The record is truncated in the log buffer. */
		return(NULL);
	}

	offset = (ulint) *ptr++;
	len = (ulint) *ptr++;

	/* The write must fall entirely within the page header. */
	if (UNIV_UNLIKELY(!len) || UNIV_UNLIKELY(offset + len >= PAGE_DATA)) {
corrupt:
		recv_sys->found_corrupt_log = TRUE;

		return(NULL);
	}

	if (UNIV_UNLIKELY(end_ptr < ptr + len)) {

		return(NULL);
	}

	if (page) {
		if (UNIV_UNLIKELY(!page_zip)) {

			goto corrupt;
		}
#ifdef UNIV_ZIP_DEBUG
		ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */

		/* Apply the change to both page copies; the header is
		stored uncompressed on the compressed page too. */
		memcpy(page + offset, ptr, len);
		memcpy(page_zip->data + offset, ptr, len);

#ifdef UNIV_ZIP_DEBUG
		ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
	}

	return(ptr + len);
}

3851 3852 3853 3854 3855 3856
/**************************************************************************
Write a log record of writing to the uncompressed header portion of a page. */

void
page_zip_write_header_log(
/*======================*/
	const byte*	data,	/* in: data on the uncompressed page */
	ulint		length,	/* in: length of the data */
	mtr_t*		mtr)	/* in: mini-transaction */
{
	/* 11 = maximum size of the initial log record header,
	plus one byte each for offset and length. */
	byte*	log_ptr	= mlog_open(mtr, 11 + 1 + 1);
	ulint	offset	= page_offset(data);

	ut_ad(offset < PAGE_DATA);
	ut_ad(offset + length < PAGE_DATA);
#if PAGE_DATA > 255
# error "PAGE_DATA > 255"
#endif
	ut_ad(length < 256);

	/* If no logging is requested, we may return now */
	if (UNIV_UNLIKELY(!log_ptr)) {

		return;
	}

	log_ptr = mlog_write_initial_log_record_fast(
		(byte*) data, MLOG_ZIP_WRITE_HEADER, log_ptr, mtr);
	/* Both offset and length fit in a single byte (asserted above). */
	*log_ptr++ = (byte) offset;
	*log_ptr++ = (byte) length;
	mlog_close(mtr, log_ptr);

	mlog_catenate_string(mtr, data, length);
}
3885

3886 3887 3888
/**************************************************************************
Reorganize and compress a page.  This is a low-level operation for
compressed pages, to be used when page_zip_compress() fails.
On success, a redo log entry MLOG_ZIP_PAGE_COMPRESS will be written.
The function btr_page_reorganize() should be preferred whenever possible.
IMPORTANT: if page_zip_reorganize() is invoked on a leaf page of a
non-clustered index, the caller must update the insert buffer free
bits in the same mini-transaction in such a way that the modification
will be redo-logged. */

ibool
page_zip_reorganize(
/*================*/
				/* out: TRUE on success, FALSE on failure;
				page and page_zip will be left intact
				on failure. */
	buf_block_t*	block,	/* in/out: page with compressed page;
				on the compressed page, in: size;
				out: data, n_blobs,
				m_start, m_end, m_nonempty */
	dict_index_t*	index,	/* in: index of the B-tree node */
	mtr_t*		mtr)	/* in: mini-transaction */
{
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
	page_t*		page		= buf_block_get_frame(block);
	buf_block_t*	temp_block;
	page_t*		temp_page;
	ulint		log_mode;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(page_is_comp(page));
	/* Note that page_zip_validate(page_zip, page) may fail here. */
	UNIV_MEM_ASSERT_RW(page, UNIV_PAGE_SIZE);
	UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));

	/* Disable logging: the rebuild below is covered by the single
	MLOG_ZIP_PAGE_COMPRESS record written by page_zip_compress(). */
	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

	temp_block = buf_block_alloc(0);
	temp_page = temp_block->frame;

	/* Copy the old page to temporary space */
	buf_frame_copy(temp_page, page);

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

	page_create(block, mtr, dict_table_is_comp(index->table));
	block->check_index_page_at_flush = TRUE;

	/* Copy the records from the temporary space to the recreated page;
	do not copy the lock bits yet */

	page_copy_rec_list_end_no_locks(block, temp_block,
					page_get_infimum_rec(temp_page),
					index, mtr);
	/* Copy max trx id to recreated page */
	page_set_max_trx_id(block, NULL, page_get_max_trx_id(temp_page));

	/* Restore logging. */
	mtr_set_log_mode(mtr, log_mode);

	if (UNIV_UNLIKELY(!page_zip_compress(page_zip, page, index, mtr))) {

		/* Restore the old page and exit. */
		buf_frame_copy(page, temp_page);

		buf_block_free(temp_block);
		return(FALSE);
	}

	lock_move_reorganize_page(block, temp_block);
	btr_search_drop_page_hash_index(block);

	buf_block_free(temp_block);
	return(TRUE);
}

3964 3965 3966 3967 3968 3969
/**************************************************************************
Copy a page byte for byte, except for the file page header and trailer. */

void
page_zip_copy(
/*==========*/
	page_zip_des_t*		page_zip,	/* out: copy of src_zip
						(n_blobs, m_start, m_end,
						m_nonempty, data[0..size-1]) */
	page_t*			page,		/* out: copy of src */
	const page_zip_des_t*	src_zip,	/* in: compressed page */
	const page_t*		src,		/* in: page */
	dict_index_t*		index,		/* in: index of the B-tree */
	mtr_t*			mtr)		/* in: mini-transaction */
{
	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
	ut_ad(mtr_memo_contains_page(mtr, (page_t*) src, MTR_MEMO_PAGE_X_FIX));
#ifdef UNIV_ZIP_DEBUG
	ut_a(page_zip_validate(src_zip, src));
#endif /* UNIV_ZIP_DEBUG */
	ut_a(page_zip_get_size(page_zip) == page_zip_get_size(src_zip));
	if (UNIV_UNLIKELY(src_zip->n_blobs)) {
		/* BLOB pointers imply a clustered-index leaf page. */
		ut_a(page_is_leaf(src));
		ut_a(dict_index_is_clust(index));
	}

	UNIV_MEM_ASSERT_W(page, UNIV_PAGE_SIZE);
	UNIV_MEM_ASSERT_W(page_zip->data, page_zip_get_size(page_zip));
	UNIV_MEM_ASSERT_RW(src, UNIV_PAGE_SIZE);
	UNIV_MEM_ASSERT_RW(src_zip->data, page_zip_get_size(page_zip));

	/* Skip the file page header and trailer. */
	memcpy(page + FIL_PAGE_DATA, src + FIL_PAGE_DATA,
	       UNIV_PAGE_SIZE - FIL_PAGE_DATA
	       - FIL_PAGE_DATA_END);
	memcpy(page_zip->data + FIL_PAGE_DATA,
	       src_zip->data + FIL_PAGE_DATA,
	       page_zip_get_size(page_zip) - FIL_PAGE_DATA);

	/* Copy the page_zip descriptor fields, but preserve the
	destination's data pointer. */
	{
		page_zip_t*	data = page_zip->data;
		memcpy(page_zip, src_zip, sizeof *page_zip);
		page_zip->data = data;
	}
	ut_ad(page_zip_get_trailer_len(page_zip,
				       dict_index_is_clust(index), NULL)
	      + page_zip->m_end < page_zip_get_size(page_zip));

	if (!page_is_leaf(src)
	    && UNIV_UNLIKELY(mach_read_from_4(src + FIL_PAGE_PREV) == FIL_NULL)
	    && UNIV_LIKELY(mach_read_from_4(page
					    + FIL_PAGE_PREV) != FIL_NULL)) {
		/* Clear the REC_INFO_MIN_REC_FLAG of the first user record. */
		ulint	offs = rec_get_next_offs(page + PAGE_NEW_INFIMUM,
						 TRUE);
		if (UNIV_LIKELY(offs != PAGE_NEW_SUPREMUM)) {
			rec_t*	rec = page + offs;
			ut_a(rec[-REC_N_NEW_EXTRA_BYTES]
			     & REC_INFO_MIN_REC_FLAG);
			rec[-REC_N_NEW_EXTRA_BYTES] &= ~ REC_INFO_MIN_REC_FLAG;
		}
	}

#ifdef UNIV_ZIP_DEBUG
	ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */

	page_zip_compress_write_log(page_zip, page, index, mtr);
}

/**************************************************************************
Parses a log record of compressing an index page.  The record layout is:
2 bytes compressed data size, 2 bytes trailer size, 4+4 bytes
FIL_PAGE_PREV/FIL_PAGE_NEXT, "size" bytes of compressed data starting at
FIL_PAGE_TYPE, and "trailer_size" bytes of page trailer.  If page and
page_zip are given, the compressed page is rebuilt and decompressed. */

byte*
page_zip_parse_compress(
/*====================*/
				/* out: end of log record or NULL */
	byte*		ptr,	/* in: buffer */
	byte*		end_ptr,/* in: buffer end */
	page_t*		page,	/* out: uncompressed page */
	page_zip_des_t*	page_zip)/* out: compressed page */
{
	ulint	size;
	ulint	trailer_size;

	ut_ad(ptr && end_ptr);
	ut_ad(!page == !page_zip);

	/* The two 2-byte length fields must be present. */
	if (UNIV_UNLIKELY(ptr + (2 + 2) > end_ptr)) {

		return(NULL);
	}

	size = mach_read_from_2(ptr);
	ptr += 2;
	trailer_size = mach_read_from_2(ptr);
	ptr += 2;

	/* 8 bytes of FIL_PAGE_PREV/FIL_PAGE_NEXT, the compressed body,
	and the trailer must all fit in the buffer. */
	if (UNIV_UNLIKELY(ptr + 8 + size + trailer_size > end_ptr)) {

		return(NULL);
	}

	if (page) {
		if (UNIV_UNLIKELY(!page_zip)
		    || UNIV_UNLIKELY(page_zip_get_size(page_zip) < size)) {
corrupt:
			recv_sys->found_corrupt_log = TRUE;

			return(NULL);
		}

		memcpy(page_zip->data + FIL_PAGE_PREV, ptr, 4);
		memcpy(page_zip->data + FIL_PAGE_NEXT, ptr + 4, 4);
		memcpy(page_zip->data + FIL_PAGE_TYPE, ptr + 8, size);
		/* Zero-fill the gap between the compressed data
		and the trailer. */
		memset(page_zip->data + FIL_PAGE_TYPE + size, 0,
		       page_zip_get_size(page_zip) - trailer_size
		       - (FIL_PAGE_TYPE + size));
		memcpy(page_zip->data + page_zip_get_size(page_zip)
		       - trailer_size, ptr + 8 + size, trailer_size);

		if (UNIV_UNLIKELY(!page_zip_decompress(page_zip, page))) {

			goto corrupt;
		}
	}

	return(ptr + 8 + size + trailer_size);
}

/**************************************************************************
Calculate the compressed page checksum.  The Adler-32 checksum covers the
whole page image except the fields that change without the page contents
changing: FIL_PAGE_SPACE_OR_CHKSUM, FIL_PAGE_LSN, and
FIL_PAGE_FILE_FLUSH_LSN. */

ulint
page_zip_calc_checksum(
/*===================*/
				/* out: page checksum */
	const void*	data,	/* in: compressed page */
	ulint		size)	/* in: size of compressed page */
{
	/* Exclude FIL_PAGE_SPACE_OR_CHKSUM, FIL_PAGE_LSN,
	and FIL_PAGE_FILE_FLUSH_LSN from the checksum. */

	const Bytef*	s	= data;
	uLong		adler;

	ut_ad(size > FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);

	/* Fold the three byte ranges between the excluded fields
	into a single running Adler-32 value. */
	adler = adler32(0L, s + FIL_PAGE_OFFSET,
			FIL_PAGE_LSN - FIL_PAGE_OFFSET);
	adler = adler32(adler, s + FIL_PAGE_TYPE, 2);
	adler = adler32(adler, s + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
			size - FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);

	return((ulint) adler);
}