/******************************************************
Purge old versions

(c) 1996 Innobase Oy

Created 3/26/1996 Heikki Tuuri
*******************************************************/

#include "trx0purge.h"

#ifdef UNIV_NONINL
#include "trx0purge.ic"
#endif

#include "fsp0fsp.h"
#include "mach0data.h"
#include "trx0rseg.h"
#include "trx0trx.h"
#include "trx0roll.h"
#include "read0read.h"
#include "fut0fut.h"
#include "que0que.h"
#include "row0purge.h"
#include "row0upd.h"
#include "trx0rec.h"
#include "srv0que.h"
#include "os0thread.h"

/* The global data structure coordinating a purge. It stays NULL until
trx_purge_sys_create() allocates and initializes it. */
trx_purge_t*	purge_sys = NULL;

/* A dummy undo record used as a return value when we have a whole undo log
which needs no purge. Only its address matters: code in this file compares
record pointers against &trx_purge_dummy_rec to recognize this case. */
trx_undo_rec_t	trx_purge_dummy_rec;

/*********************************************************************
Checks if trx_id is >= purge_view: then it is guaranteed that its update
undo log still exists in the system. In UNIV_SYNC_DEBUG builds the function
asserts that the caller holds purge_sys->latch in shared mode. */

ibool
trx_purge_update_undo_must_exist(
/*=============================*/
			/* out: TRUE if is sure that it is preserved, also
			if the function returns FALSE, it is possible that
			the undo log still exists in the system */
	dulint	trx_id)	/* in: transaction id */
{
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(&(purge_sys->latch), RW_LOCK_SHARED));
#endif /* UNIV_SYNC_DEBUG */

	/* If the purge view does not see trx_id, purge has not yet been
	allowed to remove the undo log of that transaction */

	if (!read_view_sees_trx_id(purge_sys->view, trx_id)) {

		return(TRUE);
	}

	return(FALSE);
}

/*=================== PURGE RECORD ARRAY =============================*/

/***********************************************************************
Stores info of an undo log record during a purge. The record is placed in
the first cell of purge_sys->arr that is not in use. NOTE that the search
loop has no upper bound of its own: it terminates only when a free cell is
found. */
static
trx_undo_inf_t*
trx_purge_arr_store_info(
/*=====================*/
			/* out: pointer to the storage cell */
	dulint	trx_no,	/* in: transaction number */
	dulint	undo_no)/* in: undo number */
{
	trx_undo_inf_t*	cell;
	trx_undo_arr_t*	arr;
	ulint		i;

	arr = purge_sys->arr;

	for (i = 0;; i++) {
		cell = trx_undo_arr_get_nth_info(arr, i);

		if (!(cell->in_use)) {
			/* Not in use, we may store here */
			cell->undo_no = undo_no;
			cell->trx_no = trx_no;
			cell->in_use = TRUE;

			arr->n_used++;

			return(cell);
		}
	}
}

/***********************************************************************
Removes info of an undo log record during a purge: marks the cell free and
decrements the used-cell count of purge_sys->arr. */
UNIV_INLINE
void
trx_purge_arr_remove_info(
/*======================*/
	trx_undo_inf_t*	cell)	/* in: pointer to the storage cell */
{
	trx_undo_arr_t*	arr;

	arr = purge_sys->arr;

	cell->in_use = FALSE;

	/* The count must not underflow: a cell can only be released if
	one was stored earlier */
	ut_ad(arr->n_used > 0);

	arr->n_used--;
}

/***********************************************************************
Gets the biggest pair of a trx number and an undo number in a purge array.
Pairs are compared first by trx number and then by undo number. The scan
stops as soon as all arr->n_used cells that are in use have been visited. */
static
void
trx_purge_arr_get_biggest(
/*======================*/
	trx_undo_arr_t*	arr,	/* in: purge array */
	dulint*		trx_no,	/* out: transaction number: ut_dulint_zero
				if array is empty */
	dulint*		undo_no)/* out: undo number */
{
	trx_undo_inf_t*	cell;
	dulint		pair_trx_no;
	dulint		pair_undo_no;
	int		trx_cmp;
	ulint		n_used;
	ulint		i;
	ulint		n;

	n = 0;
	n_used = arr->n_used;
	pair_trx_no = ut_dulint_zero;
	pair_undo_no = ut_dulint_zero;

	for (i = 0;; i++) {
		cell = trx_undo_arr_get_nth_info(arr, i);

		if (cell->in_use) {
			n++;
			trx_cmp = ut_dulint_cmp(cell->trx_no, pair_trx_no);

			if ((trx_cmp > 0)
			    || ((trx_cmp == 0)
				&& (ut_dulint_cmp(cell->undo_no,
						  pair_undo_no) >= 0))) {

				pair_trx_no = cell->trx_no;
				pair_undo_no = cell->undo_no;
			}
		}

		/* This check is made on every iteration, also for cells
		that are not in use, so that an empty array (n_used == 0)
		returns right after the first cell is inspected */

		if (n == n_used) {
			*trx_no = pair_trx_no;
			*undo_no = pair_undo_no;

			return;
		}
	}
}

/********************************************************************
Builds a purge 'query' graph. The actual purge is performed by executing
this query graph. */
static
que_t*
trx_purge_graph_build(void)
/*=======================*/
				/* out, own: the query graph */
{
	mem_heap_t*	heap;
	que_fork_t*	fork;
	que_thr_t*	thr;
unknown's avatar
unknown committed
175
	/*	que_thr_t*	thr2; */
176

177 178 179
	heap = mem_heap_create(512);
	fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
	fork->trx = purge_sys->trx;
180

181 182
	thr = que_thr_create(fork, heap);

183
	thr->child = row_purge_node_create(thr, heap);
184

unknown's avatar
unknown committed
185
	/*	thr2 = que_thr_create(fork, fork, heap);
186

187
	thr2->child = row_purge_node_create(fork, thr2, heap);	 */
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210

	return(fork);
}

/************************************************************************
Creates the global purge system control structure and inits the history
mutex. */

void
trx_purge_sys_create(void)
/*======================*/
{
	ut_ad(mutex_own(&kernel_mutex));

	purge_sys = mem_alloc(sizeof(trx_purge_t));

	purge_sys->state = TRX_STOP_PURGE;

	purge_sys->n_pages_handled = 0;

	purge_sys->purge_trx_no = ut_dulint_zero;
	purge_sys->purge_undo_no = ut_dulint_zero;
	purge_sys->next_stored = FALSE;
211

unknown's avatar
unknown committed
212
	rw_lock_create(&purge_sys->latch, SYNC_PURGE_LATCH);
213

unknown's avatar
unknown committed
214
	mutex_create(&purge_sys->mutex, SYNC_PURGE_SYS);
215 216 217 218 219

	purge_sys->heap = mem_heap_create(256);

	purge_sys->arr = trx_undo_arr_create();

unknown's avatar
unknown committed
220
	purge_sys->sess = sess_open();
221

unknown's avatar
unknown committed
222
	purge_sys->trx = purge_sys->sess->trx;
223

unknown's avatar
unknown committed
224
	purge_sys->trx->is_purge = 1;
225 226 227 228

	ut_a(trx_start_low(purge_sys->trx, ULINT_UNDEFINED));

	purge_sys->query = trx_purge_graph_build();
229

unknown's avatar
unknown committed
230 231
	purge_sys->view = read_view_oldest_copy_or_open_new(ut_dulint_zero,
							    purge_sys->heap);
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
}

/*================ UNDO LOG HISTORY LIST =============================*/

/************************************************************************
Adds the update undo log as the first log in the history list. Removes the
update undo log segment from the rseg slot if it is too big for reuse.
The caller must own the mutex of the rollback segment of trx->update_undo.
The function acquires the kernel mutex briefly to increment
trx_sys->rseg_history_len. */

void
trx_purge_add_update_undo_to_history(
/*=================================*/
	trx_t*	trx,		/* in: transaction */
	page_t*	undo_page,	/* in: update undo log header page,
				x-latched */
	mtr_t*	mtr)		/* in: mtr */
{
	trx_undo_t*	undo;
	trx_rseg_t*	rseg;
	trx_rsegf_t*	rseg_header;
	trx_usegf_t*	seg_header;
	trx_ulogf_t*	undo_header;
	trx_upagef_t*	page_header;
	ulint		hist_size;

	undo = trx->update_undo;

	ut_ad(undo);

	rseg = undo->rseg;

	ut_ad(mutex_own(&(rseg->mutex)));

	rseg_header = trx_rsegf_get(rseg->space, rseg->page_no, mtr);

	undo_header = undo_page + undo->hdr_offset;
	seg_header  = undo_page + TRX_UNDO_SEG_HDR;
	page_header = undo_page + TRX_UNDO_PAGE_HDR;

	if (undo->state != TRX_UNDO_CACHED) {
		/* The undo log segment will not be reused */

		if (undo->id >= TRX_RSEG_N_SLOTS) {
			fprintf(stderr,
				"InnoDB: Error: undo->id is %lu\n",
				(ulong) undo->id);
			ut_error;
		}

		/* Release the slot of the segment in the rollback segment
		header */

		trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);

		/* Account the pages of the segment in the history size
		field of the rollback segment header */

		hist_size = mtr_read_ulint(rseg_header + TRX_RSEG_HISTORY_SIZE,
					   MLOG_4BYTES, mtr);
		ut_ad(undo->size == flst_get_len(
			      seg_header + TRX_UNDO_PAGE_LIST, mtr));

		mlog_write_ulint(rseg_header + TRX_RSEG_HISTORY_SIZE,
				 hist_size + undo->size, MLOG_4BYTES, mtr);
	}

	/* Add the log as the first in the history list */
	flst_add_first(rseg_header + TRX_RSEG_HISTORY,
		       undo_header + TRX_UNDO_HISTORY_NODE, mtr);
	mutex_enter(&kernel_mutex);
	trx_sys->rseg_history_len++;
	mutex_exit(&kernel_mutex);

	/* Write the trx number to the undo log header */
	mlog_write_dulint(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);
	/* Write information about delete markings to the undo log header */

	if (!undo->del_marks) {
		mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE,
				 MLOG_2BYTES, mtr);
	}

	if (rseg->last_page_no == FIL_NULL) {
		/* The rseg has no log recorded as its last (oldest not yet
		purged) history log: record this one */

		rseg->last_page_no = undo->hdr_page_no;
		rseg->last_offset = undo->hdr_offset;
		rseg->last_trx_no = trx->no;
		rseg->last_del_marks = undo->del_marks;
	}
}

/**************************************************************************
Frees an undo log segment which is in the history list. Cuts the end of the
history list at the youngest undo log in this segment. The caller must own
purge_sys->mutex. The non-header pages are freed one step per
mini-transaction; the header page is freed in the same mini-transaction
that cuts the log off the history list. */
static
void
trx_purge_free_segment(
/*===================*/
	trx_rseg_t*	rseg,		/* in: rollback segment */
	fil_addr_t	hdr_addr,	/* in: the file address of log_hdr */
	ulint		n_removed_logs)	/* in: count of how many undo logs we
					will cut off from the end of the
					history list */
{
	page_t*		undo_page;
	trx_rsegf_t*	rseg_hdr;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	ibool		freed;
	ulint		seg_size;
	ulint		hist_size;
	ibool		marked		= FALSE;
	mtr_t		mtr;

	/*	fputs("Freeing an update undo log segment\n", stderr); */

	ut_ad(mutex_own(&(purge_sys->mutex)));
loop:
	mtr_start(&mtr);
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);

	undo_page = trx_undo_page_get(rseg->space, hdr_addr.page, &mtr);
	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
	log_hdr = undo_page + hdr_addr.boffset;

	/* Mark the last undo log totally purged, so that if the system
	crashes, the tail of the undo log will not get accessed again. The
	list of pages in the undo log tail gets inconsistent during the
	freeing of the segment, and therefore purge should not try to access
	them again. */

	if (!marked) {
		mlog_write_ulint(log_hdr + TRX_UNDO_DEL_MARKS, FALSE,
				 MLOG_2BYTES, &mtr);
		marked = TRUE;
	}

	freed = fseg_free_step_not_header(seg_hdr + TRX_UNDO_FSEG_HEADER,
					  &mtr);
	if (!freed) {
		/* More non-header pages remain: commit this step and
		continue in a fresh mini-transaction */

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		goto loop;
	}

	/* The page list may now be inconsistent, but the length field
	stored in the list base node tells us how big it was before we
	started the freeing. */

	seg_size = flst_get_len(seg_hdr + TRX_UNDO_PAGE_LIST, &mtr);

	/* We may free the undo log segment header page; it must be freed
	within the same mtr as the undo log header is removed from the
	history list: otherwise, in case of a database crash, the segment
	could become inaccessible garbage in the file space. */

	flst_cut_end(rseg_hdr + TRX_RSEG_HISTORY,
		     log_hdr + TRX_UNDO_HISTORY_NODE, n_removed_logs, &mtr);

	mutex_enter(&kernel_mutex);
	ut_ad(trx_sys->rseg_history_len >= n_removed_logs);
	trx_sys->rseg_history_len -= n_removed_logs;
	mutex_exit(&kernel_mutex);

	freed = FALSE;

	while (!freed) {
		/* Here we assume that a file segment with just the header
		page can be freed in a few steps, so that the buffer pool
		is not flooded with bufferfixed pages: see the note in
		fsp0fsp.c. */

		freed = fseg_free_step(seg_hdr + TRX_UNDO_FSEG_HEADER,
				       &mtr);
	}

	/* Subtract the freed pages from the history size stored in the
	rollback segment header and from the in-memory rseg size */

	hist_size = mtr_read_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
				   MLOG_4BYTES, &mtr);
	ut_ad(hist_size >= seg_size);

	mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
			 hist_size - seg_size, MLOG_4BYTES, &mtr);

	ut_ad(rseg->curr_size >= seg_size);

	rseg->curr_size -= seg_size;

	mutex_exit(&(rseg->mutex));

	mtr_commit(&mtr);
}

/************************************************************************
Removes unnecessary history data from a rollback segment. The history list
is walked from its last node towards the first. The caller must own
purge_sys->mutex. n_removed_logs counts the logs passed since the list was
last physically shortened; it is reset when a whole segment is freed. */
static
void
trx_purge_truncate_rseg_history(
/*============================*/
	trx_rseg_t*	rseg,		/* in: rollback segment */
	dulint		limit_trx_no,	/* in: remove update undo logs whose
					trx number is < limit_trx_no */
	dulint		limit_undo_no)	/* in: if transaction number is equal
					to limit_trx_no, truncate undo records
					with undo number < limit_undo_no */
{
	fil_addr_t	hdr_addr;
	fil_addr_t	prev_hdr_addr;
	trx_rsegf_t*	rseg_hdr;
	page_t*		undo_page;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	int		cmp;
	ulint		n_removed_logs	= 0;
	mtr_t		mtr;

	ut_ad(mutex_own(&(purge_sys->mutex)));

	mtr_start(&mtr);
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);

	hdr_addr = trx_purge_get_log_from_hist(
		flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
loop:
	if (hdr_addr.page == FIL_NULL) {
		/* The history list has been walked to its end */

		mutex_exit(&(rseg->mutex));

		mtr_commit(&mtr);

		return;
	}

	undo_page = trx_undo_page_get(rseg->space, hdr_addr.page, &mtr);

	log_hdr = undo_page + hdr_addr.boffset;

	cmp = ut_dulint_cmp(mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO),
			    limit_trx_no);
	if (cmp == 0) {
		/* The log of the limit transaction itself: only its
		records below limit_undo_no may be removed */

		trx_undo_truncate_start(rseg, rseg->space, hdr_addr.page,
					hdr_addr.boffset, limit_undo_no);
	}

	if (cmp >= 0) {
		/* This log must be kept: cut off the logs passed so far
		and stop */

		mutex_enter(&kernel_mutex);
		ut_a(trx_sys->rseg_history_len >= n_removed_logs);
		trx_sys->rseg_history_len -= n_removed_logs;
		mutex_exit(&kernel_mutex);

		flst_truncate_end(rseg_hdr + TRX_RSEG_HISTORY,
				  log_hdr + TRX_UNDO_HISTORY_NODE,
				  n_removed_logs, &mtr);

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		return;
	}

	/* Read the address of the previous log before this one is
	possibly freed below */

	prev_hdr_addr = trx_purge_get_log_from_hist(
		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
	n_removed_logs++;

	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;

	if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
	    && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {

		/* We can free the whole log segment */

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		trx_purge_free_segment(rseg, hdr_addr, n_removed_logs);

		n_removed_logs = 0;
	} else {
		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);
	}

	/* Continue with the previous log in a fresh mini-transaction */

	mtr_start(&mtr);
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);

	hdr_addr = prev_hdr_addr;

	goto loop;
}

/************************************************************************
Removes unnecessary history data from rollback segments. NOTE that when this
function is called, the caller must not have any latches on undo log pages! */
static
void
trx_purge_truncate_history(void)
/*============================*/
{
	trx_rseg_t*	rseg;
	dulint		limit_trx_no;
	dulint		limit_undo_no;

	ut_ad(mutex_own(&(purge_sys->mutex)));

	trx_purge_arr_get_biggest(purge_sys->arr, &limit_trx_no,
unknown's avatar
unknown committed
536
				  &limit_undo_no);
537

538
	if (ut_dulint_cmp(limit_trx_no, ut_dulint_zero) == 0) {
539

540 541 542 543 544 545 546
		limit_trx_no = purge_sys->purge_trx_no;
		limit_undo_no = purge_sys->purge_undo_no;
	}

	/* We play safe and set the truncate limit at most to the purge view
	low_limit number, though this is not necessary */

unknown's avatar
Merge  
unknown committed
547 548
	if (ut_dulint_cmp(limit_trx_no, purge_sys->view->low_limit_no) >= 0) {
		limit_trx_no = purge_sys->view->low_limit_no;
549 550 551 552
		limit_undo_no = ut_dulint_zero;
	}

	ut_ad((ut_dulint_cmp(limit_trx_no,
unknown's avatar
unknown committed
553
			     purge_sys->view->low_limit_no) <= 0));
554 555 556 557 558

	rseg = UT_LIST_GET_FIRST(trx_sys->rseg_list);

	while (rseg) {
		trx_purge_truncate_rseg_history(rseg, limit_trx_no,
unknown's avatar
unknown committed
559
						limit_undo_no);
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
		rseg = UT_LIST_GET_NEXT(rseg_list, rseg);
	}
}

/************************************************************************
Does a truncate if the purge array is empty. NOTE that when this function is
called, the caller must not have any latches on undo log pages! The caller
must own purge_sys->mutex. */
UNIV_INLINE
ibool
trx_purge_truncate_if_arr_empty(void)
/*=================================*/
			/* out: TRUE if array empty */
{
	ut_ad(mutex_own(&(purge_sys->mutex)));

	if (purge_sys->arr->n_used == 0) {

		trx_purge_truncate_history();

		return(TRUE);
	}

	return(FALSE);
}

/***************************************************************************
Updates the last not yet purged history log info in rseg when we have purged
a whole undo log. Advances also purge_sys->purge_trx_no past the purged log. */
588
static
589 590 591 592 593
void
trx_purge_rseg_get_next_history_log(
/*================================*/
	trx_rseg_t*	rseg)	/* in: rollback segment */
{
594
	page_t*		undo_page;
595 596 597 598 599 600 601 602 603 604 605
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	fil_addr_t	prev_log_addr;
	dulint		trx_no;
	ibool		del_marks;
	mtr_t		mtr;

	ut_ad(mutex_own(&(purge_sys->mutex)));

	mutex_enter(&(rseg->mutex));

unknown's avatar
unknown committed
606
	ut_a(rseg->last_page_no != FIL_NULL);
607 608 609 610

	purge_sys->purge_trx_no = ut_dulint_add(rseg->last_trx_no, 1);
	purge_sys->purge_undo_no = ut_dulint_zero;
	purge_sys->next_stored = FALSE;
611

612
	mtr_start(&mtr);
613

614 615 616 617 618
	undo_page = trx_undo_page_get_s_latched(rseg->space,
						rseg->last_page_no, &mtr);
	log_hdr = undo_page + rseg->last_offset;
	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;

unknown's avatar
unknown committed
619
	/* Increase the purge page count by one for every handled log */
620

unknown's avatar
unknown committed
621
	purge_sys->n_pages_handled++;
622

623 624
	prev_log_addr = trx_purge_get_log_from_hist(
		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
625 626 627 628
	if (prev_log_addr.page == FIL_NULL) {
		/* No logs left in the history list */

		rseg->last_page_no = FIL_NULL;
629

630 631 632
		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

unknown's avatar
unknown committed
633
		mutex_enter(&kernel_mutex);
634

unknown's avatar
unknown committed
635 636 637 638 639 640 641
		/* Add debug code to track history list corruption reported
		on the MySQL mailing list on Nov 9, 2004. The fut0lst.c
		file-based list was corrupt. The prev node pointer was
		FIL_NULL, even though the list length was over 8 million nodes!
		We assume that purge truncates the history list in moderate
		size pieces, and if we here reach the head of the list, the
		list cannot be longer than 20 000 undo logs now. */
642

unknown's avatar
unknown committed
643 644 645
		if (trx_sys->rseg_history_len > 20000) {
			ut_print_timestamp(stderr);
			fprintf(stderr,
unknown's avatar
unknown committed
646 647 648 649 650 651 652
				"  InnoDB: Warning: purge reached the"
				" head of the history list,\n"
				"InnoDB: but its length is still"
				" reported as %lu! Make a detailed bug\n"
				"InnoDB: report, and submit it"
				" to http://bugs.mysql.com\n",
				(ulong) trx_sys->rseg_history_len);
unknown's avatar
unknown committed
653 654 655 656
		}

		mutex_exit(&kernel_mutex);

657 658 659 660 661 662 663 664 665 666
		return;
	}

	mutex_exit(&(rseg->mutex));
	mtr_commit(&mtr);

	/* Read the trx number and del marks from the previous log header */
	mtr_start(&mtr);

	log_hdr = trx_undo_page_get_s_latched(rseg->space,
unknown's avatar
unknown committed
667 668
					      prev_log_addr.page, &mtr)
		+ prev_log_addr.boffset;
669 670

	trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
671

672 673 674 675 676 677 678 679 680 681 682 683 684
	del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS);

	mtr_commit(&mtr);

	mutex_enter(&(rseg->mutex));

	rseg->last_page_no = prev_log_addr.page;
	rseg->last_offset = prev_log_addr.boffset;
	rseg->last_trx_no = trx_no;
	rseg->last_del_marks = del_marks;

	mutex_exit(&(rseg->mutex));
}
685

686 687 688 689 690
/***************************************************************************
Chooses the next undo log to purge and updates the info in purge_sys. This
function is used to initialize purge_sys when the next record to purge is
not known, and also to update the purge system info on the next record when
purge has handled the whole undo log for a transaction. */
691
static
692 693 694 695 696 697 698 699
void
trx_purge_choose_next_log(void)
/*===========================*/
{
	trx_undo_rec_t*	rec;
	trx_rseg_t*	rseg;
	trx_rseg_t*	min_rseg;
	dulint		min_trx_no;
unknown's avatar
unknown committed
700 701 702
	ulint		space = 0;   /* remove warning (??? bug ???) */
	ulint		page_no = 0; /* remove warning (??? bug ???) */
	ulint		offset = 0;  /* remove warning (??? bug ???) */
703
	mtr_t		mtr;
704

705 706 707 708 709
	ut_ad(mutex_own(&(purge_sys->mutex)));
	ut_ad(purge_sys->next_stored == FALSE);

	rseg = UT_LIST_GET_FIRST(trx_sys->rseg_list);

710 711
	min_trx_no = ut_dulint_max;

712
	min_rseg = NULL;
713

714 715
	while (rseg) {
		mutex_enter(&(rseg->mutex));
716

717 718 719
		if (rseg->last_page_no != FIL_NULL) {

			if ((min_rseg == NULL)
unknown's avatar
unknown committed
720 721
			    || (ut_dulint_cmp(min_trx_no,
					      rseg->last_trx_no) > 0)) {
722 723 724 725

				min_rseg = rseg;
				min_trx_no = rseg->last_trx_no;
				space = rseg->space;
726
				ut_a(space == 0); /* We assume in purge of
unknown's avatar
unknown committed
727 728
						  externally stored fields
						  that space id == 0 */
729 730 731 732 733 734 735 736 737
				page_no = rseg->last_page_no;
				offset = rseg->last_offset;
			}
		}

		mutex_exit(&(rseg->mutex));

		rseg = UT_LIST_GET_NEXT(rseg_list, rseg);
	}
738

739 740 741 742 743 744 745 746 747 748 749 750 751
	if (min_rseg == NULL) {

		return;
	}

	mtr_start(&mtr);

	if (!min_rseg->last_del_marks) {
		/* No need to purge this log */

		rec = &trx_purge_dummy_rec;
	} else {
		rec = trx_undo_get_first_rec(space, page_no, offset,
unknown's avatar
unknown committed
752
					     RW_S_LATCH, &mtr);
753 754 755 756 757 758
		if (rec == NULL) {
			/* Undo log empty */

			rec = &trx_purge_dummy_rec;
		}
	}
759

760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796
	purge_sys->next_stored = TRUE;
	purge_sys->rseg = min_rseg;

	purge_sys->hdr_page_no = page_no;
	purge_sys->hdr_offset = offset;

	purge_sys->purge_trx_no = min_trx_no;

	if (rec == &trx_purge_dummy_rec) {

		purge_sys->purge_undo_no = ut_dulint_zero;
		purge_sys->page_no = page_no;
		purge_sys->offset = 0;
	} else {
		purge_sys->purge_undo_no = trx_undo_rec_get_undo_no(rec);

		purge_sys->page_no = buf_frame_get_page_no(rec);
		purge_sys->offset = rec - buf_frame_align(rec);
	}

	mtr_commit(&mtr);
}

/***************************************************************************
Gets the next record to purge and updates the info in the purge system.
The record at the position currently stored in purge_sys is the one that is
copied and returned; the stored position is advanced to the next record that
requires purge, or to the next undo log if this log is exhausted. The caller
must own purge_sys->mutex and purge_sys->next_stored must be TRUE. */
static
trx_undo_rec_t*
trx_purge_get_next_rec(
/*===================*/
				/* out: copy of an undo log record or
				pointer to the dummy undo log record */
	mem_heap_t*	heap)	/* in: memory heap where copied */
{
	trx_undo_rec_t*	rec;
	trx_undo_rec_t*	rec_copy;
	trx_undo_rec_t*	rec2;
	trx_undo_rec_t*	next_rec;
	page_t*		undo_page;
	page_t*		page;
	ulint		offset;
	ulint		page_no;
	ulint		space;
	ulint		type;
	ulint		cmpl_info;
	mtr_t		mtr;

	ut_ad(mutex_own(&(purge_sys->mutex)));
	ut_ad(purge_sys->next_stored);

	/* Remember the position of the current record: purge_sys may be
	advanced below before the record is copied */

	space = purge_sys->rseg->space;
	page_no = purge_sys->page_no;
	offset = purge_sys->offset;

	if (offset == 0) {
		/* It is the dummy undo log record, which means that there is
		no need to purge this undo log */

		trx_purge_rseg_get_next_history_log(purge_sys->rseg);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		return(&trx_purge_dummy_rec);
	}

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(space, page_no, &mtr);
	rec = undo_page + offset;

	rec2 = rec;

	for (;;) {
		/* Try first to find the next record which requires a purge
		operation from the same page of the same undo log */

		next_rec = trx_undo_page_get_next_rec(rec2,
						      purge_sys->hdr_page_no,
						      purge_sys->hdr_offset);
		if (next_rec == NULL) {
			/* No more records on this page: take the next
			record of the undo log as returned by
			trx_undo_get_next_rec(), which may be NULL */

			rec2 = trx_undo_get_next_rec(
				rec2, purge_sys->hdr_page_no,
				purge_sys->hdr_offset, &mtr);
			break;
		}

		rec2 = next_rec;

		type = trx_undo_rec_get_type(rec2);

		if (type == TRX_UNDO_DEL_MARK_REC) {

			break;
		}

		cmpl_info = trx_undo_rec_get_cmpl_info(rec2);

		if (trx_undo_rec_get_extern_storage(rec2)) {
			break;
		}

		if ((type == TRX_UNDO_UPD_EXIST_REC)
		    && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
			break;
		}
	}

	if (rec2 == NULL) {
		mtr_commit(&mtr);

		trx_purge_rseg_get_next_history_log(purge_sys->rseg);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		/* Re-latch the page of the current record using the
		position saved at the start of this function */

		mtr_start(&mtr);

		undo_page = trx_undo_page_get_s_latched(space, page_no, &mtr);

		rec = undo_page + offset;
	} else {
		page = buf_frame_align(rec2);

		purge_sys->purge_undo_no = trx_undo_rec_get_undo_no(rec2);
		purge_sys->page_no = buf_frame_get_page_no(page);
		purge_sys->offset = rec2 - page;

		if (undo_page != page) {
			/* We advance to a new page of the undo log: */
			purge_sys->n_pages_handled++;
		}
	}

	rec_copy = trx_undo_rec_copy(rec, heap);

	mtr_commit(&mtr);

	return(rec_copy);
}

/************************************************************************
Fetches the next undo log record from the history list to purge. It must be
released with the corresponding release function. The function takes
purge_sys->mutex for its whole duration. Whenever it returns NULL it has
first attempted a history truncate via trx_purge_truncate_if_arr_empty().
Except when purge was already stopped on entry, it also sets
purge_sys->state to TRX_STOP_PURGE before returning NULL. */

trx_undo_rec_t*
trx_purge_fetch_next_rec(
/*=====================*/
				/* out: copy of an undo log record or
				pointer to the dummy undo log record
				&trx_purge_dummy_rec, if the whole undo log
				can skipped in purge; NULL if none left */
	dulint*		roll_ptr,/* out: roll pointer to undo record */
	trx_undo_inf_t** cell,	/* out: storage cell for the record in the
				purge array */
	mem_heap_t*	heap)	/* in: memory heap where copied */
{
	trx_undo_rec_t*	undo_rec;

	mutex_enter(&(purge_sys->mutex));

	if (purge_sys->state == TRX_STOP_PURGE) {
		/* The batch has already been stopped */

		trx_purge_truncate_if_arr_empty();

		mutex_exit(&(purge_sys->mutex));

		return(NULL);
	}

	if (!purge_sys->next_stored) {
		trx_purge_choose_next_log();

		if (!purge_sys->next_stored) {
			purge_sys->state = TRX_STOP_PURGE;

			trx_purge_truncate_if_arr_empty();

			if (srv_print_thread_releases) {
				fprintf(stderr,
					"Purge: No logs left in the"
					" history list; pages handled %lu\n",
					(ulong) purge_sys->n_pages_handled);
			}

			mutex_exit(&(purge_sys->mutex));

			return(NULL);
		}
	}

	if (purge_sys->n_pages_handled >= purge_sys->handle_limit) {
		/* The page quota of this batch is used up */

		purge_sys->state = TRX_STOP_PURGE;

		trx_purge_truncate_if_arr_empty();

		mutex_exit(&(purge_sys->mutex));

		return(NULL);
	}

	if (ut_dulint_cmp(purge_sys->purge_trx_no,
			  purge_sys->view->low_limit_no) >= 0) {
		/* The next log is not below the low limit number of the
		purge view */

		purge_sys->state = TRX_STOP_PURGE;

		trx_purge_truncate_if_arr_empty();

		mutex_exit(&(purge_sys->mutex));

		return(NULL);
	}

	/*	fprintf(stderr, "Thread %lu purging trx %lu undo record %lu\n",
	os_thread_get_curr_id(),
	ut_dulint_get_low(purge_sys->purge_trx_no),
	ut_dulint_get_low(purge_sys->purge_undo_no)); */

	*roll_ptr = trx_undo_build_roll_ptr(FALSE, (purge_sys->rseg)->id,
					    purge_sys->page_no,
					    purge_sys->offset);

	*cell = trx_purge_arr_store_info(purge_sys->purge_trx_no,
					 purge_sys->purge_undo_no);

	ut_ad(ut_dulint_cmp(purge_sys->purge_trx_no,
			    (purge_sys->view)->low_limit_no) < 0);

	/* The following call will advance the stored values of purge_trx_no
	and purge_undo_no, therefore we had to store them first */

	undo_rec = trx_purge_get_next_rec(heap);

	mutex_exit(&(purge_sys->mutex));

	return(undo_rec);
}

/***********************************************************************
Releases a reserved purge undo record: frees its cell in the purge array
while holding purge_sys->mutex. */

void
trx_purge_rec_release(
/*==================*/
	trx_undo_inf_t*	cell)	/* in: storage cell */
{
	mutex_enter(&(purge_sys->mutex));

	trx_purge_arr_remove_info(cell);

	mutex_exit(&(purge_sys->mutex));
}

/***********************************************************************
This function runs a purge batch. */

ulint
trx_purge(void)
/*===========*/
				/* out: number of undo log pages handled in
				the batch */
{
	que_thr_t*	thr;
unknown's avatar
unknown committed
1027
	/*	que_thr_t*	thr2; */
1028 1029 1030 1031 1032
	ulint		old_pages_handled;

	mutex_enter(&(purge_sys->mutex));

	if (purge_sys->trx->n_active_thrs > 0) {
1033

1034 1035 1036 1037
		mutex_exit(&(purge_sys->mutex));

		/* Should not happen */

1038
		ut_error;
1039

1040
		return(0);
1041
	}
1042 1043 1044 1045 1046

	rw_lock_x_lock(&(purge_sys->latch));

	mutex_enter(&kernel_mutex);

1047
	/* Close and free the old purge view */
1048 1049 1050 1051 1052

	read_view_close(purge_sys->view);
	purge_sys->view = NULL;
	mem_heap_empty(purge_sys->heap);

1053 1054 1055 1056 1057 1058 1059 1060 1061
	/* Determine how much data manipulation language (DML) statements
	need to be delayed in order to reduce the lagging of the purge
	thread. */
	srv_dml_needed_delay = 0; /* in microseconds; default: no delay */

	/* If we cannot advance the 'purge view' because of an old
	'consistent read view', then the DML statements cannot be delayed.
	Also, srv_max_purge_lag <= 0 means 'infinity'. */
	if (srv_max_purge_lag > 0
unknown's avatar
unknown committed
1062
	    && !UT_LIST_GET_LAST(trx_sys->view_list)) {
1063
		float	ratio = (float) trx_sys->rseg_history_len
unknown's avatar
unknown committed
1064
			/ srv_max_purge_lag;
1065 1066 1067 1068
		if (ratio > ULINT_MAX / 10000) {
			/* Avoid overflow: maximum delay is 4295 seconds */
			srv_dml_needed_delay = ULINT_MAX;
		} else if (ratio > 1) {
1069 1070 1071 1072
			/* If the history list length exceeds the
			innodb_max_purge_lag, the
			data manipulation statements are delayed
			by at least 5000 microseconds. */
1073
			srv_dml_needed_delay = (ulint) ((ratio - .5) * 10000);
1074 1075 1076
		}
	}

unknown's avatar
unknown committed
1077 1078
	purge_sys->view = read_view_oldest_copy_or_open_new(ut_dulint_zero,
							    purge_sys->heap);
1079
	mutex_exit(&kernel_mutex);
1080 1081 1082

	rw_lock_x_unlock(&(purge_sys->latch));

1083 1084
	purge_sys->state = TRX_PURGE_ON;

1085 1086 1087 1088 1089 1090 1091 1092 1093 1094
	/* Handle at most 20 undo log pages in one purge batch */

	purge_sys->handle_limit = purge_sys->n_pages_handled + 20;

	old_pages_handled = purge_sys->n_pages_handled;

	mutex_exit(&(purge_sys->mutex));

	mutex_enter(&kernel_mutex);

unknown's avatar
unknown committed
1095
	thr = que_fork_start_command(purge_sys->query);
1096 1097

	ut_ad(thr);
1098

unknown's avatar
unknown committed
1099
	/*	thr2 = que_fork_start_command(purge_sys->query);
1100

1101
	ut_ad(thr2); */
1102

1103 1104 1105

	mutex_exit(&kernel_mutex);

unknown's avatar
unknown committed
1106
	/*	srv_que_task_enqueue(thr2); */
1107

1108
	if (srv_print_thread_releases) {
1109

1110
		fputs("Starting purge\n", stderr);
1111 1112 1113 1114 1115 1116
	}

	que_run_threads(thr);

	if (srv_print_thread_releases) {

1117
		fprintf(stderr,
unknown's avatar
unknown committed
1118 1119
			"Purge ends; pages handled %lu\n",
			(ulong) purge_sys->n_pages_handled);
1120 1121 1122 1123
	}

	return(purge_sys->n_pages_handled - old_pages_handled);
}
unknown's avatar
Merge  
unknown committed
1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135

/**********************************************************************
Prints information of the purge system to stderr. */

void
trx_purge_sys_print(void)
/*=====================*/
{
	fprintf(stderr, "InnoDB: Purge system view:\n");
	read_view_print(purge_sys->view);

	fprintf(stderr, "InnoDB: Purge trx n:o %lu %lu, undo n_o %lu %lu\n",
unknown's avatar
unknown committed
1136 1137 1138 1139
		(ulong) ut_dulint_get_high(purge_sys->purge_trx_no),
		(ulong) ut_dulint_get_low(purge_sys->purge_trx_no),
		(ulong) ut_dulint_get_high(purge_sys->purge_undo_no),
		(ulong) ut_dulint_get_low(purge_sys->purge_undo_no));
unknown's avatar
Merge  
unknown committed
1140
	fprintf(stderr,
unknown's avatar
unknown committed
1141 1142
		"InnoDB: Purge next stored %lu, page_no %lu, offset %lu,\n"
		"InnoDB: Purge hdr_page_no %lu, hdr_offset %lu\n",
1143 1144 1145 1146 1147
		(ulong) purge_sys->next_stored,
		(ulong) purge_sys->page_no,
		(ulong) purge_sys->offset,
		(ulong) purge_sys->hdr_page_no,
		(ulong) purge_sys->hdr_offset);
unknown's avatar
Merge  
unknown committed
1148
}