/*****************************************************************************

Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2014, 2022, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file lock/lock0lock.cc
The transaction lock system

Created 5/7/1996 Heikki Tuuri
*******************************************************/

#define LOCK_MODULE_IMPLEMENTATION

#include "univ.i"

#include <mysql/service_thd_error_context.h>
#include <mysql/service_thd_wait.h>
#include <sql_class.h>

#include "lock0lock.h"
#include "lock0priv.h"
#include "dict0mem.h"
#include "trx0purge.h"
#include "trx0sys.h"
#include "ut0vec.h"
#include "btr0cur.h"
#include "row0sel.h"
#include "row0mysql.h"
#include "row0vers.h"
#include "pars0pars.h"
#include "srv0mon.h"

#include <set>

#ifdef WITH_WSREP
#include <mysql/service_wsrep.h>
#include <debug_sync.h>
#endif /* WITH_WSREP */

/** The value of innodb_deadlock_detect */
my_bool innodb_deadlock_detect;
/** The value of innodb_deadlock_report */
ulong innodb_deadlock_report;

#ifdef HAVE_REPLICATION
extern "C" void thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
extern "C" int thd_need_wait_reports(const MYSQL_THD thd);
extern "C" int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
#endif

/** Functor for accessing the embedded node within a table lock. */
struct TableLockGetNode
{
  ut_list_node<lock_t> &operator()(lock_t &elem)
  { return(elem.un_member.tab_lock.locks); }
};

/** Create the hash table.
@param n  the lower bound of n_cells */
void lock_sys_t::hash_table::create(ulint n)
{
  n_cells= ut_find_prime(n);
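  /* The padded size makes room for the hash_latch objects that are
  interleaved with the hash cells in the array. */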
  const size_t size= pad(n_cells) * sizeof *array;
  void* v= aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
  memset(v, 0, size);
  array= static_cast<hash_cell_t*>(v);
}

/** Resize the hash table.
@param n  the lower bound of n_cells */
void lock_sys_t::hash_table::resize(ulint n)
{
  ut_ad(lock_sys.is_writer());
  ulint new_n_cells= ut_find_prime(n);
  const size_t size= pad(new_n_cells) * sizeof *array;
  void* v= aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
  memset(v, 0, size);
  hash_cell_t *new_array= static_cast<hash_cell_t*>(v);

  for (auto i= pad(n_cells); i--; )
  {
    if (lock_t *lock= static_cast<lock_t*>(array[i].node))
    {
      /* all hash_latch must be vacated */
      ut_ad(i % (ELEMENTS_PER_LATCH + LATCH) >= LATCH);
      do
      {
        ut_ad(!lock->is_table());
        hash_cell_t *c= calc_hash(lock->un_member.rec_lock.page_id.fold(),
                                  new_n_cells) + new_array;
        lock_t *next= lock->hash;
        lock->hash= nullptr;
        if (!c->node)
          c->node= lock;
        else if (!lock->is_waiting())
        {
          lock->hash= static_cast<lock_t*>(c->node);
          c->node= lock;
        }
        else
        {
          lock_t *next= static_cast<lock_t*>(c->node);
          while (next->hash)
            next= next->hash;
          next->hash= lock;
        }
        lock= next;
      }
      while (lock);
    }
  }

  aligned_free(array);
  array= new_array;
  n_cells= new_n_cells;
}

#ifdef SUX_LOCK_GENERIC
void lock_sys_t::hash_latch::wait()
{
  pthread_mutex_lock(&lock_sys.hash_mutex);
  while (!write_trylock())
    pthread_cond_wait(&lock_sys.hash_cond, &lock_sys.hash_mutex);
  pthread_mutex_unlock(&lock_sys.hash_mutex);
}

void lock_sys_t::hash_latch::release()
{
  pthread_mutex_lock(&lock_sys.hash_mutex);
  write_unlock();
  pthread_cond_signal(&lock_sys.hash_cond);
  pthread_mutex_unlock(&lock_sys.hash_mutex);
}
#endif

#ifdef UNIV_DEBUG
/** Assert that a lock shard is exclusively latched by this thread */
void lock_sys_t::assert_locked(const lock_t &lock) const
{
  ut_ad(this == &lock_sys);
  if (is_writer())
    return;
  if (lock.is_table())
    assert_locked(*lock.un_member.tab_lock.table);
  else
    lock_sys.hash_get(lock.type_mode).
      assert_locked(lock.un_member.rec_lock.page_id);
}

/** Assert that a table lock shard is exclusively latched by this thread */
void lock_sys_t::assert_locked(const dict_table_t &table) const
{
  ut_ad(!table.is_temporary());
  if (is_writer())
    return;
  ut_ad(readers);
  ut_ad(table.lock_mutex_is_owner());
}

/** Assert that the hash cell for a page is exclusively latched by this thread */
void lock_sys_t::hash_table::assert_locked(const page_id_t id) const
{
  if (lock_sys.is_writer())
    return;
  ut_ad(lock_sys.readers);
  ut_ad(latch(cell_get(id.fold()))->is_locked());
}

/** Assert that a hash table cell is exclusively latched (by some thread) */
void lock_sys_t::assert_locked(const hash_cell_t &cell) const
{
  if (is_writer())
    return;
  ut_ad(lock_sys.readers);
  ut_ad(hash_table::latch(const_cast<hash_cell_t*>(&cell))->is_locked());
}
#endif

LockGuard::LockGuard(lock_sys_t::hash_table &hash, page_id_t id)
{
  const auto id_fold= id.fold();
  lock_sys.rd_lock(SRW_LOCK_CALL);
  cell_= hash.cell_get(id_fold);
  hash.latch(cell_)->acquire();
}

LockMultiGuard::LockMultiGuard(lock_sys_t::hash_table &hash,
                               const page_id_t id1, const page_id_t id2)
{
  ut_ad(id1.space() == id2.space());
  const auto id1_fold= id1.fold(), id2_fold= id2.fold();
  lock_sys.rd_lock(SRW_LOCK_CALL);
  cell1_= hash.cell_get(id1_fold);
  cell2_= hash.cell_get(id2_fold);

  auto latch1= hash.latch(cell1_), latch2= hash.latch(cell2_);
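  /* Acquire the two cell latches in a fixed (address) order, so that
  concurrent LockMultiGuard instances cannot deadlock on each other. */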
  if (latch1 > latch2)
    std::swap(latch1, latch2);
  latch1->acquire();
  if (latch1 != latch2)
    latch2->acquire();
}

LockMultiGuard::~LockMultiGuard()
{
  auto latch1= lock_sys_t::hash_table::latch(cell1_),
    latch2= lock_sys_t::hash_table::latch(cell2_);
  latch1->release();
  if (latch1 != latch2)
    latch2->release();
  /* Must be last, to avoid a race with lock_sys_t::hash_table::resize() */
  lock_sys.rd_unlock();
}

TRANSACTIONAL_TARGET
TMLockGuard::TMLockGuard(lock_sys_t::hash_table &hash, page_id_t id)
{
  const auto id_fold= id.fold();
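  /* First try to elide the cell latch inside a hardware memory
  transaction; fall back to really acquiring it if that aborts. */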
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
  if (xbegin())
  {
    if (lock_sys.latch.is_write_locked())
      xabort();
    cell_= hash.cell_get(id_fold);
    if (hash.latch(cell_)->is_locked())
      xabort();
    elided= true;
    return;
  }
  elided= false;
#endif
  lock_sys.rd_lock(SRW_LOCK_CALL);
  cell_= hash.cell_get(id_fold);
  hash.latch(cell_)->acquire();
}

/** Pretty-print a table lock.
@param[in,out]	file	output stream
@param[in]	lock	table lock */
static void lock_table_print(FILE* file, const lock_t* lock);

/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr);

namespace Deadlock
{
  /** Whether to_check may be nonempty */
  static Atomic_relaxed<bool> to_be_checked;
  /** Transactions to check for deadlock. Protected by lock_sys.wait_mutex. */
  static std::set<trx_t*> to_check;

  MY_ATTRIBUTE((nonnull, warn_unused_result))
  /** Check if a lock request results in a deadlock.
  Resolve a deadlock by choosing a transaction that will be rolled back.
  @param trx    transaction requesting a lock
  @return whether trx must report DB_DEADLOCK */
  static bool check_and_resolve(trx_t *trx);

  /** Quickly detect a deadlock using Brent's cycle detection algorithm.
  @param trx     transaction that is waiting for another transaction
  @return a transaction that is part of a cycle
  @retval nullptr if no cycle was found */
  inline trx_t *find_cycle(trx_t *trx)
  {
    mysql_mutex_assert_owner(&lock_sys.wait_mutex);
    trx_t *tortoise= trx, *hare= trx;
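    /* Brent's algorithm: the hare follows one wait-for edge per
    iteration; whenever the step count l reaches the current power of
    two, the tortoise is teleported to the hare and the power doubles.
    The hare meeting the tortoise again proves a cycle. */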
    for (unsigned power= 1, l= 1; (hare= hare->lock.wait_trx) != nullptr; l++)
    {
      if (tortoise == hare)
      {
        ut_ad(l > 1);
        lock_sys.deadlocks++;
        /* Note: Normally, trx should be part of any deadlock cycle
        that is found. However, if innodb_deadlock_detect=OFF had been
        in effect in the past, it is possible that trx will be waiting
        for a transaction that participates in a pre-existing deadlock
        cycle. In that case, our victim will not be trx. */
        return hare;
      }
      if (l == power)
      {
        /* The maximum concurrent number of TRX_STATE_ACTIVE transactions
        is TRX_RSEG_N_SLOTS * 128, or innodb_page_size / 16 * 128
        (default: 131,072, maximum: 524,288).
        Our maximum possible number of iterations should be twice that. */
        power<<= 1;
        l= 0;
        tortoise= hare;
      }
    }
    return nullptr;
  }
};

#ifdef UNIV_DEBUG
/** Validate the transactional locks. */
static void lock_validate();
/** Validate the record lock queues on a page.
@param block    buffer pool block
@param latched  whether the tablespace latch may be held
@return true if ok */
static bool lock_rec_validate_page(const buf_block_t *block, bool latched)
  MY_ATTRIBUTE((nonnull, warn_unused_result));
#endif /* UNIV_DEBUG */

/* The lock system */
lock_sys_t lock_sys;
/** Only created if !srv_read_only_mode. Protected by lock_sys.latch. */
static FILE *lock_latest_err_file;

/*********************************************************************//**
Reports that a transaction id is not sensible, i.e., in the future. */
ATTRIBUTE_COLD
void
lock_report_trx_id_insanity(
/*========================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,		/*!< in: user record */
	dict_index_t*	index,		/*!< in: index */
	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec, index) */
	trx_id_t	max_trx_id)	/*!< in: trx_sys.get_max_trx_id() */
{
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	ib::error()
		<< "Transaction id " << ib::hex(trx_id)
		<< " associated with record" << rec_offsets_print(rec, offsets)
		<< " in index " << index->name
		<< " of table " << index->table->name
		<< " is greater than the global counter " << max_trx_id
		<< "! The table is corrupted.";
}

/*********************************************************************//**
Checks that a transaction id is sensible, i.e., not in the future.
@return true if ok */
bool
lock_check_trx_id_sanity(
/*=====================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,		/*!< in: user record */
	dict_index_t*	index,		/*!< in: index */
	const rec_offs*	offsets)	/*!< in: rec_get_offsets(rec, index) */
{
  ut_ad(rec_offs_validate(rec, index, offsets));
  ut_ad(!rec_is_metadata(rec, *index));

  trx_id_t max_trx_id= trx_sys.get_max_trx_id();
  ut_ad(max_trx_id || srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);

  if (UNIV_LIKELY(max_trx_id != 0) && UNIV_UNLIKELY(trx_id >= max_trx_id))
  {
    lock_report_trx_id_insanity(trx_id, rec, index, offsets, max_trx_id);
    return false;
  }
  return true;
}


/**
  Creates the lock system at database start.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::create(ulint n_cells)
{
  ut_ad(this == &lock_sys);
  ut_ad(!is_initialised());

  m_initialised= true;

  latch.SRW_LOCK_INIT(lock_latch_key);
#ifdef __aarch64__
  mysql_mutex_init(lock_wait_mutex_key, &wait_mutex, MY_MUTEX_INIT_FAST);
#else
  mysql_mutex_init(lock_wait_mutex_key, &wait_mutex, nullptr);
#endif
#ifdef SUX_LOCK_GENERIC
  pthread_mutex_init(&hash_mutex, nullptr);
  pthread_cond_init(&hash_cond, nullptr);
#endif
  rec_hash.create(n_cells);
  prdt_hash.create(n_cells);
  prdt_page_hash.create(n_cells);
  if (!srv_read_only_mode)
  {
    lock_latest_err_file= os_file_create_tmpfile();
    ut_a(lock_latest_err_file);
  }
}

#ifdef UNIV_PFS_RWLOCK
/** Acquire exclusive lock_sys.latch */
void lock_sys_t::wr_lock(const char *file, unsigned line)
{
  mysql_mutex_assert_not_owner(&wait_mutex);
  latch.wr_lock(file, line);
  ut_ad(!writer.exchange(pthread_self(), std::memory_order_relaxed));
}
/** Release exclusive lock_sys.latch */
void lock_sys_t::wr_unlock()
{
  ut_ad(writer.exchange(0, std::memory_order_relaxed) ==
        pthread_self());
  latch.wr_unlock();
}
/** Acquire shared lock_sys.latch */
void lock_sys_t::rd_lock(const char *file, unsigned line)
{
  mysql_mutex_assert_not_owner(&wait_mutex);
  latch.rd_lock(file, line);
  ut_ad(!writer.load(std::memory_order_relaxed));
  ut_d(readers.fetch_add(1, std::memory_order_relaxed));
}

/** Release shared lock_sys.latch */
void lock_sys_t::rd_unlock()
{
  ut_ad(!writer.load(std::memory_order_relaxed));
  ut_ad(readers.fetch_sub(1, std::memory_order_relaxed));
  latch.rd_unlock();
}
#endif
/**
  Resize the lock hash table.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::resize(ulint n_cells)
{
  ut_ad(this == &lock_sys);
  /* Buffer pool resizing is rarely initiated by the user, and this
  would exceed the maximum size of a memory transaction. */
  LockMutexGuard g{SRW_LOCK_CALL};
  rec_hash.resize(n_cells);
  prdt_hash.resize(n_cells);
  prdt_page_hash.resize(n_cells);
}

/** Closes the lock system at database shutdown. */
void lock_sys_t::close()
{
  ut_ad(this == &lock_sys);

  if (!m_initialised)
    return;
  if (lock_latest_err_file)
  {
    my_fclose(lock_latest_err_file, MYF(MY_WME));
    lock_latest_err_file= nullptr;
  }
  rec_hash.free();
  prdt_hash.free();
  prdt_page_hash.free();
#ifdef SUX_LOCK_GENERIC
  pthread_mutex_destroy(&hash_mutex);
  pthread_cond_destroy(&hash_cond);
#endif

  latch.destroy();
  mysql_mutex_destroy(&wait_mutex);

  Deadlock::to_check.clear();
  Deadlock::to_be_checked= false;

  m_initialised= false;
}

#ifdef WITH_WSREP
# ifdef UNIV_DEBUG
/** Check if both the transaction holding the conflicting lock and the
transaction requesting the record lock are brute force (BF). If they
are, check whether this BF-BF wait is correct; if not, report the BF
wait and assert.

@param[in]	lock	other waiting record lock
@param[in]	trx	trx requesting the conflicting record lock
*/
static void wsrep_assert_no_bf_bf_wait(const lock_t *lock, const trx_t *trx)
{
	ut_ad(!lock->is_table());
	lock_sys.assert_locked(*lock);
	trx_t* lock_trx= lock->trx;

	/* Note that we are holding lock_sys.latch, thus we should
	not acquire THD::LOCK_thd_data mutex below to avoid latching
	order violation. */

	if (!trx->is_wsrep() || !lock_trx->is_wsrep())
		return;
	if (UNIV_LIKELY(!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
	    || UNIV_LIKELY(!wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE)))
		return;

	ut_ad(trx->state == TRX_STATE_ACTIVE);

	switch (lock_trx->state) {
	case TRX_STATE_COMMITTED_IN_MEMORY:
		/* The state change is only protected by trx_t::mutex,
		which we are not even holding here. */
	case TRX_STATE_PREPARED:
		/* Wait for lock->trx to complete the commit
		(or XA ROLLBACK) and to release the lock. */
		return;
	case TRX_STATE_ACTIVE:
		break;
	default:
		ut_ad("invalid state" == 0);
	}

	/* If the BF-BF order is honored, i.e. the trx already holding
	the record lock is ordered before this new lock request, we can
	keep trx waiting for the lock. If the conflicting transaction
	is already aborting or rolling back for replay, we can also
	let the new transaction wait. */
	if (wsrep_thd_order_before(lock_trx->mysql_thd, trx->mysql_thd)
	    || wsrep_thd_is_aborting(lock_trx->mysql_thd)) {
		return;
	}

	mtr_t mtr;

	ib::error() << "Conflicting lock on table: "
		    << lock->index->table->name
		    << " index: "
		    << lock->index->name()
		    << " that has lock ";
	lock_rec_print(stderr, lock, mtr);

	ib::error() << "WSREP state: ";

	wsrep_report_bf_lock_wait(trx->mysql_thd,
				  trx->id);
	wsrep_report_bf_lock_wait(lock_trx->mysql_thd,
				  lock_trx->id);
	/* BF-BF wait is a bug */
	ut_error;
}
# endif /* UNIV_DEBUG */

/** Check if a lock wait timeout happened for a high-priority (BF)
thread; as a side effect, trigger the lock monitor.
@param trx    transaction owning the lock
@return false for a regular lock timeout */
ATTRIBUTE_NOINLINE static bool wsrep_is_BF_lock_timeout(const trx_t &trx)
{
  ut_ad(trx.is_wsrep());

  if (trx.error_state == DB_DEADLOCK || !srv_monitor_timer ||
      !wsrep_thd_is_BF(trx.mysql_thd, false))
    return false;

  ib::info() << "WSREP: BF lock wait long for trx:" << ib::hex(trx.id)
             << " query: " << wsrep_thd_query(trx.mysql_thd);
  return true;
}
#endif /* WITH_WSREP */

/*********************************************************************//**
Checks if a lock request for a new lock has to wait for request lock2.
@return TRUE if new lock has to wait for lock2 to be removed */
UNIV_INLINE
bool
lock_rec_has_to_wait(
/*=================*/
	bool		for_locking,
				/*!< in: called for locking (true)
				or releasing (false) */
	const trx_t*	trx,	/*!< in: trx of new lock */
	unsigned	type_mode,/*!< in: precise mode of the new lock
				to set: LOCK_S or LOCK_X, possibly
				ORed to LOCK_GAP or LOCK_REC_NOT_GAP,
				LOCK_INSERT_INTENTION */
	const lock_t*	lock2,	/*!< in: another record lock; NOTE that
				it is assumed that this has a lock bit
				set on the same record as in the new
				lock we are setting */
	bool		lock_is_on_supremum)
				/*!< in: TRUE if we are setting the
				lock on the 'supremum' record of an
				index page: we know then that the lock
				request is really for a 'gap' type lock */
{
	ut_ad(trx);
	ut_ad(!lock2->is_table());
	ut_d(lock_sys.hash_get(type_mode).assert_locked(
		     lock2->un_member.rec_lock.page_id));

	if (trx == lock2->trx
	    || lock_mode_compatible(
		       static_cast<lock_mode>(LOCK_MODE_MASK & type_mode),
		       lock2->mode())) {
		return false;
	}

	/* We have somewhat complex rules when gap type record locks
	cause waits */
	if ((lock_is_on_supremum || (type_mode & LOCK_GAP))
	    && !(type_mode & LOCK_INSERT_INTENTION)) {
		/* Gap type locks without LOCK_INSERT_INTENTION flag
		do not need to wait for anything. This is because
		different users can have conflicting lock types
		on gaps. */

		return false;
	}

	if (!(type_mode & LOCK_INSERT_INTENTION) && lock2->is_gap()) {

		/* A record lock (LOCK_ORDINARY or LOCK_REC_NOT_GAP)
		does not need to wait for a gap type lock */

		return false;
	}

	if ((type_mode & LOCK_GAP) && lock2->is_record_not_gap()) {

		/* Lock on gap does not need to wait for
		a LOCK_REC_NOT_GAP type lock */

		return false;
	}

	if (lock2->is_insert_intention()) {
		/* No lock request needs to wait for an insert
		intention lock to be removed. This is ok since our
		rules allow conflicting locks on gaps. This eliminates
		a spurious deadlock caused by a next-key lock waiting
		for an insert intention lock; when the insert
		intention lock was granted, the insert deadlocked on
		the waiting next-key lock.

		Also, insert intention locks do not disturb each
		other. */

		return false;
	}

#ifdef HAVE_REPLICATION
	if ((type_mode & LOCK_GAP || lock2->is_gap())
	    && !thd_need_ordering_with(trx->mysql_thd, lock2->trx->mysql_thd)) {
		/* If the upper server layer has already decided on the
		commit order between the transaction requesting the
		lock and the transaction owning the lock, we do not
		need to wait for gap locks. Such ordering by the upper
		server layer happens in parallel replication, where the
		commit order is fixed to match the original order on the
		master.

		Such gap locks are mainly needed to get serialisability
		between transactions so that they will be binlogged in
		the correct order so that statement-based replication
		will give the correct results. Since the right order
		was already determined on the master, we do not need
		to enforce it again here.

		Skipping the locks is not essential for correctness,
		since in case of deadlock we will just kill the later
		transaction and retry it. But it can save some
		unnecessary rollbacks and retries. */

		return false;
	}
#endif /* HAVE_REPLICATION */

#ifdef WITH_WSREP
		/* New lock request from a transaction is using unique key
		scan and this transaction is a wsrep high priority transaction
		(brute force). If conflicting transaction is also wsrep high
		priority transaction we should avoid lock conflict because
		ordering of these transactions is already decided and
		conflicting transaction will be later replayed. */
		if (trx->is_wsrep_UK_scan()
		    && wsrep_thd_is_BF(lock2->trx->mysql_thd, false)) {
			return false;
		}

		/* We can very well let BF wait here normally, as the
		other BF will be replayed in case of a conflict. For
		debug builds we do additional sanity checks to catch
		any unsupported BF wait. */
		ut_d(wsrep_assert_no_bf_bf_wait(lock2, trx));
#endif /* WITH_WSREP */

	return true;
}

/*********************************************************************//**
Checks if a lock request lock1 has to wait for request lock2.
@return TRUE if lock1 has to wait for lock2 to be removed */
bool
lock_has_to_wait(
/*=============*/
	const lock_t*	lock1,	/*!< in: waiting lock */
	const lock_t*	lock2)	/*!< in: another lock; NOTE that it is
				assumed that this has a lock bit set
				on the same record as in lock1 if the
				locks are record locks */
{
	ut_ad(lock1 && lock2);

	if (lock1->trx == lock2->trx
	    || lock_mode_compatible(lock1->mode(), lock2->mode())) {
		return false;
	}

	if (lock1->is_table()) {
		return true;
	}

	ut_ad(!lock2->is_table());

	if (lock1->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)) {
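		/* Predicate locks (spatial indexes) are not attached to
		heap numbers; conflicts are decided by comparing the
		locked predicates instead. */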
		return lock_prdt_has_to_wait(lock1->trx, lock1->type_mode,
					     lock_get_prdt_from_lock(lock1),
					     lock2);
	}

	return lock_rec_has_to_wait(
		false, lock1->trx, lock1->type_mode, lock2,
		lock_rec_get_nth_bit(lock1, PAGE_HEAP_NO_SUPREMUM));
}

/*============== RECORD LOCK BASIC FUNCTIONS ============================*/

/**********************************************************************//**
Looks for a set bit in a record lock bitmap. Returns ULINT_UNDEFINED,
if none found.
@return bit index == heap number of the record, or ULINT_UNDEFINED if
none found */
ulint
lock_rec_find_set_bit(
/*==================*/
	const lock_t*	lock)	/*!< in: record lock with at least one bit set */
{
	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (lock_rec_get_nth_bit(lock, i)) {

			return(i);
		}
	}

	return(ULINT_UNDEFINED);
}

/*********************************************************************//**
Resets the record lock bitmap to zero. NOTE: does not touch the wait_lock
pointer in the transaction! This function is used in lock object creation
and resetting. */
static
void
lock_rec_bitmap_reset(
/*==================*/
	lock_t*	lock)	/*!< in: record lock */
{
	ulint	n_bytes;

	ut_ad(!lock->is_table());

	/* Reset to zero the bitmap which resides immediately after the lock
	struct */

	n_bytes = lock_rec_get_n_bits(lock) / 8;

	ut_ad((lock_rec_get_n_bits(lock) % 8) == 0);

	memset(reinterpret_cast<void*>(&lock[1]), 0, n_bytes);
}

/*********************************************************************//**
Copies a record lock to heap.
@return copy of lock */
static
lock_t*
lock_rec_copy(
/*==========*/
	const lock_t*	lock,	/*!< in: record lock */
	mem_heap_t*	heap)	/*!< in: memory heap */
{
	ulint	size;

	ut_ad(!lock->is_table());

	size = sizeof(lock_t) + lock_rec_get_n_bits(lock) / 8;
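	/* The bitmap is stored right after the lock_t, so one
	contiguous copy duplicates both the lock and its bitmap. */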

	return(static_cast<lock_t*>(mem_heap_dup(heap, lock, size)));
}

/*********************************************************************//**
Gets the previous record lock set on a record.
@return previous lock on the same record, NULL if none exists */
const lock_t*
lock_rec_get_prev(
/*==============*/
	const lock_t*	in_lock,/*!< in: record lock */
	ulint		heap_no)/*!< in: heap number of the record */
{
  ut_ad(!in_lock->is_table());
  const page_id_t id{in_lock->un_member.rec_lock.page_id};
  hash_cell_t *cell= lock_sys.hash_get(in_lock->type_mode).cell_get(id.fold());
  for (lock_t *lock= lock_sys_t::get_first(*cell, id); lock != in_lock;
       lock= lock_rec_get_next_on_page(lock))
    if (lock_rec_get_nth_bit(lock, heap_no))
      return lock;

  return nullptr;
}

/*============= FUNCTIONS FOR ANALYZING RECORD LOCK QUEUE ================*/

/*********************************************************************//**
Checks if a transaction has a GRANTED explicit lock on rec stronger or equal
to precise_mode.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_rec_has_expl(
/*==============*/
	ulint			precise_mode,/*!< in: LOCK_S or LOCK_X
					possibly ORed to LOCK_GAP or
					LOCK_REC_NOT_GAP, for a
					supremum record we regard this
					always a gap type request */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction */
{
  ut_ad((precise_mode & LOCK_MODE_MASK) == LOCK_S
	|| (precise_mode & LOCK_MODE_MASK) == LOCK_X);
  ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));
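  /* Scan the record's lock queue for a granted (non-waiting,
  non-insert-intention) lock of trx that is at least as strong as
  precise_mode; the gap/not-gap flags only matter for records other
  than the supremum. */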

  for (lock_t *lock= lock_sys_t::get_first(cell, id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
    if (lock->trx == trx &&
	!(lock->type_mode & (LOCK_WAIT | LOCK_INSERT_INTENTION)) &&
	(!((LOCK_REC_NOT_GAP | LOCK_GAP) & lock->type_mode) ||
	 heap_no == PAGE_HEAP_NO_SUPREMUM ||
	 ((LOCK_REC_NOT_GAP | LOCK_GAP) & precise_mode & lock->type_mode)) &&
	lock_mode_stronger_or_eq(lock->mode(), static_cast<lock_mode>
				 (precise_mode & LOCK_MODE_MASK)))
      return lock;

  return nullptr;
}

#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@return lock or NULL */
static
lock_t*
lock_rec_other_has_expl_req(
/*========================*/
	lock_mode		mode,	/*!< in: LOCK_S or LOCK_X */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	bool			wait,	/*!< in: whether also waiting locks
					are taken into account */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction, or NULL if
					requests by all transactions
					are taken into account */
{
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	/* Only GAP lock can be on SUPREMUM, and we are not looking for
	GAP lock */
	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		return(NULL);
	}

	for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock; lock = lock_rec_get_next(heap_no, lock)) {
		if (lock->trx != trx
		    && !lock->is_gap()
		    && (!lock->is_waiting() || wait)
		    && lock_mode_stronger_or_eq(lock->mode(), mode)) {

			return(lock);
		}
	}

	return(NULL);
}
#endif /* UNIV_DEBUG */

#ifdef WITH_WSREP
void lock_wait_wsrep_kill(trx_t *bf_trx, ulong thd_id, trx_id_t trx_id);

/** Kill the holders of conflicting locks.
@param trx   brute-force applier transaction running in the current thread */
ATTRIBUTE_COLD ATTRIBUTE_NOINLINE
static void lock_wait_wsrep(trx_t *trx)
{
  DBUG_ASSERT(wsrep_on(trx->mysql_thd));
  if (!wsrep_thd_is_BF(trx->mysql_thd, false))
    return;

  std::set<trx_t*> victims;

  lock_sys.wr_lock(SRW_LOCK_CALL);
  mysql_mutex_lock(&lock_sys.wait_mutex);

  const lock_t *wait_lock= trx->lock.wait_lock;
  if (!wait_lock)
  {
func_exit:
    lock_sys.wr_unlock();
    mysql_mutex_unlock(&lock_sys.wait_mutex);
    return;
  }

  if (wait_lock->is_table())
  {
    dict_table_t *table= wait_lock->un_member.tab_lock.table;
    for (lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock;
         lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
      /* If the victim also has BF status but an earlier seqno, we have to wait */
      if (lock->trx != trx &&
          !(wsrep_thd_is_BF(lock->trx->mysql_thd, false) &&
            wsrep_thd_order_before(lock->trx->mysql_thd, trx->mysql_thd)))
      {
        victims.emplace(lock->trx);
      }
  }
  else
  {
    const page_id_t id{wait_lock->un_member.rec_lock.page_id};
    hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                         ? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
      (id.fold());
    if (lock_t *lock= lock_sys_t::get_first(cell, id))
    {
      const ulint heap_no= lock_rec_find_set_bit(wait_lock);
      if (!lock_rec_get_nth_bit(lock, heap_no))
        lock= lock_rec_get_next(heap_no, lock);
      do
        /* If the victim also has BF status but an earlier seqno, we have to wait */
        if (lock->trx != trx &&
            !(wsrep_thd_is_BF(lock->trx->mysql_thd, false) &&
              wsrep_thd_order_before(lock->trx->mysql_thd, trx->mysql_thd)))
        {
          victims.emplace(lock->trx);
        }
      while ((lock= lock_rec_get_next(heap_no, lock)));
    }
  }

  if (victims.empty())
    goto func_exit;
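  /* Copy out the victims' THD ids while still holding the latches;
  the kill calls are issued only after the latches are released
  below. */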

  std::vector<std::pair<ulong,trx_id_t>> victim_id;
  for (trx_t *v : victims)
    victim_id.emplace_back(std::pair<ulong,trx_id_t>
                           {thd_get_thread_id(v->mysql_thd), v->id});
  DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort",
                  {
                    const char act[]=
                      "now SIGNAL sync.before_wsrep_thd_abort_reached "
                      "WAIT_FOR signal.before_wsrep_thd_abort";
                    DBUG_ASSERT(!debug_sync_set_action(trx->mysql_thd,
                                                       STRING_WITH_LEN(act)));
                  };);
  lock_sys.wr_unlock();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  for (const auto &v : victim_id)
    lock_wait_wsrep_kill(trx, v.first, v.second);
}
#endif /* WITH_WSREP */

/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@param[in] mode LOCK_S or LOCK_X, possibly ORed to LOCK_GAP or LOCK_REC_NOT_GAP,
LOCK_INSERT_INTENTION
@param[in] cell lock hash table cell
@param[in] id page identifier
@param[in] heap_no heap number of the record
@param[in] trx our transaction
@return conflicting lock, or NULL if there is none */
static lock_t *lock_rec_other_has_conflicting(unsigned mode,
                                              const hash_cell_t &cell,
                                              const page_id_t id,
                                              ulint heap_no, const trx_t *trx)
{
	bool	is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);

	for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock; lock = lock_rec_get_next(heap_no, lock)) {
		if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) {
			return(lock);
		}
	}

	return(NULL);
}

/*********************************************************************//**
Checks if some transaction has an implicit x-lock on a record in a secondary
index.
@return transaction id of the transaction which has the x-lock, or 0;
NOTE that this function can return false positives but never false
negatives. The caller must confirm all positive results by calling
trx_is_active(). */
static
trx_t*
lock_sec_rec_some_has_impl(
/*=======================*/
	trx_t*		caller_trx,/*!<in/out: trx of current thread */
	const rec_t*	rec,	/*!< in: user record */
	dict_index_t*	index,	/*!< in: secondary index */
	const rec_offs*	offsets)/*!< in: rec_get_offsets(rec, index) */
{
	trx_t*		trx;
	trx_id_t	max_trx_id;
	const page_t*	page = page_align(rec);

	lock_sys.assert_unlocked();
	ut_ad(!dict_index_is_clust(index));
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	max_trx_id = page_get_max_trx_id(page);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list, or
	database recovery is running. */

	if (max_trx_id < trx_sys.get_min_trx_id()) {

		trx = 0;

	} else if (!lock_check_trx_id_sanity(max_trx_id, rec, index, offsets)) {

		/* The page is corrupt: try to avoid a crash by returning 0 */
		trx = 0;

	/* In this case it is possible that some transaction has an implicit
	x-lock. We have to look in the clustered index. */

	} else {
		trx = row_vers_impl_x_locked(caller_trx, rec, index, offsets);
	}

	return(trx);
}

/*********************************************************************//**
Return the number of table locks for a transaction.
The caller must be holding lock_sys.latch. */
ulint
lock_number_of_tables_locked(
/*=========================*/
	const trx_lock_t*	trx_lock)	/*!< in: transaction locks */
{
	const lock_t*	lock;
	ulint		n_tables = 0;

	lock_sys.assert_locked();

	for (lock = UT_LIST_GET_FIRST(trx_lock->trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (lock->is_table()) {
			n_tables++;
		}
	}

	return(n_tables);
}

/*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/

/** Reset the wait status of a lock.
@param[in,out]	lock	lock that was possibly being waited for */
static void lock_reset_lock_and_trx_wait(lock_t *lock)
{
  lock_sys.assert_locked(*lock);
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  trx_t *trx= lock->trx;
  ut_ad(lock->is_waiting());
  ut_ad(!trx->lock.wait_lock || trx->lock.wait_lock == lock);
  if (trx_t *wait_trx= trx->lock.wait_trx)
    Deadlock::to_check.erase(wait_trx);
  trx->lock.wait_lock= nullptr;
  trx->lock.wait_trx= nullptr;
  lock->type_mode&= ~LOCK_WAIT;
}

#ifdef UNIV_DEBUG
/** Check transaction state */
static void check_trx_state(const trx_t *trx)
{
  ut_ad(!trx->auto_commit || trx->will_lock);
  const auto state= trx->state;
  ut_ad(state == TRX_STATE_ACTIVE ||
        state == TRX_STATE_PREPARED_RECOVERED ||
        state == TRX_STATE_PREPARED ||
        state == TRX_STATE_COMMITTED_IN_MEMORY);
}
#endif

/** Create a new record lock and insert it into the lock queue,
without checking for deadlocks or conflicts.
@param[in]	c_lock		conflicting lock
@param[in]	type_mode	lock mode and wait flag
@param[in]	page_id		index page number
@param[in]	page		R-tree index page, or NULL
@param[in]	heap_no		record heap number in the index page
@param[in]	index		the index tree
@param[in,out]	trx		transaction
@param[in]	holds_trx_mutex	whether the caller holds trx->mutex
@return created lock */
lock_t*
lock_rec_create_low(
	lock_t*		c_lock,
	unsigned	type_mode,
	const page_id_t	page_id,
	const page_t*	page,
	ulint		heap_no,
	dict_index_t*	index,
	trx_t*		trx,
	bool		holds_trx_mutex)
{
	lock_t*		lock;
	ulint		n_bytes;

	ut_d(lock_sys.hash_get(type_mode).assert_locked(page_id));
	ut_ad(xtest() || holds_trx_mutex == trx->mutex_is_owner());
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
	ut_ad(!(type_mode & LOCK_TABLE));
	ut_ad(trx->state != TRX_STATE_NOT_STARTED);
	ut_ad(!trx->is_autocommit_non_locking());

	/* If rec is the supremum record, then we reset the gap and
	LOCK_REC_NOT_GAP bits, as all locks on the supremum are
	automatically of the gap type */
	if (UNIV_UNLIKELY(heap_no == PAGE_HEAP_NO_SUPREMUM)) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));
		type_mode = type_mode & ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}
	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
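		/* Allocate one bit per heap number that is currently
		allocated on the page, rounded up to whole bytes. */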
		n_bytes = (page_dir_get_n_heap(page) + 7) / 8;
	} else {
		ut_ad(heap_no == PRDT_HEAPNO);

		/* The lock is always on PAGE_HEAP_NO_INFIMUM (0), so
		we only need 1 bit (which rounds up to 1 byte) for
		lock bit setting */
		n_bytes = 1;
		if (type_mode & LOCK_PREDICATE) {
			ulint	tmp = UNIV_WORD_SIZE - 1;
			/* We will attach predicate structure after lock.
			Make sure the memory is aligned on 8 bytes,
			the mem_heap_alloc will align it with
			MEM_SPACE_NEEDED anyway. */
			n_bytes = (n_bytes + sizeof(lock_prdt_t) + tmp) & ~tmp;
			ut_ad(n_bytes == sizeof(lock_prdt_t) + UNIV_WORD_SIZE);
		}
	}
	if (!holds_trx_mutex) {
		trx->mutex_lock();
	}
	ut_ad(trx->mutex_is_owner());
	ut_ad(trx->state != TRX_STATE_NOT_STARTED);

	if (trx->lock.rec_cached >= UT_ARR_SIZE(trx->lock.rec_pool)
	    || sizeof *lock + n_bytes > sizeof *trx->lock.rec_pool) {
		lock = static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap,
				       sizeof *lock + n_bytes));
	} else {
		lock = &trx->lock.rec_pool[trx->lock.rec_cached++].lock;
	}

	lock->trx = trx;
	lock->type_mode = type_mode;
	lock->index = index;
	lock->un_member.rec_lock.page_id = page_id;

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		lock->un_member.rec_lock.n_bits = uint32_t(n_bytes * 8);
	} else {
		/* Predicate lock always on INFIMUM (0) */
		lock->un_member.rec_lock.n_bits = 8;
	}
	lock_rec_bitmap_reset(lock);
	lock_rec_set_nth_bit(lock, heap_no);
	index->table->n_rec_locks++;
	ut_ad(index->table->get_ref_count() || !index->table->can_be_evicted);

	const auto lock_hash = &lock_sys.hash_get(type_mode);
	lock_hash->cell_get(page_id.fold())->append(*lock, &lock_t::hash);

	if (type_mode & LOCK_WAIT) {
		if (trx->lock.wait_trx) {
			ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx);
			ut_ad(trx->lock.wait_lock);
			ut_ad((*trx->lock.wait_lock).trx == trx);
		} else {
			ut_ad(c_lock);
			trx->lock.wait_trx = c_lock->trx;
			ut_ad(!trx->lock.wait_lock);
		}
		trx->lock.wait_lock = lock;
	}
	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
	if (!holds_trx_mutex) {
		trx->mutex_unlock();
	}
	MONITOR_INC(MONITOR_RECLOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_RECLOCK);

	return lock;
}

/** Enqueue a waiting request for a lock which cannot be granted immediately.
Check for deadlocks.
@param[in]	type_mode	the requested lock mode (LOCK_S or LOCK_X)
				possibly ORed with LOCK_GAP or
				LOCK_REC_NOT_GAP, ORed with
				LOCK_INSERT_INTENTION if this
				waiting lock request is set
				when performing an insert of
				an index record
@param[in]	id		page identifier
@param[in]	page		leaf page in the index
@param[in]	heap_no		record heap number in the block
@param[in]	index		index tree
@param[in,out]	thr		query thread
@param[in]	prdt		minimum bounding box (spatial index)
@retval	DB_LOCK_WAIT		if the waiting lock was enqueued
@retval	DB_DEADLOCK		if this transaction was chosen as the victim */
dberr_t
lock_rec_enqueue_waiting(
	lock_t*			c_lock,
	unsigned		type_mode,
	const page_id_t		id,
	const page_t*		page,
	ulint			heap_no,
	dict_index_t*		index,
	que_thr_t*		thr,
	lock_prdt_t*		prdt)
{
	ut_d(lock_sys.hash_get(type_mode).assert_locked(id));
	ut_ad(!srv_read_only_mode);
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));

	trx_t* trx = thr_get_trx(thr);
	ut_ad(xtest() || trx->mutex_is_owner());
	ut_ad(!trx->dict_operation_lock_mode);
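
	/* A lock wait timeout of 0 means that we must not wait at all;
	report the timeout right away. */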
	if (trx->mysql_thd && thd_lock_wait_timeout(trx->mysql_thd) == 0) {
		trx->error_state = DB_LOCK_WAIT_TIMEOUT;
		return DB_LOCK_WAIT_TIMEOUT;
	}

	/* Enqueue the lock request that will wait to be granted, note that
	we already own the trx mutex. */
	lock_t* lock = lock_rec_create_low(
		c_lock,
		type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true);

	if (prdt && type_mode & LOCK_PREDICATE) {
		lock_prdt_set_prdt(lock, prdt);
	}

	trx->lock.wait_thr = thr;
	trx->lock.clear_deadlock_victim();

	DBUG_LOG("ib_lock", "trx " << ib::hex(trx->id)
		 << " waits for lock in index " << index->name
		 << " of table " << index->table->name);
	MONITOR_INC(MONITOR_LOCKREC_WAIT);
	return DB_LOCK_WAIT;
}

/*********************************************************************//**
Looks for a suitable type record lock struct by the same trx on the same page.
This can be used to save space when a new record lock should be set on a page:
no new struct is needed if a suitable old one is found.
@return lock or NULL */
static inline
lock_t*
lock_rec_find_similar_on_page(
	ulint           type_mode,      /*!< in: lock type_mode field */
	ulint           heap_no,        /*!< in: heap number of the record */
	lock_t*         lock,           /*!< in: lock_sys.get_first() */
	const trx_t*    trx)            /*!< in: transaction */
{
	lock_sys.rec_hash.assert_locked(lock->un_member.rec_lock.page_id);

	for (/* No op */;
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (lock->trx == trx
		    && lock->type_mode == type_mode
		    && lock_rec_get_n_bits(lock) > heap_no) {

			return(lock);
		}
	}

	return(NULL);
}

/*********************************************************************//**
Adds a record lock request in the record queue. The request is normally
added as the last in the queue, but if there are no waiting lock requests
on the record, and the request to be added is not a waiting request, we
can reuse a suitable record lock object already existing on the same page,
just setting the appropriate bit in its bitmap. This is a low-level function
which does NOT check for deadlocks or lock compatibility!
@param[in] type_mode lock mode, wait, gap etc. flags
@param[in,out] cell first hash table cell
@param[in] id page identifier
@param[in] page buffer block containing the record
@param[in] heap_no heap number of the record
@param[in] index index of record
@param[in,out] trx transaction
@param[in] caller_owns_trx_mutex TRUE if caller owns the transaction mutex */
TRANSACTIONAL_TARGET
static void lock_rec_add_to_queue(unsigned type_mode, hash_cell_t &cell,
                                  const page_id_t id, const page_t *page,
                                  ulint heap_no, dict_index_t *index,
                                  trx_t *trx, bool caller_owns_trx_mutex)
{
	ut_d(lock_sys.hash_get(type_mode).assert_locked(id));
	ut_ad(xtest() || caller_owns_trx_mutex == trx->mutex_is_owner());
	ut_ad(index->is_primary()
	      || dict_index_get_online_status(index) != ONLINE_INDEX_CREATION);
	ut_ad(!(type_mode & LOCK_TABLE));
#ifdef UNIV_DEBUG
	switch (type_mode & LOCK_MODE_MASK) {
	case LOCK_X:
	case LOCK_S:
		break;
	default:
		ut_error;
	}

	if (!(type_mode & (LOCK_WAIT | LOCK_GAP))) {
		lock_mode	mode = (type_mode & LOCK_MODE_MASK) == LOCK_S
			? LOCK_X
			: LOCK_S;
		const lock_t*	other_lock
			= lock_rec_other_has_expl_req(
				mode, cell, id, false, heap_no, trx);
#ifdef WITH_WSREP
		if (UNIV_LIKELY_NULL(other_lock) && trx->is_wsrep()) {
			/* Only BF transaction may be granted lock
			before other conflicting lock request. */
			if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE)
			    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
				/* If it is not BF, this case is a bug. */
				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
				ut_error;
			}
		} else
#endif /* WITH_WSREP */
		ut_ad(!other_lock);
	}
#endif /* UNIV_DEBUG */

	/* If rec is the supremum record, then we can reset the gap bit, as
	all locks on the supremum are automatically of the gap type, and we
	try to avoid unnecessary memory consumption of a new record lock
	struct for a gap type lock */

	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));

		/* There should never be LOCK_REC_NOT_GAP on a supremum
		record, but let us play safe */

		type_mode &= ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	if (type_mode & LOCK_WAIT) {
		goto create;
	} else if (lock_t *first_lock = lock_sys_t::get_first(cell, id)) {
		for (lock_t* lock = first_lock;;) {
			if (lock->is_waiting()
			    && lock_rec_get_nth_bit(lock, heap_no)) {
				goto create;
			}
			if (!(lock = lock_rec_get_next_on_page(lock))) {
				break;
			}
		}

		/* Look for a similar record lock on the same page:
		if one is found and there are no waiting lock requests,
		we can just set the bit */
		if (lock_t* lock = lock_rec_find_similar_on_page(
			    type_mode, heap_no, first_lock, trx)) {
			trx_t* lock_trx = lock->trx;
			if (caller_owns_trx_mutex) {
				trx->mutex_unlock();
			}
			{
				TMTrxGuard tg{*lock_trx};
				lock_rec_set_nth_bit(lock, heap_no);
			}

			if (caller_owns_trx_mutex) {
				trx->mutex_lock();
			}
			return;
		}
	}

create:
	/* Note: We will not pass any conflicting lock to lock_rec_create(),
	because we should be moving an existing waiting lock request. */
	ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);

	lock_rec_create_low(nullptr,
			    type_mode, id, page, heap_no, index, trx,
			    caller_owns_trx_mutex);
}

/*********************************************************************//**
Tries to lock the specified record in the mode requested. If not immediately
possible, enqueues a waiting lock request. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case
of a page supremum record, a gap type lock.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
static
dberr_t
lock_rec_lock(
/*==========*/
	bool			impl,	/*!< in: if true, no lock is set
					if no wait is necessary: we
					assume that the caller will
					set an implicit lock */
	unsigned		mode,	/*!< in: lock mode: LOCK_X or
					LOCK_S possibly ORed to either
					LOCK_GAP or LOCK_REC_NOT_GAP */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of record */
	dict_index_t*		index,	/*!< in: index of record */
	que_thr_t*		thr)	/*!< in: query thread */
{
  trx_t *trx= thr_get_trx(thr);

  ut_ad(!srv_read_only_mode);
  ut_ad(((LOCK_MODE_MASK | LOCK_TABLE) & mode) == LOCK_S ||
        ((LOCK_MODE_MASK | LOCK_TABLE) & mode) == LOCK_X);
  ut_ad(~mode & (LOCK_GAP | LOCK_REC_NOT_GAP));
  ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
  DBUG_EXECUTE_IF("innodb_report_deadlock", return DB_DEADLOCK;);

  ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ||
        lock_table_has(trx, index->table, LOCK_IS));
  ut_ad((LOCK_MODE_MASK & mode) != LOCK_X ||
         lock_table_has(trx, index->table, LOCK_IX));

  if (lock_table_has(trx, index->table,
                     static_cast<lock_mode>(LOCK_MODE_MASK & mode)))
    return DB_SUCCESS;

  /* During CREATE TABLE, we will write to newly created FTS_*_CONFIG
  on which no lock has been created yet. */
  ut_ad(!trx->dict_operation_lock_mode ||
        (strstr(index->table->name.m_name, "/FTS_") &&
         strstr(index->table->name.m_name, "_CONFIG") + sizeof("_CONFIG") ==
         index->table->name.m_name + strlen(index->table->name.m_name) + 1));
  MONITOR_ATOMIC_INC(MONITOR_NUM_RECLOCK_REQ);
  const page_id_t id{block->page.id()};
  LockGuard g{lock_sys.rec_hash, id};

  if (lock_t *lock= lock_sys_t::get_first(g.cell(), id))
  {
    dberr_t err= DB_SUCCESS;
    trx->mutex_lock();
    if (lock_rec_get_next_on_page(lock) ||
        lock->trx != trx ||
        lock->type_mode != mode ||
        lock_rec_get_n_bits(lock) <= heap_no)
    {
      /* Do nothing if the trx already has a strong enough lock on rec */
      if (!lock_rec_has_expl(mode, g.cell(), id, heap_no, trx))
      {
        if (lock_t *c_lock= lock_rec_other_has_conflicting(mode, g.cell(), id,
                                                           heap_no, trx))
          /*
            If another transaction has a non-gap conflicting
            request in the queue, as this transaction does not
            have a lock strong enough already granted on the
            record, we have to wait.
          */
          err= lock_rec_enqueue_waiting(c_lock, mode, id, block->page.frame,
                                        heap_no, index, thr, nullptr);
        else if (!impl)
        {
          /* Set the requested lock on the record. */
          lock_rec_add_to_queue(mode, g.cell(), id, block->page.frame, heap_no,
                                index, trx, true);
          err= DB_SUCCESS_LOCKED_REC;
        }
      }
    }
    else if (!impl)
    {
      /*
        If the nth bit of the record lock is already set then we do not set
        a new lock bit, otherwise we do set
      */
      if (!lock_rec_get_nth_bit(lock, heap_no))
      {
        lock_rec_set_nth_bit(lock, heap_no);
        err= DB_SUCCESS_LOCKED_REC;
      }
    }
1563
    trx->mutex_unlock();
1564
    return err;
1565
  }
1566

1567 1568 1569 1570 1571 1572
  /* Simplified and faster path for the most common cases */
  if (!impl)
    lock_rec_create_low(nullptr, mode, id, block->page.frame, heap_no, index,
                        trx, false);

  return DB_SUCCESS_LOCKED_REC;
1573 1574 1575 1576
}

/*********************************************************************//**
Checks if a waiting record lock request still has to wait in a queue.
@return conflicting lock that is causing the wait, or NULL if the
request no longer has to wait */
static
const lock_t*
lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock)
{
	const lock_t*	lock;
	ulint		heap_no;
	ulint		bit_mask;
	ulint		bit_offset;

	ut_ad(wait_lock->is_waiting());
	ut_ad(!wait_lock->is_table());

	heap_no = lock_rec_find_set_bit(wait_lock);

	bit_offset = heap_no / 8;
	bit_mask = static_cast<ulint>(1) << (heap_no % 8);

	for (lock = lock_sys_t::get_first(
		     cell, wait_lock->un_member.rec_lock.page_id);
	     lock != wait_lock;
	     lock = lock_rec_get_next_on_page_const(lock)) {
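		/* The record lock bitmap is stored immediately after the
		lock_t object; check the bit that corresponds to heap_no. */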
		const byte*	p = (const byte*) &lock[1];

		if (heap_no < lock_rec_get_n_bits(lock)
		    && (p[bit_offset] & bit_mask)
		    && lock_has_to_wait(wait_lock, lock)) {
			return(lock);
		}
	}

	return(NULL);
}

/** Note that a record lock wait started */
inline void lock_sys_t::wait_start()
{
  mysql_mutex_assert_owner(&wait_mutex);
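  /* wait_count encodes two counters in one word: the low-order part
  (below WAIT_COUNT_STEP) holds the number of pending waits, and each
  multiple of WAIT_COUNT_STEP counts one cumulative wait; hence the
  combined increment below (compare get_wait_pending() and
  get_wait_cumulative()). */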
  wait_count+= WAIT_COUNT_STEP + 1;
  /* The maximum number of concurrently waiting transactions is one less
  than the maximum number of concurrent transactions. */
  static_assert(WAIT_COUNT_STEP == UNIV_PAGE_SIZE_MAX / 16 * TRX_SYS_N_RSEGS,
                "compatibility");
}

/** Note that a record lock wait resumed */
inline
void lock_sys_t::wait_resume(THD *thd, my_hrtime_t start, my_hrtime_t now)
{
  mysql_mutex_assert_owner(&wait_mutex);
  ut_ad(get_wait_pending());
  ut_ad(get_wait_cumulative());
  wait_count--;
  if (now.val >= start.val)
  {
    const uint32_t diff_time=
      static_cast<uint32_t>((now.val - start.val) / 1000);
    wait_time+= diff_time;

    if (diff_time > wait_time_max)
      wait_time_max= diff_time;

    thd_storage_lock_wait(thd, diff_time);
  }
}

#ifdef HAVE_REPLICATION
ATTRIBUTE_NOINLINE MY_ATTRIBUTE((nonnull))
/** Report lock waits to parallel replication.
@param trx       transaction that may be waiting for a lock */
static void lock_wait_rpl_report(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  THD *const thd= trx->mysql_thd;
  ut_ad(thd);
  const lock_t *wait_lock= trx->lock.wait_lock;
  if (!wait_lock)
    return;
  ut_ad(!(wait_lock->type_mode & LOCK_AUTO_INC));
  /* This would likely be too large to attempt to use a memory transaction,
  even for wait_lock->is_table(). */
  if (!lock_sys.wr_lock_try())
  {
    mysql_mutex_unlock(&lock_sys.wait_mutex);
    lock_sys.wr_lock(SRW_LOCK_CALL);
    mysql_mutex_lock(&lock_sys.wait_mutex);
    wait_lock= trx->lock.wait_lock;
    if (!wait_lock)
    {
func_exit:
      lock_sys.wr_unlock();
      return;
    }
    ut_ad(wait_lock->is_waiting());
  }
  else if (!wait_lock->is_waiting())
    goto func_exit;
  ut_ad(!(wait_lock->type_mode & LOCK_AUTO_INC));

  if (wait_lock->is_table())
  {
    dict_table_t *table= wait_lock->un_member.tab_lock.table;
    for (lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock;
         lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
      if (!(lock->type_mode & LOCK_AUTO_INC) && lock->trx != trx)
        thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
  }
  else
  {
    const page_id_t id{wait_lock->un_member.rec_lock.page_id};
    hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                         ? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
      (id.fold());
    if (lock_t *lock= lock_sys_t::get_first(cell, id))
    {
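      /* Walk the lock queue of this record (identified by heap_no
      within the page) and report every other transaction that holds
      or is waiting for a lock on it. */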
      const ulint heap_no= lock_rec_find_set_bit(wait_lock);
      if (!lock_rec_get_nth_bit(lock, heap_no))
        lock= lock_rec_get_next(heap_no, lock);
      do
        if (lock->trx->mysql_thd != thd)
          thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
      while ((lock= lock_rec_get_next(heap_no, lock)));
    }
  }

  goto func_exit;
}
#endif /* HAVE_REPLICATION */

/** Wait for a lock to be released.
@retval DB_DEADLOCK if this transaction was chosen as the deadlock victim
@retval DB_INTERRUPTED if the execution was interrupted by the user
@retval DB_LOCK_WAIT_TIMEOUT if the lock wait timed out
@retval DB_SUCCESS if the lock was granted */
dberr_t lock_wait(que_thr_t *thr)
{
  trx_t *trx= thr_get_trx(thr);

  if (trx->mysql_thd)
    DEBUG_SYNC_C("lock_wait_suspend_thread_enter");

  /* InnoDB system transactions may use the global value of
  innodb_lock_wait_timeout, because trx->mysql_thd == NULL. */
  const ulong innodb_lock_wait_timeout= trx_lock_wait_timeout_get(trx);
  const my_hrtime_t suspend_time= my_hrtime_coarse();
  ut_ad(!trx->dict_operation_lock_mode);

  /* The wait_lock can be cleared by another thread in lock_grant(),
  lock_rec_cancel(), or lock_cancel_waiting_and_release(). But, a wait
  can only be initiated by the current thread which owns the transaction.

  Even if trx->lock.wait_lock were changed, the object that it used to
  point to will remain valid memory (remain allocated from
  trx->lock.lock_heap). If trx->lock.wait_lock was set to nullptr, the
  original object could be transformed to a granted lock. On a page
  split or merge, we would change trx->lock.wait_lock to point to
  another waiting lock request object, and the old object would be
  logically discarded.

  In any case, it is safe to read the memory that wait_lock points to,
  even though we are not holding any mutex. We are only reading
  wait_lock->type_mode & (LOCK_TABLE | LOCK_AUTO_INC), which will be
  unaffected by any page split or merge operation. (Furthermore,
  table lock objects will never be cloned or moved.) */
  const lock_t *const wait_lock= trx->lock.wait_lock;

  if (!wait_lock)
  {
    /* The lock has already been released or this transaction
    was chosen as a deadlock victim: no need to wait */
    if (trx->lock.was_chosen_as_deadlock_victim.fetch_and(byte(~1)))
      trx->error_state= DB_DEADLOCK;
    else
      trx->error_state= DB_SUCCESS;

    return trx->error_state;
  }

  trx->lock.suspend_time= suspend_time;

  ut_ad(!trx->dict_operation_lock_mode);

  IF_WSREP(if (trx->is_wsrep()) lock_wait_wsrep(trx),);

  const auto type_mode= wait_lock->type_mode;
#ifdef HAVE_REPLICATION
  /* Even though lock_wait_rpl_report() has nothing to do with
  deadlock detection, it was always disabled by innodb_deadlock_detect=OFF.
  We will keep it that way, because unfortunately
  thd_need_wait_reports() will hold even if parallel (or any) replication
  is not being used. We want to allow the user to skip
  lock_wait_rpl_report(). */
  const bool rpl= !(type_mode & LOCK_AUTO_INC) && trx->mysql_thd &&
    innodb_deadlock_detect && thd_need_wait_reports(trx->mysql_thd);
#endif
  const bool row_lock_wait= thr->lock_state == QUE_THR_LOCK_ROW;
  timespec abstime;
  set_timespec_time_nsec(abstime, suspend_time.val * 1000);
  abstime.MY_tv_sec+= innodb_lock_wait_timeout;
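  /* An innodb_lock_wait_timeout of 100000000 or more is treated as
  infinite (no timeout). */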
  /* Dictionary transactions must be immune to lock wait timeouts
  for locks on data dictionary tables. Here we check only for
  SYS_TABLES, SYS_COLUMNS, SYS_INDEXES, SYS_FIELDS. Locks on further
  tables SYS_FOREIGN, SYS_FOREIGN_COLS, SYS_VIRTUAL will only be
  acquired while holding an exclusive lock on one of the 4 tables. */
  const bool no_timeout= innodb_lock_wait_timeout >= 100000000 ||
    ((type_mode & LOCK_TABLE) &&
     wait_lock->un_member.tab_lock.table->id <= DICT_FIELDS_ID);
  thd_wait_begin(trx->mysql_thd, (type_mode & LOCK_TABLE)
                 ? THD_WAIT_TABLE_LOCK : THD_WAIT_ROW_LOCK);
  dberr_t error_state= DB_SUCCESS;

  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (trx->lock.wait_lock)
  {
    if (Deadlock::check_and_resolve(trx))
    {
      ut_ad(!trx->lock.wait_lock);
      error_state= DB_DEADLOCK;
      goto end_wait;
    }
  }
  else
    goto end_wait;

  if (row_lock_wait)
    lock_sys.wait_start();

#ifdef HAVE_REPLICATION
  if (rpl)
    lock_wait_rpl_report(trx);
#endif

  trx->error_state= DB_SUCCESS;

  while (trx->lock.wait_lock)
  {
    int err;

    if (no_timeout)
    {
      my_cond_wait(&trx->lock.cond, &lock_sys.wait_mutex.m_mutex);
      err= 0;
    }
    else
      err= my_cond_timedwait(&trx->lock.cond, &lock_sys.wait_mutex.m_mutex,
                             &abstime);
    error_state= trx->error_state;
    switch (error_state) {
    case DB_DEADLOCK:
    case DB_INTERRUPTED:
      break;
    default:
      ut_ad(error_state != DB_LOCK_WAIT_TIMEOUT);
      /* Dictionary transactions must ignore KILL, because they could
      be executed as part of a multi-transaction DDL operation,
      such as rollback_inplace_alter_table() or ha_innobase::delete_table(). */
      if (!trx->dict_operation && trx_is_interrupted(trx))
        /* innobase_kill_query() can only set trx->error_state=DB_INTERRUPTED
        for any transaction that is attached to a connection. */
        error_state= DB_INTERRUPTED;
      else if (!err)
        continue;
#ifdef WITH_WSREP
      else if (trx->is_wsrep() && wsrep_is_BF_lock_timeout(*trx));
#endif
      else
      {
        error_state= DB_LOCK_WAIT_TIMEOUT;
        lock_sys.timeouts++;
      }
    }
    break;
  }

  if (row_lock_wait)
    lock_sys.wait_resume(trx->mysql_thd, suspend_time, my_hrtime_coarse());

  if (lock_t *lock= trx->lock.wait_lock)
  {
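    /* The wait ended without the lock being granted (timeout,
    interrupt, or deadlock): remove our waiting lock request. */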
    lock_sys_t::cancel<false>(trx, lock);
    lock_sys.deadlock_check();
  }

end_wait:
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  thd_wait_end(trx->mysql_thd);

  trx->error_state= error_state;
  return error_state;
}


/** Resume a lock wait */
static void lock_wait_end(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->mutex_is_owner());
  ut_d(const auto state= trx->state);
  ut_ad(state == TRX_STATE_ACTIVE || state == TRX_STATE_PREPARED);
  ut_ad(trx->lock.wait_thr);

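  /* fetch_and() atomically clears bit 0 of the deadlock-victim flag
  and returns the previous value. */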
  if (trx->lock.was_chosen_as_deadlock_victim.fetch_and(byte(~1)))
  {
    ut_ad(state == TRX_STATE_ACTIVE);
    trx->error_state= DB_DEADLOCK;
  }

  trx->lock.wait_thr= nullptr;
  pthread_cond_signal(&trx->lock.cond);
}
/** Grant a waiting lock request and release the waiting transaction. */
static void lock_grant(lock_t *lock)
{
  lock_reset_lock_and_trx_wait(lock);
  trx_t *trx= lock->trx;
  trx->mutex_lock();
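  /* A granted AUTO-INC lock is also registered in the table object
  and in trx->autoinc_locks, so that it can be released early, at
  statement end rather than at transaction commit. */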
  if (lock->mode() == LOCK_AUTO_INC)
  {
    dict_table_t *table= lock->un_member.tab_lock.table;
    ut_ad(!table->autoinc_trx);
    table->autoinc_trx= trx;
    ib_vector_push(trx->autoinc_locks, &lock);
  }

  DBUG_PRINT("ib_lock", ("wait for trx " TRX_ID_FMT " ends", trx->id));

  /* If we are resolving a deadlock by choosing another transaction as
  a victim, then our original transaction may not be waiting anymore */

  if (trx->lock.wait_thr)
    lock_wait_end(trx);

  trx->mutex_unlock();
}

/*************************************************************//**
Cancels a waiting record lock request and releases the waiting transaction
that requested it. NOTE: does NOT check if waiting lock requests behind this
one can now be granted! */
static void lock_rec_cancel(lock_t *lock)
{
  trx_t *trx= lock->trx;
  mysql_mutex_lock(&lock_sys.wait_mutex);
  trx->mutex_lock();

  ut_d(lock_sys.hash_get(lock->type_mode).
       assert_locked(lock->un_member.rec_lock.page_id));

  /* Reset the bit (there can be only one set bit) in the lock bitmap */
  lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));

  /* Reset the wait flag and the back pointer to lock in trx */
  lock_reset_lock_and_trx_wait(lock);

  /* The following releases the trx from lock wait */
  lock_wait_end(trx);
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  trx->mutex_unlock();
}

/** Remove a record lock request, waiting or granted, from the queue and
grant locks to other transactions in the queue if they now are entitled
to a lock. NOTE: all record locks contained in in_lock are removed.
@param[in,out]	in_lock		record lock
@param[in]	owns_wait_mutex	whether lock_sys.wait_mutex is held */
static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
{
#ifdef SAFE_MUTEX
	ut_ad(owns_wait_mutex == mysql_mutex_is_owner(&lock_sys.wait_mutex));
#endif /* SAFE_MUTEX */
	ut_ad(!in_lock->is_table());

	const page_id_t page_id{in_lock->un_member.rec_lock.page_id};
	auto& lock_hash = lock_sys.hash_get(in_lock->type_mode);
	ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());

	ut_d(auto old_n_locks=)
	in_lock->index->table->n_rec_locks--;
	ut_ad(old_n_locks);

	const ulint rec_fold = page_id.fold();
	hash_cell_t &cell = *lock_hash.cell_get(rec_fold);
	lock_sys.assert_locked(cell);

	HASH_DELETE(lock_t, hash, &lock_hash, rec_fold, in_lock);
	ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());
	UT_LIST_REMOVE(in_lock->trx->lock.trx_locks, in_lock);

	MONITOR_INC(MONITOR_RECLOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_RECLOCK);

	bool acquired = false;

	/* Check if waiting locks in the queue can now be granted:
	grant locks if there are no conflicting locks ahead. For a lock
	that still has to wait, record the blocking transaction so that
	the deadlock detector can examine it. */

	for (lock_t* lock = lock_sys_t::get_first(cell, page_id);
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (!lock->is_waiting()) {
			continue;
		}

		if (!owns_wait_mutex) {
			mysql_mutex_lock(&lock_sys.wait_mutex);
			acquired = owns_wait_mutex = true;
		}

		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_rec_has_to_wait_in_queue(
			    cell, lock)) {
			trx_t* c_trx = c->trx;
			lock->trx->lock.wait_trx = c_trx;
			if (c_trx->lock.wait_trx
			    && innodb_deadlock_detect
			    && Deadlock::to_check.emplace(c_trx).second) {
				Deadlock::to_be_checked = true;
			}
		} else {
			/* Grant the lock */
			ut_ad(lock->trx != in_lock->trx);
			lock_grant(lock);
		}
	}

	if (acquired) {
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}

/** Remove a record lock request, waiting or granted, on a discarded page
@param lock_hash  hash table of record locks
@param in_lock    lock object */
TRANSACTIONAL_TARGET
void lock_rec_discard(lock_sys_t::hash_table &lock_hash, lock_t *in_lock)
{
  ut_ad(!in_lock->is_table());
  lock_hash.assert_locked(in_lock->un_member.rec_lock.page_id);

  HASH_DELETE(lock_t, hash, &lock_hash,
              in_lock->un_member.rec_lock.page_id.fold(), in_lock);
  ut_d(uint32_t old_locks);
  {
    trx_t *trx= in_lock->trx;
    TMTrxGuard tg{*trx};
    ut_d(old_locks=)
    in_lock->index->table->n_rec_locks--;
    UT_LIST_REMOVE(trx->lock.trx_locks, in_lock);
  }
  ut_ad(old_locks);
  MONITOR_INC(MONITOR_RECLOCK_REMOVED);
  MONITOR_DEC(MONITOR_NUM_RECLOCK);
}

/*************************************************************//**
Removes record lock objects set on an index page which is discarded. This
function does not move locks, or check for waiting locks, therefore the
lock bitmaps must already be reset when this function is called. */
static void
lock_rec_free_all_from_discard_page(page_id_t id, const hash_cell_t &cell,
                                    lock_sys_t::hash_table &lock_hash)
{
  for (lock_t *lock= lock_sys_t::get_first(cell, id); lock; )
  {
    ut_ad(&lock_hash != &lock_sys.rec_hash ||
          lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
    ut_ad(!lock->is_waiting());
    lock_t *next_lock= lock_rec_get_next_on_page(lock);
    lock_rec_discard(lock_hash, lock);
    lock= next_lock;
  }
}

/** Discard locks for an index when purging DELETE FROM SYS_INDEXES
after an aborted CREATE INDEX operation.
@param index   a stale index on which ADD INDEX operation was aborted */
ATTRIBUTE_COLD void lock_discard_for_index(const dict_index_t &index)
{
  ut_ad(!index.is_committed());
  /* This is very rarely executed code, and the size of the hash array
  would exceed the maximum size of a memory transaction. */
  LockMutexGuard g{SRW_LOCK_CALL};
  const ulint n= lock_sys.rec_hash.pad(lock_sys.rec_hash.n_cells);
  for (ulint i= 0; i < n; i++)
  {
    for (lock_t *lock= static_cast<lock_t*>(lock_sys.rec_hash.array[i].node);
         lock; )
    {
      ut_ad(!lock->is_table());
      if (lock->index == &index)
      {
        ut_ad(!lock->is_waiting());
        lock_rec_discard(lock_sys.rec_hash, lock);
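        /* Restart the scan from the bucket head, because
        lock_rec_discard() unlinked the current element from the
        hash chain. */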
        lock= static_cast<lock_t*>(lock_sys.rec_hash.array[i].node);
      }
      else
        lock= lock->hash;
    }
  }
}

/*============= RECORD LOCK MOVING AND INHERITING ===================*/

/*************************************************************//**
Resets the lock bits for a single record. Releases transactions waiting for
lock requests here. */
TRANSACTIONAL_TARGET
static
void
lock_rec_reset_and_release_wait(const hash_cell_t &cell, const page_id_t id,
                                ulint heap_no)
{
  for (lock_t *lock= lock_sys.get_first(cell, id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
  {
    if (lock->is_waiting())
      lock_rec_cancel(lock);
    else
    {
      TMTrxGuard tg{*lock->trx};
      lock_rec_reset_nth_bit(lock, heap_no);
    }
  }
}

/*************************************************************//**
Makes a record inherit the locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of
the other record. Also waiting lock requests on rec are inherited as
GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap(
/*====================*/
	hash_cell_t&		heir_cell,	/*!< heir hash table cell */
	const page_id_t		heir,		/*!< in: page containing the
						record which inherits */
	const hash_cell_t&	donor_cell,	/*!< donor hash table cell */
	const page_id_t		donor,		/*!< in: page containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	const page_t*		heir_page,	/*!< in: heir page frame */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
	/* At READ UNCOMMITTED or READ COMMITTED isolation level, we do
	not want locks set by an UPDATE or a DELETE to be inherited as
	gap type locks. But we DO want S-locks/X-locks (taken for
	REPLACE) set by a consistency constraint to be inherited also
	then. */

	for (lock_t* lock= lock_sys_t::get_first(donor_cell, donor, heap_no);
	     lock;
	     lock = lock_rec_get_next(heap_no, lock)) {
		trx_t* lock_trx = lock->trx;
		if (!lock->is_insert_intention()
		    && (lock_trx->isolation_level > TRX_ISO_READ_COMMITTED
			|| lock->mode() !=
			(lock_trx->duplicates ? LOCK_S : LOCK_X))) {
			lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
					      heir_cell, heir, heir_page,
					      heir_heap_no,
					      lock->index, lock_trx, false);
		}
	}
}

/*************************************************************//**
Makes a record to inherit the gap locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of the
other record. Also waiting lock requests are inherited as GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap_if_gap_lock(
/*================================*/
	const buf_block_t*	block,		/*!< in: buffer block */
	ulint			heir_heap_no,	/*!< in: heap_no of
						record which inherits */
	ulint			heap_no)	/*!< in: heap_no of record
						from which inherited;
						does NOT reset the locks
						on this record */
{
  const page_id_t id{block->page.id()};
  LockGuard g{lock_sys.rec_hash, id};

  for (lock_t *lock= lock_sys_t::get_first(g.cell(), id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
     if (!lock->is_insert_intention() && (heap_no == PAGE_HEAP_NO_SUPREMUM ||
                                          !lock->is_record_not_gap()) &&
         !lock_table_has(lock->trx, lock->index->table, LOCK_X))
       lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
                             g.cell(), id, block->page.frame,
                             heir_heap_no, lock->index, lock->trx, false);
}

/*************************************************************//**
Moves the locks of a record to another record and resets the lock bits of
the donating record. */
TRANSACTIONAL_TARGET
static
void
lock_rec_move(
	hash_cell_t&		receiver_cell,	/*!< in: hash table cell */
	const buf_block_t&	receiver,	/*!< in: buffer block containing
						the receiving record */
	const page_id_t		receiver_id,	/*!< in: page identifier */
	const hash_cell_t&	donator_cell,	/*!< in: hash table cell */
	const page_id_t		donator_id,	/*!< in: page identifier of
						the donating record */
	ulint			receiver_heap_no,/*!< in: heap_no of the record
						which gets the locks; there
						must be no lock requests
						on it! */
	ulint			donator_heap_no)/*!< in: heap_no of the record
						which gives the locks */
{
	ut_ad(!lock_sys_t::get_first(receiver_cell,
				     receiver_id, receiver_heap_no));

	for (lock_t *lock = lock_sys_t::get_first(donator_cell, donator_id,
						  donator_heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(donator_heap_no, lock)) {
		const auto type_mode = lock->type_mode;
		if (type_mode & LOCK_WAIT) {
			ut_ad(lock->trx->lock.wait_lock == lock);
			lock->type_mode &= ~LOCK_WAIT;
		}

		trx_t* lock_trx = lock->trx;
		lock_trx->mutex_lock();
		lock_rec_reset_nth_bit(lock, donator_heap_no);

		/* Note that we FIRST reset the bit, and then set the lock:
		the function works also if donator_id == receiver_id */

		lock_rec_add_to_queue(type_mode, receiver_cell,
				      receiver_id, receiver.page.frame,
				      receiver_heap_no,
				      lock->index, lock_trx, true);
		lock_trx->mutex_unlock();
	}

	ut_ad(!lock_sys_t::get_first(donator_cell, donator_id,
				     donator_heap_no));
}

/** Move all the granted locks to the front of the given lock list.
All the waiting locks will be at the end of the list.
@param[in,out]	lock_list	the given lock list.  */
static
void
lock_move_granted_locks_to_front(
	UT_LIST_BASE_NODE_T(lock_t)&	lock_list)
{
	lock_t*	lock;

	bool seen_waiting_lock = false;

	for (lock = UT_LIST_GET_FIRST(lock_list); lock;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (!seen_waiting_lock) {
			if (lock->is_waiting()) {
				seen_waiting_lock = true;
			}
			continue;
		}

		ut_ad(seen_waiting_lock);

		if (!lock->is_waiting()) {
			lock_t* prev = UT_LIST_GET_PREV(trx_locks, lock);
			ut_a(prev);
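			/* Move the granted lock to the front of the
			list; resume the scan from the unchanged
			previous element. */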
			ut_list_move_to_front(lock_list, lock);
			lock = prev;
		}
	}
}

/*************************************************************//**
Updates the lock table when we have reorganized a page. NOTE: we copy
also the locks set on the infimum of the page; the infimum may carry
locks if an update of a record is occurring on the page, and its locks
were temporarily stored on the infimum. */
TRANSACTIONAL_TARGET
void
lock_move_reorganize_page(
/*======================*/
	const buf_block_t*	block,	/*!< in: old index page, now
					reorganized */
	const buf_block_t*	oblock)	/*!< in: copy of the old, not
					reorganized page */
{
  mem_heap_t *heap;

  {
    UT_LIST_BASE_NODE_T(lock_t) old_locks;
    UT_LIST_INIT(old_locks, &lock_t::trx_locks);

    const page_id_t id{block->page.id()};
    const auto id_fold= id.fold();
    {
      TMLockGuard g{lock_sys.rec_hash, id};
      if (!lock_sys_t::get_first(g.cell(), id))
        return;
    }

    /* We will modify arbitrary trx->lock.trx_locks.
    Do not bother with a memory transaction; we are going
    to allocate memory and copy a lot of data. */
    LockMutexGuard g{SRW_LOCK_CALL};
    hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id_fold);

    /* Note: Predicate locks for SPATIAL INDEX are not affected by
    page reorganize, because they do not refer to individual record
    heap numbers. */
    lock_t *lock= lock_sys_t::get_first(cell, id);

    if (!lock)
      return;

    heap= mem_heap_create(256);

    /* Copy first all the locks on the page to heap and reset the
    bitmaps in the original locks; chain the copies of the locks
    using the trx_locks field in them. */

    do
    {
      /* Make a copy of the lock */
      lock_t *old_lock= lock_rec_copy(lock, heap);

      UT_LIST_ADD_LAST(old_locks, old_lock);

      /* Reset bitmap of lock */
      lock_rec_bitmap_reset(lock);

      if (lock->is_waiting())
      {
        ut_ad(lock->trx->lock.wait_lock == lock);
        lock->type_mode&= ~LOCK_WAIT;
      }

      lock= lock_rec_get_next_on_page(lock);
    }
    while (lock);

    const ulint comp= page_is_comp(block->page.frame);
    ut_ad(comp == page_is_comp(oblock->page.frame));

    lock_move_granted_locks_to_front(old_locks);

    DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize",
                    ut_list_reverse(old_locks););

    for (lock= UT_LIST_GET_FIRST(old_locks); lock;
         lock= UT_LIST_GET_NEXT(trx_locks, lock))
    {
      /* NOTE: we copy also the locks set on the infimum and
      supremum of the page; the infimum may carry locks if an
      update of a record is occurring on the page, and its locks
      were temporarily stored on the infimum */
      const rec_t *rec1= page_get_infimum_rec(block->page.frame);
      const rec_t *rec2= page_get_infimum_rec(oblock->page.frame);

      /* Set locks according to old locks */
      for (;;)
      {
        ulint old_heap_no;
        ulint new_heap_no;
        ut_d(const rec_t* const orec= rec1);
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));

        if (comp)
        {
          old_heap_no= rec_get_heap_no_new(rec2);
          new_heap_no= rec_get_heap_no_new(rec1);

          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          old_heap_no= rec_get_heap_no_old(rec2);
          new_heap_no= rec_get_heap_no_old(rec1);
          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        /* Clear the bit in old_lock. */
        if (old_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, old_heap_no))
        {
          ut_ad(!page_rec_is_metadata(orec));

          /* NOTE that the old lock bitmap could be too
          small for the new heap number! */
          lock_rec_add_to_queue(lock->type_mode, cell, id, block->page.frame,
                                new_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();

        if (new_heap_no == PAGE_HEAP_NO_SUPREMUM)
        {
           ut_ad(old_heap_no == PAGE_HEAP_NO_SUPREMUM);
           break;
        }
      }

      ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
    }
  }

  mem_heap_free(heap);

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  if (fil_space_t *space= fil_space_t::get(id.space()))
  {
    ut_ad(lock_rec_validate_page(block, space->is_latched()));
    space->release();
  }
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list end is moved to another page. */
TRANSACTIONAL_TARGET
void
lock_move_rec_list_end(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec)		/*!< in: record on page: this
						is the first record moved */
{
  const ulint comp= page_rec_is_comp(rec);

  ut_ad(block->page.frame == page_align(rec));
  ut_ad(comp == page_is_comp(new_block->page.frame));

  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};
  {
    /* This would likely be too large for a memory transaction. */
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    /* Note: when we move locks from record to record, waiting locks
    and possible granted gap type locks behind them are enqueued in
    the original order, because new elements are inserted to a hash
    table to the end of the hash chain, and lock_rec_add_to_queue
    does not reuse locks if there are waiters in the queue. */
    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1= rec;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      if (comp)
      {
        if (page_offset(rec1) == PAGE_NEW_INFIMUM)
          rec1= page_rec_get_next_low(rec1, TRUE);
        rec2= page_rec_get_next_low(new_block->page.frame + PAGE_NEW_INFIMUM,
                                    TRUE);
      }
      else
      {
        if (page_offset(rec1) == PAGE_OLD_INFIMUM)
          rec1= page_rec_get_next_low(rec1, FALSE);
        rec2= page_rec_get_next_low(new_block->page.frame + PAGE_OLD_INFIMUM,
                                    FALSE);
      }

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */
      for (;;)
      {
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));
        ut_d(const rec_t* const orec= rec1);

        ulint rec1_heap_no;
        ulint rec2_heap_no;

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM)
            break;

          rec2_heap_no= rec_get_heap_no_new(rec2);
          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);

          if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM)
            break;
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(rec_get_data_size_old(rec1) == rec_get_data_size_old(rec2));
          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec1)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          ut_ad(!page_rec_is_metadata(orec));

          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
                                new_block->page.frame,
                                rec2_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();
      }
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  if (fil_space_t *space= fil_space_t::get(id.space()))
  {
    const bool is_latched{space->is_latched()};
    ut_ad(lock_rec_validate_page(block, is_latched));
    ut_ad(lock_rec_validate_page(new_block, is_latched));
    space->release();
  }
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list start is moved to another page. */
TRANSACTIONAL_TARGET
void
lock_move_rec_list_start(
/*=====================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec,		/*!< in: record on page:
						this is the first
						record NOT copied */
	const rec_t*		old_end)	/*!< in: old
						previous-to-last
						record on new_page
						before the records
						were copied */
{
  const ulint comp= page_rec_is_comp(rec);

  ut_ad(block->page.frame == page_align(rec));
  ut_ad(comp == page_is_comp(new_block->page.frame));
  ut_ad(new_block->page.frame == page_align(old_end));
  ut_ad(!page_rec_is_metadata(rec));
  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};

  {
    /* This would likely be too large for a memory transaction. */
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      if (comp)
      {
        rec1= page_rec_get_next_low(block->page.frame + PAGE_NEW_INFIMUM,
                                    TRUE);
        rec2= page_rec_get_next_low(old_end, TRUE);
      }
      else
      {
        rec1= page_rec_get_next_low(block->page.frame + PAGE_OLD_INFIMUM,
                                    FALSE);
        rec2= page_rec_get_next_low(old_end, FALSE);
      }

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */

      while (rec1 != rec)
      {
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));
        ut_d(const rec_t* const prev= rec1);

        ulint rec1_heap_no;
        ulint rec2_heap_no;

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          rec2_heap_no= rec_get_heap_no_new(rec2);

          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          ut_ad(!page_rec_is_metadata(prev));

          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
                                new_block->page.frame,
                                rec2_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();
      }

#ifdef UNIV_DEBUG
      if (page_rec_is_supremum(rec))
        for (auto i= lock_rec_get_n_bits(lock); --i > PAGE_HEAP_NO_USER_LOW; )
          ut_ad(!lock_rec_get_nth_bit(lock, i));
#endif /* UNIV_DEBUG */
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  ut_ad(lock_rec_validate_page(block));
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page when records
of a spatial index are moved. */
TRANSACTIONAL_TARGET
void
lock_rtr_move_rec_list(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	rtr_rec_move_t*		rec_move,       /*!< in: recording records
						moved */
	ulint			num_move)       /*!< in: num of rec to move */
{
  if (!num_move)
    return;

  const ulint comp= page_rec_is_comp(rec_move[0].old_rec);

  ut_ad(block->page.frame == page_align(rec_move[0].old_rec));
  ut_ad(new_block->page.frame == page_align(rec_move[0].new_rec));
  ut_ad(comp == page_rec_is_comp(rec_move[0].new_rec));
  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};

  {
    /* This would likely be too large for a memory transaction. */
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */

      for (ulint moved= 0; moved < num_move; moved++)
      {
        ulint rec1_heap_no;
        ulint rec2_heap_no;

        rec1= rec_move[moved].old_rec;
        rec2= rec_move[moved].new_rec;
        ut_ad(!page_rec_is_metadata(rec1));
        ut_ad(!page_rec_is_metadata(rec2));

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          rec2_heap_no= rec_get_heap_no_new(rec2);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
                                new_block->page.frame,
                                rec2_heap_no, lock->index, lock_trx, true);

          rec_move[moved].moved= true;
        }

        lock_trx->mutex_unlock();
      }
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  ut_ad(lock_rec_validate_page(block));
#endif
}
/*************************************************************//**
Updates the lock table when a page is split to the right. */
void
lock_update_split_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
  const ulint h= lock_get_min_heap_no(right_block);
  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};

  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, l, r};

  /* Move the locks on the supremum of the left page to the supremum
  of the right page */
  lock_rec_move(g.cell2(), *right_block, r, g.cell1(), l,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);

  /* Inherit the locks to the supremum of left page from the successor
  of the infimum on right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->page.frame,
                          PAGE_HEAP_NO_SUPREMUM, h);
}

#ifdef UNIV_DEBUG
static void lock_assert_no_spatial(const page_id_t id)
{
  const auto id_fold= id.fold();
  auto cell= lock_sys.prdt_page_hash.cell_get(id_fold);
  auto latch= lock_sys_t::hash_table::latch(cell);
  latch->acquire();
  /* There should exist no predicate page lock on this page;
  otherwise, the page could not be merged or discarded. */
  ut_ad(!lock_sys_t::get_first(*cell, id));
  latch->release();
  cell= lock_sys.prdt_hash.cell_get(id_fold);
  latch= lock_sys_t::hash_table::latch(cell);
  latch->acquire();
  ut_ad(!lock_sys_t::get_first(*cell, id));
  latch->release();
}
#endif

/*************************************************************//**
Updates the lock table when a page is merged to the right. */
void
lock_update_merge_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page to
						which merged */
	const rec_t*		orig_succ,	/*!< in: original
						successor of infimum
						on the right page
						before merge */
	const buf_block_t*	left_block)	/*!< in: merged index
						page which will be
						discarded */
{
  ut_ad(!page_rec_is_metadata(orig_succ));

  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};
  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, l, r};

  /* Inherit the locks from the supremum of the left page to the
  original successor of infimum on the right page, to which the left
  page was merged */
  lock_rec_inherit_to_gap(g.cell2(), r, g.cell1(), l, right_block->page.frame,
                          page_rec_get_heap_no(orig_succ),
                          PAGE_HEAP_NO_SUPREMUM);

  /* Reset the locks on the supremum of the left page, releasing
  waiting transactions */
  lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(l, g.cell1(), lock_sys.rec_hash);

  ut_d(lock_assert_no_spatial(l));
}

/** Update locks when the root page is copied to another in
btr_root_raise_and_insert(). Note that we leave lock structs on the
root page, even though they do not make sense on other than leaf
pages: the reason is that in a pessimistic update the infimum record
of the root page will act as a dummy carrier of the locks of the record
to be updated. */
void lock_update_root_raise(const buf_block_t &block, const page_id_t root)
{
  const page_id_t id{block.page.id()};
  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, id, root};
  /* Move the locks on the supremum of the root to the supremum of block */
  lock_rec_move(g.cell1(), block, id, g.cell2(), root,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
}

/** Update the lock table when a page is copied to another.
@param new_block  the target page
@param old        old page (not index root page) */
void lock_update_copy_and_discard(const buf_block_t &new_block, page_id_t old)
{
  const page_id_t id{new_block.page.id()};
  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, id, old};
  /* Move the locks on the supremum of the old page to the supremum of new */
  lock_rec_move(g.cell1(), new_block, id, g.cell2(), old,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(old, g.cell2(), lock_sys.rec_hash);
}

/*************************************************************//**
Updates the lock table when a page is split to the left. */
void
lock_update_split_left(
/*===================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
  ulint h= lock_get_min_heap_no(right_block);
  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};
  LockMultiGuard g{lock_sys.rec_hash, l, r};
  /* Inherit the locks to the supremum of the left page from the
  successor of the infimum on the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->page.frame,
                          PAGE_HEAP_NO_SUPREMUM, h);
}

/** Update the lock table when a page is merged to the left.
@param left      left page
@param orig_pred original predecessor of supremum on the left page before merge
@param right     merged, to-be-discarded right page */
void lock_update_merge_left(const buf_block_t& left, const rec_t *orig_pred,
                            const page_id_t right)
{
  ut_ad(left.page.frame == page_align(orig_pred));

  const page_id_t l{left.page.id()};

  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, l, right};
  const rec_t *left_next_rec= page_rec_get_next_const(orig_pred);

  if (!page_rec_is_supremum(left_next_rec))
  {
    /* Inherit the locks on the supremum of the left page to the
    first record which was moved from the right page */
    lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left.page.frame,
                            page_rec_get_heap_no(left_next_rec),
                            PAGE_HEAP_NO_SUPREMUM);

    /* Reset the locks on the supremum of the left page,
    releasing waiting transactions */
    lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
  }

  /* Move the locks from the supremum of right page to the supremum
  of the left page */
  lock_rec_move(g.cell1(), left, l, g.cell2(), right,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(right, g.cell2(), lock_sys.rec_hash);

  /* There should exist no page lock on the right page;
  otherwise, the merge would have been blocked. */
  ut_d(lock_assert_no_spatial(right));
}

/*************************************************************//**
Resets the original locks on heir and replaces them with gap type locks
inherited from rec. */
void
lock_rec_reset_and_inherit_gap_locks(
/*=================================*/
	const buf_block_t&	heir_block,	/*!< in: block containing the
						record which inherits */
	const page_id_t		donor,		/*!< in: page containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
  const page_id_t heir{heir_block.page.id()};
  /* This is a rare operation and likely too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, heir, donor};
  lock_rec_reset_and_release_wait(g.cell1(), heir, heir_heap_no);
  lock_rec_inherit_to_gap(g.cell1(), heir, g.cell2(), donor,
                          heir_block.page.frame, heir_heap_no, heap_no);
}

/*************************************************************//**
Updates the lock table when a page is discarded. */
void
lock_update_discard(
/*================*/
	const buf_block_t*	heir_block,	/*!< in: index page
						which will inherit the locks */
	ulint			heir_heap_no,	/*!< in: heap_no of the record
						which will inherit the locks */
	const buf_block_t*	block)		/*!< in: index page
						which will be discarded */
{
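	/* If the page has record locks, inherit them to the heir and
	discard them; otherwise, discard any predicate locks (SPATIAL
	INDEX) that may be attached to the page. */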
	const page_t*	page = block->page.frame;
	const rec_t*	rec;
	ulint		heap_no;
	const page_id_t	heir(heir_block->page.id());
	const page_id_t	page_id(block->page.id());
	/* This would likely be too large for a memory transaction. */
	LockMultiGuard	g{lock_sys.rec_hash, heir, page_id};

	if (lock_sys_t::get_first(g.cell2(), page_id)) {
		ut_d(lock_assert_no_spatial(page_id));
		/* Inherit all the locks on the page to the record and
		reset all the locks on the page */

		if (page_is_comp(page)) {
			rec = page + PAGE_NEW_INFIMUM;

			do {
				heap_no = rec_get_heap_no_new(rec);

				lock_rec_inherit_to_gap(g.cell1(), heir,
							g.cell2(), page_id,
							heir_block->page.frame,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					g.cell2(), page_id, heap_no);

				rec = page + rec_get_next_offs(rec, TRUE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		} else {
			rec = page + PAGE_OLD_INFIMUM;

			do {
				heap_no = rec_get_heap_no_old(rec);

				lock_rec_inherit_to_gap(g.cell1(), heir,
							g.cell2(), page_id,
							heir_block->page.frame,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					g.cell2(), page_id, heap_no);

				rec = page + rec_get_next_offs(rec, FALSE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		}

		lock_rec_free_all_from_discard_page(page_id, g.cell2(),
						    lock_sys.rec_hash);
	} else {
		const auto fold = page_id.fold();
		auto cell = lock_sys.prdt_hash.cell_get(fold);
		auto latch = lock_sys_t::hash_table::latch(cell);
		latch->acquire();
		lock_rec_free_all_from_discard_page(page_id, *cell,
						    lock_sys.prdt_hash);
		latch->release();
		cell = lock_sys.prdt_page_hash.cell_get(fold);
		latch = lock_sys_t::hash_table::latch(cell);
		latch->acquire();
		lock_rec_free_all_from_discard_page(page_id, *cell,
						    lock_sys.prdt_page_hash);
		latch->release();
	}
}

/*************************************************************//**
Updates the lock table when a new user record is inserted. */
void
lock_update_insert(
/*===============*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: the inserted record */
{
	ulint	receiver_heap_no;
	ulint	donator_heap_no;

	ut_ad(block->page.frame == page_align(rec));
	ut_ad(!page_rec_is_metadata(rec));

	/* Inherit the gap-locking locks for rec, in gap mode, from the next
	record */

	if (page_rec_is_comp(rec)) {
		receiver_heap_no = rec_get_heap_no_new(rec);
		donator_heap_no = rec_get_heap_no_new(
			page_rec_get_next_low(rec, TRUE));
	} else {
		receiver_heap_no = rec_get_heap_no_old(rec);
		donator_heap_no = rec_get_heap_no_old(
			page_rec_get_next_low(rec, FALSE));
	}

	lock_rec_inherit_to_gap_if_gap_lock(
		block, receiver_heap_no, donator_heap_no);
}

/*************************************************************//**
Updates the lock table when a record is removed. */
void
lock_update_delete(
/*===============*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: the record to be removed */
{
	const page_t*	page = block->page.frame;
	ulint		heap_no;
	ulint		next_heap_no;

	ut_ad(page == page_align(rec));
	ut_ad(!page_rec_is_metadata(rec));

	if (page_is_comp(page)) {
		heap_no = rec_get_heap_no_new(rec);
		next_heap_no = rec_get_heap_no_new(page
						   + rec_get_next_offs(rec,
								       TRUE));
	} else {
		heap_no = rec_get_heap_no_old(rec);
		next_heap_no = rec_get_heap_no_old(page
						   + rec_get_next_offs(rec,
								       FALSE));
	}

	const page_id_t id{block->page.id()};
	LockGuard g{lock_sys.rec_hash, id};

	/* Let the next record inherit the locks from rec, in gap mode */

	lock_rec_inherit_to_gap(g.cell(), id, g.cell(), id, block->page.frame,
				next_heap_no, heap_no);

	/* Reset the lock bits on rec and release waiting transactions */
	lock_rec_reset_and_release_wait(g.cell(), id, heap_no);
}

/*********************************************************************//**
Stores on the page infimum record the explicit locks of another record.
This function is used to store the lock state of a record when it is
updated and the size of the record changes in the update. The record
is moved in such an update, perhaps to another page. The infimum record
acts as a dummy carrier record, taking care of lock releases while the
actual record is being moved. */
void
lock_rec_store_on_page_infimum(
/*===========================*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: record whose lock state
					is stored on the infimum
					record of the same page; lock
					bits are reset on the
					record */
{
  const ulint heap_no= page_rec_get_heap_no(rec);

  ut_ad(block->page.frame == page_align(rec));
  const page_id_t id{block->page.id()};

  LockGuard g{lock_sys.rec_hash, id};
  lock_rec_move(g.cell(), *block, id, g.cell(), id,
                PAGE_HEAP_NO_INFIMUM, heap_no);
}

/** Restore the explicit lock requests on a single record, where the
state was stored on the infimum of a page.
@param block   buffer block containing rec
@param rec     record whose lock state is restored
@param donator page (rec is not necessarily on this page)
whose infimum stored the lock state; lock bits are reset on the infimum */
void lock_rec_restore_from_page_infimum(const buf_block_t &block,
					const rec_t *rec, page_id_t donator)
{
  const ulint heap_no= page_rec_get_heap_no(rec);
  const page_id_t id{block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, id, donator};
  lock_rec_move(g.cell1(), block, id, g.cell2(), donator, heap_no,
                PAGE_HEAP_NO_INFIMUM);
}
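
/* lock_rec_store_on_page_infimum() and
lock_rec_restore_from_page_infimum() are used as a pair: an update that
changes a record's size first parks the record's explicit locks on the
page infimum, and once the record has been moved (possibly to another
page), the locks are moved back from the donator page's infimum. */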

/*========================= TABLE LOCKS ==============================*/

/**
Create a table lock, without checking for deadlocks or lock compatibility.
@param table      table on which the lock is created
@param type_mode  lock type and mode
@param trx        transaction
@param c_lock     conflicting lock
@return the created lock object */
lock_t *lock_table_create(dict_table_t *table, unsigned type_mode, trx_t *trx,
                          lock_t *c_lock)
{
	lock_t*		lock;

	lock_sys.assert_locked(*table);
	ut_ad(trx->mutex_is_owner());
	ut_ad(!trx->is_wsrep() || lock_sys.is_writer());
	ut_ad(trx->state == TRX_STATE_ACTIVE || trx->is_recovered);
	ut_ad(!trx->is_autocommit_non_locking());
	/* During CREATE TABLE, we will write to newly created FTS_*_CONFIG
	on which no lock has been created yet. */
	ut_ad(!trx->dict_operation_lock_mode
	      || (strstr(table->name.m_name, "/FTS_")
		  && strstr(table->name.m_name, "_CONFIG") + sizeof("_CONFIG")
		  == table->name.m_name + strlen(table->name.m_name) + 1));

	switch (LOCK_MODE_MASK & type_mode) {
	case LOCK_AUTO_INC:
		++table->n_waiting_or_granted_auto_inc_locks;
		/* For AUTOINC locking we reuse the lock instance only if
		there is no wait involved else we allocate the waiting lock
		from the transaction lock heap. */
		if (type_mode == LOCK_AUTO_INC) {
			lock = table->autoinc_lock;

			ut_ad(!table->autoinc_trx);
			table->autoinc_trx = trx;

			ib_vector_push(trx->autoinc_locks, &lock);
			goto allocated;
		}

		break;
	case LOCK_X:
	case LOCK_S:
		++table->n_lock_x_or_s;
		break;
	}

	lock = trx->lock.table_cached < array_elements(trx->lock.table_pool)
		? &trx->lock.table_pool[trx->lock.table_cached++]
		: static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap, sizeof *lock));

allocated:
	lock->type_mode = ib_uint32_t(type_mode | LOCK_TABLE);
	lock->trx = trx;

	lock->un_member.tab_lock.table = table;

	ut_ad(table->get_ref_count() > 0 || !table->can_be_evicted);

	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);

	ut_list_append(table->locks, lock, TableLockGetNode());

	if (type_mode & LOCK_WAIT) {
		if (trx->lock.wait_trx) {
			ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx);
			ut_ad(trx->lock.wait_lock);
			ut_ad((*trx->lock.wait_lock).trx == trx);
		} else {
			ut_ad(c_lock);
			trx->lock.wait_trx = c_lock->trx;
			ut_ad(!trx->lock.wait_lock);
		}
		trx->lock.wait_lock = lock;
	}

	lock->trx->lock.table_locks.push_back(lock);

	MONITOR_INC(MONITOR_TABLELOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_TABLELOCK);

	return(lock);
}

/*************************************************************//**
Pops autoinc lock requests from the transaction's autoinc_locks. We
handle the case where there are gaps in the array and they need to
be popped off the stack. */
UNIV_INLINE
void
lock_table_pop_autoinc_locks(
/*=========================*/
	trx_t*	trx)	/*!< in/out: transaction that owns the AUTOINC locks */
{
	ut_ad(!ib_vector_is_empty(trx->autoinc_locks));

	/* Skip any gaps, gaps are NULL lock entries in the
	trx->autoinc_locks vector. */

	do {
		ib_vector_pop(trx->autoinc_locks);

		if (ib_vector_is_empty(trx->autoinc_locks)) {
			return;
		}

	} while (*(lock_t**) ib_vector_get_last(trx->autoinc_locks) == NULL);
}

/*************************************************************//**
Removes an autoinc lock request from the transaction's autoinc_locks. */
UNIV_INLINE
void
lock_table_remove_autoinc_lock(
/*===========================*/
	lock_t*	lock,	/*!< in: table lock */
	trx_t*	trx)	/*!< in/out: transaction that owns the lock */
{
	ut_ad(lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE));
	lock_sys.assert_locked(*lock->un_member.tab_lock.table);
	ut_ad(trx->mutex_is_owner());

	auto s = ib_vector_size(trx->autoinc_locks);
	ut_ad(s);

	/* With stored functions and procedures the user may drop
	a table within the same "statement". This special case has
	to be handled by deleting only those AUTOINC locks that were
	held by the table being dropped. */

	lock_t*	autoinc_lock = *static_cast<lock_t**>(
		ib_vector_get(trx->autoinc_locks, --s));

	/* This is the default fast case. */

	if (autoinc_lock == lock) {
		lock_table_pop_autoinc_locks(trx);
	} else {
		/* The last element should never be NULL */
		ut_a(autoinc_lock != NULL);

		/* Handle freeing the locks from within the stack. */

		while (s) {
			autoinc_lock = *static_cast<lock_t**>(
				ib_vector_get(trx->autoinc_locks, --s));

			if (autoinc_lock == lock) {
				void*	null_var = NULL;
				ib_vector_set(trx->autoinc_locks, s, &null_var);
				return;
			}
		}

		/* Must find the autoinc lock. */
		ut_error;
	}
}

/*************************************************************//**
Removes a table lock request from the queue and the trx list of locks;
this is a low-level function which does NOT check if waiting requests
can now be granted. */
UNIV_INLINE
const dict_table_t*
lock_table_remove_low(
/*==================*/
	lock_t*	lock)	/*!< in/out: table lock */
{
	trx_t*		trx;
	dict_table_t*	table;

	ut_ad(lock->is_table());

	trx = lock->trx;
	table = lock->un_member.tab_lock.table;
	lock_sys.assert_locked(*table);
	ut_ad(trx->mutex_is_owner());

	/* Remove the table from the transaction's AUTOINC vector, if
	the lock that is being released is an AUTOINC lock. */
	switch (lock->mode()) {
	case LOCK_AUTO_INC:
		ut_ad((table->autoinc_trx == trx) == !lock->is_waiting());

		if (table->autoinc_trx == trx) {
			table->autoinc_trx = NULL;
			/* The locks must be freed in the reverse order from
			the one in which they were acquired. This is to avoid
			traversing the AUTOINC lock vector unnecessarily.

			We only store locks that were granted in the
			trx->autoinc_locks vector (see lock_table_create()
			and lock_grant()). */
			lock_table_remove_autoinc_lock(lock, trx);
		}

		ut_ad(table->n_waiting_or_granted_auto_inc_locks);
		--table->n_waiting_or_granted_auto_inc_locks;
		break;
	case LOCK_X:
	case LOCK_S:
		ut_ad(table->n_lock_x_or_s);
		--table->n_lock_x_or_s;
		break;
	default:
		break;
	}

	UT_LIST_REMOVE(trx->lock.trx_locks, lock);
	ut_list_remove(table->locks, lock, TableLockGetNode());

	MONITOR_INC(MONITOR_TABLELOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_TABLELOCK);
	return table;
}

/*********************************************************************//**
Enqueues a waiting request for a table lock which cannot be granted
immediately. Checks for deadlocks.
@retval	DB_LOCK_WAIT	if the waiting lock was enqueued
@retval	DB_DEADLOCK	if this transaction was chosen as the victim */
static
dberr_t
lock_table_enqueue_waiting(
/*=======================*/
	unsigned	mode,	/*!< in: lock mode this transaction is
				requesting */
	dict_table_t*	table,	/*!< in/out: table */
	que_thr_t*	thr,	/*!< in: query thread */
	lock_t*		c_lock)	/*!< in: conflicting lock or NULL */
{
	lock_sys.assert_locked(*table);
	ut_ad(!srv_read_only_mode);

	trx_t* trx = thr_get_trx(thr);
	ut_ad(trx->mutex_is_owner());
	ut_ad(!trx->dict_operation_lock_mode);

#ifdef WITH_WSREP
	if (trx->is_wsrep() && trx->lock.was_chosen_as_deadlock_victim) {
		return(DB_DEADLOCK);
	}
#endif /* WITH_WSREP */

	/* Enqueue the lock request that will wait to be granted */
	lock_table_create(table, mode | LOCK_WAIT, trx, c_lock);

	trx->lock.wait_thr = thr;
	trx->lock.clear_deadlock_victim();

	MONITOR_INC(MONITOR_TABLELOCK_WAIT);
	return(DB_LOCK_WAIT);
}

/*********************************************************************//**
Checks if other transactions have an incompatible mode lock request in
the lock queue.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_table_other_has_incompatible(
/*==============================*/
	const trx_t*		trx,	/*!< in: transaction, or NULL if all
					transactions should be included */
	ulint			wait,	/*!< in: LOCK_WAIT if also
					waiting locks are taken into
					account, or 0 if not */
	const dict_table_t*	table,	/*!< in: table */
	lock_mode		mode)	/*!< in: lock mode */
{
	lock_sys.assert_locked(*table);

	static_assert(LOCK_IS == 0, "compatibility");
	static_assert(LOCK_IX == 1, "compatibility");

	if (UNIV_LIKELY(mode <= LOCK_IX && !table->n_lock_x_or_s)) {
		return(NULL);
	}

	for (lock_t* lock = UT_LIST_GET_LAST(table->locks);
	     lock;
	     lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock)) {

		trx_t* lock_trx = lock->trx;

		if (lock_trx != trx
		    && !lock_mode_compatible(lock->mode(), mode)
		    && (wait || !lock->is_waiting())) {

			return(lock);
		}
	}

	return(NULL);
}

/** Acquire or enqueue a table lock */
static dberr_t lock_table_low(dict_table_t *table, lock_mode mode,
                              que_thr_t *thr, trx_t *trx)
{
  DBUG_EXECUTE_IF("innodb_table_deadlock", return DB_DEADLOCK;);
  lock_t *wait_for=
    lock_table_other_has_incompatible(trx, LOCK_WAIT, table, mode);
  dberr_t err= DB_SUCCESS;

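  /* lock_table_create() and lock_table_enqueue_waiting() both assert
  trx->mutex_is_owner(); hold the transaction mutex across the
  queueing decision below. */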
  trx->mutex_lock();

  if (wait_for)
    err= lock_table_enqueue_waiting(mode, table, thr, wait_for);
  else
    lock_table_create(table, mode, trx, nullptr);

  trx->mutex_unlock();

  return err;
}

#ifdef WITH_WSREP
/** Acquire or enqueue a table lock in Galera replication mode. */
ATTRIBUTE_NOINLINE
static dberr_t lock_table_wsrep(dict_table_t *table, lock_mode mode,
                                que_thr_t *thr, trx_t *trx)
{
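  /* lock_table_create() asserts lock_sys.is_writer() for wsrep
  transactions; hence the Galera path takes the exclusive latch. */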
  LockMutexGuard g{SRW_LOCK_CALL};
  return lock_table_low(table, mode, thr, trx);
}
#endif

/*********************************************************************//**
Locks the specified database table in the mode given. If the lock cannot
be granted immediately, the query thread is put to wait.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_table(
/*=======*/
	dict_table_t*	table,	/*!< in/out: database table
				in dictionary cache */
	lock_mode	mode,	/*!< in: lock mode */
	que_thr_t*	thr)	/*!< in: query thread */
{
	trx_t*		trx;

	if (table->is_temporary()) {
		return DB_SUCCESS;
	}

	trx = thr_get_trx(thr);

	/* Look for equal or stronger locks the same trx already
	has on the table. No need to acquire LockMutexGuard here
	because only this transaction can add/access table locks
	to/from trx_t::table_locks. */

	if (lock_table_has(trx, table, mode) || srv_read_only_mode) {
		return(DB_SUCCESS);
	}

	/* Read only transactions can write to temp tables, we don't want
	to promote them to RW transactions. Their updates cannot be visible
	to other transactions. Therefore we can keep them out
	of the read views. */

	if ((mode == LOCK_IX || mode == LOCK_X)
	    && !trx->read_only
	    && trx->rsegs.m_redo.rseg == 0) {

		trx_set_rw_mode(trx);
	}

#ifdef WITH_WSREP
	if (trx->is_wsrep()) {
		return lock_table_wsrep(table, mode, thr, trx);
	}
#endif
	lock_sys.rd_lock(SRW_LOCK_CALL);
	table->lock_mutex_lock();
	dberr_t err = lock_table_low(table, mode, thr, trx);
	table->lock_mutex_unlock();
	lock_sys.rd_unlock();

	return err;
}

/** Create a table lock object for a resurrected transaction.
@param table    table to be X-locked
@param trx      transaction
@param mode     LOCK_X or LOCK_IX */
void lock_table_resurrect(dict_table_t *table, trx_t *trx, lock_mode mode)
{
  ut_ad(trx->is_recovered);
  ut_ad(mode == LOCK_X || mode == LOCK_IX);

  if (lock_table_has(trx, table, mode))
    return;

  {
    /* This is executed at server startup while no connections
    are allowed. Do not bother with lock elision. */
    LockMutexGuard g{SRW_LOCK_CALL};
    ut_ad(!lock_table_other_has_incompatible(trx, LOCK_WAIT, table, mode));

    trx->mutex_lock();
    lock_table_create(table, mode, trx);
  }
  trx->mutex_unlock();
}

/** Find a lock that a waiting table lock request still has to wait for. */
static const lock_t *lock_table_has_to_wait_in_queue(const lock_t *wait_lock)
{
  ut_ad(wait_lock->is_waiting());
  ut_ad(wait_lock->is_table());

  dict_table_t *table= wait_lock->un_member.tab_lock.table;
  lock_sys.assert_locked(*table);

  static_assert(LOCK_IS == 0, "compatibility");
  static_assert(LOCK_IX == 1, "compatibility");

  if (UNIV_LIKELY(wait_lock->mode() <= LOCK_IX && !table->n_lock_x_or_s))
    return nullptr;

  for (const lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock != wait_lock;
       lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
    if (lock_has_to_wait(wait_lock, lock))
      return lock;

  return nullptr;
}

/*************************************************************//**
Removes a table lock request, waiting or granted, from the queue and grants
locks to other transactions in the queue, if they now are entitled to a
lock.
@param[in,out]	in_lock		table lock
@param[in]	owns_wait_mutex	whether lock_sys.wait_mutex is held */
static void lock_table_dequeue(lock_t *in_lock, bool owns_wait_mutex)
{
#ifdef SAFE_MUTEX
	ut_ad(owns_wait_mutex == mysql_mutex_is_owner(&lock_sys.wait_mutex));
#endif
	ut_ad(in_lock->trx->mutex_is_owner());
	lock_t*	lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, in_lock);

	const dict_table_t* table = lock_table_remove_low(in_lock);

	static_assert(LOCK_IS == 0, "compatibility");
	static_assert(LOCK_IX == 1, "compatibility");

	if (UNIV_LIKELY(in_lock->mode() <= LOCK_IX && !table->n_lock_x_or_s)) {
		return;
	}

	bool acquired = false;

	/* Check if waiting locks in the queue can now be granted: grant
	locks if there are no conflicting locks ahead. */

	for (/* No op */;
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
		if (!lock->is_waiting()) {
			continue;
		}

		if (!owns_wait_mutex) {
			mysql_mutex_lock(&lock_sys.wait_mutex);
			acquired = owns_wait_mutex = true;
		}

		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_table_has_to_wait_in_queue(lock)) {
			trx_t* c_trx = c->trx;
			lock->trx->lock.wait_trx = c_trx;
			if (c_trx->lock.wait_trx
			    && innodb_deadlock_detect
			    && Deadlock::to_check.emplace(c_trx).second) {
				Deadlock::to_be_checked = true;
			}
		} else {
			/* Grant the lock */
			ut_ad(in_lock->trx != lock->trx);
			in_lock->trx->mutex_unlock();
			lock_grant(lock);
			in_lock->trx->mutex_lock();
		}
	}

	if (acquired) {
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}

/** Sets a lock on a table based on the given mode.
@param table	table to lock
@param trx	transaction
@param mode	LOCK_X or LOCK_S
@param no_wait  whether to skip handling DB_LOCK_WAIT
@return error code */
dberr_t lock_table_for_trx(dict_table_t *table, trx_t *trx, lock_mode mode,
                           bool no_wait)
{
  mem_heap_t *heap= mem_heap_create(512);
  sel_node_t *node= sel_node_create(heap);
  que_thr_t *thr= pars_complete_graph_for_exec(node, trx, heap, nullptr);
  thr->graph->state= QUE_FORK_ACTIVE;

  thr= static_cast<que_thr_t*>
    (que_fork_get_first_thr(static_cast<que_fork_t*>
                            (que_node_get_parent(thr))));

run_again:
  thr->run_node= thr;
  thr->prev_node= thr->common.parent;
  dberr_t err= lock_table(table, mode, thr);

  switch (err) {
  case DB_SUCCESS:
    break;
  case DB_LOCK_WAIT:
    if (no_wait)
    {
      lock_sys.cancel_lock_wait_for_trx(trx);
      break;
    }
    /* fall through */
  default:
    trx->error_state= err;
    if (row_mysql_handle_errors(&err, trx, thr, nullptr))
      goto run_again;
  }

  que_graph_free(thr->graph);
  trx->op_info= "";

  return err;
}
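
/* A minimal usage sketch (assuming the declaration of
lock_table_for_trx() defaults no_wait to false, as the three-argument
calls in lock_sys_tables() below suggest):

  dberr_t err = lock_table_for_trx(table, trx, LOCK_X);

Any result other than DB_SUCCESS means that the table could not be
locked and the caller should treat the statement as failed. */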

/** Exclusively lock the data dictionary tables.
@param trx  dictionary transaction
@return error code
@retval DB_SUCCESS on success */
dberr_t lock_sys_tables(trx_t *trx)
{
  dberr_t err;
  if (!(err= lock_table_for_trx(dict_sys.sys_tables, trx, LOCK_X)) &&
      !(err= lock_table_for_trx(dict_sys.sys_columns, trx, LOCK_X)) &&
      !(err= lock_table_for_trx(dict_sys.sys_indexes, trx, LOCK_X)) &&
      !(err= lock_table_for_trx(dict_sys.sys_fields, trx, LOCK_X)))
  {
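    /* SYS_FOREIGN, SYS_FOREIGN_COLS and SYS_VIRTUAL may be absent
    (for example, in a data directory upgraded from an old version),
    hence the null checks below. */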
    if (dict_sys.sys_foreign)
      err= lock_table_for_trx(dict_sys.sys_foreign, trx, LOCK_X);
    if (!err && dict_sys.sys_foreign_cols)
      err= lock_table_for_trx(dict_sys.sys_foreign_cols, trx, LOCK_X);
    if (!err && dict_sys.sys_virtual)
      err= lock_table_for_trx(dict_sys.sys_virtual, trx, LOCK_X);
  }
  return err;
}

/*=========================== LOCK RELEASE ==============================*/

/*************************************************************//**
Removes a granted record lock of a transaction from the queue and grants
locks to other transactions waiting in the queue if they now are entitled
to a lock. */
TRANSACTIONAL_TARGET
void
lock_rec_unlock(
/*============*/
	trx_t*			trx,	/*!< in/out: transaction that has
					set a record lock */
	const page_id_t		id,	/*!< in: page containing rec */
	const rec_t*		rec,	/*!< in: record */
	lock_mode		lock_mode)/*!< in: LOCK_S or LOCK_X */
{
	lock_t*		first_lock;
	lock_t*		lock;
	ulint		heap_no;

	ut_ad(trx);
	ut_ad(rec);
	ut_ad(!trx->lock.wait_lock);
	ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
	ut_ad(!page_rec_is_metadata(rec));

	heap_no = page_rec_get_heap_no(rec);

	LockGuard g{lock_sys.rec_hash, id};

	first_lock = lock_sys_t::get_first(g.cell(), id, heap_no);

	/* Find the last lock with the same lock_mode and transaction
	on the record. */

	for (lock = first_lock; lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {
		if (lock->trx == trx && lock->mode() == lock_mode) {
			goto released;
		}
	}

	{
		ib::error	err;
		err << "Unlock row could not find a " << lock_mode
			<< " mode lock on the record. Current statement: ";
		size_t		stmt_len;
		if (const char* stmt = innobase_get_stmt_unsafe(
			    trx->mysql_thd, &stmt_len)) {
			err.write(stmt, stmt_len);
		}
	}

	return;

released:
	ut_a(!lock->is_waiting());
	{
		TMTrxGuard tg{*trx};
		lock_rec_reset_nth_bit(lock, heap_no);
	}

	/* Check if we can now grant waiting lock requests */

	for (lock = first_lock; lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {
		if (!lock->is_waiting()) {
			continue;
		}
		mysql_mutex_lock(&lock_sys.wait_mutex);
		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_rec_has_to_wait_in_queue(g.cell(),
								    lock)) {
			lock->trx->lock.wait_trx = c->trx;
		} else {
			/* Grant the lock */
			ut_ad(trx != lock->trx);
			lock_grant(lock);
		}
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}

/** Release the explicit locks of a committing transaction,
and release possible other transactions waiting because of these locks.
@return whether the operation succeeded */
TRANSACTIONAL_TARGET static bool lock_release_try(trx_t *trx)
{
  /* At this point, trx->lock.trx_locks cannot be modified by other
  threads, because our transaction has been committed.
  See the checks and assertions in lock_rec_create_low() and
  lock_rec_add_to_queue().

  The function lock_table_create() should never be invoked on behalf
  of a transaction running in another thread. Also there, we will
  assert that the current transaction be active. */
  DBUG_ASSERT(trx->state == TRX_STATE_COMMITTED_IN_MEMORY);
  DBUG_ASSERT(!trx->is_referenced());

  bool all_released= true;
restart:
  ulint count= 1000;
  /* We will not attempt hardware lock elision (memory transaction)
  here. Both lock_rec_dequeue_from_page() and lock_table_dequeue()
  would likely lead to a memory transaction due to a system call, to
  wake up a waiting transaction. */
  lock_sys.rd_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  /* Note: Anywhere else, trx->mutex is not held while acquiring
  a lock table latch, but here we are following the opposite order.
  To avoid deadlocks, we only try to acquire the lock table latches
  but not keep waiting for them. */

  for (lock_t *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock; )
  {
    ut_ad(lock->trx == trx);
    lock_t *prev= UT_LIST_GET_PREV(trx_locks, lock);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      ut_ad(lock->mode() != LOCK_X ||
            lock->index->table->id >= DICT_HDR_FIRST_ID ||
            trx->dict_operation);
      auto &lock_hash= lock_sys.hash_get(lock->type_mode);
      auto cell= lock_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
      auto latch= lock_sys_t::hash_table::latch(cell);
      if (!latch->try_acquire())
        all_released= false;
      else
      {
        lock_rec_dequeue_from_page(lock, false);
        latch->release();
      }
    }
    else
    {
      dict_table_t *table= lock->un_member.tab_lock.table;
      ut_ad(!table->is_temporary());
      ut_ad(table->id >= DICT_HDR_FIRST_ID ||
            (lock->mode() != LOCK_IX && lock->mode() != LOCK_X) ||
            trx->dict_operation);
      if (!table->lock_mutex_trylock())
        all_released= false;
      else
      {
        lock_table_dequeue(lock, false);
        table->lock_mutex_unlock();
      }
    }

    lock= all_released ? UT_LIST_GET_LAST(trx->lock.trx_locks) : prev;
    if (!--count)
      break;
  }

  lock_sys.rd_unlock();
  trx->mutex_unlock();
  if (all_released && !count)
    goto restart;
  return all_released;
}

/** Release the explicit locks of a committing transaction,
and release possible other transactions waiting because of these locks. */
void lock_release(trx_t *trx)
{
#ifdef UNIV_DEBUG
  std::set<table_id_t> to_evict;
  if (innodb_evict_tables_on_commit_debug &&
      !trx->is_recovered && !trx->dict_operation &&
      !trx->dict_operation_lock_mode)
    for (const auto& p : trx->mod_tables)
      if (!p.first->is_temporary())
        to_evict.emplace(p.first->id);
#endif
  ulint count;

  for (count= 5; count--; )
    if (lock_release_try(trx))
      goto released;

  /* Fall back to acquiring lock_sys.latch in exclusive mode */
restart:
  count= 1000;
  /* There is probably no point to try lock elision here;
  in lock_release_try() it is different. */
  lock_sys.wr_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  while (lock_t *lock= UT_LIST_GET_LAST(trx->lock.trx_locks))
  {
    ut_ad(lock->trx == trx);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      ut_ad(lock->mode() != LOCK_X ||
            lock->index->table->id >= DICT_HDR_FIRST_ID ||
            trx->dict_operation);
      lock_rec_dequeue_from_page(lock, false);
    }
    else
    {
      ut_d(dict_table_t *table= lock->un_member.tab_lock.table);
      ut_ad(!table->is_temporary());
      ut_ad(table->id >= DICT_HDR_FIRST_ID ||
            (lock->mode() != LOCK_IX && lock->mode() != LOCK_X) ||
            trx->dict_operation);
      lock_table_dequeue(lock, false);
    }

    if (!--count)
      break;
  }

  lock_sys.wr_unlock();
  trx->mutex_unlock();
  if (!count)
    goto restart;

released:
  if (UNIV_UNLIKELY(Deadlock::to_be_checked))
  {
    mysql_mutex_lock(&lock_sys.wait_mutex);
    lock_sys.deadlock_check();
    mysql_mutex_unlock(&lock_sys.wait_mutex);
  }

  trx->lock.was_chosen_as_deadlock_victim= false;
  trx->lock.n_rec_locks= 0;

#ifdef UNIV_DEBUG
  if (to_evict.empty())
    return;
  dict_sys.lock(SRW_LOCK_CALL);
  LockMutexGuard g{SRW_LOCK_CALL};
  for (const table_id_t id : to_evict)
    if (dict_table_t *table= dict_sys.find_table(id))
      if (!table->get_ref_count() && !UT_LIST_GET_LEN(table->locks))
        dict_sys.remove(table, true);
  dict_sys.unlock();
#endif
}

/** Release non-exclusive locks on XA PREPARE,
and wake up possible other transactions waiting because of these locks.
@param trx   transaction in XA PREPARE state
@return whether all locks were released */
static bool lock_release_on_prepare_try(trx_t *trx)
{
  /* At this point, trx->lock.trx_locks can still be modified by other
  threads to convert implicit exclusive locks into explicit ones.

  The function lock_table_create() should never be invoked on behalf
  of a transaction that is running in another thread. Also there, we
  will assert that the current transaction be active. */
  DBUG_ASSERT(trx->state == TRX_STATE_PREPARED);

  bool all_released= true;
  lock_sys.rd_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  /* Note: Normally, trx->mutex is not held while acquiring
  a lock table latch, but here we are following the opposite order.
  To avoid deadlocks, we only try to acquire the lock table latches
  but not keep waiting for them. */

  for (lock_t *prev, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock;
       lock= prev)
  {
    ut_ad(lock->trx == trx);
    prev= UT_LIST_GET_PREV(trx_locks, lock);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
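      /* Keep exclusive record locks (other than gap locks); they must
      be retained until the XA transaction is committed or rolled
      back. */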
      if (lock->mode() == LOCK_X && !lock->is_gap())
        continue;
      auto &lock_hash= lock_sys.hash_get(lock->type_mode);
      auto cell= lock_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
      auto latch= lock_sys_t::hash_table::latch(cell);
      if (latch->try_acquire())
      {
        lock_rec_dequeue_from_page(lock, false);
        latch->release();
      }
      else
        all_released= false;
    }
    else
    {
      dict_table_t *table= lock->un_member.tab_lock.table;
      ut_ad(!table->is_temporary());
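      /* Only IS and S table locks may be released on XA PREPARE;
      IX and X table locks are retained. */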
      switch (lock->mode()) {
      case LOCK_IS:
      case LOCK_S:
        if (table->lock_mutex_trylock())
        {
          lock_table_dequeue(lock, false);
          table->lock_mutex_unlock();
        }
        else
          all_released= false;
        break;
      case LOCK_IX:
      case LOCK_X:
        ut_ad(table->id >= DICT_HDR_FIRST_ID || trx->dict_operation);
        /* fall through */
      default:
        break;
      }
    }
  }

  lock_sys.rd_unlock();
  trx->mutex_unlock();
  return all_released;
}

/** Release non-exclusive locks on XA PREPARE,
and release possible other transactions waiting because of these locks. */
void lock_release_on_prepare(trx_t *trx)
{
  for (ulint count= 5; count--; )
    if (lock_release_on_prepare_try(trx))
      return;

  LockMutexGuard g{SRW_LOCK_CALL};
  trx->mutex_lock();

  for (lock_t *prev, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock;
       lock= prev)
  {
    ut_ad(lock->trx == trx);
    prev= UT_LIST_GET_PREV(trx_locks, lock);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      if (lock->mode() != LOCK_X || lock->is_gap())
        lock_rec_dequeue_from_page(lock, false);
    }
    else
    {
      ut_d(dict_table_t *table= lock->un_member.tab_lock.table);
      ut_ad(!table->is_temporary());
      switch (lock->mode()) {
      case LOCK_IS:
      case LOCK_S:
        lock_table_dequeue(lock, false);
        break;
      case LOCK_IX:
      case LOCK_X:
        ut_ad(table->id >= DICT_HDR_FIRST_ID || trx->dict_operation);
        /* fall through */
      default:
        break;
      }
    }
  }

  trx->mutex_unlock();
}

/** Release locks on a table whose creation is being rolled back */
ATTRIBUTE_COLD
void lock_release_on_rollback(trx_t *trx, dict_table_t *table)
{
  trx->mod_tables.erase(table);

  /* This is very rarely executed code, in the rare case that a
  CREATE TABLE operation is being rolled back. Theoretically,
  we might try to remove the locks in multiple memory transactions. */
  lock_sys.wr_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

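  /* First, detach every lock in the queue of the table being dropped;
  each of them must belong to this transaction. */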
  for (lock_t *next, *lock= UT_LIST_GET_FIRST(table->locks); lock; lock= next)
  {
    next= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock);
    ut_ad(lock->trx == trx);
    UT_LIST_REMOVE(trx->lock.trx_locks, lock);
    ut_list_remove(table->locks, lock, TableLockGetNode());
  }

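  /* Then release the record locks that refer to indexes of this
  table. */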
  for (lock_t *p, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock; lock= p)
  {
    p= UT_LIST_GET_PREV(trx_locks, lock);
    ut_ad(lock->trx == trx);
    if (lock->is_table())
      ut_ad(lock->un_member.tab_lock.table != table);
    else if (lock->index->table == table)
      lock_rec_dequeue_from_page(lock, false);
  }

  lock_sys.wr_unlock();
  trx->mutex_unlock();
}

/*********************************************************************//**
Removes table locks of the transaction on a table to be dropped. */
static
void
lock_trx_table_locks_remove(
/*========================*/
	const lock_t*	lock_to_remove)		/*!< in: lock to remove */
{
	trx_t*		trx = lock_to_remove->trx;

	ut_ad(lock_to_remove->is_table());
	lock_sys.assert_locked(*lock_to_remove->un_member.tab_lock.table);
	ut_ad(trx->mutex_is_owner());

	for (lock_list::iterator it = trx->lock.table_locks.begin(),
             end = trx->lock.table_locks.end(); it != end; ++it) {
		const lock_t*	lock = *it;

		ut_ad(!lock || trx == lock->trx);
		ut_ad(!lock || lock->is_table());
		ut_ad(!lock || lock->un_member.tab_lock.table);

		if (lock == lock_to_remove) {
			*it = NULL;
			return;
		}
	}

	/* Lock must exist in the vector. */
	ut_error;
}

/*===================== VALIDATION AND DEBUGGING ====================*/

/** Print info of a table lock.
@param[in,out]	file	output stream
@param[in]	lock	table lock */
static
void
lock_table_print(FILE* file, const lock_t* lock)
{
	lock_sys.assert_locked();
	ut_a(lock->is_table());

	fputs("TABLE LOCK table ", file);
	ut_print_name(file, lock->trx,
		      lock->un_member.tab_lock.table->name.m_name);
	fprintf(file, " trx id " TRX_ID_FMT, lock->trx->id);

	switch (auto mode = lock->mode()) {
	case LOCK_S:
		fputs(" lock mode S", file);
		break;
	case LOCK_X:
		ut_ad(lock->trx->id != 0);
		fputs(" lock mode X", file);
		break;
	case LOCK_IS:
		fputs(" lock mode IS", file);
		break;
	case LOCK_IX:
		ut_ad(lock->trx->id != 0);
		fputs(" lock mode IX", file);
		break;
	case LOCK_AUTO_INC:
		fputs(" lock mode AUTO-INC", file);
		break;
	default:
		fprintf(file, " unknown lock mode %u", mode);
	}

	if (lock->is_waiting()) {
		fputs(" waiting", file);
	}

	putc('\n', file);
}

/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr)
{
	ut_ad(!lock->is_table());

	const page_id_t page_id{lock->un_member.rec_lock.page_id};
	ut_d(lock_sys.hash_get(lock->type_mode).assert_locked(page_id));

	fprintf(file, "RECORD LOCKS space id %u page no %u n bits " ULINTPF
		" index %s of table ",
		page_id.space(), page_id.page_no(),
		lock_rec_get_n_bits(lock),
		lock->index->name());
	ut_print_name(file, lock->trx, lock->index->table->name.m_name);
	fprintf(file, " trx id " TRX_ID_FMT, lock->trx->id);

	switch (lock->mode()) {
	case LOCK_S:
		fputs(" lock mode S", file);
		break;
	case LOCK_X:
		fputs(" lock_mode X", file);
		break;
	default:
		ut_error;
	}

	if (lock->is_gap()) {
		fputs(" locks gap before rec", file);
	}

	if (lock->is_record_not_gap()) {
		fputs(" locks rec but not gap", file);
	}

	if (lock->is_insert_intention()) {
		fputs(" insert intention", file);
	}

	if (lock->is_waiting()) {
		fputs(" waiting", file);
	}

	putc('\n', file);

	mem_heap_t*		heap		= NULL;
	rec_offs		offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*		offsets		= offsets_;
	rec_offs_init(offsets_);

	mtr.start();
	const buf_block_t* block = buf_page_try_get(page_id, &mtr);

	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (!lock_rec_get_nth_bit(lock, i)) {
			continue;
		}

		fprintf(file, "Record lock, heap no %lu", (ulong) i);

		if (block) {
			ut_ad(page_is_leaf(block->page.frame));
			const rec_t*	rec;

			rec = page_find_rec_with_heap_no(
				buf_block_get_frame(block), i);
			ut_ad(!page_rec_is_metadata(rec));

			offsets = rec_get_offsets(
				rec, lock->index, offsets,
				lock->index->n_core_fields,
				ULINT_UNDEFINED, &heap);

			putc(' ', file);
			rec_print_new(file, rec, offsets);
		}

		putc('\n', file);
	}

	mtr.commit();

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}

#ifdef UNIV_DEBUG
/* Print the number of lock structs from lock_print_info_summary() only
in non-production builds for performance reasons, see
http://bugs.mysql.com/36942 */
#define PRINT_NUM_OF_LOCK_STRUCTS
#endif /* UNIV_DEBUG */

#ifdef PRINT_NUM_OF_LOCK_STRUCTS
/*********************************************************************//**
Calculates the number of record lock structs in the record lock hash table.
@return number of record locks */
TRANSACTIONAL_TARGET
static ulint lock_get_n_rec_locks()
{
	ulint	n_locks	= 0;
	ulint	i;

	lock_sys.assert_locked();

	for (i = 0; i < lock_sys.rec_hash.n_cells; i++) {
		const lock_t*	lock;

		for (lock = static_cast<const lock_t*>(
			     HASH_GET_FIRST(&lock_sys.rec_hash, i));
		     lock != 0;
		     lock = static_cast<const lock_t*>(
				HASH_GET_NEXT(hash, lock))) {

			n_locks++;
		}
	}

	return(n_locks);
}
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */

/*********************************************************************//**
Prints info of locks for all transactions.
@return FALSE if not able to acquire lock_sys.latch (and display info) */
ibool
lock_print_info_summary(
/*====================*/
	FILE*	file,	/*!< in: file where to print */
	ibool	nowait)	/*!< in: whether to wait for lock_sys.latch */
{
	/* Here, lock elision does not make sense, because
	for the output we are going to invoke system calls,
	which would interrupt a memory transaction. */
	if (!nowait) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	} else if (!lock_sys.wr_lock_try()) {
		fputs("FAIL TO OBTAIN LOCK MUTEX,"
		      " SKIP LOCK INFO PRINTING\n", file);
		return(FALSE);
	}

	if (lock_sys.deadlocks) {
		fputs("------------------------\n"
		      "LATEST DETECTED DEADLOCK\n"
		      "------------------------\n", file);

		if (!srv_read_only_mode) {
			ut_copy_file(file, lock_latest_err_file);
		}
	}

	fputs("------------\n"
	      "TRANSACTIONS\n"
	      "------------\n", file);

	fprintf(file, "Trx id counter " TRX_ID_FMT "\n",
		trx_sys.get_max_trx_id());

	fprintf(file,
		"Purge done for trx's n:o < " TRX_ID_FMT
		" undo n:o < " TRX_ID_FMT " state: %s\n"
		"History list length %u\n",
		purge_sys.tail.trx_no,
		purge_sys.tail.undo_no,
		purge_sys.enabled()
		? (purge_sys.running() ? "running"
		   : purge_sys.paused() ? "stopped" : "running but idle")
		: "disabled",
		trx_sys.history_size());

#ifdef PRINT_NUM_OF_LOCK_STRUCTS
	fprintf(file,
		"Total number of lock structs in row lock hash table %lu\n",
		(ulong) lock_get_n_rec_locks());
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */
	return(TRUE);
}

/** Prints transaction lock wait and MVCC state.
@param[in,out]	file	file where to print
@param[in]	trx	transaction
@param[in]	now	current my_hrtime_coarse() */
void lock_trx_print_wait_and_mvcc_state(FILE *file, const trx_t *trx,
                                        my_hrtime_t now)
{
	fprintf(file, "---");

	trx_print_latched(file, trx, 600);
	trx->read_view.print_limits(file);

	if (const lock_t* wait_lock = trx->lock.wait_lock) {
		const my_hrtime_t suspend_time= trx->lock.suspend_time;
		fprintf(file,
			"------- TRX HAS BEEN WAITING %llu ns"
			" FOR THIS LOCK TO BE GRANTED:\n",
			now.val - suspend_time.val);

		if (!wait_lock->is_table()) {
			mtr_t mtr;
			lock_rec_print(file, wait_lock, mtr);
		} else {
			lock_table_print(file, wait_lock);
		}

		fprintf(file, "------------------\n");
	}
}

/*********************************************************************//**
Prints info of locks for a transaction. */
static
void
lock_trx_print_locks(
/*=================*/
	FILE*		file,		/*!< in/out: File to write */
	const trx_t*	trx)		/*!< in: current transaction */
{
	mtr_t mtr;
	uint32_t i= 0;
	/* Iterate over the transaction's locks. */
	lock_sys.assert_locked();
	for (lock_t *lock = UT_LIST_GET_FIRST(trx->lock.trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
		if (!lock->is_table()) {
			lock_rec_print(file, lock, mtr);
		} else {
			lock_table_print(file, lock);
		}

		if (++i == 10) {

			fprintf(file,
				"10 LOCKS PRINTED FOR THIS TRX:"
				" SUPPRESSING FURTHER PRINTS\n");

			break;
		}
	}
}

/** Functor to display all transactions */
struct lock_print_info
{
  lock_print_info(FILE* file, my_hrtime_t now) :
    file(file), now(now),
    purge_trx(purge_sys.query ? purge_sys.query->trx : nullptr)
  {}

  void operator()(const trx_t &trx) const
  {
    if (UNIV_UNLIKELY(&trx == purge_trx))
      return;
    lock_trx_print_wait_and_mvcc_state(file, &trx, now);

    if (trx.will_lock && srv_print_innodb_lock_monitor)
      lock_trx_print_locks(file, &trx);
  }

  FILE* const file;
  const my_hrtime_t now;
  const trx_t* const purge_trx;
};

/*********************************************************************//**
Prints info of locks for each transaction. This function will release
lock_sys.latch, which the caller must be holding in exclusive mode. */
void
lock_print_info_all_transactions(
/*=============================*/
	FILE*		file)	/*!< in/out: file where to print */
{
	fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n");

	trx_sys.trx_list.for_each(lock_print_info(file, my_hrtime_coarse()));
	lock_sys.wr_unlock();

	ut_d(lock_validate());
}

#ifdef UNIV_DEBUG
/*********************************************************************//**
Find the lock in the trx_t::trx_lock_t::table_locks vector.
@return true if found */
static
bool
lock_trx_table_locks_find(
/*======================*/
	trx_t*		trx,		/*!< in: trx to validate */
	const lock_t*	find_lock)	/*!< in: lock to find */
{
	bool		found = false;

	ut_ad(trx->mutex_is_owner());

	for (lock_list::const_iterator it = trx->lock.table_locks.begin(),
             end = trx->lock.table_locks.end(); it != end; ++it) {

		const lock_t*	lock = *it;

		if (lock == NULL) {

			continue;

		} else if (lock == find_lock) {

			/* Can't be duplicates. */
			ut_a(!found);
			found = true;
		}

		ut_a(trx == lock->trx);
		ut_a(lock->is_table());
		ut_a(lock->un_member.tab_lock.table != NULL);
	}

	return(found);
}

/*********************************************************************//**
Validates the lock queue on a table.
@return TRUE if ok */
static
ibool
lock_table_queue_validate(
/*======================*/
	const dict_table_t*	table)	/*!< in: table */
{
	const lock_t*	lock;

	lock_sys.assert_locked(*table);

	for (lock = UT_LIST_GET_FIRST(table->locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {

		/* lock->trx->state cannot change from or to NOT_STARTED
		while we are holding the lock_sys.latch. It may change
		from ACTIVE or PREPARED to PREPARED or COMMITTED. */
		lock->trx->mutex_lock();
		check_trx_state(lock->trx);

		if (lock->trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
		} else if (!lock->is_waiting()) {
			ut_a(!lock_table_other_has_incompatible(
				     lock->trx, 0, table,
				     lock->mode()));
		} else {
			ut_a(lock_table_has_to_wait_in_queue(lock));
		}

		ut_a(lock_trx_table_locks_find(lock->trx, lock));
		lock->trx->mutex_unlock();
	}

	return(TRUE);
}

/*********************************************************************//**
Validates the lock queue on a single record.
@return TRUE if ok */
static
bool
lock_rec_queue_validate(
/*====================*/
	bool			locked_lock_trx_sys,
					/*!< in: if the caller holds
					both the lock_sys.latch and
					trx_sys_t->lock. */
	const page_id_t		id,	/*!< in: page identifier */
	const rec_t*		rec,	/*!< in: record to look at */
	const dict_index_t*	index,	/*!< in: index, or NULL if not known */
	const rec_offs*		offsets)/*!< in: rec_get_offsets(rec, index) */
{
	const lock_t*	lock;
	ulint		heap_no;

	ut_a(rec);
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!index || dict_index_is_clust(index)
	      || !dict_index_is_online_ddl(index));

	heap_no = page_rec_get_heap_no(rec);

	if (!locked_lock_trx_sys) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	}

	hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id.fold());
	lock_sys.assert_locked(cell);

	if (!page_rec_is_user_rec(rec)) {

		for (lock = lock_sys_t::get_first(cell, id, heap_no);
		     lock != NULL;
		     lock = lock_rec_get_next_const(heap_no, lock)) {

			ut_ad(!index || lock->index == index);

			lock->trx->mutex_lock();
			ut_ad(!lock->trx->read_only
			      || !lock->trx->is_autocommit_non_locking());
			ut_ad(trx_state_eq(lock->trx,
					   TRX_STATE_COMMITTED_IN_MEMORY)
			      || !lock->is_waiting()
			      || lock_rec_has_to_wait_in_queue(cell, lock));
			lock->trx->mutex_unlock();
		}

func_exit:
		if (!locked_lock_trx_sys) {
			lock_sys.wr_unlock();
		}

		return true;
	}

	ut_ad(page_rec_is_leaf(rec));

	const trx_id_t impl_trx_id = index && index->is_primary()
		? lock_clust_rec_some_has_impl(rec, index, offsets)
		: 0;

	if (trx_t *impl_trx = impl_trx_id
	    ? trx_sys.find(current_trx(), impl_trx_id, false)
	    : 0) {
		/* impl_trx could have been committed before we
		acquire its mutex, but not thereafter. */

		impl_trx->mutex_lock();
		ut_ad(impl_trx->state != TRX_STATE_NOT_STARTED);
		if (impl_trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
		} else if (const lock_t* other_lock
			   = lock_rec_other_has_expl_req(
				   LOCK_S, cell, id, true, heap_no,
				   impl_trx)) {
			/* The impl_trx is holding an implicit lock on the
			given record 'rec'. So there cannot be another
			explicit granted lock.  Also, there can be another
			explicit waiting lock only if the impl_trx has an
			explicit granted lock. */

#ifdef WITH_WSREP
			/** Galera record locking rules:
			* If there is no other record lock to the same record, we may grant
			the lock request.
			* If there is other record lock but this requested record lock is
			compatible, we may grant the lock request.
			* If there is other record lock and it is not compatible with
			requested lock, all normal transactions must wait.
			* BF (brute force) additional exceptions :
			** If BF already holds record lock for requested record, we may
			grant new record lock even if there is conflicting record lock(s)
			waiting on a queue.
			** If conflicting transaction holds requested record lock,
			we will cancel this record lock and select conflicting transaction
			for BF abort or kill victim.
			** If conflicting transaction is waiting for requested record lock
			we will cancel this wait and select conflicting transaction
			for BF abort or kill victim.
			** There should not be two BF transactions waiting for same record lock
			*/
			if (other_lock->trx->is_wsrep() && !other_lock->is_waiting()) {
				wsrep_report_bf_lock_wait(impl_trx->mysql_thd, impl_trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);

				if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
						       cell, id, heap_no,
						       impl_trx)) {
					ib::info() << "WSREP impl BF lock conflict";
				}
			} else
#endif /* WITH_WSREP */
			{
				ut_ad(other_lock->is_waiting());
				ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
						        cell, id, heap_no,
							impl_trx));
			}
		}

		impl_trx->mutex_unlock();
	}

	for (lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next_const(heap_no, lock)) {
		ut_ad(!lock->trx->read_only
		      || !lock->trx->is_autocommit_non_locking());
		ut_ad(!page_rec_is_metadata(rec));

		if (index) {
			ut_a(lock->index == index);
		}

		if (lock->is_waiting()) {
			ut_a(lock->is_gap()
			     || lock_rec_has_to_wait_in_queue(cell, lock));
		} else if (!lock->is_gap()) {
			const lock_mode	mode = lock->mode() == LOCK_S
				? LOCK_X : LOCK_S;

			const lock_t*	other_lock
				= lock_rec_other_has_expl_req(
					mode, cell, id, false, heap_no,
					lock->trx);
#ifdef WITH_WSREP
			if (UNIV_UNLIKELY(other_lock && lock->trx->is_wsrep())) {
				/* Only BF transaction may be granted
				lock before other conflicting lock
				request. */
				if (!wsrep_thd_is_BF(lock->trx->mysql_thd, FALSE)
				    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
					/* If no BF, this case is a bug. */
					wsrep_report_bf_lock_wait(lock->trx->mysql_thd, lock->trx->id);
					wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
					ut_error;
				}
			} else
#endif /* WITH_WSREP */
			ut_ad(!other_lock);
		}
	}

	goto func_exit;
}

/** Validate the record lock queues on a page.
@param block    buffer pool block
@param latched  whether the tablespace latch may be held
@return true if ok */
static bool lock_rec_validate_page(const buf_block_t *block, bool latched)
{
	const lock_t*	lock;
	const rec_t*	rec;
	ulint		nth_lock	= 0;
	ulint		nth_bit		= 0;
	ulint		i;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	const page_id_t id{block->page.id()};

	LockGuard g{lock_sys.rec_hash, id};
loop:
	lock = lock_sys_t::get_first(g.cell(), id);

	if (!lock) {
		goto function_exit;
	}

	DBUG_ASSERT(!block->page.is_freed());

	for (i = 0; i < nth_lock; i++) {

		lock = lock_rec_get_next_on_page_const(lock);

		if (!lock) {
			goto function_exit;
		}
	}

	ut_ad(!lock->trx->read_only
	      || !lock->trx->is_autocommit_non_locking());

	/* Only validate the record queues when this thread is not
	holding a tablespace latch. */
	if (!latched)
	for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {

		if (i == PAGE_HEAP_NO_SUPREMUM
		    || lock_rec_get_nth_bit(lock, i)) {

			rec = page_find_rec_with_heap_no(block->page.frame, i);
			ut_a(rec);
			ut_ad(!lock_rec_get_nth_bit(lock, i)
			      || page_rec_is_leaf(rec));
			offsets = rec_get_offsets(rec, lock->index, offsets,
						  lock->index->n_core_fields,
						  ULINT_UNDEFINED, &heap);

			/* If this thread is holding the file space
			latch (fil_space_t::latch), the following
			check WILL break the latching order and may
			cause a deadlock of threads. */

			lock_rec_queue_validate(
				true, id, rec, lock->index, offsets);

			nth_bit = i + 1;

			goto loop;
		}
	}

	nth_bit = 0;
	nth_lock++;

	goto loop;

function_exit:
	if (heap != NULL) {
		mem_heap_free(heap);
	}
4788 4789 4790
	return true;
}

/*********************************************************************//**
Validate record locks up to a limit.
@return lock at limit or NULL if no more locks in the hash bucket */
static MY_ATTRIBUTE((warn_unused_result))
const lock_t*
lock_rec_validate(
/*==============*/
	ulint		start,		/*!< in: lock_sys.rec_hash
					bucket */
	page_id_t*	limit)		/*!< in/out: upper limit of
					(space, page_no) */
{
	lock_sys.assert_locked();

	for (const lock_t* lock = static_cast<const lock_t*>(
		     HASH_GET_FIRST(&lock_sys.rec_hash, start));
	     lock != NULL;
	     lock = static_cast<const lock_t*>(HASH_GET_NEXT(hash, lock))) {

		ut_ad(!lock->trx->read_only
		      || !lock->trx->is_autocommit_non_locking());
		ut_ad(!lock->is_table());

		page_id_t current(lock->un_member.rec_lock.page_id);

		if (current > *limit) {
			*limit = current + 1;
			return(lock);
		}
	}

	return(0);
}

/*********************************************************************//**
Validate a record lock's block */
static void lock_rec_block_validate(const page_id_t page_id)
{
	/* The lock and the block that it is referring to may be freed at
	this point. We pass BUF_GET_POSSIBLY_FREED to skip a debug check.
	If the lock exists in lock_rec_validate_page() we assert
	!block->page.is_freed(). */

	buf_block_t*	block;
	mtr_t		mtr;

	/* Transactional locks should never refer to dropped
	tablespaces, because all DDL operations that would drop or
	discard or rebuild a tablespace do hold an exclusive table
	lock, which would conflict with any locks referring to the
	tablespace from other transactions. */
	if (fil_space_t* space = fil_space_t::get(page_id.space())) {
		dberr_t err = DB_SUCCESS;
		mtr_start(&mtr);

		block = buf_page_get_gen(
			page_id,
			space->zip_size(),
			RW_X_LATCH, NULL,
			BUF_GET_POSSIBLY_FREED,
			&mtr, &err);

		if (err != DB_SUCCESS) {
			ib::error() << "Lock rec block validate failed for tablespace "
				   << space->chain.start->name
				   << page_id << " err " << err;
		}

		ut_ad(!block || block->page.is_freed()
		      || lock_rec_validate_page(block, space->is_latched()));

		mtr_commit(&mtr);

		space->release();
	}
}

static my_bool lock_validate_table_locks(rw_trx_hash_element_t *element, void*)
{
  lock_sys.assert_locked();
  element->mutex.wr_lock();
  if (element->trx)
  {
    check_trx_state(element->trx);
    for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
         lock != NULL;
         lock= UT_LIST_GET_NEXT(trx_locks, lock))
      if (lock->is_table())
        lock_table_queue_validate(lock->un_member.tab_lock.table);
  }
  element->mutex.wr_unlock();
  return 0;
}


/** Validate the transactional locks. */
static void lock_validate()
{
  std::set<page_id_t> pages;
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    /* Validate table locks */
    trx_sys.rw_trx_hash.iterate(lock_validate_table_locks);

    for (ulint i= 0; i < lock_sys.rec_hash.n_cells; i++)
    {
      page_id_t limit{0, 0};
      while (const lock_t *lock= lock_rec_validate(i, &limit))
      {
        if (lock_rec_find_set_bit(lock) == ULINT_UNDEFINED)
          /* The lock bitmap is empty; ignore it. */
          continue;
        pages.insert(lock->un_member.rec_lock.page_id);
      }
    }
  }

  for (page_id_t page_id : pages)
    lock_rec_block_validate(page_id);
}
#endif /* UNIV_DEBUG */
/*============ RECORD LOCK CHECKS FOR ROW OPERATIONS ====================*/

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate insert of
a record. If they do, first tests if the query thread should anyway
be suspended for some reason; if not, then puts the transaction and
the query thread to the lock wait state and inserts a waiting request
for a gap x-lock to the lock queue.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
TRANSACTIONAL_TARGET
dberr_t
lock_rec_insert_check_and_lock(
/*===========================*/
	const rec_t*	rec,	/*!< in: record after which to insert */
	buf_block_t*	block,	/*!< in/out: buffer block of rec */
	dict_index_t*	index,	/*!< in: index */
	que_thr_t*	thr,	/*!< in: query thread */
	mtr_t*		mtr,	/*!< in/out: mini-transaction */
	bool*		inherit)/*!< out: set to true if the newly
				inserted record may need to inherit
				LOCK_GAP type locks from the successor
				record */
{
  ut_ad(block->page.frame == page_align(rec));
  ut_ad(mtr->is_named_space(index->table->space));
  ut_ad(page_is_leaf(block->page.frame));
  ut_ad(!index->table->is_temporary());

  dberr_t err= DB_SUCCESS;
  bool inherit_in= *inherit;
  trx_t *trx= thr_get_trx(thr);
  const rec_t *next_rec= page_rec_get_next_const(rec);
  ulint heap_no= page_rec_get_heap_no(next_rec);
  const page_id_t id{block->page.id()};
  ut_ad(!rec_is_metadata(next_rec, *index));

  {
    LockGuard g{lock_sys.rec_hash, id};
    /* Because this code is invoked for a running transaction by
    the thread that is serving the transaction, it is not necessary
    to hold trx->mutex here. */

    /* When inserting a record into an index, the table must be at
    least IX-locked. When we are building an index, we would pass
    BTR_NO_LOCKING_FLAG and skip the locking altogether. */
    ut_ad(lock_table_has(trx, index->table, LOCK_IX));

    *inherit= lock_sys_t::get_first(g.cell(), id, heap_no);

    if (*inherit)
    {
      /* Spatial index does not use GAP lock protection. It uses
      "predicate lock" to protect the "range" */
      if (index->is_spatial())
        return DB_SUCCESS;

      /* If another transaction has an explicit lock request which locks
      the gap, waiting or granted, on the successor, the insert has to wait.

      An exception is the case where the lock by the another transaction
      is a gap type lock which it placed to wait for its turn to insert. We
      do not consider that kind of a lock conflicting with our insert. This
      eliminates an unnecessary deadlock which resulted when 2 transactions
      had to wait for their insert. Both had waiting gap type lock requests
      on the successor, which produced an unnecessary deadlock. */
      const unsigned type_mode= LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION;

      if (lock_t *c_lock= lock_rec_other_has_conflicting(type_mode,
                                                         g.cell(), id,
                                                         heap_no, trx))
      {
        trx->mutex_lock();
        err= lock_rec_enqueue_waiting(c_lock, type_mode, id, block->page.frame,
                                      heap_no, index, thr, nullptr);
        trx->mutex_unlock();
      }
    }
  }

  switch (err) {
  case DB_SUCCESS_LOCKED_REC:
    err = DB_SUCCESS;
    /* fall through */
  case DB_SUCCESS:
    if (!inherit_in || index->is_clust())
      break;
    /* Update the page max trx id field */
    page_update_max_trx_id(block, buf_block_get_page_zip(block), trx->id, mtr);
  default:
    /* We only care about the two return values. */
    break;
  }

#ifdef UNIV_DEBUG
  {
    mem_heap_t *heap= nullptr;
    rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
    const rec_offs *offsets;
    rec_offs_init(offsets_);

    offsets= rec_get_offsets(next_rec, index, offsets_, index->n_core_fields,
                             ULINT_UNDEFINED, &heap);

    ut_ad(lock_rec_queue_validate(false, id, next_rec, index, offsets));

    if (UNIV_LIKELY_NULL(heap))
      mem_heap_free(heap);
  }
#endif /* UNIV_DEBUG */

  return err;
}

/*********************************************************************//**
Creates an explicit record lock for a running transaction that currently only
has an implicit lock on the record. The transaction instance must have a
reference count > 0 so that it can't be committed and freed before this
function has completed. */
static
void
lock_rec_convert_impl_to_expl_for_trx(
/*==================================*/
	const page_id_t		id,	/*!< in: page identifier */
	const rec_t*		rec,	/*!< in: user record on page */
	dict_index_t*		index,	/*!< in: index of record */
	trx_t*			trx,	/*!< in/out: active transaction */
	ulint			heap_no)/*!< in: rec heap number to lock */
{
  ut_ad(trx->is_referenced());
  ut_ad(page_rec_is_leaf(rec));
  ut_ad(!rec_is_metadata(rec, *index));

  DEBUG_SYNC_C("before_lock_rec_convert_impl_to_expl_for_trx");
  {
    LockGuard g{lock_sys.rec_hash, id};
    trx->mutex_lock();
    ut_ad(!trx_state_eq(trx, TRX_STATE_NOT_STARTED));
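
    /* Although trx is referenced and thus cannot be freed, it may have
    been committed in memory while we were waiting for the lock_sys
    latch; in that case, no explicit lock must be created. */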
    if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) &&
        !lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, heap_no,
                           trx))
      lock_rec_add_to_queue(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id,
                            page_align(rec), heap_no, index, trx, true);
  }

  trx->mutex_unlock();
  trx->release_reference();

  DEBUG_SYNC_C("after_lock_rec_convert_impl_to_expl_for_trx");
}


#ifdef UNIV_DEBUG
struct lock_rec_other_trx_holds_expl_arg
{
  const ulint heap_no;
  const hash_cell_t &cell;
  const page_id_t id;
  const trx_t &impl_trx;
};


static my_bool lock_rec_other_trx_holds_expl_callback(
  rw_trx_hash_element_t *element,
  lock_rec_other_trx_holds_expl_arg *arg)
{
  element->mutex.wr_lock();
  if (element->trx)
  {
    element->trx->mutex_lock();
    ut_ad(element->trx->state != TRX_STATE_NOT_STARTED);
    lock_t *expl_lock= element->trx->state == TRX_STATE_COMMITTED_IN_MEMORY
      ? nullptr
      : lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP,
                          arg->cell, arg->id, arg->heap_no, element->trx);
    /*
      Assert that no transaction other than the holder of the implicit
      lock holds an explicit lock on this record.
    */
    ut_ad(!expl_lock || expl_lock->trx == &arg->impl_trx);
    element->trx->mutex_unlock();
  }
  element->mutex.wr_unlock();
  return 0;
}


/**
  Checks if some transaction, other than given trx_id, has an explicit
  lock on the given rec.

  FIXME: if the current transaction holds implicit lock from INSERT, a
  subsequent locking read should not convert it to explicit. See also
  MDEV-11215.

  @param      caller_trx  trx of current thread
  @param[in]  trx         trx holding implicit lock on rec
  @param[in]  rec         user record
  @param[in]  id          page identifier
*/
static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx,
                                          const rec_t *rec,
                                          const page_id_t id)
{
  if (trx)
  {
    ut_ad(!page_rec_is_metadata(rec));
    LockGuard g{lock_sys.rec_hash, id};
    ut_ad(trx->is_referenced());
    const trx_state_t state{trx->state};
    ut_ad(state != TRX_STATE_NOT_STARTED);
    if (state == TRX_STATE_COMMITTED_IN_MEMORY)
      /* The transaction was committed before we acquired LockGuard. */
      return;
    lock_rec_other_trx_holds_expl_arg arg=
    { page_rec_get_heap_no(rec), g.cell(), id, *trx };
    trx_sys.rw_trx_hash.iterate(caller_trx,
                                lock_rec_other_trx_holds_expl_callback, &arg);
  }
}
#endif /* UNIV_DEBUG */


/** If an implicit x-lock exists on a record, convert it to an explicit one.

Often, this is called by a transaction that is about to enter a lock wait
due to the lock conflict. Two explicit locks would be created: first the
exclusive lock on behalf of the lock-holder transaction in this function,
and then a wait request on behalf of caller_trx, in the calling function.

This may also be called by the same transaction that is already holding
an implicit exclusive lock on the record. In this case, no explicit lock
should be created.

@param[in,out]	caller_trx	current transaction
@param[in]	id		index tree leaf page identifier
@param[in]	rec		record on the leaf page
@param[in]	index		the index of the record
@param[in]	offsets		rec_get_offsets(rec,index)
@return	whether caller_trx already holds an exclusive lock on rec */
static
bool
lock_rec_convert_impl_to_expl(
	trx_t*			caller_trx,
	page_id_t		id,
	const rec_t*		rec,
	dict_index_t*		index,
	const rec_offs*		offsets)
{
	trx_t*		trx;

	lock_sys.assert_unlocked();
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if (dict_index_is_clust(index)) {
		trx_id_t	trx_id;

		trx_id = lock_clust_rec_some_has_impl(rec, index, offsets);

		if (trx_id == 0) {
			return false;
		}
		if (UNIV_UNLIKELY(trx_id == caller_trx->id)) {
			return true;
		}

		trx = trx_sys.find(caller_trx, trx_id);
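		/* trx_sys.find() acquired a reference to trx, which is
		released in lock_rec_convert_impl_to_expl_for_trx(). */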
	} else {
		ut_ad(!dict_index_is_online_ddl(index));

		trx = lock_sec_rec_some_has_impl(caller_trx, rec, index,
						 offsets);
		if (trx == caller_trx) {
			trx->release_reference();
			return true;
		}

		ut_d(lock_rec_other_trx_holds_expl(caller_trx, trx, rec, id));
	}

	if (trx) {
		ulint	heap_no = page_rec_get_heap_no(rec);

		ut_ad(trx->is_referenced());

		/* If the transaction is still active and has no
		explicit x-lock set on the record, set one for it.
		trx cannot be committed until the ref count is zero. */

		lock_rec_convert_impl_to_expl_for_trx(
			id, rec, index, trx, heap_no);
	}

	return false;
}

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate modify (update,
delete mark, or delete unmark) of a clustered index record. If they do,
first tests if the query thread should anyway be suspended for some
reason; if not, then puts the transaction and the query thread to the
lock wait state and inserts a waiting request for a record x-lock to the
lock queue.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_modify_check_and_lock(
/*=================================*/
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: record which should be
					modified */
	dict_index_t*		index,	/*!< in: clustered index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(dict_index_is_clust(index));
	ut_ad(block->page.frame == page_align(rec));

	ut_ad(!rec_is_metadata(rec, *index));
	ut_ad(!index->table->is_temporary());

	heap_no = rec_offs_comp(offsets)
		? rec_get_heap_no_new(rec)
		: rec_get_heap_no_old(rec);

	/* If a transaction has no explicit x-lock set on the record, set one
	for it */

	if (lock_rec_convert_impl_to_expl(thr_get_trx(thr), block->page.id(),
					  rec, index, offsets)) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

	err = lock_rec_lock(true, LOCK_X | LOCK_REC_NOT_GAP,
			    block, heap_no, index, thr);

	ut_ad(lock_rec_queue_validate(false, block->page.id(),
				      rec, index, offsets));

	if (err == DB_SUCCESS_LOCKED_REC) {
		err = DB_SUCCESS;
	}

	return(err);
}

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate modify (delete
mark or delete unmark) of a secondary index record.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_modify_check_and_lock(
/*===============================*/
	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG
				bit is set, does nothing */
	buf_block_t*	block,	/*!< in/out: buffer block of rec */
	const rec_t*	rec,	/*!< in: record which should be
				modified; NOTE: as this is a secondary
				index, we always have to modify the
				clustered index record first: see the
				comment below */
	dict_index_t*	index,	/*!< in: secondary index */
	que_thr_t*	thr,	/*!< in: query thread
				(can be NULL if BTR_NO_LOCKING_FLAG) */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index) || (flags & BTR_CREATE_FLAG));
	ut_ad(block->page.frame == page_align(rec));
	ut_ad(mtr->is_named_space(index->table->space));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if (flags & BTR_NO_LOCKING_FLAG) {

		return(DB_SUCCESS);
	}
	ut_ad(!index->table->is_temporary());

	heap_no = page_rec_get_heap_no(rec);

#ifdef WITH_WSREP
	trx_t *trx= thr_get_trx(thr);
	/* If a transaction scanning a unique secondary key is a wsrep
	high-priority (brute force) thread, the scan may involve
	GAP-locking in the index. As this locking happens also when
	applying replication events in high-priority applier threads,
	there is a possibility of lock conflicts between two wsrep
	high-priority threads. To avoid this GAP-locking, we mark here
	that this transaction is using a unique key scan. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep = 3;
#endif /* WITH_WSREP */

	/* Another transaction cannot have an implicit lock on the record,
	because when we come here, we already have modified the clustered
	index record, and this would not have been possible if another active
	transaction had modified this secondary index record. */

	err = lock_rec_lock(true, LOCK_X | LOCK_REC_NOT_GAP,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	if (trx->wsrep == 3) trx->wsrep = 1;
#endif /* WITH_WSREP */

#ifdef UNIV_DEBUG
	{
		mem_heap_t*	heap		= NULL;
		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
		const rec_offs*	offsets;
		rec_offs_init(offsets_);

		offsets = rec_get_offsets(rec, index, offsets_,
					  index->n_core_fields,
					  ULINT_UNDEFINED, &heap);

		ut_ad(lock_rec_queue_validate(
			      false, block->page.id(), rec, index, offsets));

		if (heap != NULL) {
			mem_heap_free(heap);
		}
	}
#endif /* UNIV_DEBUG */

	if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
		/* Update the page max trx id field */
		/* It might not be necessary to do this if
		err == DB_SUCCESS (no new lock created),
		but it should not cost too much performance. */
		page_update_max_trx_id(block,
				       buf_block_get_page_zip(block),
				       thr_get_trx(thr)->id, mtr);
		err = DB_SUCCESS;
	}

	return(err);
}

/*********************************************************************//**
Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_read_check_and_lock(
/*=============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: secondary index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index));
	ut_ad(block->page.frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	const page_id_t id{block->page.id()};

	ut_ad(!rec_is_metadata(rec, *index));
	heap_no = page_rec_get_heap_no(rec);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list or a
	database recovery is running. */

	trx_t *trx = thr_get_trx(thr);
	if (!lock_table_has(trx, index->table, LOCK_X)
	    && !page_rec_is_supremum(rec)
	    && page_get_max_trx_id(block->page.frame)
	    >= trx_sys.get_min_trx_id()
	    && lock_rec_convert_impl_to_expl(trx, id, rec,
					     index, offsets)
	    && gap_mode == LOCK_REC_NOT_GAP) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}
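
	/* An implicit lock can only satisfy a LOCK_REC_NOT_GAP request;
	a LOCK_GAP or LOCK_ORDINARY request must still be enqueued via
	lock_rec_lock() below. */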

#ifdef WITH_WSREP
	/* If a transaction scanning a unique secondary key is a wsrep
	high-priority (brute force) thread, the scan may involve
	GAP-locking in the index. As this locking happens also when
	applying replication events in high-priority applier threads,
	there is a possibility of lock conflicts between two wsrep
	high-priority threads. To avoid this GAP-locking, we mark here
	that this transaction is using a unique key scan. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep = 3;
#endif /* WITH_WSREP */

	err = lock_rec_lock(false, gap_mode | mode,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	if (trx->wsrep == 3) trx->wsrep = 1;
#endif /* WITH_WSREP */

	ut_ad(lock_rec_queue_validate(false, id, rec, index, offsets));

	return(err);
}

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate read, or passing
over by a read cursor, of a clustered index record. If they do, first tests
if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_read_check_and_lock(
/*===============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: clustered index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	ut_ad(dict_index_is_clust(index));
	ut_ad(block->page.frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(gap_mode == LOCK_ORDINARY || gap_mode == LOCK_GAP
	      || gap_mode == LOCK_REC_NOT_GAP);
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	const page_id_t id{block->page.id()};

	ulint heap_no = page_rec_get_heap_no(rec);

	trx_t *trx = thr_get_trx(thr);
	if (!lock_table_has(trx, index->table, LOCK_X)
	    && heap_no != PAGE_HEAP_NO_SUPREMUM
	    && lock_rec_convert_impl_to_expl(trx, id, rec, index, offsets)
	    && gap_mode == LOCK_REC_NOT_GAP) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

	dberr_t err = lock_rec_lock(false, gap_mode | mode,
				    block, heap_no, index, thr);

	ut_ad(lock_rec_queue_validate(false, id, rec, index, offsets));

	DEBUG_SYNC_C("after_lock_clust_rec_read_check_and_lock");

	return(err);
}
/*********************************************************************//**
Checks if locks of other transactions prevent an immediate read, or passing
over by a read cursor, of a clustered index record. If they do, first tests
if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record. This is an alternative version of
lock_clust_rec_read_check_and_lock() that does not require the parameter
"offsets".
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_read_check_and_lock_alt(
/*===================================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: clustered index */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	mem_heap_t*	tmp_heap	= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	dberr_t		err;
	rec_offs_init(offsets_);

	ut_ad(page_rec_is_leaf(rec));
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  ULINT_UNDEFINED, &tmp_heap);
	err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
						 offsets, mode, gap_mode, thr);
	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	if (err == DB_SUCCESS_LOCKED_REC) {
		err = DB_SUCCESS;
	}

	return(err);
}

/*******************************************************************//**
Check if a transaction holds any autoinc locks.
@return TRUE if the transaction holds any AUTOINC locks. */
static
ibool
lock_trx_holds_autoinc_locks(
/*=========================*/
	const trx_t*	trx)		/*!< in: transaction */
{
	ut_a(trx->autoinc_locks != NULL);

	return(!ib_vector_is_empty(trx->autoinc_locks));
}

/** Release all AUTO_INCREMENT locks of the transaction. */
static void lock_release_autoinc_locks(trx_t *trx)
{
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    mysql_mutex_lock(&lock_sys.wait_mutex);
    trx->mutex_lock();
    auto autoinc_locks= trx->autoinc_locks;
    ut_a(autoinc_locks);

    /* We release the locks in the reverse order. This is to avoid
    searching the vector for the element to delete at the lower level.
    See (lock_table_remove_low()) for details. */
    while (ulint size= ib_vector_size(autoinc_locks))
    {
      lock_t *lock= *static_cast<lock_t**>
        (ib_vector_get(autoinc_locks, size - 1));
      ut_ad(lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE));
      lock_table_dequeue(lock, true);
      lock_trx_table_locks_remove(lock);
    }
  }
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  trx->mutex_unlock();
}
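
/* Note: lock_table_dequeue() may grant lock requests that were waiting
behind a released AUTO_INC lock; granting a lock ends a wait, which is
why lock_sys.wait_mutex is held above in addition to the exclusive latch. */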

/** Cancel a waiting lock request and release possibly waiting transactions */
static void lock_cancel_waiting_and_release(lock_t *lock)
{
  lock_sys.assert_locked(*lock);
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  trx_t *trx= lock->trx;
  trx->mutex_lock();
  ut_ad(trx->state == TRX_STATE_ACTIVE);

  if (!lock->is_table())
    lock_rec_dequeue_from_page(lock, true);
  else
  {
    if (lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE))
    {
      ut_ad(trx->autoinc_locks);
      ib_vector_remove(trx->autoinc_locks, lock);
    }
    lock_table_dequeue(lock, true);
    /* Remove the lock from the table lock vector too. */
    lock_trx_table_locks_remove(lock);
  }

  /* Reset the wait flag and the back pointer to lock in trx. */
  lock_reset_lock_and_trx_wait(lock);

  lock_wait_end(trx);
  trx->mutex_unlock();
}

void lock_sys_t::cancel_lock_wait_for_trx(trx_t *trx)
{
  lock_sys.wr_lock(SRW_LOCK_CALL);
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (lock_t *lock= trx->lock.wait_lock)
  {
    /* check if victim is still waiting */
    if (lock->is_waiting())
      lock_cancel_waiting_and_release(lock);
  }
  lock_sys.wr_unlock();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
}

/** Cancel a waiting lock request.
@tparam check_victim  whether to check for DB_DEADLOCK
@param lock           waiting lock request
@param trx            active transaction
@retval DB_SUCCESS    if no lock existed
@retval DB_DEADLOCK   if trx->lock.was_chosen_as_deadlock_victim was set
@retval DB_LOCK_WAIT  if the lock was canceled */
template<bool check_victim>
dberr_t lock_sys_t::cancel(trx_t *trx, lock_t *lock)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->lock.wait_lock == lock);
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  dberr_t err= DB_SUCCESS;
  /* This would be too large for a memory transaction, except in the
  DB_DEADLOCK case, which was already tested in lock_trx_handle_wait(). */
  if (lock->is_table())
  {
    if (!lock_sys.rd_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.rd_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
      lock= trx->lock.wait_lock;
      if (!lock);
      else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        err= DB_DEADLOCK;
      else
        goto resolve_table_lock;
    }
    else
    {
resolve_table_lock:
      dict_table_t *table= lock->un_member.tab_lock.table;
      if (!table->lock_mutex_trylock())
      {
        /* The correct latching order is:
        lock_sys.latch, table->lock_mutex_lock(), lock_sys.wait_mutex.
        Thus, we must release lock_sys.wait_mutex for a blocking wait. */
        mysql_mutex_unlock(&lock_sys.wait_mutex);
        table->lock_mutex_lock();
        mysql_mutex_lock(&lock_sys.wait_mutex);
        lock= trx->lock.wait_lock;
        if (!lock)
          goto retreat;
        else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        {
          err= DB_DEADLOCK;
          goto retreat;
        }
      }
      if (lock->is_waiting())
        lock_cancel_waiting_and_release(lock);
      /* Even if lock->is_waiting() did not hold above, we must return
      DB_LOCK_WAIT, or otherwise optimistic parallel replication could
      occasionally hang. Potentially affected tests:
      rpl.rpl_parallel_optimistic
      rpl.rpl_parallel_optimistic_nobinlog
      rpl.rpl_parallel_optimistic_xa_lsu_off */
      err= DB_LOCK_WAIT;
retreat:
      table->lock_mutex_unlock();
    }
    lock_sys.rd_unlock();
  }
  else
  {
    /* To prevent the record lock from being moved between pages
    during a page split or merge, we must hold exclusive lock_sys.latch. */
    if (!lock_sys.wr_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.wr_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
      lock= trx->lock.wait_lock;
      if (!lock);
      else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        err= DB_DEADLOCK;
      else
        goto resolve_record_lock;
    }
    else
    {
resolve_record_lock:
      if (lock->is_waiting())
        lock_cancel_waiting_and_release(lock);
      /* Even if lock->is_waiting() did not hold above, we must return
      DB_LOCK_WAIT, or otherwise optimistic parallel replication could
      occasionally hang. Potentially affected tests:
      rpl.rpl_parallel_optimistic
      rpl.rpl_parallel_optimistic_nobinlog
      rpl.rpl_parallel_optimistic_xa_lsu_off */
      err= DB_LOCK_WAIT;
    }
    lock_sys.wr_unlock();
  }

  return err;
}
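
/* Only two instantiations are used in this file: cancel<true> from
lock_trx_handle_wait(), and cancel<false> from lock_sys_t::cancel(trx_t*)
and Deadlock::check_and_resolve(). */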

/** Cancel a waiting lock request (if any) when killing a transaction */
void lock_sys_t::cancel(trx_t *trx)
{
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (lock_t *lock= trx->lock.wait_lock)
  {
    /* Dictionary transactions must be immune to KILL, because they
    may be executed as part of a multi-transaction DDL operation, such
    as rollback_inplace_alter_table() or ha_innobase::delete_table(). */
    if (!trx->dict_operation)
    {
      trx->error_state= DB_INTERRUPTED;
      cancel<false>(trx, lock);
    }
  }
  lock_sys.deadlock_check();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
}

/*********************************************************************//**
Unlocks AUTO_INC type locks that were possibly reserved by a trx. This
function should be called at the end of an SQL statement, by the
connection thread that owns the transaction (trx->mysql_thd). */
void
lock_unlock_table_autoinc(
/*======================*/
	trx_t*	trx)	/*!< in/out: transaction */
{
	lock_sys.assert_unlocked();
	ut_ad(!trx->mutex_is_owner());
	ut_ad(!trx->lock.wait_lock);

	/* This can be invoked on NOT_STARTED, ACTIVE, PREPARED,
	but not COMMITTED transactions. */

	ut_ad(trx_state_eq(trx, TRX_STATE_NOT_STARTED)
	      || !trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY));

	/* This function is invoked for a running transaction by the
	thread that is serving the transaction. Therefore it is not
	necessary to hold trx->mutex here. */

	if (lock_trx_holds_autoinc_locks(trx)) {
		lock_release_autoinc_locks(trx);
	}
}

/** Handle a pending lock wait (DB_LOCK_WAIT) in a semi-consistent read
while holding a clustered index leaf page latch.
@param trx           transaction that is or was waiting for a lock
@retval DB_SUCCESS   if the lock was granted
@retval DB_DEADLOCK  if the transaction must be aborted due to a deadlock
@retval DB_LOCK_WAIT if a lock wait would be necessary; the pending
                     lock request was released */
dberr_t lock_trx_handle_wait(trx_t *trx)
{
  if (trx->lock.was_chosen_as_deadlock_victim)
    return DB_DEADLOCK;
  if (!trx->lock.wait_lock)
    return DB_SUCCESS;
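  /* The unlatched reads above are only an optimistic fast path; the same
  conditions are checked again below while holding lock_sys.wait_mutex. */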
  dberr_t err= DB_SUCCESS;
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (trx->lock.was_chosen_as_deadlock_victim)
    err= DB_DEADLOCK;
  else if (lock_t *wait_lock= trx->lock.wait_lock)
    err= lock_sys_t::cancel<true>(trx, wait_lock);
  lock_sys.deadlock_check();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  return err;
}

#ifdef UNIV_DEBUG
/**
  Do an exhaustive check for any locks (table or rec) against the table.

  @param[in]  table  check if there are any locks held on records in this table
                     or on the table itself
*/

static my_bool lock_table_locks_lookup(rw_trx_hash_element_t *element,
                                       const dict_table_t *table)
{
  lock_sys.assert_locked();
  element->mutex.wr_lock();
  if (element->trx)
  {
    element->trx->mutex_lock();
    check_trx_state(element->trx);
    if (element->trx->state != TRX_STATE_COMMITTED_IN_MEMORY)
    {
      for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
           lock != NULL;
           lock= UT_LIST_GET_NEXT(trx_locks, lock))
      {
        ut_ad(lock->trx == element->trx);
        if (!lock->is_table())
        {
          ut_ad(lock->index->online_status != ONLINE_INDEX_CREATION ||
                lock->index->is_primary());
          ut_ad(lock->index->table != table);
        }
        else
          ut_ad(lock->un_member.tab_lock.table != table);
      }
    }
    element->trx->mutex_unlock();
  }
  element->mutex.wr_unlock();
  return 0;
}
#endif /* UNIV_DEBUG */

/** Check if there are any locks on a table.
@return true if table has either table or record locks. */
TRANSACTIONAL_TARGET
bool lock_table_has_locks(dict_table_t *table)
{
  if (table->n_rec_locks)
    return true;
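
  /* Read the length of table->locks inside a memory transaction when
  available, eliding table->lock_mutex; otherwise, fall back to the mutex. */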
  ulint len;
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
  if (xbegin())
  {
    if (table->lock_mutex_is_locked())
      xabort();
    len= UT_LIST_GET_LEN(table->locks);
    xend();
  }
  else
#endif
  {
    table->lock_mutex_lock();
    len= UT_LIST_GET_LEN(table->locks);
    table->lock_mutex_unlock();
  }
  if (len)
    return true;
#ifdef UNIV_DEBUG
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    trx_sys.rw_trx_hash.iterate(lock_table_locks_lookup,
                                const_cast<const dict_table_t*>(table));
  }
#endif /* UNIV_DEBUG */
  return false;
}

/*******************************************************************//**
Initialise the table lock list. */
void
lock_table_lock_list_init(
/*======================*/
	table_lock_list_t*	lock_list)	/*!< List to initialise */
{
	UT_LIST_INIT(*lock_list, &lock_table_t::locks);
}

#ifdef UNIV_DEBUG
/*******************************************************************//**
Check if the transaction holds any locks on the sys tables
or its records.
@return the strongest lock found on any sys table or 0 for none */
const lock_t*
lock_trx_has_sys_table_locks(
/*=========================*/
	const trx_t*	trx)	/*!< in: transaction to check */
{
	const lock_t*	strongest_lock = 0;
	lock_mode	strongest = LOCK_NONE;

	LockMutexGuard g{SRW_LOCK_CALL};

	const lock_list::const_iterator end = trx->lock.table_locks.end();
	lock_list::const_iterator it = trx->lock.table_locks.begin();

	/* Find a valid mode. Note: the lock list may be empty. */

	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock != NULL
		    && dict_is_sys_table(lock->un_member.tab_lock.table->id)) {

			strongest = lock->mode();
			ut_ad(strongest != LOCK_NONE);
			strongest_lock = lock;
			break;
		}
	}

	if (strongest == LOCK_NONE) {
		return(NULL);
	}

	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock == NULL) {
			continue;
		}

		ut_ad(trx == lock->trx);
		ut_ad(lock->is_table());
		ut_ad(lock->un_member.tab_lock.table);

		lock_mode mode = lock->mode();

		if (dict_is_sys_table(lock->un_member.tab_lock.table->id)
		    && lock_mode_stronger_or_eq(mode, strongest)) {

			strongest = mode;
			strongest_lock = lock;
		}
	}

	return(strongest_lock);

5973 5974 5975
/** Check if the transaction holds an explicit exclusive lock on a record.
@param[in]	trx	transaction
@param[in]	table	table
5976
@param[in]	id	leaf page identifier
5977 5978
@param[in]	heap_no	heap number identifying the record
@return whether an explicit X-lock is held */
5979 5980
bool lock_trx_has_expl_x_lock(const trx_t &trx, const dict_table_t &table,
                              page_id_t id, ulint heap_no)
5981
{
5982 5983
  ut_ad(heap_no > PAGE_HEAP_NO_SUPREMUM);
  ut_ad(lock_table_has(&trx, &table, LOCK_IX));
5984 5985
  if (!lock_table_has(&trx, &table, LOCK_X))
  {
5986
    LockGuard g{lock_sys.rec_hash, id};
5987 5988
    ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
                            g.cell(), id, heap_no, &trx));
5989
  }
5990
  return true;
5991
}
5992
#endif /* UNIV_DEBUG */
5993

5994
namespace Deadlock
5995
{
5996 5997 5998 5999 6000 6001
  /** rewind(3) the file used for storing the latest detected deadlock and
  print a heading message to stderr if printing of all deadlocks to stderr
  is enabled. */
  static void start_print()
  {
    lock_sys.assert_locked();
6002

6003 6004
    rewind(lock_latest_err_file);
    ut_print_timestamp(lock_latest_err_file);
6005

6006 6007 6008 6009
    if (srv_print_all_deadlocks)
      ib::info() << "Transactions deadlock detected,"
                    " dumping detailed information.";
  }
6010

6011 6012 6013
  /** Print a message to the deadlock file and possibly to stderr.
  @param msg message to print */
  static void print(const char *msg)
6014
  {
6015 6016 6017
    fputs(msg, lock_latest_err_file);
    if (srv_print_all_deadlocks)
      ib::info() << msg;
6018
  }
6019

6020 6021 6022 6023 6024
  /** Print transaction data to the deadlock file and possibly to stderr.
  @param trx transaction */
  static void print(const trx_t &trx)
  {
    lock_sys.assert_locked();
6025

6026 6027 6028
    ulint n_rec_locks= trx.lock.n_rec_locks;
    ulint n_trx_locks= UT_LIST_GET_LEN(trx.lock.trx_locks);
    ulint heap_size= mem_heap_get_size(trx.lock.lock_heap);
6029

6030 6031
    trx_print_low(lock_latest_err_file, &trx, 3000,
                  n_rec_locks, n_trx_locks, heap_size);
6032

6033 6034 6035
    if (srv_print_all_deadlocks)
      trx_print_low(stderr, &trx, 3000, n_rec_locks, n_trx_locks, heap_size);
  }
6036

6037 6038 6039 6040 6041
  /** Print lock data to the deadlock file and possibly to stderr.
  @param lock record or table type lock */
  static void print(const lock_t &lock)
  {
    lock_sys.assert_locked();
6042

6043 6044 6045 6046
    if (!lock.is_table())
    {
      mtr_t mtr;
      lock_rec_print(lock_latest_err_file, &lock, mtr);
6047

6048 6049 6050 6051 6052 6053
      if (srv_print_all_deadlocks)
        lock_rec_print(stderr, &lock, mtr);
    }
    else
    {
      lock_table_print(lock_latest_err_file, &lock);
6054

6055 6056 6057 6058
      if (srv_print_all_deadlocks)
        lock_table_print(stderr, &lock);
    }
  }
6059

  ATTRIBUTE_COLD
  /** Report a deadlock (cycle in the waits-for graph).
  @param trx        transaction waiting for a lock in this thread
  @param current_trx whether trx belongs to the current thread
  @return the transaction to be rolled back (unless one was committed already)
  @return nullptr if no deadlock */
  static trx_t *report(trx_t *const trx, bool current_trx)
  {
    mysql_mutex_assert_owner(&lock_sys.wait_mutex);
    ut_ad(xtest() || lock_sys.is_writer() == !current_trx);

    /* Normally, trx should be a direct part of the deadlock
    cycle. However, if innodb_deadlock_detect had been OFF in the
    past, or if current_trx=false, trx may be waiting for a lock that
    is held by a participant of a pre-existing deadlock, without being
    part of the deadlock itself. That is, the path to the deadlock may be
    P-shaped instead of O-shaped, with trx being at the foot of the P.

    We will process the entire path leading to a cycle, and we will
    choose the victim (to be aborted) among the cycle. */

    static const char rollback_msg[]= "*** WE ROLL BACK TRANSACTION (%u)\n";
    char buf[9 + sizeof rollback_msg];

    /* If current_trx=true, trx is owned by this thread, and we can
    safely invoke these without holding trx->mutex or lock_sys.latch.
    If current_trx=false, a concurrent commit is protected by both
    lock_sys.latch and lock_sys.wait_mutex. */
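
    /* The topmost weight bit marks transactions that modified
    non-transactional tables (or are wsrep BF threads). Because the victim
    is the cycle participant with the smallest weight, such transactions
    are the least likely to be rolled back. */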
    const undo_no_t trx_weight= TRX_WEIGHT(trx) |
      (trx->mysql_thd &&
#ifdef WITH_WSREP
       (thd_has_edited_nontrans_tables(trx->mysql_thd) ||
        (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false)))
#else
       thd_has_edited_nontrans_tables(trx->mysql_thd)
#endif /* WITH_WSREP */
       ? 1ULL << 63 : 0);

    trx_t *victim= nullptr;
    undo_no_t victim_weight= ~0ULL;
    unsigned victim_pos= 0, trx_pos= 0;

    /* Here, lock elision does not make sense, because
    for the output we are going to invoke system calls,
    which would interrupt a memory transaction. */
    if (current_trx && !lock_sys.wr_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.wr_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
    }

    {
      unsigned l= 0;
      /* Now that we are holding lock_sys.wait_mutex again, check
      whether a cycle still exists. */
      trx_t *cycle= find_cycle(trx);
      if (!cycle)
        goto func_exit; /* One of the transactions was already aborted. */
      for (trx_t *next= cycle;;)
      {
        next= next->lock.wait_trx;
        /* l is the 1-based position of next within the cycle, matching
        the transaction numbering of the report below. */
        l++;
        const undo_no_t next_weight= TRX_WEIGHT(next) |
          (next->mysql_thd &&
#ifdef WITH_WSREP
           (thd_has_edited_nontrans_tables(next->mysql_thd) ||
            (next->is_wsrep() && wsrep_thd_is_BF(next->mysql_thd, false)))
#else
           thd_has_edited_nontrans_tables(next->mysql_thd)
#endif /* WITH_WSREP */
           ? 1ULL << 63 : 0);
        if (next_weight < victim_weight)
        {
          victim_weight= next_weight;
          victim= next;
          victim_pos= l;
        }
        if (next == trx)
          trx_pos= l;
        if (next == cycle)
          break;
      }

      if (trx_pos && trx_weight == victim_weight)
      {
        victim= trx;
        victim_pos= trx_pos;
      }

      /* Finally, display the deadlock */
      switch (const auto r= static_cast<enum report>(innodb_deadlock_report)) {
      case REPORT_OFF:
        break;
      case REPORT_BASIC:
      case REPORT_FULL:
        start_print();
        l= 0;

        for (trx_t *next= cycle;;)
        {
          next= next->lock.wait_trx;
          ut_ad(next);
          ut_ad(next->state == TRX_STATE_ACTIVE);
          const lock_t *wait_lock= next->lock.wait_lock;
          ut_ad(wait_lock);
          snprintf(buf, sizeof buf, "\n*** (%u) TRANSACTION:\n", ++l);
          print(buf);
          print(*next);
          print("*** WAITING FOR THIS LOCK TO BE GRANTED:\n");
          print(*wait_lock);
          if (r == REPORT_BASIC);
          else if (wait_lock->is_table())
          {
            if (const lock_t *lock=
                UT_LIST_GET_FIRST(wait_lock->un_member.tab_lock.table->locks))
            {
              ut_ad(!lock->is_waiting());
              print("*** CONFLICTING WITH:\n");
              do
                print(*lock);
              while ((lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) &&
                     !lock->is_waiting());
            }
            else
              ut_ad("no conflicting table lock found" == 0);
          }
          else
          {
            const page_id_t id{wait_lock->un_member.rec_lock.page_id};
            hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                                 ? lock_sys.prdt_hash : lock_sys.rec_hash).
              cell_get(id.fold());
            if (const lock_t *lock= lock_sys_t::get_first(cell, id))
            {
              const ulint heap_no= lock_rec_find_set_bit(wait_lock);
              if (!lock_rec_get_nth_bit(lock, heap_no))
                lock= lock_rec_get_next_const(heap_no, lock);
              ut_ad(!lock->is_waiting());
              print("*** CONFLICTING WITH:\n");
              do
                print(*lock);
              while ((lock= lock_rec_get_next_const(heap_no, lock)) &&
                     !lock->is_waiting());
            }
            else
              ut_ad("no conflicting record lock found" == 0);
          }
          if (next == cycle)
            break;
        }
        snprintf(buf, sizeof buf, rollback_msg, victim_pos);
        print(buf);
      }

      ut_ad(victim->state == TRX_STATE_ACTIVE);

      victim->lock.was_chosen_as_deadlock_victim= true;
      lock_cancel_waiting_and_release(victim->lock.wait_lock);
#ifdef WITH_WSREP
      if (victim->is_wsrep() && wsrep_thd_is_SR(victim->mysql_thd))
        wsrep_handle_SR_rollback(trx->mysql_thd, victim->mysql_thd);
#endif
    }

func_exit:
    if (current_trx)
      lock_sys.wr_unlock();
    return victim;
  }
}

/** Check if a lock request results in a deadlock.
Resolve a deadlock by choosing a transaction that will be rolled back.
@param trx    transaction requesting a lock
@return whether trx must report DB_DEADLOCK */
static bool Deadlock::check_and_resolve(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);

  ut_ad(!trx->mutex_is_owner());
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  ut_ad(!srv_read_only_mode);

  if (!innodb_deadlock_detect)
    return false;

  if (UNIV_LIKELY_NULL(find_cycle(trx)) && report(trx, true) == trx)
    return true;
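
  /* Even when trx is not part of the cycle found above, a deadlock check
  in another thread may have chosen it as a victim; in that case its
  pending lock request must be cancelled before reporting DB_DEADLOCK. */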

  if (UNIV_LIKELY(!trx->lock.was_chosen_as_deadlock_victim))
    return false;

  if (lock_t *wait_lock= trx->lock.wait_lock)
    lock_sys_t::cancel<false>(trx, wait_lock);

  lock_sys.deadlock_check();
  return true;
}

/** Check for deadlocks while holding only lock_sys.wait_mutex. */
TRANSACTIONAL_TARGET
void lock_sys_t::deadlock_check()
{
  ut_ad(!is_writer());
  mysql_mutex_assert_owner(&wait_mutex);
  bool acquired= false;
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
  bool elided= false;
#endif

  if (Deadlock::to_be_checked)
  {
    for (;;)
    {
      auto i= Deadlock::to_check.begin();
      if (i == Deadlock::to_check.end())
        break;
      if (acquired);
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
      else if (xbegin())
      {
        if (latch.is_locked_or_waiting())
          xabort();
        acquired= elided= true;
      }
#endif
      else
      {
        acquired= wr_lock_try();
        if (!acquired)
        {
          acquired= true;
          mysql_mutex_unlock(&wait_mutex);
          lock_sys.wr_lock(SRW_LOCK_CALL);
          mysql_mutex_lock(&wait_mutex);
          continue;
        }
      }
      trx_t *trx= *i;
      Deadlock::to_check.erase(i);
      if (Deadlock::find_cycle(trx))
        Deadlock::report(trx, false);
    }
    Deadlock::to_be_checked= false;
  }
  ut_ad(Deadlock::to_check.empty());
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
  if (elided)
    return;
#endif
  if (acquired)
    wr_unlock();
}
6314 6315 6316
/** Update the locks when a page is split and merged to two pages,
in defragmentation. */
void lock_update_split_and_merge(
6317 6318 6319 6320
	const buf_block_t* left_block,	/*!< in: left page to which merged */
	const rec_t* orig_pred,		/*!< in: original predecessor of
					supremum on the left page before merge*/
	const buf_block_t* right_block)	/*!< in: right page from which merged */
6321
{
6322 6323 6324
  ut_ad(page_is_leaf(left_block->page.frame));
  ut_ad(page_is_leaf(right_block->page.frame));
  ut_ad(page_align(orig_pred) == left_block->page.frame);
6325

6326 6327
  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};
6328

6329
  /* This would likely be too large for a memory transaction. */
6330 6331 6332
  LockMultiGuard g{lock_sys.rec_hash, l, r};
  const rec_t *left_next_rec= page_rec_get_next_const(orig_pred);
  ut_ad(!page_rec_is_metadata(left_next_rec));
6333

6334 6335
  /* Inherit the locks on the supremum of the left page to the
  first record which was moved from the right page */
6336
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left_block->page.frame,
6337 6338
                          page_rec_get_heap_no(left_next_rec),
                          PAGE_HEAP_NO_SUPREMUM);
6339

6340 6341
  /* Reset the locks on the supremum of the left page,
  releasing waiting transactions */
6342
  lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
6343

6344 6345
  /* Inherit the locks to the supremum of the left page from the
  successor of the infimum on the right page */
6346
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->page.frame,
6347
                          PAGE_HEAP_NO_SUPREMUM,
6348
                          lock_get_min_heap_no(right_block));
6349
}