/*****************************************************************************

Copyright (c) 2005, 2010, Innobase Oy. All Rights Reserved.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place, Suite 330, Boston, MA 02111-1307 USA

*****************************************************************************/

/**************************************************//**
@file row/row0merge.c
New index creation routines using a merge sort

Created 12/4/2005 Jan Lindstrom
Completed by Sunny Bains and Marko Makela
*******************************************************/

#include "row0merge.h"
#include "row0ext.h"
#include "row0row.h"
#include "row0upd.h"
#include "row0ins.h"
#include "row0sel.h"
#include "dict0dict.h"
#include "dict0mem.h"
#include "dict0boot.h"
#include "dict0crea.h"
#include "dict0load.h"
#include "btr0btr.h"
#include "mach0data.h"
#include "trx0rseg.h"
#include "trx0trx.h"
#include "trx0roll.h"
#include "trx0undo.h"
#include "trx0purge.h"
#include "trx0rec.h"
#include "que0que.h"
#include "rem0cmp.h"
#include "read0read.h"
#include "os0file.h"
#include "lock0lock.h"
#include "data0data.h"
#include "data0type.h"
#include "pars0pars.h"
#include "mem0mem.h"
#include "log0log.h"
#include "ut0sort.h"
#include "handler0alter.h"

/* Ignore posix_fadvise() on those platforms where it does not exist */
#if defined __WIN__
# define posix_fadvise(fd, offset, len, advice) /* nothing */
#endif /* __WIN__ */

#ifdef UNIV_DEBUG
/** Set these in order to enable debug printout. */
/* @{ */
/** Log the outcome of each row_merge_cmp() call, comparing records. */
static ibool	row_merge_print_cmp;
/** Log each record read from temporary file. */
static ibool	row_merge_print_read;
/** Log each record write to temporary file. */
static ibool	row_merge_print_write;
/** Log each row_merge_blocks() call, merging two blocks of records to
a bigger one. */
static ibool	row_merge_print_block;
/** Log each block read from temporary file. */
static ibool	row_merge_print_block_read;
/** Log each block write to temporary file. */
static ibool	row_merge_print_block_write;
/* @} */
#endif /* UNIV_DEBUG */

/** @brief Block size for I/O operations in merge sort.

The minimum is UNIV_PAGE_SIZE, or page_get_free_space_of_empty()
rounded to a power of 2.

When not creating a PRIMARY KEY that contains column prefixes, this
can be set as small as UNIV_PAGE_SIZE / 2.  See the comment above
ut_ad(data_size < sizeof(row_merge_block_t)). */
typedef byte	row_merge_block_t[1048576];
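
/* Illustrative sketch (not part of the original source): the size
constraint described above can be expressed as a compile-time check.
The negative array size makes compilation fail if the block is ever
declared smaller than UNIV_PAGE_SIZE, the maximum size of one merge
record. */
#if 0	/* example only */
typedef byte	row_merge_block_size_check[
	sizeof(row_merge_block_t) >= UNIV_PAGE_SIZE ? 1 : -1];
#endif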

/** @brief Secondary buffer for I/O operations of merge records.

This buffer is used for writing or reading a record that spans two
row_merge_block_t.  Thus, it must be able to hold one merge record,
whose maximum size is the same as the minimum size of
row_merge_block_t. */
typedef byte	mrec_buf_t[UNIV_PAGE_SIZE];

/** @brief Merge record in row_merge_block_t.

The format is the same as a record in ROW_FORMAT=COMPACT with the
exception that the REC_N_NEW_EXTRA_BYTES are omitted. */
typedef byte	mrec_t;
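
/* Illustrative layout of one merge record inside a block (a sketch
derived from the encoding in row_merge_buf_write() and
row_merge_read_rec() below, not a definition from the original
source):

	[extra_size + 1]   one byte if the value is below 0x80,
	                   otherwise two bytes with the high bit of
	                   the first byte set; a leading 0 byte marks
	                   the end of the list of records in a block
	[extra bytes]      extra_size bytes of record header
	[data bytes]       the field data itself
*/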

/** Buffer for sorting in main memory. */
struct row_merge_buf_struct {
	mem_heap_t*	heap;		/*!< memory heap where allocated */
	dict_index_t*	index;		/*!< the index the tuples belong to */
	ulint		total_size;	/*!< total amount of data bytes */
	ulint		n_tuples;	/*!< number of data tuples */
	ulint		max_tuples;	/*!< maximum number of data tuples */
	const dfield_t**tuples;		/*!< array of pointers to
					arrays of fields that form
					the data tuples */
	const dfield_t**tmp_tuples;	/*!< temporary copy of tuples,
					for sorting */
};

/** Buffer for sorting in main memory. */
typedef struct row_merge_buf_struct row_merge_buf_t;

/** Information about temporary files used in merge sort */
struct merge_file_struct {
	int		fd;		/*!< file descriptor */
	ulint		offset;		/*!< file offset (end of file) */
	ib_uint64_t	n_rec;		/*!< number of records in the file */
};

/** Information about temporary files used in merge sort */
typedef struct merge_file_struct merge_file_t;

#ifdef UNIV_DEBUG
/******************************************************//**
Display a merge tuple. */
static
void
row_merge_tuple_print(
/*==================*/
	FILE*		f,	/*!< in: output stream */
	const dfield_t*	entry,	/*!< in: tuple to print */
	ulint		n_fields)/*!< in: number of fields in the tuple */
{
	ulint	j;

	for (j = 0; j < n_fields; j++) {
		const dfield_t*	field = &entry[j];

		if (dfield_is_null(field)) {
			fputs("\n NULL;", f);
		} else {
			ulint	field_len	= dfield_get_len(field);
			ulint	len		= ut_min(field_len, 20);
			if (dfield_is_ext(field)) {
				fputs("\nE", f);
			} else {
				fputs("\n ", f);
			}
			ut_print_buf(f, dfield_get_data(field), len);
			if (len != field_len) {
				fprintf(f, " (total %lu bytes)", field_len);
			}
		}
	}
	putc('\n', f);
}
#endif /* UNIV_DEBUG */

/******************************************************//**
Allocate a sort buffer.
@return	own: sort buffer */
static
row_merge_buf_t*
row_merge_buf_create_low(
/*=====================*/
	mem_heap_t*	heap,		/*!< in: heap where allocated */
	dict_index_t*	index,		/*!< in: secondary index */
	ulint		max_tuples,	/*!< in: maximum number of data tuples */
	ulint		buf_size)	/*!< in: size of the buffer, in bytes */
{
	row_merge_buf_t*	buf;

	ut_ad(max_tuples > 0);
	ut_ad(max_tuples <= sizeof(row_merge_block_t));
	ut_ad(max_tuples < buf_size);

	buf = mem_heap_zalloc(heap, buf_size);
	buf->heap = heap;
	buf->index = index;
	buf->max_tuples = max_tuples;
	buf->tuples = mem_heap_alloc(heap,
				     2 * max_tuples * sizeof *buf->tuples);
	buf->tmp_tuples = buf->tuples + max_tuples;

	return(buf);
}

/******************************************************//**
Allocate a sort buffer.
@return	own: sort buffer */
static
row_merge_buf_t*
row_merge_buf_create(
/*=================*/
	dict_index_t*	index)	/*!< in: secondary index */
{
	row_merge_buf_t*	buf;
	ulint			max_tuples;
	ulint			buf_size;
	mem_heap_t*		heap;

	max_tuples = sizeof(row_merge_block_t)
		/ ut_max(1, dict_index_get_min_size(index));

	buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;

	heap = mem_heap_create(buf_size + sizeof(row_merge_block_t));

	buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);

	return(buf);
}

/******************************************************//**
Empty a sort buffer.
@return	sort buffer */
static
row_merge_buf_t*
row_merge_buf_empty(
/*================*/
	row_merge_buf_t*	buf)	/*!< in,own: sort buffer */
{
	ulint		buf_size;
	ulint		max_tuples	= buf->max_tuples;
	mem_heap_t*	heap		= buf->heap;
	dict_index_t*	index		= buf->index;

	buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;

	mem_heap_empty(heap);

	return(row_merge_buf_create_low(heap, index, max_tuples, buf_size));
}

/******************************************************//**
Deallocate a sort buffer. */
static
void
row_merge_buf_free(
/*===============*/
	row_merge_buf_t*	buf)	/*!< in,own: sort buffer, to be freed */
{
	mem_heap_free(buf->heap);
}

/******************************************************//**
Insert a data tuple into a sort buffer.
@return	TRUE if added, FALSE if out of space */
static
ibool
row_merge_buf_add(
/*==============*/
	row_merge_buf_t*	buf,	/*!< in/out: sort buffer */
	const dtuple_t*		row,	/*!< in: row in clustered index */
	const row_ext_t*	ext)	/*!< in: cache of externally stored
					column prefixes, or NULL */
{
	ulint			i;
	ulint			n_fields;
	ulint			data_size;
	ulint			extra_size;
	const dict_index_t*	index;
	dfield_t*		entry;
	dfield_t*		field;
	const dict_field_t*	ifield;

	if (buf->n_tuples >= buf->max_tuples) {
		return(FALSE);
	}

	UNIV_PREFETCH_R(row->fields);

	index = buf->index;

	n_fields = dict_index_get_n_fields(index);

	entry = mem_heap_alloc(buf->heap, n_fields * sizeof *entry);
	buf->tuples[buf->n_tuples] = entry;
	field = entry;

	data_size = 0;
	extra_size = UT_BITS_IN_BYTES(index->n_nullable);

	ifield = dict_index_get_nth_field(index, 0);

	for (i = 0; i < n_fields; i++, field++, ifield++) {
		const dict_col_t*	col;
		ulint			col_no;
		const dfield_t*		row_field;
		ulint			len;

		col = ifield->col;
		col_no = dict_col_get_no(col);
		row_field = dtuple_get_nth_field(row, col_no);
		dfield_copy(field, row_field);
		len = dfield_get_len(field);

		if (dfield_is_null(field)) {
			ut_ad(!(col->prtype & DATA_NOT_NULL));
			continue;
		} else if (UNIV_LIKELY(!ext)) {
		} else if (dict_index_is_clust(index)) {
			/* Flag externally stored fields. */
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				if (i < dict_index_get_n_unique(index)) {
					dfield_set_data(field, buf, len);
				} else {
					dfield_set_ext(field);
					len = dfield_get_len(field);
				}
			}
		} else {
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				dfield_set_data(field, buf, len);
			}
		}

		/* If a column prefix index, take only the prefix */

		if (ifield->prefix_len) {
			len = dtype_get_at_most_n_mbchars(
				col->prtype,
				col->mbminmaxlen,
				ifield->prefix_len,
				len, dfield_get_data(field));
			dfield_set_len(field, len);
		}

		ut_ad(len <= col->len || col->mtype == DATA_BLOB);

		if (ifield->fixed_len) {
			ut_ad(len == ifield->fixed_len);
			ut_ad(!dfield_is_ext(field));
		} else if (dfield_is_ext(field)) {
			extra_size += 2;
		} else if (len < 128
			   || (col->len < 256 && col->mtype != DATA_BLOB)) {
			extra_size++;
		} else {
			/* For variable-length columns, we look up the
			maximum length from the column itself.  If this
			is a prefix index column shorter than 256 bytes,
			this will waste one byte. */
			extra_size += 2;
		}
		data_size += len;
	}

#ifdef UNIV_DEBUG
	{
		ulint	size;
		ulint	extra;

		size = rec_get_converted_size_comp(index,
						   REC_STATUS_ORDINARY,
						   entry, n_fields, &extra);

		ut_ad(data_size + extra_size + REC_N_NEW_EXTRA_BYTES == size);
		ut_ad(extra_size + REC_N_NEW_EXTRA_BYTES == extra);
	}
#endif /* UNIV_DEBUG */

	/* Add to the total size of the record in row_merge_block_t
	the encoded length of extra_size and the extra bytes (extra_size).
	See row_merge_buf_write() for the variable-length encoding
	of extra_size. */
	data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
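	/* Worked example (illustrative, not from the original source):
	for extra_size = 5, the length prefix 6 is below 0x80 and takes
	one byte; for extra_size = 200, the prefix 201 is not, and the
	second addend above accounts for the extra prefix byte. */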

	/* The following assertion may fail if row_merge_block_t is
	declared very small and a PRIMARY KEY is being created with
	many prefix columns.  In that case, the record may exceed the
	page_zip_rec_needs_ext() limit.  However, no further columns
	will be moved to external storage until the record is inserted
	to the clustered index B-tree. */
	ut_ad(data_size < sizeof(row_merge_block_t));

	/* Reserve one byte for the end marker of row_merge_block_t. */
	if (buf->total_size + data_size >= sizeof(row_merge_block_t) - 1) {
		return(FALSE);
	}

	buf->total_size += data_size;
	buf->n_tuples++;

	field = entry;

	/* Copy the data fields. */

	do {
		dfield_dup(field++, buf->heap);
	} while (--n_fields);

	return(TRUE);
}

/** Structure for reporting duplicate records. */
struct row_merge_dup_struct {
	const dict_index_t*	index;		/*!< index being sorted */
	struct TABLE*		table;		/*!< MySQL table object */
	ulint			n_dup;		/*!< number of duplicates */
};

/** Structure for reporting duplicate records. */
typedef struct row_merge_dup_struct row_merge_dup_t;

/*************************************************************//**
Report a duplicate key. */
static
void
row_merge_dup_report(
/*=================*/
	row_merge_dup_t*	dup,	/*!< in/out: for reporting duplicates */
	const dfield_t*		entry)	/*!< in: duplicate index entry */
{
	mrec_buf_t* 		buf;
	const dtuple_t*		tuple;
	dtuple_t		tuple_store;
	const rec_t*		rec;
	const dict_index_t*	index	= dup->index;
	ulint			n_fields= dict_index_get_n_fields(index);
	mem_heap_t*		heap;
	ulint*			offsets;
	ulint			n_ext;

	if (dup->n_dup++) {
		/* Only report the first duplicate record,
		but count all duplicate records. */
		return;
	}

	/* Convert the tuple to a record and then to MySQL format. */
	heap = mem_heap_create((1 + REC_OFFS_HEADER_SIZE + n_fields)
			       * sizeof *offsets
			       + sizeof *buf);

	buf = mem_heap_alloc(heap, sizeof *buf);

	tuple = dtuple_from_fields(&tuple_store, entry, n_fields);
	n_ext = dict_index_is_clust(index) ? dtuple_get_n_ext(tuple) : 0;

	rec = rec_convert_dtuple_to_rec(*buf, index, tuple, n_ext);
	offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);

	innobase_rec_to_mysql(dup->table, rec, index, offsets);

	mem_heap_free(heap);
}

/*************************************************************//**
Compare two tuples.
@return	1, 0, -1 if a is greater, equal, less, respectively, than b */
static
int
row_merge_tuple_cmp(
/*================*/
	ulint			n_field,/*!< in: number of fields */
	const dfield_t*		a,	/*!< in: first tuple to be compared */
	const dfield_t*		b,	/*!< in: second tuple to be compared */
	row_merge_dup_t*	dup)	/*!< in/out: for reporting duplicates */
{
	int		cmp;
	const dfield_t*	field	= a;

	/* Compare the fields of the tuples until a difference is
	found or we run out of fields to compare.  If !cmp at the
	end, the tuples are equal. */
	do {
		cmp = cmp_dfield_dfield(a++, b++);
	} while (!cmp && --n_field);

	if (UNIV_UNLIKELY(!cmp) && UNIV_LIKELY_NULL(dup)) {
		/* Report a duplicate value error if the tuples are
		logically equal.  NULL columns are logically inequal,
		although they are equal in the sorting order.  Find
		out if any of the fields are NULL. */
		for (b = field; b != a; b++) {
			if (dfield_is_null(b)) {

				goto func_exit;
			}
		}

		row_merge_dup_report(dup, field);
	}

func_exit:
	return(cmp);
}

/** Wrapper for row_merge_tuple_sort() to inject some more context to
UT_SORT_FUNCTION_BODY().
@param a	array of tuples being sorted
@param b	aux (work area), same size as tuples[]
@param c	lower bound of the sorting area, inclusive
@param d	upper bound of the sorting area, exclusive */
#define row_merge_tuple_sort_ctx(a,b,c,d) \
	row_merge_tuple_sort(n_field, dup, a, b, c, d)
/** Wrapper for row_merge_tuple_cmp() to inject some more context to
UT_SORT_FUNCTION_BODY().
@param a	first tuple to be compared
@param b	second tuple to be compared
@return	1, 0, -1 if a is greater, equal, less, respectively, than b */
#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, a, b, dup)

/**********************************************************************//**
Merge sort the tuple buffer in main memory. */
static
void
row_merge_tuple_sort(
/*=================*/
	ulint			n_field,/*!< in: number of fields */
	row_merge_dup_t*	dup,	/*!< in/out: for reporting duplicates */
	const dfield_t**	tuples,	/*!< in/out: tuples */
	const dfield_t**	aux,	/*!< in/out: work area */
	ulint			low,	/*!< in: lower bound of the
					sorting area, inclusive */
	ulint			high)	/*!< in: upper bound of the
					sorting area, exclusive */
{
	UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
			      tuples, aux, low, high, row_merge_tuple_cmp_ctx);
}

/******************************************************//**
Sort a buffer. */
static
void
row_merge_buf_sort(
/*===============*/
	row_merge_buf_t*	buf,	/*!< in/out: sort buffer */
	row_merge_dup_t*	dup)	/*!< in/out: for reporting duplicates */
{
	row_merge_tuple_sort(dict_index_get_n_unique(buf->index), dup,
			     buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
}

/******************************************************//**
Write a buffer to a block. */
static
void
row_merge_buf_write(
/*================*/
	const row_merge_buf_t*	buf,	/*!< in: sorted buffer */
#ifdef UNIV_DEBUG
	const merge_file_t*	of,	/*!< in: output file */
#endif /* UNIV_DEBUG */
	row_merge_block_t*	block)	/*!< out: buffer for writing to file */
#ifndef UNIV_DEBUG
# define row_merge_buf_write(buf, of, block) row_merge_buf_write(buf, block)
#endif /* !UNIV_DEBUG */
{
	const dict_index_t*	index	= buf->index;
	ulint			n_fields= dict_index_get_n_fields(index);
	byte*			b	= &(*block)[0];

	ulint		i;

	for (i = 0; i < buf->n_tuples; i++) {
		ulint		size;
		ulint		extra_size;
		const dfield_t*	entry		= buf->tuples[i];

		size = rec_get_converted_size_comp(index,
						   REC_STATUS_ORDINARY,
						   entry, n_fields,
						   &extra_size);
		ut_ad(size > extra_size);
		ut_ad(extra_size >= REC_N_NEW_EXTRA_BYTES);
		extra_size -= REC_N_NEW_EXTRA_BYTES;
		size -= REC_N_NEW_EXTRA_BYTES;

		/* Encode extra_size + 1 */
		if (extra_size + 1 < 0x80) {
			*b++ = (byte) (extra_size + 1);
		} else {
			ut_ad((extra_size + 1) < 0x8000);
			*b++ = (byte) (0x80 | ((extra_size + 1) >> 8));
			*b++ = (byte) (extra_size + 1);
		}

		ut_ad(b + size < block[1]);

		rec_convert_dtuple_to_rec_comp(b + extra_size, 0, index,
					       REC_STATUS_ORDINARY,
					       entry, n_fields);

		b += size;

#ifdef UNIV_DEBUG
		if (row_merge_print_write) {
			fprintf(stderr, "row_merge_buf_write %p,%d,%lu %lu",
				(void*) b, of->fd, (ulong) of->offset,
				(ulong) i);
			row_merge_tuple_print(stderr, entry, n_fields);
		}
#endif /* UNIV_DEBUG */
	}

	/* Write an "end-of-chunk" marker. */
	ut_a(b < block[1]);
	ut_a(b == block[0] + buf->total_size);
	*b++ = 0;
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized.  Initialize it
	to avoid bogus warnings. */
	memset(b, 0xff, block[1] - b);
#endif /* UNIV_DEBUG_VALGRIND */
#ifdef UNIV_DEBUG
	if (row_merge_print_write) {
		fprintf(stderr, "row_merge_buf_write %p,%d,%lu EOF\n",
			(void*) b, of->fd, (ulong) of->offset);
	}
#endif /* UNIV_DEBUG */
}
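
/* Illustrative counterpart of the encoding above (a sketch, not part
of the original source): decode the length prefix of a merge record.
row_merge_read_rec() below does this inline, additionally handling
prefixes that span a block boundary, which this sketch ignores. */
#if 0	/* example only */
static const byte*
row_merge_decode_extra_size(
	const byte*	b,	/* in: pointer to the length prefix */
	ulint*		e)	/* out: extra_size + 1,
				or 0 for end-of-list */
{
	*e = *b++;

	if (*e >= 0x80) {
		/* Two-byte prefix: high bit set in the first byte. */
		*e = ((*e & 0x7f) << 8) | *b++;
	}

	return(b);
}
#endif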

/******************************************************//**
Create a memory heap and allocate space for two record offset arrays
and three mrec_buf_t buffers.
@return	memory heap */
static
mem_heap_t*
row_merge_heap_create(
/*==================*/
	const dict_index_t*	index,		/*!< in: record descriptor */
	mrec_buf_t**		buf,		/*!< out: 3 buffers */
	ulint**			offsets1,	/*!< out: offsets */
	ulint**			offsets2)	/*!< out: offsets */
{
	ulint		i	= 1 + REC_OFFS_HEADER_SIZE
		+ dict_index_get_n_fields(index);
	mem_heap_t*	heap	= mem_heap_create(2 * i * sizeof **offsets1
						  + 3 * sizeof **buf);

	*buf = mem_heap_alloc(heap, 3 * sizeof **buf);
	*offsets1 = mem_heap_alloc(heap, i * sizeof **offsets1);
	*offsets2 = mem_heap_alloc(heap, i * sizeof **offsets2);

	(*offsets1)[0] = (*offsets2)[0] = i;
	(*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);

	return(heap);
}

/**********************************************************************//**
Search an index object by name and column names.  If several indexes match,
return the index with the max id.
@return	matching index, NULL if not found */
static
dict_index_t*
row_merge_dict_table_get_index(
/*===========================*/
	dict_table_t*		table,		/*!< in: table */
	const merge_index_def_t*index_def)	/*!< in: index definition */
{
	ulint		i;
	dict_index_t*	index;
	const char**	column_names;

	column_names = mem_alloc(index_def->n_fields * sizeof *column_names);

	for (i = 0; i < index_def->n_fields; ++i) {
		column_names[i] = index_def->fields[i].field_name;
	}

	index = dict_table_get_index_by_max_id(
		table, index_def->name, column_names, index_def->n_fields);

	mem_free((void*) column_names);

	return(index);
}

/********************************************************************//**
Read a merge block from the file system.
@return	TRUE if request was successful, FALSE on failure */
static
ibool
row_merge_read(
/*===========*/
	int			fd,	/*!< in: file descriptor */
	ulint			offset,	/*!< in: offset where to read
					in number of row_merge_block_t
					elements */
	row_merge_block_t*	buf)	/*!< out: data */
{
	ib_uint64_t	ofs = ((ib_uint64_t) offset) * sizeof *buf;
	ibool		success;

#ifdef UNIV_DEBUG
	if (row_merge_print_block_read) {
		fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
			fd, (ulong) offset);
	}
#endif /* UNIV_DEBUG */

	success = os_file_read_no_error_handling(OS_FILE_FROM_FD(fd), buf,
						 (ulint) (ofs & 0xFFFFFFFF),
						 (ulint) (ofs >> 32),
						 sizeof *buf);
#ifdef POSIX_FADV_DONTNEED
	/* Each block is read exactly once.  Free up the file cache. */
	posix_fadvise(fd, ofs, sizeof *buf, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */

	if (UNIV_UNLIKELY(!success)) {
		ut_print_timestamp(stderr);
		fprintf(stderr,
			"  InnoDB: failed to read merge block at %llu\n", ofs);
	}

	return(UNIV_LIKELY(success));
}

/********************************************************************//**
Write a merge block to the file system.
@return	TRUE if request was successful, FALSE on failure */
static
ibool
row_merge_write(
/*============*/
	int		fd,	/*!< in: file descriptor */
	ulint		offset,	/*!< in: offset where to write,
				in number of row_merge_block_t elements */
	const void*	buf)	/*!< in: data */
{
	size_t		buf_len = sizeof(row_merge_block_t);
	ib_uint64_t	ofs = buf_len * (ib_uint64_t) offset;
	ibool		ret;

	ret = os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
			    (ulint) (ofs & 0xFFFFFFFF),
			    (ulint) (ofs >> 32),
			    buf_len);

#ifdef UNIV_DEBUG
	if (row_merge_print_block_write) {
		fprintf(stderr, "row_merge_write fd=%d ofs=%lu\n",
			fd, (ulong) offset);
	}
#endif /* UNIV_DEBUG */

#ifdef POSIX_FADV_DONTNEED
	/* The block will be needed on the next merge pass,
	but it can be evicted from the file cache meanwhile. */
	posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */

	return(UNIV_LIKELY(ret));
}

/********************************************************************//**
Read a merge record.
@return	pointer to next record, or NULL on I/O error or end of list */
static __attribute__((nonnull))
const byte*
row_merge_read_rec(
/*===============*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	const byte*		b,	/*!< in: pointer to record */
	const dict_index_t*	index,	/*!< in: index of the record */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t**		mrec,	/*!< out: pointer to merge record,
					or NULL on end of list
					(non-NULL on I/O error) */
	ulint*			offsets)/*!< out: offsets of mrec */
{
	ulint	extra_size;
	ulint	data_size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= block[0]);
	ut_ad(b < block[1]);
	ut_ad(index);
	ut_ad(foffs);
	ut_ad(mrec);
	ut_ad(offsets);

	ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
	      + dict_index_get_n_fields(index));

	extra_size = *b++;

	if (UNIV_UNLIKELY(!extra_size)) {
		/* End of list */
		*mrec = NULL;
#ifdef UNIV_DEBUG
		if (row_merge_print_read) {
			fprintf(stderr, "row_merge_read %p,%p,%d,%lu EOF\n",
				(const void*) b, (const void*) block,
				fd, (ulong) *foffs);
		}
#endif /* UNIV_DEBUG */
		return(NULL);
	}

	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */

		if (UNIV_UNLIKELY(b >= block[1])) {
			if (!row_merge_read(fd, ++(*foffs), block)) {
err_exit:
				/* Signal I/O error. */
				*mrec = b;
				return(NULL);
			}

			/* Wrap around to the beginning of the buffer. */
			b = block[0];
		}

		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *b++;
	}

	/* Normalize extra_size.  Above, value 0 signals "end of list". */
	extra_size--;

	/* Read the extra bytes. */

	if (UNIV_UNLIKELY(b + extra_size >= block[1])) {
		/* The record spans two blocks.  Copy the entire record
		to the auxiliary buffer and handle this as a special
		case. */

		avail_size = block[1] - b;

		memcpy(*buf, b, avail_size);

		if (!row_merge_read(fd, ++(*foffs), block)) {

			goto err_exit;
		}

		/* Wrap around to the beginning of the buffer. */
		b = block[0];

		/* Copy the record. */
		memcpy(*buf + avail_size, b, extra_size - avail_size);
		b += extra_size - avail_size;

		*mrec = *buf + extra_size;

		rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);

		data_size = rec_offs_data_size(offsets);

		/* These overflows should be impossible given that
		records are much smaller than either buffer, and
		the record starts near the beginning of each buffer. */
		ut_a(extra_size + data_size < sizeof *buf);
		ut_a(b + data_size < block[1]);

		/* Copy the data bytes. */
		memcpy(*buf + extra_size, b, data_size);
		b += data_size;

		goto func_exit;
	}

	*mrec = b + extra_size;

	rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);

	data_size = rec_offs_data_size(offsets);
	ut_ad(extra_size + data_size < sizeof *buf);

	b += extra_size + data_size;

	if (UNIV_LIKELY(b < block[1])) {
		/* The record fits entirely in the block.
		This is the normal case. */
		goto func_exit;
	}

	/* The record spans two blocks.  Copy it to buf. */

	b -= extra_size + data_size;
	avail_size = block[1] - b;
	memcpy(*buf, b, avail_size);
	*mrec = *buf + extra_size;
#ifdef UNIV_DEBUG
	/* We cannot invoke rec_offs_make_valid() here, because there
	are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
	Similarly, rec_offs_validate() would fail, because it invokes
	rec_get_status(). */
	offsets[2] = (ulint) *mrec;
	offsets[3] = (ulint) index;
#endif /* UNIV_DEBUG */

	if (!row_merge_read(fd, ++(*foffs), block)) {

		goto err_exit;
	}

	/* Wrap around to the beginning of the buffer. */
	b = block[0];

	/* Copy the rest of the record. */
	memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
	b += extra_size + data_size - avail_size;

func_exit:
#ifdef UNIV_DEBUG
	if (row_merge_print_read) {
		fprintf(stderr, "row_merge_read %p,%p,%d,%lu ",
			(const void*) b, (const void*) block,
			fd, (ulong) *foffs);
		rec_print_comp(stderr, *mrec, offsets);
		putc('\n', stderr);
	}
#endif /* UNIV_DEBUG */

	return(b);
}

/********************************************************************//**
Write a merge record. */
static
void
row_merge_write_rec_low(
/*====================*/
	byte*		b,	/*!< out: buffer */
	ulint		e,	/*!< in: encoded extra_size */
#ifdef UNIV_DEBUG
	ulint		size,	/*!< in: total size to write */
	int		fd,	/*!< in: file descriptor */
	ulint		foffs,	/*!< in: file offset */
#endif /* UNIV_DEBUG */
	const mrec_t*	mrec,	/*!< in: record to write */
	const ulint*	offsets)/*!< in: offsets of mrec */
#ifndef UNIV_DEBUG
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)	\
	row_merge_write_rec_low(b, e, mrec, offsets)
#endif /* !UNIV_DEBUG */
{
#ifdef UNIV_DEBUG
	const byte* const end = b + size;
	ut_ad(e == rec_offs_extra_size(offsets) + 1);

	if (row_merge_print_write) {
		fprintf(stderr, "row_merge_write %p,%d,%lu ",
			(void*) b, fd, (ulong) foffs);
		rec_print_comp(stderr, mrec, offsets);
		putc('\n', stderr);
	}
#endif /* UNIV_DEBUG */

	if (e < 0x80) {
		*b++ = (byte) e;
	} else {
		*b++ = (byte) (0x80 | (e >> 8));
		*b++ = (byte) e;
	}

	memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
	ut_ad(b + rec_offs_size(offsets) == end);
}

/********************************************************************//**
Write a merge record.
@return	pointer to end of block, or NULL on error */
static
byte*
row_merge_write_rec(
/*================*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	byte*			b,	/*!< in: pointer to end of block */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t*		mrec,	/*!< in: record to write */
	const ulint*		offsets)/*!< in: offsets of mrec */
{
	ulint	extra_size;
	ulint	size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= block[0]);
	ut_ad(b < block[1]);
	ut_ad(mrec);
	ut_ad(foffs);
	ut_ad(mrec < block[0] || mrec > block[1]);
	ut_ad(mrec < buf[0] || mrec > buf[1]);

	/* Normalize extra_size.  Value 0 signals "end of list". */
	extra_size = rec_offs_extra_size(offsets) + 1;

	size = extra_size + (extra_size >= 0x80)
		+ rec_offs_data_size(offsets);

	if (UNIV_UNLIKELY(b + size >= block[1])) {
		/* The record spans two blocks.
		Copy it to the temporary buffer first. */
		avail_size = block[1] - b;

		row_merge_write_rec_low(buf[0],
					extra_size, size, fd, *foffs,
					mrec, offsets);

		/* Copy the head of the temporary buffer, write
		the completed block, and copy the tail of the
		record to the head of the new block. */
		memcpy(b, buf[0], avail_size);

		if (!row_merge_write(fd, (*foffs)++, block)) {
			return(NULL);
		}

		UNIV_MEM_INVALID(block[0], sizeof block[0]);

		/* Copy the rest. */
		b = block[0];
		memcpy(b, buf[0] + avail_size, size - avail_size);
		b += size - avail_size;
	} else {
		row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
					mrec, offsets);
		b += size;
	}

	return(b);
}

/********************************************************************//**
Write an end-of-list marker.
@return	pointer to end of block, or NULL on error */
static
byte*
row_merge_write_eof(
/*================*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	byte*			b,	/*!< in: pointer to end of block */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs)	/*!< in/out: file offset */
{
	ut_ad(block);
	ut_ad(b >= block[0]);
	ut_ad(b < block[1]);
	ut_ad(foffs);
#ifdef UNIV_DEBUG
	if (row_merge_print_write) {
		fprintf(stderr, "row_merge_write %p,%p,%d,%lu EOF\n",
			(void*) b, (void*) block, fd, (ulong) *foffs);
	}
#endif /* UNIV_DEBUG */

	*b++ = 0;
	UNIV_MEM_ASSERT_RW(block[0], b - block[0]);
	UNIV_MEM_ASSERT_W(block[0], sizeof block[0]);
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized.  Initialize it
	to avoid bogus warnings. */
	memset(b, 0xff, block[1] - b);
#endif /* UNIV_DEBUG_VALGRIND */

	if (!row_merge_write(fd, (*foffs)++, block)) {
		return(NULL);
	}

	UNIV_MEM_INVALID(block[0], sizeof block[0]);
	return(block[0]);
}

/*************************************************************//**
Compare two merge records.
@return	1, 0, -1 if mrec1 is greater, equal, less, respectively, than mrec2 */
static
int
row_merge_cmp(
/*==========*/
	const mrec_t*		mrec1,		/*!< in: first merge
						record to be compared */
	const mrec_t*		mrec2,		/*!< in: second merge
						record to be compared */
	const ulint*		offsets1,	/*!< in: first record offsets */
	const ulint*		offsets2,	/*!< in: second record offsets */
	const dict_index_t*	index,		/*!< in: index */
	ibool*			null_eq)	/*!< out: set to TRUE if
						found matching null values */
{
	int	cmp;

	cmp = cmp_rec_rec_simple(mrec1, mrec2, offsets1, offsets2, index,
				 null_eq);

#ifdef UNIV_DEBUG
	if (row_merge_print_cmp) {
		fputs("row_merge_cmp1 ", stderr);
		rec_print_comp(stderr, mrec1, offsets1);
		fputs("\nrow_merge_cmp2 ", stderr);
		rec_print_comp(stderr, mrec2, offsets2);
		fprintf(stderr, "\nrow_merge_cmp=%d\n", cmp);
	}
#endif /* UNIV_DEBUG */

	return(cmp);
}

/********************************************************************//**
Read the clustered index of the table and create temporary files
containing the index entries for the indexes to be built.
@return	DB_SUCCESS or error */
static __attribute__((nonnull))
ulint
row_merge_read_clustered_index(
/*===========================*/
	trx_t*			trx,	/*!< in: transaction */
	struct TABLE*		table,	/*!< in/out: MySQL table object,
					for reporting erroneous records */
	const dict_table_t*	old_table,/*!< in: table where rows are
					read from */
	const dict_table_t*	new_table,/*!< in: table where indexes are
					created; identical to old_table
					unless creating a PRIMARY KEY */
	dict_index_t**		index,	/*!< in: indexes to be created */
	merge_file_t*		files,	/*!< in: temporary files */
	ulint			n_index,/*!< in: number of indexes to create */
	row_merge_block_t*	block)	/*!< in/out: file buffer */
{
	dict_index_t*		clust_index;	/* Clustered index */
	mem_heap_t*		row_heap;	/* Heap memory to create
						clustered index records */
	row_merge_buf_t**	merge_buf;	/* Temporary list for records*/
	btr_pcur_t		pcur;		/* Persistent cursor on the
						clustered index */
	mtr_t			mtr;		/* Mini transaction */
	ulint			err = DB_SUCCESS;/* Return code */
	ulint			i;
	ulint			n_nonnull = 0;	/* number of columns
						changed to NOT NULL */
	ulint*			nonnull = NULL;	/* NOT NULL columns */

	trx->op_info = "reading clustered index";

	ut_ad(trx);
	ut_ad(old_table);
	ut_ad(new_table);
	ut_ad(index);
	ut_ad(files);

	/* Create and initialize memory for record buffers */

	merge_buf = mem_alloc(n_index * sizeof *merge_buf);

	for (i = 0; i < n_index; i++) {
		merge_buf[i] = row_merge_buf_create(index[i]);
	}

	mtr_start(&mtr);

	/* Find the clustered index and create a persistent cursor
	based on that. */

	clust_index = dict_table_get_first_index(old_table);

	btr_pcur_open_at_index_side(
		TRUE, clust_index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);

	if (UNIV_UNLIKELY(old_table != new_table)) {
		ulint	n_cols = dict_table_get_n_cols(old_table);

		/* A primary key will be created.  Identify the
		columns that were flagged NOT NULL in the new table,
		so that we can quickly check that the records in the
		(old) clustered index do not violate the added NOT
		NULL constraints. */

		ut_a(n_cols == dict_table_get_n_cols(new_table));

		nonnull = mem_alloc(n_cols * sizeof *nonnull);

		for (i = 0; i < n_cols; i++) {
			if (dict_table_get_nth_col(old_table, i)->prtype
			    & DATA_NOT_NULL) {

				continue;
			}

			if (dict_table_get_nth_col(new_table, i)->prtype
			    & DATA_NOT_NULL) {

				nonnull[n_nonnull++] = i;
			}
		}

		if (!n_nonnull) {
			mem_free(nonnull);
			nonnull = NULL;
		}
	}

	row_heap = mem_heap_create(sizeof(mrec_buf_t));

	/* Scan the clustered index. */
	for (;;) {
		const rec_t*	rec;
		ulint*		offsets;
		dtuple_t*	row		= NULL;
		row_ext_t*	ext;
		ibool		has_next	= TRUE;

		btr_pcur_move_to_next_on_page(&pcur);

		/* When switching pages, commit the mini-transaction
		in order to release the latch on the old page. */

		if (btr_pcur_is_after_last_on_page(&pcur)) {
			if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
				i = 0;
				err = DB_INTERRUPTED;
				goto err_exit;
			}

			btr_pcur_store_position(&pcur, &mtr);
			mtr_commit(&mtr);
			mtr_start(&mtr);
			btr_pcur_restore_position(BTR_SEARCH_LEAF,
						  &pcur, &mtr);
			has_next = btr_pcur_move_to_next_user_rec(&pcur, &mtr);
		}

		if (UNIV_LIKELY(has_next)) {
			rec = btr_pcur_get_rec(&pcur);
			offsets = rec_get_offsets(rec, clust_index, NULL,
						  ULINT_UNDEFINED, &row_heap);

			/* Skip delete marked records. */
			if (rec_get_deleted_flag(
				    rec, dict_table_is_comp(old_table))) {
				continue;
			}

			srv_n_rows_inserted++;

			/* Build a row based on the clustered index. */

			row = row_build(ROW_COPY_POINTERS, clust_index,
					rec, offsets,
					new_table, &ext, row_heap);

			if (UNIV_LIKELY_NULL(nonnull)) {
				for (i = 0; i < n_nonnull; i++) {
					dfield_t*	field
						= &row->fields[nonnull[i]];
					dtype_t*	field_type
						= dfield_get_type(field);

					ut_a(!(field_type->prtype
					       & DATA_NOT_NULL));

					if (dfield_is_null(field)) {
						err = DB_PRIMARY_KEY_IS_NULL;
						i = 0;
						goto err_exit;
					}

					field_type->prtype |= DATA_NOT_NULL;
				}
			}
		}

		/* Build all entries for all the indexes to be created
		in a single scan of the clustered index. */

		for (i = 0; i < n_index; i++) {
			row_merge_buf_t*	buf	= merge_buf[i];
			merge_file_t*		file	= &files[i];
			const dict_index_t*	index	= buf->index;

			if (UNIV_LIKELY
			    (row && row_merge_buf_add(buf, row, ext))) {
				file->n_rec++;
				continue;
			}

			/* The buffer must be sufficiently large
			to hold at least one record. */
			ut_ad(buf->n_tuples || !has_next);

			/* We have enough data tuples to form a block.
			Sort them and write to disk. */

			if (buf->n_tuples) {
				if (dict_index_is_unique(index)) {
					row_merge_dup_t	dup;
					dup.index = buf->index;
					dup.table = table;
					dup.n_dup = 0;

					row_merge_buf_sort(buf, &dup);

					if (dup.n_dup) {
						err = DB_DUPLICATE_KEY;
err_exit:
						trx->error_key_num = i;
						goto func_exit;
					}
				} else {
					row_merge_buf_sort(buf, NULL);
				}
			}

			row_merge_buf_write(buf, file, block);

			if (!row_merge_write(file->fd, file->offset++,
					     block)) {
				err = DB_OUT_OF_FILE_SPACE;
				goto err_exit;
			}

			UNIV_MEM_INVALID(block[0], sizeof block[0]);
			merge_buf[i] = row_merge_buf_empty(buf);

			if (UNIV_LIKELY(row != NULL)) {
				/* Try writing the record again, now
				that the buffer has been written out
				and emptied. */

				if (UNIV_UNLIKELY
				    (!row_merge_buf_add(buf, row, ext))) {
					/* An empty buffer should have enough
					room for at least one record. */
					ut_error;
				}

				file->n_rec++;
			}
		}

		mem_heap_empty(row_heap);

		if (UNIV_UNLIKELY(!has_next)) {
			goto func_exit;
		}
	}

func_exit:
	btr_pcur_close(&pcur);
	mtr_commit(&mtr);
	mem_heap_free(row_heap);

	if (UNIV_LIKELY_NULL(nonnull)) {
		mem_free(nonnull);
	}

	for (i = 0; i < n_index; i++) {
		row_merge_buf_free(merge_buf[i]);
	}

	mem_free(merge_buf);

	trx->op_info = "";

	return(err);
}

/** Write a record via buffer 2 and read the next record to buffer N.
@param N	number of the buffer (0 or 1)
@param AT_END	statement to execute at end of input */
#define ROW_MERGE_WRITE_GET_NEXT(N, AT_END)				\
	do {								\
		b2 = row_merge_write_rec(&block[2], &buf[2], b2,	\
					 of->fd, &of->offset,		\
					 mrec##N, offsets##N);		\
		if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {	\
			goto corrupt;					\
		}							\
		b##N = row_merge_read_rec(&block[N], &buf[N],		\
					  b##N, index,			\
					  file->fd, foffs##N,		\
					  &mrec##N, offsets##N);	\
		if (UNIV_UNLIKELY(!b##N)) {				\
			if (mrec##N) {					\
				goto corrupt;				\
			}						\
			AT_END;						\
		}							\
	} while (0)
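
/* Example use (as in row_merge_blocks() below): write the smaller of
the two current records to the output and fetch the next record from
input list 0, jumping to the label "merged" when the list runs out:

	ROW_MERGE_WRITE_GET_NEXT(0, goto merged);
*/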

/*************************************************************//**
Merge two blocks of records on disk and write a bigger block.
@return	DB_SUCCESS or error code */
static
ulint
row_merge_blocks(
/*=============*/
	const dict_index_t*	index,	/*!< in: index being created */
	const merge_file_t*	file,	/*!< in: file containing
					index entries */
	row_merge_block_t*	block,	/*!< in/out: 3 buffers */
	ulint*			foffs0,	/*!< in/out: offset of first
					source list in the file */
	ulint*			foffs1,	/*!< in/out: offset of second
					source list in the file */
	merge_file_t*		of,	/*!< in/out: output file */
	struct TABLE*		table)	/*!< in/out: MySQL table, for
					reporting erroneous key value
					if applicable */
{
	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */

	mrec_buf_t*	buf;	/*!< buffer for handling
				split mrec in block[] */
	const byte*	b0;	/*!< pointer to block[0] */
	const byte*	b1;	/*!< pointer to block[1] */
	byte*		b2;	/*!< pointer to block[2] */
	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] or buf[0] */
	const mrec_t*	mrec1;	/*!< merge rec, points to block[1] or buf[1] */
	ulint*		offsets0;/* offsets of mrec0 */
	ulint*		offsets1;/* offsets of mrec1 */

#ifdef UNIV_DEBUG
	if (row_merge_print_block) {
		fprintf(stderr,
			"row_merge_blocks fd=%d ofs=%lu + fd=%d ofs=%lu"
			" = fd=%d ofs=%lu\n",
			file->fd, (ulong) *foffs0,
			file->fd, (ulong) *foffs1,
			of->fd, (ulong) of->offset);
	}
#endif /* UNIV_DEBUG */

	heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);

	/* Write a record and read the next record.  Split the output
	file in two halves, which can be merged on the following pass. */

	if (!row_merge_read(file->fd, *foffs0, &block[0])
	    || !row_merge_read(file->fd, *foffs1, &block[1])) {
corrupt:
		mem_heap_free(heap);
		return(DB_CORRUPTION);
	}

	b0 = block[0];
	b1 = block[1];
	b2 = block[2];

	b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
				foffs0, &mrec0, offsets0);
	b1 = row_merge_read_rec(&block[1], &buf[1], b1, index, file->fd,
				foffs1, &mrec1, offsets1);
	if (UNIV_UNLIKELY(!b0 && mrec0)
	    || UNIV_UNLIKELY(!b1 && mrec1)) {

		goto corrupt;
	}

	while (mrec0 && mrec1) {
		ibool	null_eq = FALSE;
		switch (row_merge_cmp(mrec0, mrec1,
				      offsets0, offsets1, index,
				      &null_eq)) {
		case 0:
			if (UNIV_UNLIKELY
			    (dict_index_is_unique(index) && !null_eq)) {
				innobase_rec_to_mysql(table, mrec0,
						      index, offsets0);
				mem_heap_free(heap);
				return(DB_DUPLICATE_KEY);
			}
			/* fall through */
		case -1:
			ROW_MERGE_WRITE_GET_NEXT(0, goto merged);
			break;
		case 1:
			ROW_MERGE_WRITE_GET_NEXT(1, goto merged);
			break;
		default:
			ut_error;
		}

	}

merged:
	if (mrec0) {
		/* append all mrec0 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
		}
	}
done0:
	if (mrec1) {
		/* append all mrec1 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(1, goto done1);
		}
	}
done1:

	mem_heap_free(heap);
	b2 = row_merge_write_eof(&block[2], b2, of->fd, &of->offset);
	return(b2 ? DB_SUCCESS : DB_CORRUPTION);
}

/*************************************************************//**
Copy a block of index entries.
@return	TRUE on success, FALSE on failure */
static __attribute__((nonnull))
ibool
row_merge_blocks_copy(
/*==================*/
	const dict_index_t*	index,	/*!< in: index being created */
	const merge_file_t*	file,	/*!< in: input file */
	row_merge_block_t*	block,	/*!< in/out: 3 buffers */
	ulint*			foffs0,	/*!< in/out: input file offset */
	merge_file_t*		of)	/*!< in/out: output file */
{
	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */

	mrec_buf_t*	buf;	/*!< buffer for handling
				split mrec in block[] */
	const byte*	b0;	/*!< pointer to block[0] */
	byte*		b2;	/*!< pointer to block[2] */
	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] */
	ulint*		offsets0;/* offsets of mrec0 */
	ulint*		offsets1;/* dummy offsets */

#ifdef UNIV_DEBUG
	if (row_merge_print_block) {
		fprintf(stderr,
			"row_merge_blocks_copy fd=%d ofs=%lu"
			" = fd=%d ofs=%lu\n",
			file->fd, (ulong) foffs0,
			of->fd, (ulong) of->offset);
	}
#endif /* UNIV_DEBUG */

	heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);

	/* Write a record and read the next record.  Split the output
	file in two halves, which can be merged on the following pass. */

	if (!row_merge_read(file->fd, *foffs0, &block[0])) {
corrupt:
		mem_heap_free(heap);
		return(FALSE);
	}

	b0 = block[0];
	b2 = block[2];

	b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
				foffs0, &mrec0, offsets0);
	if (UNIV_UNLIKELY(!b0 && mrec0)) {

		goto corrupt;
	}

	if (mrec0) {
		/* append all mrec0 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
		}
	}
done0:

	/* The file offset points to the beginning of the last page
	that has been read.  Update it to point to the next block. */
	(*foffs0)++;

	mem_heap_free(heap);
	return(row_merge_write_eof(&block[2], b2, of->fd, &of->offset)
	       != NULL);
}

/*************************************************************//**
Merge disk files.
@return	DB_SUCCESS or error code */
static __attribute__((nonnull))
ulint
row_merge(
/*======*/
	trx_t*			trx,	/*!< in: transaction */
	const dict_index_t*	index,	/*!< in: index being created */
	merge_file_t*		file,	/*!< in/out: file containing
					index entries */
	ulint*			half,	/*!< in/out: half the file */
	row_merge_block_t*	block,	/*!< in/out: 3 buffers */
	int*			tmpfd,	/*!< in/out: temporary file handle */
	struct TABLE*		table)	/*!< in/out: MySQL table, for
					reporting erroneous key value
					if applicable */
{
	ulint		foffs0;	/*!< first input offset */
	ulint		foffs1;	/*!< second input offset */
	ulint		error;	/*!< error code */
	merge_file_t	of;	/*!< output file */
	const ulint	ihalf	= *half;
				/*!< half the input file */
	ulint		ohalf;	/*!< half the output file */

	UNIV_MEM_ASSERT_W(block[0], 3 * sizeof block[0]);
	ut_ad(ihalf < file->offset);

	of.fd = *tmpfd;
	of.offset = 0;
	of.n_rec = 0;

#ifdef POSIX_FADV_SEQUENTIAL
	/* The input file will be read sequentially, starting from the
	beginning and the middle.  In Linux, the POSIX_FADV_SEQUENTIAL
	affects the entire file.  Each block will be read exactly once. */
	posix_fadvise(file->fd, 0, 0,
		      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
#endif /* POSIX_FADV_SEQUENTIAL */

	/* Merge blocks to the output file. */
	ohalf = 0;
	foffs0 = 0;
	foffs1 = ihalf;

	for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
		ulint	ahalf;	/*!< arithmetic half the input file */

		if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
			return(DB_INTERRUPTED);
		}

		error = row_merge_blocks(index, file, block,
					 &foffs0, &foffs1, &of, table);

		if (error != DB_SUCCESS) {
			return(error);
		}

		/* Record the offset of the output file when
		approximately half the output has been generated.  In
		this way, the next invocation of row_merge() will
		spend most of the time in this loop.  The initial
		estimate is ohalf==0. */
		ahalf = file->offset / 2;
		ut_ad(ohalf <= of.offset);

		/* Improve the estimate until reaching half the input
		file size, or we can not get any closer to it.  All
		comparands should be non-negative when !(ohalf < ahalf)
		because ohalf <= of.offset. */
		if (ohalf < ahalf || of.offset - ahalf < ohalf - ahalf) {
			ohalf = of.offset;
		}
	}

	/* Copy the last blocks, if there are any. */

	while (foffs0 < ihalf) {
		if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
			return(DB_INTERRUPTED);
		}

		if (!row_merge_blocks_copy(index, file, block, &foffs0, &of)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs0 == ihalf);

	while (foffs1 < file->offset) {
		if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
			return(DB_INTERRUPTED);
		}

		if (!row_merge_blocks_copy(index, file, block, &foffs1, &of)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs1 == file->offset);

	if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
		return(DB_CORRUPTION);
	}

	/* Swap file descriptors for the next pass. */
	*tmpfd = file->fd;
	*file = of;
	*half = ohalf;

	UNIV_MEM_INVALID(block[0], 3 * sizeof block[0]);

	return(DB_SUCCESS);
}

/*************************************************************//**
Merge disk files.
@return	DB_SUCCESS or error code */
static
ulint
row_merge_sort(
/*===========*/
	trx_t*			trx,	/*!< in: transaction */
	const dict_index_t*	index,	/*!< in: index being created */
	merge_file_t*		file,	/*!< in/out: file containing
					index entries */
	row_merge_block_t*	block,	/*!< in/out: 3 buffers */
	int*			tmpfd,	/*!< in/out: temporary file handle */
	struct TABLE*		table)	/*!< in/out: MySQL table, for
					reporting erroneous key value
					if applicable */
{
	ulint	half = file->offset / 2;

	/* The file should always contain at least one byte (the end
	of file marker).  Thus, it must be at least one block. */
	ut_ad(file->offset > 0);

	do {
		ulint	error;

		error = row_merge(trx, index, file, &half,
				  block, tmpfd, table);

		if (error != DB_SUCCESS) {
			return(error);
		}

		/* half > 0 should hold except when the file consists
		of one block.  No need to merge further then. */
		ut_ad(half > 0 || file->offset == 1);
	} while (half < file->offset && half > 0);

	return(DB_SUCCESS);
}
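
/* Illustrative sketch (not part of the original source): each call to
row_merge() in the loop above roughly halves the number of sorted runs
in the file, so a file that starts with n_runs single-block runs needs
about ceil(log2(n_runs)) merge passes; for example, 8 runs are merged
in 3 passes: 8 -> 4 -> 2 -> 1. */
#if 0	/* example only */
static ulint
row_merge_n_passes(
	ulint	n_runs)	/* in: number of sorted runs in the file */
{
	ulint	n_passes = 0;

	while (n_runs > 1) {
		n_runs = (n_runs + 1) / 2;
		n_passes++;
	}

	return(n_passes);
}
#endif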

/*************************************************************//**
Copy externally stored columns to the data tuple. */
static
void
row_merge_copy_blobs(
/*=================*/
	const mrec_t*	mrec,	/*!< in: merge record */
	const ulint*	offsets,/*!< in: offsets of mrec */
	ulint		zip_size,/*!< in: compressed page size in bytes, or 0 */
	dtuple_t*	tuple,	/*!< in/out: data tuple */
	mem_heap_t*	heap)	/*!< in/out: memory heap */
{
	ulint	i;
	ulint	n_fields = dtuple_get_n_fields(tuple);

	for (i = 0; i < n_fields; i++) {
		ulint		len;
		const void*	data;
		dfield_t*	field = dtuple_get_nth_field(tuple, i);

		if (!dfield_is_ext(field)) {
			continue;
		}

		ut_ad(!dfield_is_null(field));

		/* The table is locked during index creation.
		Therefore, externally stored columns cannot possibly
		be freed between the time the BLOB pointers are read
		(row_merge_read_clustered_index()) and dereferenced
		(below). */
		data = btr_rec_copy_externally_stored_field(
			mrec, offsets, zip_size, i, &len, heap);
		/* Because we have locked the table, any records
		written by incomplete transactions must have been
		rolled back already. There must not be any incomplete
		BLOB columns. */
		ut_a(data);

		dfield_set_data(field, data, len);
	}
}

/********************************************************************//**
Read the sorted file containing index data tuples and insert these data
tuples into the index.
@return	DB_SUCCESS or error number */
1796
static
1797 1798 1799
ulint
row_merge_insert_index_tuples(
/*==========================*/
1800 1801 1802 1803
	trx_t*			trx,	/*!< in: transaction */
	dict_index_t*		index,	/*!< in: index */
	dict_table_t*		table,	/*!< in: new table */
	ulint			zip_size,/*!< in: compressed page size of
1804
					 the old table, or 0 if uncompressed */
1805 1806
	int			fd,	/*!< in: file descriptor */
	row_merge_block_t*	block)	/*!< in/out: file buffer */
1807
{
	const byte*		b;
	que_thr_t*		thr;
	ins_node_t*		node;
	mem_heap_t*		tuple_heap;
	mem_heap_t*		graph_heap;
	ulint			error = DB_SUCCESS;
	ulint			foffs = 0;
	ulint*			offsets;

	ut_ad(trx);
	ut_ad(index);
	ut_ad(table);

	/* We use the insert query graph as the dummy graph
	needed in the row module call */

	trx->op_info = "inserting index entries";

	graph_heap = mem_heap_create(500 + sizeof(mrec_buf_t));
	node = ins_node_create(INS_DIRECT, table, graph_heap);

	thr = pars_complete_graph_for_exec(node, trx, graph_heap);

	que_thr_move_to_run_state_for_mysql(thr, trx);

	tuple_heap = mem_heap_create(1000);

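	/* Preallocate the array used for rec_offs bookkeeping:
	offsets[0] stores the allocated size of the array and
	offsets[1] the number of fields in the index. */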
	{
		ulint i	= 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		offsets = mem_heap_alloc(graph_heap, i * sizeof *offsets);
		offsets[0] = i;
		offsets[1] = dict_index_get_n_fields(index);
	}

	b = *block;

	if (!row_merge_read(fd, foffs, block)) {
		error = DB_CORRUPTION;
	} else {
		mrec_buf_t*	buf = mem_heap_alloc(graph_heap, sizeof *buf);

		for (;;) {
			const mrec_t*	mrec;
			dtuple_t*	dtuple;
			ulint		n_ext;

			b = row_merge_read_rec(block, buf, b, index,
					       fd, &foffs, &mrec, offsets);
			if (UNIV_UNLIKELY(!b)) {
				/* End of list, or I/O error */
				if (mrec) {
					error = DB_CORRUPTION;
				}
				break;
			}

			dtuple = row_rec_to_index_entry_low(
				mrec, index, offsets, &n_ext, tuple_heap);

			if (UNIV_UNLIKELY(n_ext)) {
				row_merge_copy_blobs(mrec, offsets, zip_size,
						     dtuple, tuple_heap);
			}

			node->row = dtuple;
			node->table = table;
			node->trx_id = trx->id;

			ut_ad(dtuple_validate(dtuple));

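			/* Insert the entry, retrying while
			row_mysql_handle_errors() can resolve the
			error (e.g. by waiting for a lock); any
			unresolved error aborts the whole operation. */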
			do {
				thr->run_node = thr;
				thr->prev_node = thr->common.parent;

				error = row_ins_index_entry(index, dtuple,
							    0, FALSE, thr);

				if (UNIV_LIKELY(error == DB_SUCCESS)) {

					goto next_rec;
				}

				thr->lock_state = QUE_THR_LOCK_ROW;
				trx->error_state = error;
				que_thr_stop_for_mysql(thr);
				thr->lock_state = QUE_THR_LOCK_NOLOCK;
			} while (row_mysql_handle_errors(&error, trx,
							 thr, NULL));

			goto err_exit;
next_rec:
			mem_heap_empty(tuple_heap);
		}
	}

	que_thr_stop_for_mysql_no_error(thr, trx);
err_exit:
	que_graph_free(thr->graph);

	trx->op_info = "";

	mem_heap_free(tuple_heap);

	return(error);
}

/*********************************************************************//**
Sets a shared or exclusive lock on a table, for the duration of
creating or dropping indexes.
@return	error code or DB_SUCCESS */
UNIV_INTERN
ulint
row_merge_lock_table(
/*=================*/
	trx_t*		trx,		/*!< in/out: transaction */
	dict_table_t*	table,		/*!< in: table to lock */
	enum lock_mode	mode)		/*!< in: LOCK_X or LOCK_S */
{
	mem_heap_t*	heap;
	que_thr_t*	thr;
	ulint		err;
	sel_node_t*	node;

	ut_ad(trx);
	ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	heap = mem_heap_create(512);

	trx->op_info = "setting table lock for creating or dropping index";

	node = sel_node_create(heap);
	thr = pars_complete_graph_for_exec(node, trx, heap);
	thr->graph->state = QUE_FORK_ACTIVE;

	/* We use the select query graph as the dummy graph needed
	in the lock module call */

	thr = que_fork_get_first_thr(que_node_get_parent(thr));
	que_thr_move_to_run_state_for_mysql(thr, trx);

run_again:
	thr->run_node = thr;
	thr->prev_node = thr->common.parent;

	err = lock_table(0, table, mode, thr);

	trx->error_state = err;

	if (UNIV_LIKELY(err == DB_SUCCESS)) {
		que_thr_stop_for_mysql_no_error(thr, trx);
	} else {
		que_thr_stop_for_mysql(thr);

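		/* On DB_LOCK_WAIT, row_mysql_handle_errors() waits
		for the lock to be granted and returns TRUE, in which
		case the lock request is retried. */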
		if (err != DB_QUE_THR_SUSPENDED) {
			ibool	was_lock_wait;

			was_lock_wait = row_mysql_handle_errors(
				&err, trx, thr, NULL);

			if (was_lock_wait) {
				goto run_again;
			}
		} else {
			que_thr_t*	run_thr;
			que_node_t*	parent;

			parent = que_node_get_parent(thr);
			run_thr = que_fork_start_command(parent);

			ut_a(run_thr == thr);

			/* There was a lock wait but the thread was not
			in a ready to run or running state. */
			trx->error_state = DB_LOCK_WAIT;

			goto run_again;
		}
	}

	que_graph_free(thr->graph);
	trx->op_info = "";

	return(err);
}

/*********************************************************************//**
Drop an index from the InnoDB system tables.  The data dictionary must
have been locked exclusively by the caller, because the transaction
will not be committed. */
UNIV_INTERN
void
row_merge_drop_index(
/*=================*/
	dict_index_t*	index,	/*!< in: index to be removed */
	dict_table_t*	table,	/*!< in: table */
	trx_t*		trx)	/*!< in: transaction handle */
{
	ulint		err;
	pars_info_t*	info = pars_info_create();

	/* We use the private SQL parser of Innobase to generate the
	query graphs needed in deleting the dictionary data from system
	tables in Innobase. Deleting a row from SYS_INDEXES table also
	frees the file segments of the B-tree associated with the index. */

	static const char str1[] =
		"PROCEDURE DROP_INDEX_PROC () IS\n"
		"BEGIN\n"
		/* Rename the index, so that it will be dropped by
		row_merge_drop_temp_indexes() at crash recovery
		if the server crashes before this trx is committed. */
		"UPDATE SYS_INDEXES SET NAME=CONCAT('"
		TEMP_INDEX_PREFIX_STR "', NAME) WHERE ID = :indexid;\n"
		"COMMIT WORK;\n"
		/* Drop the field definitions of the index. */
		"DELETE FROM SYS_FIELDS WHERE INDEX_ID = :indexid;\n"
		/* Drop the index definition and the B-tree. */
		"DELETE FROM SYS_INDEXES WHERE ID = :indexid;\n"
		"END;\n";

	ut_ad(index && table && trx);

	pars_info_add_ull_literal(info, "indexid", index->id);

	trx_start_if_not_started(trx);
	trx->op_info = "dropping index";

	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);

	err = que_eval_sql(info, str1, FALSE, trx);

	ut_a(err == DB_SUCCESS);

	/* Replace this index with another equivalent index for all
	foreign key constraints on this table where this index is used */

	dict_table_replace_index_in_foreign_list(table, index);
	dict_index_remove_from_cache(table, index);

	trx->op_info = "";
}

/*********************************************************************//**
Drop those indexes which were created before an error occurred when
building an index.  The data dictionary must have been locked
exclusively by the caller, because the transaction will not be
committed. */
UNIV_INTERN
void
row_merge_drop_indexes(
/*===================*/
	trx_t*		trx,		/*!< in: transaction */
	dict_table_t*	table,		/*!< in: table containing the indexes */
	dict_index_t**	index,		/*!< in: indexes to drop */
	ulint		num_created)	/*!< in: number of elements in index[] */
{
	ulint	key_num;

	for (key_num = 0; key_num < num_created; key_num++) {
		row_merge_drop_index(index[key_num], table, trx);
	}
}

/*********************************************************************//**
Drop all partially created indexes during crash recovery. */
UNIV_INTERN
void
row_merge_drop_temp_indexes(void)
/*=============================*/
{
	trx_t*		trx;
	btr_pcur_t	pcur;
	mtr_t		mtr;

	/* Load the table definitions that contain partially defined
	indexes, so that the data dictionary information can be checked
	when accessing the tablename.ibd files. */
	trx = trx_allocate_for_background();
	trx->op_info = "dropping partially created indexes";
	row_mysql_lock_data_dictionary(trx);

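	/* Scan SYS_INDEXES for index names that begin with
	TEMP_INDEX_PREFIX, and drop each matching index, committing
	the transaction after every drop. */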
	mtr_start(&mtr);

	btr_pcur_open_at_index_side(
		TRUE,
		dict_table_get_first_index(dict_sys->sys_indexes),
		BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);

	for (;;) {
		const rec_t*	rec;
		const byte*	field;
		ulint		len;
		table_id_t	table_id;
		dict_table_t*	table;

		btr_pcur_move_to_next_user_rec(&pcur, &mtr);

		if (!btr_pcur_is_on_user_rec(&pcur)) {
			break;
		}

		rec = btr_pcur_get_rec(&pcur);
		field = rec_get_nth_field_old(rec, DICT_SYS_INDEXES_NAME_FIELD,
					      &len);
		if (len == UNIV_SQL_NULL || len == 0
		    || (char) *field != TEMP_INDEX_PREFIX) {
			continue;
		}

		/* This is a temporary index. */

		field = rec_get_nth_field_old(rec, 0/*TABLE_ID*/, &len);
		if (len != 8) {
			/* Corrupted TABLE_ID */
			continue;
		}

		table_id = mach_read_from_8(field);

		btr_pcur_store_position(&pcur, &mtr);
		btr_pcur_commit_specify_mtr(&pcur, &mtr);

		table = dict_table_get_on_id_low(table_id);

		if (table) {
			dict_index_t*	index;
			dict_index_t*	next_index;

			for (index = dict_table_get_first_index(table);
			     index; index = next_index) {

				next_index = dict_table_get_next_index(index);

				if (*index->name == TEMP_INDEX_PREFIX) {
					row_merge_drop_index(index, table, trx);
					trx_commit_for_mysql(trx);
				}
			}
		}

		mtr_start(&mtr);
		btr_pcur_restore_position(BTR_SEARCH_LEAF,
					  &pcur, &mtr);
	}

	btr_pcur_close(&pcur);
	mtr_commit(&mtr);
	row_mysql_unlock_data_dictionary(trx);
	trx_free_for_background(trx);
}

/*********************************************************************//**
Create a merge file. */
static
void
row_merge_file_create(
/*==================*/
	merge_file_t*	merge_file)	/*!< out: merge file structure */
{
#ifdef UNIV_PFS_IO
	/* This temp file open does not go through normal
	file APIs, add instrumentation to register with
	performance schema */
	struct PSI_file_locker*	locker = NULL;
	PSI_file_locker_state	state;
	register_pfs_file_open_begin(&state, locker, innodb_file_temp_key,
				     PSI_FILE_OPEN,
				     "Innodb Merge Temp File",
				     __FILE__, __LINE__);
#endif
	merge_file->fd = innobase_mysql_tmpfile();
	merge_file->offset = 0;
	merge_file->n_rec = 0;
#ifdef UNIV_PFS_IO
	register_pfs_file_open_end(locker, merge_file->fd);
#endif
}

/*********************************************************************//**
Destroy a merge file. */
static
void
row_merge_file_destroy(
/*===================*/
	merge_file_t*	merge_file)	/*!< in/out: merge file structure */
{
#ifdef UNIV_PFS_IO
	struct PSI_file_locker*	locker = NULL;
	PSI_file_locker_state	state;
	register_pfs_file_io_begin(&state, locker, merge_file->fd, 0,
				   PSI_FILE_CLOSE, __FILE__, __LINE__);
#endif
	if (merge_file->fd != -1) {
		close(merge_file->fd);
		merge_file->fd = -1;
	}

#ifdef UNIV_PFS_IO
	register_pfs_file_io_end(locker, 0);
#endif
}

/*********************************************************************//**
Determine the precise type of a column that is added to a temporary
table for creating a primary key, i.e. whether the column must be
constrained NOT NULL.
@return	col->prtype, possibly ORed with DATA_NOT_NULL */
UNIV_INLINE
ulint
row_merge_col_prtype(
/*=================*/
	const dict_col_t*	col,		/*!< in: column */
	const char*		col_name,	/*!< in: name of the column */
	const merge_index_def_t*index_def)	/*!< in: the index definition
						of the primary key */
{
	ulint	prtype = col->prtype;
	ulint	i;

	ut_ad(index_def->ind_type & DICT_CLUSTERED);

	if (prtype & DATA_NOT_NULL) {

		return(prtype);
	}

	/* All columns that are included
	in the PRIMARY KEY must be NOT NULL. */

	for (i = 0; i < index_def->n_fields; i++) {
		if (!strcmp(col_name, index_def->fields[i].field_name)) {
			return(prtype | DATA_NOT_NULL);
		}
	}

	return(prtype);
}

/*********************************************************************//**
Create a temporary table for creating a primary key, using the definition
of an existing table.
@return	table, or NULL on error */
UNIV_INTERN
dict_table_t*
row_merge_create_temporary_table(
/*=============================*/
	const char*		table_name,	/*!< in: new table name */
	const merge_index_def_t*index_def,	/*!< in: the index definition
						of the primary key */
	const dict_table_t*	table,		/*!< in: old table definition */
	trx_t*			trx)		/*!< in/out: transaction
						(sets error_state) */
{
	ulint		i;
	dict_table_t*	new_table = NULL;
	ulint		n_cols = dict_table_get_n_user_cols(table);
	ulint		error;
	mem_heap_t*	heap = mem_heap_create(1000);

	ut_ad(table_name);
	ut_ad(index_def);
	ut_ad(table);
	ut_ad(mutex_own(&dict_sys->mutex));

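	/* Copy the column definitions from the old table, adding
	the NOT NULL constraint to the columns of the new PRIMARY
	KEY via row_merge_col_prtype(). */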
	new_table = dict_mem_table_create(table_name, 0, n_cols, table->flags);

	for (i = 0; i < n_cols; i++) {
		const dict_col_t*	col;
		const char*		col_name;

		col = dict_table_get_nth_col(table, i);
		col_name = dict_table_get_col_name(table, i);

		dict_mem_table_add_col(new_table, heap, col_name, col->mtype,
				       row_merge_col_prtype(col, col_name,
							    index_def),
				       col->len);
	}

	error = row_create_table_for_mysql(new_table, trx);
	mem_heap_free(heap);

	if (error != DB_SUCCESS) {
		trx->error_state = error;
		new_table = NULL;
	}

	return(new_table);
}

/*********************************************************************//**
Rename the temporary indexes in the dictionary to permanent ones.  The
data dictionary must have been locked exclusively by the caller,
because the transaction will not be committed.
@return	DB_SUCCESS if all OK */
UNIV_INTERN
ulint
row_merge_rename_indexes(
/*=====================*/
	trx_t*		trx,		/*!< in/out: transaction */
	dict_table_t*	table)		/*!< in/out: table with new indexes */
{
	ulint		err = DB_SUCCESS;
	pars_info_t*	info = pars_info_create();

	/* We use the private SQL parser of Innobase to generate the
	query graphs needed in renaming indexes. */

	static const char rename_indexes[] =
		"PROCEDURE RENAME_INDEXES_PROC () IS\n"
		"BEGIN\n"
		"UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
		"WHERE TABLE_ID = :tableid AND SUBSTR(NAME,0,1)='"
		TEMP_INDEX_PREFIX_STR "';\n"
		"END;\n";

	ut_ad(table);
	ut_ad(trx);
	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);

	trx->op_info = "renaming indexes";

	pars_info_add_ull_literal(info, "tableid", table->id);

	err = que_eval_sql(info, rename_indexes, FALSE, trx);

	if (err == DB_SUCCESS) {
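		/* The SQL above stripped the TEMP_INDEX_PREFIX byte
		from the index names in SYS_INDEXES; do the same in
		the dictionary cache by advancing each name pointer
		past the prefix byte. */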
		dict_index_t*	index = dict_table_get_first_index(table);
		do {
			if (*index->name == TEMP_INDEX_PREFIX) {
				index->name++;
			}
			index = dict_table_get_next_index(index);
		} while (index);
	}

	trx->op_info = "";

	return(err);
}

/*********************************************************************//**
Rename the tables in the data dictionary.  The data dictionary must
have been locked exclusively by the caller, because the transaction
will not be committed.
@return	error code or DB_SUCCESS */
UNIV_INTERN
ulint
row_merge_rename_tables(
/*====================*/
	dict_table_t*	old_table,	/*!< in/out: old table, renamed to
					tmp_name */
	dict_table_t*	new_table,	/*!< in/out: new table, renamed to
					old_table->name */
	const char*	tmp_name,	/*!< in: new name for old_table */
	trx_t*		trx)		/*!< in: transaction handle */
{
	ulint		err	= DB_ERROR;
	pars_info_t*	info;
	char		old_name[MAX_TABLE_NAME_LEN + 1];

	ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
	ut_ad(old_table != new_table);
	ut_ad(mutex_own(&dict_sys->mutex));

	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);

	/* Store the old/current name in a local variable. */
	if (strlen(old_table->name) + 1 <= sizeof(old_name)) {
		memcpy(old_name, old_table->name, strlen(old_table->name) + 1);
	} else {
		ut_print_timestamp(stderr);
		fprintf(stderr, "InnoDB: too long table name: '%s', "
			"max length is %d\n", old_table->name,
			MAX_TABLE_NAME_LEN);
		ut_error;
	}

	trx->op_info = "renaming tables";

	/* We use the private SQL parser of Innobase to generate the query
	graphs needed in updating the dictionary data in system tables. */

	info = pars_info_create();

	pars_info_add_str_literal(info, "new_name", new_table->name);
	pars_info_add_str_literal(info, "old_name", old_name);
	pars_info_add_str_literal(info, "tmp_name", tmp_name);

	err = que_eval_sql(info,
			   "PROCEDURE RENAME_TABLES () IS\n"
			   "BEGIN\n"
			   "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
			   " WHERE NAME = :old_name;\n"
			   "UPDATE SYS_TABLES SET NAME = :old_name\n"
			   " WHERE NAME = :new_name;\n"
			   "END;\n", FALSE, trx);

	if (err != DB_SUCCESS) {

		goto err_exit;
	}

	/* The following calls will also rename the .ibd data files if
	the tables are stored in a single-table tablespace */

	if (!dict_table_rename_in_cache(old_table, tmp_name, FALSE)
	    || !dict_table_rename_in_cache(new_table, old_name, FALSE)) {

		err = DB_ERROR;
		goto err_exit;
	}

	err = dict_load_foreigns(old_name, FALSE, TRUE);

	if (err != DB_SUCCESS) {
err_exit:
		trx->error_state = DB_SUCCESS;
		trx_general_rollback_for_mysql(trx, NULL);
		trx->error_state = DB_SUCCESS;
	}

	trx->op_info = "";

	return(err);
}

/*********************************************************************//**
Create and execute a query graph for creating an index.
@return	DB_SUCCESS or error code */
static
ulint
row_merge_create_index_graph(
/*=========================*/
	trx_t*		trx,		/*!< in: trx */
	dict_table_t*	table,		/*!< in: table */
	dict_index_t*	index)		/*!< in: index */
{
	ind_node_t*	node;		/*!< Index creation node */
	mem_heap_t*	heap;		/*!< Memory heap */
	que_thr_t*	thr;		/*!< Query thread */
	ulint		err;

	ut_ad(trx);
	ut_ad(table);
	ut_ad(index);

	heap = mem_heap_create(512);

	index->table = table;
	node = ind_create_graph_create(index, heap);
	thr = pars_complete_graph_for_exec(node, trx, heap);

	ut_a(thr == que_fork_start_command(que_node_get_parent(thr)));

	que_run_threads(thr);

	err = trx->error_state;

	que_graph_free((que_t*) que_node_get_parent(thr));

	return(err);
}

/*********************************************************************//**
Create the index and load it into the dictionary.
@return	index, or NULL on error */
UNIV_INTERN
dict_index_t*
row_merge_create_index(
/*===================*/
	trx_t*			trx,	/*!< in/out: trx (sets error_state) */
	dict_table_t*		table,	/*!< in: the index is on this table */
	const merge_index_def_t*index_def)
					/*!< in: the index definition */
{
	dict_index_t*	index;
	ulint		err;
	ulint		n_fields = index_def->n_fields;
	ulint		i;

	/* Create the index prototype, using the passed-in definition;
	this is not a persistent operation. We pass 0 as the space id,
	and determine at a lower level the space id where to store
	the table. */

	index = dict_mem_index_create(table->name, index_def->name,
				      0, index_def->ind_type, n_fields);

	ut_a(index);

	for (i = 0; i < n_fields; i++) {
		merge_index_field_t*	ifield = &index_def->fields[i];

		dict_mem_index_add_field(index, ifield->field_name,
					 ifield->prefix_len);
	}

	/* Add the index to SYS_INDEXES, using the index prototype. */
	err = row_merge_create_index_graph(trx, table, index);

	if (err == DB_SUCCESS) {

		index = row_merge_dict_table_get_index(
			table, index_def);

		ut_a(index);

		/* Note the id of the transaction that created this
		index; we use it to restrict readers from accessing
		this index, to ensure read consistency. */
		index->trx_id = trx->id;
	} else {
		index = NULL;
	}

	return(index);
}

/*********************************************************************//**
Check if a transaction can use an index.
@return	TRUE if the index can be used by the transaction */
UNIV_INTERN
ibool
row_merge_is_index_usable(
/*======================*/
	const trx_t*		trx,	/*!< in: transaction */
	const dict_index_t*	index)	/*!< in: index to check */
{
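	/* The index is usable only if the transaction's read view
	can see the transaction that created it (cf. the trx_id
	assignment in row_merge_create_index()); an older read view
	could expect entries that the new index does not contain. */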
	return(!trx->read_view
	       || read_view_sees_trx_id(trx->read_view, index->trx_id));
}

/*********************************************************************//**
Drop the old table.
@return	DB_SUCCESS or error code */
UNIV_INTERN
ulint
row_merge_drop_table(
/*=================*/
	trx_t*		trx,		/*!< in: transaction */
	dict_table_t*	table)		/*!< in: table to drop */
{
	/* There must be no open transactions on the table. */
	ut_a(table->n_mysql_handles_opened == 0);

	return(row_drop_table_for_mysql(table->name, trx, FALSE));
}

/*********************************************************************//**
Build indexes on a table by reading a clustered index,
creating a temporary file containing index entries, merge sorting
these index entries and inserting the sorted index entries into
the indexes.
@return	DB_SUCCESS or error code */
UNIV_INTERN
ulint
row_merge_build_indexes(
/*====================*/
	trx_t*		trx,		/*!< in: transaction */
	dict_table_t*	old_table,	/*!< in: table where rows are
					read from */
	dict_table_t*	new_table,	/*!< in: table where indexes are
					created; identical to old_table
					unless creating a PRIMARY KEY */
	dict_index_t**	indexes,	/*!< in: indexes to be created */
	ulint		n_indexes,	/*!< in: size of indexes[] */
	struct TABLE*	table)		/*!< in/out: MySQL table, for
					reporting erroneous key value
					if applicable */
{
	merge_file_t*		merge_files;
	row_merge_block_t*	block;
	ulint			block_size;
	ulint			i;
	ulint			error;
	int			tmpfd;

	ut_ad(trx);
	ut_ad(old_table);
	ut_ad(new_table);
	ut_ad(indexes);
	ut_ad(n_indexes);

	trx_start_if_not_started(trx);

	/* Allocate memory for merge file data structure and initialize
	fields */

	merge_files = mem_alloc(n_indexes * sizeof *merge_files);
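	/* Allocate the three merge blocks in one large chunk: two
	blocks buffer the input runs being merged and the third
	buffers the merged output. */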
	block_size = 3 * sizeof *block;
	block = os_mem_alloc_large(&block_size);

	for (i = 0; i < n_indexes; i++) {

		row_merge_file_create(&merge_files[i]);
	}

	tmpfd = innobase_mysql_tmpfile();

	/* Reset the MySQL row buffer that is used when reporting
	duplicate keys. */
	innobase_rec_reset(table);

	/* Read clustered index of the table and create files for
	secondary index entries for merge sort */

	error = row_merge_read_clustered_index(
		trx, table, old_table, new_table, indexes,
		merge_files, n_indexes, block);

	if (error != DB_SUCCESS) {

		goto func_exit;
	}

	/* Now we have files containing index entries ready for
	sorting and inserting. */

	for (i = 0; i < n_indexes; i++) {
		error = row_merge_sort(trx, indexes[i], &merge_files[i],
				       block, &tmpfd, table);

		if (error == DB_SUCCESS) {
			error = row_merge_insert_index_tuples(
				trx, indexes[i], new_table,
				dict_table_zip_size(old_table),
				merge_files[i].fd, block);
		}

		/* Close the temporary file to free up space. */
		row_merge_file_destroy(&merge_files[i]);

		if (error != DB_SUCCESS) {
			trx->error_key_num = i;
			goto func_exit;
		}
	}

func_exit:
	close(tmpfd);

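	/* This loop is a no-op for files already destroyed above:
	row_merge_file_destroy() sets fd to -1 and skips files
	that are already closed. */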
	for (i = 0; i < n_indexes; i++) {
		row_merge_file_destroy(&merge_files[i]);
	}

	mem_free(merge_files);
	os_mem_free_large(block, block_size);

	return(error);
}