/******************************************************
New index creation routines using a merge sort

(c) 2005,2007 Innobase Oy

Created 12/4/2005 Jan Lindstrom
Completed by Sunny Bains and Marko Makela
*******************************************************/

#include "row0merge.h"
#include "row0ext.h"
#include "row0row.h"
#include "row0upd.h"
#include "row0ins.h"
#include "row0sel.h"
#include "dict0dict.h"
#include "dict0mem.h"
#include "dict0boot.h"
#include "dict0crea.h"
#include "dict0load.h"
#include "btr0btr.h"
#include "mach0data.h"
#include "trx0rseg.h"
#include "trx0trx.h"
#include "trx0roll.h"
#include "trx0undo.h"
#include "trx0purge.h"
#include "trx0rec.h"
#include "que0que.h"
#include "rem0cmp.h"
#include "read0read.h"
#include "os0file.h"
#include "lock0lock.h"
#include "data0data.h"
#include "data0type.h"
#include "que0que.h"
#include "pars0pars.h"
#include "mem0mem.h"
#include "log0log.h"
#include "ut0sort.h"

#ifdef UNIV_DEBUG
/* Set these in order to enable debug printout. */
static ibool	row_merge_print_cmp;
static ibool	row_merge_print_read;
static ibool	row_merge_print_write;
#endif /* UNIV_DEBUG */

49 50
/* Block size for I/O operations in merge sort */

51
typedef byte	row_merge_block_t[16384];
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73

/* Secondary buffer for I/O operations of merge records */

typedef byte	mrec_buf_t[UNIV_PAGE_SIZE / 2];

/* Merge record in row_merge_block_t.  The format is the same as a
record in ROW_FORMAT=COMPACT with the exception that the
REC_N_NEW_EXTRA_BYTES are omitted. */
typedef byte	mrec_t;

/* Buffer for sorting in main memory. */
struct row_merge_buf_struct {
	mem_heap_t*	heap;		/* memory heap where allocated */
	dict_index_t*	index;		/* the index the tuples belong to */
	ulint		total_size;	/* total amount of data bytes */
	ulint		n_tuples;	/* number of data tuples */
	ulint		max_tuples;	/* maximum number of data tuples */
	const dfield_t**tuples;		/* array of pointers to
					arrays of fields that form
					the data tuples */
	const dfield_t**tmp_tuples;	/* temporary copy of tuples,
					for sorting */
};

typedef struct row_merge_buf_struct row_merge_buf_t;

78 79
/* Information about temporary files used in merge sort are stored
to this structure */
80

81 82 83
struct merge_file_struct {
	int	fd;		/* File descriptor */
	ulint	offset;		/* File offset */
84 85
};

86
typedef struct merge_file_struct merge_file_t;
87

#ifdef UNIV_DEBUG
/**********************************************************
Display a merge tuple. */
static
void
row_merge_tuple_print(
/*==================*/
	FILE*		f,	/* in: output stream */
	const dfield_t*	entry,	/* in: tuple to print */
	ulint		n_fields)/* in: number of fields in the tuple */
{
	ulint	i;

	for (i = 0; i < n_fields; i++) {
		const dfield_t*	field = &entry[i];
		ulint		len;

		if (dfield_is_null(field)) {
			fputs("\n NULL;", f);
			continue;
		}

		/* Truncate the printout to at most 20 bytes. */
		len = ut_min(field->len, 20);

		fputs(dfield_is_ext(field) ? "\nE" : "\n ", f);

		ut_print_buf(f, field->data, len);

		if (len != field->len) {
			fprintf(f, " (total %lu bytes)",
				(ulong) field->len);
		}
	}
	putc('\n', f);
}
#endif /* UNIV_DEBUG */

124 125
/**********************************************************
Allocate a sort buffer. */
126
static
127 128 129 130 131 132
row_merge_buf_t*
row_merge_buf_create_low(
/*=====================*/
					/* out,own: sort buffer */
	mem_heap_t*	heap,		/* in: heap where allocated */
	dict_index_t*	index,		/* in: secondary index */
133 134
	ulint		max_tuples,	/* in: maximum number of data tuples */
	ulint		buf_size)	/* in: size of the buffer, in bytes */
135
{
136 137
	row_merge_buf_t*	buf;

138 139 140 141
	ut_ad(max_tuples > 0);
	ut_ad(max_tuples <= sizeof(row_merge_block_t));
	ut_ad(max_tuples < buf_size);

142
	buf = mem_heap_zalloc(heap, buf_size);
143 144 145 146 147 148 149 150
	buf->heap = heap;
	buf->index = index;
	buf->max_tuples = max_tuples;
	buf->tuples = mem_heap_alloc(heap,
				     2 * max_tuples * sizeof *buf->tuples);
	buf->tmp_tuples = buf->tuples + max_tuples;

	return(buf);
151 152
}

153 154
/**********************************************************
Allocate a sort buffer. */
155
static
156 157
row_merge_buf_t*
row_merge_buf_create(
158
/*=================*/
159 160
				/* out,own: sort buffer */
	dict_index_t*	index)	/* in: secondary index */
161
{
162 163 164 165
	row_merge_buf_t*	buf;
	ulint			max_tuples;
	ulint			buf_size;
	mem_heap_t*		heap;
166

167 168
	max_tuples = sizeof(row_merge_block_t)
		/ ut_max(1, dict_index_get_min_size(index));
169

170
	buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
171

172
	heap = mem_heap_create(buf_size + sizeof(row_merge_block_t));
173

174
	buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
175

176
	return(buf);
177 178
}

179 180
/**********************************************************
Empty a sort buffer. */
181
static
182
row_merge_buf_t*
183 184
row_merge_buf_empty(
/*================*/
185 186
					/* out: sort buffer */
	row_merge_buf_t*	buf)	/* in,own: sort buffer */
187
{
188 189 190 191
	ulint		buf_size;
	ulint		max_tuples	= buf->max_tuples;
	mem_heap_t*	heap		= buf->heap;
	dict_index_t*	index		= buf->index;
192

193
	buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
194

195
	mem_heap_empty(heap);
196

197
	return(row_merge_buf_create_low(heap, index, max_tuples, buf_size));
198 199
}

200 201
/**********************************************************
Deallocate a sort buffer. */
202
static
203 204 205 206
void
row_merge_buf_free(
/*===============*/
	row_merge_buf_t*	buf)	/* in,own: sort buffer, to be freed */
207
{
208
	mem_heap_free(buf->heap);
209 210
}

211 212
/**********************************************************
Insert a data tuple into a sort buffer. */
213
static
214 215 216 217 218 219 220 221 222
ibool
row_merge_buf_add(
/*==============*/
					/* out: TRUE if added,
					FALSE if out of space */
	row_merge_buf_t*	buf,	/* in/out: sort buffer */
	const dtuple_t*		row,	/* in: row in clustered index */
	row_ext_t*		ext)	/* in/out: cache of externally stored
					column prefixes, or NULL */
223
{
224 225 226 227
	ulint		i;
	ulint		n_fields;
	ulint		data_size;
	ulint		extra_size;
228
	dict_index_t*	index;
229 230
	dfield_t*	entry;
	dfield_t*	field;
231

232 233
	if (buf->n_tuples >= buf->max_tuples) {
		return(FALSE);
234 235
	}

236 237
	UNIV_PREFETCH_R(row->fields);

238 239 240
	index = buf->index;

	n_fields = dict_index_get_n_fields(index);
241 242 243 244 245 246

	entry = mem_heap_alloc(buf->heap, n_fields * sizeof *entry);
	buf->tuples[buf->n_tuples] = entry;
	field = entry;

	data_size = 0;
247
	extra_size = UT_BITS_IN_BYTES(index->n_nullable);
248

249
	for (i = 0; i < n_fields; i++, field++) {
250 251 252 253
		dict_field_t*		ifield;
		const dict_col_t*	col;
		ulint			col_no;
		const dfield_t*		row_field;
254
		ulint			len;
255

256
		ifield = dict_index_get_nth_field(index, i);
257 258 259 260
		col = ifield->col;
		col_no = dict_col_get_no(col);
		row_field = dtuple_get_nth_field(row, col_no);
		dfield_copy(field, row_field);
261
		len = field->len;
262

263 264 265 266 267 268 269
		if (dfield_is_null(field)) {
			ut_ad(!(col->prtype & DATA_NOT_NULL));
			field->data = NULL;
			continue;
		} else if (UNIV_LIKELY(!ext)) {
		} else if (dict_index_is_clust(index)) {
			/* Flag externally stored fields. */
270 271 272 273 274 275 276 277 278
			byte*	buf = row_ext_lookup(ext, col_no,
						     field->data, len, &len);
			if (UNIV_LIKELY_NULL(buf)) {
				if (i < dict_index_get_n_unique(index)) {
					dfield_set_data(field, buf, len);
				} else {
					dfield_set_ext(field);
					len = field->len;
				}
279 280
			}
		} else {
281
			byte*	buf = row_ext_lookup(ext, col_no,
282
						     field->data, len, &len);
283
			if (UNIV_LIKELY_NULL(buf)) {
284
				dfield_set_data(field, buf, len);
285 286
			}
		}
287

288
		/* If a column prefix index, take only the prefix */
289

290
		if (ifield->prefix_len) {
291
			field->len = len = dtype_get_at_most_n_mbchars(
292 293 294
				col->prtype,
				col->mbminlen, col->mbmaxlen,
				ifield->prefix_len,
295
				len, field->data);
296
		}
297

298
		ut_ad(len <= col->len || col->mtype == DATA_BLOB);
299

300
		if (ifield->fixed_len) {
301
			ut_ad(len == ifield->fixed_len);
302 303 304
			ut_ad(!dfield_is_ext(field));
		} else if (dfield_is_ext(field)) {
			extra_size += 2;
305
		} else if (len < 128
306 307 308
			   || (col->len < 256 && col->mtype != DATA_BLOB)) {
			extra_size++;
		} else {
309 310 311 312
			/* For variable-length columns, we look up the
			maximum length from the column itself.  If this
			is a prefix index column shorter than 256 bytes,
			this will waste one byte. */
313 314
			extra_size += 2;
		}
315
		data_size += len;
316
	}
317

318 319 320 321
#ifdef UNIV_DEBUG
	{
		ulint	size;
		ulint	extra;
322

323
		size = rec_get_converted_size_comp(index,
324
						   REC_STATUS_ORDINARY,
325
						   entry, n_fields, &extra);
326

327 328
		ut_ad(data_size + extra_size + REC_N_NEW_EXTRA_BYTES == size);
		ut_ad(extra_size + REC_N_NEW_EXTRA_BYTES == extra);
329
	}
330
#endif /* UNIV_DEBUG */
331

332 333 334 335
	/* Add to the total size of the record in row_merge_block_t
	the encoded length of extra_size and the extra bytes (extra_size).
	See row_merge_buf_write() for the variable-length encoding
	of extra_size. */
336
	data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
337

338 339 340 341
	/* Reserve one byte for the end marker of row_merge_block_t. */
	if (buf->total_size + data_size >= sizeof(row_merge_block_t) - 1) {
		return(FALSE);
	}
342

343 344
	buf->total_size += data_size;
	buf->n_tuples++;
345

346
	field = entry;
347

348
	/* Copy the data fields. */
349 350

	do {
351
		if (!dfield_is_null(field)) {
352 353 354
			field->data = mem_heap_dup(buf->heap,
						   field->data, field->len);
		}
355 356 357

		field++;
	} while (--n_fields);
358

359
	return(TRUE);
360 361 362
}

/*****************************************************************
363
Compare two tuples. */
364
static
365
int
366 367 368 369 370 371 372 373
row_merge_tuple_cmp(
/*================*/
					/* out: 1, 0, -1 if a is greater,
					equal, less, respectively, than b */
	ulint			n_field,/* in: number of fields */
	ulint*			n_dup,	/* in/out: number of duplicates */
	const dfield_t*		a,	/* in: first tuple to be compared */
	const dfield_t*		b)	/* in: second tuple to be compared */
374
{
375
	int	cmp;
376

377 378 379
	do {
		cmp = cmp_dfield_dfield(a++, b++);
	} while (!cmp && --n_field);
380

381 382
	if (!cmp) {
		(*n_dup)++;
383
	}
384

385
	return(cmp);
386 387
}

388 389
/**************************************************************************
Merge sort the tuple buffer in main memory. */
390
static
391 392 393 394 395 396 397 398 399 400 401
void
row_merge_tuple_sort(
/*=================*/
	ulint			n_field,/* in: number of fields */
	ulint*			n_dup,	/* in/out: number of duplicates */
	const dfield_t**	tuples,	/* in/out: tuples */
	const dfield_t**	aux,	/* in/out: work area */
	ulint			low,	/* in: lower bound of the
					sorting area, inclusive */
	ulint			high)	/* in: upper bound of the
					sorting area, exclusive */
402
{
403 404 405
#define row_merge_tuple_sort_ctx(a,b,c,d) \
	row_merge_tuple_sort(n_field, n_dup, a, b, c, d)
#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, n_dup, a, b)
406

407 408
	UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
			      tuples, aux, low, high, row_merge_tuple_cmp_ctx);
409 410
}

411 412
/**********************************************************
Sort a buffer. */
413
static
414 415
ulint
row_merge_buf_sort(
416
/*===============*/
417 418 419
					/* out: number of duplicates
					encountered */
	row_merge_buf_t*	buf)	/* in/out: sort buffer */
420
{
421
	ulint	n_dup	= 0;
422

423
	row_merge_tuple_sort(dict_index_get_n_unique(buf->index), &n_dup,
424
			     buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
425

426
	return(n_dup);
427 428
}

429 430
/**********************************************************
Write a buffer to a block. */
431
static
432 433 434 435
void
row_merge_buf_write(
/*================*/
	const row_merge_buf_t*	buf,	/* in: sorted buffer */
436 437 438
#ifdef UNIV_DEBUG
	const merge_file_t*	of,	/* in: output file */
#endif /* UNIV_DEBUG */
439
	row_merge_block_t*	block)	/* out: buffer for writing to file */
440 441 442
#ifndef UNIV_DEBUG
# define row_merge_buf_write(buf, of, block) row_merge_buf_write(buf, block)
#endif /* !UNIV_DEBUG */
443
{
444 445 446 447
	dict_index_t*	index	= buf->index;
	ulint		n_fields= dict_index_get_n_fields(index);
	byte*		b	= &(*block)[0];

448
	ulint		i;
449 450 451 452

	for (i = 0; i < buf->n_tuples; i++) {
		ulint		size;
		ulint		extra_size;
453
		const dfield_t*	entry		= buf->tuples[i];
454 455 456

		size = rec_get_converted_size_comp(buf->index,
						   REC_STATUS_ORDINARY,
457
						   entry, n_fields,
458 459 460 461 462 463 464 465 466 467
						   &extra_size);
		ut_ad(size > extra_size);
		ut_ad(extra_size >= REC_N_NEW_EXTRA_BYTES);
		extra_size -= REC_N_NEW_EXTRA_BYTES;
		size -= REC_N_NEW_EXTRA_BYTES;

		/* Encode extra_size + 1 */
		if (extra_size + 1 < 0x80) {
			*b++ = extra_size + 1;
		} else {
468
			ut_ad((extra_size + 1) < 0x8000);
469 470
			*b++ = 0x80 | ((extra_size + 1) >> 8);
			*b++ = (byte) (extra_size + 1);
471 472
		}

473
		ut_ad(b + size < block[1]);
474

475 476
		rec_convert_dtuple_to_rec_comp(b + extra_size, 0, index,
					       REC_STATUS_ORDINARY,
477
					       entry, n_fields);
478

479
		b += size;
480 481 482 483 484 485 486 487 488

#ifdef UNIV_DEBUG
		if (row_merge_print_write) {
			fprintf(stderr, "row_merge_buf_write %p,%d,%lu %lu",
				(void*) b, of->fd, (ulong) of->offset,
				(ulong) i);
			row_merge_tuple_print(stderr, entry, n_fields);
		}
#endif /* UNIV_DEBUG */
489 490
	}

491 492
	/* Write an "end-of-chunk" marker. */
	ut_a(b < block[1]);
493
	ut_a(b == block[0] + buf->total_size);
494 495 496 497
	*b++ = 0;
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized.  Initialize it
	to avoid bogus warnings. */
498
	memset(b, 0xff, block[1] - b);
499
#endif /* UNIV_DEBUG_VALGRIND */
500 501
#ifdef UNIV_DEBUG
	if (row_merge_print_write) {
502 503
		fprintf(stderr, "row_merge_buf_write %p,%d,%lu EOF\n",
			(void*) b, of->fd, (ulong) of->offset);
504 505
	}
#endif /* UNIV_DEBUG */
506 507
}

508 509
/**********************************************************
Create a memory heap and allocate space for row_merge_rec_offsets(). */
510
static
511 512 513 514 515 516 517
mem_heap_t*
row_merge_heap_create(
/*==================*/
					/* out: memory heap */
	dict_index_t*	index,		/* in: record descriptor */
	ulint**		offsets1,	/* out: offsets */
	ulint**		offsets2)	/* out: offsets */
518
{
519
	ulint		i	= 1 + REC_OFFS_HEADER_SIZE
520 521
		+ dict_index_get_n_fields(index);
	mem_heap_t*	heap	= mem_heap_create(2 * i * sizeof *offsets1);
522

523 524
	*offsets1 = mem_heap_alloc(heap, i * sizeof *offsets1);
	*offsets2 = mem_heap_alloc(heap, i * sizeof *offsets2);
525

526 527
	(*offsets1)[0] = (*offsets2)[0] = i;
	(*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
528

529
	return(heap);
530 531
}

532 533 534
/**************************************************************************
Search an index object by name and column names.  If several indexes match,
return the index with the max id. */
535
static
536 537 538 539 540 541 542
dict_index_t*
row_merge_dict_table_get_index(
/*===========================*/
						/* out: matching index,
						NULL if not found */
	dict_table_t*		table,		/* in: table */
	const merge_index_def_t*index_def)	/* in: index definition */
543
{
544 545 546
	ulint		i;
	dict_index_t*	index;
	const char**	column_names;
547

548
	column_names = mem_alloc(index_def->n_fields * sizeof *column_names);
549

550 551
	for (i = 0; i < index_def->n_fields; ++i) {
		column_names[i] = index_def->fields[i].field_name;
552 553
	}

554 555
	index = dict_table_get_index_by_max_id(
		table, index_def->name, column_names, index_def->n_fields);
556

557
	mem_free(column_names);
558

559
	return(index);
560 561
}

562 563 564 565 566 567 568 569 570 571 572 573 574
/************************************************************************
Read a merge block from the file system. */
static
ibool
row_merge_read(
/*===========*/
					/* out: TRUE if request was
					successful, FALSE if fail */
	int			fd,	/* in: file descriptor */
	ulint			offset,	/* in: offset where to read */
	row_merge_block_t*	buf)	/* out: data */
{
	ib_uint64_t	ofs = ((ib_uint64_t) offset) * sizeof *buf;
575 576 577 578 579 580 581 582 583 584 585
	ibool		success;

	success = os_file_read_no_error_handling(OS_FILE_FROM_FD(fd), buf,
						 (ulint) (ofs & 0xFFFFFFFF),
						 (ulint) (ofs >> 32),
						 sizeof *buf);
	if (UNIV_UNLIKELY(!success)) {
		ut_print_timestamp(stderr);
		fprintf(stderr,
			"  InnoDB: failed to read merge block at %llu\n", ofs);
	}
586

587
	return(UNIV_LIKELY(success));
588
}
589

590 591 592 593 594 595 596 597 598 599 600 601 602 603
/************************************************************************
Read a merge block from the file system. */
static
ibool
row_merge_write(
/*============*/
				/* out: TRUE if request was
				successful, FALSE if fail */
	int		fd,	/* in: file descriptor */
	ulint		offset,	/* in: offset where to write */
	const void*	buf)	/* in: data */
{
	ib_uint64_t	ofs = ((ib_uint64_t) offset)
		* sizeof(row_merge_block_t);
604

605 606 607 608 609
	return(UNIV_LIKELY(os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
					 (ulint) (ofs & 0xFFFFFFFF),
					 (ulint) (ofs >> 32),
					 sizeof(row_merge_block_t))));
}
610

611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
/************************************************************************
Read a merge record. */
static
const byte*
row_merge_read_rec(
/*===============*/
					/* out: pointer to next record,
					or NULL on I/O error
					or end of list */
	row_merge_block_t*	block,	/* in/out: file buffer */
	mrec_buf_t*		buf,	/* in/out: secondary buffer */
	const byte*		b,	/* in: pointer to record */
	dict_index_t*		index,	/* in: index of the record */
	int			fd,	/* in: file descriptor */
	ulint*			foffs,	/* in/out: file offset */
	const mrec_t**		mrec,	/* out: pointer to merge record,
					or NULL on end of list
					(non-NULL on I/O error) */
	ulint*			offsets)/* out: offsets of mrec */
{
	ulint	extra_size;
	ulint	data_size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= block[0]);
	ut_ad(b < block[1]);
	ut_ad(index);
	ut_ad(foffs);
	ut_ad(mrec);
	ut_ad(offsets);
643

644
	ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
645
	      + dict_index_get_n_fields(index));
646

647
	extra_size = *b++;
648

649 650 651
	if (UNIV_UNLIKELY(!extra_size)) {
		/* End of list */
		*mrec = NULL;
652 653
#ifdef UNIV_DEBUG
		if (row_merge_print_read) {
654 655 656
			fprintf(stderr, "row_merge_read %p,%p,%d,%lu EOF\n",
				(const void*) b, (const void*) block,
				fd, (ulong) *foffs);
657 658
		}
#endif /* UNIV_DEBUG */
659 660
		return(NULL);
	}
661

662 663
	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */
664

665 666 667 668 669 670 671
		if (UNIV_UNLIKELY(b >= block[1])) {
			if (!row_merge_read(fd, ++(*foffs), block)) {
err_exit:
				/* Signal I/O error. */
				*mrec = b;
				return(NULL);
			}
672

673 674 675
			/* Wrap around to the beginning of the buffer. */
			b = block[0];
		}
676

677 678 679
		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *b++;
	}
680

681
	/* Normalize extra_size.  Above, value 0 signals "end of list". */
682
	extra_size--;
683

684
	/* Read the extra bytes. */
685

686 687 688 689
	if (UNIV_UNLIKELY(b + extra_size >= block[1])) {
		/* The record spans two blocks.  Copy the entire record
		to the auxiliary buffer and handle this as a special
		case. */
690

691
		avail_size = block[1] - b;
692

693
		memcpy(*buf, b, avail_size);
694

695
		if (!row_merge_read(fd, ++(*foffs), block)) {
696

697 698
			goto err_exit;
		}
699

700 701
		/* Wrap around to the beginning of the buffer. */
		b = block[0];
702

703 704 705
		/* Copy the record. */
		memcpy(*buf + avail_size, b, extra_size - avail_size);
		b += extra_size - avail_size;
706

707
		*mrec = *buf + extra_size;
708

709
		rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
710

711
		data_size = rec_offs_data_size(offsets);
712

713 714 715 716 717
		/* These overflows should be impossible given that
		records are much smaller than either buffer, and
		the record starts near the beginning of each buffer. */
		ut_a(extra_size + data_size < sizeof *buf);
		ut_a(b + data_size < block[1]);
718

719 720 721
		/* Copy the data bytes. */
		memcpy(*buf + extra_size, b, data_size);
		b += data_size;
722

723
		goto func_exit;
724
	}
725

726
	*mrec = b + extra_size;
727

728
	rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
729

730 731
	data_size = rec_offs_data_size(offsets);
	ut_ad(extra_size + data_size < sizeof *buf);
732

733
	b += extra_size + data_size;
734

735 736 737
	if (UNIV_LIKELY(b < block[1])) {
		/* The record fits entirely in the block.
		This is the normal case. */
738
		goto func_exit;
739
	}
740

741
	/* The record spans two blocks.  Copy it to buf. */
742

743
	b -= extra_size + data_size;
744 745 746 747
	avail_size = block[1] - b;
	memcpy(*buf, b, avail_size);
	*mrec = *buf + extra_size;
	rec_offs_make_valid(*mrec, index, offsets);
748

749
	if (!row_merge_read(fd, ++(*foffs), block)) {
750

751 752
		goto err_exit;
	}
753

754 755
	/* Wrap around to the beginning of the buffer. */
	b = block[0];
756

757 758 759
	/* Copy the rest of the record. */
	memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
	b += extra_size + data_size - avail_size;
760

761 762 763
func_exit:
#ifdef UNIV_DEBUG
	if (row_merge_print_read) {
764 765 766
		fprintf(stderr, "row_merge_read %p,%p,%d,%lu ",
			(const void*) b, (const void*) block,
			fd, (ulong) *foffs);
767 768 769 770 771
		rec_print_comp(stderr, *mrec, offsets);
		putc('\n', stderr);
	}
#endif /* UNIV_DEBUG */

772 773
	return(b);
}
774

775 776 777 778 779 780 781 782
/************************************************************************
Write a merge record. */
static
void
row_merge_write_rec_low(
/*====================*/
	byte*		b,	/* out: buffer */
	ulint		e,	/* in: encoded extra_size */
783 784
#ifdef UNIV_DEBUG
	ulint		size,	/* in: total size to write */
785 786
	int		fd,	/* in: file descriptor */
	ulint		foffs,	/* in: file offset */
787
#endif /* UNIV_DEBUG */
788 789
	const mrec_t*	mrec,	/* in: record to write */
	const ulint*	offsets)/* in: offsets of mrec */
790 791 792 793
#ifndef UNIV_DEBUG
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)	\
	row_merge_write_rec_low(b, e, mrec, offsets)
#endif /* !UNIV_DEBUG */
794
{
795 796 797 798
#ifdef UNIV_DEBUG
	const byte* const end = b + size;
	ut_ad(e == rec_offs_extra_size(offsets) + 1);

799
	if (row_merge_print_write) {
800 801
		fprintf(stderr, "row_merge_write %p,%d,%lu ",
			(void*) b, fd, (ulong) foffs);
802 803 804 805 806
		rec_print_comp(stderr, mrec, offsets);
		putc('\n', stderr);
	}
#endif /* UNIV_DEBUG */

807 808 809 810 811
	if (e < 0x80) {
		*b++ = e;
	} else {
		*b++ = 0x80 | (e >> 8);
		*b++ = (byte) e;
812 813
	}

814
	memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
815
	ut_ad(b + rec_offs_size(offsets) == end);
816 817 818
}

/************************************************************************
819
Write a merge record. */
820
static
821 822 823 824 825 826 827 828 829 830 831 832
byte*
row_merge_write_rec(
/*================*/
					/* out: pointer to end of block,
					or NULL on error */
	row_merge_block_t*	block,	/* in/out: file buffer */
	mrec_buf_t*		buf,	/* in/out: secondary buffer */
	byte*			b,	/* in: pointer to end of block */
	int			fd,	/* in: file descriptor */
	ulint*			foffs,	/* in/out: file offset */
	const mrec_t*		mrec,	/* in: record to write */
	const ulint*		offsets)/* in: offsets of mrec */
833
{
834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857
	ulint	extra_size;
	ulint	size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= block[0]);
	ut_ad(b < block[1]);
	ut_ad(mrec);
	ut_ad(foffs);
	ut_ad(mrec < block[0] || mrec > block[1]);
	ut_ad(mrec < buf[0] || mrec > buf[1]);

	/* Normalize extra_size.  Value 0 signals "end of list". */
	extra_size = rec_offs_extra_size(offsets) + 1;

	size = extra_size + (extra_size >= 0x80)
		+ rec_offs_data_size(offsets);

	if (UNIV_UNLIKELY(b + size >= block[1])) {
		/* The record spans two blocks.
		Copy it to the temporary buffer first. */
		avail_size = block[1] - b;

858
		row_merge_write_rec_low(buf[0],
859 860
					extra_size, size, fd, *foffs,
					mrec, offsets);
861 862 863 864 865 866 867 868 869

		/* Copy the head of the temporary buffer, write
		the completed block, and copy the tail of the
		record to the head of the new block. */
		memcpy(b, buf[0], avail_size);

		if (!row_merge_write(fd, (*foffs)++, block)) {
			return(NULL);
		}
870

871
		UNIV_MEM_INVALID(block[0], sizeof block[0]);
872

873 874 875 876 877
		/* Copy the rest. */
		b = block[0];
		memcpy(b, buf[0] + avail_size, size - avail_size);
		b += size - avail_size;
	} else {
878 879
		row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
					mrec, offsets);
880
		b += size;
881 882
	}

883
	return(b);
884 885 886
}

/************************************************************************
887
Write an end-of-list marker. */
888
static
889 890 891 892 893 894 895 896 897
byte*
row_merge_write_eof(
/*================*/
					/* out: pointer to end of block,
					or NULL on error */
	row_merge_block_t*	block,	/* in/out: file buffer */
	byte*			b,	/* in: pointer to end of block */
	int			fd,	/* in: file descriptor */
	ulint*			foffs)	/* in/out: file offset */
898
{
899 900 901 902
	ut_ad(block);
	ut_ad(b >= block[0]);
	ut_ad(b < block[1]);
	ut_ad(foffs);
903 904
#ifdef UNIV_DEBUG
	if (row_merge_print_write) {
905 906
		fprintf(stderr, "row_merge_write %p,%p,%d,%lu EOF\n",
			(void*) b, (void*) block, fd, (ulong) *foffs);
907 908
	}
#endif /* UNIV_DEBUG */
909 910

	*b++ = 0;
911 912
	UNIV_MEM_ASSERT_RW(block[0], b - block[0]);
	UNIV_MEM_ASSERT_W(block[0], sizeof block[0]);
913 914 915 916 917
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized.  Initialize it
	to avoid bogus warnings. */
	memset(b, 0xff, block[1] - b);
#endif /* UNIV_DEBUG_VALGRIND */
918 919 920

	if (!row_merge_write(fd, (*foffs)++, block)) {
		return(NULL);
921 922
	}

923
	UNIV_MEM_INVALID(block[0], sizeof block[0]);
924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943
	return(block[0]);
}

/*****************************************************************
Compare two merge records. */
static
int
row_merge_cmp(
/*==========*/
					/* out: 1, 0, -1 if mrec1 is
					greater, equal, less,
					respectively, than mrec2 */
	const mrec_t*	mrec1,		/* in: first merge record to be
					compared */
	const mrec_t*	mrec2,		/* in: second merge record to be
					compared */
	const ulint*	offsets1,	/* in: first record offsets */
	const ulint*	offsets2,	/* in: second record offsets */
	dict_index_t*	index)		/* in: index */
{
	int	cmp;

	cmp = cmp_rec_rec_simple(mrec1, mrec2, offsets1, offsets2, index);

#ifdef UNIV_DEBUG
	if (row_merge_print_cmp) {
		fputs("row_merge_cmp1 ", stderr);
		rec_print_comp(stderr, mrec1, offsets1);
		fputs("\nrow_merge_cmp2 ", stderr);
		rec_print_comp(stderr, mrec2, offsets2);
		fprintf(stderr, "\nrow_merge_cmp=%d\n", cmp);
	}
#endif /* UNIV_DEBUG */

	return(cmp);
}

/************************************************************************
Reads clustered index of the table and create temporary files
containing the index entries for the indexes to be built.
All indexes are built in a single scan of the clustered index;
each sorted buffer-full of entries is flushed to its own merge file. */
static
ulint
row_merge_read_clustered_index(
/*===========================*/
					/* out: DB_SUCCESS or error */
	trx_t*			trx,	/* in: transaction */
	dict_table_t*		old_table,/* in: table where rows are
					read from */
	dict_table_t*		new_table,/* in: table where indexes are
					created; identical to old_table
					unless creating a PRIMARY KEY */
	dict_index_t**		index,	/* in: indexes to be created */
	merge_file_t*		files,	/* in: temporary files */
	ulint			n_index,/* in: number of indexes to create */
	row_merge_block_t*	block)	/* in/out: file buffer */
{
	dict_index_t*		clust_index;	/* Clustered index */
	mem_heap_t*		row_heap;	/* Heap memory to create
						clustered index records */
	row_merge_buf_t**	merge_buf;	/* Temporary list for records*/
	btr_pcur_t		pcur;		/* Persistent cursor on the
						clustered index */
	mtr_t			mtr;		/* Mini transaction */
	ulint			err = DB_SUCCESS;/* Return code */
	ulint			i;
	ulint			n_nonnull = 0;	/* number of columns
						changed to NOT NULL */
	ulint*			nonnull = NULL;	/* NOT NULL columns */

	trx->op_info = "reading clustered index";

	ut_ad(trx);
	ut_ad(old_table);
	ut_ad(new_table);
	ut_ad(index);
	ut_ad(files);

	/* Create and initialize memory for record buffers:
	one sort buffer per index being built. */

	merge_buf = mem_alloc(n_index * sizeof *merge_buf);

	for (i = 0; i < n_index; i++) {
		merge_buf[i] = row_merge_buf_create(index[i]);
	}

	mtr_start(&mtr);

	/* Find the clustered index and create a persistent cursor
	based on that. */

	clust_index = dict_table_get_first_index(old_table);

	btr_pcur_open_at_index_side(
		TRUE, clust_index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);

	if (UNIV_UNLIKELY(old_table != new_table)) {
		ulint	n_cols = dict_table_get_n_cols(old_table);

		/* A primary key will be created.  Identify the
		columns that were flagged NOT NULL in the new table,
		so that we can quickly check that the records in the
		(old) clustered index do not violate the added NOT
		NULL constraints. */

		ut_a(n_cols == dict_table_get_n_cols(new_table));

		nonnull = mem_alloc(n_cols * sizeof *nonnull);

		for (i = 0; i < n_cols; i++) {
			if (dict_table_get_nth_col(old_table, i)->prtype
			    & DATA_NOT_NULL) {
				/* Already NOT NULL in the old table:
				existing rows cannot violate it. */
				continue;
			}

			if (dict_table_get_nth_col(new_table, i)->prtype
			    & DATA_NOT_NULL) {
				/* Nullable before, NOT NULL after:
				must be checked for every row. */
				nonnull[n_nonnull++] = i;
			}
		}

		if (!n_nonnull) {
			/* No constraints were added; free the list
			so the per-row check is skipped entirely. */
			mem_free(nonnull);
			nonnull = NULL;
		}
	}

	row_heap = mem_heap_create(UNIV_PAGE_SIZE);

	/* Scan the clustered index. */
	for (;;) {
		const rec_t*	rec;
		dtuple_t*	row		= NULL;
		row_ext_t*	ext;
		ibool		has_next	= TRUE;

		btr_pcur_move_to_next_on_page(&pcur, &mtr);

		/* When switching pages, commit the mini-transaction
		in order to release the latch on the old page. */

		if (btr_pcur_is_after_last_on_page(&pcur, &mtr)) {
			btr_pcur_store_position(&pcur, &mtr);
			mtr_commit(&mtr);
			mtr_start(&mtr);
			btr_pcur_restore_position(BTR_SEARCH_LEAF,
						  &pcur, &mtr);
			has_next = btr_pcur_move_to_next_user_rec(&pcur, &mtr);
		}

		/* When has_next is FALSE, row stays NULL and the loop
		below only flushes the remaining buffer contents. */
		if (UNIV_LIKELY(has_next)) {
			rec = btr_pcur_get_rec(&pcur);

			/* Skip delete marked records. */
			if (rec_get_deleted_flag(
				    rec, dict_table_is_comp(old_table))) {
				continue;
			}

			srv_n_rows_inserted++;

			/* Build a row based on the clustered index. */

			row = row_build(ROW_COPY_POINTERS, clust_index,
					rec, NULL, &ext, row_heap);

			if (UNIV_LIKELY_NULL(nonnull)) {
				/* Enforce the added NOT NULL
				constraints on this row. */
				for (i = 0; i < n_nonnull; i++) {
					dfield_t*	field
						= &row->fields[nonnull[i]];

					ut_a(!(field->type.prtype
					       & DATA_NOT_NULL));

					if (dfield_is_null(field)) {
						trx->error_key_num = 0;
						err = DB_PRIMARY_KEY_IS_NULL;
						goto func_exit;
					}

					field->type.prtype |= DATA_NOT_NULL;
				}
			}
		}

		/* Build all entries for all the indexes to be created
		in a single scan of the clustered index. */

		for (i = 0; i < n_index; i++) {
			row_merge_buf_t*	buf	= merge_buf[i];
			merge_file_t*		file	= &files[i];

			if (UNIV_LIKELY
			    (row && row_merge_buf_add(buf, row, ext))) {
				/* The entry fit in the sort buffer;
				nothing to flush yet. */
				continue;
			}

			/* The buffer must be sufficiently large
			to hold at least one record. */
			ut_ad(buf->n_tuples || !has_next);

			/* We have enough data tuples to form a block.
			Sort them and write to disk. */

			if (buf->n_tuples
			    && row_merge_buf_sort(buf)
			    && dict_index_is_unique(buf->index)) {
				/* The in-buffer sort detected adjacent
				duplicates on a unique index. */
				trx->error_key_num = i;
				err = DB_DUPLICATE_KEY;
				goto func_exit;
			}

			row_merge_buf_write(buf, file, block);

			if (!row_merge_write(file->fd, file->offset++,
					     block)) {
				trx->error_key_num = i;
				err = DB_OUT_OF_FILE_SPACE;
				goto func_exit;
			}

			UNIV_MEM_INVALID(block[0], sizeof block[0]);
			merge_buf[i] = row_merge_buf_empty(buf);

			/* Try writing the record again, now that
			the buffer has been written out and emptied. */

			if (UNIV_UNLIKELY
			    (row && !row_merge_buf_add(buf, row, ext))) {
				/* An empty buffer should have enough
				room for at least one record. */
				ut_error;
			}
		}

		mem_heap_empty(row_heap);

		if (UNIV_UNLIKELY(!has_next)) {
			goto func_exit;
		}
	}

func_exit:
	btr_pcur_close(&pcur);
	mtr_commit(&mtr);
	mem_heap_free(row_heap);

	if (UNIV_LIKELY_NULL(nonnull)) {
		mem_free(nonnull);
	}

	for (i = 0; i < n_index; i++) {
		row_merge_buf_free(merge_buf[i]);
	}

	mem_free(merge_buf);

	trx->op_info = "";

	return(err);
}
1186

1187 1188 1189 1190 1191 1192 1193 1194 1195 1196
/*****************************************************************
Merge two blocks of linked lists on disk and write a bigger block.
Reads the two sorted input runs starting at *foffs0 and *foffs1 in
file, merges them by row_merge_cmp(), and appends the result to the
output file of. */
static
ulint
row_merge_blocks(
/*=============*/
					/* out: DB_SUCCESS or error code */
	dict_index_t*		index,	/* in: index being created */
	merge_file_t*		file,	/* in/out: file containing
					index entries */
	row_merge_block_t*	block,	/* in/out: 3 buffers */
	ulint*			foffs0,	/* in/out: offset of first
					source list in the file */
	ulint*			foffs1,	/* in/out: offset of second
					source list in the file */
	merge_file_t*	of)	/* in/out: output file */
{
	mem_heap_t*	heap;	/* memory heap for offsets0, offsets1 */

	mrec_buf_t	buf[3];	/* buffer for handling split mrec in block[] */
	const byte*	b0;	/* pointer to block[0] */
	const byte*	b1;	/* pointer to block[1] */
	byte*		b2;	/* pointer to block[2] */
	const mrec_t*	mrec0;	/* merge rec, points to block[0] or buf[0] */
	const mrec_t*	mrec1;	/* merge rec, points to block[1] or buf[1] */
	ulint*		offsets0;/* offsets of mrec0 */
	ulint*		offsets1;/* offsets of mrec1 */

	heap = row_merge_heap_create(index, &offsets0, &offsets1);

	/* Write a record and read the next record.  Split the output
	file in two halves, which can be merged on the following pass. */
#define ROW_MERGE_WRITE_GET_NEXT(N, AT_END)				\
	do {								\
		b2 = row_merge_write_rec(&block[2], &buf[2], b2,	\
					 of->fd, &of->offset,		\
					 mrec##N, offsets##N);		\
		if (UNIV_UNLIKELY(!b2)) {				\
			goto corrupt;					\
		}							\
		b##N = row_merge_read_rec(&block[N], &buf[N],		\
					  b##N, index,			\
					  file->fd, foffs##N,		\
					  &mrec##N, offsets##N);	\
		if (UNIV_UNLIKELY(!b##N)) {				\
			if (mrec##N) {					\
				goto corrupt;				\
			}						\
			AT_END;						\
		}							\
	} while (0)

	if (!row_merge_read(file->fd, *foffs0, &block[0])
	    || !row_merge_read(file->fd, *foffs1, &block[1])) {
corrupt:
		/* NOTE: the corrupt label sits inside this if block but
		is also the error target of the macro above; control can
		jump here from anywhere in this function. */
		mem_heap_free(heap);
		return(DB_CORRUPTION);
	}

	b0 = block[0];
	b1 = block[1];
	b2 = block[2];

	/* Prime the merge by reading the first record of each run.
	A NULL return with a non-NULL mrec signals an I/O error;
	NULL with NULL mrec means the run is empty. */
	b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
				foffs0, &mrec0, offsets0);
	b1 = row_merge_read_rec(&block[1], &buf[1], b1, index, file->fd,
				foffs1, &mrec1, offsets1);
	if (UNIV_UNLIKELY(!b0 && mrec0)
	    || UNIV_UNLIKELY(!b1 && mrec1)) {

		goto corrupt;
	}

	/* Standard two-way merge: emit the smaller record and
	advance its run, until one run is exhausted. */
	while (mrec0 && mrec1) {
		switch (row_merge_cmp(mrec0, mrec1,
				      offsets0, offsets1, index)) {
		case 0:
			/* Equal keys violate a unique index. */
			if (UNIV_UNLIKELY
			    (dict_index_is_unique(index))) {
				mem_heap_free(heap);
				return(DB_DUPLICATE_KEY);
			}
			/* fall through */
		case -1:
			ROW_MERGE_WRITE_GET_NEXT(0, goto merged);
			break;
		case 1:
			ROW_MERGE_WRITE_GET_NEXT(1, goto merged);
			break;
		default:
			ut_error;
		}

	}

merged:
	if (mrec0) {
		/* append all mrec0 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
		}
	}
done0:
	if (mrec1) {
		/* append all mrec1 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(1, goto done1);
		}
	}
done1:

	mem_heap_free(heap);
	/* Terminate the output run with an end-of-list marker. */
	b2 = row_merge_write_eof(&block[2], b2, of->fd, &of->offset);
	return(b2 ? DB_SUCCESS : DB_CORRUPTION);
}
1302

1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313
/*****************************************************************
Merge disk files.  Performs one merge pass: pairs up runs from the
two halves of file, merges each pair into the output file, and then
swaps the output file into *file for the next pass. */
static
ulint
row_merge(
/*======*/
						/* out: DB_SUCCESS
						or error code */
	dict_index_t*		index,		/* in: index being created */
	merge_file_t*		file,		/* in/out: file containing
						index entries */
	ulint			half,		/* in: half the file */
	row_merge_block_t*	block,		/* in/out: 3 buffers */
	int*			tmpfd)		/* in/out: temporary file
						handle */
{
	ulint		foffs0;	/* first input offset */
	ulint		foffs1;	/* second input offset */
	ulint		error;	/* error code */
	merge_file_t	of;	/* output file */

	UNIV_MEM_ASSERT_W(block[0], 3 * sizeof block[0]);
	ut_ad(half > 0);

	/* The previous pass's temporary file becomes this pass's
	output file. */
	of.fd = *tmpfd;
	of.offset = 0;

	/* Merge blocks to the output file. */
	foffs0 = 0;
	foffs1 = half;

	for (; foffs0 < half && foffs1 < file->offset; foffs0++, foffs1++) {
		error = row_merge_blocks(index, file, block,
					 &foffs0, &foffs1, &of);

		if (error != DB_SUCCESS) {
			return(error);
		}
	}

	/* Copy the last block, if there is one.  At most one of these
	loops runs: whichever half still has unpaired blocks. */
	while (foffs0 < half) {
		if (!row_merge_read(file->fd, foffs0++, block)
		    || !row_merge_write(of.fd, of.offset++, block)) {
			return(DB_CORRUPTION);
		}
	}
	while (foffs1 < file->offset) {
		if (!row_merge_read(file->fd, foffs1++, block)
		    || !row_merge_write(of.fd, of.offset++, block)) {
			return(DB_CORRUPTION);
		}
	}

	/* Swap file descriptors for the next pass. */
	*tmpfd = file->fd;
	*file = of;

	UNIV_MEM_INVALID(block[0], 3 * sizeof block[0]);

	return(DB_SUCCESS);
}
1365

1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376
/*****************************************************************
Merge disk files. */
static
ulint
row_merge_sort(
/*===========*/
						/* out: DB_SUCCESS
						or error code */
	dict_index_t*		index,		/* in: index being created */
	merge_file_t*		file,		/* in/out: file containing
						index entries */
1377
	row_merge_block_t*	block,		/* in/out: 3 buffers */
1378 1379 1380 1381
	int*			tmpfd)		/* in/out: temporary file
						handle */
{
	ulint	blksz;	/* block size */
1382

1383
	for (blksz = 1; blksz < file->offset; blksz *= 2) {
1384 1385 1386 1387 1388
		ulint	half;
		ulint	error;

		half = ut_2pow_round((file->offset + blksz - 1) / 2, blksz);
		error = row_merge(index, file, half, block, tmpfd);
1389

1390 1391 1392 1393
		if (error != DB_SUCCESS) {
			return(error);
		}
	}
1394

1395
	return(DB_SUCCESS);
1396 1397
}

1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430
/*****************************************************************
Copy externally stored columns to the data tuple. */
static
void
row_merge_copy_blobs(
/*=================*/
	const mrec_t*	mrec,	/* in: merge record */
	const ulint*	offsets,/* in: offsets of mrec */
	ulint		zip_size,/* in: compressed page size in bytes, or 0 */
	dtuple_t*	tuple,	/* in/out: data tuple */
	mem_heap_t*	heap)	/* in/out: memory heap */
{
	ulint	field_no;
	ulint	n_fields = dtuple_get_n_fields(tuple);

	/* For every externally stored column, fetch the column data
	from the merge record and attach it to the tuple field. */

	for (field_no = 0; field_no < n_fields; field_no++) {
		dfield_t*	field = dtuple_get_nth_field(tuple, field_no);
		const void*	col_data;
		ulint		col_len;

		if (!dfield_is_ext(field)) {

			continue;
		}

		/* An externally stored column cannot be SQL NULL. */
		ut_ad(!dfield_is_null(field));

		col_data = btr_rec_copy_externally_stored_field(
			mrec, offsets, zip_size, field_no, &col_len, heap);

		dfield_set_data(field, col_data, col_len);
	}
}

1431 1432 1433
/************************************************************************
Read sorted file containing index data tuples and insert these data
tuples to the index.  Iterates over the merge records in the file,
rebuilding each into an index entry (fetching BLOBs as needed) and
inserting it via a dummy insert query graph. */
static
ulint
row_merge_insert_index_tuples(
/*==========================*/
					/* out: DB_SUCCESS or error number */
	trx_t*			trx,	/* in: transaction */
	dict_index_t*		index,	/* in: index */
	dict_table_t*		table,	/* in: new table */
	ulint			zip_size,/* in: compressed page size of
					 the old table, or 0 if uncompressed */
	int			fd,	/* in: file descriptor */
	row_merge_block_t*	block)	/* in/out: file buffer */
{
	mrec_buf_t		buf;	/* buffer for records that span blocks */
	const byte*		b;	/* read position within block */
	que_thr_t*		thr;
	ins_node_t*		node;
	mem_heap_t*		tuple_heap;	/* per-record scratch heap */
	mem_heap_t*		graph_heap;	/* heap for the query graph */
	ulint			error = DB_SUCCESS;
	ulint			foffs = 0;	/* current block in file */
	ulint*			offsets;	/* record offsets array */

	ut_ad(trx);
	ut_ad(index);
	ut_ad(table);

	/* We use the insert query graph as the dummy graph
	needed in the row module call */

	trx->op_info = "inserting index entries";

	graph_heap = mem_heap_create(500);
	node = ins_node_create(INS_DIRECT, table, graph_heap);

	thr = pars_complete_graph_for_exec(node, trx, graph_heap);

	que_thr_move_to_run_state_for_mysql(thr, trx);

	tuple_heap = mem_heap_create(1000);

	{
		/* Preallocate the offsets array; all records in the
		merge file belong to the same index, so the size is
		fixed. */
		ulint i	= 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		offsets = mem_heap_alloc(graph_heap, i * sizeof *offsets);
		offsets[0] = i;
		offsets[1] = dict_index_get_n_fields(index);
	}

	b = *block;

	if (!row_merge_read(fd, foffs, block)) {
		error = DB_CORRUPTION;
	} else {
		for (;;) {
			const mrec_t*	mrec;
			dtuple_t*	dtuple;
			ulint		n_ext;

			b = row_merge_read_rec(block, &buf, b, index,
					       fd, &foffs, &mrec, offsets);
			if (UNIV_UNLIKELY(!b)) {
				/* End of list, or I/O error */
				if (mrec) {
					error = DB_CORRUPTION;
				}
				break;
			}

			n_ext = 0;
			dtuple = row_rec_to_index_entry_low(
				mrec, index, offsets, &n_ext, tuple_heap);

			if (UNIV_UNLIKELY(n_ext)) {
				/* The entry refers to externally
				stored columns; copy them in. */
				row_merge_copy_blobs(mrec, offsets, zip_size,
						     dtuple, tuple_heap);
			}

			node->row = dtuple;
			node->table = table;
			node->trx_id = trx->id;

			ut_ad(dtuple_validate(dtuple));

			/* Insert, retrying as long as the error handler
			reports a recoverable condition (lock wait). */
			do {
				thr->run_node = thr;
				thr->prev_node = thr->common.parent;

				error = row_ins_index_entry(index, dtuple,
							    0, FALSE, thr);

				if (UNIV_LIKELY(error == DB_SUCCESS)) {

					goto next_rec;
				}

				thr->lock_state = QUE_THR_LOCK_ROW;
				trx->error_state = error;
				que_thr_stop_for_mysql(thr);
				thr->lock_state = QUE_THR_LOCK_NOLOCK;
			} while (row_mysql_handle_errors(&error, trx,
							 thr, NULL));

			/* Unrecoverable error: skip the normal
			graph-stop call and free the graph. */
			goto err_exit;
next_rec:
			mem_heap_empty(tuple_heap);
		}
	}

	que_thr_stop_for_mysql_no_error(thr, trx);
err_exit:
	que_graph_free(thr->graph);

	trx->op_info = "";

	mem_heap_free(tuple_heap);

	return(error);
}

1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630
/*************************************************************************
Sets an exclusive lock on a table, for the duration of creating indexes.
Retries on lock wait until the lock is granted or a hard error occurs. */

ulint
row_merge_lock_table(
/*=================*/
					/* out: error code or DB_SUCCESS */
	trx_t*		trx,		/* in/out: transaction */
	dict_table_t*	table)		/* in: table to LOCK_X */
{
	mem_heap_t*	heap;
	que_thr_t*	thr;
	ulint		err;
	sel_node_t*	node;

	ut_ad(trx);
	ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());

	heap = mem_heap_create(512);

	trx->op_info = "setting table lock for index merge";

	node = sel_node_create(heap);
	thr = pars_complete_graph_for_exec(node, trx, heap);
	thr->graph->state = QUE_FORK_ACTIVE;

	/* We use the select query graph as the dummy graph needed
	in the lock module call */

	thr = que_fork_get_first_thr(que_node_get_parent(thr));
	que_thr_move_to_run_state_for_mysql(thr, trx);

run_again:
	thr->run_node = thr;
	thr->prev_node = thr->common.parent;

	err = lock_table(0, table, LOCK_X, thr);

	trx->error_state = err;

	if (UNIV_LIKELY(err == DB_SUCCESS)) {
		que_thr_stop_for_mysql_no_error(thr, trx);
	} else {
		que_thr_stop_for_mysql(thr);

		if (err != DB_QUE_THR_SUSPENDED) {
			ibool	was_lock_wait;

			/* Let the generic error handler decide
			whether the error is a lock wait worth
			retrying. */
			was_lock_wait = row_mysql_handle_errors(
				&err, trx, thr, NULL);

			if (was_lock_wait) {
				goto run_again;
			}
		} else {
			/* The thread was suspended before it reached
			a runnable state; restart the fork and retry. */
			que_thr_t*	run_thr;
			que_node_t*	parent;

			parent = que_node_get_parent(thr);
			run_thr = que_fork_start_command(parent);

			ut_a(run_thr == thr);

			/* There was a lock wait but the thread was not
			in a ready to run or running state. */
			trx->error_state = DB_LOCK_WAIT;

			goto run_again;
		}
	}

	que_graph_free(thr->graph);
	trx->op_info = "";

	return(err);
}

1631
/*************************************************************************
Drop an index from the InnoDB system tables and from the dictionary
cache. */

void
row_merge_drop_index(
/*=================*/
	dict_index_t*	index,	/* in: index to be removed */
	dict_table_t*	table,	/* in: table */
	trx_t*		trx)	/* in: transaction handle */
{
	ulint		err;
	ibool		dict_lock = FALSE;	/* TRUE if we acquired the
						dictionary latch here */
	pars_info_t*	info = pars_info_create();

	/* We use the private SQL parser of Innobase to generate the
	query graphs needed in deleting the dictionary data from system
	tables in Innobase. Deleting a row from SYS_INDEXES table also
	frees the file segments of the B-tree associated with the index. */

	static const char str1[] =
		"PROCEDURE DROP_INDEX_PROC () IS\n"
		"BEGIN\n"
		"DELETE FROM SYS_FIELDS WHERE INDEX_ID = :indexid;\n"
		"DELETE FROM SYS_INDEXES WHERE ID = :indexid\n"
		"		AND TABLE_ID = :tableid;\n"
		"END;\n";

	ut_ad(index && table && trx);

	pars_info_add_dulint_literal(info, "indexid", index->id);
	pars_info_add_dulint_literal(info, "tableid", table->id);

	trx_start_if_not_started(trx);
	trx->op_info = "dropping index";

	/* Acquire the dictionary latch unless the caller already
	holds it. */
	if (trx->dict_operation_lock_mode == 0) {
		row_mysql_lock_data_dictionary(trx);
		dict_lock = TRUE;
	}

	err = que_eval_sql(info, str1, FALSE, trx);

	ut_a(err == DB_SUCCESS);

	/* Replace this index with another equivalent index for all
	foreign key constraints on this table where this index is used */

	dict_table_replace_index_in_foreign_list(table, index);
	dict_index_remove_from_cache(table, index);

	if (dict_lock) {
		row_mysql_unlock_data_dictionary(trx);
	}

	trx->op_info = "";
}
1687

marko's avatar
marko committed
1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704
/*************************************************************************
Drop those indexes which were created before an error occurred
when building an index. */

void
row_merge_drop_indexes(
/*===================*/
	trx_t*		trx,		/* in: transaction */
	dict_table_t*	table,		/* in: table containing the indexes */
	dict_index_t**	index,		/* in: indexes to drop */
	ulint		num_created)	/* in: number of elements in index[] */
{
	ulint	i = 0;

	/* Remove every partially built index, one at a time. */

	while (i < num_created) {
		row_merge_drop_index(index[i], table, trx);
		i++;
	}
}

1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753
/*************************************************************************
Drop all partially created indexes during crash recovery. */

void
row_merge_drop_temp_indexes(void)
/*=============================*/
{
	trx_t*	recovery_trx;	/* background trx for the cleanup */
	ulint	status;

	/* We use the private SQL parser of Innobase to generate the
	query graphs needed in deleting the dictionary data from system
	tables in Innobase. Deleting a row from SYS_INDEXES table also
	frees the file segments of the B-tree associated with the index. */
#if TEMP_INDEX_PREFIX != '\377'
# error "TEMP_INDEX_PREFIX != '\377'"
#endif
	static const char drop_temp_indexes[] =
		"PROCEDURE DROP_TEMP_INDEXES_PROC () IS\n"
		"indexid CHAR;\n"
		"DECLARE CURSOR c IS SELECT ID FROM SYS_INDEXES\n"
		"WHERE SUBSTR(NAME,0,1)='\377' FOR UPDATE;\n"
		"BEGIN\n"
		"\tOPEN c;\n"
		"\tWHILE 1 LOOP\n"
		"\t\tFETCH c INTO indexid;\n"
		"\t\tIF (SQL % NOTFOUND) THEN\n"
		"\t\t\tEXIT;\n"
		"\t\tEND IF;\n"
		"\t\tDELETE FROM SYS_FIELDS WHERE INDEX_ID = indexid;\n"
		"\t\tDELETE FROM SYS_INDEXES WHERE CURRENT OF c;\n"
		"\tEND LOOP;\n"
		"\tCLOSE c;\n"
		"END;\n";

	/* Execute the cleanup procedure in a dedicated background
	transaction while holding the data dictionary latch. */
	recovery_trx = trx_allocate_for_background();
	recovery_trx->op_info = "dropping partially created indexes";
	row_mysql_lock_data_dictionary(recovery_trx);

	status = que_eval_sql(NULL, drop_temp_indexes, FALSE, recovery_trx);
	ut_a(status == DB_SUCCESS);

	row_mysql_unlock_data_dictionary(recovery_trx);
	trx_commit_for_mysql(recovery_trx);
	trx_free_for_background(recovery_trx);
}

1754
/*************************************************************************
1755 1756
Create a merge file. */
static
1757 1758 1759 1760
void
row_merge_file_create(
/*==================*/
	merge_file_t*	merge_file)	/* out: merge file structure */
1761
{
1762
	merge_file->fd = innobase_mysql_tmpfile();
1763
	merge_file->offset = 0;
1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777
}

/*************************************************************************
Destroy a merge file. */
static
void
row_merge_file_destroy(
/*===================*/
	merge_file_t*	merge_file)	/* out: merge file structure */
{
	const int	fd = merge_file->fd;

	if (fd == -1) {
		/* Already destroyed (or never created). */

		return;
	}

	close(fd);
	/* Mark the descriptor invalid so a second destroy is a no-op. */
	merge_file->fd = -1;
}

/*************************************************************************
Determine the precise type of a column that is added to a temporary
table: the column must be constrained NOT NULL if it is part of the
new PRIMARY KEY. */
UNIV_INLINE
ulint
row_merge_col_prtype(
/*=================*/
						/* out: col->prtype, possibly
						ORed with DATA_NOT_NULL */
	const dict_col_t*	col,		/* in: column */
	const char*		col_name,	/* in: name of the column */
	const merge_index_def_t*index_def)	/* in: the index definition
						of the primary key */
{
	ulint	field_no;

	ut_ad(index_def->ind_type & DICT_CLUSTERED);

	if (col->prtype & DATA_NOT_NULL) {
		/* Already NOT NULL; nothing to add. */

		return(col->prtype);
	}

	/* All columns that are included
	in the PRIMARY KEY must be NOT NULL. */

	for (field_no = 0; field_no < index_def->n_fields; field_no++) {
		if (!strcmp(col_name,
			    index_def->fields[field_no].field_name)) {

			return(col->prtype | DATA_NOT_NULL);
		}
	}

	return(col->prtype);
}

/*************************************************************************
Create a temporary table for creating a primary key, using the definition
of an existing table. */

dict_table_t*
row_merge_create_temporary_table(
/*=============================*/
						/* out: table,
						or NULL on error */
	const char*		table_name,	/* in: new table name */
	const merge_index_def_t*index_def,	/* in: the index definition
						of the primary key */
	const dict_table_t*	table,		/* in: old table definition */
	trx_t*			trx)		/* in/out: transaction
						(sets error_state) */
{
	ulint		i;
	dict_table_t*	new_table = NULL;
	ulint		n_cols = dict_table_get_n_user_cols(table);
	ulint		error;
	mem_heap_t*	heap = mem_heap_create(1000);

	ut_ad(table_name);
	ut_ad(index_def);
	ut_ad(table);
	/* The caller must hold the dictionary mutex. */
	ut_ad(mutex_own(&dict_sys->mutex));

	new_table = dict_mem_table_create(table_name, 0, n_cols, table->flags);

	/* Copy every user column from the old table, ORing in
	DATA_NOT_NULL where the column belongs to the new PRIMARY KEY. */
	for (i = 0; i < n_cols; i++) {
		const dict_col_t*	col;
		const char*		col_name;

		col = dict_table_get_nth_col(table, i);
		col_name = dict_table_get_col_name(table, i);

		dict_mem_table_add_col(new_table, heap, col_name, col->mtype,
				       row_merge_col_prtype(col, col_name,
							    index_def),
				       col->len);
	}

	error = row_create_table_for_mysql(new_table, trx);
	mem_heap_free(heap);

	if (error != DB_SUCCESS) {
		trx->error_state = error;
		/* NOTE(review): this assumes row_create_table_for_mysql()
		leaves ownership of new_table with the caller on failure —
		verify it does not free the object itself on any error
		path, or this would be a double free. */
		dict_mem_table_free(new_table);
		new_table = NULL;
	}

	return(new_table);
}

/*************************************************************************
Rename the temporary indexes in the dictionary to permanent ones:
strips the one-character TEMP_INDEX_PREFIX both in SYS_INDEXES and
in the dictionary cache. */

ulint
row_merge_rename_indexes(
/*=====================*/
					/* out: DB_SUCCESS if all OK */
	trx_t*		trx,		/* in/out: transaction */
	dict_table_t*	table)		/* in/out: table with new indexes */
{
	ibool		dict_lock = FALSE;	/* TRUE if we acquired the
						dictionary latch here */
	ulint		err = DB_SUCCESS;
	pars_info_t*	info = pars_info_create();

	/* We use the private SQL parser of Innobase to generate the
	query graphs needed in renaming indexes. */

#if TEMP_INDEX_PREFIX != '\377'
# error "TEMP_INDEX_PREFIX != '\377'"
#endif

	static const char rename_indexes[] =
		"PROCEDURE RENAME_INDEXES_PROC () IS\n"
		"BEGIN\n"
		"UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
		"WHERE TABLE_ID = :tableid AND SUBSTR(NAME,0,1)='\377';\n"
		"END;\n";

	ut_ad(table && trx);

	trx_start_if_not_started(trx);
	trx->op_info = "renaming indexes";

	pars_info_add_dulint_literal(info, "tableid", table->id);

	if (trx->dict_operation_lock_mode == 0) {
		row_mysql_lock_data_dictionary(trx);
		dict_lock = TRUE;
	}

	err = que_eval_sql(info, rename_indexes, FALSE, trx);

	if (err == DB_SUCCESS) {
		/* Keep the dictionary cache in sync: advance the name
		pointer past the prefix character of each renamed index. */
		dict_index_t*	index = dict_table_get_first_index(table);
		do {
			if (*index->name == TEMP_INDEX_PREFIX) {
				index->name++;
			}
			index = dict_table_get_next_index(index);
		} while (index);
	}

	if (dict_lock) {
		row_mysql_unlock_data_dictionary(trx);
	}

	trx->op_info = "";

	return(err);
}

1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950
/*************************************************************************
Rename the tables in the data dictionary: old_table becomes tmp_name
and new_table takes over old_table's original name.  On any failure
the dictionary transaction is rolled back. */

ulint
row_merge_rename_tables(
/*====================*/
					/* out: error code or DB_SUCCESS */
	dict_table_t*	old_table,	/* in/out: old table, renamed to
					tmp_name */
	dict_table_t*	new_table,	/* in/out: new table, renamed to
					old_table->name */
	const char*	tmp_name,	/* in: new name for old_table */
	trx_t*		trx)		/* in: transaction handle */
{
	ulint		err	= DB_ERROR;
	pars_info_t*	info;
	const char*	old_name= old_table->name;

	ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
	ut_ad(old_table != new_table);
	ut_ad(mutex_own(&dict_sys->mutex));

	trx->op_info = "renaming tables";
	trx_start_if_not_started(trx);

	/* We use the private SQL parser of Innobase to generate the query
	graphs needed in updating the dictionary data in system tables. */

	info = pars_info_create();

	pars_info_add_str_literal(info, "new_name", new_table->name);
	pars_info_add_str_literal(info, "old_name", old_name);
	pars_info_add_str_literal(info, "tmp_name", tmp_name);

	err = que_eval_sql(info,
			   "PROCEDURE RENAME_TABLES () IS\n"
			   "BEGIN\n"
			   "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
			   " WHERE NAME = :old_name;\n"
			   "UPDATE SYS_TABLES SET NAME = :old_name\n"
			   " WHERE NAME = :new_name;\n"
			   "END;\n", FALSE, trx);

	if (err != DB_SUCCESS) {

		goto err_exit;
	}

	/* The following calls will also rename the .ibd data files if
	the tables are stored in a single-table tablespace */

	if (!dict_table_rename_in_cache(old_table, tmp_name, FALSE)
	    || !dict_table_rename_in_cache(new_table, old_name, FALSE)) {

		err = DB_ERROR;
		goto err_exit;
	}

	err = dict_load_foreigns(old_name, TRUE);

	if (err != DB_SUCCESS) {
err_exit:
		/* Roll back the dictionary changes.  error_state is
		cleared both before and after the rollback so that the
		rollback itself runs with a clean state and leaves one. */
		trx->error_state = DB_SUCCESS;
		trx_general_rollback_for_mysql(trx, FALSE, NULL);
		trx->error_state = DB_SUCCESS;
	}

	trx->op_info = "";

	return(err);
}

2003
/*************************************************************************
Create the index and load in to the dictionary.  Builds an in-memory
index prototype from index_def, assigns it an index id, and inserts
it into SYS_INDEXES. */

dict_index_t*
row_merge_create_index(
/*===================*/
					/* out: index, or NULL on error */
	trx_t*		trx,		/* in/out: trx (sets error_state) */
	dict_table_t*	table,		/* in: the index is on this table */
	const merge_index_def_t*	/* in: the index definition */
			index_def)
{
	dict_index_t*	index;
	ulint		err;
	ulint		n_fields = index_def->n_fields;
	ulint		i;

	/* Create the index prototype, using the passed in def, this is not
	a persistent operation. We pass 0 as the space id, and determine at
	a lower level the space id where to store the table. */

	index = dict_mem_index_create(table->name, index_def->name,
				      0, index_def->ind_type, n_fields);

	ut_a(index);

	/* Create the index id, as it will be required when we build
	the index. We assign the id here because we want to write an
	UNDO record before we insert the entry into SYS_INDEXES. */
	ut_a(ut_dulint_is_zero(index->id));

	index->id = dict_hdr_get_new_id(DICT_HDR_INDEX_ID);
	index->table = table;

	/* Copy the field definitions into the prototype. */
	for (i = 0; i < n_fields; i++) {
		merge_index_field_t*	ifield = &index_def->fields[i];

		dict_mem_index_add_field(index, ifield->field_name,
					 ifield->prefix_len);
	}

	/* Add the index to SYS_INDEXES, this will use the prototype
	to create an entry in SYS_INDEXES. */
	err = row_create_index_graph_for_mysql(trx, table, index);

	if (err == DB_SUCCESS) {

		/* Fetch the cached index object that was created from
		the prototype; the prototype itself was consumed. */
		index = row_merge_dict_table_get_index(
			table, index_def);

		ut_a(index);

#ifdef ROW_MERGE_IS_INDEX_USABLE
		/* Note the id of the transaction that created this
		index, we use it to restrict readers from accessing
		this index, to ensure read consistency. */
		index->trx_id = trx->id;
#endif /* ROW_MERGE_IS_INDEX_USABLE */
	} else {
		trx->error_state = err;
		index = NULL;
	}

	return(index);
}
#ifdef ROW_MERGE_IS_INDEX_USABLE
/*************************************************************************
Check if a transaction can use an index.  An index created after the
transaction's read view was opened must not be visible to the
transaction, to preserve read consistency. */

ibool
row_merge_is_index_usable(
/*======================*/
					/* out: TRUE if the index can
					be used by the transaction */
	const trx_t*		trx,	/* in: transaction */
	const dict_index_t*	index)	/* in: index to check */
{
	/* Without a read view the transaction sees the latest state,
	so any index is usable. */
	if (!trx->read_view) {
		return(TRUE);
	}

	/* The index is usable only if it was created by a transaction
	that is older than the read view's low limit. */
	return(ut_dulint_cmp(index->trx_id, trx->read_view->low_limit_id) < 0);
}
#endif /* ROW_MERGE_IS_INDEX_USABLE */
2086 2087

/*************************************************************************
2088
Drop the old table. */
2089 2090 2091 2092

ulint
row_merge_drop_table(
/*=================*/
2093
					/* out: DB_SUCCESS or error code */
2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104
	trx_t*		trx,		/* in: transaction */
	dict_table_t*	table)		/* in: table to drop */
{
	ulint		err = DB_SUCCESS;
	ibool		dict_locked = FALSE;

	if (trx->dict_operation_lock_mode == 0) {
		row_mysql_lock_data_dictionary(trx);
		dict_locked = TRUE;
	}

marko's avatar
marko committed
2105
	/* Drop the table immediately if it is not referenced by MySQL */
2106
	if (table->n_mysql_handles_opened == 0) {
2107 2108
		err = row_drop_table_for_mysql_no_commit(table->name, trx,
							 FALSE);
2109 2110 2111 2112 2113 2114 2115 2116
	}

	if (dict_locked) {
		row_mysql_unlock_data_dictionary(trx);
	}

	return(err);
}
2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127

/*************************************************************************
Build indexes on a table by reading a clustered index,
creating a temporary file containing index entries, merge sorting
these index entries and inserting sorted index entries to indexes. */

ulint
row_merge_build_indexes(
/*====================*/
					/* out: DB_SUCCESS or error code */
	trx_t*		trx,		/* in: transaction */
marko's avatar
marko committed
2128
	dict_table_t*	old_table,	/* in: table where rows are
2129
					read from */
marko's avatar
marko committed
2130 2131 2132
	dict_table_t*	new_table,	/* in: table where indexes are
					created; identical to old_table
					unless creating a PRIMARY KEY */
2133 2134 2135 2136
	dict_index_t**	indexes,	/* in: indexes to be created */
	ulint		n_indexes)	/* in: size of indexes[] */
{
	merge_file_t*		merge_files;
2137 2138
	row_merge_block_t*	block;
	ulint			block_size;
2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154
	ulint			i;
	ulint			error;
	int			tmpfd;

	ut_ad(trx);
	ut_ad(old_table);
	ut_ad(new_table);
	ut_ad(indexes);
	ut_ad(n_indexes);

	trx_start_if_not_started(trx);

	/* Allocate memory for merge file data structure and initialize
	fields */

	merge_files = mem_alloc(n_indexes * sizeof *merge_files);
2155 2156
	block_size = 3 * sizeof *block;
	block = os_mem_alloc_large(&block_size);
2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168

	for (i = 0; i < n_indexes; i++) {

		row_merge_file_create(&merge_files[i]);
	}

	tmpfd = innobase_mysql_tmpfile();

	/* Read clustered index of the table and create files for
	secondary index entries for merge sort */

	error = row_merge_read_clustered_index(
2169 2170
		trx, old_table, new_table, indexes,
		merge_files, n_indexes, block);
2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181

	if (error != DB_SUCCESS) {

		goto func_exit;
	}

	/* Now we have files containing index entries ready for
	sorting and inserting. */

	for (i = 0; i < n_indexes; i++) {
		error = row_merge_sort(indexes[i], &merge_files[i],
2182
				       block, &tmpfd);
2183 2184 2185 2186

		if (error == DB_SUCCESS) {
			error = row_merge_insert_index_tuples(
				trx, indexes[i], new_table,
2187
				dict_table_zip_size(old_table),
2188
				merge_files[i].fd, block);
2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207
		}

		/* Close the temporary file to free up space. */
		row_merge_file_destroy(&merge_files[i]);

		if (error != DB_SUCCESS) {
			trx->error_key_num = i;
			goto func_exit;
		}
	}

func_exit:
	close(tmpfd);

	for (i = 0; i < n_indexes; i++) {
		row_merge_file_destroy(&merge_files[i]);
	}

	mem_free(merge_files);
2208
	os_mem_free_large(block, block_size);
2209 2210 2211

	return(error);
}