data0data.c 15.6 KB
Newer Older
osku's avatar
osku committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/************************************************************************
SQL data field and tuple

(c) 1994-1996 Innobase Oy

Created 5/30/1994 Heikki Tuuri
*************************************************************************/

#include "data0data.h"

#ifdef UNIV_NONINL
#include "data0data.ic"
#endif

#include "rem0rec.h"
#include "rem0cmp.h"
#include "page0page.h"
marko's avatar
marko committed
18
#include "page0zip.h"
osku's avatar
osku committed
19 20 21
#include "dict0dict.h"
#include "btr0cur.h"

22
#ifdef UNIV_DEBUG
osku's avatar
osku committed
23 24 25 26 27 28 29 30
byte	data_error;	/* data pointers of tuple fields are initialized
			to point here for error checking */

ulint	data_dummy;	/* this is used to fool the compiler in
			dtuple_validate */
#endif /* UNIV_DEBUG */

/* Some non-inlined functions used in the MySQL interface: */
31
void
osku's avatar
osku committed
32
dfield_set_data_noninline(
33
	dfield_t*	field,	/* in: field */
osku's avatar
osku committed
34 35 36 37 38
	void*		data,	/* in: data */
	ulint		len)	/* in: length or UNIV_SQL_NULL */
{
	dfield_set_data(field, data, len);
}
39
void*
osku's avatar
osku committed
40 41 42 43 44 45 46 47 48 49 50
dfield_get_data_noninline(
	dfield_t* field)	/* in: field */
{
	return(dfield_get_data(field));
}
ulint
dfield_get_len_noninline(
	dfield_t* field)	/* in: field */
{
	return(dfield_get_len(field));
}
51
ulint
osku's avatar
osku committed
52
dtuple_get_n_fields_noninline(
53
	dtuple_t*	tuple)	/* in: tuple */
osku's avatar
osku committed
54 55 56
{
	return(dtuple_get_n_fields(tuple));
}
57
dfield_t*
osku's avatar
osku committed
58
dtuple_get_nth_field_noninline(
59
	dtuple_t*	tuple,	/* in: tuple */
osku's avatar
osku committed
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
	ulint		n)	/* in: index of field */
{
	return(dtuple_get_nth_field(tuple, n));
}

/*************************************************************************
Tests if dfield data length and content is equal to the given. */

ibool
dfield_data_is_binary_equal(
/*========================*/
				/* out: TRUE if equal */
	dfield_t*	field,	/* in: field */
	ulint		len,	/* in: data length or UNIV_SQL_NULL */
	byte*		data)	/* in: data */
{
	if (len != field->len) {

		return(FALSE);
	}

	if (len == UNIV_SQL_NULL) {

		return(TRUE);
	}

	if (0 != ut_memcmp(field->data, data, len)) {
87

osku's avatar
osku committed
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
		return(FALSE);
	}

	return(TRUE);
}

/****************************************************************
Returns TRUE if lengths of two dtuples are equal and respective data fields
in them are equal when compared with collation in char fields (not as binary
strings). */

ibool
dtuple_datas_are_ordering_equal(
/*============================*/
				/* out: TRUE if length and fieds are equal
				when compared with cmp_data_data:
				NOTE: in character type fields some letters
				are identified with others! (collation) */
	dtuple_t*	tuple1,	/* in: tuple 1 */
	dtuple_t*	tuple2)	/* in: tuple 2 */
{
	dfield_t*	field1;
	dfield_t*	field2;
	ulint		n_fields;
	ulint		i;

	ut_ad(tuple1 && tuple2);
	ut_ad(tuple1->magic_n == DATA_TUPLE_MAGIC_N);
	ut_ad(tuple2->magic_n == DATA_TUPLE_MAGIC_N);
	ut_ad(dtuple_check_typed(tuple1));
	ut_ad(dtuple_check_typed(tuple2));

	n_fields = dtuple_get_n_fields(tuple1);

	if (n_fields != dtuple_get_n_fields(tuple2)) {

		return(FALSE);
	}
126

osku's avatar
osku committed
127 128 129 130 131 132
	for (i = 0; i < n_fields; i++) {

		field1 = dtuple_get_nth_field(tuple1, i);
		field2 = dtuple_get_nth_field(tuple2, i);

		if (0 != cmp_dfield_dfield(field1, field2)) {
133

osku's avatar
osku committed
134
			return(FALSE);
135
		}
osku's avatar
osku committed
136
	}
137

osku's avatar
osku committed
138 139 140 141 142 143 144 145 146 147
	return(TRUE);
}

/*************************************************************************
Creates a dtuple for use in MySQL. */

dtuple_t*
dtuple_create_for_mysql(
/*====================*/
				/* out, own created dtuple */
148 149
	void**	heap,		/* out: created memory heap */
	ulint	n_fields)	/* in: number of fields */
osku's avatar
osku committed
150
{
151 152 153
	*heap = (void*)mem_heap_create(500);

	return(dtuple_create(*((mem_heap_t**)heap), n_fields));
osku's avatar
osku committed
154 155 156 157 158 159 160 161 162 163
}

/*************************************************************************
Frees a dtuple used in MySQL. */

void
dtuple_free_for_mysql(
/*==================*/
	void*	heap) /* in: memory heap where tuple was created */
{
164
	mem_heap_free((mem_heap_t*)heap);
osku's avatar
osku committed
165 166 167 168
}

/*************************************************************************
Sets number of fields used in a tuple. Normally this is set in
169
dtuple_create, but if you want later to set it smaller, you can use this. */
osku's avatar
osku committed
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192

void
dtuple_set_n_fields(
/*================*/
	dtuple_t*	tuple,		/* in: tuple */
	ulint		n_fields)	/* in: number of fields */
{
	ut_ad(tuple);

	tuple->n_fields = n_fields;
	tuple->n_fields_cmp = n_fields;
}

/**************************************************************
Checks that a data field is typed. */
static
ibool
dfield_check_typed_no_assert(
/*=========================*/
				/* out: TRUE if ok */
	dfield_t*	field)	/* in: data field */
{
	if (dfield_get_type(field)->mtype > DATA_MYSQL
193
	    || dfield_get_type(field)->mtype < DATA_VARCHAR) {
osku's avatar
osku committed
194 195

		fprintf(stderr,
196
			"InnoDB: Error: data field type %lu, len %lu\n",
osku's avatar
osku committed
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
			(ulong) dfield_get_type(field)->mtype,
			(ulong) dfield_get_len(field));
		return(FALSE);
	}

	return(TRUE);
}

/**************************************************************
Checks that a data tuple is typed. */

ibool
dtuple_check_typed_no_assert(
/*=========================*/
				/* out: TRUE if ok */
	dtuple_t*	tuple)	/* in: tuple */
{
	dfield_t*	field;
215 216
	ulint		i;

osku's avatar
osku committed
217 218
	if (dtuple_get_n_fields(tuple) > REC_MAX_N_FIELDS) {
		fprintf(stderr,
219
			"InnoDB: Error: index entry has %lu fields\n",
osku's avatar
osku committed
220
			(ulong) dtuple_get_n_fields(tuple));
221
dump:
osku's avatar
osku committed
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
		fputs("InnoDB: Tuple contents: ", stderr);
		dtuple_print(stderr, tuple);
		putc('\n', stderr);

		return(FALSE);
	}

	for (i = 0; i < dtuple_get_n_fields(tuple); i++) {

		field = dtuple_get_nth_field(tuple, i);

		if (!dfield_check_typed_no_assert(field)) {
			goto dump;
		}
	}

	return(TRUE);
}

/**************************************************************
Checks that a data field is typed. Asserts an error if not. */

ibool
dfield_check_typed(
/*===============*/
				/* out: TRUE if ok */
	dfield_t*	field)	/* in: data field */
{
	if (dfield_get_type(field)->mtype > DATA_MYSQL
251
	    || dfield_get_type(field)->mtype < DATA_VARCHAR) {
osku's avatar
osku committed
252 253

		fprintf(stderr,
254
			"InnoDB: Error: data field type %lu, len %lu\n",
osku's avatar
osku committed
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
			(ulong) dfield_get_type(field)->mtype,
			(ulong) dfield_get_len(field));

		ut_error;
	}

	return(TRUE);
}

/**************************************************************
Checks that a data tuple is typed. Asserts an error if not. */

ibool
dtuple_check_typed(
/*===============*/
				/* out: TRUE if ok */
	dtuple_t*	tuple)	/* in: tuple */
{
	dfield_t*	field;
274
	ulint		i;
osku's avatar
osku committed
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297

	for (i = 0; i < dtuple_get_n_fields(tuple); i++) {

		field = dtuple_get_nth_field(tuple, i);

		ut_a(dfield_check_typed(field));
	}

	return(TRUE);
}

#ifdef UNIV_DEBUG
/**************************************************************
Validates the consistency of a tuple which must be complete, i.e,
all fields must have been set. */

ibool
dtuple_validate(
/*============*/
				/* out: TRUE if ok */
	dtuple_t*	tuple)	/* in: tuple */
{
	dfield_t*	field;
298 299 300 301 302
	byte*		data;
	ulint		n_fields;
	ulint		len;
	ulint		i;
	ulint		j;
osku's avatar
osku committed
303 304 305 306 307 308 309 310 311 312 313 314

	ut_ad(tuple->magic_n == DATA_TUPLE_MAGIC_N);

	n_fields = dtuple_get_n_fields(tuple);

	/* We dereference all the data of each field to test
	for memory traps */

	for (i = 0; i < n_fields; i++) {

		field = dtuple_get_nth_field(tuple, i);
		len = dfield_get_len(field);
315

osku's avatar
osku committed
316 317 318 319 320 321 322
		if (len != UNIV_SQL_NULL) {

			data = field->data;

			for (j = 0; j < len; j++) {

				data_dummy  += *data; /* fool the compiler not
323 324
						      to optimize out this
						      code */
osku's avatar
osku committed
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
				data++;
			}
		}
	}

	ut_a(dtuple_check_typed(tuple));

	return(TRUE);
}
#endif /* UNIV_DEBUG */

/*****************************************************************
Pretty prints a dfield value according to its data type. */

void
dfield_print(
/*=========*/
342
	dfield_t*	dfield)	/* in: dfield */
osku's avatar
osku committed
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
{
	byte*	data;
	ulint	len;
	ulint	mtype;
	ulint	i;

	len = dfield_get_len(dfield);
	data = dfield_get_data(dfield);

	if (len == UNIV_SQL_NULL) {
		fputs("NULL", stderr);

		return;
	}

	mtype = dtype_get_mtype(dfield_get_type(dfield));

	if ((mtype == DATA_CHAR) || (mtype == DATA_VARCHAR)) {
361

osku's avatar
osku committed
362 363 364 365 366 367 368 369 370 371 372 373 374 375
		for (i = 0; i < len; i++) {
			int	c = *data++;
			putc(isprint(c) ? c : ' ', stderr);
		}
	} else if (mtype == DATA_INT) {
		ut_a(len == 4); /* only works for 32-bit integers */
		fprintf(stderr, "%d", (int)mach_read_from_4(data));
	} else {
		ut_error;
	}
}

/*****************************************************************
Pretty prints a dfield value according to its data type. Also the hex string
376
is printed if a string contains non-printable characters. */
osku's avatar
osku committed
377 378 379 380

void
dfield_print_also_hex(
/*==================*/
381
	dfield_t*	dfield)	/* in: dfield */
osku's avatar
osku committed
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402
{
	byte*	data;
	ulint	len;
	ulint	mtype;
	ulint	i;
	ibool	print_also_hex;

	len = dfield_get_len(dfield);
	data = dfield_get_data(dfield);

	if (len == UNIV_SQL_NULL) {
		fputs("NULL", stderr);

		return;
	}

	mtype = dtype_get_mtype(dfield_get_type(dfield));

	if ((mtype == DATA_CHAR) || (mtype == DATA_VARCHAR)) {

		print_also_hex = FALSE;
403

osku's avatar
osku committed
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418
		for (i = 0; i < len; i++) {
			int c = *data++;
			if (!isprint(c)) {
				print_also_hex = TRUE;
				c = ' ';
			}
			putc(c, stderr);
		}

		if (!print_also_hex) {

			return;
		}

		fputs(" Hex: ", stderr);
419

osku's avatar
osku committed
420
		data = dfield_get_data(dfield);
421

osku's avatar
osku committed
422 423 424 425 426 427 428 429 430 431 432 433 434
		for (i = 0; i < len; i++) {
			fprintf(stderr, "%02lx", (ulint)*data);

			data++;
		}
	} else if (mtype == DATA_INT) {
		ut_a(len == 4); /* only works for 32-bit integers */
		fprintf(stderr, "%d", (int)mach_read_from_4(data));
	} else {
		ut_error;
	}
}

435 436
/*****************************************************************
Print a dfield value using ut_print_buf. */
437
static
438 439 440 441 442 443
void
dfield_print_raw(
/*=============*/
	FILE*		f,		/* in: output stream */
	dfield_t*	dfield)		/* in: dfield */
{
444 445 446 447 448 449 450
	ulint	len	= dfield->len;
	if (len != UNIV_SQL_NULL) {
		ulint	print_len = ut_min(len, 1000);
		ut_print_buf(f, dfield->data, print_len);
		if (len != print_len) {
			fprintf(f, "(total %lu bytes)", (ulong) len);
		}
451 452 453 454 455
	} else {
		fputs(" SQL NULL", f);
	}
}

osku's avatar
osku committed
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
/**************************************************************
The following function prints the contents of a tuple. */

void
dtuple_print(
/*=========*/
	FILE*		f,	/* in: output stream */
	dtuple_t*	tuple)	/* in: tuple */
{
	ulint		n_fields;
	ulint		i;

	n_fields = dtuple_get_n_fields(tuple);

	fprintf(f, "DATA TUPLE: %lu fields;\n", (ulong) n_fields);

	for (i = 0; i < n_fields; i++) {
		fprintf(f, " %lu:", (ulong) i);

475
		dfield_print_raw(f, dtuple_get_nth_field(tuple, i));
osku's avatar
osku committed
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495

		putc(';', f);
	}

	putc('\n', f);
	ut_ad(dtuple_validate(tuple));
}

/******************************************************************
Moves parts of long fields in entry to the big record vector so that
the size of tuple drops below the maximum record size allowed in the
database. Moves data only from those fields which are not necessary
to determine uniquely the insertion place of the tuple in the index. */

big_rec_t*
dtuple_convert_big_rec(
/*===================*/
				/* out, own: created big record vector,
				NULL if we are not able to shorten
				the entry enough, i.e., if there are
496 497
				too many fixed-length or short fields
				in entry or the index is clustered */
osku's avatar
osku committed
498 499
	dict_index_t*	index,	/* in: index */
	dtuple_t*	entry,	/* in: index entry */
500
	const ulint*	ext_vec,/* in: array of externally stored fields,
osku's avatar
osku committed
501 502 503 504 505 506 507 508
				or NULL: if a field already is externally
				stored, then we cannot move it to the vector
				this function returns */
	ulint		n_ext_vec)/* in: number of elements is ext_vec */
{
	mem_heap_t*	heap;
	big_rec_t*	vector;
	dfield_t*	dfield;
509
	dict_field_t*	ifield;
osku's avatar
osku committed
510 511
	ulint		size;
	ulint		n_fields;
512

513 514 515 516
	if (UNIV_UNLIKELY(!dict_index_is_clust(index))) {
		return(NULL);
	}

osku's avatar
osku committed
517 518 519 520 521 522
	ut_a(dtuple_check_typed_no_assert(entry));

	size = rec_get_converted_size(index, entry);

	if (UNIV_UNLIKELY(size > 1000000000)) {
		fprintf(stderr,
523 524
			"InnoDB: Warning: tuple size very big: %lu\n",
			(ulong) size);
osku's avatar
osku committed
525 526 527 528 529 530
		fputs("InnoDB: Tuple contents: ", stderr);
		dtuple_print(stderr, entry);
		putc('\n', stderr);
	}

	heap = mem_heap_create(size + dtuple_get_n_fields(entry)
531
			       * sizeof(big_rec_field_t) + 1000);
osku's avatar
osku committed
532 533 534 535 536 537 538 539

	vector = mem_heap_alloc(heap, sizeof(big_rec_t));

	vector->heap = heap;
	vector->fields = mem_heap_alloc(heap, dtuple_get_n_fields(entry)
					* sizeof(big_rec_field_t));

	/* Decide which fields to shorten: the algorithm is to look for
540 541
	a variable-length field that yields the biggest savings when
	stored externally */
osku's avatar
osku committed
542 543 544

	n_fields = 0;

545
	while (page_zip_rec_needs_ext(rec_get_converted_size(index, entry),
546 547
				      dict_table_is_comp(index->table),
				      dict_table_zip_size(index->table))) {
548 549 550
		ulint	i;
		ulint	longest		= 0;
		ulint	longest_i	= ULINT_MAX;
osku's avatar
osku committed
551 552

		for (i = dict_index_get_n_unique_in_tree(index);
553
		     i < dtuple_get_n_fields(entry); i++) {
osku's avatar
osku committed
554

555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587
			ulint	min_prefix;

			dfield = dtuple_get_nth_field(entry, i);
			ifield = dict_index_get_nth_field(index, i);

			/* Skip fixed-length or NULL or short columns */

			if (ifield->fixed_len
			    || dfield->len == UNIV_SQL_NULL
			    || dfield->len <= (BTR_EXTERN_FIELD_REF_SIZE
					       + REC_1BYTE_OFFS_LIMIT + 1)) {
				continue;
			}

			min_prefix = ifield->col->min_prefix;

			/* Skip indexed columns */
			if (min_prefix == ULINT_UNDEFINED) {
				continue;
			}

			if (min_prefix < (REC_1BYTE_OFFS_LIMIT + 1
					  - BTR_EXTERN_FIELD_REF_SIZE)) {
				min_prefix = (REC_1BYTE_OFFS_LIMIT + 1
					      - BTR_EXTERN_FIELD_REF_SIZE);
			}

			/* Check that there would be savings */
			if (longest >= dfield->len - min_prefix) {
				continue;
			}

			/* Skip externally stored columns */
osku's avatar
osku committed
588 589

			if (ext_vec) {
590 591
				ulint	j;

osku's avatar
osku committed
592 593
				for (j = 0; j < n_ext_vec; j++) {
					if (ext_vec[j] == i) {
marko's avatar
marko committed
594
						goto is_externally_stored;
osku's avatar
osku committed
595 596 597
					}
				}
			}
598

599 600
			longest_i = i;
			longest = dfield->len - min_prefix;
marko's avatar
marko committed
601 602 603

is_externally_stored:
			continue;
osku's avatar
osku committed
604
		}
605

606
		if (!longest) {
osku's avatar
osku committed
607 608 609 610 611 612 613
			/* Cannot shorten more */

			mem_heap_free(heap);

			return(NULL);
		}

614
		/* Move data from field longest_i to big rec vector.
osku's avatar
osku committed
615 616 617 618 619 620

		We store the first bytes locally to the record. Then
		we can calculate all ordering fields in all indexes
		from locally stored data. */

		dfield = dtuple_get_nth_field(entry, longest_i);
621
		ifield = dict_index_get_nth_field(index, longest_i);
osku's avatar
osku committed
622 623
		vector->fields[n_fields].field_no = longest_i;

624 625 626 627
		vector->fields[n_fields].len
			= dfield->len - ut_max(REC_1BYTE_OFFS_LIMIT + 1
					       - BTR_EXTERN_FIELD_REF_SIZE,
					       ifield->col->min_prefix);
628

629 630
		vector->fields[n_fields].data
			= mem_heap_alloc(heap, vector->fields[n_fields].len);
osku's avatar
osku committed
631 632 633

		/* Copy data (from the end of field) to big rec vector */

634 635 636 637 638 639
		memcpy(vector->fields[n_fields].data,
		       ((byte*)dfield->data) + dfield->len
		       - vector->fields[n_fields].len,
		       vector->fields[n_fields].len);
		dfield->len -= vector->fields[n_fields].len
			- BTR_EXTERN_FIELD_REF_SIZE;
osku's avatar
osku committed
640 641 642

		/* Set the extern field reference in dfield to zero */
		memset(((byte*)dfield->data)
643 644
		       + dfield->len - BTR_EXTERN_FIELD_REF_SIZE,
		       0, BTR_EXTERN_FIELD_REF_SIZE);
osku's avatar
osku committed
645
		n_fields++;
646
	}
osku's avatar
osku committed
647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665

	vector->n_fields = n_fields;
	return(vector);
}

/******************************************************************
Puts back to entry the data stored in vector. Note that to ensure the
fields in entry can accommodate the data, vector must have been created
from entry with dtuple_convert_big_rec. */

void
dtuple_convert_back_big_rec(
/*========================*/
	dict_index_t*	index __attribute__((unused)),	/* in: index */
	dtuple_t*	entry,	/* in: entry whose data was put to vector */
	big_rec_t*	vector)	/* in, own: big rec vector; it is
				freed in this function */
{
	dfield_t*	dfield;
666
	ulint		i;
osku's avatar
osku committed
667 668

	for (i = 0; i < vector->n_fields; i++) {
669

osku's avatar
osku committed
670
		dfield = dtuple_get_nth_field(entry,
671
					      vector->fields[i].field_no);
osku's avatar
osku committed
672 673
		/* Copy data from big rec vector */

674 675 676 677
		memcpy((byte*) dfield->data
		       + dfield->len - BTR_EXTERN_FIELD_REF_SIZE,
		       vector->fields[i].data, vector->fields[i].len);
		dfield->len += vector->fields[i].len
678
			- BTR_EXTERN_FIELD_REF_SIZE;
679
	}
osku's avatar
osku committed
680 681 682 683 684 685 686 687 688 689 690 691 692 693 694

	mem_heap_free(vector->heap);
}

/******************************************************************
Frees the memory in a big rec vector. */

void
dtuple_big_rec_free(
/*================*/
	big_rec_t*	vector)	/* in, own: big rec vector; it is
				freed in this function */
{
	mem_heap_free(vector->heap);
}