/*****************************************************************************

Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2014, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file lock/lock0lock.cc
The transaction lock system

Created 5/7/1996 Heikki Tuuri
*******************************************************/

#define LOCK_MODULE_IMPLEMENTATION

#include "univ.i"

#include <mysql/service_thd_error_context.h>
#include <mysql/service_thd_wait.h>
#include <sql_class.h>

#include "lock0lock.h"
#include "lock0priv.h"
#include "dict0mem.h"
#include "trx0purge.h"
#include "trx0sys.h"
#include "ut0vec.h"
#include "btr0cur.h"
#include "row0sel.h"
#include "row0mysql.h"
#include "row0vers.h"
#include "pars0pars.h"
#include "srv0mon.h"

#include <set>

#ifdef WITH_WSREP
#include <mysql/service_wsrep.h>
#include <debug_sync.h>
#endif /* WITH_WSREP */

/** The value of innodb_deadlock_detect */
my_bool innodb_deadlock_detect;
/** The value of innodb_deadlock_report */
ulong innodb_deadlock_report;

#ifdef HAVE_REPLICATION
extern "C" void thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
extern "C" int thd_need_wait_reports(const MYSQL_THD thd);
extern "C" int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
#endif

/** Functor for accessing the embedded node within a table lock. */
struct TableLockGetNode
{
  ut_list_node<lock_t> &operator()(lock_t &elem)
  { return(elem.un_member.tab_lock.locks); }
};

/** Create the hash table.
@param n  the lower bound of n_cells */
void lock_sys_t::hash_table::create(ulint n)
{
  n_cells= ut_find_prime(n);
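  /* Note: pad() appears to reserve the extra array slots that hold the
  hash_latch objects; judging by the layout assertion in resize(), each
  group of ELEMENTS_PER_LATCH data cells is preceded by LATCH slots
  containing the latch itself. */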
  const size_t size= pad(n_cells) * sizeof *array;
  void* v= aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
  memset(v, 0, size);
  array= static_cast<hash_cell_t*>(v);
}

/** Resize the hash table.
@param n  the lower bound of n_cells */
void lock_sys_t::hash_table::resize(ulint n)
{
  ut_ad(lock_sys.is_writer());
  ulint new_n_cells= ut_find_prime(n);
  const size_t size= pad(new_n_cells) * sizeof *array;
  void* v= aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
  memset(v, 0, size);
  hash_cell_t *new_array= static_cast<hash_cell_t*>(v);

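  /* Re-link every record lock into the new array: a granted lock may
  be prepended to its new cell, but a waiting lock is appended behind
  any existing locks, so that waiting requests stay queued after
  granted ones. */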
  for (auto i= pad(n_cells); i--; )
  {
    if (lock_t *lock= static_cast<lock_t*>(array[i].node))
    {
      /* All hash_latch slots must have been vacated. */
      ut_ad(i % (ELEMENTS_PER_LATCH + LATCH) >= LATCH);
      do
      {
        ut_ad(!lock->is_table());
        hash_cell_t *c= calc_hash(lock->un_member.rec_lock.page_id.fold(),
                                  new_n_cells) + new_array;
        lock_t *next= lock->hash;
        lock->hash= nullptr;
        if (!c->node)
          c->node= lock;
        else if (!lock->is_waiting())
        {
          lock->hash= static_cast<lock_t*>(c->node);
          c->node= lock;
        }
        else
        {
          lock_t *next= static_cast<lock_t*>(c->node);
          while (next->hash)
            next= next->hash;
          next->hash= lock;
        }
        lock= next;
      }
      while (lock);
    }
  }

  aligned_free(array);
  array= new_array;
  n_cells= new_n_cells;
}

#ifdef SUX_LOCK_GENERIC
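/* On platforms without a suitable native rw-lock (SUX_LOCK_GENERIC),
a hash_latch that cannot be acquired immediately is waited for via a
single process-wide mutex/condition variable pair that is shared by
all hash latches. */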
void lock_sys_t::hash_latch::wait()
{
  pthread_mutex_lock(&lock_sys.hash_mutex);
  while (!write_trylock())
    pthread_cond_wait(&lock_sys.hash_cond, &lock_sys.hash_mutex);
  pthread_mutex_unlock(&lock_sys.hash_mutex);
}

void lock_sys_t::hash_latch::release()
{
  pthread_mutex_lock(&lock_sys.hash_mutex);
  write_unlock();
  pthread_cond_signal(&lock_sys.hash_cond);
  pthread_mutex_unlock(&lock_sys.hash_mutex);
}
#endif

#ifdef UNIV_DEBUG
/** Assert that a lock shard is exclusively latched by this thread */
void lock_sys_t::assert_locked(const lock_t &lock) const
{
  ut_ad(this == &lock_sys);
  if (is_writer())
    return;
  if (lock.is_table())
    assert_locked(*lock.un_member.tab_lock.table);
  else
    lock_sys.hash_get(lock.type_mode).
      assert_locked(lock.un_member.rec_lock.page_id);
}

/** Assert that a table lock shard is exclusively latched by this thread */
void lock_sys_t::assert_locked(const dict_table_t &table) const
{
  ut_ad(!table.is_temporary());

  const os_thread_id_t current_thread= os_thread_get_curr_id();
  if (writer.load(std::memory_order_relaxed) == current_thread)
    return;
  ut_ad(readers);
  ut_ad(table.lock_mutex_is_owner());
}

/** Assert that hash cell for page is exclusively latched by this thread */
void lock_sys_t::hash_table::assert_locked(const page_id_t id) const
{
  if (lock_sys.is_writer())
    return;
  ut_ad(lock_sys.readers);
  ut_ad(latch(cell_get(id.fold()))->is_locked());
}

/** Assert that a hash table cell is exclusively latched (by some thread) */
void lock_sys_t::assert_locked(const hash_cell_t &cell) const
{
  if (lock_sys.is_writer())
    return;
  ut_ad(lock_sys.readers);
  ut_ad(hash_table::latch(const_cast<hash_cell_t*>(&cell))->is_locked());
}
#endif

LockGuard::LockGuard(lock_sys_t::hash_table &hash, page_id_t id)
{
  const auto id_fold= id.fold();
  lock_sys.rd_lock(SRW_LOCK_CALL);
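  /* The shared lock_sys.latch prevents a concurrent resize of the
  hash table, while the per-cell latch gives exclusive access to the
  cell's lock list. */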
  cell_= hash.cell_get(id_fold);
  hash.latch(cell_)->acquire();
}

LockMultiGuard::LockMultiGuard(lock_sys_t::hash_table &hash,
                               const page_id_t id1, const page_id_t id2)
{
  ut_ad(id1.space() == id2.space());
  const auto id1_fold= id1.fold(), id2_fold= id2.fold();
  lock_sys.rd_lock(SRW_LOCK_CALL);
  cell1_= hash.cell_get(id1_fold);
  cell2_= hash.cell_get(id2_fold);

  auto latch1= hash.latch(cell1_), latch2= hash.latch(cell2_);
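  /* Acquire the two cell latches in address order, so that two
  LockMultiGuard instances covering the same pair of cells can never
  deadlock against each other. */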
  if (latch1 > latch2)
    std::swap(latch1, latch2);
  latch1->acquire();
  if (latch1 != latch2)
    latch2->acquire();
}

LockMultiGuard::~LockMultiGuard()
{
  auto latch1= lock_sys_t::hash_table::latch(cell1_),
    latch2= lock_sys_t::hash_table::latch(cell2_);
  latch1->release();
  if (latch1 != latch2)
    latch2->release();
  /* Must be last, to avoid a race with lock_sys_t::hash_table::resize() */
  lock_sys.rd_unlock();
}

/** Pretty-print a table lock.
@param[in,out]	file	output stream
@param[in]	lock	table lock */
static void lock_table_print(FILE* file, const lock_t* lock);

/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr);

namespace Deadlock
{
  /** Whether to_check may be nonempty */
  static Atomic_relaxed<bool> to_be_checked;
  /** Transactions to check for deadlock. Protected by lock_sys.wait_mutex. */
  static std::set<trx_t*> to_check;

  MY_ATTRIBUTE((nonnull, warn_unused_result))
  /** Check if a lock request results in a deadlock.
  Resolve a deadlock by choosing a transaction that will be rolled back.
  @param trx    transaction requesting a lock
  @return whether trx must report DB_DEADLOCK */
  static bool check_and_resolve(trx_t *trx);

  /** Quickly detect a deadlock using Brent's cycle detection algorithm.
  @param trx     transaction that is waiting for another transaction
  @return a transaction that is part of a cycle
  @retval nullptr if no cycle was found */
  inline trx_t *find_cycle(trx_t *trx)
  {
    mysql_mutex_assert_owner(&lock_sys.wait_mutex);
    trx_t *tortoise= trx, *hare= trx;
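    /* Brent's algorithm: the hare follows trx_t::lock.wait_trx edges
    one step at a time, while the tortoise is teleported to the hare's
    position whenever the traversed distance reaches a power of two.
    If the hare catches up with the tortoise, the wait-for graph
    contains a cycle. */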
    for (unsigned power= 1, l= 1; (hare= hare->lock.wait_trx) != nullptr; l++)
    {
      if (tortoise == hare)
      {
        ut_ad(l > 1);
        lock_sys.deadlocks++;
        /* Note: Normally, trx should be part of any deadlock cycle
        that is found. However, if innodb_deadlock_detect=OFF had been
        in effect in the past, it is possible that trx will be waiting
        for a transaction that participates in a pre-existing deadlock
        cycle. In that case, our victim will not be trx. */
        return hare;
      }
      if (l == power)
      {
        /* The maximum concurrent number of TRX_STATE_ACTIVE transactions
        is TRX_RSEG_N_SLOTS * 128, or innodb_page_size / 16 * 128
        (default: 131,072, maximum: 524,288).
        Our maximum possible number of iterations should be twice that. */
        power<<= 1;
        l= 0;
        tortoise= hare;
      }
    }
    return nullptr;
  }
};

#ifdef UNIV_DEBUG
/** Validate the transactional locks. */
static void lock_validate();

/** Validate the record lock queues on a page.
@param block    buffer pool block
@param latched  whether the tablespace latch may be held
@return true if ok */
static bool lock_rec_validate_page(const buf_block_t *block, bool latched)
  MY_ATTRIBUTE((nonnull, warn_unused_result));
#endif /* UNIV_DEBUG */

/* The lock system */
lock_sys_t lock_sys;

/** Only created if !srv_read_only_mode. Protected by lock_sys.latch. */
static FILE *lock_latest_err_file;

/*********************************************************************//**
Reports that a transaction id is insensible, i.e., in the future. */
ATTRIBUTE_COLD
void
lock_report_trx_id_insanity(
/*========================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,		/*!< in: user record */
	dict_index_t*	index,		/*!< in: index */
	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec, index) */
	trx_id_t	max_trx_id)	/*!< in: trx_sys.get_max_trx_id() */
{
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	ib::error()
		<< "Transaction id " << ib::hex(trx_id)
		<< " associated with record" << rec_offsets_print(rec, offsets)
		<< " in index " << index->name
		<< " of table " << index->table->name
		<< " is greater than the global counter " << max_trx_id
		<< "! The table is corrupted.";
}

/*********************************************************************//**
Checks that a transaction id is sensible, i.e., not in the future.
@return true if ok */
bool
lock_check_trx_id_sanity(
/*=====================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,	/*!< in: user record */
	dict_index_t*	index,	/*!< in: index */
	const rec_offs*	offsets)	/*!< in: rec_get_offsets(rec, index) */
{
  ut_ad(rec_offs_validate(rec, index, offsets));
  ut_ad(!rec_is_metadata(rec, *index));

  trx_id_t max_trx_id= trx_sys.get_max_trx_id();
  ut_ad(max_trx_id || srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);

  if (UNIV_LIKELY(max_trx_id != 0) && UNIV_UNLIKELY(trx_id >= max_trx_id))
  {
    lock_report_trx_id_insanity(trx_id, rec, index, offsets, max_trx_id);
    return false;
  }
  return true;
}


/**
  Creates the lock system at database start.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::create(ulint n_cells)
{
  ut_ad(this == &lock_sys);
  ut_ad(!is_initialised());

  m_initialised= true;

  latch.SRW_LOCK_INIT(lock_latch_key);
  mysql_mutex_init(lock_wait_mutex_key, &wait_mutex, nullptr);
#ifdef SUX_LOCK_GENERIC
  pthread_mutex_init(&hash_mutex, nullptr);
  pthread_cond_init(&hash_cond, nullptr);
#endif

  rec_hash.create(n_cells);
  prdt_hash.create(n_cells);
  prdt_page_hash.create(n_cells);

  if (!srv_read_only_mode)
  {
    lock_latest_err_file= os_file_create_tmpfile();
    ut_a(lock_latest_err_file);
  }
}

#ifdef UNIV_PFS_RWLOCK
/** Acquire exclusive lock_sys.latch */
void lock_sys_t::wr_lock(const char *file, unsigned line)
{
  mysql_mutex_assert_not_owner(&wait_mutex);
  latch.wr_lock(file, line);
  ut_ad(!writer.exchange(os_thread_get_curr_id(), std::memory_order_relaxed));
}
/** Release exclusive lock_sys.latch */
void lock_sys_t::wr_unlock()
{
  ut_ad(writer.exchange(0, std::memory_order_relaxed) ==
        os_thread_get_curr_id());
  latch.wr_unlock();
}

/** Acquire shared lock_sys.latch */
void lock_sys_t::rd_lock(const char *file, unsigned line)
{
  mysql_mutex_assert_not_owner(&wait_mutex);
  latch.rd_lock(file, line);
  ut_ad(!writer.load(std::memory_order_relaxed));
  ut_d(readers.fetch_add(1, std::memory_order_relaxed));
}

/** Release shared lock_sys.latch */
void lock_sys_t::rd_unlock()
{
  ut_ad(!writer.load(std::memory_order_relaxed));
  ut_ad(readers.fetch_sub(1, std::memory_order_relaxed));
  latch.rd_unlock();
}
#endif

/**
  Resize the lock hash table.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::resize(ulint n_cells)
{
  ut_ad(this == &lock_sys);
  LockMutexGuard g{SRW_LOCK_CALL};
  rec_hash.resize(n_cells);
  prdt_hash.resize(n_cells);
  prdt_page_hash.resize(n_cells);
}

/** Closes the lock system at database shutdown. */
void lock_sys_t::close()
{
  ut_ad(this == &lock_sys);

  if (!m_initialised)
    return;

  if (lock_latest_err_file)
  {
    my_fclose(lock_latest_err_file, MYF(MY_WME));
    lock_latest_err_file= nullptr;
  }

  rec_hash.free();
  prdt_hash.free();
  prdt_page_hash.free();
#ifdef SUX_LOCK_GENERIC
  pthread_mutex_destroy(&hash_mutex);
  pthread_cond_destroy(&hash_cond);
#endif

  latch.destroy();
  mysql_mutex_destroy(&wait_mutex);

  Deadlock::to_check.clear();
  Deadlock::to_be_checked= false;

  m_initialised= false;
}

#ifdef WITH_WSREP
# ifdef UNIV_DEBUG
/** Check if both the transaction holding the conflicting lock and the
transaction requesting the record lock are brute force (BF) transactions.
If they are, check whether this BF-BF wait is correct; if it is not,
report the BF wait and assert.

@param[in]	lock	other waiting record lock
@param[in]	trx	trx requesting conflicting record lock
*/
static void wsrep_assert_no_bf_bf_wait(const lock_t *lock, const trx_t *trx)
{
	ut_ad(!lock->is_table());
	lock_sys.assert_locked(*lock);
	trx_t* lock_trx= lock->trx;

	/* Note that we are holding lock_sys.latch, thus we should
	not acquire THD::LOCK_thd_data mutex below to avoid latching
	order violation. */

	if (!trx->is_wsrep() || !lock_trx->is_wsrep())
		return;
	if (UNIV_LIKELY(!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
	    || UNIV_LIKELY(!wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE)))
		return;

	ut_ad(trx->state == TRX_STATE_ACTIVE);

	switch (lock_trx->state) {
	case TRX_STATE_COMMITTED_IN_MEMORY:
		/* The state change is only protected by trx_t::mutex,
		which we are not even holding here. */
	case TRX_STATE_PREPARED:
		/* Wait for lock->trx to complete the commit
		(or XA ROLLBACK) and to release the lock. */
		return;
	case TRX_STATE_ACTIVE:
		break;
	default:
		ut_ad("invalid state" == 0);
	}

	/* If the BF-BF order is honored, i.e. the trx already holding
	the record lock is ordered before this new lock request, we can
	keep trx waiting for the lock. If the conflicting transaction
	is already aborting or rolling back for replaying, we can also
	let the new transaction wait. */
	if (wsrep_thd_order_before(lock_trx->mysql_thd, trx->mysql_thd)
	    || wsrep_thd_is_aborting(lock_trx->mysql_thd)) {
		return;
	}

	mtr_t mtr;

	ib::error() << "Conflicting lock on table: "
		    << lock->index->table->name
		    << " index: "
		    << lock->index->name()
		    << " that has lock ";
	lock_rec_print(stderr, lock, mtr);

	ib::error() << "WSREP state: ";

	wsrep_report_bf_lock_wait(trx->mysql_thd,
				  trx->id);
	wsrep_report_bf_lock_wait(lock_trx->mysql_thd,
				  lock_trx->id);
	/* BF-BF wait is a bug */
	ut_error;
}
# endif /* UNIV_DEBUG */

541
/** check if lock timeout was for priority thread,
542
as a side effect trigger lock monitor
543 544 545
@param trx    transaction owning the lock
@return false for regular lock timeout */
ATTRIBUTE_NOINLINE static bool wsrep_is_BF_lock_timeout(const trx_t &trx)
546
{
547
  ut_ad(trx.is_wsrep());
548

549 550 551 552 553 554 555
  if (trx.error_state == DB_DEADLOCK || !srv_monitor_timer ||
      !wsrep_thd_is_BF(trx.mysql_thd, false))
    return false;

  ib::info() << "WSREP: BF lock wait long for trx:" << ib::hex(trx.id)
             << " query: " << wsrep_thd_query(trx.mysql_thd);
  return true;
556
}
#endif /* WITH_WSREP */

/*********************************************************************//**
Checks if a lock request for a new lock has to wait for request lock2.
@return TRUE if new lock has to wait for lock2 to be removed */
UNIV_INLINE
bool
lock_rec_has_to_wait(
/*=================*/
	bool		for_locking,
				/*!< in: true if called while acquiring
				a new lock, false when checking whether
				an existing lock must keep waiting */
	const trx_t*	trx,	/*!< in: trx of new lock */
	unsigned	type_mode,/*!< in: precise mode of the new lock
				to set: LOCK_S or LOCK_X, possibly
				ORed to LOCK_GAP or LOCK_REC_NOT_GAP,
				LOCK_INSERT_INTENTION */
	const lock_t*	lock2,	/*!< in: another record lock; NOTE that
				it is assumed that this has a lock bit
				set on the same record as in the new
				lock we are setting */
	bool		lock_is_on_supremum)
				/*!< in: TRUE if we are setting the
				lock on the 'supremum' record of an
				index page: we know then that the lock
				request is really for a 'gap' type lock */
{
	ut_ad(trx);
	ut_ad(!lock2->is_table());
	ut_d(lock_sys.hash_get(type_mode).assert_locked(
		     lock2->un_member.rec_lock.page_id));

	if (trx == lock2->trx
	    || lock_mode_compatible(
		       static_cast<lock_mode>(LOCK_MODE_MASK & type_mode),
		       lock2->mode())) {
		return false;
	}

	/* We have somewhat complex rules when gap type record locks
	cause waits */

	if ((lock_is_on_supremum || (type_mode & LOCK_GAP))
	    && !(type_mode & LOCK_INSERT_INTENTION)) {

		/* Gap type locks without LOCK_INSERT_INTENTION flag
		do not need to wait for anything. This is because
		different users can have conflicting lock types
		on gaps. */

		return false;
	}

	if (!(type_mode & LOCK_INSERT_INTENTION) && lock2->is_gap()) {

		/* A record lock (LOCK_ORDINARY or LOCK_REC_NOT_GAP)
		does not need to wait for a gap type lock */

		return false;
	}

	if ((type_mode & LOCK_GAP) && lock2->is_record_not_gap()) {

		/* Lock on gap does not need to wait for
		a LOCK_REC_NOT_GAP type lock */

		return false;
	}

	if (lock2->is_insert_intention()) {
		/* No lock request needs to wait for an insert
		intention lock to be removed. This is ok since our
		rules allow conflicting locks on gaps. This eliminates
		a spurious deadlock caused by a next-key lock waiting
		for an insert intention lock; when the insert
		intention lock was granted, the insert deadlocked on
		the waiting next-key lock.

		Also, insert intention locks do not disturb each
		other. */

		return false;
	}

#ifdef HAVE_REPLICATION
	if ((type_mode & LOCK_GAP || lock2->is_gap())
	    && !thd_need_ordering_with(trx->mysql_thd, lock2->trx->mysql_thd)) {
		/* If the upper server layer has already decided on the
		commit order between the transaction requesting the
		lock and the transaction owning the lock, we do not
		need to wait for gap locks. Such ordering by the upper
		server layer happens in parallel replication, where the
		commit order is fixed to match the original order on the
		master.

		Such gap locks are mainly needed to get serialisability
		between transactions so that they will be binlogged in
		the correct order so that statement-based replication
		will give the correct results. Since the right order
		was already determined on the master, we do not need
		to enforce it again here.

		Skipping the locks is not essential for correctness,
		since in case of deadlock we will just kill the later
		transaction and retry it. But it can save some
		unnecessary rollbacks and retries. */

		return false;
	}
#endif /* HAVE_REPLICATION */

#ifdef WITH_WSREP
		/* A new lock request from a transaction is using a
		unique key scan and this transaction is a wsrep high
		priority transaction (brute force). If the conflicting
		transaction is also a wsrep high priority transaction,
		we should avoid the lock conflict because the ordering
		of these transactions is already decided and the
		conflicting transaction will be replayed later. */
		if (trx->is_wsrep_UK_scan()
		    && wsrep_thd_is_BF(lock2->trx->mysql_thd, false)) {
			return false;
		}

		/* We very well can let bf to wait normally as other
		BF will be replayed in case of conflict. For debug
		builds we will do additional sanity checks to catch
		unsupported bf wait if any. */
		ut_d(wsrep_assert_no_bf_bf_wait(lock2, trx));
#endif /* WITH_WSREP */

	return true;
}

/*********************************************************************//**
Checks if a lock request lock1 has to wait for request lock2.
@return TRUE if lock1 has to wait for lock2 to be removed */
bool
lock_has_to_wait(
/*=============*/
	const lock_t*	lock1,	/*!< in: waiting lock */
	const lock_t*	lock2)	/*!< in: another lock; NOTE that it is
				assumed that this has a lock bit set
				on the same record as in lock1 if the
				locks are record locks */
{
	ut_ad(lock1 && lock2);

	if (lock1->trx == lock2->trx
	    || lock_mode_compatible(lock1->mode(), lock2->mode())) {
		return false;
	}

	if (lock1->is_table()) {
		return true;
	}

	ut_ad(!lock2->is_table());

	if (lock1->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)) {
		return lock_prdt_has_to_wait(lock1->trx, lock1->type_mode,
					     lock_get_prdt_from_lock(lock1),
					     lock2);
	}

	return lock_rec_has_to_wait(
		false, lock1->trx, lock1->type_mode, lock2,
		lock_rec_get_nth_bit(lock1, PAGE_HEAP_NO_SUPREMUM));
}

/*============== RECORD LOCK BASIC FUNCTIONS ============================*/

/**********************************************************************//**
Looks for a set bit in a record lock bitmap. Returns ULINT_UNDEFINED,
if none found.
@return bit index == heap number of the record, or ULINT_UNDEFINED if
none found */
ulint
lock_rec_find_set_bit(
/*==================*/
	const lock_t*	lock)	/*!< in: record lock with at least one bit set */
{
	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (lock_rec_get_nth_bit(lock, i)) {

			return(i);
		}
	}

	return(ULINT_UNDEFINED);
}

/*********************************************************************//**
Resets the record lock bitmap to zero. NOTE: does not touch the wait_lock
pointer in the transaction! This function is used in lock object creation
and resetting. */
static
void
lock_rec_bitmap_reset(
/*==================*/
	lock_t*	lock)	/*!< in: record lock */
{
	ulint	n_bytes;

	ut_ad(!lock->is_table());

	/* Reset to zero the bitmap which resides immediately after the lock
	struct */

	n_bytes = lock_rec_get_n_bits(lock) / 8;

	ut_ad((lock_rec_get_n_bits(lock) % 8) == 0);

	memset(reinterpret_cast<void*>(&lock[1]), 0, n_bytes);
}

/*********************************************************************//**
Copies a record lock to heap.
@return copy of lock */
static
lock_t*
lock_rec_copy(
/*==========*/
	const lock_t*	lock,	/*!< in: record lock */
	mem_heap_t*	heap)	/*!< in: memory heap */
{
	ulint	size;

	ut_ad(!lock->is_table());

	size = sizeof(lock_t) + lock_rec_get_n_bits(lock) / 8;

	return(static_cast<lock_t*>(mem_heap_dup(heap, lock, size)));
}

/*********************************************************************//**
Gets the previous record lock set on a record.
@return previous lock on the same record, NULL if none exists */
const lock_t*
lock_rec_get_prev(
/*==============*/
	const lock_t*	in_lock,/*!< in: record lock */
	ulint		heap_no)/*!< in: heap number of the record */
{
  ut_ad(!in_lock->is_table());
  const page_id_t id{in_lock->un_member.rec_lock.page_id};
  hash_cell_t *cell= lock_sys.hash_get(in_lock->type_mode).cell_get(id.fold());

  for (lock_t *lock= lock_sys_t::get_first(*cell, id); lock != in_lock;
       lock= lock_rec_get_next_on_page(lock))
    if (lock_rec_get_nth_bit(lock, heap_no))
      return lock;

  return nullptr;
}

/*============= FUNCTIONS FOR ANALYZING RECORD LOCK QUEUE ================*/

/*********************************************************************//**
Checks if a transaction has a GRANTED explicit lock on rec stronger or equal
to precise_mode.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_rec_has_expl(
/*==============*/
	ulint			precise_mode,/*!< in: LOCK_S or LOCK_X
					possibly ORed to LOCK_GAP or
					LOCK_REC_NOT_GAP, for a
					supremum record we regard this
					always a gap type request */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction */
{
  ut_ad((precise_mode & LOCK_MODE_MASK) == LOCK_S
	|| (precise_mode & LOCK_MODE_MASK) == LOCK_X);
  ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));
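  /* A granted lock satisfies the request if it is at least as strong
  as precise_mode and its LOCK_GAP/LOCK_REC_NOT_GAP flags cover the
  requested ones; any lock on the supremum pseudo-record is treated
  as a gap type request. */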

  for (lock_t *lock= lock_sys_t::get_first(cell, id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
    if (lock->trx == trx &&
	!(lock->type_mode & (LOCK_WAIT | LOCK_INSERT_INTENTION)) &&
	(!((LOCK_REC_NOT_GAP | LOCK_GAP) & lock->type_mode) ||
	 heap_no == PAGE_HEAP_NO_SUPREMUM ||
	 ((LOCK_REC_NOT_GAP | LOCK_GAP) & precise_mode & lock->type_mode)) &&
	lock_mode_stronger_or_eq(lock->mode(), static_cast<lock_mode>
				 (precise_mode & LOCK_MODE_MASK)))
      return lock;

  return nullptr;
}

#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@return lock or NULL */
static
lock_t*
lock_rec_other_has_expl_req(
/*========================*/
	lock_mode		mode,	/*!< in: LOCK_S or LOCK_X */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	bool			wait,	/*!< in: whether also waiting locks
					are taken into account */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction, or NULL if
					requests by all transactions
					are taken into account */
{
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	/* Only a GAP lock can be set on the supremum record, and
	we are not looking for GAP locks */
	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		return(NULL);
	}

	for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock; lock = lock_rec_get_next(heap_no, lock)) {
		if (lock->trx != trx
		    && !lock->is_gap()
		    && (!lock->is_waiting() || wait)
		    && lock_mode_stronger_or_eq(lock->mode(), mode)) {

			return(lock);
		}
	}

	return(NULL);
}
#endif /* UNIV_DEBUG */

#ifdef WITH_WSREP
void lock_wait_wsrep_kill(trx_t *bf_trx, ulong thd_id, trx_id_t trx_id);

/** Kill the holders of conflicting locks.
@param trx   brute-force applier transaction running in the current thread */
ATTRIBUTE_COLD ATTRIBUTE_NOINLINE static void lock_wait_wsrep(trx_t *trx)
{
  DBUG_ASSERT(wsrep_on(trx->mysql_thd));
  if (!wsrep_thd_is_BF(trx->mysql_thd, false))
    return;

  std::set<trx_t*> victims;

  lock_sys.wr_lock(SRW_LOCK_CALL);
  mysql_mutex_lock(&lock_sys.wait_mutex);

  const lock_t *wait_lock= trx->lock.wait_lock;
  if (!wait_lock)
  {
func_exit:
    lock_sys.wr_unlock();
    mysql_mutex_unlock(&lock_sys.wait_mutex);
    return;
  }

  if (wait_lock->is_table())
  {
    dict_table_t *table= wait_lock->un_member.tab_lock.table;
    for (lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock;
         lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
      /* if the victim also has BF status, but an earlier seqno, we have to wait */
      if (lock->trx != trx &&
          !(wsrep_thd_is_BF(lock->trx->mysql_thd, false) &&
            wsrep_thd_order_before(lock->trx->mysql_thd, trx->mysql_thd)))
      {
        victims.emplace(lock->trx);
      }
  }
  else
  {
    const page_id_t id{wait_lock->un_member.rec_lock.page_id};
    hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                         ? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
      (id.fold());
    if (lock_t *lock= lock_sys_t::get_first(cell, id))
    {
      const ulint heap_no= lock_rec_find_set_bit(wait_lock);
      if (!lock_rec_get_nth_bit(lock, heap_no))
        lock= lock_rec_get_next(heap_no, lock);
      do
        /* if the victim also has BF status, but an earlier seqno, we have to wait */
        if (lock->trx != trx &&
            !(wsrep_thd_is_BF(lock->trx->mysql_thd, false) &&
              wsrep_thd_order_before(lock->trx->mysql_thd, trx->mysql_thd)))
        {
          victims.emplace(lock->trx);
        }
      while ((lock= lock_rec_get_next(heap_no, lock)));
    }
  }

  if (victims.empty())
    goto func_exit;

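  /* Copy out each victim's thread and transaction id: the kill
  callbacks must be invoked only after lock_sys.latch and wait_mutex
  have been released, by which time the trx_t pointers may no longer
  be safe to dereference. */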
  std::vector<std::pair<ulong,trx_id_t>> victim_id;
  for (trx_t *v : victims)
    victim_id.emplace_back(std::pair<ulong,trx_id_t>
                           {thd_get_thread_id(v->mysql_thd), v->id});

  DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort",
                  {
                    const char act[]=
                      "now SIGNAL sync.before_wsrep_thd_abort_reached "
                      "WAIT_FOR signal.before_wsrep_thd_abort";
                    DBUG_ASSERT(!debug_sync_set_action(trx->mysql_thd,
                                                       STRING_WITH_LEN(act)));
                  };);

  lock_sys.wr_unlock();
  mysql_mutex_unlock(&lock_sys.wait_mutex);

  for (const auto &v : victim_id)
    lock_wait_wsrep_kill(trx, v.first, v.second);
}
#endif /* WITH_WSREP */

/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@return lock or NULL */
static
lock_t*
lock_rec_other_has_conflicting(
/*===========================*/
	unsigned		mode,	/*!< in: LOCK_S or LOCK_X,
					possibly ORed to LOCK_GAP or
					LOCK_REC_NOT_GAP,
					LOCK_INSERT_INTENTION */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: our transaction */
{
	bool	is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);

	for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock; lock = lock_rec_get_next(heap_no, lock)) {
		if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) {
			return(lock);
		}
	}

	return(NULL);
}

/*********************************************************************//**
Checks if some transaction has an implicit x-lock on a record in a secondary
index.
@return transaction id of the transaction which has the x-lock, or 0;
NOTE that this function can return false positives but never false
negatives. The caller must confirm all positive results by calling
trx_is_active(). */
static
trx_t*
lock_sec_rec_some_has_impl(
/*=======================*/
	trx_t*		caller_trx,/*!<in/out: trx of current thread */
	const rec_t*	rec,	/*!< in: user record */
	dict_index_t*	index,	/*!< in: secondary index */
	const rec_offs*	offsets)/*!< in: rec_get_offsets(rec, index) */
{
	trx_t*		trx;
	trx_id_t	max_trx_id;
	const page_t*	page = page_align(rec);

	lock_sys.assert_unlocked();
	ut_ad(!dict_index_is_clust(index));
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	max_trx_id = page_get_max_trx_id(page);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list, or
	database recovery is running. */

	if (max_trx_id < trx_sys.get_min_trx_id()) {

		trx = 0;

	} else if (!lock_check_trx_id_sanity(max_trx_id, rec, index, offsets)) {

		/* The page is corrupt: try to avoid a crash by returning 0 */
		trx = 0;

	/* In this case it is possible that some transaction has an implicit
	x-lock. We have to look in the clustered index. */

	} else {
		trx = row_vers_impl_x_locked(caller_trx, rec, index, offsets);
	}

	return(trx);
}

/*********************************************************************//**
Return the number of table locks for a transaction.
The caller must be holding lock_sys.latch. */
ulint
lock_number_of_tables_locked(
/*=========================*/
	const trx_lock_t*	trx_lock)	/*!< in: transaction locks */
{
	const lock_t*	lock;
	ulint		n_tables = 0;

	lock_sys.assert_locked();

	for (lock = UT_LIST_GET_FIRST(trx_lock->trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (lock->is_table()) {
			n_tables++;
		}
	}

	return(n_tables);
}

/*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/

/** Reset the wait status of a lock.
@param[in,out]	lock	lock that was possibly being waited for */
static void lock_reset_lock_and_trx_wait(lock_t *lock)
{
  lock_sys.assert_locked(*lock);
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  trx_t *trx= lock->trx;
  ut_ad(lock->is_waiting());
  ut_ad(!trx->lock.wait_lock || trx->lock.wait_lock == lock);
  if (trx_t *wait_trx= trx->lock.wait_trx)
    Deadlock::to_check.erase(wait_trx);
  trx->lock.wait_lock= nullptr;
  trx->lock.wait_trx= nullptr;
  lock->type_mode&= ~LOCK_WAIT;
}

#ifdef UNIV_DEBUG
/** Check transaction state */
static void check_trx_state(const trx_t *trx)
{
  ut_ad(!trx->auto_commit || trx->will_lock);
  const auto state= trx->state;
  ut_ad(state == TRX_STATE_ACTIVE ||
        state == TRX_STATE_PREPARED_RECOVERED ||
        state == TRX_STATE_PREPARED ||
        state == TRX_STATE_COMMITTED_IN_MEMORY);
}
#endif

/** Create a new record lock and insert it into the lock queue,
without checking for deadlocks or conflicts.
@param[in]	c_lock		conflicting lock
@param[in]	type_mode	lock mode and wait flag
@param[in]	page_id		index page number
@param[in]	page		R-tree index page, or NULL
@param[in]	heap_no		record heap number in the index page
@param[in]	index		the index tree
@param[in,out]	trx		transaction
@param[in]	holds_trx_mutex	whether the caller holds trx->mutex
@return created lock */
lock_t*
lock_rec_create_low(
	lock_t*		c_lock,
	unsigned	type_mode,
	const page_id_t	page_id,
	const page_t*	page,
	ulint		heap_no,
	dict_index_t*	index,
	trx_t*		trx,
	bool		holds_trx_mutex)
{
	lock_t*		lock;
	ulint		n_bytes;

	ut_d(lock_sys.hash_get(type_mode).assert_locked(page_id));
	ut_ad(holds_trx_mutex == trx->mutex_is_owner());
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
	ut_ad(!(type_mode & LOCK_TABLE));
	ut_ad(trx->state != TRX_STATE_NOT_STARTED);
	ut_ad(!trx->is_autocommit_non_locking());

	/* If rec is the supremum record, then we reset the gap and
	LOCK_REC_NOT_GAP bits, as all locks on the supremum are
	automatically of the gap type */

	if (UNIV_UNLIKELY(heap_no == PAGE_HEAP_NO_SUPREMUM)) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));
		type_mode = type_mode & ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		n_bytes = (page_dir_get_n_heap(page) + 7) / 8;
	} else {
		ut_ad(heap_no == PRDT_HEAPNO);

		/* The lock is always on PAGE_HEAP_NO_INFIMUM (0), so
		we only need 1 bit (which rounds up to 1 byte) for
		lock bit setting */
		n_bytes = 1;

		if (type_mode & LOCK_PREDICATE) {
			ulint	tmp = UNIV_WORD_SIZE - 1;

			/* We will attach the predicate structure
			after the lock. Make sure the memory is
			aligned on 8 bytes; the mem_heap_alloc will
			align it with MEM_SPACE_NEEDED anyway. */
			n_bytes = (n_bytes + sizeof(lock_prdt_t) + tmp) & ~tmp;
			ut_ad(n_bytes == sizeof(lock_prdt_t) + UNIV_WORD_SIZE);
		}
	}

	if (!holds_trx_mutex) {
		trx->mutex_lock();
	}
	ut_ad(trx->mutex_is_owner());
	ut_ad(trx->state != TRX_STATE_NOT_STARTED);

	if (trx->lock.rec_cached >= UT_ARR_SIZE(trx->lock.rec_pool)
	    || sizeof *lock + n_bytes > sizeof *trx->lock.rec_pool) {
		lock = static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap,
				       sizeof *lock + n_bytes));
	} else {
		lock = &trx->lock.rec_pool[trx->lock.rec_cached++].lock;
	}

	lock->trx = trx;
	lock->type_mode = type_mode;
	lock->index = index;
	lock->un_member.rec_lock.page_id = page_id;

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		lock->un_member.rec_lock.n_bits = uint32_t(n_bytes * 8);
	} else {
		/* Predicate lock always on INFIMUM (0) */
		lock->un_member.rec_lock.n_bits = 8;
	}
	lock_rec_bitmap_reset(lock);
	lock_rec_set_nth_bit(lock, heap_no);
	index->table->n_rec_locks++;
	ut_ad(index->table->get_ref_count() || !index->table->can_be_evicted);

	const auto lock_hash = &lock_sys.hash_get(type_mode);
	HASH_INSERT(lock_t, hash, lock_hash, page_id.fold(), lock);

	if (type_mode & LOCK_WAIT) {
		if (trx->lock.wait_trx) {
			ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx);
			ut_ad(trx->lock.wait_lock);
			ut_ad((*trx->lock.wait_lock).trx == trx);
		} else {
			ut_ad(c_lock);
			trx->lock.wait_trx = c_lock->trx;
			ut_ad(!trx->lock.wait_lock);
		}
		trx->lock.wait_lock = lock;
	}
	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
	if (!holds_trx_mutex) {
		trx->mutex_unlock();
	}
	MONITOR_INC(MONITOR_RECLOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_RECLOCK);

	return lock;
}

/** Enqueue a waiting request for a lock which cannot be granted immediately.
Check for deadlocks.
@param[in]	c_lock		conflicting lock
@param[in]	type_mode	the requested lock mode (LOCK_S or LOCK_X)
				possibly ORed with LOCK_GAP or
				LOCK_REC_NOT_GAP, ORed with
				LOCK_INSERT_INTENTION if this
				waiting lock request is set
				when performing an insert of
				an index record
@param[in]	id		page identifier
@param[in]	page		leaf page in the index
@param[in]	heap_no		record heap number in the block
@param[in]	index		index tree
@param[in,out]	thr		query thread
@param[in]	prdt		minimum bounding box (spatial index)
@retval	DB_LOCK_WAIT		if the waiting lock was enqueued
@retval	DB_DEADLOCK		if this transaction was chosen as the victim
@retval	DB_LOCK_WAIT_TIMEOUT	if the lock wait timeout is zero */
dberr_t
lock_rec_enqueue_waiting(
	lock_t*			c_lock,
	unsigned		type_mode,
	const page_id_t		id,
	const page_t*		page,
	ulint			heap_no,
	dict_index_t*		index,
	que_thr_t*		thr,
	lock_prdt_t*		prdt)
{
	ut_d(lock_sys.hash_get(type_mode).assert_locked(id));
	ut_ad(!srv_read_only_mode);
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));

	trx_t* trx = thr_get_trx(thr);
	ut_ad(trx->mutex_is_owner());

	if (UNIV_UNLIKELY(trx->dict_operation)) {
		ib::error() << "A record lock wait happens in a dictionary"
			" operation. index "
			<< index->name
			<< " of table "
			<< index->table->name
			<< ". " << BUG_REPORT_MSG;
		ut_ad(0);
	}

	if (trx->mysql_thd && thd_lock_wait_timeout(trx->mysql_thd) == 0) {
		trx->error_state = DB_LOCK_WAIT_TIMEOUT;
		return DB_LOCK_WAIT_TIMEOUT;
	}

	/* Enqueue the lock request that will wait to be granted, note that
	we already own the trx mutex. */
	lock_t* lock = lock_rec_create_low(
		c_lock,
		type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true);

	if (prdt && type_mode & LOCK_PREDICATE) {
		lock_prdt_set_prdt(lock, prdt);
	}

	trx->lock.wait_thr = thr;
	trx->lock.was_chosen_as_deadlock_victim
		IF_WSREP(.fetch_and(byte(~1)), = false);

	DBUG_LOG("ib_lock", "trx " << ib::hex(trx->id)
		 << " waits for lock in index " << index->name
		 << " of table " << index->table->name);

	MONITOR_INC(MONITOR_LOCKREC_WAIT);

	return DB_LOCK_WAIT;
}

/*********************************************************************//**
Looks for a suitable type record lock struct by the same trx on the same page.
This can be used to save space when a new record lock should be set on a page:
no new struct is needed if a suitable old one is found.
@return lock or NULL */
static inline
lock_t*
lock_rec_find_similar_on_page(
	ulint           type_mode,      /*!< in: lock type_mode field */
	ulint           heap_no,        /*!< in: heap number of the record */
	lock_t*         lock,           /*!< in: lock_sys.get_first() */
	const trx_t*    trx)            /*!< in: transaction */
{
	lock_sys.rec_hash.assert_locked(lock->un_member.rec_lock.page_id);

	for (/* No op */;
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (lock->trx == trx
		    && lock->type_mode == type_mode
		    && lock_rec_get_n_bits(lock) > heap_no) {

			return(lock);
		}
	}

	return(NULL);
}

/*********************************************************************//**
Adds a record lock request to the record queue. The request is normally
added as the last in the queue, but if there are no waiting lock requests
on the record, and the request to be added is not a waiting request, we
can reuse a suitable record lock object already existing on the same page,
just setting the appropriate bit in its bitmap. This is a low-level function
which does NOT check for deadlocks or lock compatibility! */
static
void
lock_rec_add_to_queue(
/*==================*/
	unsigned		type_mode,/*!< in: lock mode, wait, gap
					etc. flags */
	hash_cell_t&		cell,	/*!< in,out: first hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	const page_t*		page,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of the record */
	dict_index_t*		index,	/*!< in: index of record */
	trx_t*			trx,	/*!< in/out: transaction */
	bool			caller_owns_trx_mutex)
					/*!< in: TRUE if caller owns the
					transaction mutex */
{
	ut_d(lock_sys.hash_get(type_mode).assert_locked(id));
	ut_ad(caller_owns_trx_mutex == trx->mutex_is_owner());
	ut_ad(index->is_primary()
	      || dict_index_get_online_status(index) != ONLINE_INDEX_CREATION);
	ut_ad(!(type_mode & LOCK_TABLE));
#ifdef UNIV_DEBUG
	switch (type_mode & LOCK_MODE_MASK) {
	case LOCK_X:
	case LOCK_S:
		break;
	default:
		ut_error;
	}

	if (!(type_mode & (LOCK_WAIT | LOCK_GAP))) {
		lock_mode	mode = (type_mode & LOCK_MODE_MASK) == LOCK_S
			? LOCK_X
			: LOCK_S;
		const lock_t*	other_lock
			= lock_rec_other_has_expl_req(
				mode, cell, id, false, heap_no, trx);
#ifdef WITH_WSREP
		if (UNIV_LIKELY_NULL(other_lock) && trx->is_wsrep()) {
			/* Only a BF transaction may be granted a lock
			before another conflicting lock request. */
			if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE)
			    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
				/* If it is not BF, this case is a bug. */
				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
				ut_error;
			}
		} else
#endif /* WITH_WSREP */
		ut_ad(!other_lock);
	}
#endif /* UNIV_DEBUG */

	/* If rec is the supremum record, then we can reset the gap bit, as
	all locks on the supremum are automatically of the gap type, and we
	try to avoid unnecessary memory consumption of a new record lock
	struct for a gap type lock */

	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));

		/* There should never be LOCK_REC_NOT_GAP on a supremum
		record, but let us play safe */

		type_mode &= ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	if (type_mode & LOCK_WAIT) {
		goto create;
	} else if (lock_t *first_lock = lock_sys_t::get_first(cell, id)) {
		for (lock_t* lock = first_lock;;) {
			if (lock->is_waiting()
			    && lock_rec_get_nth_bit(lock, heap_no)) {
				goto create;
			}
			if (!(lock = lock_rec_get_next_on_page(lock))) {
				break;
			}
		}

		/* Look for a similar record lock on the same page:
		if one is found and there are no waiting lock requests,
		we can just set the bit */
		if (lock_t* lock = lock_rec_find_similar_on_page(
			    type_mode, heap_no, first_lock, trx)) {
			trx_t* lock_trx = lock->trx;
			if (caller_owns_trx_mutex) {
				trx->mutex_unlock();
			}
			lock_trx->mutex_lock();
			lock_rec_set_nth_bit(lock, heap_no);
			lock_trx->mutex_unlock();
			if (caller_owns_trx_mutex) {
				trx->mutex_lock();
			}
			return;
		}
	}

create:
	/* Note: We will not pass any conflicting lock to lock_rec_create(),
	because we should be moving an existing waiting lock request. */
	ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);

	lock_rec_create_low(nullptr,
			    type_mode, id, page, heap_no, index, trx,
			    caller_owns_trx_mutex);
}

/*********************************************************************//**
Tries to lock the specified record in the mode requested. If not immediately
possible, enqueues a waiting lock request. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case
of a page supremum record, a gap type lock.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
static
dberr_t
lock_rec_lock(
/*==========*/
	bool			impl,	/*!< in: if true, no lock is set
					if no wait is necessary: we
					assume that the caller will
					set an implicit lock */
	unsigned		mode,	/*!< in: lock mode: LOCK_X or
					LOCK_S possibly ORed to either
					LOCK_GAP or LOCK_REC_NOT_GAP */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of record */
	dict_index_t*		index,	/*!< in: index of record */
	que_thr_t*		thr)	/*!< in: query thread */
{
  trx_t *trx= thr_get_trx(thr);

  ut_ad(!srv_read_only_mode);
  ut_ad(((LOCK_MODE_MASK | LOCK_TABLE) & mode) == LOCK_S ||
        ((LOCK_MODE_MASK | LOCK_TABLE) & mode) == LOCK_X);
  ut_ad(~mode & (LOCK_GAP | LOCK_REC_NOT_GAP));
  ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
  DBUG_EXECUTE_IF("innodb_report_deadlock", return DB_DEADLOCK;);

  ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ||
        lock_table_has(trx, index->table, LOCK_IS));
  ut_ad((LOCK_MODE_MASK & mode) != LOCK_X ||
         lock_table_has(trx, index->table, LOCK_IX));

  if (lock_table_has(trx, index->table,
                     static_cast<lock_mode>(LOCK_MODE_MASK & mode)))
    return DB_SUCCESS;

  MONITOR_ATOMIC_INC(MONITOR_NUM_RECLOCK_REQ);
  const page_id_t id{block->page.id()};
  LockGuard g{lock_sys.rec_hash, id};

  if (lock_t *lock= lock_sys_t::get_first(g.cell(), id))
  {
    dberr_t err= DB_SUCCESS;
    trx->mutex_lock();
    if (lock_rec_get_next_on_page(lock) ||
        lock->trx != trx ||
        lock->type_mode != mode ||
        lock_rec_get_n_bits(lock) <= heap_no)
    {
      /* Do nothing if the trx already has a strong enough lock on rec */
      if (!lock_rec_has_expl(mode, g.cell(), id, heap_no, trx))
      {
        if (lock_t *c_lock= lock_rec_other_has_conflicting(mode, g.cell(), id,
                                                           heap_no, trx))
          /*
            If another transaction has a non-gap conflicting
            request in the queue, as this transaction does not
            have a lock strong enough already granted on the
            record, we have to wait.
          */
          err= lock_rec_enqueue_waiting(c_lock, mode, id, block->frame, heap_no,
                                        index, thr, nullptr);
        else if (!impl)
        {
          /* Set the requested lock on the record. */
          lock_rec_add_to_queue(mode, g.cell(), id, block->frame, heap_no,
                                index, trx, true);
          err= DB_SUCCESS_LOCKED_REC;
        }
      }
    }
    else if (!impl)
    {
      /*
        If the nth bit of the record lock is already set then we do not
        set a new lock bit, otherwise we set it.
      */
      if (!lock_rec_get_nth_bit(lock, heap_no))
      {
        lock_rec_set_nth_bit(lock, heap_no);
        err= DB_SUCCESS_LOCKED_REC;
      }
    }
    trx->mutex_unlock();
    return err;
  }
  else
  {
    /*
      Simplified and faster path for the most common cases
      Note that we don't own the trx mutex.
    */
    if (!impl)
      lock_rec_create_low(nullptr,
                          mode, id, block->frame, heap_no, index, trx, false);

    return DB_SUCCESS_LOCKED_REC;
  }
}

/*********************************************************************//**
Checks if a waiting record lock request still has to wait in a queue.
@return lock that is causing the wait */
static
const lock_t*
lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock)
{
	const lock_t*	lock;
	ulint		heap_no;
	ulint		bit_mask;
	ulint		bit_offset;

	ut_ad(wait_lock->is_waiting());
	ut_ad(!wait_lock->is_table());

	heap_no = lock_rec_find_set_bit(wait_lock);

	bit_offset = heap_no / 8;
	bit_mask = static_cast<ulint>(1) << (heap_no % 8);

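	/* Scan the queue from the front up to wait_lock; any earlier
	lock that covers the same heap_no (the bitmap is stored right
	after the lock_t struct, hence the &lock[1] access below) and
	conflicts with wait_lock is the one being waited for. */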
	for (lock = lock_sys_t::get_first(
		     cell, wait_lock->un_member.rec_lock.page_id);
	     lock != wait_lock;
	     lock = lock_rec_get_next_on_page_const(lock)) {
		const byte*	p = (const byte*) &lock[1];

		if (heap_no < lock_rec_get_n_bits(lock)
		    && (p[bit_offset] & bit_mask)
		    && lock_has_to_wait(wait_lock, lock)) {
			return(lock);
		}
	}

	return(NULL);
}

/** Note that a record lock wait started */
inline void lock_sys_t::wait_start()
{
  mysql_mutex_assert_owner(&wait_mutex);
  wait_count+= WAIT_COUNT_STEP + 1;
  /* The maximum number of concurrently waiting transactions is one less
  than the maximum number of concurrent transactions. */
  static_assert(WAIT_COUNT_STEP == UNIV_PAGE_SIZE_MAX / 16 * TRX_SYS_N_RSEGS,
                "compatibility");
}

/** Note that a record lock wait resumed */
inline
void lock_sys_t::wait_resume(THD *thd, my_hrtime_t start, my_hrtime_t now)
{
  mysql_mutex_assert_owner(&wait_mutex);
  ut_ad(get_wait_pending());
  ut_ad(get_wait_cumulative());
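  /* wait_start() added WAIT_COUNT_STEP + 1: it incremented both the
  pending-waits component (in units of WAIT_COUNT_STEP) and the
  cumulative wait counter packed into the low-order part of wait_count.
  Ending the wait must decrement only the pending component. */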
  wait_count-= WAIT_COUNT_STEP;
  if (now.val >= start.val)
  {
    const uint32_t diff_time=
      static_cast<uint32_t>((now.val - start.val) / 1000);
    wait_time+= diff_time;

    if (diff_time > wait_time_max)
      wait_time_max= diff_time;

    thd_storage_lock_wait(thd, diff_time);
  }
}

#ifdef HAVE_REPLICATION
ATTRIBUTE_NOINLINE MY_ATTRIBUTE((nonnull))
/** Report lock waits to parallel replication.
@param trx transaction that may be waiting for a lock */
static void lock_wait_rpl_report(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  THD *const thd= trx->mysql_thd;
  ut_ad(thd);
  const lock_t *wait_lock= trx->lock.wait_lock;
  if (!wait_lock)
    return;
  ut_ad(!(wait_lock->type_mode & LOCK_AUTO_INC));
  if (!lock_sys.wr_lock_try())
  {
    mysql_mutex_unlock(&lock_sys.wait_mutex);
    lock_sys.wr_lock(SRW_LOCK_CALL);
    mysql_mutex_lock(&lock_sys.wait_mutex);
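    /* wait_mutex was released while waiting for the exclusive lock_sys
    latch; the lock wait may have ended meanwhile, so reload wait_lock. */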
    wait_lock= trx->lock.wait_lock;
    if (!wait_lock)
    {
func_exit:
      lock_sys.wr_unlock();
      return;
    }
  }
  ut_ad(wait_lock->is_waiting());
  ut_ad(!(wait_lock->type_mode & LOCK_AUTO_INC));

  if (wait_lock->is_table())
  {
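    /* A table lock is being waited for: report every other transaction
    that holds or requests a non-AUTO_INC lock on the same table. */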
    dict_table_t *table= wait_lock->un_member.tab_lock.table;
    for (lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock;
         lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
      if (!(lock->type_mode & LOCK_AUTO_INC) && lock->trx != trx)
        thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
  }
  else
  {
    const page_id_t id{wait_lock->un_member.rec_lock.page_id};
    hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                         ? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
      (id.fold());
    if (lock_t *lock= lock_sys_t::get_first(cell, id))
    {
      const ulint heap_no= lock_rec_find_set_bit(wait_lock);
      if (!lock_rec_get_nth_bit(lock, heap_no))
        lock= lock_rec_get_next(heap_no, lock);
      do
        if (lock->trx->mysql_thd != thd)
          thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
      while ((lock= lock_rec_get_next(heap_no, lock)));
    }
  }

  goto func_exit;
}
#endif /* HAVE_REPLICATION */

/** Wait for a lock to be released.
@retval DB_DEADLOCK if this transaction was chosen as the deadlock victim
@retval DB_INTERRUPTED if the execution was interrupted by the user
@retval DB_LOCK_WAIT_TIMEOUT if the lock wait timed out
@retval DB_SUCCESS if the lock was granted */
dberr_t lock_wait(que_thr_t *thr)
{
  trx_t *trx= thr_get_trx(thr);

  if (trx->mysql_thd)
    DEBUG_SYNC_C("lock_wait_suspend_thread_enter");

  /* InnoDB system transactions may use the global value of
  innodb_lock_wait_timeout, because trx->mysql_thd == NULL. */
  const ulong innodb_lock_wait_timeout= trx_lock_wait_timeout_get(trx);
  const bool no_timeout= innodb_lock_wait_timeout > 100000000;
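  /* Timeouts of more than 100000000 seconds are treated as infinite:
  the wait loop below will then not use my_cond_timedwait() at all. */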
  const my_hrtime_t suspend_time= my_hrtime_coarse();
  ut_ad(!trx->dict_operation_lock_mode ||
        trx->dict_operation_lock_mode == RW_S_LATCH);

  /* The wait_lock can be cleared by another thread in lock_grant(),
  lock_rec_cancel(), or lock_cancel_waiting_and_release(). But, a wait
  can only be initiated by the current thread which owns the transaction.

  Even if trx->lock.wait_lock were changed, the object that it used to
  point to will remain valid memory (remain allocated from
  trx->lock.lock_heap). If trx->lock.wait_lock was set to nullptr, the
  original object could be transformed to a granted lock. On a page
  split or merge, we would change trx->lock.wait_lock to point to
  another waiting lock request object, and the old object would be
  logically discarded.

  In any case, it is safe to read the memory that wait_lock points to,
  even though we are not holding any mutex. We are only reading
  wait_lock->type_mode & (LOCK_TABLE | LOCK_AUTO_INC), which will be
  unaffected by any page split or merge operation. (Furthermore,
  table lock objects will never be cloned or moved.) */
  const lock_t *const wait_lock= trx->lock.wait_lock;

  if (!wait_lock)
  {
    /* The lock has already been released or this transaction
    was chosen as a deadlock victim: no need to wait */
    if (trx->lock.was_chosen_as_deadlock_victim.fetch_and(byte(~1)))
      trx->error_state= DB_DEADLOCK;
    else
      trx->error_state= DB_SUCCESS;

    return trx->error_state;
  }

  trx->lock.suspend_time= suspend_time;

  const auto had_dict_lock= trx->dict_operation_lock_mode;
  if (had_dict_lock) /* Release foreign key check latch */
    row_mysql_unfreeze_data_dictionary(trx);

  IF_WSREP(if (trx->is_wsrep()) lock_wait_wsrep(trx),);

  const auto type_mode= wait_lock->type_mode;
#ifdef HAVE_REPLICATION
  /* Even though lock_wait_rpl_report() has nothing to do with
  deadlock detection, it was always disabled by innodb_deadlock_detect=OFF.
  We will keep it that way, because unfortunately
  thd_need_wait_reports() will hold even if parallel (or any) replication
  is not being used. We want to allow the user to skip
  lock_wait_rpl_report(). */
  const bool rpl= !(type_mode & LOCK_AUTO_INC) && trx->mysql_thd &&
    innodb_deadlock_detect && thd_need_wait_reports(trx->mysql_thd);
#endif
  const bool row_lock_wait= thr->lock_state == QUE_THR_LOCK_ROW;
  timespec abstime;
  set_timespec_time_nsec(abstime, suspend_time.val * 1000);
  abstime.MY_tv_sec+= innodb_lock_wait_timeout;
  thd_wait_begin(trx->mysql_thd, (type_mode & LOCK_TABLE)
                 ? THD_WAIT_TABLE_LOCK : THD_WAIT_ROW_LOCK);
  dberr_t error_state= DB_SUCCESS;

  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (trx->lock.wait_lock)
  {
    if (Deadlock::check_and_resolve(trx))
    {
      ut_ad(!trx->lock.wait_lock);
      error_state= DB_DEADLOCK;
      goto end_wait;
    }
  }
  else
    goto end_wait;

  if (row_lock_wait)
    lock_sys.wait_start();

#ifdef HAVE_REPLICATION
  if (rpl)
    lock_wait_rpl_report(trx);
#endif

  trx->error_state= DB_SUCCESS;
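  /* Sleep on trx->lock.cond until the wait ends: the lock is granted
  (wait_lock is cleared), the wait is cancelled, or the timeout expires. */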

  while (trx->lock.wait_lock)
  {
    int err;

    if (no_timeout)
    {
      my_cond_wait(&trx->lock.cond, &lock_sys.wait_mutex.m_mutex);
      err= 0;
    }
    else
      err= my_cond_timedwait(&trx->lock.cond, &lock_sys.wait_mutex.m_mutex,
                             &abstime);
    error_state= trx->error_state;
    switch (error_state) {
    case DB_DEADLOCK:
    case DB_INTERRUPTED:
      break;
    default:
      ut_ad(error_state != DB_LOCK_WAIT_TIMEOUT);
      if (trx_is_interrupted(trx))
        /* innobase_kill_query() can only set trx->error_state=DB_INTERRUPTED
        for any transaction that is attached to a connection. */
        error_state= DB_INTERRUPTED;
      else if (!err)
        continue;
#ifdef WITH_WSREP
      else if (trx->is_wsrep() && wsrep_is_BF_lock_timeout(*trx));
#endif
      else
      {
        error_state= DB_LOCK_WAIT_TIMEOUT;
        lock_sys.timeouts++;
      }
    }
    break;
  }

  if (row_lock_wait)
    lock_sys.wait_resume(trx->mysql_thd, suspend_time, my_hrtime_coarse());

end_wait:
  if (lock_t *lock= trx->lock.wait_lock)
  {
    lock_sys_t::cancel(trx, lock, false);
    lock_sys.deadlock_check();
  }

  mysql_mutex_unlock(&lock_sys.wait_mutex);
  thd_wait_end(trx->mysql_thd);

  if (had_dict_lock)
    row_mysql_freeze_data_dictionary(trx);

  trx->error_state= error_state;
  return error_state;
}

/** Resume a lock wait */
static void lock_wait_end(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->mutex_is_owner());
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  ut_ad(trx->lock.wait_thr);

  if (trx->lock.was_chosen_as_deadlock_victim.fetch_and(byte(~1)))
    trx->error_state= DB_DEADLOCK;

  trx->lock.wait_thr= nullptr;
  pthread_cond_signal(&trx->lock.cond);
}

/** Grant a waiting lock request and release the waiting transaction. */
static void lock_grant(lock_t *lock)
{
  lock_reset_lock_and_trx_wait(lock);
  trx_t *trx= lock->trx;
  trx->mutex_lock();
  if (lock->mode() == LOCK_AUTO_INC)
  {
    dict_table_t *table= lock->un_member.tab_lock.table;
    ut_ad(!table->autoinc_trx);
    table->autoinc_trx= trx;
    ib_vector_push(trx->autoinc_locks, &lock);
  }

  DBUG_PRINT("ib_lock", ("wait for trx " TRX_ID_FMT " ends", trx->id));

  /* If we are resolving a deadlock by choosing another transaction as
  a victim, then our original transaction may not be waiting anymore */

  if (trx->lock.wait_thr)
    lock_wait_end(trx);

  trx->mutex_unlock();
}

/*************************************************************//**
Cancels a waiting record lock request and releases the waiting transaction
that requested it. NOTE: does NOT check if waiting lock requests behind this
one can now be granted! */
static void lock_rec_cancel(lock_t *lock)
{
  trx_t *trx= lock->trx;
  mysql_mutex_lock(&lock_sys.wait_mutex);
  trx->mutex_lock();

  ut_d(lock_sys.hash_get(lock->type_mode).
       assert_locked(lock->un_member.rec_lock.page_id));
  /* Reset the bit (there can be only one set bit) in the lock bitmap */
  lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));

  /* Reset the wait flag and the back pointer to lock in trx */
  lock_reset_lock_and_trx_wait(lock);

  /* The following releases the trx from lock wait */
  lock_wait_end(trx);
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  trx->mutex_unlock();
}

/** Remove a record lock request, waiting or granted, from the queue and
grant locks to other transactions in the queue if they now are entitled
to a lock. NOTE: all record locks contained in in_lock are removed.
@param[in,out]	in_lock		record lock
@param[in]	owns_wait_mutex	whether lock_sys.wait_mutex is held */
static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
{
#ifdef SAFE_MUTEX
	ut_ad(owns_wait_mutex == mysql_mutex_is_owner(&lock_sys.wait_mutex));
#endif /* SAFE_MUTEX */
	ut_ad(!in_lock->is_table());

	const page_id_t page_id{in_lock->un_member.rec_lock.page_id};
	auto& lock_hash = lock_sys.hash_get(in_lock->type_mode);
	ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());

	ut_d(auto old_n_locks=)
	in_lock->index->table->n_rec_locks--;
	ut_ad(old_n_locks);

	const ulint rec_fold = page_id.fold();
	hash_cell_t &cell = *lock_hash.cell_get(rec_fold);
	lock_sys.assert_locked(cell);

	HASH_DELETE(lock_t, hash, &lock_hash, rec_fold, in_lock);
	ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());
	UT_LIST_REMOVE(in_lock->trx->lock.trx_locks, in_lock);

	MONITOR_INC(MONITOR_RECLOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_RECLOCK);

	bool acquired = false;

	/* Check if waiting locks in the queue can now be granted:
	grant locks if there are no conflicting locks ahead. Stop at
	the first X lock that is waiting or has been granted. */

	for (lock_t* lock = lock_sys_t::get_first(cell, page_id);
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (!lock->is_waiting()) {
			continue;
		}

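		/* lock_sys.wait_mutex is acquired lazily, upon the first
		waiting lock in the queue, so that the common case of a
		queue without waiters does not touch the mutex at all. */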
		if (!owns_wait_mutex) {
			mysql_mutex_lock(&lock_sys.wait_mutex);
			acquired = owns_wait_mutex = true;
		}

		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_rec_has_to_wait_in_queue(
			    cell, lock)) {
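			/* The lock must keep waiting: repoint its wait_trx
			at the first conflicting transaction, and schedule
			a deadlock re-check if that transaction is itself
			waiting and deadlock detection is enabled. */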
			trx_t* c_trx = c->trx;
			lock->trx->lock.wait_trx = c_trx;
			if (c_trx->lock.wait_trx
			    && innodb_deadlock_detect
			    && Deadlock::to_check.emplace(c_trx).second) {
				Deadlock::to_be_checked = true;
			}
		} else {
			/* Grant the lock */
			ut_ad(lock->trx != in_lock->trx);
			lock_grant(lock);
		}
	}

	if (acquired) {
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}

/** Remove a record lock request, waiting or granted, on a discarded page
@param lock_hash  hash table
@param in_lock    lock object */
void lock_rec_discard(lock_sys_t::hash_table &lock_hash, lock_t *in_lock)
{
  ut_ad(!in_lock->is_table());
  lock_hash.assert_locked(in_lock->un_member.rec_lock.page_id);

  HASH_DELETE(lock_t, hash, &lock_hash,
              in_lock->un_member.rec_lock.page_id.fold(), in_lock);
  trx_t *trx= in_lock->trx;
  trx->mutex_lock();
  ut_d(auto old_locks=)
  in_lock->index->table->n_rec_locks--;
  ut_ad(old_locks);
  UT_LIST_REMOVE(trx->lock.trx_locks, in_lock);
  trx->mutex_unlock();
  MONITOR_INC(MONITOR_RECLOCK_REMOVED);
  MONITOR_DEC(MONITOR_NUM_RECLOCK);
}

/*************************************************************//**
Removes record lock objects set on an index page which is discarded. This
function does not move locks, or check for waiting locks, therefore the
lock bitmaps must already be reset when this function is called. */
static void
lock_rec_free_all_from_discard_page(page_id_t id, const hash_cell_t &cell,
                                    lock_sys_t::hash_table &lock_hash)
{
  for (lock_t *lock= lock_sys_t::get_first(cell, id); lock; )
  {
    ut_ad(&lock_hash != &lock_sys.rec_hash ||
          lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
    ut_ad(!lock->is_waiting());
    lock_t *next_lock= lock_rec_get_next_on_page(lock);
    lock_rec_discard(lock_hash, lock);
    lock= next_lock;
  }
}

/** Discard locks for an index */
void lock_discard_for_index(const dict_index_t &index)
{
  ut_ad(!index.is_committed());
  lock_sys.wr_lock(SRW_LOCK_CALL);
  const ulint n= lock_sys.rec_hash.pad(lock_sys.rec_hash.n_cells);
  for (ulint i= 0; i < n; i++)
  {
    for (lock_t *lock= static_cast<lock_t*>(lock_sys.rec_hash.array[i].node);
         lock; )
    {
      ut_ad(!lock->is_table());
      if (lock->index == &index)
      {
        ut_ad(!lock->is_waiting());
        lock_rec_discard(lock_sys.rec_hash, lock);
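        /* Restart from the head of the hash cell, because
        lock_rec_discard() unlinked the current element. */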
        lock= static_cast<lock_t*>(lock_sys.rec_hash.array[i].node);
      }
      else
        lock= lock->hash;
    }
  }
  lock_sys.wr_unlock();
}

/*============= RECORD LOCK MOVING AND INHERITING ===================*/

/*************************************************************//**
Resets the lock bits for a single record. Releases transactions waiting for
lock requests here. */
static
void
lock_rec_reset_and_release_wait(const hash_cell_t &cell, const page_id_t id,
                                ulint heap_no)
{
  for (lock_t *lock= lock_sys.get_first(cell, id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
  {
    if (lock->is_waiting())
      lock_rec_cancel(lock);
    else
    {
      trx_t *lock_trx= lock->trx;
      lock_trx->mutex_lock();
      lock_rec_reset_nth_bit(lock, heap_no);
      lock_trx->mutex_unlock();
    }
  }
}

/*************************************************************//**
Makes a record inherit the locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of
the other record. Also waiting lock requests on rec are inherited as
GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap(
/*====================*/
	hash_cell_t&		heir_cell,	/*!< heir hash table cell */
	const page_id_t		heir,		/*!< in: page containing the
						record which inherits */
	const hash_cell_t&	donor_cell,	/*!< donor hash table cell */
	const page_id_t		donor,		/*!< in: page containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	const page_t*		heir_page,	/*!< in: heir page frame */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
	/* At READ UNCOMMITTED or READ COMMITTED isolation level,
	we do not want locks set
	by an UPDATE or a DELETE to be inherited as gap type locks. But we
	DO want S-locks/X-locks (taken for replace) set by a consistency
	constraint to be inherited also then. */

	for (lock_t* lock= lock_sys_t::get_first(donor_cell, donor, heap_no);
	     lock;
	     lock = lock_rec_get_next(heap_no, lock)) {
		trx_t* lock_trx = lock->trx;
		if (!lock->is_insert_intention()
		    && (lock_trx->isolation_level > TRX_ISO_READ_COMMITTED
			|| lock->mode() !=
			(lock_trx->duplicates ? LOCK_S : LOCK_X))) {
			lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
					      heir_cell, heir, heir_page,
					      heir_heap_no,
					      lock->index, lock_trx, false);
		}
	}
}

/*************************************************************//**
Makes a record inherit the gap locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of the
other record. Also waiting lock requests are inherited as GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap_if_gap_lock(
/*================================*/
	const buf_block_t*	block,		/*!< in: buffer block */
	ulint			heir_heap_no,	/*!< in: heap_no of
						record which inherits */
	ulint			heap_no)	/*!< in: heap_no of record
						from which inherited;
						does NOT reset the locks
						on this record */
{
  const page_id_t id{block->page.id()};
  LockGuard g{lock_sys.rec_hash, id};

  for (lock_t *lock= lock_sys_t::get_first(g.cell(), id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
     if (!lock->is_insert_intention() && (heap_no == PAGE_HEAP_NO_SUPREMUM ||
                                          !lock->is_record_not_gap()) &&
         !lock_table_has(lock->trx, lock->index->table, LOCK_X))
       lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
                             g.cell(), id, block->frame,
                             heir_heap_no, lock->index, lock->trx, false);
}

/*************************************************************//**
Moves the locks of a record to another record and resets the lock bits of
the donating record. */
static
void
lock_rec_move(
	hash_cell_t&		receiver_cell,	/*!< in: hash table cell */
	const buf_block_t&	receiver,	/*!< in: buffer block containing
						the receiving record */
	const page_id_t		receiver_id,	/*!< in: page identifier */
	const hash_cell_t&	donator_cell,	/*!< in: hash table cell */
	const page_id_t		donator_id,	/*!< in: page identifier of
						the donating record */
	ulint			receiver_heap_no,/*!< in: heap_no of the record
						which gets the locks; there
						must be no lock requests
						on it! */
	ulint			donator_heap_no)/*!< in: heap_no of the record
						which gives the locks */
{
	ut_ad(!lock_sys_t::get_first(receiver_cell,
				     receiver_id, receiver_heap_no));

	for (lock_t *lock = lock_sys_t::get_first(donator_cell, donator_id,
						  donator_heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(donator_heap_no, lock)) {
		const auto type_mode = lock->type_mode;
		if (type_mode & LOCK_WAIT) {
			ut_ad(lock->trx->lock.wait_lock == lock);
			lock->type_mode &= ~LOCK_WAIT;
		}

		trx_t* lock_trx = lock->trx;
		lock_trx->mutex_lock();
		lock_rec_reset_nth_bit(lock, donator_heap_no);

		/* Note that we FIRST reset the bit, and then set the lock:
		the function works also if donator_id == receiver_id */

		lock_rec_add_to_queue(type_mode, receiver_cell,
				      receiver_id, receiver.frame,
				      receiver_heap_no,
				      lock->index, lock_trx, true);
		lock_trx->mutex_unlock();
	}

	ut_ad(!lock_sys_t::get_first(donator_cell, donator_id,
				     donator_heap_no));
}

/** Move all the granted locks to the front of the given lock list.
All the waiting locks will be at the end of the list.
@param[in,out]	lock_list	the given lock list.  */
static
void
lock_move_granted_locks_to_front(
	UT_LIST_BASE_NODE_T(lock_t)&	lock_list)
{
	lock_t*	lock;

	bool seen_waiting_lock = false;

	for (lock = UT_LIST_GET_FIRST(lock_list); lock;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (!seen_waiting_lock) {
			if (lock->is_waiting()) {
				seen_waiting_lock = true;
			}
			continue;
		}

		ut_ad(seen_waiting_lock);

		if (!lock->is_waiting()) {
			lock_t* prev = UT_LIST_GET_PREV(trx_locks, lock);
			ut_a(prev);
			ut_list_move_to_front(lock_list, lock);
			lock = prev;
		}
	}
}

/*************************************************************//**
Updates the lock table when we have reorganized a page. NOTE: we copy
also the locks set on the infimum of the page; the infimum may carry
locks if an update of a record is occurring on the page, and its locks
were temporarily stored on the infimum. */
void
lock_move_reorganize_page(
/*======================*/
	const buf_block_t*	block,	/*!< in: old index page, now
					reorganized */
	const buf_block_t*	oblock)	/*!< in: copy of the old, not
					reorganized page */
{
  mem_heap_t *heap;

  {
    UT_LIST_BASE_NODE_T(lock_t) old_locks;
    UT_LIST_INIT(old_locks, &lock_t::trx_locks);

    const page_id_t id{block->page.id()};
    const auto id_fold= id.fold();
    {
      LockGuard g{lock_sys.rec_hash, id};
      if (!lock_sys_t::get_first(g.cell(), id))
        return;
    }

    /* We will modify arbitrary trx->lock.trx_locks. */
    LockMutexGuard g{SRW_LOCK_CALL};
    hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id_fold);

    /* Note: Predicate locks for SPATIAL INDEX are not affected by
    page reorganize, because they do not refer to individual record
    heap numbers. */
    lock_t *lock= lock_sys_t::get_first(cell, id);

    if (!lock)
      return;

    heap= mem_heap_create(256);

    /* Copy first all the locks on the page to heap and reset the
    bitmaps in the original locks; chain the copies of the locks
    using the trx_locks field in them. */

    do
    {
      /* Make a copy of the lock */
      lock_t *old_lock= lock_rec_copy(lock, heap);

      UT_LIST_ADD_LAST(old_locks, old_lock);

      /* Reset bitmap of lock */
      lock_rec_bitmap_reset(lock);

      if (lock->is_waiting())
      {
        ut_ad(lock->trx->lock.wait_lock == lock);
        lock->type_mode&= ~LOCK_WAIT;
      }

      lock= lock_rec_get_next_on_page(lock);
    }
    while (lock);

    const ulint comp= page_is_comp(block->frame);
    ut_ad(comp == page_is_comp(oblock->frame));

    lock_move_granted_locks_to_front(old_locks);

    DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize",
                    ut_list_reverse(old_locks););

    for (lock= UT_LIST_GET_FIRST(old_locks); lock;
         lock= UT_LIST_GET_NEXT(trx_locks, lock))
    {
      /* NOTE: we copy also the locks set on the infimum and
      supremum of the page; the infimum may carry locks if an
      update of a record is occurring on the page, and its locks
      were temporarily stored on the infimum */
      const rec_t *rec1= page_get_infimum_rec(block->frame);
      const rec_t *rec2= page_get_infimum_rec(oblock->frame);
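      /* Walk the reorganized page and its old copy in lockstep; the
      records occur in the same order, so each old heap number is paired
      with the corresponding new heap number. */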

      /* Set locks according to old locks */
      for (;;)
      {
        ulint old_heap_no;
        ulint new_heap_no;
        ut_d(const rec_t* const orec= rec1);
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));

        if (comp)
        {
          old_heap_no= rec_get_heap_no_new(rec2);
          new_heap_no= rec_get_heap_no_new(rec1);

          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          old_heap_no= rec_get_heap_no_old(rec2);
          new_heap_no= rec_get_heap_no_old(rec1);
          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        /* Clear the bit in old_lock. */
        if (old_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, old_heap_no))
        {
          ut_ad(!page_rec_is_metadata(orec));

          /* NOTE that the old lock bitmap could be too
          small for the new heap number! */
          lock_rec_add_to_queue(lock->type_mode, cell, id, block->frame,
                                new_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();

        if (new_heap_no == PAGE_HEAP_NO_SUPREMUM)
        {
           ut_ad(old_heap_no == PAGE_HEAP_NO_SUPREMUM);
           break;
        }
      }

      ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
    }
  }

  mem_heap_free(heap);

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  if (fil_space_t *space= fil_space_t::get(id.space()))
  {
    ut_ad(lock_rec_validate_page(block, space->is_latched()));
    space->release();
  }
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list end is moved to another page. */
void
lock_move_rec_list_end(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec)		/*!< in: record on page: this
						is the first record moved */
{
  const ulint comp= page_rec_is_comp(rec);

  ut_ad(block->frame == page_align(rec));
  ut_ad(comp == page_is_comp(new_block->frame));

  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};
  {
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    /* Note: when we move locks from record to record, waiting locks
    and possible granted gap type locks behind them are enqueued in
    the original order, because new elements are inserted to a hash
    table to the end of the hash chain, and lock_rec_add_to_queue
    does not reuse locks if there are waiters in the queue. */
    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1= rec;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      if (comp)
      {
        if (page_offset(rec1) == PAGE_NEW_INFIMUM)
          rec1= page_rec_get_next_low(rec1, TRUE);
        rec2= page_rec_get_next_low(new_block->frame + PAGE_NEW_INFIMUM, TRUE);
      }
      else
      {
        if (page_offset(rec1) == PAGE_OLD_INFIMUM)
          rec1= page_rec_get_next_low(rec1, FALSE);
        rec2= page_rec_get_next_low(new_block->frame + PAGE_OLD_INFIMUM,FALSE);
      }

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */
      for (;;)
      {
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));
        ut_d(const rec_t* const orec= rec1);

        ulint rec1_heap_no;
        ulint rec2_heap_no;

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM)
            break;

          rec2_heap_no= rec_get_heap_no_new(rec2);
          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);

          if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM)
            break;
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(rec_get_data_size_old(rec1) == rec_get_data_size_old(rec2));
          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec1)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          ut_ad(!page_rec_is_metadata(orec));

          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id, new_block->frame,
                                rec2_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();
      }
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  if (fil_space_t *space= fil_space_t::get(id.space()))
  {
    const bool is_latched{space->is_latched()};
    ut_ad(lock_rec_validate_page(block, is_latched));
    ut_ad(lock_rec_validate_page(new_block, is_latched));
    space->release();
  }
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list start is moved to another page. */
void
lock_move_rec_list_start(
/*=====================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec,		/*!< in: record on page:
						this is the first
						record NOT copied */
	const rec_t*		old_end)	/*!< in: old
						previous-to-last
						record on new_page
						before the records
						were copied */
{
  const ulint comp= page_rec_is_comp(rec);

  ut_ad(block->frame == page_align(rec));
  ut_ad(comp == page_is_comp(new_block->frame));
  ut_ad(new_block->frame == page_align(old_end));
  ut_ad(!page_rec_is_metadata(rec));
  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};

  {
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      if (comp)
      {
        rec1= page_rec_get_next_low(block->frame + PAGE_NEW_INFIMUM, TRUE);
        rec2= page_rec_get_next_low(old_end, TRUE);
      }
      else
      {
        rec1= page_rec_get_next_low(block->frame + PAGE_OLD_INFIMUM, FALSE);
        rec2= page_rec_get_next_low(old_end, FALSE);
      }

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */

      while (rec1 != rec)
      {
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));
        ut_d(const rec_t* const prev= rec1);

        ulint rec1_heap_no;
        ulint rec2_heap_no;

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          rec2_heap_no= rec_get_heap_no_new(rec2);

          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          ut_ad(!page_rec_is_metadata(prev));

          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id, new_block->frame,
                                rec2_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();
      }

#ifdef UNIV_DEBUG
      if (page_rec_is_supremum(rec))
        for (auto i= lock_rec_get_n_bits(lock); --i > PAGE_HEAP_NO_USER_LOW; )
          ut_ad(!lock_rec_get_nth_bit(lock, i));
#endif /* UNIV_DEBUG */
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  ut_ad(lock_rec_validate_page(block));
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page when records
are moved in an R-tree (spatial) index. */
void
lock_rtr_move_rec_list(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	rtr_rec_move_t*		rec_move,       /*!< in: recording records
						moved */
	ulint			num_move)       /*!< in: num of rec to move */
{
  if (!num_move)
    return;

  const ulint comp= page_rec_is_comp(rec_move[0].old_rec);

  ut_ad(block->frame == page_align(rec_move[0].old_rec));
  ut_ad(new_block->frame == page_align(rec_move[0].new_rec));
  ut_ad(comp == page_rec_is_comp(rec_move[0].new_rec));
  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};

  {
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */

      for (ulint moved= 0; moved < num_move; moved++)
      {
        ulint rec1_heap_no;
        ulint rec2_heap_no;

        rec1= rec_move[moved].old_rec;
        rec2= rec_move[moved].new_rec;
        ut_ad(!page_rec_is_metadata(rec1));
        ut_ad(!page_rec_is_metadata(rec2));

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          rec2_heap_no= rec_get_heap_no_new(rec2);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id, new_block->frame,
                                rec2_heap_no, lock->index, lock_trx, true);

          rec_move[moved].moved= true;
        }

        lock_trx->mutex_unlock();
      }
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  ut_ad(lock_rec_validate_page(block));
#endif
}
/*************************************************************//**
Updates the lock table when a page is split to the right. */
void
lock_update_split_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
  const ulint h= lock_get_min_heap_no(right_block);
  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};

  LockMultiGuard g{lock_sys.rec_hash, l, r};

  /* Move the locks on the supremum of the left page to the supremum
  of the right page */

  lock_rec_move(g.cell2(), *right_block, r, g.cell1(), l,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);

  /* Inherit the locks to the supremum of left page from the successor
  of the infimum on right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->frame,
                          PAGE_HEAP_NO_SUPREMUM, h);
}

#ifdef UNIV_DEBUG
static void lock_assert_no_spatial(const page_id_t id)
{
  const auto id_fold= id.fold();
  auto cell= lock_sys.prdt_page_hash.cell_get(id_fold);
  auto latch= lock_sys_t::hash_table::latch(cell);
  latch->acquire();
  /* there should exist no page lock on the left page,
  otherwise, it will be blocked from merge */
  ut_ad(!lock_sys_t::get_first(*cell, id));
  latch->release();
  cell= lock_sys.prdt_hash.cell_get(id_fold);
  latch= lock_sys_t::hash_table::latch(cell);
  latch->acquire();
  ut_ad(!lock_sys_t::get_first(*cell, id));
  latch->release();
}
#endif

/*************************************************************//**
Updates the lock table when a page is merged to the right. */
void
lock_update_merge_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page to
						which merged */
	const rec_t*		orig_succ,	/*!< in: original
						successor of infimum
						on the right page
						before merge */
	const buf_block_t*	left_block)	/*!< in: merged index
						page which will be
						discarded */
{
  ut_ad(!page_rec_is_metadata(orig_succ));

  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};
  LockMultiGuard g{lock_sys.rec_hash, l, r};

  /* Inherit the locks from the supremum of the left page to the
  original successor of infimum on the right page, to which the left
  page was merged */
  lock_rec_inherit_to_gap(g.cell2(), r, g.cell1(), l, right_block->frame,
                          page_rec_get_heap_no(orig_succ),
                          PAGE_HEAP_NO_SUPREMUM);

  /* Reset the locks on the supremum of the left page, releasing
  waiting transactions */
  lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(l, g.cell1(), lock_sys.rec_hash);

  ut_d(lock_assert_no_spatial(l));
}

/** Update locks when the root page is copied to another in
btr_root_raise_and_insert(). Note that we leave lock structs on the
root page, even though they do not make sense on other than leaf
pages: the reason is that in a pessimistic update the infimum record
of the root page will act as a dummy carrier of the locks of the record
to be updated. */
void lock_update_root_raise(const buf_block_t &block, const page_id_t root)
{
  const page_id_t id{block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, id, root};
  /* Move the locks on the supremum of the root to the supremum of block */
  lock_rec_move(g.cell1(), block, id, g.cell2(), root,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
}

/** Update the lock table when a page is copied to another.
@param new_block  the target page
@param old        old page (not index root page) */
void lock_update_copy_and_discard(const buf_block_t &new_block, page_id_t old)
{
  const page_id_t id{new_block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, id, old};
  /* Move the locks on the supremum of the old page to the supremum of new */
  lock_rec_move(g.cell1(), new_block, id, g.cell2(), old,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(old, g.cell2(), lock_sys.rec_hash);
}

/*************************************************************//**
Updates the lock table when a page is split to the left. */
void
lock_update_split_left(
/*===================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
  ulint h= lock_get_min_heap_no(right_block);
  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};
  LockMultiGuard g{lock_sys.rec_hash, l, r};
  /* Inherit the locks to the supremum of the left page from the
  successor of the infimum on the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->frame,
                          PAGE_HEAP_NO_SUPREMUM, h);
}

/** Update the lock table when a page is merged to the left.
@param left      left page
@param orig_pred original predecessor of supremum on the left page before merge
@param right     merged, to-be-discarded right page */
void lock_update_merge_left(const buf_block_t& left, const rec_t *orig_pred,
                            const page_id_t right)
{
  ut_ad(left.frame == page_align(orig_pred));

  const page_id_t l{left.page.id()};

  LockMultiGuard g{lock_sys.rec_hash, l, right};
  const rec_t *left_next_rec= page_rec_get_next_const(orig_pred);

  if (!page_rec_is_supremum(left_next_rec))
  {
    /* Inherit the locks on the supremum of the left page to the
    first record which was moved from the right page */
    lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left.frame,
                            page_rec_get_heap_no(left_next_rec),
                            PAGE_HEAP_NO_SUPREMUM);

    /* Reset the locks on the supremum of the left page,
    releasing waiting transactions */
    lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
  }

  /* Move the locks from the supremum of right page to the supremum
  of the left page */
  lock_rec_move(g.cell1(), left, l, g.cell2(), right,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(right, g.cell2(), lock_sys.rec_hash);

  /* there should exist no page lock on the right page,
  otherwise, it will be blocked from merge */
  ut_d(lock_assert_no_spatial(right));
}

/*************************************************************//**
Resets the original locks on heir and replaces them with gap type locks
inherited from rec. */
void
lock_rec_reset_and_inherit_gap_locks(
/*=================================*/
	const buf_block_t&	heir_block,	/*!< in: block containing the
						record which inherits */
	const page_id_t		donor,		/*!< in: page containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
  const page_id_t heir{heir_block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, heir, donor};
  lock_rec_reset_and_release_wait(g.cell1(), heir, heir_heap_no);
  lock_rec_inherit_to_gap(g.cell1(), heir, g.cell2(), donor, heir_block.frame,
                          heir_heap_no, heap_no);
}

/*************************************************************//**
Updates the lock table when a page is discarded. */
void
lock_update_discard(
/*================*/
	const buf_block_t*	heir_block,	/*!< in: index page
						which will inherit the locks */
	ulint			heir_heap_no,	/*!< in: heap_no of the record
						which will inherit the locks */
	const buf_block_t*	block)		/*!< in: index page
						which will be discarded */
{
	const page_t*	page = block->frame;
	const rec_t*	rec;
	ulint		heap_no;
	const page_id_t	heir(heir_block->page.id());
	const page_id_t	page_id(block->page.id());
	LockMultiGuard	g{lock_sys.rec_hash, heir, page_id};

	if (lock_sys_t::get_first(g.cell2(), page_id)) {
		ut_d(lock_assert_no_spatial(page_id));
		/* Inherit all the locks on the page to the record and
		reset all the locks on the page */

		if (page_is_comp(page)) {
			rec = page + PAGE_NEW_INFIMUM;

			do {
				heap_no = rec_get_heap_no_new(rec);

				lock_rec_inherit_to_gap(g.cell1(), heir,
							g.cell2(), page_id,
							heir_block->frame,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					g.cell2(), page_id, heap_no);

				rec = page + rec_get_next_offs(rec, TRUE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		} else {
			rec = page + PAGE_OLD_INFIMUM;

			do {
				heap_no = rec_get_heap_no_old(rec);

				lock_rec_inherit_to_gap(g.cell1(), heir,
							g.cell2(), page_id,
							heir_block->frame,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					g.cell2(), page_id, heap_no);

				rec = page + rec_get_next_offs(rec, FALSE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		}

		lock_rec_free_all_from_discard_page(page_id, g.cell2(),
						    lock_sys.rec_hash);
	} else {
		const auto fold = page_id.fold();
		auto cell = lock_sys.prdt_hash.cell_get(fold);
		auto latch = lock_sys_t::hash_table::latch(cell);
		latch->acquire();
		lock_rec_free_all_from_discard_page(page_id, *cell,
						    lock_sys.prdt_hash);
		latch->release();
		cell = lock_sys.prdt_page_hash.cell_get(fold);
		latch = lock_sys_t::hash_table::latch(cell);
		latch->acquire();
		lock_rec_free_all_from_discard_page(page_id, *cell,
						    lock_sys.prdt_page_hash);
		latch->release();
	}
}

/*************************************************************//**
Updates the lock table when a new user record is inserted. */
void
lock_update_insert(
/*===============*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: the inserted record */
{
	ulint	receiver_heap_no;
	ulint	donator_heap_no;

	ut_ad(block->frame == page_align(rec));
	ut_ad(!page_rec_is_metadata(rec));

	/* Inherit the gap-locking locks for rec, in gap mode, from the next
	record */

	if (page_rec_is_comp(rec)) {
		receiver_heap_no = rec_get_heap_no_new(rec);
		donator_heap_no = rec_get_heap_no_new(
			page_rec_get_next_low(rec, TRUE));
	} else {
		receiver_heap_no = rec_get_heap_no_old(rec);
		donator_heap_no = rec_get_heap_no_old(
			page_rec_get_next_low(rec, FALSE));
	}

	lock_rec_inherit_to_gap_if_gap_lock(
		block, receiver_heap_no, donator_heap_no);
}

/*************************************************************//**
Updates the lock table when a record is removed. */
void
lock_update_delete(
/*===============*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: the record to be removed */
{
	const page_t*	page = block->frame;
	ulint		heap_no;
	ulint		next_heap_no;

	ut_ad(page == page_align(rec));
	ut_ad(!page_rec_is_metadata(rec));

	if (page_is_comp(page)) {
		heap_no = rec_get_heap_no_new(rec);
		next_heap_no = rec_get_heap_no_new(page
						   + rec_get_next_offs(rec,
								       TRUE));
	} else {
		heap_no = rec_get_heap_no_old(rec);
		next_heap_no = rec_get_heap_no_old(page
						   + rec_get_next_offs(rec,
								       FALSE));
	}

	const page_id_t id{block->page.id()};
	LockGuard g{lock_sys.rec_hash, id};

	/* Let the next record inherit the locks from rec, in gap mode */

	lock_rec_inherit_to_gap(g.cell(), id, g.cell(), id, block->frame,
				next_heap_no, heap_no);

	/* Reset the lock bits on rec and release waiting transactions */
	lock_rec_reset_and_release_wait(g.cell(), id, heap_no);
}

/*********************************************************************//**
Stores on the page infimum record the explicit locks of another record.
This function is used to store the lock state of a record when it is
updated and the size of the record changes in the update. The record
is moved in such an update, perhaps to another page. The infimum record
acts as a dummy carrier record, taking care of lock releases while the
actual record is being moved. */
void
lock_rec_store_on_page_infimum(
/*===========================*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: record whose lock state
					is stored on the infimum
					record of the same page; lock
					bits are reset on the
					record */
{
  const ulint heap_no= page_rec_get_heap_no(rec);

  ut_ad(block->frame == page_align(rec));
  const page_id_t id{block->page.id()};

  LockGuard g{lock_sys.rec_hash, id};
  lock_rec_move(g.cell(), *block, id, g.cell(), id,
                PAGE_HEAP_NO_INFIMUM, heap_no);
}

/** Restore the explicit lock requests on a single record, where the
state was stored on the infimum of a page.
@param block   buffer block containing rec
@param rec     record whose lock state is restored
@param donator page (rec is not necessarily on this page)
whose infimum stored the lock state; lock bits are reset on the infimum */
void lock_rec_restore_from_page_infimum(const buf_block_t &block,
					const rec_t *rec, page_id_t donator)
{
  const ulint heap_no= page_rec_get_heap_no(rec);
  const page_id_t id{block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, id, donator};
  lock_rec_move(g.cell1(), block, id, g.cell2(), donator, heap_no,
                PAGE_HEAP_NO_INFIMUM);
}

/*========================= TABLE LOCKS ==============================*/

/*********************************************************************//**
Creates a table lock object and adds it as the last in the lock queue
of the table. Does NOT check for deadlocks or lock compatibility.
@return own: new lock object */
UNIV_INLINE
lock_t*
lock_table_create(
/*==============*/
	dict_table_t*	table,	/*!< in/out: database table
				in dictionary cache */
	unsigned	type_mode,/*!< in: lock mode possibly ORed with
				LOCK_WAIT */
	trx_t*		trx,	/*!< in: trx */
	lock_t*		c_lock)	/*!< in: conflicting lock */
{
	lock_t*		lock;

	lock_sys.assert_locked(*table);
	ut_ad(trx->mutex_is_owner());
	ut_ad(!trx->is_wsrep() || lock_sys.is_writer());
	ut_ad(trx->state == TRX_STATE_ACTIVE || trx->is_recovered);
	ut_ad(!trx->is_autocommit_non_locking());

	switch (LOCK_MODE_MASK & type_mode) {
	case LOCK_AUTO_INC:
		++table->n_waiting_or_granted_auto_inc_locks;
		/* For AUTOINC locking we reuse the lock instance only if
		there is no wait involved else we allocate the waiting lock
		from the transaction lock heap. */
		if (type_mode == LOCK_AUTO_INC) {
			lock = table->autoinc_lock;

			ut_ad(!table->autoinc_trx);
			table->autoinc_trx = trx;

			ib_vector_push(trx->autoinc_locks, &lock);
			goto allocated;
		}

		break;
	case LOCK_X:
	case LOCK_S:
		++table->n_lock_x_or_s;
		break;
	}

3124 3125 3126 3127 3128 3129
	lock = trx->lock.table_cached < array_elements(trx->lock.table_pool)
		? &trx->lock.table_pool[trx->lock.table_cached++]
		: static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap, sizeof *lock));

allocated:
3130 3131
	lock->type_mode = ib_uint32_t(type_mode | LOCK_TABLE);
	lock->trx = trx;
3132

3133
	lock->un_member.tab_lock.table = table;
3134

3135
	ut_ad(table->get_ref_count() > 0 || !table->can_be_evicted);
3136

3137
	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
3138

3139
	ut_list_append(table->locks, lock, TableLockGetNode());
3140

3141
	if (type_mode & LOCK_WAIT) {
3142 3143 3144 3145 3146 3147 3148 3149 3150
		if (trx->lock.wait_trx) {
			ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx);
			ut_ad(trx->lock.wait_lock);
			ut_ad((*trx->lock.wait_lock).trx == trx);
		} else {
			ut_ad(c_lock);
			trx->lock.wait_trx = c_lock->trx;
			ut_ad(!trx->lock.wait_lock);
		}
3151
		trx->lock.wait_lock = lock;
3152 3153
	}

3154 3155 3156 3157
	lock->trx->lock.table_locks.push_back(lock);

	MONITOR_INC(MONITOR_TABLELOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_TABLELOCK);
3158 3159 3160 3161

	return(lock);
}

/*************************************************************//**
Pops autoinc lock requests from the transaction's autoinc_locks. We
handle the case where there are gaps in the array and they need to
be popped off the stack. */
UNIV_INLINE
void
lock_table_pop_autoinc_locks(
/*=========================*/
	trx_t*	trx)	/*!< in/out: transaction that owns the AUTOINC locks */
{
	ut_ad(!ib_vector_is_empty(trx->autoinc_locks));

	/* Skip any gaps, i.e. NULL lock entries in the
	trx->autoinc_locks vector. */

	do {
		ib_vector_pop(trx->autoinc_locks);

		if (ib_vector_is_empty(trx->autoinc_locks)) {
			return;
		}

	} while (*(lock_t**) ib_vector_get_last(trx->autoinc_locks) == NULL);
}
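
/* Illustrative example (not in the source tree): suppose
trx->autoinc_locks holds [L1, NULL, L3], where the NULL gap was left by
lock_table_remove_autoinc_lock() below. Releasing L3 pops it, and the
loop above then also pops the trailing NULL, leaving [L1]. */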

/*************************************************************//**
Removes an autoinc lock request from the transaction's autoinc_locks. */
UNIV_INLINE
void
lock_table_remove_autoinc_lock(
/*===========================*/
	lock_t*	lock,	/*!< in: table lock */
	trx_t*	trx)	/*!< in/out: transaction that owns the lock */
{
	ut_ad(lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE));
	lock_sys.assert_locked(*lock->un_member.tab_lock.table);
	ut_ad(trx->mutex_is_owner());

	auto s = ib_vector_size(trx->autoinc_locks);
	ut_ad(s);

	/* With stored functions and procedures the user may drop
	a table within the same "statement". This special case has
	to be handled by deleting only those AUTOINC locks that were
	held by the table being dropped. */

	lock_t*	autoinc_lock = *static_cast<lock_t**>(
		ib_vector_get(trx->autoinc_locks, --s));

	/* This is the default fast case. */

	if (autoinc_lock == lock) {
		lock_table_pop_autoinc_locks(trx);
	} else {
		/* The last element should never be NULL */
		ut_a(autoinc_lock != NULL);

		/* Handle freeing the locks from within the stack. */

		while (s) {
			autoinc_lock = *static_cast<lock_t**>(
				ib_vector_get(trx->autoinc_locks, --s));

			if (autoinc_lock == lock) {
				void*	null_var = NULL;
				ib_vector_set(trx->autoinc_locks, s, &null_var);
				return;
			}
		}

		/* Must find the autoinc lock. */
		ut_error;
	}
}

/*************************************************************//**
Removes a table lock request from the queue and the trx list of locks;
this is a low-level function which does NOT check if waiting requests
can now be granted.
@return the table of the removed lock */
UNIV_INLINE
const dict_table_t*
lock_table_remove_low(
/*==================*/
	lock_t*	lock)	/*!< in/out: table lock */
{
	trx_t*		trx;
	dict_table_t*	table;

	ut_ad(lock->is_table());
	trx = lock->trx;
	table = lock->un_member.tab_lock.table;
	lock_sys.assert_locked(*table);
	ut_ad(trx->mutex_is_owner());

	/* Remove the table from the transaction's AUTOINC vector, if
	the lock that is being released is an AUTOINC lock. */
	switch (lock->mode()) {
	case LOCK_AUTO_INC:
		ut_ad((table->autoinc_trx == trx) == !lock->is_waiting());

		if (table->autoinc_trx == trx) {
			table->autoinc_trx = NULL;
			/* The locks must be freed in the reverse order from
			the one in which they were acquired. This is to avoid
			traversing the AUTOINC lock vector unnecessarily.

			We only store locks that were granted in the
			trx->autoinc_locks vector (see lock_table_create()
			and lock_grant()). */
			lock_table_remove_autoinc_lock(lock, trx);
		}

		ut_ad(table->n_waiting_or_granted_auto_inc_locks);
		--table->n_waiting_or_granted_auto_inc_locks;
		break;
	case LOCK_X:
	case LOCK_S:
		ut_ad(table->n_lock_x_or_s);
		--table->n_lock_x_or_s;
		break;
	default:
		break;
	}

	UT_LIST_REMOVE(trx->lock.trx_locks, lock);
	ut_list_remove(table->locks, lock, TableLockGetNode());

	MONITOR_INC(MONITOR_TABLELOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_TABLELOCK);
	return table;
}

/*********************************************************************//**
Enqueues a waiting request for a table lock which cannot be granted
immediately. Checks for deadlocks.
@retval	DB_LOCK_WAIT	if the waiting lock was enqueued
@retval	DB_DEADLOCK	if this transaction was chosen as the victim */
static
dberr_t
lock_table_enqueue_waiting(
/*=======================*/
	unsigned	mode,	/*!< in: lock mode this transaction is
				requesting */
	dict_table_t*	table,	/*!< in/out: table */
	que_thr_t*	thr,	/*!< in: query thread */
	lock_t*		c_lock)	/*!< in: conflicting lock or NULL */
{
	lock_sys.assert_locked(*table);
	ut_ad(!srv_read_only_mode);

	trx_t* trx = thr_get_trx(thr);
	ut_ad(trx->mutex_is_owner());

	if (UNIV_UNLIKELY(trx->dict_operation)) {
		ib::error() << "A table lock wait happens in a dictionary"
			" operation. Table " << table->name
			<< ". " << BUG_REPORT_MSG;
		ut_ad(0);
	}

#ifdef WITH_WSREP
	if (trx->is_wsrep() && trx->lock.was_chosen_as_deadlock_victim) {
		return(DB_DEADLOCK);
	}
#endif /* WITH_WSREP */

	/* Enqueue the lock request that will wait to be granted */
	lock_table_create(table, mode | LOCK_WAIT, trx, c_lock);

	trx->lock.wait_thr = thr;
	trx->lock.was_chosen_as_deadlock_victim
		IF_WSREP(.fetch_and(byte(~1)), = false);

	MONITOR_INC(MONITOR_TABLELOCK_WAIT);
	return(DB_LOCK_WAIT);
}

/*********************************************************************//**
Checks if other transactions have an incompatible mode lock request in
the lock queue.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_table_other_has_incompatible(
/*==============================*/
	const trx_t*		trx,	/*!< in: transaction, or NULL if all
					transactions should be included */
	ulint			wait,	/*!< in: LOCK_WAIT if also
					waiting locks are taken into
					account, or 0 if not */
	const dict_table_t*	table,	/*!< in: table */
	lock_mode		mode)	/*!< in: lock mode */
{
	lock_sys.assert_locked(*table);

	static_assert(LOCK_IS == 0, "compatibility");
	static_assert(LOCK_IX == 1, "compatibility");

	if (UNIV_LIKELY(mode <= LOCK_IX && !table->n_lock_x_or_s)) {
		return(NULL);
	}

	for (lock_t* lock = UT_LIST_GET_LAST(table->locks);
	     lock;
	     lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock)) {

		trx_t* lock_trx = lock->trx;

		if (lock_trx != trx
		    && !lock_mode_compatible(lock->mode(), mode)
		    && (wait || !lock->is_waiting())) {
			return(lock);
		}
	}

	return(NULL);
}
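
/* For reference, the table lock compatibility matrix that
lock_mode_compatible() implements ('+' = compatible, '-' = conflict):

	   IS  IX  S   X   AI
	IS +   +   +   -   +
	IX +   +   -   -   +
	S  +   -   +   -   -
	X  -   -   -   -   -
	AI +   +   -   -   -
*/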

/*********************************************************************//**
Locks the specified database table in the mode given. If the lock cannot
be granted immediately, the query thread is put to wait.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_table(
/*=======*/
	dict_table_t*	table,	/*!< in/out: database table
				in dictionary cache */
	lock_mode	mode,	/*!< in: lock mode */
	que_thr_t*	thr)	/*!< in: query thread */
{
	trx_t*		trx;
	dberr_t		err;
	lock_t*		wait_for;

	if (table->is_temporary()) {
		return DB_SUCCESS;
	}

	trx = thr_get_trx(thr);

	/* Look for equal or stronger locks the same trx already
	has on the table. No need to acquire LockMutexGuard here
	because only this transaction can add/access table locks
	to/from trx_t::table_locks. */

	if (lock_table_has(trx, table, mode) || srv_read_only_mode) {
		return(DB_SUCCESS);
	}

	/* Read-only transactions can write to temporary tables; we don't
	want to promote them to RW transactions. Their updates cannot be
	visible to other transactions, so we can keep them out of the
	read views. */

	if ((mode == LOCK_IX || mode == LOCK_X)
	    && !trx->read_only
	    && trx->rsegs.m_redo.rseg == 0) {

		trx_set_rw_mode(trx);
	}

	err = DB_SUCCESS;

#ifdef WITH_WSREP
	if (trx->is_wsrep()) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	} else {
		lock_sys.rd_lock(SRW_LOCK_CALL);
		table->lock_mutex_lock();
	}
#else
	lock_sys.rd_lock(SRW_LOCK_CALL);
	table->lock_mutex_lock();
#endif

	/* We have to check if the new lock is compatible with any locks
	other transactions have in the table lock queue. */

	wait_for = lock_table_other_has_incompatible(
		trx, LOCK_WAIT, table, mode);

	trx->mutex_lock();

	if (wait_for) {
		err = lock_table_enqueue_waiting(mode, table, thr, wait_for);
	} else {
		lock_table_create(table, mode, trx, wait_for);
	}

#ifdef WITH_WSREP
	if (trx->is_wsrep()) {
		lock_sys.wr_unlock();
		trx->mutex_unlock();
		return err;
	}
#endif
	table->lock_mutex_unlock();
	lock_sys.rd_unlock();
	trx->mutex_unlock();

	return(err);
}

/** Create a table lock object for a resurrected transaction.
@param table    table to be X-locked
@param trx      transaction
@param mode     LOCK_X or LOCK_IX */
void lock_table_resurrect(dict_table_t *table, trx_t *trx, lock_mode mode)
{
  ut_ad(trx->is_recovered);
  ut_ad(mode == LOCK_X || mode == LOCK_IX);

  if (lock_table_has(trx, table, mode))
    return;

  {
    LockMutexGuard g{SRW_LOCK_CALL};
    ut_ad(!lock_table_other_has_incompatible(trx, LOCK_WAIT, table, mode));

    trx->mutex_lock();
    lock_table_create(table, mode, trx, nullptr);
  }
  trx->mutex_unlock();
}

/** Find a lock that a waiting table lock request still has to wait for. */
static const lock_t *lock_table_has_to_wait_in_queue(const lock_t *wait_lock)
{
  ut_ad(wait_lock->is_waiting());
  ut_ad(wait_lock->is_table());

  dict_table_t *table= wait_lock->un_member.tab_lock.table;
  lock_sys.assert_locked(*table);

  static_assert(LOCK_IS == 0, "compatibility");
  static_assert(LOCK_IX == 1, "compatibility");

  if (UNIV_LIKELY(wait_lock->mode() <= LOCK_IX && !table->n_lock_x_or_s))
    return nullptr;

  for (const lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock != wait_lock;
       lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
    if (lock_has_to_wait(wait_lock, lock))
      return lock;

  return nullptr;
}

/*************************************************************//**
Removes a table lock request, waiting or granted, from the queue and grants
locks to other transactions in the queue, if they now are entitled to a
lock.
@param[in,out]	in_lock		table lock
@param[in]	owns_wait_mutex	whether lock_sys.wait_mutex is held */
static void lock_table_dequeue(lock_t *in_lock, bool owns_wait_mutex)
{
#ifdef SAFE_MUTEX
	ut_ad(owns_wait_mutex == mysql_mutex_is_owner(&lock_sys.wait_mutex));
#endif
	ut_ad(in_lock->trx->mutex_is_owner());
	lock_t*	lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, in_lock);

	const dict_table_t* table = lock_table_remove_low(in_lock);

	static_assert(LOCK_IS == 0, "compatibility");
	static_assert(LOCK_IX == 1, "compatibility");

	if (UNIV_LIKELY(in_lock->mode() <= LOCK_IX && !table->n_lock_x_or_s)) {
		return;
	}

	bool acquired = false;

	/* Check if waiting locks in the queue can now be granted: grant
	locks if there are no conflicting locks ahead. */

	for (/* No op */;
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
		if (!lock->is_waiting()) {
			continue;
		}

		if (!owns_wait_mutex) {
			mysql_mutex_lock(&lock_sys.wait_mutex);
			acquired = owns_wait_mutex = true;
		}

		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_table_has_to_wait_in_queue(lock)) {
			trx_t* c_trx = c->trx;
			lock->trx->lock.wait_trx = c_trx;
			if (c_trx->lock.wait_trx
			    && innodb_deadlock_detect
			    && Deadlock::to_check.emplace(c_trx).second) {
				Deadlock::to_be_checked = true;
			}
		} else {
			/* Grant the lock */
			ut_ad(in_lock->trx != lock->trx);
			in_lock->trx->mutex_unlock();
			lock_grant(lock);
			in_lock->trx->mutex_lock();
		}
	}

	if (acquired) {
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}

/** Sets a lock on a table based on the given mode.
@param[in]	table	table to lock
@param[in,out]	trx	transaction
@param[in]	mode	LOCK_X or LOCK_S
@return error code or DB_SUCCESS. */
dberr_t
lock_table_for_trx(
	dict_table_t*	table,
	trx_t*		trx,
	enum lock_mode	mode)
{
	mem_heap_t*	heap;
	que_thr_t*	thr;
	dberr_t		err;
	sel_node_t*	node;
	heap = mem_heap_create(512);

	node = sel_node_create(heap);
	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
	thr->graph->state = QUE_FORK_ACTIVE;

	/* We use the select query graph as the dummy graph needed
	in the lock module call */

	thr = static_cast<que_thr_t*>(
		que_fork_get_first_thr(
			static_cast<que_fork_t*>(que_node_get_parent(thr))));

run_again:
	thr->run_node = thr;
	thr->prev_node = thr->common.parent;

	err = lock_table(table, mode, thr);

	trx->error_state = err;

	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
		if (row_mysql_handle_errors(&err, trx, thr, NULL)) {
			goto run_again;
		}
	}

	que_graph_free(thr->graph);
	trx->op_info = "";

	return(err);
}
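
/* Usage sketch (hypothetical, not from the source tree): DDL code paths
lock a whole table for the duration of a transaction roughly like this:

	dberr_t err = lock_table_for_trx(table, trx, LOCK_X);
	if (err != DB_SUCCESS) {
		// the transaction was chosen as a deadlock victim or the
		// lock wait timed out; the caller rolls back and reports
	}
*/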

/*=========================== LOCK RELEASE ==============================*/

/*************************************************************//**
Removes a granted record lock of a transaction from the queue and grants
locks to other transactions waiting in the queue if they now are entitled
to a lock. */
void
lock_rec_unlock(
/*============*/
	trx_t*			trx,	/*!< in/out: transaction that has
					set a record lock */
	const page_id_t		id,	/*!< in: page containing rec */
	const rec_t*		rec,	/*!< in: record */
	lock_mode		lock_mode)/*!< in: LOCK_S or LOCK_X */
{
	lock_t*		first_lock;
	lock_t*		lock;
	ulint		heap_no;

	ut_ad(trx);
	ut_ad(rec);
	ut_ad(!trx->lock.wait_lock);
	ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
	ut_ad(!page_rec_is_metadata(rec));

	heap_no = page_rec_get_heap_no(rec);

	LockGuard g{lock_sys.rec_hash, id};

	first_lock = lock_sys_t::get_first(g.cell(), id, heap_no);

	/* Find the last lock with the same lock_mode and transaction
	on the record. */

	for (lock = first_lock; lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {
		if (lock->trx == trx && lock->mode() == lock_mode) {
			goto released;
		}
	}

	{
		ib::error	err;
		err << "Unlock row could not find a " << lock_mode
			<< " mode lock on the record. Current statement: ";
		size_t		stmt_len;
		if (const char* stmt = innobase_get_stmt_unsafe(
			    trx->mysql_thd, &stmt_len)) {
			err.write(stmt, stmt_len);
		}
	}

	return;

released:
	ut_a(!lock->is_waiting());
	trx->mutex_lock();
	lock_rec_reset_nth_bit(lock, heap_no);
	trx->mutex_unlock();

	/* Check if we can now grant waiting lock requests */

	for (lock = first_lock; lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {
		if (!lock->is_waiting()) {
			continue;
		}
		mysql_mutex_lock(&lock_sys.wait_mutex);
		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_rec_has_to_wait_in_queue(g.cell(),
								    lock)) {
			lock->trx->lock.wait_trx = c->trx;
		} else {
			/* Grant the lock */
			ut_ad(trx != lock->trx);
			lock_grant(lock);
		}
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}
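
/* Usage sketch (hypothetical, not from the source tree): a semi-consistent
read under READ COMMITTED may release a non-matching row lock along these
lines; rec and id identify the record, and select_lock_type is assumed to
be the lock mode that was originally requested.

	lock_rec_unlock(trx, id, rec,
			static_cast<lock_mode>(select_lock_type));
*/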

/** Release the explicit locks of a committing transaction,
and release possible other transactions waiting because of these locks.
@return whether the operation succeeded */
static bool lock_release_try(trx_t *trx)
{
  /* At this point, trx->lock.trx_locks cannot be modified by other
  threads, because our transaction has been committed.
  See the checks and assertions in lock_rec_create_low() and
  lock_rec_add_to_queue().

  The function lock_table_create() should never be invoked on behalf
  of a transaction running in another thread. Also there, we will
  assert that the current transaction be active. */
  DBUG_ASSERT(trx->state == TRX_STATE_COMMITTED_IN_MEMORY);
  DBUG_ASSERT(!trx->is_referenced());

  bool all_released= true;
restart:
  ulint count= 1000;
  lock_sys.rd_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  /* Note: Anywhere else, trx->mutex is not held while acquiring
  a lock table latch, but here we are following the opposite order.
  To avoid deadlocks, we only try to acquire the lock table latches
  but do not keep waiting for them. */

  for (lock_t *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock; )
  {
    ut_ad(lock->trx == trx);
    lock_t *prev= UT_LIST_GET_PREV(trx_locks, lock);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      ut_ad(lock->mode() != LOCK_X ||
            lock->index->table->id >= DICT_HDR_FIRST_ID ||
            trx->dict_operation);
      auto &lock_hash= lock_sys.hash_get(lock->type_mode);
      auto cell= lock_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
      auto latch= lock_sys_t::hash_table::latch(cell);
      if (!latch->try_acquire())
        all_released= false;
      else
      {
        lock_rec_dequeue_from_page(lock, false);
        latch->release();
      }
    }
    else
    {
      dict_table_t *table= lock->un_member.tab_lock.table;
      ut_ad(!table->is_temporary());
      ut_ad(table->id >= DICT_HDR_FIRST_ID ||
            (lock->mode() != LOCK_IX && lock->mode() != LOCK_X) ||
            trx->dict_operation);
      if (!table->lock_mutex_trylock())
        all_released= false;
      else
      {
        lock_table_dequeue(lock, false);
        table->lock_mutex_unlock();
      }
    }

    lock= all_released ? UT_LIST_GET_LAST(trx->lock.trx_locks) : prev;
    if (!--count)
      break;
  }

  lock_sys.rd_unlock();
  trx->mutex_unlock();
  if (all_released && !count)
    goto restart;
  return all_released;
}

/** Release the explicit locks of a committing transaction,
and release possible other transactions waiting because of these locks. */
void lock_release(trx_t *trx)
{
#if defined SAFE_MUTEX && defined UNIV_DEBUG
  std::set<table_id_t> to_evict;
  if (innodb_evict_tables_on_commit_debug && !trx->is_recovered)
# if 1 /* if dict_stats_exec_sql() were not playing dirty tricks */
    if (!dict_sys.mutex_is_locked())
# else /* this would be more proper way to do it */
    if (!trx->dict_operation_lock_mode && !trx->dict_operation)
# endif
      for (const auto& p: trx->mod_tables)
        if (!p.first->is_temporary())
          to_evict.emplace(p.first->id);
#endif
  ulint count;

  for (count= 5; count--; )
    if (lock_release_try(trx))
      goto released;

  /* Fall back to acquiring lock_sys.latch in exclusive mode */
restart:
  count= 1000;
  lock_sys.wr_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  while (lock_t *lock= UT_LIST_GET_LAST(trx->lock.trx_locks))
  {
    ut_ad(lock->trx == trx);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      ut_ad(lock->mode() != LOCK_X ||
            lock->index->table->id >= DICT_HDR_FIRST_ID ||
            trx->dict_operation);
      lock_rec_dequeue_from_page(lock, false);
    }
    else
    {
      ut_d(dict_table_t *table= lock->un_member.tab_lock.table);
      ut_ad(!table->is_temporary());
      ut_ad(table->id >= DICT_HDR_FIRST_ID ||
            (lock->mode() != LOCK_IX && lock->mode() != LOCK_X) ||
            trx->dict_operation);
      lock_table_dequeue(lock, false);
    }

    if (!--count)
      break;
  }

  lock_sys.wr_unlock();
  trx->mutex_unlock();
  if (!count)
    goto restart;

released:
  if (UNIV_UNLIKELY(Deadlock::to_be_checked))
  {
    mysql_mutex_lock(&lock_sys.wait_mutex);
    lock_sys.deadlock_check();
    mysql_mutex_unlock(&lock_sys.wait_mutex);
  }

  trx->lock.was_chosen_as_deadlock_victim= false;
  trx->lock.n_rec_locks= 0;

#if defined SAFE_MUTEX && defined UNIV_DEBUG
  if (to_evict.empty())
    return;
  dict_sys.mutex_lock();
  LockMutexGuard g{SRW_LOCK_CALL};
  for (const table_id_t id : to_evict)
  {
    if (dict_table_t *table= dict_sys.find_table(id))
      if (!table->get_ref_count() && !UT_LIST_GET_LEN(table->locks))
        dict_sys.remove(table, true);
  }
  dict_sys.mutex_unlock();
#endif
}
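
/* Commit-path sketch (hypothetical, not from the source tree): the commit
code would release all locks of a transaction in one call once the state
has been set:

	trx->state = TRX_STATE_COMMITTED_IN_MEMORY;  // assumed precondition
	lock_release(trx);  // tries lock_release_try() up to 5 times, then
	                    // falls back to an exclusive lock_sys.latch
*/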

/** Release locks on a table whose creation is being rolled back */
ATTRIBUTE_COLD void lock_release_on_rollback(trx_t *trx, dict_table_t *table)
{
  trx->mod_tables.erase(table);

  lock_sys.wr_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  for (lock_t *next, *lock= UT_LIST_GET_FIRST(table->locks); lock; lock= next)
  {
    next= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock);
    ut_ad(lock->trx == trx);
    UT_LIST_REMOVE(trx->lock.trx_locks, lock);
    ut_list_remove(table->locks, lock, TableLockGetNode());
  }

  for (lock_t *p, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock; lock= p)
  {
    p= UT_LIST_GET_PREV(trx_locks, lock);
    ut_ad(lock->trx == trx);
    if (lock->is_table())
      ut_ad(lock->un_member.tab_lock.table != table);
    else if (lock->index->table == table)
      lock_rec_dequeue_from_page(lock, false);
  }

  lock_sys.wr_unlock();
  trx->mutex_unlock();
}

/*********************************************************************//**
Removes a table lock request from the trx_t::trx_lock_t::table_locks
vector by replacing it with a NULL entry. */
static
void
lock_trx_table_locks_remove(
/*========================*/
	const lock_t*	lock_to_remove)		/*!< in: lock to remove */
{
	trx_t*		trx = lock_to_remove->trx;

	ut_ad(lock_to_remove->is_table());
	lock_sys.assert_locked(*lock_to_remove->un_member.tab_lock.table);
	ut_ad(trx->mutex_is_owner());

	for (lock_list::iterator it = trx->lock.table_locks.begin(),
             end = trx->lock.table_locks.end(); it != end; ++it) {
		const lock_t*	lock = *it;

		ut_ad(!lock || trx == lock->trx);
		ut_ad(!lock || lock->is_table());
		ut_ad(!lock || lock->un_member.tab_lock.table);

		if (lock == lock_to_remove) {
			*it = NULL;
			return;
		}
	}

	/* Lock must exist in the vector. */
	ut_error;
}

/*===================== VALIDATION AND DEBUGGING ====================*/

/** Print info of a table lock.
@param[in,out]	file	output stream
@param[in]	lock	table lock */
static
void
lock_table_print(FILE* file, const lock_t* lock)
{
	lock_sys.assert_locked();
	ut_a(lock->is_table());

	fputs("TABLE LOCK table ", file);
	ut_print_name(file, lock->trx,
		      lock->un_member.tab_lock.table->name.m_name);
	fprintf(file, " trx id " TRX_ID_FMT, lock->trx->id);

	switch (auto mode = lock->mode()) {
	case LOCK_S:
		fputs(" lock mode S", file);
		break;
	case LOCK_X:
		ut_ad(lock->trx->id != 0);
		fputs(" lock mode X", file);
		break;
	case LOCK_IS:
		fputs(" lock mode IS", file);
		break;
	case LOCK_IX:
		ut_ad(lock->trx->id != 0);
		fputs(" lock mode IX", file);
		break;
	case LOCK_AUTO_INC:
		fputs(" lock mode AUTO-INC", file);
		break;
	default:
		fprintf(file, " unknown lock mode %u", mode);
	}

	if (lock->is_waiting()) {
		fputs(" waiting", file);
	}

	putc('\n', file);
}

/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr)
{
	ut_ad(!lock->is_table());

	const page_id_t page_id{lock->un_member.rec_lock.page_id};
	ut_d(lock_sys.hash_get(lock->type_mode).assert_locked(page_id));

	fprintf(file, "RECORD LOCKS space id %u page no %u n bits " ULINTPF
		" index %s of table ",
		page_id.space(), page_id.page_no(),
		lock_rec_get_n_bits(lock),
		lock->index->name());
	ut_print_name(file, lock->trx, lock->index->table->name.m_name);
	fprintf(file, " trx id " TRX_ID_FMT, lock->trx->id);

	switch (lock->mode()) {
	case LOCK_S:
		fputs(" lock mode S", file);
		break;
	case LOCK_X:
		fputs(" lock_mode X", file);
		break;
	default:
		ut_error;
	}

	if (lock->is_gap()) {
		fputs(" locks gap before rec", file);
	}

	if (lock->is_record_not_gap()) {
		fputs(" locks rec but not gap", file);
	}

	if (lock->is_insert_intention()) {
		fputs(" insert intention", file);
	}

	if (lock->is_waiting()) {
		fputs(" waiting", file);
	}

	putc('\n', file);

	mem_heap_t*		heap		= NULL;
	rec_offs		offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*		offsets		= offsets_;
	rec_offs_init(offsets_);

	mtr.start();
	const buf_block_t* block = buf_page_try_get(page_id, &mtr);

	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (!lock_rec_get_nth_bit(lock, i)) {
			continue;
		}

		fprintf(file, "Record lock, heap no %lu", (ulong) i);

		if (block) {
			ut_ad(page_is_leaf(block->frame));
			const rec_t*	rec;

			rec = page_find_rec_with_heap_no(
				buf_block_get_frame(block), i);
			ut_ad(!page_rec_is_metadata(rec));

			offsets = rec_get_offsets(
				rec, lock->index, offsets,
				lock->index->n_core_fields,
				ULINT_UNDEFINED, &heap);

			putc(' ', file);
			rec_print_new(file, rec, offsets);
		}

		putc('\n', file);
	}

	mtr.commit();

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}

#ifdef UNIV_DEBUG
/* Print the number of lock structs from lock_print_info_summary() only
in non-production builds for performance reasons, see
http://bugs.mysql.com/36942 */
#define PRINT_NUM_OF_LOCK_STRUCTS
#endif /* UNIV_DEBUG */

#ifdef PRINT_NUM_OF_LOCK_STRUCTS
/*********************************************************************//**
Calculates the number of record lock structs in the record lock hash table.
@return number of record locks */
static ulint lock_get_n_rec_locks()
{
	ulint	n_locks	= 0;
	ulint	i;

	lock_sys.assert_locked();

	for (i = 0; i < lock_sys.rec_hash.n_cells; i++) {
		const lock_t*	lock;

		for (lock = static_cast<const lock_t*>(
			     HASH_GET_FIRST(&lock_sys.rec_hash, i));
		     lock != 0;
		     lock = static_cast<const lock_t*>(
				HASH_GET_NEXT(hash, lock))) {

			n_locks++;
		}
	}

	return(n_locks);
}
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */

/*********************************************************************//**
Prints info of locks for all transactions.
@return FALSE if not able to acquire lock_sys.latch (and display info) */
ibool
lock_print_info_summary(
/*====================*/
	FILE*	file,	/*!< in: file where to print */
	ibool	nowait)	/*!< in: whether to give up if lock_sys.latch
			cannot be acquired immediately */
{
	if (!nowait) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	} else if (!lock_sys.wr_lock_try()) {
		fputs("FAIL TO OBTAIN LOCK MUTEX,"
		      " SKIP LOCK INFO PRINTING\n", file);
		return(FALSE);
	}

	if (lock_sys.deadlocks) {
		fputs("------------------------\n"
		      "LATEST DETECTED DEADLOCK\n"
		      "------------------------\n", file);

		if (!srv_read_only_mode) {
			ut_copy_file(file, lock_latest_err_file);
		}
	}

	fputs("------------\n"
	      "TRANSACTIONS\n"
	      "------------\n", file);

	fprintf(file, "Trx id counter " TRX_ID_FMT "\n",
		trx_sys.get_max_trx_id());

	fprintf(file,
		"Purge done for trx's n:o < " TRX_ID_FMT
		" undo n:o < " TRX_ID_FMT " state: %s\n"
		"History list length %u\n",
		purge_sys.tail.trx_no(),
		purge_sys.tail.undo_no,
		purge_sys.enabled()
		? (purge_sys.running() ? "running"
		   : purge_sys.paused() ? "stopped" : "running but idle")
		: "disabled",
		uint32_t{trx_sys.rseg_history_len});

#ifdef PRINT_NUM_OF_LOCK_STRUCTS
	fprintf(file,
		"Total number of lock structs in row lock hash table %lu\n",
		(ulong) lock_get_n_rec_locks());
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */
	return(TRUE);
}

/** Prints transaction lock wait and MVCC state.
@param[in,out]	file	file where to print
@param[in]	trx	transaction
@param[in]	now	current my_hrtime_coarse() */
void lock_trx_print_wait_and_mvcc_state(FILE *file, const trx_t *trx,
                                        my_hrtime_t now)
{
	fprintf(file, "---");

	trx_print_latched(file, trx, 600);
	trx->read_view.print_limits(file);

	if (const lock_t* wait_lock = trx->lock.wait_lock) {
		const my_hrtime_t suspend_time= trx->lock.suspend_time;
		fprintf(file,
			"------- TRX HAS BEEN WAITING %llu ns"
			" FOR THIS LOCK TO BE GRANTED:\n",
			now.val - suspend_time.val);

		if (!wait_lock->is_table()) {
			mtr_t mtr;
			lock_rec_print(file, wait_lock, mtr);
		} else {
			lock_table_print(file, wait_lock);
		}

		fprintf(file, "------------------\n");
	}
}

/*********************************************************************//**
Prints info of locks for a transaction. */
static
void
lock_trx_print_locks(
/*=================*/
	FILE*		file,		/*!< in/out: File to write */
	const trx_t*	trx)		/*!< in: current transaction */
{
	mtr_t mtr;
	uint32_t i= 0;
	/* Iterate over the transaction's locks. */
	lock_sys.assert_locked();
	for (lock_t *lock = UT_LIST_GET_FIRST(trx->lock.trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
		if (!lock->is_table()) {
			lock_rec_print(file, lock, mtr);
		} else {
			lock_table_print(file, lock);
		}

		if (++i == 10) {

			fprintf(file,
				"10 LOCKS PRINTED FOR THIS TRX:"
				" SUPPRESSING FURTHER PRINTS\n");

			break;
		}
	}
}

/** Functor to display all transactions */
struct lock_print_info
{
  lock_print_info(FILE* file, my_hrtime_t now) :
    file(file), now(now),
    purge_trx(purge_sys.query ? purge_sys.query->trx : nullptr)
  {}

  void operator()(const trx_t &trx) const
  {
    if (UNIV_UNLIKELY(&trx == purge_trx))
      return;
    lock_trx_print_wait_and_mvcc_state(file, &trx, now);

    if (trx.will_lock && srv_print_innodb_lock_monitor)
      lock_trx_print_locks(file, &trx);
  }

  FILE* const file;
  const my_hrtime_t now;
  const trx_t* const purge_trx;
};
4242
/*********************************************************************//**
4243 4244
Prints info of locks for each transaction. This function will release
lock_sys.latch, which the caller must be holding in exclusive mode. */
4245 4246 4247 4248
void
lock_print_info_all_transactions(
/*=============================*/
	FILE*		file)	/*!< in/out: file where to print */
4249
{
4250 4251
	fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n");

4252
	trx_sys.trx_list.for_each(lock_print_info(file, my_hrtime_coarse()));
4253
	lock_sys.wr_unlock();
4254

4255
	ut_d(lock_validate());
4256 4257
}

#ifdef UNIV_DEBUG
/*********************************************************************//**
Find the lock in the trx_t::trx_lock_t::table_locks vector.
@return true if found */
static
bool
lock_trx_table_locks_find(
/*======================*/
	trx_t*		trx,		/*!< in: trx to validate */
	const lock_t*	find_lock)	/*!< in: lock to find */
{
	bool		found = false;

	ut_ad(trx->mutex_is_owner());

	for (lock_list::const_iterator it = trx->lock.table_locks.begin(),
             end = trx->lock.table_locks.end(); it != end; ++it) {

		const lock_t*	lock = *it;

		if (lock == NULL) {

			continue;

		} else if (lock == find_lock) {

			/* Can't be duplicates. */
			ut_a(!found);
			found = true;
		}

		ut_a(trx == lock->trx);
		ut_a(lock->is_table());
		ut_a(lock->un_member.tab_lock.table != NULL);
	}

	return(found);
}

/*********************************************************************//**
Validates the lock queue on a table.
@return TRUE if ok */
static
ibool
lock_table_queue_validate(
/*======================*/
	const dict_table_t*	table)	/*!< in: table */
{
	const lock_t*	lock;

	lock_sys.assert_locked(*table);

	for (lock = UT_LIST_GET_FIRST(table->locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {

		/* lock->trx->state cannot change from or to NOT_STARTED
		while we are holding the lock_sys.latch. It may change
		from ACTIVE or PREPARED to PREPARED or COMMITTED. */
		lock->trx->mutex_lock();
		check_trx_state(lock->trx);

		if (lock->trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
		} else if (!lock->is_waiting()) {
			ut_a(!lock_table_other_has_incompatible(
				     lock->trx, 0, table,
				     lock->mode()));
		} else {
			ut_a(lock_table_has_to_wait_in_queue(lock));
		}

		ut_a(lock_trx_table_locks_find(lock->trx, lock));
		lock->trx->mutex_unlock();
	}

	return(TRUE);
}

/*********************************************************************//**
Validates the lock queue on a single record.
@return true if ok */
static
bool
lock_rec_queue_validate(
/*====================*/
	bool			locked_lock_trx_sys,
					/*!< in: if the caller holds
					both the lock_sys.latch and
					trx_sys_t->lock. */
	const page_id_t		id,	/*!< in: page identifier */
	const rec_t*		rec,	/*!< in: record to look at */
	const dict_index_t*	index,	/*!< in: index, or NULL if not known */
	const rec_offs*		offsets)/*!< in: rec_get_offsets(rec, index) */
{
	const lock_t*	lock;
	ulint		heap_no;

	ut_a(rec);
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!index || dict_index_is_clust(index)
	      || !dict_index_is_online_ddl(index));

	heap_no = page_rec_get_heap_no(rec);

	if (!locked_lock_trx_sys) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	}

	hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id.fold());
	lock_sys.assert_locked(cell);

	if (!page_rec_is_user_rec(rec)) {

		for (lock = lock_sys_t::get_first(cell, id, heap_no);
		     lock != NULL;
		     lock = lock_rec_get_next_const(heap_no, lock)) {

			ut_ad(!index || lock->index == index);

			lock->trx->mutex_lock();
			ut_ad(!lock->trx->read_only
			      || !lock->trx->is_autocommit_non_locking());
			ut_ad(trx_state_eq(lock->trx,
					   TRX_STATE_COMMITTED_IN_MEMORY)
			      || !lock->is_waiting()
			      || lock_rec_has_to_wait_in_queue(cell, lock));
			lock->trx->mutex_unlock();
		}

func_exit:
		if (!locked_lock_trx_sys) {
			lock_sys.wr_unlock();
		}

		return true;
	}

	ut_ad(page_rec_is_leaf(rec));

	const trx_id_t impl_trx_id = index && index->is_primary()
		? lock_clust_rec_some_has_impl(rec, index, offsets)
		: 0;

	if (trx_t *impl_trx = impl_trx_id
	    ? trx_sys.find(current_trx(), impl_trx_id, false)
	    : 0) {
		/* impl_trx could have been committed before we
		acquire its mutex, but not thereafter. */

		impl_trx->mutex_lock();
		ut_ad(impl_trx->state != TRX_STATE_NOT_STARTED);
		if (impl_trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
		} else if (const lock_t* other_lock
			   = lock_rec_other_has_expl_req(
				   LOCK_S, cell, id, true, heap_no,
				   impl_trx)) {
			/* The impl_trx is holding an implicit lock on the
			given record 'rec'. So there cannot be another
			explicit granted lock.  Also, there can be another
			explicit waiting lock only if the impl_trx has an
			explicit granted lock. */

#ifdef WITH_WSREP
			/** Galera record locking rules:
			* If there is no other record lock on the same record,
			we may grant the lock request.
			* If there is another record lock but the requested
			record lock is compatible, we may grant the lock
			request.
			* If there is another record lock and it is not
			compatible with the requested lock, all normal
			transactions must wait.
			* BF (brute force) additional exceptions:
			** If BF already holds a record lock for the requested
			record, we may grant the new record lock even if there
			are conflicting record locks waiting in the queue.
			** If a conflicting transaction holds the requested
			record lock, we will cancel that record lock and
			select the conflicting transaction as the BF abort or
			kill victim.
			** If a conflicting transaction is waiting for the
			requested record lock, we will cancel that wait and
			select the conflicting transaction as the BF abort or
			kill victim.
			** There should not be two BF transactions waiting for
			the same record lock. */
			if (other_lock->trx->is_wsrep() && !other_lock->is_waiting()) {
				wsrep_report_bf_lock_wait(impl_trx->mysql_thd, impl_trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);

				if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
						       cell, id, heap_no,
						       impl_trx)) {
					ib::info() << "WSREP impl BF lock conflict";
				}
			} else
#endif /* WITH_WSREP */
			{
				ut_ad(other_lock->is_waiting());
				ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
						        cell, id, heap_no,
							impl_trx));
			}
		}

		impl_trx->mutex_unlock();
	}

	for (lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next_const(heap_no, lock)) {
		ut_ad(!lock->trx->read_only
		      || !lock->trx->is_autocommit_non_locking());
		ut_ad(!page_rec_is_metadata(rec));

		if (index) {
			ut_a(lock->index == index);
		}

		if (lock->is_waiting()) {
			ut_a(lock->is_gap()
			     || lock_rec_has_to_wait_in_queue(cell, lock));
		} else if (!lock->is_gap()) {
			const lock_mode	mode = lock->mode() == LOCK_S
				? LOCK_X : LOCK_S;

			const lock_t*	other_lock
				= lock_rec_other_has_expl_req(
					mode, cell, id, false, heap_no,
					lock->trx);
#ifdef WITH_WSREP
			if (UNIV_UNLIKELY(other_lock && lock->trx->is_wsrep())) {
				/* Only a BF transaction may be granted
				a lock before another conflicting lock
				request. */
				if (!wsrep_thd_is_BF(lock->trx->mysql_thd, FALSE)
				    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
					/* If no BF, this case is a bug. */
					wsrep_report_bf_lock_wait(lock->trx->mysql_thd, lock->trx->id);
					wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
					ut_error;
				}
			} else
#endif /* WITH_WSREP */
			ut_ad(!other_lock);
		}
	}

	goto func_exit;
}

/** Validate the record lock queues on a page.
@param block    buffer pool block
@param latched  whether the tablespace latch may be held
@return true if ok */
static bool lock_rec_validate_page(const buf_block_t *block, bool latched)
{
	const lock_t*	lock;
	const rec_t*	rec;
	ulint		nth_lock	= 0;
	ulint		nth_bit		= 0;
	ulint		i;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	const page_id_t id{block->page.id()};

	LockGuard g{lock_sys.rec_hash, id};
loop:
	lock = lock_sys_t::get_first(g.cell(), id);

	if (!lock) {
		goto function_exit;
	}

	DBUG_ASSERT(block->page.status != buf_page_t::FREED);

	for (i = 0; i < nth_lock; i++) {

		lock = lock_rec_get_next_on_page_const(lock);

		if (!lock) {
			goto function_exit;
		}
	}

	ut_ad(!lock->trx->read_only
	      || !lock->trx->is_autocommit_non_locking());

	/* Only validate the record queues when this thread is not
	holding a tablespace latch. */
	if (!latched)
	for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {

		if (i == PAGE_HEAP_NO_SUPREMUM
		    || lock_rec_get_nth_bit(lock, i)) {

			rec = page_find_rec_with_heap_no(block->frame, i);
			ut_a(rec);
			ut_ad(!lock_rec_get_nth_bit(lock, i)
			      || page_rec_is_leaf(rec));
			offsets = rec_get_offsets(rec, lock->index, offsets,
						  lock->index->n_core_fields,
						  ULINT_UNDEFINED, &heap);

			/* If this thread is holding the file space
			latch (fil_space_t::latch), the following
			check WILL break the latching order and may
			cause a deadlock of threads. */

			lock_rec_queue_validate(
				true, id, rec, lock->index, offsets);

			nth_bit = i + 1;

			goto loop;
		}
	}

	nth_bit = 0;
	nth_lock++;

	goto loop;

function_exit:
	if (heap != NULL) {
		mem_heap_free(heap);
	}
	return true;
}

/*********************************************************************//**
Validate record locks up to a limit.
@return lock at limit or NULL if no more locks in the hash bucket */
static MY_ATTRIBUTE((warn_unused_result))
const lock_t*
lock_rec_validate(
/*==============*/
	ulint		start,		/*!< in: lock_sys.rec_hash
					bucket */
	page_id_t*	limit)		/*!< in/out: upper limit of
					(space, page_no) */
{
	lock_sys.assert_locked();

	for (const lock_t* lock = static_cast<const lock_t*>(
		     HASH_GET_FIRST(&lock_sys.rec_hash, start));
	     lock != NULL;
	     lock = static_cast<const lock_t*>(HASH_GET_NEXT(hash, lock))) {

		ut_ad(!lock->trx->read_only
		      || !lock->trx->is_autocommit_non_locking());
		ut_ad(!lock->is_table());

		page_id_t current(lock->un_member.rec_lock.page_id);

		if (current > *limit) {
			*limit = current + 1;
			return(lock);
		}
	}

	return(0);
}

/*********************************************************************//**
Validate a record lock's block */
static void lock_rec_block_validate(const page_id_t page_id)
{
	/* The lock and the block that it is referring to may be freed at
	this point. We pass BUF_GET_POSSIBLY_FREED to skip a debug check.
	If the lock exists in lock_rec_validate_page() we assert
	block->page.status != FREED. */

	buf_block_t*	block;
	mtr_t		mtr;

	/* Transactional locks should never refer to dropped
	tablespaces, because all DDL operations that would drop or
	discard or rebuild a tablespace do hold an exclusive table
	lock, which would conflict with any locks referring to the
	tablespace from other transactions. */
	if (fil_space_t* space = fil_space_t::get(page_id.space())) {
		dberr_t err = DB_SUCCESS;
		mtr_start(&mtr);

		block = buf_page_get_gen(
			page_id,
			space->zip_size(),
			RW_X_LATCH, NULL,
			BUF_GET_POSSIBLY_FREED,
			&mtr, &err);

		if (err != DB_SUCCESS) {
			ib::error() << "Lock rec block validate failed for tablespace "
				   << space->chain.start->name
				   << page_id << " err " << err;
		}

		ut_ad(!block || block->page.status == buf_page_t::FREED
		      || lock_rec_validate_page(block, space->is_latched()));

		mtr_commit(&mtr);

		space->release();
	}
}


static my_bool lock_validate_table_locks(rw_trx_hash_element_t *element, void*)
{
  lock_sys.assert_locked();
  mysql_mutex_lock(&element->mutex);
  if (element->trx)
  {
    check_trx_state(element->trx);
    for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
         lock != NULL;
         lock= UT_LIST_GET_NEXT(trx_locks, lock))
      if (lock->is_table())
        lock_table_queue_validate(lock->un_member.tab_lock.table);
  }
  mysql_mutex_unlock(&element->mutex);
  return 0;
}


/** Validate the transactional locks. */
static void lock_validate()
{
  std::set<page_id_t> pages;
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    /* Validate table locks */
    trx_sys.rw_trx_hash.iterate(lock_validate_table_locks);

    for (ulint i= 0; i < lock_sys.rec_hash.n_cells; i++)
    {
      page_id_t limit{0, 0};
      while (const lock_t *lock= lock_rec_validate(i, &limit))
      {
        if (lock_rec_find_set_bit(lock) == ULINT_UNDEFINED)
          /* The lock bitmap is empty; ignore it. */
          continue;
        pages.insert(lock->un_member.rec_lock.page_id);
      }
    }
  }

  for (page_id_t page_id : pages)
    lock_rec_block_validate(page_id);
}
#endif /* UNIV_DEBUG */
/*============ RECORD LOCK CHECKS FOR ROW OPERATIONS ====================*/

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate insert of
a record. If they do, first tests if the query thread should anyway
be suspended for some reason; if not, then puts the transaction and
the query thread to the lock wait state and inserts a waiting request
for a gap x-lock to the lock queue.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_rec_insert_check_and_lock(
/*===========================*/
	const rec_t*	rec,	/*!< in: record after which to insert */
	buf_block_t*	block,	/*!< in/out: buffer block of rec */
	dict_index_t*	index,	/*!< in: index */
	que_thr_t*	thr,	/*!< in: query thread */
	mtr_t*		mtr,	/*!< in/out: mini-transaction */
	bool*		inherit)/*!< out: set to true if the new
				inserted record maybe should inherit
				LOCK_GAP type locks from the successor
				record */
{
  ut_ad(block->frame == page_align(rec));
  ut_ad(mtr->is_named_space(index->table->space));
  ut_ad(page_is_leaf(block->frame));
  ut_ad(!index->table->is_temporary());

  dberr_t err= DB_SUCCESS;
  bool inherit_in= *inherit;
  trx_t *trx= thr_get_trx(thr);
  const rec_t *next_rec= page_rec_get_next_const(rec);
  ulint heap_no= page_rec_get_heap_no(next_rec);
  const page_id_t id{block->page.id()};
  ut_ad(!rec_is_metadata(next_rec, *index));

  {
    LockGuard g{lock_sys.rec_hash, id};
    /* Because this code is invoked for a running transaction by
    the thread that is serving the transaction, it is not necessary
    to hold trx->mutex here. */

    /* When inserting a record into an index, the table must be at
    least IX-locked. When we are building an index, we would pass
    BTR_NO_LOCKING_FLAG and skip the locking altogether. */
    ut_ad(lock_table_has(trx, index->table, LOCK_IX));

    *inherit= lock_sys_t::get_first(g.cell(), id, heap_no);

    if (*inherit)
    {
      /* Spatial index does not use GAP lock protection. It uses
      "predicate lock" to protect the "range" */
      if (index->is_spatial())
        return DB_SUCCESS;

      /* If another transaction has an explicit lock request which locks
      the gap, waiting or granted, on the successor, the insert has to wait.

      An exception is the case where the lock by the other transaction
      is a gap type lock which it placed to wait for its turn to insert. We
      do not consider that kind of a lock conflicting with our insert. This
      eliminates an unnecessary deadlock which resulted when 2 transactions
      had to wait for their insert. Both had waiting gap type lock requests
      on the successor, which produced an unnecessary deadlock. */
      const unsigned type_mode= LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION;

      if (lock_t *c_lock= lock_rec_other_has_conflicting(type_mode,
                                                         g.cell(), id,
                                                         heap_no, trx))
      {
        trx->mutex_lock();
        err= lock_rec_enqueue_waiting(c_lock, type_mode, id, block->frame,
                                      heap_no, index, thr, nullptr);
        trx->mutex_unlock();
      }
    }
  }

  switch (err) {
  case DB_SUCCESS_LOCKED_REC:
    err = DB_SUCCESS;
    /* fall through */
  case DB_SUCCESS:
    if (!inherit_in || index->is_clust())
      break;
    /* Update the page max trx id field */
    page_update_max_trx_id(block, buf_block_get_page_zip(block), trx->id, mtr);
  default:
    /* We only care about the two return values. */
    break;
  }

#ifdef UNIV_DEBUG
  {
    mem_heap_t *heap= nullptr;
    rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
    const rec_offs *offsets;
    rec_offs_init(offsets_);

    offsets= rec_get_offsets(next_rec, index, offsets_, index->n_core_fields,
                             ULINT_UNDEFINED, &heap);

    ut_ad(lock_rec_queue_validate(false, id, next_rec, index, offsets));

    if (UNIV_LIKELY_NULL(heap))
      mem_heap_free(heap);
  }
#endif /* UNIV_DEBUG */

  return err;
}
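
/* Usage sketch (hypothetical, not from the source tree): a B-tree insert
path would consult this function before physically inserting a record:

	bool inherit = false;
	dberr_t err = lock_rec_insert_check_and_lock(
		rec, block, index, thr, mtr, &inherit);
	if (err == DB_SUCCESS && inherit) {
		// the inserted record should inherit LOCK_GAP type
		// locks from the successor record
	}
*/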

/*********************************************************************//**
4824 4825 4826 4827
Creates an explicit record lock for a running transaction that currently only
has an implicit lock on the record. The transaction instance must have a
reference count > 0 so that it can't be committed and freed before this
function has completed. */
4828
static
4829 4830 4831
void
lock_rec_convert_impl_to_expl_for_trx(
/*==================================*/
4832
	const page_id_t		id,	/*!< in: page identifier */
4833 4834 4835 4836
	const rec_t*		rec,	/*!< in: user record on page */
	dict_index_t*		index,	/*!< in: index of record */
	trx_t*			trx,	/*!< in/out: active transaction */
	ulint			heap_no)/*!< in: rec heap number to lock */
4837
{
4838 4839 4840
  ut_ad(trx->is_referenced());
  ut_ad(page_rec_is_leaf(rec));
  ut_ad(!rec_is_metadata(rec, *index));

  DEBUG_SYNC_C("before_lock_rec_convert_impl_to_expl_for_trx");
  {
    LockGuard g{lock_sys.rec_hash, id};
    trx->mutex_lock();
    ut_ad(!trx_state_eq(trx, TRX_STATE_NOT_STARTED));

    if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) &&
        !lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, heap_no,
                           trx))
      lock_rec_add_to_queue(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id,
                            page_align(rec), heap_no, index, trx, true);
  }

  trx->mutex_unlock();
  trx->release_reference();

  DEBUG_SYNC_C("after_lock_rec_convert_impl_to_expl_for_trx");
}

#ifdef UNIV_DEBUG
struct lock_rec_other_trx_holds_expl_arg
{
  const ulint heap_no;
  const hash_cell_t &cell;
  const page_id_t id;
  const trx_t &impl_trx;
};


static my_bool lock_rec_other_trx_holds_expl_callback(
  rw_trx_hash_element_t *element,
  lock_rec_other_trx_holds_expl_arg *arg)
{
  mysql_mutex_lock(&element->mutex);
  if (element->trx)
  {
    element->trx->mutex_lock();
    ut_ad(element->trx->state != TRX_STATE_NOT_STARTED);
    lock_t *expl_lock= element->trx->state == TRX_STATE_COMMITTED_IN_MEMORY
      ? nullptr
      : lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP,
                          arg->cell, arg->id, arg->heap_no, element->trx);
    /*
      Assert that no transaction other than the holder of the implicit
      lock holds an explicit lock on the record.
    */
    ut_ad(!expl_lock || expl_lock->trx == &arg->impl_trx);
    element->trx->mutex_unlock();
  }
  mysql_mutex_unlock(&element->mutex);
  return 0;
}


/**
  Checks if some transaction, other than the given trx, has an explicit
  lock on the given rec.

  FIXME: if the current transaction holds implicit lock from INSERT, a
  subsequent locking read should not convert it to explicit. See also
  MDEV-11215.

  @param      caller_trx  trx of current thread
  @param[in]  trx         trx holding implicit lock on rec
  @param[in]  rec         user record
  @param[in]  id          page identifier
*/

static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx,
                                          const rec_t *rec,
                                          const page_id_t id)
{
  if (trx)
  {
    ut_ad(!page_rec_is_metadata(rec));
    LockGuard g{lock_sys.rec_hash, id};
    ut_ad(trx->is_referenced());
    const trx_state_t state{trx->state};
    ut_ad(state != TRX_STATE_NOT_STARTED);
    if (state == TRX_STATE_COMMITTED_IN_MEMORY)
      /* The transaction was committed before we acquired LockGuard. */
      return;
    lock_rec_other_trx_holds_expl_arg arg=
    { page_rec_get_heap_no(rec), g.cell(), id, *trx };
    trx_sys.rw_trx_hash.iterate(caller_trx,
                                lock_rec_other_trx_holds_expl_callback, &arg);
  }
}
#endif /* UNIV_DEBUG */


/** If an implicit x-lock exists on a record, convert it to an explicit one.

Often, this is called by a transaction that is about to enter a lock wait
due to the lock conflict. Two explicit locks would be created: first the
exclusive lock on behalf of the lock-holder transaction in this function,
and then a wait request on behalf of caller_trx, in the calling function.

This may also be called by the same transaction that is already holding
an implicit exclusive lock on the record. In this case, no explicit lock
should be created.

@param[in,out]	caller_trx	current transaction
@param[in]	id		index tree leaf page identifier
@param[in]	rec		record on the leaf page
@param[in]	index		the index of the record
@param[in]	offsets		rec_get_offsets(rec,index)
@return	whether caller_trx already holds an exclusive lock on rec */
static
bool
lock_rec_convert_impl_to_expl(
	trx_t*			caller_trx,
	page_id_t		id,
	const rec_t*		rec,
	dict_index_t*		index,
	const rec_offs*		offsets)
{
	trx_t*		trx;

	lock_sys.assert_unlocked();
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if (dict_index_is_clust(index)) {
		trx_id_t	trx_id;

		trx_id = lock_clust_rec_some_has_impl(rec, index, offsets);

		if (trx_id == 0) {
			return false;
		}
		if (UNIV_UNLIKELY(trx_id == caller_trx->id)) {
			return true;
		}

		trx = trx_sys.find(caller_trx, trx_id);
	} else {
		ut_ad(!dict_index_is_online_ddl(index));

		trx = lock_sec_rec_some_has_impl(caller_trx, rec, index,
						 offsets);
		if (trx == caller_trx) {
			trx->release_reference();
			return true;
		}

		ut_d(lock_rec_other_trx_holds_expl(caller_trx, trx, rec, id));
	}

	if (trx) {
		ulint	heap_no = page_rec_get_heap_no(rec);

		ut_ad(trx->is_referenced());

		/* If the transaction is still active and has no
		explicit x-lock set on the record, set one for it.
		trx cannot be committed until the ref count is zero. */
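		/* (The reference was presumably acquired above via
		trx_sys.find() or lock_sec_rec_some_has_impl();
		lock_rec_convert_impl_to_expl_for_trx() releases it.) */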

		lock_rec_convert_impl_to_expl_for_trx(
			id, rec, index, trx, heap_no);
	}

	return false;
}
/*********************************************************************//**
Checks if locks of other transactions prevent an immediate modify (update,
delete mark, or delete unmark) of a clustered index record. If they do,
first tests if the query thread should anyway be suspended for some
reason; if not, then puts the transaction and the query thread to the
lock wait state and inserts a waiting request for a record x-lock to the
lock queue.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_modify_check_and_lock(
/*=================================*/
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: record which should be
					modified */
	dict_index_t*		index,	/*!< in: clustered index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(dict_index_is_clust(index));
	ut_ad(block->frame == page_align(rec));

	ut_ad(!rec_is_metadata(rec, *index));
	ut_ad(!index->table->is_temporary());

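	/* The heap number is stored differently in the compact
	(new-style) and redundant (old-style) record headers. */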
	heap_no = rec_offs_comp(offsets)
		? rec_get_heap_no_new(rec)
		: rec_get_heap_no_old(rec);

	/* If a transaction has no explicit x-lock set on the record, set one
	for it */

	if (lock_rec_convert_impl_to_expl(thr_get_trx(thr), block->page.id(),
					  rec, index, offsets)) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

	err = lock_rec_lock(true, LOCK_X | LOCK_REC_NOT_GAP,
			    block, heap_no, index, thr);

	ut_ad(lock_rec_queue_validate(false, block->page.id(),
				      rec, index, offsets));

	if (err == DB_SUCCESS_LOCKED_REC) {
		err = DB_SUCCESS;
	}

	return(err);
}

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate modify (delete
mark or delete unmark) of a secondary index record.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_modify_check_and_lock(
/*===============================*/
	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG
				bit is set, does nothing */
	buf_block_t*	block,	/*!< in/out: buffer block of rec */
	const rec_t*	rec,	/*!< in: record which should be
				modified; NOTE: as this is a secondary
				index, we always have to modify the
				clustered index record first: see the
				comment below */
	dict_index_t*	index,	/*!< in: secondary index */
	que_thr_t*	thr,	/*!< in: query thread
				(can be NULL if BTR_NO_LOCKING_FLAG) */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index) || (flags & BTR_CREATE_FLAG));
	ut_ad(block->frame == page_align(rec));
	ut_ad(mtr->is_named_space(index->table->space));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if (flags & BTR_NO_LOCKING_FLAG) {

		return(DB_SUCCESS);
	}
	ut_ad(!index->table->is_temporary());

	heap_no = page_rec_get_heap_no(rec);

#ifdef WITH_WSREP
	trx_t *trx= thr_get_trx(thr);
	/* If a transaction scanning a unique secondary key is a wsrep
	high-priority (brute force) thread, the scan may involve
	GAP-locking in the index. Because this locking also happens when
	applying replication events in high-priority applier threads,
	lock conflicts between two wsrep high-priority threads are
	possible. To avoid such GAP-locking, we mark here that this
	transaction is performing a unique key scan. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep = 3;
#endif /* WITH_WSREP */

	/* Another transaction cannot have an implicit lock on the record,
	because when we come here, we already have modified the clustered
	index record, and this would not have been possible if another active
	transaction had modified this secondary index record. */
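	/* Hence, unlike in the read paths, there is no need to call
	lock_rec_convert_impl_to_expl() here. */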

	err = lock_rec_lock(true, LOCK_X | LOCK_REC_NOT_GAP,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	if (trx->wsrep == 3) trx->wsrep = 1;
#endif /* WITH_WSREP */

#ifdef UNIV_DEBUG
	{
		mem_heap_t*	heap		= NULL;
		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
		const rec_offs*	offsets;
		rec_offs_init(offsets_);

		offsets = rec_get_offsets(rec, index, offsets_,
					  index->n_core_fields,
					  ULINT_UNDEFINED, &heap);

		ut_ad(lock_rec_queue_validate(
			      false, block->page.id(), rec, index, offsets));

		if (heap != NULL) {
			mem_heap_free(heap);
		}
	}
#endif /* UNIV_DEBUG */

	if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
		/* Update the page max trx id field */
		/* It might not be necessary to do this if
		err == DB_SUCCESS (no new lock created),
		but it should not cost too much performance. */
		page_update_max_trx_id(block,
				       buf_block_get_page_zip(block),
				       thr_get_trx(thr)->id, mtr);
		err = DB_SUCCESS;
	}

	return(err);
}

/*********************************************************************//**
Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_read_check_and_lock(
/*=============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: secondary index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index));
	ut_ad(block->frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	const page_id_t id{block->page.id()};

	ut_ad(!rec_is_metadata(rec, *index));
	heap_no = page_rec_get_heap_no(rec);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list or a
	database recovery is running. */
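	/* In other words, when page_get_max_trx_id() is below
	trx_sys.get_min_trx_id(), every transaction that ever modified
	this page has committed, so no implicit lock can exist and the
	conversion check below can be skipped. */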
	if (!page_rec_is_supremum(rec)
	    && page_get_max_trx_id(block->frame) >= trx_sys.get_min_trx_id()
	    && lock_rec_convert_impl_to_expl(thr_get_trx(thr), id, rec,
					     index, offsets)) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

#ifdef WITH_WSREP
	trx_t *trx= thr_get_trx(thr);
	/* If a transaction scanning a unique secondary key is a wsrep
	high-priority (brute force) thread, the scan may involve
	GAP-locking in the index. Because this locking also happens when
	applying replication events in high-priority applier threads,
	lock conflicts between two wsrep high-priority threads are
	possible. To avoid such GAP-locking, we mark here that this
	transaction is performing a unique key scan. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep = 3;
#endif /* WITH_WSREP */

	err = lock_rec_lock(false, gap_mode | mode,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	if (trx->wsrep == 3) trx->wsrep = 1;
#endif /* WITH_WSREP */

	ut_ad(lock_rec_queue_validate(false, id, rec, index, offsets));

	return(err);
}

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate read, or passing
over by a read cursor, of a clustered index record. If they do, first tests
if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_read_check_and_lock(
/*===============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: clustered index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(dict_index_is_clust(index));
	ut_ad(block->frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(gap_mode == LOCK_ORDINARY || gap_mode == LOCK_GAP
	      || gap_mode == LOCK_REC_NOT_GAP);
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	const page_id_t id{block->page.id()};

	heap_no = page_rec_get_heap_no(rec);

	if (heap_no != PAGE_HEAP_NO_SUPREMUM
	    && lock_rec_convert_impl_to_expl(thr_get_trx(thr), id, rec,
					     index, offsets)) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

	err = lock_rec_lock(false, gap_mode | mode,
			    block, heap_no, index, thr);

	ut_ad(lock_rec_queue_validate(false, id, rec, index, offsets));

	DEBUG_SYNC_C("after_lock_clust_rec_read_check_and_lock");

	return(err);
}
/*********************************************************************//**
Checks if locks of other transactions prevent an immediate read, or passing
over by a read cursor, of a clustered index record. If they do, first tests
if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record. This is an alternative version of
lock_clust_rec_read_check_and_lock() that does not require the parameter
"offsets".
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_read_check_and_lock_alt(
/*===================================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: clustered index */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	mem_heap_t*	tmp_heap	= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	dberr_t		err;
	rec_offs_init(offsets_);

	ut_ad(page_rec_is_leaf(rec));
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  ULINT_UNDEFINED, &tmp_heap);
	err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
						 offsets, mode, gap_mode, thr);
	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}
	if (err == DB_SUCCESS_LOCKED_REC) {
		err = DB_SUCCESS;
	}

	return(err);
}

/*******************************************************************//**
Check if a transaction holds any autoinc locks.
@return TRUE if the transaction holds any AUTOINC locks. */
static
ibool
lock_trx_holds_autoinc_locks(
/*=========================*/
	const trx_t*	trx)		/*!< in: transaction */
{
	ut_a(trx->autoinc_locks != NULL);
	return(!ib_vector_is_empty(trx->autoinc_locks));
}

/** Release all AUTO_INCREMENT locks of the transaction. */
static void lock_release_autoinc_locks(trx_t *trx)
{
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    mysql_mutex_lock(&lock_sys.wait_mutex);
    trx->mutex_lock();
    auto autoinc_locks= trx->autoinc_locks;
    ut_a(autoinc_locks);

    /* We release the locks in reverse order. This is to avoid
    searching the vector for the element to delete at the lower level.
    See lock_table_remove_low() for details. */
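    /* (Removing the last element of the vector is O(1), whereas
    removing an element from the middle would require a search and a
    shift of the remaining elements.) */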
    while (ulint size= ib_vector_size(autoinc_locks))
    {
      lock_t *lock= *static_cast<lock_t**>
        (ib_vector_get(autoinc_locks, size - 1));
      ut_ad(lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE));
      lock_table_dequeue(lock, true);
      lock_trx_table_locks_remove(lock);
    }
  }
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  trx->mutex_unlock();
}

/** Cancel a waiting lock request and release possibly waiting transactions */
static void lock_cancel_waiting_and_release(lock_t *lock)
{
  lock_sys.assert_locked(*lock);
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  trx_t *trx= lock->trx;
  trx->mutex_lock();
  ut_ad(trx->state == TRX_STATE_ACTIVE);

  if (!lock->is_table())
    lock_rec_dequeue_from_page(lock, true);
  else
  {
    if (lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE))
    {
      ut_ad(trx->autoinc_locks);
      ib_vector_remove(trx->autoinc_locks, lock);
    }
    lock_table_dequeue(lock, true);
    /* Remove the lock from table lock vector too. */
    lock_trx_table_locks_remove(lock);
  }
  /* Reset the wait flag and the back pointer to lock in trx. */
  lock_reset_lock_and_trx_wait(lock);

  lock_wait_end(trx);
  trx->mutex_unlock();
}
#ifdef WITH_WSREP
void lock_sys_t::cancel_lock_wait_for_trx(trx_t *trx)
{
  lock_sys.wr_lock(SRW_LOCK_CALL);
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (lock_t *lock= trx->lock.wait_lock)
  {
    /* check if victim is still waiting */
    if (lock->is_waiting())
      lock_cancel_waiting_and_release(lock);
  }
  lock_sys.wr_unlock();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
}
#endif /* WITH_WSREP */

/** Cancel a waiting lock request.
@param trx    active transaction
@param lock   waiting lock request
@param check_victim  whether to check trx->lock.was_chosen_as_deadlock_victim
@retval DB_SUCCESS    if no lock existed
@retval DB_DEADLOCK   if trx->lock.was_chosen_as_deadlock_victim was set
@retval DB_LOCK_WAIT  if the lock was canceled */
dberr_t lock_sys_t::cancel(trx_t *trx, lock_t *lock, bool check_victim)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->lock.wait_lock == lock);
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  dberr_t err= DB_SUCCESS;

  if (lock->is_table())
  {
    if (!lock_sys.rd_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.rd_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
      lock= trx->lock.wait_lock;
      if (!lock);
      else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        err= DB_DEADLOCK;
      else
        goto resolve_table_lock;
    }
    else
    {
resolve_table_lock:
      dict_table_t *table= lock->un_member.tab_lock.table;
      table->lock_mutex_lock();
      if (lock->is_waiting())
        lock_cancel_waiting_and_release(lock);
      table->lock_mutex_unlock();
      /* Even if lock->is_waiting() did not hold above, we must return
      DB_LOCK_WAIT, or otherwise optimistic parallel replication could
      occasionally hang. Potentially affected tests:
      rpl.rpl_parallel_optimistic
      rpl.rpl_parallel_optimistic_nobinlog
      rpl.rpl_parallel_optimistic_xa_lsu_off */
      err= DB_LOCK_WAIT;
    }
    lock_sys.rd_unlock();
  }
  else
  {
    /* To prevent the record lock from being moved between pages
    during a page split or merge, we must hold exclusive lock_sys.latch. */
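    /* (A table lock is not attached to any page, which is why the
    cheaper shared lock_sys.latch sufficed in the branch above.) */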
    if (!lock_sys.wr_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.wr_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
      lock= trx->lock.wait_lock;
      if (!lock);
      else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        err= DB_DEADLOCK;
      else
        goto resolve_record_lock;
    }
    else
    {
resolve_record_lock:
      if (lock->is_waiting())
        lock_cancel_waiting_and_release(lock);
      /* Even if lock->is_waiting() did not hold above, we must return
      DB_LOCK_WAIT, or otherwise optimistic parallel replication could
      occasionally hang. Potentially affected tests:
      rpl.rpl_parallel_optimistic
      rpl.rpl_parallel_optimistic_nobinlog
      rpl.rpl_parallel_optimistic_xa_lsu_off */
      err= DB_LOCK_WAIT;
    }
    lock_sys.wr_unlock();
  }

  return err;
}

/** Cancel a waiting lock request (if any) when killing a transaction */
void lock_sys_t::cancel(trx_t *trx)
{
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (lock_t *lock= trx->lock.wait_lock)
  {
    trx->error_state= DB_INTERRUPTED;
    cancel(trx, lock, false);
  }
  lock_sys.deadlock_check();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
}

/*********************************************************************//**
Unlocks AUTO_INC type locks that were possibly reserved by a trx. This
function should be called at the end of an SQL statement, by the
connection thread that owns the transaction (trx->mysql_thd). */
void
lock_unlock_table_autoinc(
/*======================*/
	trx_t*	trx)	/*!< in/out: transaction */
{
	lock_sys.assert_unlocked();
	ut_ad(!trx->mutex_is_owner());
	ut_ad(!trx->lock.wait_lock);

	/* This can be invoked on NOT_STARTED, ACTIVE, PREPARED,
	but not COMMITTED transactions. */

	ut_ad(trx_state_eq(trx, TRX_STATE_NOT_STARTED)
	      || !trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY));

	/* This function is invoked for a running transaction by the
	thread that is serving the transaction. Therefore it is not
	necessary to hold trx->mutex here. */

	if (lock_trx_holds_autoinc_locks(trx)) {
		lock_release_autoinc_locks(trx);
	}
}

/** Handle a pending lock wait (DB_LOCK_WAIT) in a semi-consistent read
while holding a clustered index leaf page latch.
@param trx           transaction that is or was waiting for a lock
@retval DB_SUCCESS   if the lock was granted
@retval DB_DEADLOCK  if the transaction must be aborted due to a deadlock
@retval DB_LOCK_WAIT if a lock wait would be necessary; the pending
                     lock request was released */
dberr_t lock_trx_handle_wait(trx_t *trx)
{
  if (trx->lock.was_chosen_as_deadlock_victim)
    return DB_DEADLOCK;
  if (!trx->lock.wait_lock)
    return DB_SUCCESS;
  dberr_t err= DB_SUCCESS;
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (trx->lock.was_chosen_as_deadlock_victim)
    err= DB_DEADLOCK;
  else if (lock_t *wait_lock= trx->lock.wait_lock)
    err= lock_sys_t::cancel(trx, wait_lock, true);
  lock_sys.deadlock_check();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  return err;
}

#ifdef UNIV_DEBUG
/**
  Do an exhaustive check for any locks (table or rec) against the table.

  @param[in]  table  check if there are any locks held on records in this table
                     or on the table itself
*/

static my_bool lock_table_locks_lookup(rw_trx_hash_element_t *element,
                                       const dict_table_t *table)
{
  lock_sys.assert_locked();
  mysql_mutex_lock(&element->mutex);
  if (element->trx)
  {
    element->trx->mutex_lock();
    check_trx_state(element->trx);
    if (element->trx->state != TRX_STATE_COMMITTED_IN_MEMORY)
    {
      for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
           lock != NULL;
           lock= UT_LIST_GET_NEXT(trx_locks, lock))
      {
        ut_ad(lock->trx == element->trx);
        if (!lock->is_table())
        {
          ut_ad(lock->index->online_status != ONLINE_INDEX_CREATION ||
                lock->index->is_primary());
          ut_ad(lock->index->table != table);
        }
        else
          ut_ad(lock->un_member.tab_lock.table != table);
      }
    }
    element->trx->mutex_unlock();
  }
  mysql_mutex_unlock(&element->mutex);
  return 0;
}
#endif /* UNIV_DEBUG */

/** Check if there are any locks on a table.
@return true if table has either table or record locks. */
bool lock_table_has_locks(dict_table_t *table)
{
  if (table->n_rec_locks)
    return true;
  table->lock_mutex_lock();
  auto len= UT_LIST_GET_LEN(table->locks);
  table->lock_mutex_unlock();
  if (len)
    return true;
#ifdef UNIV_DEBUG
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    trx_sys.rw_trx_hash.iterate(lock_table_locks_lookup,
                                const_cast<const dict_table_t*>(table));
  }
#endif /* UNIV_DEBUG */
  return false;
}

/*******************************************************************//**
Initialise the table lock list. */
void
lock_table_lock_list_init(
/*======================*/
	table_lock_list_t*	lock_list)	/*!< List to initialise */
{
	UT_LIST_INIT(*lock_list, &lock_table_t::locks);
}
#ifdef UNIV_DEBUG
/*******************************************************************//**
Check if the transaction holds any locks on the sys tables
or its records.
@return the strongest lock found on any sys table or 0 for none */
const lock_t*
lock_trx_has_sys_table_locks(
/*=========================*/
	const trx_t*	trx)	/*!< in: transaction to check */
{
	const lock_t*	strongest_lock = 0;
	lock_mode	strongest = LOCK_NONE;

	LockMutexGuard g{SRW_LOCK_CALL};

	const lock_list::const_iterator end = trx->lock.table_locks.end();
	lock_list::const_iterator it = trx->lock.table_locks.begin();

	/* Find a valid mode. Note: the table lock list may be empty. */

	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock != NULL
		    && dict_is_sys_table(lock->un_member.tab_lock.table->id)) {

			strongest = lock->mode();
			ut_ad(strongest != LOCK_NONE);
			strongest_lock = lock;
			break;
		}
	}

	if (strongest == LOCK_NONE) {
		return(NULL);
	}

	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock == NULL) {
			continue;
		}

		ut_ad(trx == lock->trx);
		ut_ad(lock->is_table());
		ut_ad(lock->un_member.tab_lock.table);

		lock_mode mode = lock->mode();

		if (dict_is_sys_table(lock->un_member.tab_lock.table->id)
		    && lock_mode_stronger_or_eq(mode, strongest)) {

			strongest = mode;
			strongest_lock = lock;
		}
	}

	return(strongest_lock);
}

/** Check if the transaction holds an explicit exclusive lock on a record.
@param[in]	trx	transaction
@param[in]	table	table
@param[in]	id	leaf page identifier
@param[in]	heap_no	heap number identifying the record
@return whether an explicit X-lock is held */
bool lock_trx_has_expl_x_lock(const trx_t &trx, const dict_table_t &table,
                              page_id_t id, ulint heap_no)
{
  ut_ad(heap_no > PAGE_HEAP_NO_SUPREMUM);
  ut_ad(lock_table_has(&trx, &table, LOCK_IX));
  if (!lock_table_has(&trx, &table, LOCK_X))
  {
    LockGuard g{lock_sys.rec_hash, id};
    ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
                            g.cell(), id, heap_no, &trx));
  }
  return true;
}
#endif /* UNIV_DEBUG */

namespace Deadlock
{
  /** rewind(3) the file used for storing the latest detected deadlock and
  print a heading message to stderr if printing of all deadlocks to stderr
  is enabled. */
  static void start_print()
  {
    lock_sys.assert_locked();

    rewind(lock_latest_err_file);
    ut_print_timestamp(lock_latest_err_file);

    if (srv_print_all_deadlocks)
      ib::info() << "Transactions deadlock detected,"
                    " dumping detailed information.";
  }
  /** Print a message to the deadlock file and possibly to stderr.
  @param msg message to print */
  static void print(const char *msg)
  {
    fputs(msg, lock_latest_err_file);
    if (srv_print_all_deadlocks)
      ib::info() << msg;
  }

  /** Print transaction data to the deadlock file and possibly to stderr.
  @param trx transaction */
  static void print(const trx_t &trx)
  {
    lock_sys.assert_locked();

    ulint n_rec_locks= trx.lock.n_rec_locks;
    ulint n_trx_locks= UT_LIST_GET_LEN(trx.lock.trx_locks);
    ulint heap_size= mem_heap_get_size(trx.lock.lock_heap);

    trx_print_low(lock_latest_err_file, &trx, 3000,
                  n_rec_locks, n_trx_locks, heap_size);

    if (srv_print_all_deadlocks)
      trx_print_low(stderr, &trx, 3000, n_rec_locks, n_trx_locks, heap_size);
  }

  /** Print lock data to the deadlock file and possibly to stderr.
  @param lock record or table type lock */
  static void print(const lock_t &lock)
  {
    lock_sys.assert_locked();
    if (!lock.is_table())
    {
      mtr_t mtr;
      lock_rec_print(lock_latest_err_file, &lock, mtr);
      if (srv_print_all_deadlocks)
        lock_rec_print(stderr, &lock, mtr);
    }
    else
    {
      lock_table_print(lock_latest_err_file, &lock);
      if (srv_print_all_deadlocks)
        lock_table_print(stderr, &lock);
    }
  }
  ATTRIBUTE_COLD
  /** Report a deadlock (cycle in the waits-for graph).
  @param trx        transaction waiting for a lock in this thread
  @param current_trx whether trx belongs to the current thread
  @return the transaction to be rolled back (unless one was committed already)
  @return nullptr if no deadlock */
  static trx_t *report(trx_t *const trx, bool current_trx)
  {
    mysql_mutex_assert_owner(&lock_sys.wait_mutex);
    ut_ad(lock_sys.is_writer() == !current_trx);
    /* Normally, trx should be a direct part of the deadlock
    cycle. However, if innodb_deadlock_detect had been OFF in the
    past, or if current_trx=false, trx may be waiting for a lock that
    is held by a participant of a pre-existing deadlock, without being
    part of the deadlock itself. That is, the path to the deadlock may be
    P-shaped instead of O-shaped, with trx being at the foot of the P.

    We will process the entire path leading to a cycle, and we will
    choose the victim (to be aborted) among the cycle. */

    static const char rollback_msg[]= "*** WE ROLL BACK TRANSACTION (%u)\n";
    char buf[9 + sizeof rollback_msg];

    /* If current_trx=true, trx is owned by this thread, and we can
    safely invoke these without holding trx->mutex or lock_sys.latch.
    If current_trx=false, a concurrent commit is protected by both
    lock_sys.latch and lock_sys.wait_mutex. */
    const undo_no_t trx_weight= TRX_WEIGHT(trx) |
      (trx->mysql_thd &&
#ifdef WITH_WSREP
       (thd_has_edited_nontrans_tables(trx->mysql_thd) ||
        (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false)))
#else
       thd_has_edited_nontrans_tables(trx->mysql_thd)
#endif /* WITH_WSREP */
       ? 1ULL << 63 : 0);
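    /* The most significant bit marks transactions that modified
    non-transactional tables (or wsrep BF threads) as maximally heavy,
    so that they are avoided as the deadlock victim below: the victim
    is the cycle participant with the smallest weight. */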

    trx_t *victim= nullptr;
    undo_no_t victim_weight= ~0ULL;
    unsigned victim_pos= 0, trx_pos= 0;

    if (current_trx && !lock_sys.wr_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.wr_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
    }

    {
      unsigned l= 0;
      /* Now that we are holding lock_sys.wait_mutex again, check
      whether a cycle still exists. */
      trx_t *cycle= find_cycle(trx);
      if (!cycle)
        goto func_exit; /* One of the transactions was already aborted. */
      for (trx_t *next= cycle;;)
      {
        next= next->lock.wait_trx;
        l++;
        const undo_no_t next_weight= TRX_WEIGHT(next) |
          (next->mysql_thd &&
#ifdef WITH_WSREP
           (thd_has_edited_nontrans_tables(next->mysql_thd) ||
            (next->is_wsrep() && wsrep_thd_is_BF(next->mysql_thd, false)))
#else
           thd_has_edited_nontrans_tables(next->mysql_thd)
#endif /* WITH_WSREP */
           ? 1ULL << 63 : 0);
        if (next_weight < victim_weight)
        {
          victim_weight= next_weight;
          victim= next;
          victim_pos= l;
        }
        if (next == trx)
          trx_pos= l;
        if (next == cycle)
          break;
      }
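      /* On a weight tie, prefer trx itself (the transaction on whose
      behalf this check is running) as the victim. */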
      if (trx_pos && trx_weight == victim_weight)
      {
        victim= trx;
        victim_pos= trx_pos;
      }
      /* Finally, display the deadlock */
      switch (const auto r= static_cast<enum report>(innodb_deadlock_report)) {
      case REPORT_OFF:
        break;
      case REPORT_BASIC:
      case REPORT_FULL:
        start_print();
        l= 0;
        for (trx_t *next= cycle;;)
        {
          next= next->lock.wait_trx;
          ut_ad(next);
          ut_ad(next->state == TRX_STATE_ACTIVE);
          const lock_t *wait_lock= next->lock.wait_lock;
          ut_ad(wait_lock);
          snprintf(buf, sizeof buf, "\n*** (%u) TRANSACTION:\n", ++l);
          print(buf);
          print(*next);
          print("*** WAITING FOR THIS LOCK TO BE GRANTED:\n");
          print(*wait_lock);
          if (r == REPORT_BASIC);
          else if (wait_lock->is_table())
          {
            if (const lock_t *lock=
                UT_LIST_GET_FIRST(wait_lock->un_member.tab_lock.table->locks))
            {
              ut_ad(!lock->is_waiting());
              print("*** CONFLICTING WITH:\n");
              do
                print(*lock);
              while ((lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) &&
                     !lock->is_waiting());
            }
            else
              ut_ad("no conflicting table lock found" == 0);
          }
          else
          {
            const page_id_t id{wait_lock->un_member.rec_lock.page_id};
            hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                                 ? lock_sys.prdt_hash : lock_sys.rec_hash).
              cell_get(id.fold());
            if (const lock_t *lock= lock_sys_t::get_first(cell, id))
            {
              const ulint heap_no= lock_rec_find_set_bit(wait_lock);
              if (!lock_rec_get_nth_bit(lock, heap_no))
                lock= lock_rec_get_next_const(heap_no, lock);
              ut_ad(!lock->is_waiting());
              print("*** CONFLICTING WITH:\n");
              do
                print(*lock);
              while ((lock= lock_rec_get_next_const(heap_no, lock)) &&
                     !lock->is_waiting());
            }
            else
              ut_ad("no conflicting record lock found" == 0);
          }
          if (next == cycle)
            break;
        }
        snprintf(buf, sizeof buf, rollback_msg, victim_pos);
        print(buf);
      }

      ut_ad(victim->state == TRX_STATE_ACTIVE);

      victim->lock.was_chosen_as_deadlock_victim= true;
      lock_cancel_waiting_and_release(victim->lock.wait_lock);
#ifdef WITH_WSREP
      if (victim->is_wsrep() && wsrep_thd_is_SR(victim->mysql_thd))
        wsrep_handle_SR_rollback(trx->mysql_thd, victim->mysql_thd);
#endif
    }
func_exit:
    if (current_trx)
      lock_sys.wr_unlock();
    return victim;
  }
}

/** Check if a lock request results in a deadlock.
Resolve a deadlock by choosing a transaction that will be rolled back.
@param trx    transaction requesting a lock
@return whether trx must report DB_DEADLOCK */
static bool Deadlock::check_and_resolve(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);

  ut_ad(!trx->mutex_is_owner());
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  ut_ad(!srv_read_only_mode);
  if (!innodb_deadlock_detect)
    return false;
  if (UNIV_LIKELY_NULL(find_cycle(trx)) && report(trx, true) == trx)
    return true;

  if (UNIV_LIKELY(!trx->lock.was_chosen_as_deadlock_victim))
    return false;

  if (lock_t *wait_lock= trx->lock.wait_lock)
    lock_sys_t::cancel(trx, wait_lock, false);

  lock_sys.deadlock_check();
  return true;
}

/** Check for deadlocks while holding only lock_sys.wait_mutex. */
void lock_sys_t::deadlock_check()
{
  ut_ad(!is_writer());
  mysql_mutex_assert_owner(&wait_mutex);
  bool acquired= false;

  if (Deadlock::to_be_checked)
  {
    for (;;)
    {
      auto i= Deadlock::to_check.begin();
      if (i == Deadlock::to_check.end())
        break;
      if (!acquired)
      {
        acquired= wr_lock_try();
        if (!acquired)
        {
          acquired= true;
          mysql_mutex_unlock(&wait_mutex);
          lock_sys.wr_lock(SRW_LOCK_CALL);
          mysql_mutex_lock(&wait_mutex);
          continue;
        }
      }
      trx_t *trx= *i;
      Deadlock::to_check.erase(i);
      if (Deadlock::find_cycle(trx))
        Deadlock::report(trx, false);
    }
    Deadlock::to_be_checked= false;
  }
  ut_ad(Deadlock::to_check.empty());
  if (acquired)
    wr_unlock();
}

/*************************************************************//**
Updates the lock table when a page is split and merged to
two pages. */
UNIV_INTERN
void
lock_update_split_and_merge(
	const buf_block_t* left_block,	/*!< in: left page to which merged */
	const rec_t* orig_pred,		/*!< in: original predecessor of
					supremum on the left page before merge*/
	const buf_block_t* right_block)	/*!< in: right page from which merged */
{
  ut_ad(page_is_leaf(left_block->frame));
  ut_ad(page_is_leaf(right_block->frame));
  ut_ad(page_align(orig_pred) == left_block->frame);

  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};

  LockMultiGuard g{lock_sys.rec_hash, l, r};
  const rec_t *left_next_rec= page_rec_get_next_const(orig_pred);
  ut_ad(!page_rec_is_metadata(left_next_rec));

  /* Inherit the locks on the supremum of the left page to the
  first record which was moved from the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left_block->frame,
                          page_rec_get_heap_no(left_next_rec),
                          PAGE_HEAP_NO_SUPREMUM);

  /* Reset the locks on the supremum of the left page,
  releasing waiting transactions */
  lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);

  /* Inherit the locks to the supremum of the left page from the
  successor of the infimum on the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->frame,
                          PAGE_HEAP_NO_SUPREMUM,
                          lock_get_min_heap_no(right_block));
}