/*****************************************************************************

Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2014, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file lock/lock0lock.cc
The transaction lock system

Created 5/7/1996 Heikki Tuuri
*******************************************************/

#define LOCK_MODULE_IMPLEMENTATION

#include "univ.i"

#include <mysql/service_thd_error_context.h>
#include <mysql/service_thd_wait.h>
#include <sql_class.h>

#include "lock0lock.h"
#include "lock0priv.h"
#include "dict0mem.h"
#include "trx0purge.h"
#include "trx0sys.h"
#include "ut0vec.h"
#include "btr0cur.h"
#include "row0sel.h"
#include "row0mysql.h"
#include "row0vers.h"
#include "pars0pars.h"
#include "srv0mon.h"

#include <set>

#ifdef WITH_WSREP
#include <mysql/service_wsrep.h>
#include <debug_sync.h>
#endif /* WITH_WSREP */

/** The value of innodb_deadlock_detect */
my_bool innodb_deadlock_detect;
/** The value of innodb_deadlock_report */
ulong innodb_deadlock_report;

#ifdef HAVE_REPLICATION
extern "C" void thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
extern "C" int thd_need_wait_reports(const MYSQL_THD thd);
extern "C" int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
#endif

/** Functor for accessing the embedded node within a table lock. */
struct TableLockGetNode
{
  ut_list_node<lock_t> &operator()(lock_t &elem)
  { return(elem.un_member.tab_lock.locks); }
};

/** Create the hash table.
@param n  the lower bound of n_cells */
void lock_sys_t::hash_table::create(ulint n)
{
  n_cells= ut_find_prime(n);
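  /* pad() reserves one extra hash_latch slot in front of every
  ELEMENTS_PER_LATCH payload cells, so the allocation holds more than
  n_cells elements (see the assertion in resize() below). */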
  const size_t size= pad(n_cells) * sizeof *array;
  void* v= aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
  memset(v, 0, size);
  array= static_cast<hash_cell_t*>(v);
}

/** Resize the hash table.
@param n  the lower bound of n_cells */
void lock_sys_t::hash_table::resize(ulint n)
{
  ut_ad(lock_sys.is_writer());
  ulint new_n_cells= ut_find_prime(n);
  const size_t size= pad(new_n_cells) * sizeof *array;
  void* v= aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
  memset(v, 0, size);
  hash_cell_t *new_array= static_cast<hash_cell_t*>(v);

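  /* Rehash every record lock into the new array. A granted lock is
  prepended to its new cell's chain, while a waiting lock is appended
  at the end, so that waiting requests keep their relative order. */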
  for (auto i= pad(n_cells); i--; )
  {
    if (lock_t *lock= static_cast<lock_t*>(array[i].node))
    {
      ut_ad(i % (ELEMENTS_PER_LATCH + 1)); /* all hash_latch must be vacated */
      do
      {
        ut_ad(!lock->is_table());
        hash_cell_t *c= calc_hash(lock->un_member.rec_lock.page_id.fold(),
                                  new_n_cells) + new_array;
        lock_t *next= lock->hash;
        lock->hash= nullptr;
        if (!c->node)
          c->node= lock;
        else if (!lock->is_waiting())
        {
          lock->hash= static_cast<lock_t*>(c->node);
          c->node= lock;
        }
        else
        {
          lock_t *next= static_cast<lock_t*>(c->node);
          while (next->hash)
            next= next->hash;
          next->hash= lock;
        }
        lock= next;
      }
      while (lock);
    }
  }

  aligned_free(array);
  array= new_array;
  n_cells= new_n_cells;
}

#if defined SRW_LOCK_DUMMY && !defined _WIN32
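/* With the spin-only SRW_LOCK_DUMMY fallback, a process-wide mutex and
condition variable emulate blocking acquisition of the per-cell hash
latches. */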
void lock_sys_t::hash_latch::wait()
{
  pthread_mutex_lock(&lock_sys.hash_mutex);
  while (!write_trylock())
    pthread_cond_wait(&lock_sys.hash_cond, &lock_sys.hash_mutex);
  pthread_mutex_unlock(&lock_sys.hash_mutex);
}

void lock_sys_t::hash_latch::release()
{
  pthread_mutex_lock(&lock_sys.hash_mutex);
  write_unlock();
  pthread_cond_signal(&lock_sys.hash_cond);
  pthread_mutex_unlock(&lock_sys.hash_mutex);
}
#endif

#ifdef UNIV_DEBUG
/** Assert that a lock shard is exclusively latched by this thread */
void lock_sys_t::assert_locked(const lock_t &lock) const
{
  ut_ad(this == &lock_sys);
  if (is_writer())
    return;
  if (lock.is_table())
    assert_locked(*lock.un_member.tab_lock.table);
  else
    lock_sys.hash_get(lock.type_mode).
      assert_locked(lock.un_member.rec_lock.page_id);
}

/** Assert that a table lock shard is exclusively latched by this thread */
void lock_sys_t::assert_locked(const dict_table_t &table) const
{
  ut_ad(!table.is_temporary());

  const os_thread_id_t current_thread= os_thread_get_curr_id();
  if (writer.load(std::memory_order_relaxed) == current_thread)
    return;
  ut_ad(readers);
  ut_ad(table.lock_mutex_is_owner());
}

/** Assert that hash cell for page is exclusively latched by this thread */
void lock_sys_t::hash_table::assert_locked(const page_id_t id) const
{
  if (lock_sys.is_writer())
    return;
  ut_ad(lock_sys.readers);
  ut_ad(latch(cell_get(id.fold()))->is_locked());
}

/** Assert that a hash table cell is exclusively latched (by some thread) */
void lock_sys_t::assert_locked(const hash_cell_t &cell) const
{
  if (lock_sys.is_writer())
    return;
  ut_ad(lock_sys.readers);
  ut_ad(hash_table::latch(const_cast<hash_cell_t*>(&cell))->is_locked());
}
#endif

LockGuard::LockGuard(lock_sys_t::hash_table &hash, page_id_t id)
{
  const auto id_fold= id.fold();
  lock_sys.rd_lock(SRW_LOCK_CALL);
  cell_= hash.cell_get(id_fold);
  hash.latch(cell_)->acquire();
}

LockMultiGuard::LockMultiGuard(lock_sys_t::hash_table &hash,
                               const page_id_t id1, const page_id_t id2)
{
  ut_ad(id1.space() == id2.space());
  const auto id1_fold= id1.fold(), id2_fold= id2.fold();
  lock_sys.rd_lock(SRW_LOCK_CALL);
  cell1_= hash.cell_get(id1_fold);
  cell2_= hash.cell_get(id2_fold);

  auto latch1= hash.latch(cell1_), latch2= hash.latch(cell2_);
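  /* Acquire the two cell latches in address order, so that concurrent
  LockMultiGuard constructors can never deadlock with each other. */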
  if (latch1 > latch2)
    std::swap(latch1, latch2);
  latch1->acquire();
  if (latch1 != latch2)
    latch2->acquire();
}

LockMultiGuard::~LockMultiGuard()
{
  auto latch1= lock_sys_t::hash_table::latch(cell1_),
    latch2= lock_sys_t::hash_table::latch(cell2_);
  latch1->release();
  if (latch1 != latch2)
    latch2->release();
  /* Must be last, to avoid a race with lock_sys_t::hash_table::resize() */
  lock_sys.rd_unlock();
}

/** Pretty-print a table lock.
@param[in,out]	file	output stream
@param[in]	lock	table lock */
static void lock_table_print(FILE* file, const lock_t* lock);

/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr);

namespace Deadlock
{
  /** Whether to_check may be nonempty */
  static Atomic_relaxed<bool> to_be_checked;
  /** Transactions to check for deadlock. Protected by lock_sys.wait_mutex. */
  static std::set<trx_t*> to_check;

  MY_ATTRIBUTE((nonnull, warn_unused_result))
  /** Check if a lock request results in a deadlock.
  Resolve a deadlock by choosing a transaction that will be rolled back.
  @param trx    transaction requesting a lock
  @return whether trx must report DB_DEADLOCK */
  static bool check_and_resolve(trx_t *trx);

  /** Quickly detect a deadlock using Brent's cycle detection algorithm.
  @param trx     transaction that is waiting for another transaction
  @return a transaction that is part of a cycle
  @retval nullptr if no cycle was found */
  inline trx_t *find_cycle(trx_t *trx)
  {
    mysql_mutex_assert_owner(&lock_sys.wait_mutex);
    trx_t *tortoise= trx, *hare= trx;
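    /* Brent's algorithm: the hare follows wait_trx edges one step per
    iteration. Whenever the number of steps l since the last teleport
    reaches the current power of two, the tortoise teleports to the
    hare's position. If the wait-for chain ends in a cycle, the hare
    will eventually complete a lap inside it and meet the tortoise. */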
    for (unsigned power= 1, l= 1; (hare= hare->lock.wait_trx) != nullptr; l++)
    {
      if (tortoise == hare)
      {
        ut_ad(l > 1);
        lock_sys.deadlocks++;
        /* Note: Normally, trx should be part of any deadlock cycle
        that is found. However, if innodb_deadlock_detect=OFF had been
        in effect in the past, it is possible that trx will be waiting
        for a transaction that participates in a pre-existing deadlock
        cycle. In that case, our victim will not be trx. */
        return hare;
      }
      if (l == power)
      {
        /* The maximum concurrent number of TRX_STATE_ACTIVE transactions
        is TRX_RSEG_N_SLOTS * 128, or innodb_page_size / 16 * 128
        (default: 131,072, maximum: 524,288).
        Our maximum possible number of iterations should be twice that. */
        power<<= 1;
        l= 0;
        tortoise= hare;
      }
    }
    return nullptr;
  }
};

#ifdef UNIV_DEBUG
/** Validate the transactional locks. */
static void lock_validate();

/** Validate the record lock queues on a page.
@param block    buffer pool block
@param latched  whether the tablespace latch may be held
@return true if ok */
static bool lock_rec_validate_page(const buf_block_t *block, bool latched)
  MY_ATTRIBUTE((nonnull, warn_unused_result));
#endif /* UNIV_DEBUG */

/* The lock system */
lock_sys_t lock_sys;

/** Only created if !srv_read_only_mode. Protected by lock_sys.latch. */
static FILE *lock_latest_err_file;

/*********************************************************************//**
Reports that a transaction id is insensible, i.e., in the future. */
ATTRIBUTE_COLD
void
lock_report_trx_id_insanity(
/*========================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,		/*!< in: user record */
	dict_index_t*	index,		/*!< in: index */
	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec, index) */
	trx_id_t	max_trx_id)	/*!< in: trx_sys.get_max_trx_id() */
{
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	ib::error()
		<< "Transaction id " << ib::hex(trx_id)
		<< " associated with record" << rec_offsets_print(rec, offsets)
		<< " in index " << index->name
		<< " of table " << index->table->name
		<< " is greater than the global counter " << max_trx_id
		<< "! The table is corrupted.";
}

/*********************************************************************//**
Checks that a transaction id is sensible, i.e., not in the future.
@return true if ok */
bool
lock_check_trx_id_sanity(
/*=====================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,		/*!< in: user record */
	dict_index_t*	index,		/*!< in: index */
	const rec_offs*	offsets)	/*!< in: rec_get_offsets(rec, index) */
{
  ut_ad(rec_offs_validate(rec, index, offsets));
  ut_ad(!rec_is_metadata(rec, *index));

  trx_id_t max_trx_id= trx_sys.get_max_trx_id();
  ut_ad(max_trx_id || srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);

  if (UNIV_LIKELY(max_trx_id != 0) && UNIV_UNLIKELY(trx_id >= max_trx_id))
  {
    lock_report_trx_id_insanity(trx_id, rec, index, offsets, max_trx_id);
    return false;
  }
  return true;
}


/**
  Creates the lock system at database start.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::create(ulint n_cells)
{
  ut_ad(this == &lock_sys);
  ut_ad(!is_initialised());

  m_initialised= true;

  latch.SRW_LOCK_INIT(lock_latch_key);
  mysql_mutex_init(lock_wait_mutex_key, &wait_mutex, nullptr);
#if defined SRW_LOCK_DUMMY && !defined _WIN32
  pthread_mutex_init(&hash_mutex, nullptr);
  pthread_cond_init(&hash_cond, nullptr);
#endif

  rec_hash.create(n_cells);
  prdt_hash.create(n_cells);
  prdt_page_hash.create(n_cells);

  if (!srv_read_only_mode)
  {
    lock_latest_err_file= os_file_create_tmpfile();
    ut_a(lock_latest_err_file);
  }
}

#ifdef UNIV_PFS_RWLOCK
/** Acquire exclusive lock_sys.latch */
void lock_sys_t::wr_lock(const char *file, unsigned line)
{
  latch.wr_lock(file, line);
  ut_ad(!writer.exchange(os_thread_get_curr_id(), std::memory_order_relaxed));
}
/** Release exclusive lock_sys.latch */
void lock_sys_t::wr_unlock()
{
  ut_ad(writer.exchange(0, std::memory_order_relaxed) ==
        os_thread_get_curr_id());
  latch.wr_unlock();
}

/** Acquire shared lock_sys.latch */
void lock_sys_t::rd_lock(const char *file, unsigned line)
{
  latch.rd_lock(file, line);
  ut_ad(!writer.load(std::memory_order_relaxed));
  ut_d(readers.fetch_add(1, std::memory_order_relaxed));
}

/** Release shared lock_sys.latch */
void lock_sys_t::rd_unlock()
{
  ut_ad(!writer.load(std::memory_order_relaxed));
  ut_ad(readers.fetch_sub(1, std::memory_order_relaxed));
  latch.rd_unlock();
}
#endif

/**
  Resize the lock hash table.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::resize(ulint n_cells)
{
  ut_ad(this == &lock_sys);
  LockMutexGuard g{SRW_LOCK_CALL};
  rec_hash.resize(n_cells);
  prdt_hash.resize(n_cells);
  prdt_page_hash.resize(n_cells);
}

/** Closes the lock system at database shutdown. */
void lock_sys_t::close()
{
  ut_ad(this == &lock_sys);

  if (!m_initialised)
    return;

  if (lock_latest_err_file)
  {
    my_fclose(lock_latest_err_file, MYF(MY_WME));
    lock_latest_err_file= nullptr;
  }

  rec_hash.free();
  prdt_hash.free();
  prdt_page_hash.free();
#if defined SRW_LOCK_DUMMY && !defined _WIN32
  pthread_mutex_destroy(&hash_mutex);
  pthread_cond_destroy(&hash_cond);
#endif

  latch.destroy();
  mysql_mutex_destroy(&wait_mutex);

  Deadlock::to_check.clear();
  Deadlock::to_be_checked= false;

  m_initialised= false;
}

#ifdef UNIV_DEBUG
#ifdef WITH_WSREP
/** Check if both the transaction holding the conflicting record lock
and the transaction requesting the record lock are brute force (BF).
If they are, check whether this BF-BF wait is correct; if it is not,
report the BF wait and assert.

@param[in]	lock	conflicting record lock held by another transaction
@param[in]	trx	transaction requesting the record lock
*/
static void wsrep_assert_no_bf_bf_wait(const lock_t *lock, const trx_t *trx)
{
	ut_ad(!lock->is_table());
	lock_sys.assert_locked(*lock);
	trx_t* lock_trx= lock->trx;

	/* Note that we are holding lock_sys.latch, thus we should
	not acquire THD::LOCK_thd_data mutex below to avoid latching
	order violation. */

	if (!trx->is_wsrep() || !lock_trx->is_wsrep())
		return;
	if (UNIV_LIKELY(!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
	    || UNIV_LIKELY(!wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE)))
		return;

	ut_ad(trx->state == TRX_STATE_ACTIVE);

	switch (lock_trx->state) {
	case TRX_STATE_COMMITTED_IN_MEMORY:
		/* The state change is only protected by trx_t::mutex,
		which we are not even holding here. */
	case TRX_STATE_PREPARED:
		/* Wait for lock->trx to complete the commit
		(or XA ROLLBACK) and to release the lock. */
		return;
	case TRX_STATE_ACTIVE:
		break;
	default:
		ut_ad("invalid state" == 0);
	}

	/* If BF - BF order is honored, i.e. trx already holding
	record lock should be ordered before this new lock request
	we can keep trx waiting for the lock. If conflicting
	transaction is already aborting or rolling back for replaying
	we can also let new transaction waiting. */
	if (wsrep_thd_order_before(lock_trx->mysql_thd, trx->mysql_thd)
	    || wsrep_thd_is_aborting(lock_trx->mysql_thd)) {
		return;
	}

	mtr_t mtr;

	ib::error() << "Conflicting lock on table: "
		    << lock->index->table->name
		    << " index: "
		    << lock->index->name()
		    << " that has lock ";
	lock_rec_print(stderr, lock, mtr);

	ib::error() << "WSREP state: ";

	wsrep_report_bf_lock_wait(trx->mysql_thd,
				  trx->id);
	wsrep_report_bf_lock_wait(lock_trx->mysql_thd,
				  lock_trx->id);
	/* BF-BF wait is a bug */
	ut_error;
}

/*********************************************************************//**
Check if a lock timeout was for a high-priority (BF) thread; as a side
effect, trigger the lock monitor.
@param[in]    trx    transaction owning the lock
@param[in]    locked true if trx and lock_sys.latch are held
@return	false for a regular lock timeout */
static
bool
wsrep_is_BF_lock_timeout(
	const trx_t*	trx,
	bool		locked = true)
{
	if (trx->error_state != DB_DEADLOCK && trx->is_wsrep() &&
	    srv_monitor_timer && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
		ib::info() << "WSREP: BF lock wait long for trx:" << ib::hex(trx->id)
			   << " query: " << wsrep_thd_query(trx->mysql_thd);
		if (!locked) {
			LockMutexGuard g{SRW_LOCK_CALL};
			trx_print_latched(stderr, trx, 3000);
		} else {
			lock_sys.assert_locked();
			trx_print_latched(stderr, trx, 3000);
		}

		srv_print_innodb_monitor 	= TRUE;
		srv_print_innodb_lock_monitor 	= TRUE;
		srv_monitor_timer_schedule_now();
		return true;
	}
	return false;
}
#endif /* WITH_WSREP */
#endif /* UNIV_DEBUG */

/*********************************************************************//**
Checks if a lock request for a new lock has to wait for request lock2.
@return TRUE if new lock has to wait for lock2 to be removed */
UNIV_INLINE
bool
lock_rec_has_to_wait(
/*=================*/
	bool		for_locking,
				/*!< in: whether called for locking (true) or releasing (false) */
	const trx_t*	trx,	/*!< in: trx of new lock */
	unsigned	type_mode,/*!< in: precise mode of the new lock
				to set: LOCK_S or LOCK_X, possibly
				ORed to LOCK_GAP or LOCK_REC_NOT_GAP,
				LOCK_INSERT_INTENTION */
	const lock_t*	lock2,	/*!< in: another record lock; NOTE that
				it is assumed that this has a lock bit
				set on the same record as in the new
				lock we are setting */
	bool		lock_is_on_supremum)
				/*!< in: TRUE if we are setting the
				lock on the 'supremum' record of an
				index page: we know then that the lock
				request is really for a 'gap' type lock */
{
	ut_ad(trx);
	ut_ad(!lock2->is_table());
	ut_d(lock_sys.hash_get(type_mode).assert_locked(
		     lock2->un_member.rec_lock.page_id));

	if (trx == lock2->trx
	    || lock_mode_compatible(
		       static_cast<lock_mode>(LOCK_MODE_MASK & type_mode),
		       lock2->mode())) {
		return false;
	}

	/* We have somewhat complex rules when gap type record locks
	cause waits */

	if ((lock_is_on_supremum || (type_mode & LOCK_GAP))
	    && !(type_mode & LOCK_INSERT_INTENTION)) {

		/* Gap type locks without LOCK_INSERT_INTENTION flag
		do not need to wait for anything. This is because
		different users can have conflicting lock types
		on gaps. */

		return false;
	}

	if (!(type_mode & LOCK_INSERT_INTENTION) && lock2->is_gap()) {

		/* Record lock (LOCK_ORDINARY or LOCK_REC_NOT_GAP)
		does not need to wait for a gap type lock */

		return false;
	}

	if ((type_mode & LOCK_GAP) && lock2->is_record_not_gap()) {

		/* Lock on gap does not need to wait for
		a LOCK_REC_NOT_GAP type lock */

		return false;
	}

	if (lock2->is_insert_intention()) {
		/* No lock request needs to wait for an insert
		intention lock to be removed. This is ok since our
		rules allow conflicting locks on gaps. This eliminates
		a spurious deadlock caused by a next-key lock waiting
		for an insert intention lock; when the insert
		intention lock was granted, the insert deadlocked on
		the waiting next-key lock.

		Also, insert intention locks do not disturb each
		other. */

		return false;
	}

#ifdef HAVE_REPLICATION
	if ((type_mode & LOCK_GAP || lock2->is_gap())
	    && !thd_need_ordering_with(trx->mysql_thd, lock2->trx->mysql_thd)) {
		/* If the upper server layer has already decided on the
		commit order between the transaction requesting the
		lock and the transaction owning the lock, we do not
		need to wait for gap locks. Such ordering by the upper
		server layer happens in parallel replication, where the
		commit order is fixed to match the original order on the
		master.

		Such gap locks are mainly needed to get serialisability
		between transactions so that they will be binlogged in
		the correct order so that statement-based replication
		will give the correct results. Since the right order
		was already determined on the master, we do not need
		to enforce it again here.

		Skipping the locks is not essential for correctness,
		since in case of deadlock we will just kill the later
		transaction and retry it. But it can save some
		unnecessary rollbacks and retries. */

		return false;
	}
#endif /* HAVE_REPLICATION */

#ifdef WITH_WSREP
		/* The new lock request is from a transaction using a
		unique key scan, and that transaction is a wsrep high
		priority (brute force) transaction. If the conflicting
		transaction is also a wsrep high priority transaction,
		we should avoid the lock conflict because the ordering
		of these transactions is already decided and the
		conflicting transaction will be replayed later. */
		if (trx->is_wsrep_UK_scan()
		    && wsrep_thd_is_BF(lock2->trx->mysql_thd, false)) {
			return false;
		}

		/* We can well let a BF transaction wait normally here,
		as the other BF will be replayed in case of conflict.
		For debug builds we do additional sanity checks to
		catch any unsupported BF wait. */
		ut_d(wsrep_assert_no_bf_bf_wait(lock2, trx));
#endif /* WITH_WSREP */

	return true;
}

/*********************************************************************//**
Checks if a lock request lock1 has to wait for request lock2.
@return TRUE if lock1 has to wait for lock2 to be removed */
bool
lock_has_to_wait(
/*=============*/
	const lock_t*	lock1,	/*!< in: waiting lock */
	const lock_t*	lock2)	/*!< in: another lock; NOTE that it is
				assumed that this has a lock bit set
				on the same record as in lock1 if the
				locks are record locks */
{
	ut_ad(lock1 && lock2);

	if (lock1->trx == lock2->trx
	    || lock_mode_compatible(lock1->mode(), lock2->mode())) {
		return false;
	}

	if (lock1->is_table()) {
		return true;
	}

	ut_ad(!lock2->is_table());

	if (lock1->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)) {
		return lock_prdt_has_to_wait(lock1->trx, lock1->type_mode,
					     lock_get_prdt_from_lock(lock1),
					     lock2);
	}

	return lock_rec_has_to_wait(
		false, lock1->trx, lock1->type_mode, lock2,
		lock_rec_get_nth_bit(lock1, PAGE_HEAP_NO_SUPREMUM));
}

/*============== RECORD LOCK BASIC FUNCTIONS ============================*/

/**********************************************************************//**
Looks for a set bit in a record lock bitmap. Returns ULINT_UNDEFINED,
if none found.
@return bit index == heap number of the record, or ULINT_UNDEFINED if
none found */
ulint
lock_rec_find_set_bit(
/*==================*/
	const lock_t*	lock)	/*!< in: record lock with at least one bit set */
{
	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (lock_rec_get_nth_bit(lock, i)) {

			return(i);
		}
	}

	return(ULINT_UNDEFINED);
}

/*********************************************************************//**
Resets the record lock bitmap to zero. NOTE: does not touch the wait_lock
pointer in the transaction! This function is used in lock object creation
and resetting. */
static
void
lock_rec_bitmap_reset(
/*==================*/
	lock_t*	lock)	/*!< in: record lock */
{
	ulint	n_bytes;

	ut_ad(!lock->is_table());

	/* Reset to zero the bitmap which resides immediately after the lock
	struct */

	n_bytes = lock_rec_get_n_bits(lock) / 8;

	ut_ad((lock_rec_get_n_bits(lock) % 8) == 0);

	memset(reinterpret_cast<void*>(&lock[1]), 0, n_bytes);
}

/*********************************************************************//**
Copies a record lock to heap.
@return copy of lock */
static
lock_t*
lock_rec_copy(
/*==========*/
	const lock_t*	lock,	/*!< in: record lock */
	mem_heap_t*	heap)	/*!< in: memory heap */
{
	ulint	size;

	ut_ad(!lock->is_table());

	size = sizeof(lock_t) + lock_rec_get_n_bits(lock) / 8;

	return(static_cast<lock_t*>(mem_heap_dup(heap, lock, size)));
}

/*********************************************************************//**
Gets the previous record lock set on a record.
@return previous lock on the same record, NULL if none exists */
const lock_t*
lock_rec_get_prev(
/*==============*/
	const lock_t*	in_lock,/*!< in: record lock */
	ulint		heap_no)/*!< in: heap number of the record */
{
  ut_ad(!in_lock->is_table());
  const page_id_t id{in_lock->un_member.rec_lock.page_id};
  hash_cell_t *cell= lock_sys.hash_get(in_lock->type_mode).cell_get(id.fold());

  for (lock_t *lock= lock_sys_t::get_first(*cell, id); lock != in_lock;
       lock= lock_rec_get_next_on_page(lock))
    if (lock_rec_get_nth_bit(lock, heap_no))
      return lock;

  return nullptr;
}

/*============= FUNCTIONS FOR ANALYZING RECORD LOCK QUEUE ================*/

/*********************************************************************//**
Checks if a transaction has a GRANTED explicit lock on rec stronger or equal
to precise_mode.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_rec_has_expl(
/*==============*/
	ulint			precise_mode,/*!< in: LOCK_S or LOCK_X
					possibly ORed to LOCK_GAP or
					LOCK_REC_NOT_GAP, for a
					supremum record we regard this
					always a gap type request */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction */
{
  ut_ad((precise_mode & LOCK_MODE_MASK) == LOCK_S
	|| (precise_mode & LOCK_MODE_MASK) == LOCK_X);
  ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));
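  /* A granted (non-waiting, non-insert-intention) lock of trx
  satisfies the request if its gap/not-gap flavor covers the requested
  one: a next-key lock (neither flag set) covers both flavors, and on
  the supremum every lock is a gap lock. Finally, the lock mode must
  be at least as strong as precise_mode. */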

  for (lock_t *lock= lock_sys_t::get_first(cell, id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
    if (lock->trx == trx &&
	!(lock->type_mode & (LOCK_WAIT | LOCK_INSERT_INTENTION)) &&
	(!((LOCK_REC_NOT_GAP | LOCK_GAP) & lock->type_mode) ||
	 heap_no == PAGE_HEAP_NO_SUPREMUM ||
	 ((LOCK_REC_NOT_GAP | LOCK_GAP) & precise_mode & lock->type_mode)) &&
	lock_mode_stronger_or_eq(lock->mode(), static_cast<lock_mode>
				 (precise_mode & LOCK_MODE_MASK)))
      return lock;

  return nullptr;
}

#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@return lock or NULL */
static
lock_t*
lock_rec_other_has_expl_req(
/*========================*/
	lock_mode		mode,	/*!< in: LOCK_S or LOCK_X */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	bool			wait,	/*!< in: whether also waiting locks
					are taken into account */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction, or NULL if
					requests by all transactions
					are taken into account */
{
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	/* Only GAP lock can be on SUPREMUM, and we are not looking for
	GAP lock */
	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		return(NULL);
	}

	for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock; lock = lock_rec_get_next(heap_no, lock)) {
		if (lock->trx != trx
		    && !lock->is_gap()
		    && (!lock->is_waiting() || wait)
		    && lock_mode_stronger_or_eq(lock->mode(), mode)) {

			return(lock);
		}
	}

	return(NULL);
}
#endif /* UNIV_DEBUG */

#ifdef WITH_WSREP
void lock_wait_wsrep_kill(trx_t *bf_trx, ulong thd_id, trx_id_t trx_id);

/** Kill the holders of conflicting locks.
@param trx   brute-force applier transaction running in the current thread */
ATTRIBUTE_COLD ATTRIBUTE_NOINLINE static void lock_wait_wsrep(trx_t *trx)
{
  DBUG_ASSERT(wsrep_on(trx->mysql_thd));
  if (!wsrep_thd_is_BF(trx->mysql_thd, false))
    return;

  std::set<trx_t*> victims;

  lock_sys.wr_lock(SRW_LOCK_CALL);
  mysql_mutex_lock(&lock_sys.wait_mutex);

  const lock_t *wait_lock= trx->lock.wait_lock;
  if (!wait_lock)
  {
func_exit:
    lock_sys.wr_unlock();
    mysql_mutex_unlock(&lock_sys.wait_mutex);
    return;
  }

  if (wait_lock->is_table())
  {
    dict_table_t *table= wait_lock->un_member.tab_lock.table;
    for (lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock;
         lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
      if (lock->trx != trx)
        victims.emplace(lock->trx);
  }
  else
  {
    const page_id_t id{wait_lock->un_member.rec_lock.page_id};
    hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                         ? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
      (id.fold());
    if (lock_t *lock= lock_sys_t::get_first(cell, id))
    {
      const ulint heap_no= lock_rec_find_set_bit(wait_lock);
      if (!lock_rec_get_nth_bit(lock, heap_no))
        lock= lock_rec_get_next(heap_no, lock);
      do
        if (lock->trx != trx)
          victims.emplace(lock->trx);
      while ((lock= lock_rec_get_next(heap_no, lock)));
    }
  }

  if (victims.empty())
    goto func_exit;

  std::vector<std::pair<ulong,trx_id_t>> victim_id;
  for (trx_t *v : victims)
    victim_id.emplace_back(std::pair<ulong,trx_id_t>
                           {thd_get_thread_id(v->mysql_thd), v->id});
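  /* Defer the actual kill until lock_sys.latch and wait_mutex have
  been released; THD-level mutexes must not be acquired while holding
  lock_sys.latch (see wsrep_assert_no_bf_bf_wait()). */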

  DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort",
                  {
                    const char act[]=
                      "now SIGNAL sync.before_wsrep_thd_abort_reached "
                      "WAIT_FOR signal.before_wsrep_thd_abort";
                    DBUG_ASSERT(!debug_sync_set_action(trx->mysql_thd,
                                                       STRING_WITH_LEN(act)));
                  };);

  lock_sys.wr_unlock();
  mysql_mutex_unlock(&lock_sys.wait_mutex);

  for (const auto &v : victim_id)
    lock_wait_wsrep_kill(trx, v.first, v.second);
}
#endif /* WITH_WSREP */

/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@return lock or NULL */
static
lock_t*
lock_rec_other_has_conflicting(
/*===========================*/
	unsigned		mode,	/*!< in: LOCK_S or LOCK_X,
					possibly ORed to LOCK_GAP or
					LOCK_REC_NOT_GAP,
					LOCK_INSERT_INTENTION */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: our transaction */
{
	bool	is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);

	for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock; lock = lock_rec_get_next(heap_no, lock)) {
		if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) {
			return(lock);
		}
	}

	return(NULL);
}

/*********************************************************************//**
Checks if some transaction has an implicit x-lock on a record in a secondary
index.
@return transaction id of the transaction which has the x-lock, or 0;
NOTE that this function can return false positives but never false
negatives. The caller must confirm all positive results by calling
trx_is_active(). */
static
trx_t*
lock_sec_rec_some_has_impl(
/*=======================*/
	trx_t*		caller_trx,/*!<in/out: trx of current thread */
	const rec_t*	rec,	/*!< in: user record */
	dict_index_t*	index,	/*!< in: secondary index */
	const rec_offs*	offsets)/*!< in: rec_get_offsets(rec, index) */
{
	trx_t*		trx;
	trx_id_t	max_trx_id;
	const page_t*	page = page_align(rec);

	lock_sys.assert_unlocked();
	ut_ad(!dict_index_is_clust(index));
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	max_trx_id = page_get_max_trx_id(page);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list, or
	database recovery is running. */

	if (max_trx_id < trx_sys.get_min_trx_id()) {

		trx = 0;

	} else if (!lock_check_trx_id_sanity(max_trx_id, rec, index, offsets)) {

		/* The page is corrupt: try to avoid a crash by returning 0 */
		trx = 0;

	/* In this case it is possible that some transaction has an implicit
	x-lock. We have to look in the clustered index. */

	} else {
		trx = row_vers_impl_x_locked(caller_trx, rec, index, offsets);
	}

	return(trx);
}

/*********************************************************************//**
Return the number of table locks for a transaction.
The caller must be holding lock_sys.latch. */
ulint
lock_number_of_tables_locked(
/*=========================*/
	const trx_lock_t*	trx_lock)	/*!< in: transaction locks */
{
	const lock_t*	lock;
	ulint		n_tables = 0;

	lock_sys.assert_locked();

	for (lock = UT_LIST_GET_FIRST(trx_lock->trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (lock->is_table()) {
			n_tables++;
		}
	}

	return(n_tables);
}

/*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/

/** Reset the wait status of a lock.
@param[in,out]	lock	lock that was possibly being waited for */
static void lock_reset_lock_and_trx_wait(lock_t *lock)
{
  lock_sys.assert_locked(*lock);
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  trx_t *trx= lock->trx;
  ut_ad(lock->is_waiting());
  ut_ad(!trx->lock.wait_lock || trx->lock.wait_lock == lock);
  if (trx_t *wait_trx= trx->lock.wait_trx)
    Deadlock::to_check.erase(wait_trx);
  trx->lock.wait_lock= nullptr;
  trx->lock.wait_trx= nullptr;
  lock->type_mode&= ~LOCK_WAIT;
}

/** Create a new record lock and inserts it to the lock queue,
without checking for deadlocks or conflicts.
@param[in]	c_lock		conflicting lock
@param[in]	type_mode	lock mode and wait flag
@param[in]	page_id		index page number
@param[in]	page		R-tree index page, or NULL
@param[in]	heap_no		record heap number in the index page
@param[in]	index		the index tree
@param[in,out]	trx		transaction
@param[in]	holds_trx_mutex	whether the caller holds trx->mutex
@return created lock */
lock_t*
lock_rec_create_low(
	lock_t*		c_lock,
	unsigned	type_mode,
	const page_id_t	page_id,
	const page_t*	page,
	ulint		heap_no,
	dict_index_t*	index,
	trx_t*		trx,
	bool		holds_trx_mutex)
{
	lock_t*		lock;
	ulint		n_bytes;

	ut_d(lock_sys.hash_get(type_mode).assert_locked(page_id));
	ut_ad(holds_trx_mutex == trx->mutex_is_owner());
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
	ut_ad(!(type_mode & LOCK_TABLE));
	ut_ad(trx->state != TRX_STATE_NOT_STARTED);
	ut_ad(!trx_is_autocommit_non_locking(trx));

	/* If rec is the supremum record, then we reset the gap and
	LOCK_REC_NOT_GAP bits, as all locks on the supremum are
	automatically of the gap type */

	if (UNIV_UNLIKELY(heap_no == PAGE_HEAP_NO_SUPREMUM)) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));
		type_mode = type_mode & ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		n_bytes = (page_dir_get_n_heap(page) + 7) / 8;
	} else {
		ut_ad(heap_no == PRDT_HEAPNO);

		/* The lock is always on PAGE_HEAP_NO_INFIMUM (0), so
		we only need 1 bit (which rounds up to 1 byte) for
		lock bit setting */
		n_bytes = 1;

		if (type_mode & LOCK_PREDICATE) {
			ulint	tmp = UNIV_WORD_SIZE - 1;

			/* We will attach predicate structure after lock.
			Make sure the memory is aligned on 8 bytes,
			the mem_heap_alloc will align it with
			MEM_SPACE_NEEDED anyway. */
			n_bytes = (n_bytes + sizeof(lock_prdt_t) + tmp) & ~tmp;
			ut_ad(n_bytes == sizeof(lock_prdt_t) + UNIV_WORD_SIZE);
		}
	}

	if (!holds_trx_mutex) {
		trx->mutex_lock();
	}
	ut_ad(trx->mutex_is_owner());
	ut_ad(trx->state != TRX_STATE_NOT_STARTED);

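	/* Prefer a preallocated element of trx->lock.rec_pool; fall back
	to trx->lock.lock_heap if the pool is exhausted or the lock
	bitmap does not fit in a pool element. */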
	if (trx->lock.rec_cached >= UT_ARR_SIZE(trx->lock.rec_pool)
	    || sizeof *lock + n_bytes > sizeof *trx->lock.rec_pool) {
		lock = static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap,
				       sizeof *lock + n_bytes));
	} else {
		lock = &trx->lock.rec_pool[trx->lock.rec_cached++].lock;
	}

	lock->trx = trx;
	lock->type_mode = type_mode;
	lock->index = index;
	lock->un_member.rec_lock.page_id = page_id;

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		lock->un_member.rec_lock.n_bits = uint32_t(n_bytes * 8);
	} else {
		/* Predicate lock always on INFIMUM (0) */
		lock->un_member.rec_lock.n_bits = 8;
 	}
	lock_rec_bitmap_reset(lock);
	lock_rec_set_nth_bit(lock, heap_no);
	index->table->n_rec_locks++;
	ut_ad(index->table->get_ref_count() || !index->table->can_be_evicted);

	const auto lock_hash = &lock_sys.hash_get(type_mode);
	HASH_INSERT(lock_t, hash, lock_hash, page_id.fold(), lock);

	if (type_mode & LOCK_WAIT) {
		if (trx->lock.wait_trx) {
			ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx);
			ut_ad(trx->lock.wait_lock);
			ut_ad((*trx->lock.wait_lock).trx == trx);
		} else {
			ut_ad(c_lock);
			trx->lock.wait_trx = c_lock->trx;
			ut_ad(!trx->lock.wait_lock);
		}
		trx->lock.wait_lock = lock;
	}
	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
	if (!holds_trx_mutex) {
		trx->mutex_unlock();
	}
	MONITOR_INC(MONITOR_RECLOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_RECLOCK);

	return lock;
}

/** Enqueue a waiting request for a lock which cannot be granted immediately.
Check for deadlocks.
@param[in]	c_lock		conflicting lock
@param[in]	type_mode	the requested lock mode (LOCK_S or LOCK_X)
				possibly ORed with LOCK_GAP or
				LOCK_REC_NOT_GAP, ORed with
				LOCK_INSERT_INTENTION if this
				waiting lock request is set
				when performing an insert of
				an index record
@param[in]	id		page identifier
@param[in]	page		leaf page in the index
@param[in]	heap_no		record heap number in the block
@param[in]	index		index tree
@param[in,out]	thr		query thread
@param[in]	prdt		minimum bounding box (spatial index)
@retval	DB_LOCK_WAIT		if the waiting lock was enqueued
@retval	DB_DEADLOCK		if this transaction was chosen as the victim
@retval	DB_LOCK_WAIT_TIMEOUT	if the lock wait timeout is 0 and no waiting is allowed */
dberr_t
lock_rec_enqueue_waiting(
	lock_t*			c_lock,
	unsigned		type_mode,
	const page_id_t		id,
	const page_t*		page,
	ulint			heap_no,
	dict_index_t*		index,
	que_thr_t*		thr,
	lock_prdt_t*		prdt)
{
	ut_d(lock_sys.hash_get(type_mode).assert_locked(id));
	ut_ad(!srv_read_only_mode);
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));

	trx_t* trx = thr_get_trx(thr);
	ut_ad(trx->mutex_is_owner());

	switch (trx_get_dict_operation(trx)) {
	case TRX_DICT_OP_NONE:
		break;
	case TRX_DICT_OP_TABLE:
	case TRX_DICT_OP_INDEX:
		ib::error() << "A record lock wait happens in a dictionary"
			" operation. index "
			<< index->name
			<< " of table "
			<< index->table->name
			<< ". " << BUG_REPORT_MSG;
		ut_ad(0);
	}

	if (trx->mysql_thd && thd_lock_wait_timeout(trx->mysql_thd) == 0) {
		trx->error_state = DB_LOCK_WAIT_TIMEOUT;
		return DB_LOCK_WAIT_TIMEOUT;
	}

	/* Enqueue the lock request that will wait to be granted, note that
	we already own the trx mutex. */
	lock_t* lock = lock_rec_create_low(
		c_lock,
		type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true);

	if (prdt && type_mode & LOCK_PREDICATE) {
		lock_prdt_set_prdt(lock, prdt);
	}

	trx->lock.wait_thr = thr;
	trx->lock.was_chosen_as_deadlock_victim
		IF_WSREP(.fetch_and(byte(~1)), = false);

	DBUG_LOG("ib_lock", "trx " << ib::hex(trx->id)
		 << " waits for lock in index " << index->name
		 << " of table " << index->table->name);

	MONITOR_INC(MONITOR_LOCKREC_WAIT);

	return DB_LOCK_WAIT;
}

/*********************************************************************//**
Looks for a suitable type record lock struct by the same trx on the same page.
This can be used to save space when a new record lock should be set on a page:
no new struct is needed if a suitable old one is found.
@return lock or NULL */
static inline
lock_t*
lock_rec_find_similar_on_page(
	ulint           type_mode,      /*!< in: lock type_mode field */
	ulint           heap_no,        /*!< in: heap number of the record */
	lock_t*         lock,           /*!< in: lock_sys.get_first() */
	const trx_t*    trx)            /*!< in: transaction */
{
	lock_sys.rec_hash.assert_locked(lock->un_member.rec_lock.page_id);

	for (/* No op */;
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (lock->trx == trx
		    && lock->type_mode == type_mode
		    && lock_rec_get_n_bits(lock) > heap_no) {

			return(lock);
		}
	}

	return(NULL);
}

/*********************************************************************//**
Adds a record lock request in the record queue. The request is normally
added as the last in the queue, but if there are no waiting lock requests
on the record, and the request to be added is not a waiting request, we
can reuse a suitable record lock object already existing on the same page,
just setting the appropriate bit in its bitmap. This is a low-level function
which does NOT check for deadlocks or lock compatibility! */
static
void
lock_rec_add_to_queue(
/*==================*/
	unsigned		type_mode,/*!< in: lock mode, wait, gap
					etc. flags */
	hash_cell_t&		cell,	/*!< in,out: first hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	const page_t*		page,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of the record */
	dict_index_t*		index,	/*!< in: index of record */
	trx_t*			trx,	/*!< in/out: transaction */
	bool			caller_owns_trx_mutex)
					/*!< in: TRUE if caller owns the
					transaction mutex */
{
	ut_d(lock_sys.hash_get(type_mode).assert_locked(id));
	ut_ad(caller_owns_trx_mutex == trx->mutex_is_owner());
	ut_ad(index->is_primary()
	      || dict_index_get_online_status(index) != ONLINE_INDEX_CREATION);
	ut_ad(!(type_mode & LOCK_TABLE));
#ifdef UNIV_DEBUG
	switch (type_mode & LOCK_MODE_MASK) {
	case LOCK_X:
	case LOCK_S:
		break;
	default:
		ut_error;
	}

	if (!(type_mode & (LOCK_WAIT | LOCK_GAP))) {
		lock_mode	mode = (type_mode & LOCK_MODE_MASK) == LOCK_S
			? LOCK_X
			: LOCK_S;
		const lock_t*	other_lock
			= lock_rec_other_has_expl_req(
				mode, cell, id, false, heap_no, trx);
#ifdef WITH_WSREP
		if (UNIV_LIKELY_NULL(other_lock) && trx->is_wsrep()) {
			/* Only BF transaction may be granted lock
			before other conflicting lock request. */
			if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE)
			    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
				/* If it is not BF, this case is a bug. */
				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
				ut_error;
			}
		} else
#endif /* WITH_WSREP */
		ut_ad(!other_lock);
	}
#endif /* UNIV_DEBUG */

	/* If rec is the supremum record, then we can reset the gap bit, as
	all locks on the supremum are automatically of the gap type, and we
	try to avoid unnecessary memory consumption of a new record lock
	struct for a gap type lock */

	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));

		/* There should never be LOCK_REC_NOT_GAP on a supremum
		record, but let us play safe */

		type_mode &= ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	if (type_mode & LOCK_WAIT) {
		goto create;
	} else if (lock_t *first_lock = lock_sys_t::get_first(cell, id)) {
		for (lock_t* lock = first_lock;;) {
			if (lock->is_waiting()
			    && lock_rec_get_nth_bit(lock, heap_no)) {
				goto create;
			}
			if (!(lock = lock_rec_get_next_on_page(lock))) {
				break;
			}
		}

		/* Look for a similar record lock on the same page:
		if one is found and there are no waiting lock requests,
		we can just set the bit */
		if (lock_t* lock = lock_rec_find_similar_on_page(
			    type_mode, heap_no, first_lock, trx)) {
			trx_t* lock_trx = lock->trx;
			if (caller_owns_trx_mutex) {
				trx->mutex_unlock();
			}
			lock_trx->mutex_lock();
			lock_rec_set_nth_bit(lock, heap_no);
			lock_trx->mutex_unlock();
			if (caller_owns_trx_mutex) {
				trx->mutex_lock();
			}
			return;
		}
	}

create:
	/* Note: We will not pass any conflicting lock to lock_rec_create(),
	because we should be moving an existing waiting lock request. */
	ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);

	lock_rec_create_low(nullptr,
			    type_mode, id, page, heap_no, index, trx,
			    caller_owns_trx_mutex);
}

/*********************************************************************//**
Tries to lock the specified record in the mode requested. If not immediately
possible, enqueues a waiting lock request. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case
of a page supremum record, a gap type lock.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
static
dberr_t
lock_rec_lock(
/*==========*/
	bool			impl,	/*!< in: if true, no lock is set
					if no wait is necessary: we
					assume that the caller will
					set an implicit lock */
	unsigned		mode,	/*!< in: lock mode: LOCK_X or
					LOCK_S possibly ORed to either
					LOCK_GAP or LOCK_REC_NOT_GAP */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of record */
	dict_index_t*		index,	/*!< in: index of record */
	que_thr_t*		thr)	/*!< in: query thread */
{
  trx_t *trx= thr_get_trx(thr);

  ut_ad(!srv_read_only_mode);
  ut_ad(((LOCK_MODE_MASK | LOCK_TABLE) & mode) == LOCK_S ||
        ((LOCK_MODE_MASK | LOCK_TABLE) & mode) == LOCK_X);
  ut_ad(~mode & (LOCK_GAP | LOCK_REC_NOT_GAP));
  ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
  DBUG_EXECUTE_IF("innodb_report_deadlock", return DB_DEADLOCK;);

  ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ||
        lock_table_has(trx, index->table, LOCK_IS));
  ut_ad((LOCK_MODE_MASK & mode) != LOCK_X ||
         lock_table_has(trx, index->table, LOCK_IX));

  if (lock_table_has(trx, index->table,
                     static_cast<lock_mode>(LOCK_MODE_MASK & mode)))
    return DB_SUCCESS;

  MONITOR_ATOMIC_INC(MONITOR_NUM_RECLOCK_REQ);
  const page_id_t id{block->page.id()};
  LockGuard g{lock_sys.rec_hash, id};

  if (lock_t *lock= lock_sys_t::get_first(g.cell(), id))
  {
    dberr_t err= DB_SUCCESS;
    trx->mutex_lock();
    if (lock_rec_get_next_on_page(lock) ||
        lock->trx != trx ||
        lock->type_mode != mode ||
        lock_rec_get_n_bits(lock) <= heap_no)
    {
      /* Do nothing if the trx already has a strong enough lock on rec */
      if (!lock_rec_has_expl(mode, g.cell(), id, heap_no, trx))
      {
        if (lock_t *c_lock= lock_rec_other_has_conflicting(mode, g.cell(), id,
                                                           heap_no, trx))
          /*
            If another transaction has a non-gap conflicting
            request in the queue, as this transaction does not
            have a lock strong enough already granted on the
            record, we have to wait.
          */
          err= lock_rec_enqueue_waiting(c_lock, mode, id, block->frame, heap_no,
                                        index, thr, nullptr);
        else if (!impl)
        {
          /* Set the requested lock on the record. */
          lock_rec_add_to_queue(mode, g.cell(), id, block->frame, heap_no,
                                index, trx, true);
          err= DB_SUCCESS_LOCKED_REC;
        }
      }
    }
    else if (!impl)
    {
      /*
        If the nth bit of the record lock is already set, then we do not
        set a new lock bit; otherwise we set it
      */
      if (!lock_rec_get_nth_bit(lock, heap_no))
      {
        lock_rec_set_nth_bit(lock, heap_no);
        err= DB_SUCCESS_LOCKED_REC;
      }
    }
    trx->mutex_unlock();
    return err;
  }
  else
  {
    /*
      Simplified and faster path for the most common cases
      Note that we don't own the trx mutex.
    */
    if (!impl)
      lock_rec_create_low(nullptr,
                          mode, id, block->frame, heap_no, index, trx, false);

    return DB_SUCCESS_LOCKED_REC;
  }
}

/*********************************************************************//**
Checks if a waiting record lock request still has to wait in a queue.
@return lock that is causing the wait, or NULL if the lock no longer has to wait */
static
const lock_t*
lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock)
{
	const lock_t*	lock;
	ulint		heap_no;
	ulint		bit_mask;
	ulint		bit_offset;

	ut_ad(wait_lock->is_waiting());
	ut_ad(!wait_lock->is_table());

	heap_no = lock_rec_find_set_bit(wait_lock);

	bit_offset = heap_no / 8;
	bit_mask = static_cast<ulint>(1) << (heap_no % 8);

	for (lock = lock_sys_t::get_first(
		     cell, wait_lock->un_member.rec_lock.page_id);
	     lock != wait_lock;
	     lock = lock_rec_get_next_on_page_const(lock)) {
		const byte*	p = (const byte*) &lock[1];

		if (heap_no < lock_rec_get_n_bits(lock)
		    && (p[bit_offset] & bit_mask)
		    && lock_has_to_wait(wait_lock, lock)) {
			return(lock);
		}
	}

	return(NULL);
}

/** Note that a record lock wait started */
inline void lock_sys_t::wait_start()
{
  mysql_mutex_assert_owner(&wait_mutex);
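  /* wait_count packs two counters: the number of pending waits in the
  low bits (always less than WAIT_COUNT_STEP) and the cumulative number
  of waits in multiples of WAIT_COUNT_STEP; adding WAIT_COUNT_STEP + 1
  increments both at once. */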
1594 1595 1596 1597 1598
  wait_count+= WAIT_COUNT_STEP + 1;
  /* The maximum number of concurrently waiting transactions is one less
  than the maximum number of concurrent transactions. */
  static_assert(WAIT_COUNT_STEP == UNIV_PAGE_SIZE_MAX / 16 * TRX_SYS_N_RSEGS,
                "compatibility");
}

/** Note that a record lock wait resumed */
inline
void lock_sys_t::wait_resume(THD *thd, my_hrtime_t start, my_hrtime_t now)
{
  mysql_mutex_assert_owner(&wait_mutex);
  ut_ad(get_wait_pending());
  ut_ad(get_wait_cumulative());
  wait_count--;
  if (now.val >= start.val)
  {
    const uint32_t diff_time=
      static_cast<uint32_t>((now.val - start.val) / 1000);
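    /* my_hrtime_t is expressed in microseconds, so diff_time is the
    wait duration in milliseconds. */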
    wait_time+= diff_time;

    if (diff_time > wait_time_max)
      wait_time_max= diff_time;

    thd_storage_lock_wait(thd, diff_time);
  }
}

#ifdef HAVE_REPLICATION
ATTRIBUTE_NOINLINE MY_ATTRIBUTE((nonnull))
/** Report lock waits to parallel replication.
@param trx       transaction that may be waiting for a lock
static void lock_wait_rpl_report(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  THD *const thd= trx->mysql_thd;
  ut_ad(thd);
  const lock_t *wait_lock= trx->lock.wait_lock;
  if (!wait_lock)
    return;
  ut_ad(!(wait_lock->type_mode & LOCK_AUTO_INC));
  if (!lock_sys.wr_lock_try())
  {
    mysql_mutex_unlock(&lock_sys.wait_mutex);
    lock_sys.wr_lock(SRW_LOCK_CALL);
    mysql_mutex_lock(&lock_sys.wait_mutex);
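    /* lock_sys.wait_mutex was released while upgrading to the
    exclusive lock_sys latch; the wait could have been resolved
    meanwhile, so reload trx->lock.wait_lock. */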
    wait_lock= trx->lock.wait_lock;
    if (!wait_lock)
    {
func_exit:
      lock_sys.wr_unlock();
      return;
    }
  }
  ut_ad(wait_lock->is_waiting());
  ut_ad(!(wait_lock->type_mode & LOCK_AUTO_INC));

  if (wait_lock->is_table())
  {
    dict_table_t *table= wait_lock->un_member.tab_lock.table;
    for (lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock;
         lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
      if (!(lock->type_mode & LOCK_AUTO_INC) && lock->trx != trx)
        thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
  }
  else
  {
    const page_id_t id{wait_lock->un_member.rec_lock.page_id};
    hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                         ? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
      (id.fold());
    if (lock_t *lock= lock_sys_t::get_first(cell, id))
    {
      const ulint heap_no= lock_rec_find_set_bit(wait_lock);
      if (!lock_rec_get_nth_bit(lock, heap_no))
        lock= lock_rec_get_next(heap_no, lock);
      do
        if (lock->trx->mysql_thd != thd)
          thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
      while ((lock= lock_rec_get_next(heap_no, lock)));
    }
  }

  goto func_exit;
}
#endif /* HAVE_REPLICATION */

/** Wait for a lock to be released.
@retval DB_DEADLOCK if this transaction was chosen as the deadlock victim
@retval DB_INTERRUPTED if the execution was interrupted by the user
@retval DB_LOCK_WAIT_TIMEOUT if the lock wait timed out
@retval DB_SUCCESS if the lock was granted */
dberr_t lock_wait(que_thr_t *thr)
{
  trx_t *trx= thr_get_trx(thr);

  if (trx->mysql_thd)
    DEBUG_SYNC_C("lock_wait_suspend_thread_enter");

  /* InnoDB system transactions may use the global value of
  innodb_lock_wait_timeout, because trx->mysql_thd == NULL. */
  const ulong innodb_lock_wait_timeout= trx_lock_wait_timeout_get(trx);
  const bool no_timeout= innodb_lock_wait_timeout > 100000000;
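  /* Timeout values above 100,000,000 seconds (more than three years)
  are treated as "wait forever": no absolute deadline is armed below. */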
  const my_hrtime_t suspend_time= my_hrtime_coarse();
  ut_ad(!trx->dict_operation_lock_mode ||
        trx->dict_operation_lock_mode == RW_S_LATCH);

  /* The wait_lock can be cleared by another thread in lock_grant(),
  lock_rec_cancel(), or lock_cancel_waiting_and_release(). But, a wait
  can only be initiated by the current thread which owns the transaction.

  Even if trx->lock.wait_lock were changed, the object that it used to
  point to it will remain valid memory (remain allocated from
  trx->lock.lock_heap). If trx->lock.wait_lock was set to nullptr, the
  original object could be transformed to a granted lock. On a page
  split or merge, we would change trx->lock.wait_lock to point to
  another waiting lock request object, and the old object would be
  logically discarded.

  In any case, it is safe to read the memory that wait_lock points to,
  even though we are not holding any mutex. We are only reading
  wait_lock->type_mode & (LOCK_TABLE | LOCK_AUTO_INC), which will be
  unaffected by any page split or merge operation. (Furthermore,
  table lock objects will never be cloned or moved.) */
  const lock_t *const wait_lock= trx->lock.wait_lock;

  if (!wait_lock)
  {
    /* The lock has already been released or this transaction
    was chosen as a deadlock victim: no need to wait */
    if (trx->lock.was_chosen_as_deadlock_victim.fetch_and(byte(~1)))
      trx->error_state= DB_DEADLOCK;
    else
      trx->error_state= DB_SUCCESS;

    return trx->error_state;
  }

  trx->lock.suspend_time= suspend_time;

  const auto had_dict_lock= trx->dict_operation_lock_mode;
  if (had_dict_lock) /* Release foreign key check latch */
    row_mysql_unfreeze_data_dictionary(trx);

  IF_WSREP(if (trx->is_wsrep()) lock_wait_wsrep(trx),);

  const auto type_mode= wait_lock->type_mode;
#ifdef HAVE_REPLICATION
  /* Even though lock_wait_rpl_report() has nothing to do with
  deadlock detection, it was always disabled by innodb_deadlock_detect=OFF.
  We will keep it that way, because unfortunately
  thd_need_wait_reports() will hold even if parallel (or any) replication
  is not being used. We want to allow the user to skip
  lock_wait_rpl_report(). */
  const bool rpl= !(type_mode & LOCK_AUTO_INC) && trx->mysql_thd &&
    innodb_deadlock_detect && thd_need_wait_reports(trx->mysql_thd);
#endif
  const bool row_lock_wait= thr->lock_state == QUE_THR_LOCK_ROW;
  timespec abstime;
  set_timespec_time_nsec(abstime, suspend_time.val * 1000);
  abstime.MY_tv_sec+= innodb_lock_wait_timeout;
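  /* Convert the (microsecond) suspension time into an absolute
  nanosecond-resolution deadline and add the timeout in seconds. */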
  thd_wait_begin(trx->mysql_thd, (type_mode & LOCK_TABLE)
                 ? THD_WAIT_TABLE_LOCK : THD_WAIT_ROW_LOCK);
  dberr_t error_state= DB_SUCCESS;

  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (trx->lock.wait_lock)
  {
    if (Deadlock::check_and_resolve(trx))
    {
      ut_ad(!trx->lock.wait_lock);
      error_state= DB_DEADLOCK;
      goto end_wait;
    }
  }
  else
    goto end_wait;

  if (row_lock_wait)
    lock_sys.wait_start();

#ifdef HAVE_REPLICATION
  if (rpl)
    lock_wait_rpl_report(trx);
#endif

  trx->error_state= DB_SUCCESS;

  while (trx->lock.wait_lock)
  {
    int err;

    if (no_timeout)
    {
      my_cond_wait(&trx->lock.cond, &lock_sys.wait_mutex.m_mutex);
      err= 0;
    }
    else
      err= my_cond_timedwait(&trx->lock.cond, &lock_sys.wait_mutex.m_mutex,
                             &abstime);
    error_state= trx->error_state;
    switch (error_state) {
    case DB_DEADLOCK:
    case DB_INTERRUPTED:
      break;
    default:
      ut_ad(error_state != DB_LOCK_WAIT_TIMEOUT);
      if (trx_is_interrupted(trx))
        /* innobase_kill_query() can only set trx->error_state=DB_INTERRUPTED
        for any transaction that is attached to a connection. */
        error_state= DB_INTERRUPTED;
      else if (!err)
        continue;
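      /* err == 0 means the wait was signalled (or woke up spuriously)
      before any timeout elapsed; loop to re-check wait_lock. */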
#ifdef WITH_WSREP
      else if (trx->is_wsrep() && wsrep_is_BF_lock_timeout(trx, false));
#endif
      else
      {
        error_state= DB_LOCK_WAIT_TIMEOUT;
        lock_sys.timeouts++;
      }
    }
    break;
  }

  if (row_lock_wait)
    lock_sys.wait_resume(trx->mysql_thd, suspend_time, my_hrtime_coarse());

end_wait:
  if (lock_t *lock= trx->lock.wait_lock)
  {
    lock_sys_t::cancel(trx, lock, false);
    lock_sys.deadlock_check();
  }

  mysql_mutex_unlock(&lock_sys.wait_mutex);
  thd_wait_end(trx->mysql_thd);

  if (had_dict_lock)
    row_mysql_freeze_data_dictionary(trx);

  trx->error_state= error_state;
  return error_state;
}

/** Resume a lock wait */
static void lock_wait_end(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->mutex_is_owner());
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  ut_ad(trx->lock.wait_thr);

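  /* fetch_and(byte(~1)) atomically clears the deadlock-victim flag
  and returns its previous value, consuming the victim status here. */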
  if (trx->lock.was_chosen_as_deadlock_victim.fetch_and(byte(~1)))
    trx->error_state= DB_DEADLOCK;

  trx->lock.wait_thr= nullptr;
  pthread_cond_signal(&trx->lock.cond);
}

/** Grant a waiting lock request and release the waiting transaction. */
static void lock_grant(lock_t *lock)
{
  lock_reset_lock_and_trx_wait(lock);
  trx_t *trx= lock->trx;
  trx->mutex_lock();
  if (lock->mode() == LOCK_AUTO_INC)
  {
    dict_table_t *table= lock->un_member.tab_lock.table;
    ut_ad(!table->autoinc_trx);
    table->autoinc_trx= trx;
    ib_vector_push(trx->autoinc_locks, &lock);
  }
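  /* A granted AUTO-INC lock makes trx the exclusive autoinc_trx of
  the table; the lock is also remembered in trx->autoinc_locks so that
  it can later be released via lock_table_remove_low(). */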
  DBUG_PRINT("ib_lock", ("wait for trx " TRX_ID_FMT " ends", trx->id));
  /* If we are resolving a deadlock by choosing another transaction as
  a victim, then our original transaction may not be waiting anymore */
  if (trx->lock.wait_thr)
    lock_wait_end(trx);
  trx->mutex_unlock();
}

/*************************************************************//**
Cancels a waiting record lock request and releases the waiting transaction
that requested it. NOTE: does NOT check if waiting lock requests behind this
one can now be granted! */
static void lock_rec_cancel(lock_t *lock)
{
  trx_t *trx= lock->trx;
  mysql_mutex_lock(&lock_sys.wait_mutex);
  trx->mutex_lock();

  ut_d(lock_sys.hash_get(lock->type_mode).
       assert_locked(lock->un_member.rec_lock.page_id));
  /* Reset the bit (there can be only one set bit) in the lock bitmap */
  lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));

  /* Reset the wait flag and the back pointer to lock in trx */
  lock_reset_lock_and_trx_wait(lock);

  /* The following releases the trx from lock wait */
  lock_wait_end(trx);
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  trx->mutex_unlock();
}

/** Remove a record lock request, waiting or granted, from the queue and
grant locks to other transactions in the queue if they now are entitled
to a lock. NOTE: all record locks contained in in_lock are removed.
@param[in,out]	in_lock		record lock
@param[in]	owns_wait_mutex	whether lock_sys.wait_mutex is held */
static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
{
#ifdef SAFE_MUTEX
	ut_ad(owns_wait_mutex == mysql_mutex_is_owner(&lock_sys.wait_mutex));
#endif /* SAFE_MUTEX */
	ut_ad(!in_lock->is_table());

	const page_id_t page_id{in_lock->un_member.rec_lock.page_id};
	auto& lock_hash = lock_sys.hash_get(in_lock->type_mode);
	ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());

	ut_d(auto old_n_locks=)
	in_lock->index->table->n_rec_locks--;
	ut_ad(old_n_locks);

	const ulint rec_fold = page_id.fold();
	hash_cell_t &cell = *lock_hash.cell_get(rec_fold);
	lock_sys.assert_locked(cell);

	HASH_DELETE(lock_t, hash, &lock_hash, rec_fold, in_lock);
	ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());
	UT_LIST_REMOVE(in_lock->trx->lock.trx_locks, in_lock);

	MONITOR_INC(MONITOR_RECLOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_RECLOCK);

	bool acquired = false;

	/* Check if waiting locks in the queue can now be granted:
	grant locks if there are no conflicting locks ahead. Stop at
	the first X lock that is waiting or has been granted. */

	for (lock_t* lock = lock_sys_t::get_first(cell, page_id);
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (!lock->is_waiting()) {
			continue;
		}

		if (!owns_wait_mutex) {
			mysql_mutex_lock(&lock_sys.wait_mutex);
			acquired = owns_wait_mutex = true;
		}

		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

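		/* If this waiting lock still has to wait behind some
		conflicting lock c, re-point its wait_trx to c->trx and,
		if deadlock detection is enabled, schedule c->trx for a
		deferred deadlock check instead of granting anything. */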
		if (const lock_t* c = lock_rec_has_to_wait_in_queue(
			    cell, lock)) {
			trx_t* c_trx = c->trx;
			lock->trx->lock.wait_trx = c_trx;
			if (c_trx->lock.wait_trx
			    && innodb_deadlock_detect
			    && Deadlock::to_check.emplace(c_trx).second) {
				Deadlock::to_be_checked = true;
			}
		} else {
			/* Grant the lock */
			ut_ad(lock->trx != in_lock->trx);
			lock_grant(lock);
		}
	}

	if (acquired) {
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}

/** Remove a record lock request, waiting or granted, on a discarded page
@param lock_hash  hash table
@param in_lock    lock object */
void lock_rec_discard(lock_sys_t::hash_table &lock_hash, lock_t *in_lock)
{
  ut_ad(!in_lock->is_table());
  lock_hash.assert_locked(in_lock->un_member.rec_lock.page_id);

  HASH_DELETE(lock_t, hash, &lock_hash,
              in_lock->un_member.rec_lock.page_id.fold(), in_lock);
  trx_t *trx= in_lock->trx;
  trx->mutex_lock();
  ut_d(auto old_locks=)
  in_lock->index->table->n_rec_locks--;
  ut_ad(old_locks);
  UT_LIST_REMOVE(trx->lock.trx_locks, in_lock);
  trx->mutex_unlock();
  MONITOR_INC(MONITOR_RECLOCK_REMOVED);
  MONITOR_DEC(MONITOR_NUM_RECLOCK);
}

/*************************************************************//**
Removes record lock objects set on an index page which is discarded. This
function does not move locks, or check for waiting locks, therefore the
lock bitmaps must already be reset when this function is called. */
static void
lock_rec_free_all_from_discard_page(page_id_t id, const hash_cell_t &cell,
                                    lock_sys_t::hash_table &lock_hash)
{
  for (lock_t *lock= lock_sys_t::get_first(cell, id); lock; )
  {
    ut_ad(&lock_hash != &lock_sys.rec_hash ||
          lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
    ut_ad(!lock->is_waiting());
    lock_t *next_lock= lock_rec_get_next_on_page(lock);
    lock_rec_discard(lock_hash, lock);
    lock= next_lock;
  }
}

/*============= RECORD LOCK MOVING AND INHERITING ===================*/

/*************************************************************//**
Resets the lock bits for a single record. Releases transactions waiting for
lock requests here. */
static
void
lock_rec_reset_and_release_wait(const hash_cell_t &cell, const page_id_t id,
                                ulint heap_no)
{
  for (lock_t *lock= lock_sys.get_first(cell, id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
  {
    if (lock->is_waiting())
      lock_rec_cancel(lock);
    else
    {
      trx_t *lock_trx= lock->trx;
      lock_trx->mutex_lock();
      lock_rec_reset_nth_bit(lock, heap_no);
      lock_trx->mutex_unlock();
    }
  }
}

/*************************************************************//**
Makes a record inherit the locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of
the other record. Also waiting lock requests on rec are inherited as
GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap(
/*====================*/
	hash_cell_t&		heir_cell,	/*!< heir hash table cell */
	const page_id_t		heir,		/*!< in: page containing the
						record which inherits */
	const hash_cell_t&	donor_cell,	/*!< donor hash table cell */
	const page_id_t		donor,		/*!< in: page containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	const page_t*		heir_page,	/*!< in: heir page frame */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
	/* At READ UNCOMMITTED or READ COMMITTED isolation level,
	we do not want locks set
	by an UPDATE or a DELETE to be inherited as gap type locks. But we
	DO want S-locks/X-locks (taken for replace) set by a consistency
	constraint to be inherited also then. */

	for (lock_t* lock= lock_sys_t::get_first(donor_cell, donor, heap_no);
	     lock;
	     lock = lock_rec_get_next(heap_no, lock)) {
		trx_t* lock_trx = lock->trx;
		if (!lock->is_insert_intention()
		    && (lock_trx->isolation_level > TRX_ISO_READ_COMMITTED
			|| lock->mode() !=
			(lock_trx->duplicates ? LOCK_S : LOCK_X))) {
			lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
					      heir_cell, heir, heir_page,
					      heir_heap_no,
					      lock->index, lock_trx, false);
		}
	}
}

/*************************************************************//**
Makes a record inherit the gap locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of the
other record. Also waiting lock requests are inherited as GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap_if_gap_lock(
/*================================*/
	const buf_block_t*	block,		/*!< in: buffer block */
	ulint			heir_heap_no,	/*!< in: heap_no of
						record which inherits */
	ulint			heap_no)	/*!< in: heap_no of record
						from which inherited;
						does NOT reset the locks
						on this record */
{
  const page_id_t id{block->page.id()};
  LockGuard g{lock_sys.rec_hash, id};

  for (lock_t *lock= lock_sys_t::get_first(g.cell(), id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
     if (!lock->is_insert_intention() && (heap_no == PAGE_HEAP_NO_SUPREMUM ||
                                          !lock->is_record_not_gap()) &&
         !lock_table_has(lock->trx, lock->index->table, LOCK_X))
       lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
                             g.cell(), id, block->frame,
                             heir_heap_no, lock->index, lock->trx, false);
}

/*************************************************************//**
Moves the locks of a record to another record and resets the lock bits of
the donating record. */
static
void
lock_rec_move(
	hash_cell_t&		receiver_cell,	/*!< in: hash table cell */
	const buf_block_t&	receiver,	/*!< in: buffer block containing
						the receiving record */
	const page_id_t		receiver_id,	/*!< in: page identifier */
	const hash_cell_t&	donator_cell,	/*!< in: hash table cell */
	const page_id_t		donator_id,	/*!< in: page identifier of
						the donating record */
	ulint			receiver_heap_no,/*!< in: heap_no of the record
						which gets the locks; there
						must be no lock requests
						on it! */
	ulint			donator_heap_no)/*!< in: heap_no of the record
						which gives the locks */
{
	ut_ad(!lock_sys_t::get_first(receiver_cell,
				     receiver_id, receiver_heap_no));

	for (lock_t *lock = lock_sys_t::get_first(donator_cell, donator_id,
						  donator_heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(donator_heap_no, lock)) {
		const auto type_mode = lock->type_mode;
		if (type_mode & LOCK_WAIT) {
			ut_ad(lock->trx->lock.wait_lock == lock);
			lock->type_mode &= ~LOCK_WAIT;
		}

		trx_t* lock_trx = lock->trx;
		lock_trx->mutex_lock();
		lock_rec_reset_nth_bit(lock, donator_heap_no);

		/* Note that we FIRST reset the bit, and then set the lock:
		the function works also if donator_id == receiver_id */

		lock_rec_add_to_queue(type_mode, receiver_cell,
				      receiver_id, receiver.frame,
				      receiver_heap_no,
				      lock->index, lock_trx, true);
		lock_trx->mutex_unlock();
	}

	ut_ad(!lock_sys_t::get_first(donator_cell, donator_id,
				     donator_heap_no));
}

/** Move all the granted locks to the front of the given lock list.
All the waiting locks will be at the end of the list.
@param[in,out]	lock_list	the given lock list.  */
static
void
lock_move_granted_locks_to_front(
	UT_LIST_BASE_NODE_T(lock_t)&	lock_list)
{
	lock_t*	lock;

	bool seen_waiting_lock = false;

	for (lock = UT_LIST_GET_FIRST(lock_list); lock;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (!seen_waiting_lock) {
			if (lock->is_waiting()) {
				seen_waiting_lock = true;
			}
			continue;
		}

		ut_ad(seen_waiting_lock);

		if (!lock->is_waiting()) {
			lock_t* prev = UT_LIST_GET_PREV(trx_locks, lock);
			ut_a(prev);
			ut_list_move_to_front(lock_list, lock);
			lock = prev;
		}
	}
}

/*************************************************************//**
Updates the lock table when we have reorganized a page. NOTE: we copy
also the locks set on the infimum of the page; the infimum may carry
locks if an update of a record is occurring on the page, and its locks
were temporarily stored on the infimum. */
void
lock_move_reorganize_page(
/*======================*/
	const buf_block_t*	block,	/*!< in: old index page, now
					reorganized */
	const buf_block_t*	oblock)	/*!< in: copy of the old, not
					reorganized page */
{
  mem_heap_t *heap;

  {
    UT_LIST_BASE_NODE_T(lock_t) old_locks;
    UT_LIST_INIT(old_locks, &lock_t::trx_locks);

    const page_id_t id{block->page.id()};
    const auto id_fold= id.fold();
    {
      LockGuard g{lock_sys.rec_hash, id};
      if (!lock_sys_t::get_first(g.cell(), id))
        return;
    }

    /* We will modify arbitrary trx->lock.trx_locks. */
    LockMutexGuard g{SRW_LOCK_CALL};
    hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id_fold);

    /* Note: Predicate locks for SPATIAL INDEX are not affected by
    page reorganize, because they do not refer to individual record
    heap numbers. */
    lock_t *lock= lock_sys_t::get_first(cell, id);

    if (!lock)
      return;

    heap= mem_heap_create(256);

    /* Copy first all the locks on the page to heap and reset the
    bitmaps in the original locks; chain the copies of the locks
    using the trx_locks field in them. */

    do
    {
      /* Make a copy of the lock */
      lock_t *old_lock= lock_rec_copy(lock, heap);

      UT_LIST_ADD_LAST(old_locks, old_lock);

      /* Reset bitmap of lock */
      lock_rec_bitmap_reset(lock);

      if (lock->is_waiting())
      {
        ut_ad(lock->trx->lock.wait_lock == lock);
        lock->type_mode&= ~LOCK_WAIT;
      }

      lock= lock_rec_get_next_on_page(lock);
    }
    while (lock);

    const ulint comp= page_is_comp(block->frame);
    ut_ad(comp == page_is_comp(oblock->frame));

    lock_move_granted_locks_to_front(old_locks);

    DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize",
                    ut_list_reverse(old_locks););

    for (lock= UT_LIST_GET_FIRST(old_locks); lock;
         lock= UT_LIST_GET_NEXT(trx_locks, lock))
    {
      /* NOTE: we copy also the locks set on the infimum and
      supremum of the page; the infimum may carry locks if an
      update of a record is occurring on the page, and its locks
      were temporarily stored on the infimum */
      const rec_t *rec1= page_get_infimum_rec(block->frame);
      const rec_t *rec2= page_get_infimum_rec(oblock->frame);

      /* Set locks according to old locks */
      for (;;)
      {
        ulint old_heap_no;
        ulint new_heap_no;
        ut_d(const rec_t* const orec= rec1);
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));

        if (comp)
        {
          old_heap_no= rec_get_heap_no_new(rec2);
          new_heap_no= rec_get_heap_no_new(rec1);

          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          old_heap_no= rec_get_heap_no_old(rec2);
          new_heap_no= rec_get_heap_no_old(rec1);
          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        /* Clear the bit in old_lock. */
        if (old_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, old_heap_no))
        {
          ut_ad(!page_rec_is_metadata(orec));

          /* NOTE that the old lock bitmap could be too
          small for the new heap number! */
          lock_rec_add_to_queue(lock->type_mode, cell, id, block->frame,
                                new_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();

        if (new_heap_no == PAGE_HEAP_NO_SUPREMUM)
        {
           ut_ad(old_heap_no == PAGE_HEAP_NO_SUPREMUM);
           break;
        }
      }

      ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
    }
  }

  mem_heap_free(heap);

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  if (fil_space_t *space= fil_space_t::get(id.space()))
  {
    ut_ad(lock_rec_validate_page(block, space->is_latched()));
    space->release();
  }
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list end is moved to another page. */
void
lock_move_rec_list_end(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec)		/*!< in: record on page: this
						is the first record moved */
{
  const ulint comp= page_rec_is_comp(rec);

  ut_ad(block->frame == page_align(rec));
  ut_ad(comp == page_is_comp(new_block->frame));

  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};
  {
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    /* Note: when we move locks from record to record, waiting locks
    and possible granted gap type locks behind them are enqueued in
    the original order, because new elements are inserted to a hash
    table to the end of the hash chain, and lock_rec_add_to_queue
    does not reuse locks if there are waiters in the queue. */
    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1= rec;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      if (comp)
      {
        if (page_offset(rec1) == PAGE_NEW_INFIMUM)
          rec1= page_rec_get_next_low(rec1, TRUE);
        rec2= page_rec_get_next_low(new_block->frame + PAGE_NEW_INFIMUM, TRUE);
      }
      else
      {
        if (page_offset(rec1) == PAGE_OLD_INFIMUM)
          rec1= page_rec_get_next_low(rec1, FALSE);
        rec2= page_rec_get_next_low(new_block->frame + PAGE_OLD_INFIMUM,FALSE);
      }

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */
      for (;;)
      {
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));
        ut_d(const rec_t* const orec= rec1);

        ulint rec1_heap_no;
        ulint rec2_heap_no;

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM)
            break;

          rec2_heap_no= rec_get_heap_no_new(rec2);
          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);

          if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM)
            break;
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(rec_get_data_size_old(rec1) == rec_get_data_size_old(rec2));
          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec1)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          ut_ad(!page_rec_is_metadata(orec));

          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id, new_block->frame,
                                rec2_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();
      }
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  if (fil_space_t *space= fil_space_t::get(id.space()))
  {
    const bool is_latched{space->is_latched()};
    ut_ad(lock_rec_validate_page(block, is_latched));
    ut_ad(lock_rec_validate_page(new_block, is_latched));
    space->release();
  }
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list start is moved to another page. */
void
lock_move_rec_list_start(
/*=====================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec,		/*!< in: record on page:
						this is the first
						record NOT copied */
	const rec_t*		old_end)	/*!< in: old
						previous-to-last
						record on new_page
						before the records
						were copied */
{
  const ulint comp= page_rec_is_comp(rec);

  ut_ad(block->frame == page_align(rec));
  ut_ad(comp == page_is_comp(new_block->frame));
  ut_ad(new_block->frame == page_align(old_end));
  ut_ad(!page_rec_is_metadata(rec));
  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};

  {
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      if (comp)
      {
        rec1= page_rec_get_next_low(block->frame + PAGE_NEW_INFIMUM, TRUE);
        rec2= page_rec_get_next_low(old_end, TRUE);
      }
      else
      {
        rec1= page_rec_get_next_low(block->frame + PAGE_OLD_INFIMUM, FALSE);
        rec2= page_rec_get_next_low(old_end, FALSE);
      }

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */

      while (rec1 != rec)
      {
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));
        ut_d(const rec_t* const prev= rec1);

        ulint rec1_heap_no;
        ulint rec2_heap_no;

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          rec2_heap_no= rec_get_heap_no_new(rec2);

          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          ut_ad(!page_rec_is_metadata(prev));

          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id, new_block->frame,
                                rec2_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();
      }

#ifdef UNIV_DEBUG
      if (page_rec_is_supremum(rec))
        for (auto i= lock_rec_get_n_bits(lock); --i > PAGE_HEAP_NO_USER_LOW; )
          ut_ad(!lock_rec_get_nth_bit(lock, i));
#endif /* UNIV_DEBUG */
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  ut_ad(lock_rec_validate_page(block));
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list start is moved to another page. */
void
lock_rtr_move_rec_list(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	rtr_rec_move_t*		rec_move,       /*!< in: recording records
						moved */
	ulint			num_move)       /*!< in: num of rec to move */
{
  if (!num_move)
    return;

  const ulint comp= page_rec_is_comp(rec_move[0].old_rec);

  ut_ad(block->frame == page_align(rec_move[0].old_rec));
  ut_ad(new_block->frame == page_align(rec_move[0].new_rec));
  ut_ad(comp == page_rec_is_comp(rec_move[0].new_rec));
  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};

  {
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */

      for (ulint moved= 0; moved < num_move; moved++)
      {
        ulint rec1_heap_no;
        ulint rec2_heap_no;

        rec1= rec_move[moved].old_rec;
        rec2= rec_move[moved].new_rec;
        ut_ad(!page_rec_is_metadata(rec1));
        ut_ad(!page_rec_is_metadata(rec2));

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          rec2_heap_no= rec_get_heap_no_new(rec2);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id, new_block->frame,
                                rec2_heap_no, lock->index, lock_trx, true);

          rec_move[moved].moved= true;
        }

        lock_trx->mutex_unlock();
      }
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  ut_ad(lock_rec_validate_page(block));
#endif
}
/*************************************************************//**
Updates the lock table when a page is split to the right. */
void
lock_update_split_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
  const ulint h= lock_get_min_heap_no(right_block);
  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};

  LockMultiGuard g{lock_sys.rec_hash, l, r};

  /* Move the locks on the supremum of the left page to the supremum
  of the right page */
  lock_rec_move(g.cell2(), *right_block, r, g.cell1(), l,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);

  /* Inherit the locks to the supremum of left page from the successor
  of the infimum on right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->frame,
                          PAGE_HEAP_NO_SUPREMUM, h);
}

#ifdef UNIV_DEBUG
static void lock_assert_no_spatial(const page_id_t id)
{
  const auto id_fold= id.fold();
  auto cell= lock_sys.prdt_page_hash.cell_get(id_fold);
  auto latch= lock_sys_t::hash_table::latch(cell);
  latch->acquire();
  /* there should exist no page lock on the left page,
  otherwise, it will be blocked from merge */
  ut_ad(!lock_sys_t::get_first(*cell, id));
  latch->release();
  cell= lock_sys.prdt_hash.cell_get(id_fold);
  latch= lock_sys_t::hash_table::latch(cell);
  latch->acquire();
  ut_ad(!lock_sys_t::get_first(*cell, id));
  latch->release();
}
#endif

/*************************************************************//**
Updates the lock table when a page is merged to the right. */
void
lock_update_merge_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page to
						which merged */
	const rec_t*		orig_succ,	/*!< in: original
						successor of infimum
						on the right page
						before merge */
	const buf_block_t*	left_block)	/*!< in: merged index
						page which will be
						discarded */
{
  ut_ad(!page_rec_is_metadata(orig_succ));

  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};
  LockMultiGuard g{lock_sys.rec_hash, l, r};

  /* Inherit the locks from the supremum of the left page to the
  original successor of infimum on the right page, to which the left
  page was merged */
  lock_rec_inherit_to_gap(g.cell2(), r, g.cell1(), l, right_block->frame,
                          page_rec_get_heap_no(orig_succ),
                          PAGE_HEAP_NO_SUPREMUM);

  /* Reset the locks on the supremum of the left page, releasing
  waiting transactions */
  lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(l, g.cell1(), lock_sys.rec_hash);

  ut_d(lock_assert_no_spatial(l));
}

/** Update locks when the root page is copied to another in
btr_root_raise_and_insert(). Note that we leave lock structs on the
root page, even though they do not make sense on other than leaf
pages: the reason is that in a pessimistic update the infimum record
of the root page will act as a dummy carrier of the locks of the record
to be updated. */
void lock_update_root_raise(const buf_block_t &block, const page_id_t root)
{
  const page_id_t id{block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, id, root};
  /* Move the locks on the supremum of the root to the supremum of block */
  lock_rec_move(g.cell1(), block, id, g.cell2(), root,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
}

/** Update the lock table when a page is copied to another.
@param new_block  the target page
@param old        old page (not index root page) */
void lock_update_copy_and_discard(const buf_block_t &new_block, page_id_t old)
{
  const page_id_t id{new_block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, id, old};
  /* Move the locks on the supremum of the old page to the supremum of new */
  lock_rec_move(g.cell1(), new_block, id, g.cell2(), old,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(old, g.cell2(), lock_sys.rec_hash);
}

/*************************************************************//**
Updates the lock table when a page is split to the left. */
void
lock_update_split_left(
/*===================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
  ulint h= lock_get_min_heap_no(right_block);
  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};
  LockMultiGuard g{lock_sys.rec_hash, l, r};
  /* Inherit the locks to the supremum of the left page from the
  successor of the infimum on the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->frame,
                          PAGE_HEAP_NO_SUPREMUM, h);
}

/** Update the lock table when a page is merged to the left.
@param left      left page
@param orig_pred original predecessor of supremum on the left page before merge
@param right     merged, to-be-discarded right page */
void lock_update_merge_left(const buf_block_t& left, const rec_t *orig_pred,
                            const page_id_t right)
{
  ut_ad(left.frame == page_align(orig_pred));

  const page_id_t l{left.page.id()};

  LockMultiGuard g{lock_sys.rec_hash, l, right};
  const rec_t *left_next_rec= page_rec_get_next_const(orig_pred);

  if (!page_rec_is_supremum(left_next_rec))
  {
    /* Inherit the locks on the supremum of the left page to the
    first record which was moved from the right page */
    lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left.frame,
                            page_rec_get_heap_no(left_next_rec),
                            PAGE_HEAP_NO_SUPREMUM);

    /* Reset the locks on the supremum of the left page,
    releasing waiting transactions */
    lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
  }

  /* Move the locks from the supremum of right page to the supremum
  of the left page */
  lock_rec_move(g.cell1(), left, l, g.cell2(), right,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(right, g.cell2(), lock_sys.rec_hash);

  /* there should exist no page lock on the right page,
  otherwise, it will be blocked from merge */
  ut_d(lock_assert_no_spatial(right));
}

/*************************************************************//**
Resets the original locks on heir and replaces them with gap type locks
inherited from rec. */
void
lock_rec_reset_and_inherit_gap_locks(
/*=================================*/
	const buf_block_t&	heir_block,	/*!< in: block containing the
						record which inherits */
	const page_id_t		donor,		/*!< in: page containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
  const page_id_t heir{heir_block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, heir, donor};
  lock_rec_reset_and_release_wait(g.cell1(), heir, heir_heap_no);
  lock_rec_inherit_to_gap(g.cell1(), heir, g.cell2(), donor, heir_block.frame,
                          heir_heap_no, heap_no);
}

/*************************************************************//**
Updates the lock table when a page is discarded. */
void
lock_update_discard(
/*================*/
	const buf_block_t*	heir_block,	/*!< in: index page
						which will inherit the locks */
	ulint			heir_heap_no,	/*!< in: heap_no of the record
						which will inherit the locks */
	const buf_block_t*	block)		/*!< in: index page
						which will be discarded */
{
	const page_t*	page = block->frame;
	const rec_t*	rec;
	ulint		heap_no;
	const page_id_t	heir(heir_block->page.id());
	const page_id_t	page_id(block->page.id());
	LockMultiGuard	g{lock_sys.rec_hash, heir, page_id};

	if (lock_sys_t::get_first(g.cell2(), page_id)) {
		ut_d(lock_assert_no_spatial(page_id));
		/* Inherit all the locks on the page to the record and
		reset all the locks on the page */

		if (page_is_comp(page)) {
			rec = page + PAGE_NEW_INFIMUM;

			do {
				heap_no = rec_get_heap_no_new(rec);

				lock_rec_inherit_to_gap(g.cell1(), heir,
							g.cell2(), page_id,
							heir_block->frame,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					g.cell2(), page_id, heap_no);

				rec = page + rec_get_next_offs(rec, TRUE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		} else {
			rec = page + PAGE_OLD_INFIMUM;

			do {
				heap_no = rec_get_heap_no_old(rec);

				lock_rec_inherit_to_gap(g.cell1(), heir,
							g.cell2(), page_id,
							heir_block->frame,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					g.cell2(), page_id, heap_no);

				rec = page + rec_get_next_offs(rec, FALSE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		}

		lock_rec_free_all_from_discard_page(page_id, g.cell2(),
						    lock_sys.rec_hash);
	} else {
		const auto fold = page_id.fold();
		auto cell = lock_sys.prdt_hash.cell_get(fold);
		auto latch = lock_sys_t::hash_table::latch(cell);
		latch->acquire();
		lock_rec_free_all_from_discard_page(page_id, *cell,
						    lock_sys.prdt_hash);
		latch->release();
		cell = lock_sys.prdt_page_hash.cell_get(fold);
		latch = lock_sys_t::hash_table::latch(cell);
		latch->acquire();
		lock_rec_free_all_from_discard_page(page_id, *cell,
						    lock_sys.prdt_page_hash);
		latch->release();
	}
}

/*************************************************************//**
Updates the lock table when a new user record is inserted. */
void
lock_update_insert(
/*===============*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: the inserted record */
{
	ulint	receiver_heap_no;
	ulint	donator_heap_no;

	ut_ad(block->frame == page_align(rec));
	ut_ad(!page_rec_is_metadata(rec));

	/* Inherit the gap-locking locks for rec, in gap mode, from the next
	record */

	if (page_rec_is_comp(rec)) {
		receiver_heap_no = rec_get_heap_no_new(rec);
		donator_heap_no = rec_get_heap_no_new(
			page_rec_get_next_low(rec, TRUE));
	} else {
		receiver_heap_no = rec_get_heap_no_old(rec);
		donator_heap_no = rec_get_heap_no_old(
			page_rec_get_next_low(rec, FALSE));
	}

	lock_rec_inherit_to_gap_if_gap_lock(
		block, receiver_heap_no, donator_heap_no);
}

/*************************************************************//**
Updates the lock table when a record is removed. */
void
lock_update_delete(
/*===============*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: the record to be removed */
{
	const page_t*	page = block->frame;
	ulint		heap_no;
	ulint		next_heap_no;

	ut_ad(page == page_align(rec));
	ut_ad(!page_rec_is_metadata(rec));

	if (page_is_comp(page)) {
		heap_no = rec_get_heap_no_new(rec);
		next_heap_no = rec_get_heap_no_new(page
						   + rec_get_next_offs(rec,
								       TRUE));
	} else {
		heap_no = rec_get_heap_no_old(rec);
		next_heap_no = rec_get_heap_no_old(page
						   + rec_get_next_offs(rec,
								       FALSE));
	}

	const page_id_t id{block->page.id()};
	LockGuard g{lock_sys.rec_hash, id};

	/* Let the next record inherit the locks from rec, in gap mode */

	lock_rec_inherit_to_gap(g.cell(), id, g.cell(), id, block->frame,
				next_heap_no, heap_no);

	/* Reset the lock bits on rec and release waiting transactions */
	lock_rec_reset_and_release_wait(g.cell(), id, heap_no);
}

/*********************************************************************//**
Stores on the page infimum record the explicit locks of another record.
This function is used to store the lock state of a record when it is
updated and the size of the record changes in the update. The record
is moved in such an update, perhaps to another page. The infimum record
acts as a dummy carrier record, taking care of lock releases while the
actual record is being moved. */
void
lock_rec_store_on_page_infimum(
/*===========================*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: record whose lock state
					is stored on the infimum
					record of the same page; lock
					bits are reset on the
					record */
{
  const ulint heap_no= page_rec_get_heap_no(rec);

  ut_ad(block->frame == page_align(rec));
  const page_id_t id{block->page.id()};

  LockGuard g{lock_sys.rec_hash, id};
  lock_rec_move(g.cell(), *block, id, g.cell(), id,
                PAGE_HEAP_NO_INFIMUM, heap_no);
}

/** Restore the explicit lock requests on a single record, where the
state was stored on the infimum of a page.
@param block   buffer block containing rec
@param rec     record whose lock state is restored
@param donator page (rec is not necessarily on this page)
whose infimum stored the lock state; lock bits are reset on the infimum */
void lock_rec_restore_from_page_infimum(const buf_block_t &block,
					const rec_t *rec, page_id_t donator)
{
  const ulint heap_no= page_rec_get_heap_no(rec);
  const page_id_t id{block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, id, donator};
  lock_rec_move(g.cell1(), block, id, g.cell2(), donator, heap_no,
                PAGE_HEAP_NO_INFIMUM);
}

/*========================= TABLE LOCKS ==============================*/

/*********************************************************************//**
Creates a table lock object and adds it as the last in the lock queue
of the table. Does NOT check for deadlocks or lock compatibility.
@return own: new lock object */
UNIV_INLINE
lock_t*
lock_table_create(
/*==============*/
	dict_table_t*	table,	/*!< in/out: database table
				in dictionary cache */
	unsigned	type_mode,/*!< in: lock mode possibly ORed with
				LOCK_WAIT */
	trx_t*		trx,	/*!< in: trx */
	lock_t*		c_lock)	/*!< in: conflicting lock */
{
	lock_t*		lock;

	lock_sys.assert_locked(*table);
	ut_ad(trx->mutex_is_owner());
	ut_ad(!trx->is_wsrep() || lock_sys.is_writer());
	ut_ad(trx->state == TRX_STATE_ACTIVE || trx->is_recovered);
	ut_ad(!trx_is_autocommit_non_locking(trx));

	switch (LOCK_MODE_MASK & type_mode) {
	case LOCK_AUTO_INC:
		++table->n_waiting_or_granted_auto_inc_locks;
		/* For AUTOINC locking we reuse the lock instance only if
		there is no wait involved else we allocate the waiting lock
		from the transaction lock heap. */
		if (type_mode == LOCK_AUTO_INC) {
			lock = table->autoinc_lock;

			ut_ad(!table->autoinc_trx);
			table->autoinc_trx = trx;

			ib_vector_push(trx->autoinc_locks, &lock);
			goto allocated;
		}

		break;
	case LOCK_X:
	case LOCK_S:
		++table->n_lock_x_or_s;
		break;
	}

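	/* Non-AUTOINC table locks come from a small per-transaction
	cache (trx->lock.table_pool) while entries remain, and from the
	transaction's lock heap afterwards. */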
	lock = trx->lock.table_cached < array_elements(trx->lock.table_pool)
		? &trx->lock.table_pool[trx->lock.table_cached++]
		: static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap, sizeof *lock));

allocated:
	lock->type_mode = ib_uint32_t(type_mode | LOCK_TABLE);
	lock->trx = trx;

	lock->un_member.tab_lock.table = table;

	ut_ad(table->get_ref_count() > 0 || !table->can_be_evicted);

	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);

	ut_list_append(table->locks, lock, TableLockGetNode());

	if (type_mode & LOCK_WAIT) {
		if (trx->lock.wait_trx) {
			ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx);
			ut_ad(trx->lock.wait_lock);
			ut_ad((*trx->lock.wait_lock).trx == trx);
		} else {
			ut_ad(c_lock);
			trx->lock.wait_trx = c_lock->trx;
			ut_ad(!trx->lock.wait_lock);
		}
		trx->lock.wait_lock = lock;
	}

	lock->trx->lock.table_locks.push_back(lock);

	MONITOR_INC(MONITOR_TABLELOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_TABLELOCK);

	return(lock);
}

/*************************************************************//**
Pops autoinc lock requests from the transaction's autoinc_locks. We
handle the case where there are gaps in the array and they need to
be popped off the stack. */
UNIV_INLINE
void
lock_table_pop_autoinc_locks(
/*=========================*/
	trx_t*	trx)	/*!< in/out: transaction that owns the AUTOINC locks */
{
	ut_ad(!ib_vector_is_empty(trx->autoinc_locks));

	/* Skip any gaps; gaps are NULL lock entries in the
	trx->autoinc_locks vector. */

	do {
		ib_vector_pop(trx->autoinc_locks);

		if (ib_vector_is_empty(trx->autoinc_locks)) {
			return;
		}

	} while (*(lock_t**) ib_vector_get_last(trx->autoinc_locks) == NULL);
}
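
/* Worked example of the gap handling above (values are hypothetical):
if trx->autoinc_locks holds [L1, NULL, L3] because some lock was removed
out of order, releasing L3 pops both L3 and the trailing NULL gap,
leaving [L1]. */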

/*************************************************************//**
Removes an autoinc lock request from the transaction's autoinc_locks. */
UNIV_INLINE
void
lock_table_remove_autoinc_lock(
/*===========================*/
	lock_t*	lock,	/*!< in: table lock */
	trx_t*	trx)	/*!< in/out: transaction that owns the lock */
{
	ut_ad(lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE));
	lock_sys.assert_locked(*lock->un_member.tab_lock.table);
	ut_ad(trx->mutex_is_owner());

	auto s = ib_vector_size(trx->autoinc_locks);
	ut_ad(s);

	/* With stored functions and procedures the user may drop
	a table within the same "statement". This special case has
	to be handled by deleting only those AUTOINC locks that were
	held by the table being dropped. */

	lock_t*	autoinc_lock = *static_cast<lock_t**>(
		ib_vector_get(trx->autoinc_locks, --s));

	/* This is the default fast case. */

	if (autoinc_lock == lock) {
		lock_table_pop_autoinc_locks(trx);
	} else {
		/* The last element should never be NULL */
		ut_a(autoinc_lock != NULL);

		/* Handle freeing the locks from within the stack. */

		while (s) {
			autoinc_lock = *static_cast<lock_t**>(
				ib_vector_get(trx->autoinc_locks, --s));

			if (autoinc_lock == lock) {
				void*	null_var = NULL;
				ib_vector_set(trx->autoinc_locks, s, &null_var);
				return;
			}
		}

		/* Must find the autoinc lock. */
		ut_error;
	}
}

/*************************************************************//**
Removes a table lock request from the queue and the trx list of locks;
this is a low-level function which does NOT check if waiting requests
can now be granted. */
UNIV_INLINE
const dict_table_t*
lock_table_remove_low(
/*==================*/
	lock_t*	lock)	/*!< in/out: table lock */
{
	trx_t*		trx;
	dict_table_t*	table;

	ut_ad(lock->is_table());
	trx = lock->trx;
	table = lock->un_member.tab_lock.table;
	lock_sys.assert_locked(*table);
	ut_ad(trx->mutex_is_owner());

	/* Remove the table from the transaction's AUTOINC vector, if
	the lock that is being released is an AUTOINC lock. */
	switch (lock->mode()) {
	case LOCK_AUTO_INC:
		ut_ad((table->autoinc_trx == trx) == !lock->is_waiting());

		if (table->autoinc_trx == trx) {
			table->autoinc_trx = NULL;
			/* The locks must be freed in the reverse order from
			the one in which they were acquired. This is to avoid
			traversing the AUTOINC lock vector unnecessarily.

			We only store locks that were granted in the
			trx->autoinc_locks vector (see lock_table_create()
			and lock_grant()). */
			lock_table_remove_autoinc_lock(lock, trx);
		}

		ut_ad(table->n_waiting_or_granted_auto_inc_locks);
		--table->n_waiting_or_granted_auto_inc_locks;
		break;
	case LOCK_X:
	case LOCK_S:
		ut_ad(table->n_lock_x_or_s);
		--table->n_lock_x_or_s;
		break;
	default:
		break;
	}

	UT_LIST_REMOVE(trx->lock.trx_locks, lock);
	ut_list_remove(table->locks, lock, TableLockGetNode());

	MONITOR_INC(MONITOR_TABLELOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_TABLELOCK);
	return table;
}

/*********************************************************************//**
Enqueues a waiting request for a table lock which cannot be granted
immediately. Checks for deadlocks.
@retval	DB_LOCK_WAIT	if the waiting lock was enqueued
@retval	DB_DEADLOCK	if this transaction was chosen as the victim */
static
dberr_t
lock_table_enqueue_waiting(
/*=======================*/
	unsigned	mode,	/*!< in: lock mode this transaction is
				requesting */
	dict_table_t*	table,	/*!< in/out: table */
	que_thr_t*	thr,	/*!< in: query thread */
	lock_t*		c_lock)	/*!< in: conflicting lock or NULL */
{
	lock_sys.assert_locked(*table);
	ut_ad(!srv_read_only_mode);

	trx_t* trx = thr_get_trx(thr);
	ut_ad(trx->mutex_is_owner());

	switch (trx_get_dict_operation(trx)) {
	case TRX_DICT_OP_NONE:
		break;
	case TRX_DICT_OP_TABLE:
	case TRX_DICT_OP_INDEX:
		ib::error() << "A table lock wait happens in a dictionary"
			" operation. Table " << table->name
			<< ". " << BUG_REPORT_MSG;
		ut_ad(0);
	}

#ifdef WITH_WSREP
	if (trx->is_wsrep() && trx->lock.was_chosen_as_deadlock_victim) {
		return(DB_DEADLOCK);
	}
#endif /* WITH_WSREP */

	/* Enqueue the lock request that will wait to be granted */
	lock_table_create(table, mode | LOCK_WAIT, trx, c_lock);

	trx->lock.wait_thr = thr;
	trx->lock.was_chosen_as_deadlock_victim
		IF_WSREP(.fetch_and(byte(~1)), = false);

	MONITOR_INC(MONITOR_TABLELOCK_WAIT);
	return(DB_LOCK_WAIT);
}

/*********************************************************************//**
Checks if other transactions have an incompatible mode lock request in
the lock queue.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_table_other_has_incompatible(
/*==============================*/
	const trx_t*		trx,	/*!< in: transaction, or NULL if all
					transactions should be included */
	ulint			wait,	/*!< in: LOCK_WAIT if also
					waiting locks are taken into
					account, or 0 if not */
	const dict_table_t*	table,	/*!< in: table */
	lock_mode		mode)	/*!< in: lock mode */
{
	lock_sys.assert_locked(*table);

	static_assert(LOCK_IS == 0, "compatibility");
	static_assert(LOCK_IX == 1, "compatibility");

	if (UNIV_LIKELY(mode <= LOCK_IX && !table->n_lock_x_or_s)) {
		return(NULL);
	}

	for (lock_t* lock = UT_LIST_GET_LAST(table->locks);
	     lock;
	     lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock)) {

		trx_t* lock_trx = lock->trx;

		if (lock_trx != trx
		    && !lock_mode_compatible(lock->mode(), mode)
		    && (wait || !lock->is_waiting())) {
			return(lock);
		}
	}

	return(NULL);
}
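
/* For reference, the table lock compatibility that lock_mode_compatible()
checks against (+ = compatible, - = conflict):

          IS  IX  S   X   AI
    IS    +   +   +   -   +
    IX    +   +   -   -   +
    S     +   -   +   -   -
    X     -   -   -   -   -
    AI    +   +   -   -   -

The fast path above depends on this: IS and IX can only conflict with
S or X, so when table->n_lock_x_or_s is zero no queue scan is needed. */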

/*********************************************************************//**
Locks the specified database table in the mode given. If the lock cannot
be granted immediately, the query thread is put to wait.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_table(
/*=======*/
	dict_table_t*	table,	/*!< in/out: database table
				in dictionary cache */
	lock_mode	mode,	/*!< in: lock mode */
	que_thr_t*	thr)	/*!< in: query thread */
{
	trx_t*		trx;
	dberr_t		err;
	lock_t*		wait_for;

	if (table->is_temporary()) {
		return DB_SUCCESS;
	}

	trx = thr_get_trx(thr);

	/* Look for equal or stronger locks the same trx already
	has on the table. No need to acquire LockMutexGuard here
	because only this transaction can add/access table locks
	to/from trx_t::table_locks. */

	if (lock_table_has(trx, table, mode) || srv_read_only_mode) {
		return(DB_SUCCESS);
	}

	/* Read only transactions can write to temp tables, we don't want
	to promote them to RW transactions. Their updates cannot be visible
	to other transactions. Therefore we can keep them out
	of the read views. */

	if ((mode == LOCK_IX || mode == LOCK_X)
	    && !trx->read_only
	    && trx->rsegs.m_redo.rseg == 0) {

		trx_set_rw_mode(trx);
	}

	err = DB_SUCCESS;

#ifdef WITH_WSREP
	if (trx->is_wsrep()) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	} else {
		lock_sys.rd_lock(SRW_LOCK_CALL);
		table->lock_mutex_lock();
	}
#else
	lock_sys.rd_lock(SRW_LOCK_CALL);
	table->lock_mutex_lock();
#endif

	/* We have to check if the new lock is compatible with any locks
	other transactions have in the table lock queue. */

	wait_for = lock_table_other_has_incompatible(
		trx, LOCK_WAIT, table, mode);

	trx->mutex_lock();

	if (wait_for) {
		err = lock_table_enqueue_waiting(mode, table, thr, wait_for);
	} else {
		lock_table_create(table, mode, trx, wait_for);
	}

#ifdef WITH_WSREP
	if (trx->is_wsrep()) {
		lock_sys.wr_unlock();
		trx->mutex_unlock();
		return err;
	}
#endif
	table->lock_mutex_unlock();
	lock_sys.rd_unlock();
	trx->mutex_unlock();

	return(err);
}
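
/* Usage sketch (hypothetical caller; in the server, code paths such as
row_search_mvcc() take an IS or IX table lock before record operations):

  dberr_t err = lock_table(table, LOCK_IX, thr);
  if (err != DB_SUCCESS)
    ... DB_LOCK_WAIT means the query thread was enqueued and will be
    woken by lock_grant() or deadlock resolution; DB_DEADLOCK means
    this transaction was chosen as the victim ...
*/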

/** Create a table lock object for a resurrected transaction.
@param table    table to be X-locked
@param trx      transaction
@param mode     LOCK_X or LOCK_IX */
void lock_table_resurrect(dict_table_t *table, trx_t *trx, lock_mode mode)
{
  ut_ad(trx->is_recovered);
  ut_ad(mode == LOCK_X || mode == LOCK_IX);

  if (lock_table_has(trx, table, mode))
    return;

  {
    LockMutexGuard g{SRW_LOCK_CALL};
    ut_ad(!lock_table_other_has_incompatible(trx, LOCK_WAIT, table, mode));

    trx->mutex_lock();
    lock_table_create(table, mode, trx, nullptr);
  }
  trx->mutex_unlock();
}

/** Find a lock that a waiting table lock request still has to wait for. */
static const lock_t *lock_table_has_to_wait_in_queue(const lock_t *wait_lock)
{
  ut_ad(wait_lock->is_waiting());
  ut_ad(wait_lock->is_table());

  dict_table_t *table= wait_lock->un_member.tab_lock.table;
  lock_sys.assert_locked(*table);

  static_assert(LOCK_IS == 0, "compatibility");
  static_assert(LOCK_IX == 1, "compatibility");

  if (UNIV_LIKELY(wait_lock->mode() <= LOCK_IX && !table->n_lock_x_or_s))
    return nullptr;

  for (const lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock != wait_lock;
       lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
    if (lock_has_to_wait(wait_lock, lock))
      return lock;

  return nullptr;
}

/*************************************************************//**
Removes a table lock request, waiting or granted, from the queue and grants
locks to other transactions in the queue, if they now are entitled to a
lock.
@param[in,out]	in_lock		table lock
@param[in]	owns_wait_mutex	whether lock_sys.wait_mutex is held */
static void lock_table_dequeue(lock_t *in_lock, bool owns_wait_mutex)
{
#ifdef SAFE_MUTEX
	ut_ad(owns_wait_mutex == mysql_mutex_is_owner(&lock_sys.wait_mutex));
#endif
	ut_ad(in_lock->trx->mutex_is_owner());
	lock_t*	lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, in_lock);

	const dict_table_t* table = lock_table_remove_low(in_lock);

	static_assert(LOCK_IS == 0, "compatibility");
	static_assert(LOCK_IX == 1, "compatibility");

	if (UNIV_LIKELY(in_lock->mode() <= LOCK_IX && !table->n_lock_x_or_s)) {
		return;
	}

	bool acquired = false;

	/* Check if waiting locks in the queue can now be granted: grant
	locks if there are no conflicting locks ahead. */

	for (/* No op */;
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
		if (!lock->is_waiting()) {
			continue;
		}

		if (!owns_wait_mutex) {
			mysql_mutex_lock(&lock_sys.wait_mutex);
			acquired = owns_wait_mutex = true;
		}

		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_table_has_to_wait_in_queue(lock)) {
			trx_t* c_trx = c->trx;
			lock->trx->lock.wait_trx = c_trx;
			if (c_trx->lock.wait_trx
			    && innodb_deadlock_detect
			    && Deadlock::to_check.emplace(c_trx).second) {
				Deadlock::to_be_checked = true;
			}
		} else {
			/* Grant the lock */
			ut_ad(in_lock->trx != lock->trx);
			in_lock->trx->mutex_unlock();
			lock_grant(lock);
			in_lock->trx->mutex_lock();
		}
	}

	if (acquired) {
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}

/** Sets a lock on a table based on the given mode.
@param[in]	table	table to lock
@param[in,out]	trx	transaction
@param[in]	mode	LOCK_X or LOCK_S
@return error code or DB_SUCCESS. */
dberr_t
lock_table_for_trx(
	dict_table_t*	table,
	trx_t*		trx,
	enum lock_mode	mode)
{
	mem_heap_t*	heap;
	que_thr_t*	thr;
	dberr_t		err;
	sel_node_t*	node;
	heap = mem_heap_create(512);

	node = sel_node_create(heap);
	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
	thr->graph->state = QUE_FORK_ACTIVE;

	/* We use the select query graph as the dummy graph needed
	in the lock module call */

	thr = static_cast<que_thr_t*>(
		que_fork_get_first_thr(
			static_cast<que_fork_t*>(que_node_get_parent(thr))));

run_again:
	thr->run_node = thr;
	thr->prev_node = thr->common.parent;

	err = lock_table(table, mode, thr);

	trx->error_state = err;

	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
		if (row_mysql_handle_errors(&err, trx, thr, NULL)) {
			goto run_again;
		}
	}

	que_graph_free(thr->graph);
	trx->op_info = "";

	return(err);
}
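
/* Usage sketch (an assumption about typical callers, e.g. DDL code that
must lock a table outside any user query graph):

  if (lock_table_for_trx(table, trx, LOCK_X) != DB_SUCCESS)
    ... abort the operation; the table could not be X-locked ...

The dummy SELECT query graph built above exists only to satisfy the
que_thr_t interface that lock_table() expects. */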

/*=========================== LOCK RELEASE ==============================*/

/*************************************************************//**
Removes a granted record lock of a transaction from the queue and grants
locks to other transactions waiting in the queue if they now are entitled
to a lock. */
void
lock_rec_unlock(
/*============*/
	trx_t*			trx,	/*!< in/out: transaction that has
					set a record lock */
	const page_id_t		id,	/*!< in: page containing rec */
	const rec_t*		rec,	/*!< in: record */
	lock_mode		lock_mode)/*!< in: LOCK_S or LOCK_X */
{
	lock_t*		first_lock;
	lock_t*		lock;
	ulint		heap_no;

	ut_ad(trx);
	ut_ad(rec);
	ut_ad(!trx->lock.wait_lock);
	ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
	ut_ad(!page_rec_is_metadata(rec));

	heap_no = page_rec_get_heap_no(rec);

	LockGuard g{lock_sys.rec_hash, id};

	first_lock = lock_sys_t::get_first(g.cell(), id, heap_no);

	/* Find the last lock with the same lock_mode and transaction
	on the record. */

	for (lock = first_lock; lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {
		if (lock->trx == trx && lock->mode() == lock_mode) {
			goto released;
		}
	}

	{
		ib::error	err;
		err << "Unlock row could not find a " << lock_mode
			<< " mode lock on the record. Current statement: ";
		size_t		stmt_len;
		if (const char* stmt = innobase_get_stmt_unsafe(
			    trx->mysql_thd, &stmt_len)) {
			err.write(stmt, stmt_len);
		}
	}

	return;

released:
	ut_a(!lock->is_waiting());
	trx->mutex_lock();
	lock_rec_reset_nth_bit(lock, heap_no);
	trx->mutex_unlock();

	/* Check if we can now grant waiting lock requests */

	for (lock = first_lock; lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {
		if (!lock->is_waiting()) {
			continue;
		}
		mysql_mutex_lock(&lock_sys.wait_mutex);
		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_rec_has_to_wait_in_queue(g.cell(),
								    lock)) {
			lock->trx->lock.wait_trx = c->trx;
		} else {
			/* Grant the lock */
			ut_ad(trx != lock->trx);
			lock_grant(lock);
		}
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}
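
/* Note (an assumption about the caller): this early-release path is used
when a record lock may be freed before commit, for example when a
READ COMMITTED scan finds that a locked row does not match the search
condition (see row_unlock_for_mysql()). Locks released at transaction
end go through lock_release() below instead. */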

/** Release the explicit locks of a committing transaction,
and release possible other transactions waiting because of these locks.
@return whether the operation succeeded */
static bool lock_release_try(trx_t *trx)
{
  /* At this point, trx->lock.trx_locks cannot be modified by other
  threads, because our transaction has been committed.
  See the checks and assertions in lock_rec_create_low() and
  lock_rec_add_to_queue().

  The function lock_table_create() should never be invoked on behalf
  of a transaction running in another thread. Also there, we will
  assert that the current transaction be active. */
  DBUG_ASSERT(trx->state == TRX_STATE_COMMITTED_IN_MEMORY);
  DBUG_ASSERT(!trx->is_referenced());

  bool all_released= true;
restart:
  ulint count= 1000;
  lock_sys.rd_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  /* Note: Anywhere else, trx->mutex is not held while acquiring
  a lock table latch, but here we are following the opposite order.
  To avoid deadlocks, we only try to acquire the lock table latches
  but not keep waiting for them. */

  for (lock_t *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock; )
  {
    ut_ad(lock->trx == trx);
    lock_t *prev= UT_LIST_GET_PREV(trx_locks, lock);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      ut_ad(lock->mode() != LOCK_X ||
            lock->index->table->id >= DICT_HDR_FIRST_ID ||
            trx->dict_operation);
      auto &lock_hash= lock_sys.hash_get(lock->type_mode);
      auto cell= lock_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
      auto latch= lock_sys_t::hash_table::latch(cell);
      if (!latch->try_acquire())
        all_released= false;
      else
      {
        lock_rec_dequeue_from_page(lock, false);
        latch->release();
      }
    }
    else
    {
      dict_table_t *table= lock->un_member.tab_lock.table;
      ut_ad(!table->is_temporary());
      ut_ad(table->id >= DICT_HDR_FIRST_ID ||
            (lock->mode() != LOCK_IX && lock->mode() != LOCK_X) ||
            trx->dict_operation);
      if (!table->lock_mutex_trylock())
        all_released= false;
      else
      {
        lock_table_dequeue(lock, false);
        table->lock_mutex_unlock();
      }
    }

    lock= all_released ? UT_LIST_GET_LAST(trx->lock.trx_locks) : prev;
    if (!--count)
      break;
  }

  lock_sys.rd_unlock();
  trx->mutex_unlock();
  if (all_released && !count)
    goto restart;
  return all_released;
}
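
/* Design note: lock_release_try() holds trx->mutex while try-acquiring
the per-page and per-table latches, which inverts the usual latching
order; the try-locks make that safe but mean it can fail spuriously.
lock_release() below therefore retries a few times and only then falls
back to an exclusive lock_sys.latch, under which plain dequeue calls
suffice. */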

/** Release the explicit locks of a committing transaction,
and release possible other transactions waiting because of these locks. */
void lock_release(trx_t *trx)
{
#if defined SAFE_MUTEX && defined UNIV_DEBUG
  std::set<table_id_t> to_evict;
  if (innodb_evict_tables_on_commit_debug && !trx->is_recovered)
# if 1 /* if dict_stats_exec_sql() were not playing dirty tricks */
    if (!dict_sys.mutex_is_locked())
# else /* this would be more proper way to do it */
    if (!trx->dict_operation_lock_mode && !trx->dict_operation)
# endif
      for (const auto& p: trx->mod_tables)
        if (!p.first->is_temporary())
          to_evict.emplace(p.first->id);
#endif
  ulint count;

  for (count= 5; count--; )
    if (lock_release_try(trx))
      goto released;

  /* Fall back to acquiring lock_sys.latch in exclusive mode */
restart:
  count= 1000;
  lock_sys.wr_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  while (lock_t *lock= UT_LIST_GET_LAST(trx->lock.trx_locks))
  {
    ut_ad(lock->trx == trx);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      ut_ad(lock->mode() != LOCK_X ||
            lock->index->table->id >= DICT_HDR_FIRST_ID ||
            trx->dict_operation);
      lock_rec_dequeue_from_page(lock, false);
    }
    else
    {
      ut_d(dict_table_t *table= lock->un_member.tab_lock.table);
      ut_ad(!table->is_temporary());
      ut_ad(table->id >= DICT_HDR_FIRST_ID ||
            (lock->mode() != LOCK_IX && lock->mode() != LOCK_X) ||
            trx->dict_operation);
      lock_table_dequeue(lock, false);
    }

    if (!--count)
      break;
  }

  lock_sys.wr_unlock();
  trx->mutex_unlock();
  if (!count)
    goto restart;

released:
  if (UNIV_UNLIKELY(Deadlock::to_be_checked))
  {
    mysql_mutex_lock(&lock_sys.wait_mutex);
    lock_sys.deadlock_check();
    mysql_mutex_unlock(&lock_sys.wait_mutex);
  }

  trx->lock.was_chosen_as_deadlock_victim= false;
  trx->lock.n_rec_locks= 0;

#if defined SAFE_MUTEX && defined UNIV_DEBUG
  if (to_evict.empty())
    return;
  dict_sys.mutex_lock();
  LockMutexGuard g{SRW_LOCK_CALL};
  for (const table_id_t id : to_evict)
  {
    if (dict_table_t *table= dict_sys.get_table(id))
      if (!table->get_ref_count() && !UT_LIST_GET_LEN(table->locks))
        dict_sys.remove(table, true);
  }
  dict_sys.mutex_unlock();
#endif
}

/*********************************************************************//**
Removes table locks of the transaction on a table to be dropped. */
static
void
lock_trx_table_locks_remove(
/*========================*/
	const lock_t*	lock_to_remove)		/*!< in: lock to remove */
{
	trx_t*		trx = lock_to_remove->trx;

	ut_ad(lock_to_remove->is_table());
	lock_sys.assert_locked(*lock_to_remove->un_member.tab_lock.table);
	ut_ad(trx->mutex_is_owner());

	for (lock_list::iterator it = trx->lock.table_locks.begin(),
             end = trx->lock.table_locks.end(); it != end; ++it) {
		const lock_t*	lock = *it;

		ut_ad(!lock || trx == lock->trx);
		ut_ad(!lock || lock->is_table());
		ut_ad(!lock || lock->un_member.tab_lock.table);

		if (lock == lock_to_remove) {
			*it = NULL;
			return;
		}
	}

	/* Lock must exist in the vector. */
	ut_error;
}

/*===================== VALIDATION AND DEBUGGING ====================*/

/** Print info of a table lock.
@param[in,out]	file	output stream
@param[in]	lock	table lock */
static
void
lock_table_print(FILE* file, const lock_t* lock)
{
	lock_sys.assert_locked();
	ut_a(lock->is_table());

	fputs("TABLE LOCK table ", file);
	ut_print_name(file, lock->trx,
		      lock->un_member.tab_lock.table->name.m_name);
	fprintf(file, " trx id " TRX_ID_FMT, lock->trx->id);

	switch (auto mode = lock->mode()) {
	case LOCK_S:
		fputs(" lock mode S", file);
		break;
	case LOCK_X:
		ut_ad(lock->trx->id != 0);
		fputs(" lock mode X", file);
		break;
	case LOCK_IS:
		fputs(" lock mode IS", file);
		break;
	case LOCK_IX:
		ut_ad(lock->trx->id != 0);
		fputs(" lock mode IX", file);
		break;
	case LOCK_AUTO_INC:
		fputs(" lock mode AUTO-INC", file);
		break;
	default:
		fprintf(file, " unknown lock mode %u", mode);
	}

	if (lock->is_waiting()) {
		fputs(" waiting", file);
	}

	putc('\n', file);
}
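
/* Sample of the output written above (illustrative values), as it
appears in the TRANSACTIONS section of SHOW ENGINE INNODB STATUS:

  TABLE LOCK table `test`.`t1` trx id 1234 lock mode IX
*/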

/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr)
{
	ut_ad(!lock->is_table());

	const page_id_t page_id{lock->un_member.rec_lock.page_id};
	ut_d(lock_sys.hash_get(lock->type_mode).assert_locked(page_id));

	fprintf(file, "RECORD LOCKS space id %u page no %u n bits " ULINTPF
		" index %s of table ",
		page_id.space(), page_id.page_no(),
		lock_rec_get_n_bits(lock),
		lock->index->name());
	ut_print_name(file, lock->trx, lock->index->table->name.m_name);
	fprintf(file, " trx id " TRX_ID_FMT, lock->trx->id);

	switch (lock->mode()) {
	case LOCK_S:
		fputs(" lock mode S", file);
		break;
	case LOCK_X:
		fputs(" lock_mode X", file);
		break;
	default:
		ut_error;
	}

	if (lock->is_gap()) {
		fputs(" locks gap before rec", file);
	}

	if (lock->is_record_not_gap()) {
		fputs(" locks rec but not gap", file);
	}

	if (lock->is_insert_intention()) {
		fputs(" insert intention", file);
	}

	if (lock->is_waiting()) {
		fputs(" waiting", file);
	}

	putc('\n', file);

	mem_heap_t*		heap		= NULL;
	rec_offs		offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*		offsets		= offsets_;
	rec_offs_init(offsets_);

	mtr.start();
	const buf_block_t* block = buf_page_try_get(page_id, &mtr);

	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (!lock_rec_get_nth_bit(lock, i)) {
			continue;
		}

		fprintf(file, "Record lock, heap no %lu", (ulong) i);

		if (block) {
			ut_ad(page_is_leaf(block->frame));
			const rec_t*	rec;

			rec = page_find_rec_with_heap_no(
				buf_block_get_frame(block), i);
			ut_ad(!page_rec_is_metadata(rec));

			offsets = rec_get_offsets(
				rec, lock->index, offsets, true,
				ULINT_UNDEFINED, &heap);

			putc(' ', file);
			rec_print_new(file, rec, offsets);
		}

		putc('\n', file);
	}

	mtr.commit();

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
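
/* Sample of the output written above (illustrative values):

  RECORD LOCKS space id 5 page no 4 n bits 72 index PRIMARY of table
  `test`.`t1` trx id 1234 lock_mode X locks rec but not gap
  Record lock, heap no 2 ...record image...

The record image is printed only if the page is still available via
buf_page_try_get(). */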

#ifdef UNIV_DEBUG
/* Print the number of lock structs from lock_print_info_summary() only
in non-production builds for performance reasons, see
http://bugs.mysql.com/36942 */
#define PRINT_NUM_OF_LOCK_STRUCTS
#endif /* UNIV_DEBUG */

#ifdef PRINT_NUM_OF_LOCK_STRUCTS
/*********************************************************************//**
Calculates the number of record lock structs in the record lock hash table.
@return number of record locks */
static ulint lock_get_n_rec_locks()
{
	ulint	n_locks	= 0;
	ulint	i;

	lock_sys.assert_locked();

	for (i = 0; i < lock_sys.rec_hash.n_cells; i++) {
		const lock_t*	lock;

		for (lock = static_cast<const lock_t*>(
			     HASH_GET_FIRST(&lock_sys.rec_hash, i));
		     lock != 0;
		     lock = static_cast<const lock_t*>(
				HASH_GET_NEXT(hash, lock))) {

			n_locks++;
		}
	}

	return(n_locks);
}
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */

/*********************************************************************//**
Prints info of locks for all transactions.
@return FALSE if not able to acquire lock_sys.latch (and display info) */
ibool
lock_print_info_summary(
/*====================*/
	FILE*	file,	/*!< in: file where to print */
	ibool	nowait)	/*!< in: whether to wait for lock_sys.latch */
{
	if (!nowait) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	} else if (!lock_sys.wr_lock_try()) {
		fputs("FAIL TO OBTAIN LOCK MUTEX,"
		      " SKIP LOCK INFO PRINTING\n", file);
		return(FALSE);
	}

	if (lock_sys.deadlocks) {
		fputs("------------------------\n"
		      "LATEST DETECTED DEADLOCK\n"
		      "------------------------\n", file);

		if (!srv_read_only_mode) {
			ut_copy_file(file, lock_latest_err_file);
		}
	}

	fputs("------------\n"
	      "TRANSACTIONS\n"
	      "------------\n", file);

	fprintf(file, "Trx id counter " TRX_ID_FMT "\n",
		trx_sys.get_max_trx_id());

	fprintf(file,
		"Purge done for trx's n:o < " TRX_ID_FMT
		" undo n:o < " TRX_ID_FMT " state: %s\n"
		"History list length %u\n",
		purge_sys.tail.trx_no(),
		purge_sys.tail.undo_no,
		purge_sys.enabled()
		? (purge_sys.running() ? "running"
		   : purge_sys.paused() ? "stopped" : "running but idle")
		: "disabled",
		uint32_t{trx_sys.rseg_history_len});

#ifdef PRINT_NUM_OF_LOCK_STRUCTS
	fprintf(file,
		"Total number of lock structs in row lock hash table %lu\n",
		(ulong) lock_get_n_rec_locks());
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */
	return(TRUE);
}

/** Prints transaction lock wait and MVCC state.
@param[in,out]	file	file where to print
@param[in]	trx	transaction
@param[in]	now	current my_hrtime_coarse() */
void lock_trx_print_wait_and_mvcc_state(FILE *file, const trx_t *trx,
                                        my_hrtime_t now)
{
	fprintf(file, "---");

	trx_print_latched(file, trx, 600);
	trx->read_view.print_limits(file);

	if (const lock_t* wait_lock = trx->lock.wait_lock) {
		const my_hrtime_t suspend_time= trx->lock.suspend_time;
		fprintf(file,
			"------- TRX HAS BEEN WAITING %llu ns"
			" FOR THIS LOCK TO BE GRANTED:\n",
			now.val - suspend_time.val);

		if (!wait_lock->is_table()) {
			mtr_t mtr;
			lock_rec_print(file, wait_lock, mtr);
		} else {
			lock_table_print(file, wait_lock);
		}

		fprintf(file, "------------------\n");
	}
}

/*********************************************************************//**
Prints info of locks for a transaction. */
static
void
lock_trx_print_locks(
/*=================*/
	FILE*		file,		/*!< in/out: File to write */
	const trx_t*	trx)		/*!< in: current transaction */
{
	mtr_t mtr;
	uint32_t i= 0;
	/* Iterate over the transaction's locks. */
	lock_sys.assert_locked();
	for (lock_t *lock = UT_LIST_GET_FIRST(trx->lock.trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
		if (!lock->is_table()) {
			lock_rec_print(file, lock, mtr);
		} else {
			lock_table_print(file, lock);
		}

		if (++i == 10) {

			fprintf(file,
				"10 LOCKS PRINTED FOR THIS TRX:"
				" SUPPRESSING FURTHER PRINTS\n");

			break;
		}
	}
}

/** Functor to display all transactions */
struct lock_print_info
{
  lock_print_info(FILE* file, my_hrtime_t now) :
    file(file), now(now),
    purge_trx(purge_sys.query ? purge_sys.query->trx : nullptr)
  {}

  void operator()(const trx_t &trx) const
  {
    if (UNIV_UNLIKELY(&trx == purge_trx))
      return;
    lock_trx_print_wait_and_mvcc_state(file, &trx, now);

    if (trx.will_lock && srv_print_innodb_lock_monitor)
      lock_trx_print_locks(file, &trx);
  }

  FILE* const file;
  const my_hrtime_t now;
  const trx_t* const purge_trx;
};

/*********************************************************************//**
Prints info of locks for each transaction. This function will release
lock_sys.latch, which the caller must be holding in exclusive mode. */
void
lock_print_info_all_transactions(
/*=============================*/
	FILE*		file)	/*!< in/out: file where to print */
{
	fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n");

	trx_sys.trx_list.for_each(lock_print_info(file, my_hrtime_coarse()));
	lock_sys.wr_unlock();

	ut_d(lock_validate());
}

#ifdef UNIV_DEBUG
/*********************************************************************//**
Find the lock in the trx_t::trx_lock_t::table_locks vector.
@return true if found */
static
bool
lock_trx_table_locks_find(
/*======================*/
	trx_t*		trx,		/*!< in: trx to validate */
	const lock_t*	find_lock)	/*!< in: lock to find */
{
	bool		found = false;

	ut_ad(trx->mutex_is_owner());

	for (lock_list::const_iterator it = trx->lock.table_locks.begin(),
             end = trx->lock.table_locks.end(); it != end; ++it) {

		const lock_t*	lock = *it;

		if (lock == NULL) {

			continue;

		} else if (lock == find_lock) {

			/* Can't be duplicates. */
			ut_a(!found);
			found = true;
		}

		ut_a(trx == lock->trx);
		ut_a(lock->is_table());
		ut_a(lock->un_member.tab_lock.table != NULL);
	}

	return(found);
}

/*********************************************************************//**
Validates the lock queue on a table.
@return TRUE if ok */
static
ibool
lock_table_queue_validate(
/*======================*/
	const dict_table_t*	table)	/*!< in: table */
{
	const lock_t*	lock;

	lock_sys.assert_locked(*table);

	for (lock = UT_LIST_GET_FIRST(table->locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {

		/* lock->trx->state cannot change from or to NOT_STARTED
		while we are holding the lock_sys.latch. It may change
		from ACTIVE or PREPARED to PREPARED or COMMITTED. */
		lock->trx->mutex_lock();
		check_trx_state(lock->trx);

		if (lock->trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
		} else if (!lock->is_waiting()) {
			ut_a(!lock_table_other_has_incompatible(
				     lock->trx, 0, table,
				     lock->mode()));
		} else {
			ut_a(lock_table_has_to_wait_in_queue(lock));
		}

		ut_a(lock_trx_table_locks_find(lock->trx, lock));
		lock->trx->mutex_unlock();
	}

	return(TRUE);
}

/*********************************************************************//**
Validates the lock queue on a single record.
@return TRUE if ok */
static
bool
lock_rec_queue_validate(
/*====================*/
	bool			locked_lock_trx_sys,
					/*!< in: if the caller holds
					both the lock_sys.latch and
					trx_sys_t->lock. */
	const page_id_t		id,	/*!< in: page identifier */
	const rec_t*		rec,	/*!< in: record to look at */
	const dict_index_t*	index,	/*!< in: index, or NULL if not known */
	const rec_offs*		offsets)/*!< in: rec_get_offsets(rec, index) */
{
	const lock_t*	lock;
	ulint		heap_no;

	ut_a(rec);
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!index || dict_index_is_clust(index)
	      || !dict_index_is_online_ddl(index));

	heap_no = page_rec_get_heap_no(rec);

	if (!locked_lock_trx_sys) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	}

	hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id.fold());
	lock_sys.assert_locked(cell);

	if (!page_rec_is_user_rec(rec)) {

		for (lock = lock_sys_t::get_first(cell, id, heap_no);
		     lock != NULL;
		     lock = lock_rec_get_next_const(heap_no, lock)) {

			ut_ad(!index || lock->index == index);

			lock->trx->mutex_lock();
			ut_ad(!trx_is_ac_nl_ro(lock->trx));
			ut_ad(trx_state_eq(lock->trx,
					   TRX_STATE_COMMITTED_IN_MEMORY)
			      || !lock->is_waiting()
			      || lock_rec_has_to_wait_in_queue(cell, lock));
			lock->trx->mutex_unlock();
		}

func_exit:
		if (!locked_lock_trx_sys) {
			lock_sys.wr_unlock();
		}

		return true;
	}

	ut_ad(page_rec_is_leaf(rec));

	const trx_id_t impl_trx_id = index && index->is_primary()
		? lock_clust_rec_some_has_impl(rec, index, offsets)
		: 0;

	if (trx_t *impl_trx = impl_trx_id
	    ? trx_sys.find(current_trx(), impl_trx_id, false)
	    : 0) {
		/* impl_trx could have been committed before we
		acquire its mutex, but not thereafter. */

		impl_trx->mutex_lock();
		ut_ad(impl_trx->state != TRX_STATE_NOT_STARTED);
		if (impl_trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
		} else if (const lock_t* other_lock
			   = lock_rec_other_has_expl_req(
				   LOCK_S, cell, id, true, heap_no,
				   impl_trx)) {
			/* The impl_trx is holding an implicit lock on the
			given record 'rec'. So there cannot be another
			explicit granted lock.  Also, there can be another
			explicit waiting lock only if the impl_trx has an
			explicit granted lock. */

#ifdef WITH_WSREP
			/** Galera record locking rules:
			* If there is no other record lock to the same record, we may grant
			the lock request.
			* If there is other record lock but this requested record lock is
			compatible, we may grant the lock request.
			* If there is other record lock and it is not compatible with
			requested lock, all normal transactions must wait.
			* BF (brute force) additional exceptions :
			** If BF already holds record lock for requested record, we may
			grant new record lock even if there is conflicting record lock(s)
			waiting on a queue.
			** If conflicting transaction holds requested record lock,
			we will cancel this record lock and select conflicting transaction
			for BF abort or kill victim.
			** If conflicting transaction is waiting for requested record lock
			we will cancel this wait and select conflicting transaction
			for BF abort or kill victim.
			** There should not be two BF transactions waiting for same record lock
			*/
			if (other_lock->trx->is_wsrep() && !other_lock->is_waiting()) {
				wsrep_report_bf_lock_wait(impl_trx->mysql_thd, impl_trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);

				if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
						       cell, id, heap_no,
						       impl_trx)) {
					ib::info() << "WSREP impl BF lock conflict";
				}
			} else
#endif /* WITH_WSREP */
			{
				ut_ad(other_lock->is_waiting());
				ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
						        cell, id, heap_no,
							impl_trx));
			}
		}

		impl_trx->mutex_unlock();
	}

	for (lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next_const(heap_no, lock)) {

		ut_ad(!trx_is_ac_nl_ro(lock->trx));
		ut_ad(!page_rec_is_metadata(rec));

		if (index) {
			ut_a(lock->index == index);
		}

		if (lock->is_waiting()) {
			ut_a(lock->is_gap()
			     || lock_rec_has_to_wait_in_queue(cell, lock));
		} else if (!lock->is_gap()) {
			const lock_mode	mode = lock->mode() == LOCK_S
				? LOCK_X : LOCK_S;

			const lock_t*	other_lock
				= lock_rec_other_has_expl_req(
					mode, cell, id, false, heap_no,
					lock->trx);
#ifdef WITH_WSREP
			if (UNIV_UNLIKELY(other_lock && lock->trx->is_wsrep())) {
				/* Only BF transaction may be granted
				lock before other conflicting lock
				request. */
				if (!wsrep_thd_is_BF(lock->trx->mysql_thd, FALSE)
				    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
					/* If no BF, this case is a bug. */
					wsrep_report_bf_lock_wait(lock->trx->mysql_thd, lock->trx->id);
					wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
					ut_error;
				}
			} else
#endif /* WITH_WSREP */
			ut_ad(!other_lock);
		}
	}

	goto func_exit;
}

/** Validate the record lock queues on a page.
@param block    buffer pool block
@param latched  whether the tablespace latch may be held
@return true if ok */
static bool lock_rec_validate_page(const buf_block_t *block, bool latched)
{
	const lock_t*	lock;
	const rec_t*	rec;
	ulint		nth_lock	= 0;
	ulint		nth_bit		= 0;
	ulint		i;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	const page_id_t id{block->page.id()};

	LockGuard g{lock_sys.rec_hash, id};
loop:
	lock = lock_sys_t::get_first(g.cell(), id);

	if (!lock) {
		goto function_exit;
	}

	DBUG_ASSERT(block->page.status != buf_page_t::FREED);

	for (i = 0; i < nth_lock; i++) {

		lock = lock_rec_get_next_on_page_const(lock);

		if (!lock) {
			goto function_exit;
		}
	}

	ut_ad(!trx_is_ac_nl_ro(lock->trx));

	/* Only validate the record queues when this thread is not
	holding a tablespace latch. */
	if (!latched)
	for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {

		if (i == PAGE_HEAP_NO_SUPREMUM
		    || lock_rec_get_nth_bit(lock, i)) {

			rec = page_find_rec_with_heap_no(block->frame, i);
			ut_a(rec);
			ut_ad(!lock_rec_get_nth_bit(lock, i)
			      || page_rec_is_leaf(rec));
			offsets = rec_get_offsets(rec, lock->index, offsets,
						  true, ULINT_UNDEFINED,
						  &heap);

			/* If this thread is holding the file space
			latch (fil_space_t::latch), the following
			check WILL break the latching order and may
			cause a deadlock of threads. */

			lock_rec_queue_validate(
				true, id, rec, lock->index, offsets);

			nth_bit = i + 1;

			goto loop;
		}
	}

	nth_bit = 0;
	nth_lock++;

	goto loop;

function_exit:
	if (heap != NULL) {
		mem_heap_free(heap);
	}
	return true;
}

/*********************************************************************//**
Validate record locks up to a limit.
@return lock at limit or NULL if no more locks in the hash bucket */
static MY_ATTRIBUTE((warn_unused_result))
const lock_t*
lock_rec_validate(
/*==============*/
	ulint		start,		/*!< in: lock_sys.rec_hash
					bucket */
	page_id_t*	limit)		/*!< in/out: upper limit of
					(space, page_no) */
{
	lock_sys.assert_locked();

	for (const lock_t* lock = static_cast<const lock_t*>(
		     HASH_GET_FIRST(&lock_sys.rec_hash, start));
	     lock != NULL;
	     lock = static_cast<const lock_t*>(HASH_GET_NEXT(hash, lock))) {

		ut_ad(!trx_is_ac_nl_ro(lock->trx));
		ut_ad(!lock->is_table());

		page_id_t current(lock->un_member.rec_lock.page_id);

		if (current > *limit) {
			*limit = current + 1;
			return(lock);
		}
	}

	return(0);
}

/*********************************************************************//**
Validate a record lock's block */
static void lock_rec_block_validate(const page_id_t page_id)
{
	/* The lock and the block that it is referring to may be freed at
	this point. We pass BUF_GET_POSSIBLY_FREED to skip a debug check.
	If the lock exists in lock_rec_validate_page() we assert
	block->page.status != FREED. */

	buf_block_t*	block;
	mtr_t		mtr;

	/* Transactional locks should never refer to dropped
	tablespaces, because all DDL operations that would drop or
	discard or rebuild a tablespace do hold an exclusive table
	lock, which would conflict with any locks referring to the
	tablespace from other transactions. */
	if (fil_space_t* space = fil_space_t::get(page_id.space())) {
		dberr_t err = DB_SUCCESS;
		mtr_start(&mtr);

		block = buf_page_get_gen(
			page_id,
			space->zip_size(),
			RW_X_LATCH, NULL,
			BUF_GET_POSSIBLY_FREED,
			&mtr, &err);

		if (err != DB_SUCCESS) {
			ib::error() << "Lock rec block validate failed for tablespace "
				   << space->name
				   << page_id << " err " << err;
		}

		ut_ad(!block || block->page.status == buf_page_t::FREED
		      || lock_rec_validate_page(block, space->is_latched()));

		mtr_commit(&mtr);

		space->release();
	}
}


static my_bool lock_validate_table_locks(rw_trx_hash_element_t *element, void*)
{
  lock_sys.assert_locked();
  mysql_mutex_lock(&element->mutex);
  if (element->trx)
  {
    check_trx_state(element->trx);
    for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
         lock != NULL;
         lock= UT_LIST_GET_NEXT(trx_locks, lock))
      if (lock->is_table())
        lock_table_queue_validate(lock->un_member.tab_lock.table);
  }
  mysql_mutex_unlock(&element->mutex);
  return 0;
}


/** Validate the transactional locks. */
static void lock_validate()
{
  std::set<page_id_t> pages;
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    /* Validate table locks */
    trx_sys.rw_trx_hash.iterate(lock_validate_table_locks);

    for (ulint i= 0; i < lock_sys.rec_hash.n_cells; i++)
    {
      page_id_t limit{0, 0};
      while (const lock_t *lock= lock_rec_validate(i, &limit))
      {
        if (lock_rec_find_set_bit(lock) == ULINT_UNDEFINED)
          /* The lock bitmap is empty; ignore it. */
          continue;
        pages.insert(lock->un_member.rec_lock.page_id);
      }
    }
  }

  for (page_id_t page_id : pages)
    lock_rec_block_validate(page_id);
}
#endif /* UNIV_DEBUG */
/*============ RECORD LOCK CHECKS FOR ROW OPERATIONS ====================*/

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate insert of
a record. If they do, first tests if the query thread should anyway
be suspended for some reason; if not, then puts the transaction and
the query thread to the lock wait state and inserts a waiting request
for a gap x-lock to the lock queue.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_rec_insert_check_and_lock(
/*===========================*/
	const rec_t*	rec,	/*!< in: record after which to insert */
	buf_block_t*	block,	/*!< in/out: buffer block of rec */
	dict_index_t*	index,	/*!< in: index */
	que_thr_t*	thr,	/*!< in: query thread */
	mtr_t*		mtr,	/*!< in/out: mini-transaction */
	bool*		inherit)/*!< out: set to true if the new
				inserted record maybe should inherit
				LOCK_GAP type locks from the successor
				record */
{
  ut_ad(block->frame == page_align(rec));
  ut_ad(mtr->is_named_space(index->table->space));
  ut_ad(page_is_leaf(block->frame));
  ut_ad(!index->table->is_temporary());

  dberr_t err= DB_SUCCESS;
  bool inherit_in= *inherit;
  trx_t *trx= thr_get_trx(thr);
  const rec_t *next_rec= page_rec_get_next_const(rec);
  ulint heap_no= page_rec_get_heap_no(next_rec);
  const page_id_t id{block->page.id()};
  ut_ad(!rec_is_metadata(next_rec, *index));

  {
    LockGuard g{lock_sys.rec_hash, id};
    /* Because this code is invoked for a running transaction by
    the thread that is serving the transaction, it is not necessary
    to hold trx->mutex here. */

    /* When inserting a record into an index, the table must be at
    least IX-locked. When we are building an index, we would pass
    BTR_NO_LOCKING_FLAG and skip the locking altogether. */
    ut_ad(lock_table_has(trx, index->table, LOCK_IX));

    *inherit= lock_sys_t::get_first(g.cell(), id, heap_no);

    if (*inherit)
    {
      /* Spatial index does not use GAP lock protection. It uses
      "predicate lock" to protect the "range" */
      if (index->is_spatial())
        return DB_SUCCESS;

      /* If another transaction has an explicit lock request which locks
      the gap, waiting or granted, on the successor, the insert has to wait.

      An exception is the case where the lock by the another transaction
      is a gap type lock which it placed to wait for its turn to insert. We
      do not consider that kind of a lock conflicting with our insert. This
      eliminates an unnecessary deadlock which resulted when 2 transactions
      had to wait for their insert. Both had waiting gap type lock requests
      on the successor, which produced an unnecessary deadlock. */
      const unsigned type_mode= LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION;

      if (lock_t *c_lock= lock_rec_other_has_conflicting(type_mode,
                                                         g.cell(), id,
                                                         heap_no, trx))
      {
        trx->mutex_lock();
        err= lock_rec_enqueue_waiting(c_lock, type_mode, id, block->frame,
                                      heap_no, index, thr, nullptr);
        trx->mutex_unlock();
      }
    }
  }

  switch (err) {
  case DB_SUCCESS_LOCKED_REC:
    err = DB_SUCCESS;
    /* fall through */
  case DB_SUCCESS:
    if (!inherit_in || index->is_clust())
      break;
    /* Update the page max trx id field */
    page_update_max_trx_id(block, buf_block_get_page_zip(block), trx->id, mtr);
  default:
    /* We only care about the two return values. */
    break;
  }

#ifdef UNIV_DEBUG
  {
    mem_heap_t *heap= nullptr;
    rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
    const rec_offs *offsets;
    rec_offs_init(offsets_);

    offsets= rec_get_offsets(next_rec, index, offsets_, true,
                             ULINT_UNDEFINED, &heap);

    ut_ad(lock_rec_queue_validate(false, id, next_rec, index, offsets));

    if (UNIV_LIKELY_NULL(heap))
      mem_heap_free(heap);
  }
#endif /* UNIV_DEBUG */

  return err;
}
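
/* Worked example of the insert-intention rule above (hypothetical):
transactions A and B both have waiting gap lock requests on the
successor record. Neither request blocks the other's insert, because a
waiting gap lock is not considered conflicting with
LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION; this avoids the mutual-wait
deadlock described in the comment. A granted gap lock on the successor,
by contrast, makes the inserter enqueue a waiting request. */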

/*********************************************************************//**
Creates an explicit record lock for a running transaction that currently only
has an implicit lock on the record. The transaction instance must have a
reference count > 0 so that it can't be committed and freed before this
function has completed. */
static
void
lock_rec_convert_impl_to_expl_for_trx(
/*==================================*/
	const page_id_t		id,	/*!< in: page identifier */
	const rec_t*		rec,	/*!< in: user record on page */
	dict_index_t*		index,	/*!< in: index of record */
	trx_t*			trx,	/*!< in/out: active transaction */
	ulint			heap_no)/*!< in: rec heap number to lock */
{
  ut_ad(trx->is_referenced());
  ut_ad(page_rec_is_leaf(rec));
  ut_ad(!rec_is_metadata(rec, *index));

  DEBUG_SYNC_C("before_lock_rec_convert_impl_to_expl_for_trx");
  {
    LockGuard g{lock_sys.rec_hash, id};
    trx->mutex_lock();
    ut_ad(!trx_state_eq(trx, TRX_STATE_NOT_STARTED));

    if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) &&
        !lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, heap_no,
                           trx))
      lock_rec_add_to_queue(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id,
                            page_align(rec), heap_no, index, trx, true);
  }

  trx->mutex_unlock();
  trx->release_reference();

  DEBUG_SYNC_C("after_lock_rec_convert_impl_to_expl_for_trx");
}


#ifdef UNIV_DEBUG
struct lock_rec_other_trx_holds_expl_arg
{
  const ulint heap_no;
  const hash_cell_t &cell;
  const page_id_t id;
  const trx_t &impl_trx;
};


static my_bool lock_rec_other_trx_holds_expl_callback(
  rw_trx_hash_element_t *element,
  lock_rec_other_trx_holds_expl_arg *arg)
{
  mysql_mutex_lock(&element->mutex);
  if (element->trx)
  {
    element->trx->mutex_lock();
    ut_ad(element->trx->state != TRX_STATE_NOT_STARTED);
    lock_t *expl_lock= element->trx->state == TRX_STATE_COMMITTED_IN_MEMORY
      ? nullptr
      : lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP,
                          arg->cell, arg->id, arg->heap_no, element->trx);
    /*
      An explicit lock is held by trx other than the trx holding the implicit
      lock.
    */
    ut_ad(!expl_lock || expl_lock->trx == &arg->impl_trx);
    element->trx->mutex_unlock();
  }
  mysql_mutex_unlock(&element->mutex);
  return 0;
}


/**
  Checks if some transaction, other than given trx_id, has an explicit
  lock on the given rec.

  FIXME: if the current transaction holds implicit lock from INSERT, a
  subsequent locking read should not convert it to explicit. See also
  MDEV-11215.

  @param      caller_trx  trx of current thread
  @param[in]  trx         trx holding implicit lock on rec
  @param[in]  rec         user record
4846
  @param[in]  id          page identifier
4847 4848 4849 4850
*/

static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx,
                                          const rec_t *rec,
4851
                                          const page_id_t id)
4852 4853 4854
{
  if (trx)
  {
4855
    ut_ad(!page_rec_is_metadata(rec));
4856
    LockGuard g{lock_sys.rec_hash, id};
4857
    ut_ad(trx->is_referenced());
4858
    const trx_state_t state{trx->state};
4859 4860
    ut_ad(state != TRX_STATE_NOT_STARTED);
    if (state == TRX_STATE_COMMITTED_IN_MEMORY)
4861
      /* The transaction was committed before we acquired LockGuard. */
4862
      return;
4863
    lock_rec_other_trx_holds_expl_arg arg=
4864
    { page_rec_get_heap_no(rec), g.cell(), id, *trx };
4865
    trx_sys.rw_trx_hash.iterate(caller_trx,
4866
                                lock_rec_other_trx_holds_expl_callback, &arg);
4867 4868 4869 4870 4871
  }
}
#endif /* UNIV_DEBUG */


/** If an implicit x-lock exists on a record, convert it to an explicit one.

Often, this is called by a transaction that is about to enter a lock wait
due to the lock conflict. Two explicit locks would be created: first the
exclusive lock on behalf of the lock-holder transaction in this function,
and then a wait request on behalf of caller_trx, in the calling function.

This may also be called by the same transaction that is already holding
an implicit exclusive lock on the record. In this case, no explicit lock
should be created.

@param[in,out]	caller_trx	current transaction
@param[in]	id		index tree leaf page identifier
@param[in]	rec		record on the leaf page
@param[in]	index		the index of the record
@param[in]	offsets		rec_get_offsets(rec,index)
@return	whether caller_trx already holds an exclusive lock on rec */
static
bool
lock_rec_convert_impl_to_expl(
	trx_t*			caller_trx,
	page_id_t		id,
	const rec_t*		rec,
	dict_index_t*		index,
	const rec_offs*		offsets)
{
	trx_t*		trx;

	lock_sys.assert_unlocked();
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if (dict_index_is_clust(index)) {
		trx_id_t	trx_id;

		trx_id = lock_clust_rec_some_has_impl(rec, index, offsets);

		if (trx_id == 0) {
			return false;
		}
		if (UNIV_UNLIKELY(trx_id == caller_trx->id)) {
			return true;
		}

		trx = trx_sys.find(caller_trx, trx_id);
	} else {
		ut_ad(!dict_index_is_online_ddl(index));

		trx = lock_sec_rec_some_has_impl(caller_trx, rec, index,
						 offsets);
		if (trx == caller_trx) {
			trx->release_reference();
			return true;
		}

		ut_d(lock_rec_other_trx_holds_expl(caller_trx, trx, rec, id));
	}

	if (trx) {
		ulint	heap_no = page_rec_get_heap_no(rec);

		ut_ad(trx->is_referenced());

		/* If the transaction is still active and has no
		explicit x-lock set on the record, set one for it.
		trx cannot be committed until the ref count is zero. */

		lock_rec_convert_impl_to_expl_for_trx(
			id, rec, index, trx, heap_no);
	}

	return false;
}
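
/* A minimal caller sketch (for illustration; the real callers are the
check-and-lock functions below): the return value lets a caller skip
explicit locking when it already holds an implicit exclusive lock.

  if (lock_rec_convert_impl_to_expl(trx, block->page.id(), rec, index,
                                    offsets))
    return DB_SUCCESS;  // already holding an implicit exclusive lock
  err= lock_rec_lock(true, LOCK_X | LOCK_REC_NOT_GAP, block, heap_no,
                     index, thr);
*/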

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate modify (update,
delete mark, or delete unmark) of a clustered index record. If they do,
first tests if the query thread should anyway be suspended for some
reason; if not, then puts the transaction and the query thread to the
lock wait state and inserts a waiting request for a record x-lock to the
lock queue.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_modify_check_and_lock(
/*=================================*/
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: record which should be
					modified */
	dict_index_t*		index,	/*!< in: clustered index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(dict_index_is_clust(index));
	ut_ad(block->frame == page_align(rec));

	ut_ad(!rec_is_metadata(rec, *index));
	ut_ad(!index->table->is_temporary());

	heap_no = rec_offs_comp(offsets)
		? rec_get_heap_no_new(rec)
		: rec_get_heap_no_old(rec);

	/* If a transaction has no explicit x-lock set on the record, set one
	for it */

	if (lock_rec_convert_impl_to_expl(thr_get_trx(thr), block->page.id(),
					  rec, index, offsets)) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

	err = lock_rec_lock(true, LOCK_X | LOCK_REC_NOT_GAP,
			    block, heap_no, index, thr);

	ut_ad(lock_rec_queue_validate(false, block->page.id(),
				      rec, index, offsets));

	if (err == DB_SUCCESS_LOCKED_REC) {
		err = DB_SUCCESS;
	}

	return(err);
}

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate modify (delete
mark or delete unmark) of a secondary index record.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_modify_check_and_lock(
/*===============================*/
	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG
				bit is set, does nothing */
	buf_block_t*	block,	/*!< in/out: buffer block of rec */
	const rec_t*	rec,	/*!< in: record which should be
				modified; NOTE: as this is a secondary
				index, we always have to modify the
				clustered index record first: see the
				comment below */
	dict_index_t*	index,	/*!< in: secondary index */
	que_thr_t*	thr,	/*!< in: query thread
				(can be NULL if BTR_NO_LOCKING_FLAG) */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index) || (flags & BTR_CREATE_FLAG));
	ut_ad(block->frame == page_align(rec));
	ut_ad(mtr->is_named_space(index->table->space));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if (flags & BTR_NO_LOCKING_FLAG) {

		return(DB_SUCCESS);
	}
	ut_ad(!index->table->is_temporary());

	heap_no = page_rec_get_heap_no(rec);

#ifdef WITH_WSREP
	trx_t *trx= thr_get_trx(thr);
	/* If a transaction scanning a unique secondary key is a wsrep
	high priority (brute force) thread, the scan may involve
	gap locking in the index. Because such locking also happens
	when replication events are applied in high priority applier
	threads, lock conflicts between two wsrep high priority
	threads are possible. To avoid this gap locking, we mark here
	that this transaction is performing a unique key scan. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep = 3;
#endif /* WITH_WSREP */

	/* Another transaction cannot have an implicit lock on the record,
	because when we come here, we already have modified the clustered
	index record, and this would not have been possible if another active
	transaction had modified this secondary index record. */

	err = lock_rec_lock(true, LOCK_X | LOCK_REC_NOT_GAP,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	if (trx->wsrep == 3) trx->wsrep = 1;
#endif /* WITH_WSREP */

#ifdef UNIV_DEBUG
	{
		mem_heap_t*	heap		= NULL;
		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
		const rec_offs*	offsets;
		rec_offs_init(offsets_);

		offsets = rec_get_offsets(rec, index, offsets_, true,
					  ULINT_UNDEFINED, &heap);

		ut_ad(lock_rec_queue_validate(
			      false, block->page.id(), rec, index, offsets));

		if (heap != NULL) {
			mem_heap_free(heap);
		}
	}
#endif /* UNIV_DEBUG */

	if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
		/* Update the page max trx id field */
		/* It might not be necessary to do this if
		err == DB_SUCCESS (no new lock created),
		but it should not cost too much performance. */
		page_update_max_trx_id(block,
				       buf_block_get_page_zip(block),
				       thr_get_trx(thr)->id, mtr);
		err = DB_SUCCESS;
	}

	return(err);
}
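
/* Explanatory note: the page_update_max_trx_id() call above is what allows
lock_sec_rec_read_check_and_lock() below to skip the implicit-to-explicit
conversion whenever page_get_max_trx_id(page) < trx_sys.get_min_trx_id():
in that case no active transaction can hold an implicit lock on any record
of the secondary index page. */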

/*********************************************************************//**
Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_read_check_and_lock(
/*=============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: secondary index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index));
	ut_ad(block->frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	const page_id_t id{block->page.id()};

	ut_ad(!rec_is_metadata(rec, *index));
	heap_no = page_rec_get_heap_no(rec);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list or a
	database recovery is running. */

	if (!page_rec_is_supremum(rec)
	    && page_get_max_trx_id(block->frame) >= trx_sys.get_min_trx_id()
	    && lock_rec_convert_impl_to_expl(thr_get_trx(thr), id, rec,
					     index, offsets)) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

#ifdef WITH_WSREP
	trx_t *trx= thr_get_trx(thr);
	/* If a transaction scanning a unique secondary key is a wsrep
	high priority (brute force) thread, the scan may involve
	gap locking in the index. Because such locking also happens
	when replication events are applied in high priority applier
	threads, lock conflicts between two wsrep high priority
	threads are possible. To avoid this gap locking, we mark here
	that this transaction is performing a unique key scan. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep = 3;
#endif /* WITH_WSREP */

	err = lock_rec_lock(false, gap_mode | mode,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	if (trx->wsrep == 3) trx->wsrep = 1;
#endif /* WITH_WSREP */

	ut_ad(lock_rec_queue_validate(false, id, rec, index, offsets));

	return(err);
}

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate read, or passing
over by a read cursor, of a clustered index record. If they do, first tests
if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_read_check_and_lock(
/*===============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: clustered index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(dict_index_is_clust(index));
	ut_ad(block->frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(gap_mode == LOCK_ORDINARY || gap_mode == LOCK_GAP
	      || gap_mode == LOCK_REC_NOT_GAP);
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	const page_id_t id{block->page.id()};

	heap_no = page_rec_get_heap_no(rec);

	if (heap_no != PAGE_HEAP_NO_SUPREMUM
	    && lock_rec_convert_impl_to_expl(thr_get_trx(thr), id, rec,
					     index, offsets)) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

	err = lock_rec_lock(false, gap_mode | mode,
			    block, heap_no, index, thr);

	ut_ad(lock_rec_queue_validate(false, id, rec, index, offsets));

	DEBUG_SYNC_C("after_lock_clust_rec_read_check_and_lock");

	return(err);
}
/*********************************************************************//**
Checks if locks of other transactions prevent an immediate read, or passing
over by a read cursor, of a clustered index record. If they do, first tests
if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record. This is an alternative version of
lock_clust_rec_read_check_and_lock() that does not require the parameter
"offsets".
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_read_check_and_lock_alt(
/*===================================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: clustered index */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	mem_heap_t*	tmp_heap	= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	dberr_t		err;
	rec_offs_init(offsets_);

	ut_ad(page_rec_is_leaf(rec));
	offsets = rec_get_offsets(rec, index, offsets, true,
				  ULINT_UNDEFINED, &tmp_heap);
	err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
						 offsets, mode, gap_mode, thr);
	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	if (err == DB_SUCCESS_LOCKED_REC) {
		err = DB_SUCCESS;
	}

	return(err);
}

/*******************************************************************//**
Check if a transaction holds any autoinc locks.
@return TRUE if the transaction holds any AUTOINC locks. */
static
ibool
lock_trx_holds_autoinc_locks(
/*=========================*/
	const trx_t*	trx)		/*!< in: transaction */
{
	ut_a(trx->autoinc_locks != NULL);

	return(!ib_vector_is_empty(trx->autoinc_locks));
}

/** Release all AUTO_INCREMENT locks of the transaction. */
static void lock_release_autoinc_locks(trx_t *trx)
{
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    mysql_mutex_lock(&lock_sys.wait_mutex);
    trx->mutex_lock();
    auto autoinc_locks= trx->autoinc_locks;
    ut_a(autoinc_locks);

    /* We release the locks in the reverse order. This is to avoid
    searching the vector for the element to delete at the lower level.
    See (lock_table_remove_low()) for details. */
    while (ulint size= ib_vector_size(autoinc_locks))
    {
      lock_t *lock= *static_cast<lock_t**>
        (ib_vector_get(autoinc_locks, size - 1));
      ut_ad(lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE));
      lock_table_dequeue(lock, true);
      lock_trx_table_locks_remove(lock);
    }
  }
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  trx->mutex_unlock();
}
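
/* Explanatory note: popping from the back of trx->autoinc_locks keeps each
removal O(1); lock_table_remove_low() can then drop the last vector element
instead of scanning for it. A sketch of the access pattern used above
(release() stands in for the dequeue calls; illustration only):

  while (ulint size= ib_vector_size(autoinc_locks))
    release(*static_cast<lock_t**>(ib_vector_get(autoinc_locks, size - 1)));
*/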

/** Cancel a waiting lock request and release possibly waiting transactions */
static void lock_cancel_waiting_and_release(lock_t *lock)
{
  lock_sys.assert_locked(*lock);
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  trx_t *trx= lock->trx;
  trx->mutex_lock();
  ut_ad(trx->state == TRX_STATE_ACTIVE);

  if (!lock->is_table())
    lock_rec_dequeue_from_page(lock, true);
  else
  {
    if (lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE))
    {
      ut_ad(trx->autoinc_locks);
      ib_vector_remove(trx->autoinc_locks, lock);
    }
    lock_table_dequeue(lock, true);
    /* Remove the lock from table lock vector too. */
    lock_trx_table_locks_remove(lock);
  }

  /* Reset the wait flag and the back pointer to lock in trx. */
  lock_reset_lock_and_trx_wait(lock);

  lock_wait_end(trx);
  trx->mutex_unlock();
}

/** Cancel a waiting lock request.
@param trx    active transaction
@param lock   waiting lock request
@param check_victim  whether to check trx->lock.was_chosen_as_deadlock_victim
@retval DB_SUCCESS    if no lock existed
@retval DB_DEADLOCK   if trx->lock.was_chosen_as_deadlock_victim was set
@retval DB_LOCK_WAIT  if the lock was canceled */
dberr_t lock_sys_t::cancel(trx_t *trx, lock_t *lock, bool check_victim)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->lock.wait_lock == lock);
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  dberr_t err= DB_SUCCESS;

  if (lock->is_table())
  {
    if (!lock_sys.rd_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.rd_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
      lock= trx->lock.wait_lock;
      if (!lock);
      else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        err= DB_DEADLOCK;
      else
        goto resolve_table_lock;
    }
    else
    {
resolve_table_lock:
      dict_table_t *table= lock->un_member.tab_lock.table;
      table->lock_mutex_lock();
      if (lock->is_waiting())
        lock_cancel_waiting_and_release(lock);
      table->lock_mutex_unlock();
      /* Even if lock->is_waiting() did not hold above, we must return
      DB_LOCK_WAIT, or otherwise optimistic parallel replication could
      occasionally hang. Potentially affected tests:
      rpl.rpl_parallel_optimistic
      rpl.rpl_parallel_optimistic_nobinlog
      rpl.rpl_parallel_optimistic_xa_lsu_off */
      err= DB_LOCK_WAIT;
    }
    lock_sys.rd_unlock();
  }
  else
  {
    /* To prevent the record lock from being moved between pages
    during a page split or merge, we must hold exclusive lock_sys.latch. */
    if (!lock_sys.wr_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.wr_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
      lock= trx->lock.wait_lock;
      if (!lock);
      else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        err= DB_DEADLOCK;
      else
        goto resolve_record_lock;
    }
    else
    {
resolve_record_lock:
      if (lock->is_waiting())
        lock_cancel_waiting_and_release(lock);
      /* Even if lock->is_waiting() did not hold above, we must return
      DB_LOCK_WAIT, or otherwise optimistic parallel replication could
      occasionally hang. Potentially affected tests:
      rpl.rpl_parallel_optimistic
      rpl.rpl_parallel_optimistic_nobinlog
      rpl.rpl_parallel_optimistic_xa_lsu_off */
      err= DB_LOCK_WAIT;
    }
    lock_sys.wr_unlock();
  }

  return err;
}
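
/* Note on the latching pattern above: lock_sys.latch ranks above
lock_sys.wait_mutex, so the latch must not be waited on while wait_mutex is
held. Hence the code first attempts a non-blocking rd_lock_try() or
wr_lock_try(); on failure it releases wait_mutex, blocks on the latch,
reacquires wait_mutex, and then revalidates trx->lock.wait_lock, which may
have been granted or canceled in the meantime. */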

/** Cancel a waiting lock request (if any) when killing a transaction */
void lock_sys_t::cancel(trx_t *trx)
{
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (lock_t *lock= trx->lock.wait_lock)
  {
    trx->error_state= DB_INTERRUPTED;
    cancel(trx, lock, false);
  }
  lock_sys.deadlock_check();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
}

/*********************************************************************//**
Unlocks AUTO_INC type locks that were possibly reserved by a trx. This
function should be called at the end of an SQL statement, by the
connection thread that owns the transaction (trx->mysql_thd). */
void
lock_unlock_table_autoinc(
/*======================*/
	trx_t*	trx)	/*!< in/out: transaction */
{
	lock_sys.assert_unlocked();
	ut_ad(!trx->mutex_is_owner());
	ut_ad(!trx->lock.wait_lock);

	/* This can be invoked on NOT_STARTED, ACTIVE, PREPARED,
	but not COMMITTED transactions. */

	ut_ad(trx_state_eq(trx, TRX_STATE_NOT_STARTED)
	      || !trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY));

	/* This function is invoked for a running transaction by the
	thread that is serving the transaction. Therefore it is not
	necessary to hold trx->mutex here. */

	if (lock_trx_holds_autoinc_locks(trx)) {
		lock_release_autoinc_locks(trx);
	}
}

/** Handle a pending lock wait (DB_LOCK_WAIT) in a semi-consistent read
while holding a clustered index leaf page latch.
@param trx           transaction that is or was waiting for a lock
@retval DB_SUCCESS   if the lock was granted
@retval DB_DEADLOCK  if the transaction must be aborted due to a deadlock
@retval DB_LOCK_WAIT if a lock wait would be necessary; the pending
                     lock request was released */
dberr_t lock_trx_handle_wait(trx_t *trx)
{
  if (trx->lock.was_chosen_as_deadlock_victim)
    return DB_DEADLOCK;
  if (!trx->lock.wait_lock)
    return DB_SUCCESS;
  dberr_t err= DB_SUCCESS;
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (trx->lock.was_chosen_as_deadlock_victim)
    err= DB_DEADLOCK;
  else if (lock_t *wait_lock= trx->lock.wait_lock)
    err= lock_sys_t::cancel(trx, wait_lock, true);
  lock_sys.deadlock_check();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  return err;
}
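
/* A minimal caller sketch (hypothetical, for illustration only): after a
locking read returned DB_LOCK_WAIT under a semi-consistent read, a caller
holding the page latch can resolve the pending wait without suspending:

  switch (lock_trx_handle_wait(trx)) {
  case DB_SUCCESS:    break;      // the lock was granted after all
  case DB_DEADLOCK:   goto fail;  // we were chosen as a deadlock victim
  case DB_LOCK_WAIT:  goto skip;  // wait request released; do not block
  default:            break;
  }
*/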

#ifdef UNIV_DEBUG
/**
  Do an exhaustive check for any locks (table or rec) against the table.

  @param[in]  table  check if there are any locks held on records in this table
                     or on the table itself
*/

static my_bool lock_table_locks_lookup(rw_trx_hash_element_t *element,
                                       const dict_table_t *table)
{
  lock_sys.assert_locked();
  mysql_mutex_lock(&element->mutex);
  if (element->trx)
  {
    element->trx->mutex_lock();
    check_trx_state(element->trx);
    if (element->trx->state != TRX_STATE_COMMITTED_IN_MEMORY)
    {
      for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
           lock != NULL;
           lock= UT_LIST_GET_NEXT(trx_locks, lock))
      {
        ut_ad(lock->trx == element->trx);
        if (!lock->is_table())
        {
          ut_ad(lock->index->online_status != ONLINE_INDEX_CREATION ||
                lock->index->is_primary());
          ut_ad(lock->index->table != table);
        }
        else
          ut_ad(lock->un_member.tab_lock.table != table);
      }
    }
    element->trx->mutex_unlock();
  }
  mysql_mutex_unlock(&element->mutex);
  return 0;
}
#endif /* UNIV_DEBUG */

/** Check if there are any locks on a table.
@return true if table has either table or record locks. */
bool lock_table_has_locks(dict_table_t *table)
{
  if (table->n_rec_locks)
    return true;
  table->lock_mutex_lock();
  auto len= UT_LIST_GET_LEN(table->locks);
  table->lock_mutex_unlock();
  if (len)
    return true;
#ifdef UNIV_DEBUG
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    trx_sys.rw_trx_hash.iterate(lock_table_locks_lookup,
                                const_cast<const dict_table_t*>(table));
  }
#endif /* UNIV_DEBUG */
  return false;
}
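
/* A minimal caller sketch (hypothetical polling loop, for illustration
only; real callers combine this check with server-specific waiting):

  while (lock_table_has_locks(table))
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
*/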

/*******************************************************************//**
Initialise the table lock list. */
void
lock_table_lock_list_init(
/*======================*/
	table_lock_list_t*	lock_list)	/*!< List to initialise */
{
	UT_LIST_INIT(*lock_list, &lock_table_t::locks);
}

#ifdef UNIV_DEBUG
/*******************************************************************//**
Check if the transaction holds any locks on the sys tables
or its records.
@return the strongest lock found on any sys table or 0 for none */
const lock_t*
lock_trx_has_sys_table_locks(
/*=========================*/
	const trx_t*	trx)	/*!< in: transaction to check */
{
	const lock_t*	strongest_lock = 0;
	lock_mode	strongest = LOCK_NONE;

	LockMutexGuard g{SRW_LOCK_CALL};

	const lock_list::const_iterator end = trx->lock.table_locks.end();
	lock_list::const_iterator it = trx->lock.table_locks.begin();

	/* Find a valid mode. Note: the table lock list may be empty. */

	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock != NULL
		    && dict_is_sys_table(lock->un_member.tab_lock.table->id)) {

			strongest = lock->mode();
			ut_ad(strongest != LOCK_NONE);
			strongest_lock = lock;
			break;
		}
	}

	if (strongest == LOCK_NONE) {
		return(NULL);
	}

	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock == NULL) {
			continue;
		}

		ut_ad(trx == lock->trx);
		ut_ad(lock->is_table());
		ut_ad(lock->un_member.tab_lock.table);

		lock_mode mode = lock->mode();

		if (dict_is_sys_table(lock->un_member.tab_lock.table->id)
		    && lock_mode_stronger_or_eq(mode, strongest)) {

			strongest = mode;
			strongest_lock = lock;
		}
	}

	return(strongest_lock);
}

/** Check if the transaction holds an explicit exclusive lock on a record.
@param[in]	trx	transaction
@param[in]	table	table
@param[in]	id	leaf page identifier
@param[in]	heap_no	heap number identifying the record
@return whether an explicit X-lock is held */
bool lock_trx_has_expl_x_lock(const trx_t &trx, const dict_table_t &table,
                              page_id_t id, ulint heap_no)
{
  ut_ad(heap_no > PAGE_HEAP_NO_SUPREMUM);
  ut_ad(lock_table_has(&trx, &table, LOCK_IX));
  if (!lock_table_has(&trx, &table, LOCK_X))
  {
    LockGuard g{lock_sys.rec_hash, id};
    ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
                            g.cell(), id, heap_no, &trx));
  }
  return true;
}
#endif /* UNIV_DEBUG */

namespace Deadlock
{
  /** rewind(3) the file used for storing the latest detected deadlock and
  print a heading message to stderr if printing of all deadlocks to stderr
  is enabled. */
  static void start_print()
  {
    lock_sys.assert_locked();

    rewind(lock_latest_err_file);
    ut_print_timestamp(lock_latest_err_file);

    if (srv_print_all_deadlocks)
      ib::info() << "Transactions deadlock detected,"
                    " dumping detailed information.";
  }

  /** Print a message to the deadlock file and possibly to stderr.
  @param msg message to print */
  static void print(const char *msg)
  {
    fputs(msg, lock_latest_err_file);
    if (srv_print_all_deadlocks)
      ib::info() << msg;
  }

  /** Print transaction data to the deadlock file and possibly to stderr.
  @param trx transaction */
  static void print(const trx_t &trx)
  {
    lock_sys.assert_locked();

    ulint n_rec_locks= trx.lock.n_rec_locks;
    ulint n_trx_locks= UT_LIST_GET_LEN(trx.lock.trx_locks);
    ulint heap_size= mem_heap_get_size(trx.lock.lock_heap);

    trx_print_low(lock_latest_err_file, &trx, 3000,
                  n_rec_locks, n_trx_locks, heap_size);

    if (srv_print_all_deadlocks)
      trx_print_low(stderr, &trx, 3000, n_rec_locks, n_trx_locks, heap_size);
  }

  /** Print lock data to the deadlock file and possibly to stderr.
  @param lock record or table type lock */
  static void print(const lock_t &lock)
  {
    lock_sys.assert_locked();

    if (!lock.is_table())
    {
      mtr_t mtr;
      lock_rec_print(lock_latest_err_file, &lock, mtr);

      if (srv_print_all_deadlocks)
        lock_rec_print(stderr, &lock, mtr);
    }
    else
    {
      lock_table_print(lock_latest_err_file, &lock);

      if (srv_print_all_deadlocks)
        lock_table_print(stderr, &lock);
    }
  }

  ATTRIBUTE_COLD
  /** Report a deadlock (cycle in the waits-for graph).
  @param trx        transaction waiting for a lock in this thread
  @param current_trx whether trx belongs to the current thread
  @return the transaction to be rolled back (unless one was committed already)
  @return nullptr if no deadlock */
  static trx_t *report(trx_t *const trx, bool current_trx)
  {
    mysql_mutex_assert_owner(&lock_sys.wait_mutex);
    ut_ad(lock_sys.is_writer() == !current_trx);

    /* Normally, trx should be a direct part of the deadlock
    cycle. However, if innodb_deadlock_detect had been OFF in the
    past, or if current_trx=false, trx may be waiting for a lock that
    is held by a participant of a pre-existing deadlock, without being
    part of the deadlock itself. That is, the path to the deadlock may be
    P-shaped instead of O-shaped, with trx being at the foot of the P.

    We will process the entire path leading to a cycle, and we will
    choose the victim (to be aborted) among the cycle. */

    static const char rollback_msg[]= "*** WE ROLL BACK TRANSACTION (%u)\n";
    char buf[9 + sizeof rollback_msg];

    /* If current_trx=true, trx is owned by this thread, and we can
    safely invoke these without holding trx->mutex or lock_sys.latch.
    If current_trx=false, a concurrent commit is protected by both
    lock_sys.latch and lock_sys.wait_mutex. */
    const undo_no_t trx_weight= TRX_WEIGHT(trx) |
      (trx->mysql_thd && thd_has_edited_nontrans_tables(trx->mysql_thd)
       ? 1ULL << 63 : 0);
    trx_t *victim= nullptr;
    undo_no_t victim_weight= ~0ULL;
    unsigned victim_pos= 0, trx_pos= 0;

    if (current_trx && !lock_sys.wr_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.wr_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
    }

    {
      unsigned l= 0;
      /* Now that we are holding lock_sys.wait_mutex again, check
      whether a cycle still exists. */
      trx_t *cycle= find_cycle(trx);
      if (!cycle)
        goto func_exit; /* One of the transactions was already aborted. */
      for (trx_t *next= cycle;;)
      {
        next= next->lock.wait_trx;
        const undo_no_t next_weight= TRX_WEIGHT(next) |
          (next->mysql_thd && thd_has_edited_nontrans_tables(next->mysql_thd)
           ? 1ULL << 63 : 0);
        if (next_weight < victim_weight)
        {
          victim_weight= next_weight;
          victim= next;
          victim_pos= l;
        }
        if (next == victim)
          trx_pos= l;
        if (next == cycle)
          break;
      }

      if (trx_pos && trx_weight == victim_weight)
      {
        victim= trx;
        victim_pos= trx_pos;
      }

      /* Finally, display the deadlock */
      switch (const auto r= static_cast<enum report>(innodb_deadlock_report)) {
      case REPORT_OFF:
        break;
      case REPORT_BASIC:
      case REPORT_FULL:
        start_print();
        l= 0;

        for (trx_t *next= cycle;;)
        {
          next= next->lock.wait_trx;
          ut_ad(next);
          ut_ad(next->state == TRX_STATE_ACTIVE);
          const lock_t *wait_lock= next->lock.wait_lock;
          ut_ad(wait_lock);
          snprintf(buf, sizeof buf, "\n*** (%u) TRANSACTION:\n", ++l);
          print(buf);
          print(*next);
          print("*** WAITING FOR THIS LOCK TO BE GRANTED:\n");
          print(*wait_lock);
          if (r == REPORT_BASIC);
          else if (wait_lock->is_table())
          {
            if (const lock_t *lock=
                UT_LIST_GET_FIRST(wait_lock->un_member.tab_lock.table->locks))
            {
              ut_ad(!lock->is_waiting());
              print("*** CONFLICTING WITH:\n");
              do
                print(*lock);
              while ((lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) &&
                     !lock->is_waiting());
            }
            else
              ut_ad("no conflicting table lock found" == 0);
          }
          else
          {
            const page_id_t id{wait_lock->un_member.rec_lock.page_id};
            hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                                 ? lock_sys.prdt_hash : lock_sys.rec_hash).
              cell_get(id.fold());
            if (const lock_t *lock= lock_sys_t::get_first(cell, id))
            {
              const ulint heap_no= lock_rec_find_set_bit(wait_lock);
              if (!lock_rec_get_nth_bit(lock, heap_no))
                lock= lock_rec_get_next_const(heap_no, lock);
              ut_ad(!lock->is_waiting());
              print("*** CONFLICTING WITH:\n");
              do
                print(*lock);
              while ((lock= lock_rec_get_next_const(heap_no, lock)) &&
                     !lock->is_waiting());
            }
            else
              ut_ad("no conflicting record lock found" == 0);
          }
          if (next == cycle)
            break;
        }
        snprintf(buf, sizeof buf, rollback_msg, victim_pos);
        print(buf);
      }

      ut_ad(victim->state == TRX_STATE_ACTIVE);

      victim->lock.was_chosen_as_deadlock_victim= true;
      lock_cancel_waiting_and_release(victim->lock.wait_lock);
#ifdef WITH_WSREP
      if (victim->is_wsrep() && wsrep_thd_is_SR(victim->mysql_thd))
        wsrep_handle_SR_rollback(trx->mysql_thd, victim->mysql_thd);
#endif
    }

func_exit:
    if (current_trx)
      lock_sys.wr_unlock();
    return victim;
  }
}

5896 5897 5898 5899 5900
/** Check if a lock request results in a deadlock.
Resolve a deadlock by choosing a transaction that will be rolled back.
@param trx    transaction requesting a lock
@return whether trx must report DB_DEADLOCK */
static bool Deadlock::check_and_resolve(trx_t *trx)
5901
{
5902 5903
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);

5904
  ut_ad(!trx->mutex_is_owner());
5905 5906
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  ut_ad(!srv_read_only_mode);
5907

5908 5909
  if (!innodb_deadlock_detect)
    return false;
5910

5911 5912 5913 5914 5915 5916
  if (UNIV_LIKELY_NULL(find_cycle(trx)) && report(trx, true) == trx)
    return true;

  if (UNIV_LIKELY(!trx->lock.was_chosen_as_deadlock_victim))
    return false;

5917
  if (lock_t *wait_lock= trx->lock.wait_lock)
5918
    lock_sys_t::cancel(trx, wait_lock, false);
5919

5920
  lock_sys.deadlock_check();
5921
  return true;
5922
}
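
/* Illustrative note on victim selection in Deadlock::report() above: each
member of the cycle is weighted by TRX_WEIGHT(), and bit 63 is set for
transactions that modified non-transactional tables, so those are
effectively never chosen. For example, with weights
{A: 10, B: 3, C: 3 | 1ULL << 63}, B has the smallest weight and is rolled
back. (Hypothetical numbers, for illustration only.) */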

/** Check for deadlocks while holding only lock_sys.wait_mutex. */
void lock_sys_t::deadlock_check()
{
  ut_ad(!is_writer());
  mysql_mutex_assert_owner(&wait_mutex);
  bool acquired= false;

  if (Deadlock::to_be_checked)
  {
    for (;;)
    {
      auto i= Deadlock::to_check.begin();
      if (i == Deadlock::to_check.end())
        break;
      if (!acquired)
      {
        acquired= wr_lock_try();
        if (!acquired)
        {
          acquired= true;
          mysql_mutex_unlock(&wait_mutex);
          lock_sys.wr_lock(SRW_LOCK_CALL);
          mysql_mutex_lock(&wait_mutex);
          continue;
        }
      }
      trx_t *trx= *i;
      Deadlock::to_check.erase(i);
      if (Deadlock::find_cycle(trx))
        Deadlock::report(trx, false);
    }
    Deadlock::to_be_checked= false;
  }
  ut_ad(Deadlock::to_check.empty());
  if (acquired)
    wr_unlock();
}

/*************************************************************//**
Updates the lock table when a page is split and merged to
two pages. */
UNIV_INTERN
void
lock_update_split_and_merge(
	const buf_block_t* left_block,	/*!< in: left page to which merged */
	const rec_t* orig_pred,		/*!< in: original predecessor of
					supremum on the left page before merge*/
	const buf_block_t* right_block)	/*!< in: right page from which merged */
{
  ut_ad(page_is_leaf(left_block->frame));
  ut_ad(page_is_leaf(right_block->frame));
  ut_ad(page_align(orig_pred) == left_block->frame);

  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};

  LockMultiGuard g{lock_sys.rec_hash, l, r};
  const rec_t *left_next_rec= page_rec_get_next_const(orig_pred);
  ut_ad(!page_rec_is_metadata(left_next_rec));

  /* Inherit the locks on the supremum of the left page to the
  first record which was moved from the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left_block->frame,
                          page_rec_get_heap_no(left_next_rec),
                          PAGE_HEAP_NO_SUPREMUM);

  /* Reset the locks on the supremum of the left page,
  releasing waiting transactions */
  lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);

  /* Inherit the locks to the supremum of the left page from the
  successor of the infimum on the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->frame,
                          PAGE_HEAP_NO_SUPREMUM,
                          lock_get_min_heap_no(right_block));
}
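
/* Interpretive summary of the three steps above (based solely on the calls
made): (1) locks on the left page's supremum, which stand for the gap at
the end of that page, are inherited by the first record moved over from the
right page; (2) the supremum locks are then reset, releasing any waiting
transactions; (3) the left page's supremum inherits the locks of the first
remaining record on the right page, because the gap before that record now
follows the left page's last record. */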