/*****************************************************************************

Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2014, 2022, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file lock/lock0lock.cc
The transaction lock system

Created 5/7/1996 Heikki Tuuri
*******************************************************/

#define LOCK_MODULE_IMPLEMENTATION

#include "univ.i"

#include <mysql/service_thd_error_context.h>
#include <mysql/service_thd_wait.h>
#include <sql_class.h>

#include "lock0lock.h"
#include "lock0priv.h"
#include "dict0mem.h"
#include "trx0purge.h"
#include "trx0sys.h"
#include "ut0vec.h"
#include "btr0cur.h"
#include "row0sel.h"
#include "row0mysql.h"
#include "row0vers.h"
#include "pars0pars.h"
#include "srv0mon.h"

#include <set>

#ifdef WITH_WSREP
#include <mysql/service_wsrep.h>
#include <debug_sync.h>
#endif /* WITH_WSREP */

/** The value of innodb_deadlock_detect */
my_bool innodb_deadlock_detect;
/** The value of innodb_deadlock_report */
ulong innodb_deadlock_report;

#ifdef HAVE_REPLICATION
extern "C" void thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
extern "C" int thd_need_wait_reports(const MYSQL_THD thd);
extern "C" int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
#endif

/** Functor for accessing the embedded node within a table lock. */
struct TableLockGetNode
{
  ut_list_node<lock_t> &operator()(lock_t &elem)
  { return(elem.un_member.tab_lock.locks); }
};

/** Create the hash table.
@param n  the lower bound of n_cells */
void lock_sys_t::hash_table::create(ulint n)
{
  n_cells= ut_find_prime(n);
  const size_t size= pad(n_cells) * sizeof *array;
  void* v= aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
  memset(v, 0, size);
  array= static_cast<hash_cell_t*>(v);
}
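
/* A note on the array layout (a sketch inferred from the assertion in
resize() below): pad() interleaves latch slots with the hash cells proper,
so that in every group of LATCH + ELEMENTS_PER_LATCH array slots the first
LATCH slots hold a hash_latch and never store any lock_t. */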

/** Resize the hash table.
@param n  the lower bound of n_cells */
void lock_sys_t::hash_table::resize(ulint n)
{
  ut_ad(lock_sys.is_writer());
  ulint new_n_cells= ut_find_prime(n);
  const size_t size= pad(new_n_cells) * sizeof *array;
  void* v= aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
  memset(v, 0, size);
  hash_cell_t *new_array= static_cast<hash_cell_t*>(v);

  for (auto i= pad(n_cells); i--; )
  {
    if (lock_t *lock= static_cast<lock_t*>(array[i].node))
    {
      /* all hash_latch must be vacated */
      ut_ad(i % (ELEMENTS_PER_LATCH + LATCH) >= LATCH);
      do
      {
        ut_ad(!lock->is_table());
        hash_cell_t *c= calc_hash(lock->un_member.rec_lock.page_id.fold(),
                                  new_n_cells) + new_array;
        lock_t *next= lock->hash;
        lock->hash= nullptr;
        if (!c->node)
          c->node= lock;
        else if (!lock->is_waiting())
        {
          /* Granted locks may simply be prepended to the cell's list. */
          lock->hash= static_cast<lock_t*>(c->node);
          c->node= lock;
        }
        else
        {
          /* Waiting locks must be appended, so that the relative order
          of the wait queue is preserved. */
          lock_t *last= static_cast<lock_t*>(c->node);
          while (last->hash)
            last= last->hash;
          last->hash= lock;
        }
        lock= next;
      }
      while (lock);
    }
  }

  aligned_free(array);
  array= new_array;
  n_cells= new_n_cells;
}

#ifdef SUX_LOCK_GENERIC
void lock_sys_t::hash_latch::wait()
{
  pthread_mutex_lock(&lock_sys.hash_mutex);
  while (!write_trylock())
    pthread_cond_wait(&lock_sys.hash_cond, &lock_sys.hash_mutex);
  pthread_mutex_unlock(&lock_sys.hash_mutex);
}

void lock_sys_t::hash_latch::release()
{
  pthread_mutex_lock(&lock_sys.hash_mutex);
  write_unlock();
  pthread_cond_signal(&lock_sys.hash_cond);
  pthread_mutex_unlock(&lock_sys.hash_mutex);
}
#endif
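
/* Note: under SUX_LOCK_GENERIC there is no futex-like wait primitive, so
all hash_latch waiters above share the single lock_sys.hash_mutex and
lock_sys.hash_cond pair. A signalled wakeup may go to a thread whose latch
is still held; that thread simply re-tries write_trylock() and sleeps
again. */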

#ifdef UNIV_DEBUG
/** Assert that a lock shard is exclusively latched by this thread */
void lock_sys_t::assert_locked(const lock_t &lock) const
{
  ut_ad(this == &lock_sys);
  if (is_writer())
    return;
  if (lock.is_table())
    assert_locked(*lock.un_member.tab_lock.table);
  else
    lock_sys.hash_get(lock.type_mode).
      assert_locked(lock.un_member.rec_lock.page_id);
}

/** Assert that a table lock shard is exclusively latched by this thread */
void lock_sys_t::assert_locked(const dict_table_t &table) const
{
  ut_ad(!table.is_temporary());
  if (is_writer())
    return;
  ut_ad(readers);
  ut_ad(table.lock_mutex_is_owner());
}

/** Assert that the hash cell for a page is exclusively latched by this thread */
void lock_sys_t::hash_table::assert_locked(const page_id_t id) const
{
  if (lock_sys.is_writer())
    return;
  ut_ad(lock_sys.readers);
  ut_ad(latch(cell_get(id.fold()))->is_locked());
}

/** Assert that a hash table cell is exclusively latched (by some thread) */
void lock_sys_t::assert_locked(const hash_cell_t &cell) const
{
  if (is_writer())
    return;
  ut_ad(lock_sys.readers);
  ut_ad(hash_table::latch(const_cast<hash_cell_t*>(&cell))->is_locked());
}
#endif

LockGuard::LockGuard(lock_sys_t::hash_table &hash, page_id_t id)
{
  const auto id_fold= id.fold();
  lock_sys.rd_lock(SRW_LOCK_CALL);
  cell_= hash.cell_get(id_fold);
  hash.latch(cell_)->acquire();
}

LockMultiGuard::LockMultiGuard(lock_sys_t::hash_table &hash,
                               const page_id_t id1, const page_id_t id2)
{
  ut_ad(id1.space() == id2.space());
  const auto id1_fold= id1.fold(), id2_fold= id2.fold();
  lock_sys.rd_lock(SRW_LOCK_CALL);
  cell1_= hash.cell_get(id1_fold);
  cell2_= hash.cell_get(id2_fold);

  auto latch1= hash.latch(cell1_), latch2= hash.latch(cell2_);
  if (latch1 > latch2)
    std::swap(latch1, latch2);
  latch1->acquire();
  if (latch1 != latch2)
    latch2->acquire();
}
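
/* Note: the two cell latches above are acquired in their address order.
Two LockMultiGuards covering the same pair of cells therefore can never
deadlock against each other, regardless of the order in which their
page_ids were passed. */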

LockMultiGuard::~LockMultiGuard()
{
  auto latch1= lock_sys_t::hash_table::latch(cell1_),
    latch2= lock_sys_t::hash_table::latch(cell2_);
  latch1->release();
  if (latch1 != latch2)
    latch2->release();
  /* Must be last, to avoid a race with lock_sys_t::hash_table::resize() */
  lock_sys.rd_unlock();
}

TRANSACTIONAL_TARGET
TMLockGuard::TMLockGuard(lock_sys_t::hash_table &hash, page_id_t id)
{
  const auto id_fold= id.fold();
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
  if (xbegin())
  {
    if (lock_sys.latch.is_write_locked())
      xabort();
    cell_= hash.cell_get(id_fold);
    if (hash.latch(cell_)->is_locked())
      xabort();
    elided= true;
    return;
  }
  elided= false;
#endif
  lock_sys.rd_lock(SRW_LOCK_CALL);
  cell_= hash.cell_get(id_fold);
  hash.latch(cell_)->acquire();
}
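
/* A sketch of the lock elision protocol above, for readers unfamiliar with
transactional memory: xbegin() starts a hardware transaction in which we
only read lock_sys.latch and the cell's hash_latch, thereby subscribing
them to the transaction's read set, and xabort() if either is currently
held. If the transaction commits, the guarded section will have executed
as if the latches had been taken, without writing to them; any concurrent
writer aborts us, and we fall back to the real rd_lock()/acquire() path
below. */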

/** Pretty-print a table lock.
@param[in,out]	file	output stream
@param[in]	lock	table lock */
static void lock_table_print(FILE* file, const lock_t* lock);

/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr);

namespace Deadlock
{
  /** Whether to_check may be nonempty */
  static Atomic_relaxed<bool> to_be_checked;
  /** Transactions to check for deadlock. Protected by lock_sys.wait_mutex. */
  static std::set<trx_t*> to_check;

  MY_ATTRIBUTE((nonnull, warn_unused_result))
  /** Check if a lock request results in a deadlock.
  Resolve a deadlock by choosing a transaction that will be rolled back.
  @param trx    transaction requesting a lock
  @return whether trx must report DB_DEADLOCK */
  static bool check_and_resolve(trx_t *trx);

  /** Quickly detect a deadlock using Brent's cycle detection algorithm.
  @param trx     transaction that is waiting for another transaction
  @return a transaction that is part of a cycle
  @retval nullptr if no cycle was found */
  inline trx_t *find_cycle(trx_t *trx)
  {
    mysql_mutex_assert_owner(&lock_sys.wait_mutex);
    trx_t *tortoise= trx, *hare= trx;
    for (unsigned power= 1, l= 1; (hare= hare->lock.wait_trx) != nullptr; l++)
    {
      if (tortoise == hare)
      {
        ut_ad(l > 1);
        lock_sys.deadlocks++;
        /* Note: Normally, trx should be part of any deadlock cycle
        that is found. However, if innodb_deadlock_detect=OFF had been
        in effect in the past, it is possible that trx will be waiting
        for a transaction that participates in a pre-existing deadlock
        cycle. In that case, our victim will not be trx. */
        return hare;
      }
      if (l == power)
      {
        /* The maximum concurrent number of TRX_STATE_ACTIVE transactions
        is TRX_RSEG_N_SLOTS * 128, or innodb_page_size / 16 * 128
        (default: 131,072, maximum: 524,288).
        Our maximum possible number of iterations should be twice that. */
        power<<= 1;
        l= 0;
        tortoise= hare;
      }
    }
    return nullptr;
  }
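
  /* For reference, a minimal standalone sketch of Brent's algorithm on an
  arbitrary successor function. This is only an illustration with a
  hypothetical Node type; the lock system uses the specialized
  find_cycle() above.

    template<typename Node>
    Node *brent_cycle_example(Node *start, Node *(*next)(const Node*))
    {
      Node *tortoise= start, *hare= start;
      for (unsigned power= 1, l= 1; (hare= next(hare)) != nullptr; l++)
      {
        if (tortoise == hare)
          return hare;       // the hare is inside a cycle
        if (l == power)      // teleport the tortoise to the hare
        {
          power<<= 1;        // and double the search window
          l= 0;
          tortoise= hare;
        }
      }
      return nullptr;        // the chain ended; no cycle exists
    }
  */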
};

#ifdef UNIV_DEBUG
/** Validate the transactional locks. */
static void lock_validate();

/** Validate the record lock queues on a page.
@param block    buffer pool block
@param latched  whether the tablespace latch may be held
@return true if ok */
static bool lock_rec_validate_page(const buf_block_t *block, bool latched)
  MY_ATTRIBUTE((nonnull, warn_unused_result));
#endif /* UNIV_DEBUG */

/* The lock system */
lock_sys_t lock_sys;

/** Only created if !srv_read_only_mode. Protected by lock_sys.latch. */
static FILE *lock_latest_err_file;

/*********************************************************************//**
Reports that a transaction id is insensible, i.e., in the future. */
ATTRIBUTE_COLD
void
lock_report_trx_id_insanity(
/*========================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,		/*!< in: user record */
	dict_index_t*	index,		/*!< in: index */
	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec, index) */
	trx_id_t	max_trx_id)	/*!< in: trx_sys.get_max_trx_id() */
{
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	ib::error()
		<< "Transaction id " << ib::hex(trx_id)
		<< " associated with record" << rec_offsets_print(rec, offsets)
		<< " in index " << index->name
		<< " of table " << index->table->name
		<< " is greater than the global counter " << max_trx_id
		<< "! The table is corrupted.";
}

/*********************************************************************//**
Checks that a transaction id is sensible, i.e., not in the future.
@return true if ok */
bool
lock_check_trx_id_sanity(
/*=====================*/
	trx_id_t	trx_id,		/*!< in: trx id */
	const rec_t*	rec,		/*!< in: user record */
	dict_index_t*	index,		/*!< in: index */
	const rec_offs*	offsets)	/*!< in: rec_get_offsets(rec, index) */
{
  ut_ad(rec_offs_validate(rec, index, offsets));
  ut_ad(!rec_is_metadata(rec, *index));

  trx_id_t max_trx_id= trx_sys.get_max_trx_id();
  ut_ad(max_trx_id || srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);

  if (UNIV_LIKELY(max_trx_id != 0) && UNIV_UNLIKELY(trx_id >= max_trx_id))
  {
    lock_report_trx_id_insanity(trx_id, rec, index, offsets, max_trx_id);
    return false;
  }
  return true;
}


/**
  Creates the lock system at database start.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::create(ulint n_cells)
{
  ut_ad(this == &lock_sys);
  ut_ad(!is_initialised());

  m_initialised= true;

  latch.SRW_LOCK_INIT(lock_latch_key);
#ifdef __aarch64__
  mysql_mutex_init(lock_wait_mutex_key, &wait_mutex, MY_MUTEX_INIT_FAST);
#else
  mysql_mutex_init(lock_wait_mutex_key, &wait_mutex, nullptr);
#endif
#ifdef SUX_LOCK_GENERIC
  pthread_mutex_init(&hash_mutex, nullptr);
  pthread_cond_init(&hash_cond, nullptr);
#endif

  rec_hash.create(n_cells);
  prdt_hash.create(n_cells);
  prdt_page_hash.create(n_cells);

  if (!srv_read_only_mode)
  {
    lock_latest_err_file= os_file_create_tmpfile();
    ut_a(lock_latest_err_file);
  }
}
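
/* A typical boot-time call (a sketch; in MariaDB the sizing happens in
srv0start.cc and is roughly proportional to the buffer pool size):

     lock_sys.create(srv_lock_table_size);
*/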

#ifdef UNIV_PFS_RWLOCK
/** Acquire exclusive lock_sys.latch */
void lock_sys_t::wr_lock(const char *file, unsigned line)
{
  mysql_mutex_assert_not_owner(&wait_mutex);
  latch.wr_lock(file, line);
  ut_ad(!writer.exchange(os_thread_get_curr_id(), std::memory_order_relaxed));
}
/** Release exclusive lock_sys.latch */
void lock_sys_t::wr_unlock()
{
  ut_ad(writer.exchange(0, std::memory_order_relaxed) ==
        os_thread_get_curr_id());
  latch.wr_unlock();
}

/** Acquire shared lock_sys.latch */
void lock_sys_t::rd_lock(const char *file, unsigned line)
{
  mysql_mutex_assert_not_owner(&wait_mutex);
  latch.rd_lock(file, line);
  ut_ad(!writer.load(std::memory_order_relaxed));
  ut_d(readers.fetch_add(1, std::memory_order_relaxed));
}

/** Release shared lock_sys.latch */
void lock_sys_t::rd_unlock()
{
  ut_ad(!writer.load(std::memory_order_relaxed));
  ut_ad(readers.fetch_sub(1, std::memory_order_relaxed));
  latch.rd_unlock();
}
#endif

/**
  Resize the lock hash table.

  @param[in] n_cells number of slots in lock hash table
*/
void lock_sys_t::resize(ulint n_cells)
{
  ut_ad(this == &lock_sys);
  /* Buffer pool resizing is rarely initiated by the user, and this
  would exceed the maximum size of a memory transaction. */
  LockMutexGuard g{SRW_LOCK_CALL};
  rec_hash.resize(n_cells);
  prdt_hash.resize(n_cells);
  prdt_page_hash.resize(n_cells);
}

/** Closes the lock system at database shutdown. */
void lock_sys_t::close()
{
  ut_ad(this == &lock_sys);

  if (!m_initialised)
    return;

  if (lock_latest_err_file)
  {
    my_fclose(lock_latest_err_file, MYF(MY_WME));
    lock_latest_err_file= nullptr;
  }

  rec_hash.free();
  prdt_hash.free();
  prdt_page_hash.free();
#ifdef SUX_LOCK_GENERIC
  pthread_mutex_destroy(&hash_mutex);
  pthread_cond_destroy(&hash_cond);
#endif

  latch.destroy();
  mysql_mutex_destroy(&wait_mutex);

  Deadlock::to_check.clear();
  Deadlock::to_be_checked= false;

  m_initialised= false;
}

#ifdef WITH_WSREP
# ifdef UNIV_DEBUG
/** Check if both the transaction holding the conflicting lock and the
transaction requesting the record lock are brute force (BF). If so,
check whether this BF-BF wait is correct; if not, report the BF wait
and assert.

@param[in]	lock	the other, conflicting record lock
@param[in]	trx	trx requesting a conflicting record lock
*/
static void wsrep_assert_no_bf_bf_wait(const lock_t *lock, const trx_t *trx)
{
	ut_ad(!lock->is_table());
	lock_sys.assert_locked(*lock);
	trx_t* lock_trx= lock->trx;

	/* Note that we are holding lock_sys.latch, thus we should
	not acquire THD::LOCK_thd_data mutex below to avoid latching
	order violation. */

	if (!trx->is_wsrep() || !lock_trx->is_wsrep())
		return;
	if (UNIV_LIKELY(!wsrep_thd_is_BF(trx->mysql_thd, FALSE))
	    || UNIV_LIKELY(!wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE)))
		return;

	ut_ad(trx->state == TRX_STATE_ACTIVE);

	switch (lock_trx->state) {
	case TRX_STATE_COMMITTED_IN_MEMORY:
		/* The state change is only protected by trx_t::mutex,
		which we are not even holding here. */
	case TRX_STATE_PREPARED:
		/* Wait for lock->trx to complete the commit
		(or XA ROLLBACK) and to release the lock. */
		return;
	case TRX_STATE_ACTIVE:
		break;
	default:
		ut_ad("invalid state" == 0);
	}

	/* If the BF-BF order is honored, i.e. the trx already holding
	the record lock is ordered before this new lock request, we can
	keep trx waiting for the lock. If the conflicting transaction
	is already aborting or rolling back for replay, we can also let
	the new transaction wait. */
	if (wsrep_thd_order_before(lock_trx->mysql_thd, trx->mysql_thd)
	    || wsrep_thd_is_aborting(lock_trx->mysql_thd)) {
		return;
	}

	mtr_t mtr;

	ib::error() << "Conflicting lock on table: "
		    << lock->index->table->name
		    << " index: "
		    << lock->index->name()
		    << " that has lock ";
	lock_rec_print(stderr, lock, mtr);

	ib::error() << "WSREP state: ";

	wsrep_report_bf_lock_wait(trx->mysql_thd,
				  trx->id);
	wsrep_report_bf_lock_wait(lock_trx->mysql_thd,
				  lock_trx->id);
	/* BF-BF wait is a bug */
	ut_error;
}
# endif /* UNIV_DEBUG */

/** Check if a lock timeout happened for a priority (BF) thread;
as a side effect, trigger the lock monitor
@param trx    transaction owning the lock
@return false for regular lock timeout */
ATTRIBUTE_NOINLINE static bool wsrep_is_BF_lock_timeout(const trx_t &trx)
{
  ut_ad(trx.is_wsrep());

  if (trx.error_state == DB_DEADLOCK || !srv_monitor_timer ||
      !wsrep_thd_is_BF(trx.mysql_thd, false))
    return false;

  ib::info() << "WSREP: BF lock wait long for trx:" << ib::hex(trx.id)
             << " query: " << wsrep_thd_query(trx.mysql_thd);
  return true;
}
#endif /* WITH_WSREP */

/*********************************************************************//**
Checks if a lock request for a new lock has to wait for request lock2.
@return TRUE if new lock has to wait for lock2 to be removed */
UNIV_INLINE
bool
lock_rec_has_to_wait(
/*=================*/
	bool		for_locking,
				/*!< in: true if we are called while
				acquiring a lock, false if releasing
				or re-checking a queue */
	const trx_t*	trx,	/*!< in: trx of new lock */
	unsigned	type_mode,/*!< in: precise mode of the new lock
				to set: LOCK_S or LOCK_X, possibly
				ORed to LOCK_GAP or LOCK_REC_NOT_GAP,
				LOCK_INSERT_INTENTION */
	const lock_t*	lock2,	/*!< in: another record lock; NOTE that
				it is assumed that this has a lock bit
				set on the same record as in the new
				lock we are setting */
	bool		lock_is_on_supremum)
				/*!< in: TRUE if we are setting the
				lock on the 'supremum' record of an
				index page: we know then that the lock
				request is really for a 'gap' type lock */
{
	ut_ad(trx);
	ut_ad(!lock2->is_table());
	ut_d(lock_sys.hash_get(type_mode).assert_locked(
		     lock2->un_member.rec_lock.page_id));

	if (trx == lock2->trx
	    || lock_mode_compatible(
		       static_cast<lock_mode>(LOCK_MODE_MASK & type_mode),
		       lock2->mode())) {
		return false;
	}

	/* We have somewhat complex rules for when gap type record locks
	cause waits */

	if ((lock_is_on_supremum || (type_mode & LOCK_GAP))
	    && !(type_mode & LOCK_INSERT_INTENTION)) {

		/* Gap type locks without LOCK_INSERT_INTENTION flag
		do not need to wait for anything. This is because
		different users can have conflicting lock types
		on gaps. */

		return false;
	}

	if (!(type_mode & LOCK_INSERT_INTENTION) && lock2->is_gap()) {

		/* A record lock (LOCK_ORDINARY or LOCK_REC_NOT_GAP)
		does not need to wait for a gap type lock */

		return false;
	}

	if ((type_mode & LOCK_GAP) && lock2->is_record_not_gap()) {

		/* Lock on gap does not need to wait for
		a LOCK_REC_NOT_GAP type lock */

		return false;
	}

	if (lock2->is_insert_intention()) {
		/* No lock request needs to wait for an insert
		intention lock to be removed. This is ok since our
		rules allow conflicting locks on gaps. This eliminates
		a spurious deadlock caused by a next-key lock waiting
		for an insert intention lock; when the insert
		intention lock was granted, the insert deadlocked on
		the waiting next-key lock.

		Also, insert intention locks do not disturb each
		other. */

		return false;
	}

#ifdef HAVE_REPLICATION
	if ((type_mode & LOCK_GAP || lock2->is_gap())
	    && !thd_need_ordering_with(trx->mysql_thd, lock2->trx->mysql_thd)) {
		/* If the upper server layer has already decided on the
		commit order between the transaction requesting the
		lock and the transaction owning the lock, we do not
		need to wait for gap locks. Such ordering by the upper
		server layer happens in parallel replication, where the
		commit order is fixed to match the original order on the
		master.

		Such gap locks are mainly needed to get serialisability
		between transactions so that they will be binlogged in
		the correct order so that statement-based replication
		will give the correct results. Since the right order
		was already determined on the master, we do not need
		to enforce it again here.

		Skipping the locks is not essential for correctness,
		since in case of deadlock we will just kill the later
		transaction and retry it. But it can save some
		unnecessary rollbacks and retries. */

		return false;
	}
#endif /* HAVE_REPLICATION */

#ifdef WITH_WSREP
		/* The new lock request comes from a transaction doing a
		unique key scan, and that transaction is a wsrep high
		priority (brute force) transaction. If the conflicting
		transaction is also a wsrep high priority transaction, we
		should avoid a lock conflict, because the ordering of these
		transactions has already been decided and the conflicting
		transaction will be replayed later. */
		if (trx->is_wsrep_UK_scan()
		    && wsrep_thd_is_BF(lock2->trx->mysql_thd, false)) {
			return false;
		}

		/* We can very well let BF wait normally, as the other
		BF will be replayed in case of a conflict. For debug
		builds we do additional sanity checks to catch any
		unsupported BF wait. */
		ut_d(wsrep_assert_no_bf_bf_wait(lock2, trx));
#endif /* WITH_WSREP */

	return true;
}
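
/* To summarize the special cases above: nothing ever waits for a held
insert intention lock; a gap mode request (or any request on the supremum
record) without LOCK_INSERT_INTENTION never waits; a request without
LOCK_INSERT_INTENTION never waits for a held gap mode lock; and a gap mode
request never waits for a held LOCK_REC_NOT_GAP lock. Any other request
with an incompatible mode must wait, modulo the replication and wsrep
exceptions above. */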

/*********************************************************************//**
Checks if a lock request lock1 has to wait for request lock2.
@return TRUE if lock1 has to wait for lock2 to be removed */
bool
lock_has_to_wait(
/*=============*/
	const lock_t*	lock1,	/*!< in: waiting lock */
	const lock_t*	lock2)	/*!< in: another lock; NOTE that it is
				assumed that this has a lock bit set
				on the same record as in lock1 if the
				locks are record locks */
{
	ut_ad(lock1 && lock2);

	if (lock1->trx == lock2->trx
	    || lock_mode_compatible(lock1->mode(), lock2->mode())) {
		return false;
	}

	if (lock1->is_table()) {
		return true;
	}

	ut_ad(!lock2->is_table());

	if (lock1->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)) {
		return lock_prdt_has_to_wait(lock1->trx, lock1->type_mode,
					     lock_get_prdt_from_lock(lock1),
					     lock2);
	}

	return lock_rec_has_to_wait(
		false, lock1->trx, lock1->type_mode, lock2,
		lock_rec_get_nth_bit(lock1, PAGE_HEAP_NO_SUPREMUM));
}

/*============== RECORD LOCK BASIC FUNCTIONS ============================*/

/**********************************************************************//**
Looks for a set bit in a record lock bitmap. Returns ULINT_UNDEFINED,
if none found.
@return bit index == heap number of the record, or ULINT_UNDEFINED if
none found */
ulint
lock_rec_find_set_bit(
/*==================*/
	const lock_t*	lock)	/*!< in: record lock with at least one bit set */
{
	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (lock_rec_get_nth_bit(lock, i)) {

			return(i);
		}
	}

	return(ULINT_UNDEFINED);
}

/*********************************************************************//**
Resets the record lock bitmap to zero. NOTE: does not touch the wait_lock
pointer in the transaction! This function is used in lock object creation
and resetting. */
static
void
lock_rec_bitmap_reset(
/*==================*/
	lock_t*	lock)	/*!< in: record lock */
{
	ulint	n_bytes;

	ut_ad(!lock->is_table());

	/* Reset to zero the bitmap which resides immediately after the lock
	struct */

	n_bytes = lock_rec_get_n_bits(lock) / 8;

	ut_ad((lock_rec_get_n_bits(lock) % 8) == 0);

	memset(reinterpret_cast<void*>(&lock[1]), 0, n_bytes);
}
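
/* The bitmap layout assumed throughout this file: the bitmap is stored
immediately after the lock_t struct, one bit per heap number, so the bit
for heap_no can be accessed as

     const byte *bitmap= reinterpret_cast<const byte*>(&lock[1]);
     bool is_set= bitmap[heap_no / 8] & (1U << (heap_no % 8));

compare lock_rec_has_to_wait_in_queue() below, which open-codes exactly
this access. */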

/*********************************************************************//**
Copies a record lock to heap.
@return copy of lock */
static
lock_t*
lock_rec_copy(
/*==========*/
	const lock_t*	lock,	/*!< in: record lock */
	mem_heap_t*	heap)	/*!< in: memory heap */
{
	ulint	size;

	ut_ad(!lock->is_table());

	size = sizeof(lock_t) + lock_rec_get_n_bits(lock) / 8;

	return(static_cast<lock_t*>(mem_heap_dup(heap, lock, size)));
}

/*********************************************************************//**
Gets the previous record lock set on a record.
819
@return previous lock on the same record, NULL if none exists */
820 821 822 823 824 825
const lock_t*
lock_rec_get_prev(
/*==============*/
	const lock_t*	in_lock,/*!< in: record lock */
	ulint		heap_no)/*!< in: heap number of the record */
{
  ut_ad(!in_lock->is_table());
  const page_id_t id{in_lock->un_member.rec_lock.page_id};
  hash_cell_t *cell= lock_sys.hash_get(in_lock->type_mode).cell_get(id.fold());

  for (lock_t *lock= lock_sys_t::get_first(*cell, id); lock != in_lock;
       lock= lock_rec_get_next_on_page(lock))
    if (lock_rec_get_nth_bit(lock, heap_no))
      return lock;

  return nullptr;
}

/*============= FUNCTIONS FOR ANALYZING RECORD LOCK QUEUE ================*/

/*********************************************************************//**
Checks if a transaction has a GRANTED explicit lock on rec stronger or equal
to precise_mode.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_rec_has_expl(
/*==============*/
	ulint			precise_mode,/*!< in: LOCK_S or LOCK_X
					possibly ORed to LOCK_GAP or
					LOCK_REC_NOT_GAP, for a
					supremum record we regard this
					always a gap type request */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction */
{
  ut_ad((precise_mode & LOCK_MODE_MASK) == LOCK_S
	|| (precise_mode & LOCK_MODE_MASK) == LOCK_X);
  ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));

  for (lock_t *lock= lock_sys_t::get_first(cell, id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
    if (lock->trx == trx &&
	!(lock->type_mode & (LOCK_WAIT | LOCK_INSERT_INTENTION)) &&
	(!((LOCK_REC_NOT_GAP | LOCK_GAP) & lock->type_mode) ||
	 heap_no == PAGE_HEAP_NO_SUPREMUM ||
	 ((LOCK_REC_NOT_GAP | LOCK_GAP) & precise_mode & lock->type_mode)) &&
	lock_mode_stronger_or_eq(lock->mode(), static_cast<lock_mode>
				 (precise_mode & LOCK_MODE_MASK)))
      return lock;

  return nullptr;
}

#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@return lock or NULL */
static
lock_t*
lock_rec_other_has_expl_req(
/*========================*/
	lock_mode		mode,	/*!< in: LOCK_S or LOCK_X */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	bool			wait,	/*!< in: whether also waiting locks
					are taken into account */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: transaction, or NULL if
					requests by all transactions
					are taken into account */
{
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	/* Only GAP locks can be set on the supremum record, and we are
	not looking for GAP locks */
	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		return(NULL);
	}

	for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock; lock = lock_rec_get_next(heap_no, lock)) {
		if (lock->trx != trx
		    && !lock->is_gap()
		    && (!lock->is_waiting() || wait)
		    && lock_mode_stronger_or_eq(lock->mode(), mode)) {

			return(lock);
		}
	}

	return(NULL);
}
#endif /* UNIV_DEBUG */

#ifdef WITH_WSREP
void lock_wait_wsrep_kill(trx_t *bf_trx, ulong thd_id, trx_id_t trx_id);

/** Kill the holders of conflicting locks.
@param trx   brute-force applier transaction running in the current thread */
ATTRIBUTE_COLD ATTRIBUTE_NOINLINE
static void lock_wait_wsrep(trx_t *trx)
{
  DBUG_ASSERT(wsrep_on(trx->mysql_thd));
  if (!wsrep_thd_is_BF(trx->mysql_thd, false))
    return;

  std::set<trx_t*> victims;

  lock_sys.wr_lock(SRW_LOCK_CALL);
  mysql_mutex_lock(&lock_sys.wait_mutex);

  const lock_t *wait_lock= trx->lock.wait_lock;
  if (!wait_lock)
  {
func_exit:
    lock_sys.wr_unlock();
    mysql_mutex_unlock(&lock_sys.wait_mutex);
    return;
  }

  if (wait_lock->is_table())
  {
    dict_table_t *table= wait_lock->un_member.tab_lock.table;
    for (lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock;
         lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
      /* if the victim also has BF status but an earlier seqno, we have to wait */
      if (lock->trx != trx &&
          !(wsrep_thd_is_BF(lock->trx->mysql_thd, false) &&
            wsrep_thd_order_before(lock->trx->mysql_thd, trx->mysql_thd)))
      {
        victims.emplace(lock->trx);
      }
  }
  else
  {
    const page_id_t id{wait_lock->un_member.rec_lock.page_id};
    hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                         ? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
      (id.fold());
    if (lock_t *lock= lock_sys_t::get_first(cell, id))
    {
      const ulint heap_no= lock_rec_find_set_bit(wait_lock);
      if (!lock_rec_get_nth_bit(lock, heap_no))
        lock= lock_rec_get_next(heap_no, lock);
      do
        /* if the victim also has BF status but an earlier seqno, we have to wait */
        if (lock->trx != trx &&
            !(wsrep_thd_is_BF(lock->trx->mysql_thd, false) &&
              wsrep_thd_order_before(lock->trx->mysql_thd, trx->mysql_thd)))
        {
          victims.emplace(lock->trx);
        }
      while ((lock= lock_rec_get_next(heap_no, lock)));
    }
  }

  if (victims.empty())
    goto func_exit;

  std::vector<std::pair<ulong,trx_id_t>> victim_id;
  for (trx_t *v : victims)
    victim_id.emplace_back(std::pair<ulong,trx_id_t>
                           {thd_get_thread_id(v->mysql_thd), v->id});

  DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort",
                  {
                    const char act[]=
                      "now SIGNAL sync.before_wsrep_thd_abort_reached "
                      "WAIT_FOR signal.before_wsrep_thd_abort";
                    DBUG_ASSERT(!debug_sync_set_action(trx->mysql_thd,
                                                       STRING_WITH_LEN(act)));
                  };);

  lock_sys.wr_unlock();
  mysql_mutex_unlock(&lock_sys.wait_mutex);

  for (const auto &v : victim_id)
    lock_wait_wsrep_kill(trx, v.first, v.second);
}
#endif /* WITH_WSREP */

/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@return lock or NULL */
static
lock_t*
lock_rec_other_has_conflicting(
/*===========================*/
	unsigned		mode,	/*!< in: LOCK_S or LOCK_X,
					possibly ORed to LOCK_GAP or
					LOCK_REC_NOT_GAP,
					LOCK_INSERT_INTENTION */
	const hash_cell_t&	cell,	/*!< in: lock hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	ulint			heap_no,/*!< in: heap number of the record */
	const trx_t*		trx)	/*!< in: our transaction */
{
	bool	is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);

	for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock; lock = lock_rec_get_next(heap_no, lock)) {
		if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) {
			return(lock);
		}
	}

	return(NULL);
}

/*********************************************************************//**
Checks if some transaction has an implicit x-lock on a record in a secondary
index.
@return the transaction which has the x-lock, or NULL;
NOTE that this function can return false positives but never false
negatives. The caller must confirm all positive results by calling
trx_is_active(). */
static
trx_t*
lock_sec_rec_some_has_impl(
/*=======================*/
	trx_t*		caller_trx,/*!<in/out: trx of current thread */
	const rec_t*	rec,	/*!< in: user record */
	dict_index_t*	index,	/*!< in: secondary index */
	const rec_offs*	offsets)/*!< in: rec_get_offsets(rec, index) */
{
	trx_t*		trx;
	trx_id_t	max_trx_id;
	const page_t*	page = page_align(rec);

	lock_sys.assert_unlocked();
	ut_ad(!dict_index_is_clust(index));
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!rec_is_metadata(rec, *index));

	max_trx_id = page_get_max_trx_id(page);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list, or
	database recovery is running. */

	if (max_trx_id < trx_sys.get_min_trx_id()) {

		trx = 0;

	} else if (!lock_check_trx_id_sanity(max_trx_id, rec, index, offsets)) {

		/* The page is corrupt: try to avoid a crash by returning 0 */
		trx = 0;

	/* In this case it is possible that some transaction has an implicit
	x-lock. We have to look in the clustered index. */

	} else {
		trx = row_vers_impl_x_locked(caller_trx, rec, index, offsets);
	}

	return(trx);
}

/*********************************************************************//**
Return the number of table locks for a transaction.
The caller must be holding lock_sys.latch. */
ulint
lock_number_of_tables_locked(
/*=========================*/
	const trx_lock_t*	trx_lock)	/*!< in: transaction locks */
{
	const lock_t*	lock;
	ulint		n_tables = 0;

	lock_sys.assert_locked();

	for (lock = UT_LIST_GET_FIRST(trx_lock->trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (lock->is_table()) {
			n_tables++;
		}
	}

	return(n_tables);
}

/*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/

/** Reset the wait status of a lock.
@param[in,out]	lock	lock that was possibly being waited for */
static void lock_reset_lock_and_trx_wait(lock_t *lock)
{
  lock_sys.assert_locked(*lock);
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  trx_t *trx= lock->trx;
  ut_ad(lock->is_waiting());
  ut_ad(!trx->lock.wait_lock || trx->lock.wait_lock == lock);
  if (trx_t *wait_trx= trx->lock.wait_trx)
    Deadlock::to_check.erase(wait_trx);
  trx->lock.wait_lock= nullptr;
  trx->lock.wait_trx= nullptr;
  lock->type_mode&= ~LOCK_WAIT;
}

#ifdef UNIV_DEBUG
/** Check transaction state */
static void check_trx_state(const trx_t *trx)
{
  ut_ad(!trx->auto_commit || trx->will_lock);
  const auto state= trx->state;
  ut_ad(state == TRX_STATE_ACTIVE ||
        state == TRX_STATE_PREPARED_RECOVERED ||
        state == TRX_STATE_PREPARED ||
        state == TRX_STATE_COMMITTED_IN_MEMORY);
}
#endif

Create a new record lock and insert it into the lock queue,
without checking for deadlocks or conflicts.
@param[in]	c_lock		conflicting lock
@param[in]	type_mode	lock mode and wait flag
@param[in]	page_id		index page number
@param[in]	page		R-tree index page, or NULL
@param[in]	heap_no		record heap number in the index page
@param[in]	index		the index tree
@param[in,out]	trx		transaction
@param[in]	holds_trx_mutex	whether the caller holds trx->mutex
@return created lock */
lock_t*
lock_rec_create_low(
	lock_t*		c_lock,
	unsigned	type_mode,
	const page_id_t	page_id,
	const page_t*	page,
	ulint		heap_no,
	dict_index_t*	index,
	trx_t*		trx,
	bool		holds_trx_mutex)
{
	lock_t*		lock;
	ulint		n_bytes;

	ut_d(lock_sys.hash_get(type_mode).assert_locked(page_id));
	ut_ad(xtest() || holds_trx_mutex == trx->mutex_is_owner());
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
	ut_ad(!(type_mode & LOCK_TABLE));
	ut_ad(trx->state != TRX_STATE_NOT_STARTED);
	ut_ad(!trx->is_autocommit_non_locking());

	/* If rec is the supremum record, then we reset the gap and
	LOCK_REC_NOT_GAP bits, as all locks on the supremum are
	automatically of the gap type */

	if (UNIV_UNLIKELY(heap_no == PAGE_HEAP_NO_SUPREMUM)) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));
		type_mode = type_mode & ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		n_bytes = (page_dir_get_n_heap(page) + 7) / 8;
	} else {
		ut_ad(heap_no == PRDT_HEAPNO);

		/* The lock is always on PAGE_HEAP_NO_INFIMUM (0), so
		we only need 1 bit (which round up to 1 byte) for
		lock bit setting */
		n_bytes = 1;

		if (type_mode & LOCK_PREDICATE) {
			ulint	tmp = UNIV_WORD_SIZE - 1;

			/* We will attach predicate structure after lock.
			Make sure the memory is aligned on 8 bytes,
			the mem_heap_alloc will align it with
			MEM_SPACE_NEEDED anyway. */
			n_bytes = (n_bytes + sizeof(lock_prdt_t) + tmp) & ~tmp;
			ut_ad(n_bytes == sizeof(lock_prdt_t) + UNIV_WORD_SIZE);
		}
	}

	if (!holds_trx_mutex) {
		trx->mutex_lock();
	}
	ut_ad(trx->mutex_is_owner());
	ut_ad(trx->state != TRX_STATE_NOT_STARTED);

	if (trx->lock.rec_cached >= UT_ARR_SIZE(trx->lock.rec_pool)
	    || sizeof *lock + n_bytes > sizeof *trx->lock.rec_pool) {
		lock = static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap,
				       sizeof *lock + n_bytes));
	} else {
		lock = &trx->lock.rec_pool[trx->lock.rec_cached++].lock;
	}

	lock->trx = trx;
	lock->type_mode = type_mode;
	lock->index = index;
	lock->un_member.rec_lock.page_id = page_id;

	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
		lock->un_member.rec_lock.n_bits = uint32_t(n_bytes * 8);
	} else {
		/* Predicate lock always on INFIMUM (0) */
		lock->un_member.rec_lock.n_bits = 8;
	}
	lock_rec_bitmap_reset(lock);
	lock_rec_set_nth_bit(lock, heap_no);
	index->table->n_rec_locks++;
	ut_ad(index->table->get_ref_count() || !index->table->can_be_evicted);

	const auto lock_hash = &lock_sys.hash_get(type_mode);
	HASH_INSERT(lock_t, hash, lock_hash, page_id.fold(), lock);

	if (type_mode & LOCK_WAIT) {
		if (trx->lock.wait_trx) {
			ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx);
			ut_ad(trx->lock.wait_lock);
			ut_ad((*trx->lock.wait_lock).trx == trx);
		} else {
			ut_ad(c_lock);
			trx->lock.wait_trx = c_lock->trx;
			ut_ad(!trx->lock.wait_lock);
		}
		trx->lock.wait_lock = lock;
	}
	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
	if (!holds_trx_mutex) {
		trx->mutex_unlock();
	}
	MONITOR_INC(MONITOR_RECLOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_RECLOCK);

	return lock;
}

/** Enqueue a waiting request for a lock which cannot be granted immediately.
Check for deadlocks.
@param[in]	c_lock		conflicting lock
@param[in]	type_mode	the requested lock mode (LOCK_S or LOCK_X)
				possibly ORed with LOCK_GAP or
				LOCK_REC_NOT_GAP, ORed with
				LOCK_INSERT_INTENTION if this
				waiting lock request is set
				when performing an insert of
				an index record
@param[in]	id		page identifier
@param[in]	page		leaf page in the index
@param[in]	heap_no		record heap number in the block
@param[in]	index		index tree
@param[in,out]	thr		query thread
@param[in]	prdt		minimum bounding box (spatial index)
@retval	DB_LOCK_WAIT		if the waiting lock was enqueued
@retval	DB_DEADLOCK		if this transaction was chosen as the victim */
dberr_t
lock_rec_enqueue_waiting(
	lock_t*			c_lock,
	unsigned		type_mode,
	const page_id_t		id,
	const page_t*		page,
	ulint			heap_no,
	dict_index_t*		index,
	que_thr_t*		thr,
	lock_prdt_t*		prdt)
{
	ut_d(lock_sys.hash_get(type_mode).assert_locked(id));
	ut_ad(!srv_read_only_mode);
	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));

	trx_t* trx = thr_get_trx(thr);
	ut_ad(xtest() || trx->mutex_is_owner());
	ut_ad(!trx->dict_operation_lock_mode);

	if (trx->mysql_thd && thd_lock_wait_timeout(trx->mysql_thd) == 0) {
		trx->error_state = DB_LOCK_WAIT_TIMEOUT;
		return DB_LOCK_WAIT_TIMEOUT;
	}

	/* Enqueue the lock request that will wait to be granted, note that
	we already own the trx mutex. */
	lock_t* lock = lock_rec_create_low(
		c_lock,
		type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true);

	if (prdt && type_mode & LOCK_PREDICATE) {
		lock_prdt_set_prdt(lock, prdt);
	}

	trx->lock.wait_thr = thr;
	trx->lock.clear_deadlock_victim();

	DBUG_LOG("ib_lock", "trx " << ib::hex(trx->id)
		 << " waits for lock in index " << index->name
		 << " of table " << index->table->name);

	MONITOR_INC(MONITOR_LOCKREC_WAIT);

	return DB_LOCK_WAIT;
}

/*********************************************************************//**
Looks for a record lock struct of a suitable type by the same trx on the
same page. This can be used to save space when a new record lock should be
set on a page: no new struct is needed, if a suitable old one is found.
@return lock or NULL */
static inline
lock_t*
lock_rec_find_similar_on_page(
	ulint           type_mode,      /*!< in: lock type_mode field */
	ulint           heap_no,        /*!< in: heap number of the record */
	lock_t*         lock,           /*!< in: lock_sys.get_first() */
	const trx_t*    trx)            /*!< in: transaction */
{
	lock_sys.rec_hash.assert_locked(lock->un_member.rec_lock.page_id);

	for (/* No op */;
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (lock->trx == trx
		    && lock->type_mode == type_mode
		    && lock_rec_get_n_bits(lock) > heap_no) {

			return(lock);
		}
	}

	return(NULL);
}

/*********************************************************************//**
Adds a record lock request to the record queue. The request is normally
added as the last in the queue, but if there are no waiting lock requests
on the record, and the request to be added is not a waiting request, we
can reuse a suitable record lock object already existing on the same page,
just setting the appropriate bit in its bitmap. This is a low-level function
which does NOT check for deadlocks or lock compatibility! */
TRANSACTIONAL_TARGET
static
void
lock_rec_add_to_queue(
/*==================*/
	unsigned		type_mode,/*!< in: lock mode, wait, gap
					etc. flags */
	hash_cell_t&		cell,	/*!< in,out: first hash table cell */
	const page_id_t		id,	/*!< in: page identifier */
	const page_t*		page,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of the record */
	dict_index_t*		index,	/*!< in: index of record */
	trx_t*			trx,	/*!< in/out: transaction */
	bool			caller_owns_trx_mutex)
					/*!< in: TRUE if caller owns the
					transaction mutex */
{
	ut_d(lock_sys.hash_get(type_mode).assert_locked(id));
	ut_ad(xtest() || caller_owns_trx_mutex == trx->mutex_is_owner());
	ut_ad(index->is_primary()
	      || dict_index_get_online_status(index) != ONLINE_INDEX_CREATION);
	ut_ad(!(type_mode & LOCK_TABLE));
#ifdef UNIV_DEBUG
	switch (type_mode & LOCK_MODE_MASK) {
	case LOCK_X:
	case LOCK_S:
		break;
	default:
		ut_error;
	}

	if (!(type_mode & (LOCK_WAIT | LOCK_GAP))) {
		lock_mode	mode = (type_mode & LOCK_MODE_MASK) == LOCK_S
			? LOCK_X
			: LOCK_S;
		const lock_t*	other_lock
			= lock_rec_other_has_expl_req(
				mode, cell, id, false, heap_no, trx);
#ifdef WITH_WSREP
		if (UNIV_LIKELY_NULL(other_lock) && trx->is_wsrep()) {
			/* Only BF transaction may be granted lock
			before other conflicting lock request. */
			if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE)
			    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
				/* If it is not BF, this case is a bug. */
				wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
				ut_error;
			}
		} else
#endif /* WITH_WSREP */
		ut_ad(!other_lock);
	}
#endif /* UNIV_DEBUG */

	/* If rec is the supremum record, then we can reset the gap bit, as
	all locks on the supremum are automatically of the gap type, and we
	try to avoid unnecessary memory consumption of a new record lock
	struct for a gap type lock */

	if (heap_no == PAGE_HEAP_NO_SUPREMUM) {
		ut_ad(!(type_mode & LOCK_REC_NOT_GAP));

		/* There should never be LOCK_REC_NOT_GAP on a supremum
		record, but let us play safe */

		type_mode &= ~(LOCK_GAP | LOCK_REC_NOT_GAP);
	}

	if (type_mode & LOCK_WAIT) {
		goto create;
	} else if (lock_t *first_lock = lock_sys_t::get_first(cell, id)) {
		for (lock_t* lock = first_lock;;) {
			if (lock->is_waiting()
			    && lock_rec_get_nth_bit(lock, heap_no)) {
				goto create;
			}
			if (!(lock = lock_rec_get_next_on_page(lock))) {
				break;
			}
		}

		/* Look for a similar record lock on the same page:
		if one is found and there are no waiting lock requests,
		we can just set the bit */
		if (lock_t* lock = lock_rec_find_similar_on_page(
			    type_mode, heap_no, first_lock, trx)) {
			trx_t* lock_trx = lock->trx;
			if (caller_owns_trx_mutex) {
				trx->mutex_unlock();
			}
			{
				TMTrxGuard tg{*lock_trx};
				lock_rec_set_nth_bit(lock, heap_no);
			}

			if (caller_owns_trx_mutex) {
				trx->mutex_lock();
			}
			return;
		}
	}

create:
	/* Note: We will not pass any conflicting lock to lock_rec_create(),
	because we should be moving an existing waiting lock request. */
	ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);

	lock_rec_create_low(nullptr,
			    type_mode, id, page, heap_no, index, trx,
			    caller_owns_trx_mutex);
}

/*********************************************************************//**
Tries to lock the specified record in the mode requested. If not immediately
possible, enqueues a waiting lock request. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case
of a page supremum record, a gap type lock.
1481
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
1482 1483 1484 1485
static
dberr_t
lock_rec_lock(
/*==========*/
1486
	bool			impl,	/*!< in: if true, no lock is set
1487 1488 1489
					if no wait is necessary: we
					assume that the caller will
					set an implicit lock */
1490
	unsigned		mode,	/*!< in: lock mode: LOCK_X or
1491 1492 1493 1494 1495 1496 1497 1498
					LOCK_S possibly ORed to either
					LOCK_GAP or LOCK_REC_NOT_GAP */
	const buf_block_t*	block,	/*!< in: buffer block containing
					the record */
	ulint			heap_no,/*!< in: heap number of record */
	dict_index_t*		index,	/*!< in: index of record */
	que_thr_t*		thr)	/*!< in: query thread */
{
1499 1500 1501
  trx_t *trx= thr_get_trx(thr);

  ut_ad(!srv_read_only_mode);
1502 1503 1504
  ut_ad(((LOCK_MODE_MASK | LOCK_TABLE) & mode) == LOCK_S ||
        ((LOCK_MODE_MASK | LOCK_TABLE) & mode) == LOCK_X);
  ut_ad(~mode & (LOCK_GAP | LOCK_REC_NOT_GAP));
1505 1506 1507 1508 1509 1510 1511 1512
  ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
  DBUG_EXECUTE_IF("innodb_report_deadlock", return DB_DEADLOCK;);

  ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ||
        lock_table_has(trx, index->table, LOCK_IS));
  ut_ad((LOCK_MODE_MASK & mode) != LOCK_X ||
         lock_table_has(trx, index->table, LOCK_IX));

1513
  if (lock_table_has(trx, index->table,
1514 1515 1516
                     static_cast<lock_mode>(LOCK_MODE_MASK & mode)))
    return DB_SUCCESS;

1517 1518 1519 1520 1521 1522
  /* During CREATE TABLE, we will write to newly created FTS_*_CONFIG
  on which no lock has been created yet. */
  ut_ad(!trx->dict_operation_lock_mode ||
        (strstr(index->table->name.m_name, "/FTS_") &&
         strstr(index->table->name.m_name, "_CONFIG") + sizeof("_CONFIG") ==
         index->table->name.m_name + strlen(index->table->name.m_name) + 1));
1523 1524
  MONITOR_ATOMIC_INC(MONITOR_NUM_RECLOCK_REQ);
  const page_id_t id{block->page.id()};
1525
  LockGuard g{lock_sys.rec_hash, id};
1526

1527
  if (lock_t *lock= lock_sys_t::get_first(g.cell(), id))
1528
  {
1529
    dberr_t err= DB_SUCCESS;
1530
    trx->mutex_lock();
1531 1532
    if (lock_rec_get_next_on_page(lock) ||
        lock->trx != trx ||
1533
        lock->type_mode != mode ||
1534 1535
        lock_rec_get_n_bits(lock) <= heap_no)
    {
1536
      /* Do nothing if the trx already has a strong enough lock on rec */
1537
      if (!lock_rec_has_expl(mode, g.cell(), id, heap_no, trx))
1538
      {
1539
        if (lock_t *c_lock= lock_rec_other_has_conflicting(mode, g.cell(), id,
1540
                                                           heap_no, trx))
1541 1542 1543 1544
          /*
            If another transaction has a non-gap conflicting
            request in the queue, as this transaction does not
            have a lock strong enough already granted on the
1545 1546
            record, we have to wait.
          */
1547 1548
          err= lock_rec_enqueue_waiting(c_lock, mode, id, block->page.frame,
                                        heap_no, index, thr, nullptr);
1549 1550 1551
        else if (!impl)
        {
          /* Set the requested lock on the record. */
1552
          lock_rec_add_to_queue(mode, g.cell(), id, block->page.frame, heap_no,
1553
                                index, trx, true);
1554 1555 1556
          err= DB_SUCCESS_LOCKED_REC;
        }
      }
1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569
    }
    else if (!impl)
    {
      /*
        If the nth bit of the record lock is already set then we do not set
        a new lock bit, otherwise we do set
      */
      if (!lock_rec_get_nth_bit(lock, heap_no))
      {
        lock_rec_set_nth_bit(lock, heap_no);
        err= DB_SUCCESS_LOCKED_REC;
      }
    }
1570
    trx->mutex_unlock();
1571
    return err;
1572
  }
1573

1574 1575 1576 1577 1578 1579
  /* Simplified and faster path for the most common cases */
  if (!impl)
    lock_rec_create_low(nullptr, mode, id, block->page.frame, heap_no, index,
                        trx, false);

  return DB_SUCCESS_LOCKED_REC;
1580 1581 1582 1583
}
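
/* Illustrative summary of the paths above (a sketch, not normative):
(1) a covering table lock makes any record lock redundant (DB_SUCCESS);
(2) if the page has no lock queue yet, a single lock_t whose bitmap
covers the whole page is created directly (DB_SUCCESS_LOCKED_REC);
(3) otherwise the queue is inspected: a conflicting request leads to
lock_rec_enqueue_waiting() (the DB_LOCK_WAIT path), while the absence
of a conflict merely sets a bit in an existing lock object or appends
a new granted lock to the queue. */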

/*********************************************************************//**
Checks if a waiting record lock request still has to wait in a queue.
@return lock that is causing the wait */
static
const lock_t*
lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock)
{
	const lock_t*	lock;
	ulint		heap_no;
	ulint		bit_mask;
	ulint		bit_offset;

	ut_ad(wait_lock->is_waiting());
	ut_ad(!wait_lock->is_table());

	heap_no = lock_rec_find_set_bit(wait_lock);

	bit_offset = heap_no / 8;
	bit_mask = static_cast<ulint>(1) << (heap_no % 8);

	for (lock = lock_sys_t::get_first(
		     cell, wait_lock->un_member.rec_lock.page_id);
	     lock != wait_lock;
	     lock = lock_rec_get_next_on_page_const(lock)) {
		const byte*	p = (const byte*) &lock[1];

		if (heap_no < lock_rec_get_n_bits(lock)
		    && (p[bit_offset] & bit_mask)
		    && lock_has_to_wait(wait_lock, lock)) {
			return(lock);
		}
	}

	return(NULL);
}
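
/* The record lock bitmap is stored right after the lock_t object itself
(hence the &lock[1] above), one bit per heap number on the page. For
example, heap_no = 10 gives bit_offset = 10 / 8 = 1 and
bit_mask = 1 << (10 % 8) = 0x04: the loop tests bit 2 of bitmap byte 1
in every lock that precedes wait_lock in the queue. */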

/** Note that a record lock wait started */
inline void lock_sys_t::wait_start()
{
  mysql_mutex_assert_owner(&wait_mutex);
  wait_count+= WAIT_COUNT_STEP + 1;
  /* The maximum number of concurrently waiting transactions is one less
  than the maximum number of concurrent transactions. */
  static_assert(WAIT_COUNT_STEP == UNIV_PAGE_SIZE_MAX / 16 * TRX_SYS_N_RSEGS,
                "compatibility");
}
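
/* wait_count packs two counters into one word: units below
WAIT_COUNT_STEP count the currently pending waits, while each multiple
of WAIT_COUNT_STEP records one cumulative (historical) wait; compare
the get_wait_pending() and get_wait_cumulative() assertions in
wait_resume() below. One wait_start() adds WAIT_COUNT_STEP + 1, that
is, one pending plus one cumulative wait, and wait_resume() subtracts
only the pending unit. Worked example (illustrative):
WAIT_COUNT_STEP = 65536 / 16 * 128 = 524288 = 2^19; because fewer than
WAIT_COUNT_STEP transactions can ever wait concurrently, the pending
part cannot overflow into the cumulative part. */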

/** Note that a record lock wait resumed */
inline
void lock_sys_t::wait_resume(THD *thd, my_hrtime_t start, my_hrtime_t now)
{
  mysql_mutex_assert_owner(&wait_mutex);
  ut_ad(get_wait_pending());
  ut_ad(get_wait_cumulative());
  wait_count--;
  if (now.val >= start.val)
  {
    const uint32_t diff_time=
      static_cast<uint32_t>((now.val - start.val) / 1000);
    wait_time+= diff_time;

    if (diff_time > wait_time_max)
      wait_time_max= diff_time;

    thd_storage_lock_wait(thd, diff_time);
  }
}
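
/* Units note (illustrative): my_hrtime_t values are in microseconds,
so diff_time above is the wait duration in milliseconds; wait_time and
wait_time_max accumulate milliseconds accordingly (these counters feed
the Innodb_row_lock_time% status variables). For example,
start.val = 1000000 and now.val = 1250000 yield diff_time = 250 ms. */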

#ifdef HAVE_REPLICATION
ATTRIBUTE_NOINLINE MY_ATTRIBUTE((nonnull))
/** Report lock waits to parallel replication.
@param trx  transaction that may be waiting for a lock */
static void lock_wait_rpl_report(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  THD *const thd= trx->mysql_thd;
  ut_ad(thd);
  const lock_t *wait_lock= trx->lock.wait_lock;
  if (!wait_lock)
    return;
  ut_ad(!(wait_lock->type_mode & LOCK_AUTO_INC));
  /* This would likely be too large to attempt to use a memory transaction,
  even for wait_lock->is_table(). */
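  /* Note on the latch dance below (an explanatory sketch): the
  exclusive lock_sys latch ranks above lock_sys.wait_mutex in the
  latching order, so when wr_lock_try() fails we must release
  wait_mutex before blocking in wr_lock(), and then reacquire it.
  Because the wait may have been granted or cancelled in that window,
  trx->lock.wait_lock has to be reloaded afterwards. */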
  if (!lock_sys.wr_lock_try())
  {
    mysql_mutex_unlock(&lock_sys.wait_mutex);
    lock_sys.wr_lock(SRW_LOCK_CALL);
    mysql_mutex_lock(&lock_sys.wait_mutex);
    wait_lock= trx->lock.wait_lock;
    if (!wait_lock)
    {
func_exit:
      lock_sys.wr_unlock();
      return;
    }
    ut_ad(wait_lock->is_waiting());
  }
  else if (!wait_lock->is_waiting())
    goto func_exit;
  ut_ad(!(wait_lock->type_mode & LOCK_AUTO_INC));

  if (wait_lock->is_table())
  {
    dict_table_t *table= wait_lock->un_member.tab_lock.table;
    for (lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock;
         lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
      if (!(lock->type_mode & LOCK_AUTO_INC) && lock->trx != trx)
        thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
  }
  else
  {
    const page_id_t id{wait_lock->un_member.rec_lock.page_id};
    hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                         ? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
      (id.fold());
    if (lock_t *lock= lock_sys_t::get_first(cell, id))
    {
      const ulint heap_no= lock_rec_find_set_bit(wait_lock);
      if (!lock_rec_get_nth_bit(lock, heap_no))
        lock= lock_rec_get_next(heap_no, lock);
      do
        if (lock->trx->mysql_thd != thd)
          thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
      while ((lock= lock_rec_get_next(heap_no, lock)));
    }
  }

  goto func_exit;
}
#endif /* HAVE_REPLICATION */

/** Wait for a lock to be released.
@retval DB_DEADLOCK if this transaction was chosen as the deadlock victim
@retval DB_INTERRUPTED if the execution was interrupted by the user
@retval DB_LOCK_WAIT_TIMEOUT if the lock wait timed out
@retval DB_SUCCESS if the lock was granted */
dberr_t lock_wait(que_thr_t *thr)
{
  trx_t *trx= thr_get_trx(thr);

  if (trx->mysql_thd)
    DEBUG_SYNC_C("lock_wait_suspend_thread_enter");

  /* InnoDB system transactions may use the global value of
  innodb_lock_wait_timeout, because trx->mysql_thd == NULL. */
  const ulong innodb_lock_wait_timeout= trx_lock_wait_timeout_get(trx);
  const my_hrtime_t suspend_time= my_hrtime_coarse();
  ut_ad(!trx->dict_operation_lock_mode);

  /* The wait_lock can be cleared by another thread in lock_grant(),
  lock_rec_cancel(), or lock_cancel_waiting_and_release(). But, a wait
  can only be initiated by the current thread which owns the transaction.

  Even if trx->lock.wait_lock were changed, the object that it used to
  point to will remain valid memory (remain allocated from
  trx->lock.lock_heap). If trx->lock.wait_lock was set to nullptr, the
  original object could be transformed to a granted lock. On a page
  split or merge, we would change trx->lock.wait_lock to point to
  another waiting lock request object, and the old object would be
  logically discarded.

  In any case, it is safe to read the memory that wait_lock points to,
  even though we are not holding any mutex. We are only reading
  wait_lock->type_mode & (LOCK_TABLE | LOCK_AUTO_INC), which will be
  unaffected by any page split or merge operation. (Furthermore,
  table lock objects will never be cloned or moved.) */
  const lock_t *const wait_lock= trx->lock.wait_lock;

  if (!wait_lock)
  {
    /* The lock has already been released or this transaction
    was chosen as a deadlock victim: no need to wait */
    if (trx->lock.was_chosen_as_deadlock_victim.fetch_and(byte(~1)))
      trx->error_state= DB_DEADLOCK;
    else
      trx->error_state= DB_SUCCESS;

    return trx->error_state;
  }

  trx->lock.suspend_time= suspend_time;

  ut_ad(!trx->dict_operation_lock_mode);

  IF_WSREP(if (trx->is_wsrep()) lock_wait_wsrep(trx),);

  const auto type_mode= wait_lock->type_mode;
#ifdef HAVE_REPLICATION
  /* Even though lock_wait_rpl_report() has nothing to do with
  deadlock detection, it was always disabled by innodb_deadlock_detect=OFF.
  We will keep it that way, because unfortunately
  thd_need_wait_reports() will hold even if parallel (or any) replication
  is not being used. We want to allow the user to skip
  lock_wait_rpl_report(). */
  const bool rpl= !(type_mode & LOCK_AUTO_INC) && trx->mysql_thd &&
    innodb_deadlock_detect && thd_need_wait_reports(trx->mysql_thd);
#endif
  const bool row_lock_wait= thr->lock_state == QUE_THR_LOCK_ROW;
  timespec abstime;
  set_timespec_time_nsec(abstime, suspend_time.val * 1000);
  abstime.MY_tv_sec+= innodb_lock_wait_timeout;
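  /* Deadline arithmetic (illustrative): suspend_time.val is in
  microseconds, so suspend_time.val * 1000 converts the suspension time
  to nanoseconds for set_timespec_time_nsec(), and adding the timeout
  (whole seconds) to tv_sec makes my_cond_timedwait() below time out
  about innodb_lock_wait_timeout seconds after the wait began, e.g.
  roughly 50 s later with the default setting of 50. */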
  /* Dictionary transactions must be immune to lock wait timeouts
  for locks on data dictionary tables. Here we check only for
  SYS_TABLES, SYS_COLUMNS, SYS_INDEXES, SYS_FIELDS. Locks on further
  tables SYS_FOREIGN, SYS_FOREIGN_COLS, SYS_VIRTUAL will only be
  acquired while holding an exclusive lock on one of the 4 tables. */
  const bool no_timeout= innodb_lock_wait_timeout >= 100000000 ||
    ((type_mode & LOCK_TABLE) &&
     wait_lock->un_member.tab_lock.table->id <= DICT_FIELDS_ID);
  thd_wait_begin(trx->mysql_thd, (type_mode & LOCK_TABLE)
                 ? THD_WAIT_TABLE_LOCK : THD_WAIT_ROW_LOCK);
  dberr_t error_state= DB_SUCCESS;

  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (trx->lock.wait_lock)
  {
    if (Deadlock::check_and_resolve(trx))
    {
      ut_ad(!trx->lock.wait_lock);
      error_state= DB_DEADLOCK;
      goto end_wait;
    }
  }
  else
    goto end_wait;

  if (row_lock_wait)
    lock_sys.wait_start();

#ifdef HAVE_REPLICATION
  if (rpl)
    lock_wait_rpl_report(trx);
#endif

  trx->error_state= DB_SUCCESS;

  while (trx->lock.wait_lock)
  {
    int err;

    if (no_timeout)
    {
      my_cond_wait(&trx->lock.cond, &lock_sys.wait_mutex.m_mutex);
      err= 0;
    }
    else
      err= my_cond_timedwait(&trx->lock.cond, &lock_sys.wait_mutex.m_mutex,
                             &abstime);
    error_state= trx->error_state;
    switch (error_state) {
    case DB_DEADLOCK:
    case DB_INTERRUPTED:
      break;
    default:
      ut_ad(error_state != DB_LOCK_WAIT_TIMEOUT);
      /* Dictionary transactions must ignore KILL, because they could
      be executed as part of a multi-transaction DDL operation,
      such as rollback_inplace_alter_table() or ha_innobase::delete_table(). */
      if (!trx->dict_operation && trx_is_interrupted(trx))
        /* innobase_kill_query() can only set trx->error_state=DB_INTERRUPTED
        for any transaction that is attached to a connection. */
        error_state= DB_INTERRUPTED;
      else if (!err)
        continue;
#ifdef WITH_WSREP
      else if (trx->is_wsrep() && wsrep_is_BF_lock_timeout(*trx));
#endif
      else
      {
        error_state= DB_LOCK_WAIT_TIMEOUT;
        lock_sys.timeouts++;
      }
    }
    break;
  }

  if (row_lock_wait)
    lock_sys.wait_resume(trx->mysql_thd, suspend_time, my_hrtime_coarse());

  if (lock_t *lock= trx->lock.wait_lock)
  {
    lock_sys_t::cancel<false>(trx, lock);
    lock_sys.deadlock_check();
  }

end_wait:
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  thd_wait_end(trx->mysql_thd);

  trx->error_state= error_state;
  return error_state;
}

/** Resume a lock wait */
static void lock_wait_end(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->mutex_is_owner());
  ut_d(const auto state= trx->state);
  ut_ad(state == TRX_STATE_ACTIVE || state == TRX_STATE_PREPARED);
  ut_ad(trx->lock.wait_thr);

  if (trx->lock.was_chosen_as_deadlock_victim.fetch_and(byte(~1)))
  {
    ut_ad(state == TRX_STATE_ACTIVE);
    trx->error_state= DB_DEADLOCK;
  }

  trx->lock.wait_thr= nullptr;
  pthread_cond_signal(&trx->lock.cond);
}

/** Grant a waiting lock request and release the waiting transaction. */
static void lock_grant(lock_t *lock)
{
  lock_reset_lock_and_trx_wait(lock);
  trx_t *trx= lock->trx;
  trx->mutex_lock();
  if (lock->mode() == LOCK_AUTO_INC)
  {
    dict_table_t *table= lock->un_member.tab_lock.table;
    ut_ad(!table->autoinc_trx);
    table->autoinc_trx= trx;
    ib_vector_push(trx->autoinc_locks, &lock);
  }

  DBUG_PRINT("ib_lock", ("wait for trx " TRX_ID_FMT " ends", trx->id));

  /* If we are resolving a deadlock by choosing another transaction as
  a victim, then our original transaction may not be waiting anymore */

  if (trx->lock.wait_thr)
    lock_wait_end(trx);

  trx->mutex_unlock();
}

/*************************************************************//**
Cancels a waiting record lock request and releases the waiting transaction
that requested it. NOTE: does NOT check if waiting lock requests behind this
one can now be granted! */
static void lock_rec_cancel(lock_t *lock)
{
  trx_t *trx= lock->trx;
  mysql_mutex_lock(&lock_sys.wait_mutex);
  trx->mutex_lock();

  ut_d(lock_sys.hash_get(lock->type_mode).
       assert_locked(lock->un_member.rec_lock.page_id));
  /* Reset the bit (there can be only one set bit) in the lock bitmap */
  lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));

  /* Reset the wait flag and the back pointer to lock in trx */
  lock_reset_lock_and_trx_wait(lock);

  /* The following releases the trx from lock wait */
  lock_wait_end(trx);
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  trx->mutex_unlock();
}

/** Remove a record lock request, waiting or granted, from the queue and
grant locks to other transactions in the queue if they now are entitled
to a lock. NOTE: all record locks contained in in_lock are removed.
@param[in,out]	in_lock		record lock
@param[in]	owns_wait_mutex	whether lock_sys.wait_mutex is held */
static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
{
#ifdef SAFE_MUTEX
	ut_ad(owns_wait_mutex == mysql_mutex_is_owner(&lock_sys.wait_mutex));
#endif /* SAFE_MUTEX */
	ut_ad(!in_lock->is_table());

	const page_id_t page_id{in_lock->un_member.rec_lock.page_id};
	auto& lock_hash = lock_sys.hash_get(in_lock->type_mode);
	ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());

	ut_d(auto old_n_locks=)
	in_lock->index->table->n_rec_locks--;
	ut_ad(old_n_locks);

	const ulint rec_fold = page_id.fold();
	hash_cell_t &cell = *lock_hash.cell_get(rec_fold);
	lock_sys.assert_locked(cell);

	HASH_DELETE(lock_t, hash, &lock_hash, rec_fold, in_lock);
	ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());
	UT_LIST_REMOVE(in_lock->trx->lock.trx_locks, in_lock);

	MONITOR_INC(MONITOR_RECLOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_RECLOCK);

	bool acquired = false;

	/* Check if waiting locks in the queue can now be granted:
	grant a waiting request if no conflicting locks remain ahead
	of it in the queue. */

	for (lock_t* lock = lock_sys_t::get_first(cell, page_id);
	     lock != NULL;
	     lock = lock_rec_get_next_on_page(lock)) {

		if (!lock->is_waiting()) {
			continue;
		}

		if (!owns_wait_mutex) {
			mysql_mutex_lock(&lock_sys.wait_mutex);
			acquired = owns_wait_mutex = true;
		}

		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_rec_has_to_wait_in_queue(
			    cell, lock)) {
			trx_t* c_trx = c->trx;
			lock->trx->lock.wait_trx = c_trx;
			if (c_trx->lock.wait_trx
			    && innodb_deadlock_detect
			    && Deadlock::to_check.emplace(c_trx).second) {
				Deadlock::to_be_checked = true;
			}
		} else {
			/* Grant the lock */
			ut_ad(lock->trx != in_lock->trx);
			lock_grant(lock);
		}
	}

	if (acquired) {
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}
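
/* Illustrative scenario for the grant loop above (a sketch): if trx1
held an X lock on a record and trx2, trx3 queued waiting S locks on it,
dequeuing trx1's lock leaves no conflicting lock ahead of trx2's
request, so lock_rec_has_to_wait_in_queue() returns NULL and the lock
is granted; trx3 follows, because granted S locks are compatible with
each other. When a conflict does remain, the waiter's wait_trx edge is
redirected to the blocking transaction and the deadlock checker may be
scheduled. */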

/** Remove a record lock request, waiting or granted, on a discarded page
@param lock_hash  hash table
@param in_lock    lock object */
TRANSACTIONAL_TARGET
void lock_rec_discard(lock_sys_t::hash_table &lock_hash, lock_t *in_lock)
{
  ut_ad(!in_lock->is_table());
  lock_hash.assert_locked(in_lock->un_member.rec_lock.page_id);

  HASH_DELETE(lock_t, hash, &lock_hash,
              in_lock->un_member.rec_lock.page_id.fold(), in_lock);
  ut_d(uint32_t old_locks);
  {
    trx_t *trx= in_lock->trx;
    TMTrxGuard tg{*trx};
    ut_d(old_locks=)
    in_lock->index->table->n_rec_locks--;
    UT_LIST_REMOVE(trx->lock.trx_locks, in_lock);
  }
  ut_ad(old_locks);
  MONITOR_INC(MONITOR_RECLOCK_REMOVED);
  MONITOR_DEC(MONITOR_NUM_RECLOCK);
}

/*************************************************************//**
Removes record lock objects set on an index page which is discarded. This
function does not move locks, or check for waiting locks, therefore the
lock bitmaps must already be reset when this function is called. */
static void
lock_rec_free_all_from_discard_page(page_id_t id, const hash_cell_t &cell,
                                    lock_sys_t::hash_table &lock_hash)
{
  for (lock_t *lock= lock_sys_t::get_first(cell, id); lock; )
  {
    ut_ad(&lock_hash != &lock_sys.rec_hash ||
          lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
    ut_ad(!lock->is_waiting());
    lock_t *next_lock= lock_rec_get_next_on_page(lock);
    lock_rec_discard(lock_hash, lock);
    lock= next_lock;
  }
}

/** Discard locks for an index when purging DELETE FROM SYS_INDEXES
after an aborted CREATE INDEX operation.
@param index   a stale index on which ADD INDEX operation was aborted */
ATTRIBUTE_COLD void lock_discard_for_index(const dict_index_t &index)
{
  ut_ad(!index.is_committed());
  /* This is very rarely executed code, and the size of the hash array
  would exceed the maximum size of a memory transaction. */
  LockMutexGuard g{SRW_LOCK_CALL};
  const ulint n= lock_sys.rec_hash.pad(lock_sys.rec_hash.n_cells);
  for (ulint i= 0; i < n; i++)
  {
    for (lock_t *lock= static_cast<lock_t*>(lock_sys.rec_hash.array[i].node);
         lock; )
    {
      ut_ad(!lock->is_table());
      if (lock->index == &index)
      {
        ut_ad(!lock->is_waiting());
        lock_rec_discard(lock_sys.rec_hash, lock);
        lock= static_cast<lock_t*>(lock_sys.rec_hash.array[i].node);
      }
      else
        lock= lock->hash;
    }
  }
}

/*============= RECORD LOCK MOVING AND INHERITING ===================*/

/*************************************************************//**
Resets the lock bits for a single record. Releases transactions waiting for
lock requests here. */
TRANSACTIONAL_TARGET
static
void
lock_rec_reset_and_release_wait(const hash_cell_t &cell, const page_id_t id,
                                ulint heap_no)
{
  for (lock_t *lock= lock_sys.get_first(cell, id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
  {
    if (lock->is_waiting())
      lock_rec_cancel(lock);
    else
    {
      TMTrxGuard tg{*lock->trx};
      lock_rec_reset_nth_bit(lock, heap_no);
    }
  }
}

/*************************************************************//**
Makes a record inherit the locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of
the other record. Also waiting lock requests on rec are inherited as
GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap(
/*====================*/
	hash_cell_t&		heir_cell,	/*!< heir hash table cell */
	const page_id_t		heir,		/*!< in: page containing the
						record which inherits */
	const hash_cell_t&	donor_cell,	/*!< donor hash table cell */
	const page_id_t		donor,		/*!< in: page containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	const page_t*		heir_page,	/*!< in: heir page frame */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
	/* At READ UNCOMMITTED or READ COMMITTED isolation level,
	we do not want locks set
	by an UPDATE or a DELETE to be inherited as gap type locks. But we
	DO want S-locks/X-locks (taken for REPLACE) set by a consistency
	constraint to be inherited also then. */

	for (lock_t* lock= lock_sys_t::get_first(donor_cell, donor, heap_no);
	     lock;
	     lock = lock_rec_get_next(heap_no, lock)) {
		trx_t* lock_trx = lock->trx;
		if (!lock->is_insert_intention()
		    && (lock_trx->isolation_level > TRX_ISO_READ_COMMITTED
			|| lock->mode() !=
			(lock_trx->duplicates ? LOCK_S : LOCK_X))) {
			lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
					      heir_cell, heir, heir_page,
					      heir_heap_no,
					      lock->index, lock_trx, false);
		}
	}
}
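
/* Worked example (illustrative): consider records A < B < C where B is
about to be removed and trx1 holds an S lock on B. Calling this with
donor = B's heap_no and heir = C re-creates trx1's lock on C as
LOCK_GAP | LOCK_S, so the gap that used to precede B stays protected as
part of the gap before C. Insert-intention locks are deliberately not
inherited. */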

/*************************************************************//**
Makes a record inherit the gap locks (except LOCK_INSERT_INTENTION type)
of another record as gap type locks, but does not reset the lock bits of the
other record. Also waiting lock requests are inherited as GRANTED gap locks. */
static
void
lock_rec_inherit_to_gap_if_gap_lock(
/*================================*/
	const buf_block_t*	block,		/*!< in: buffer block */
	ulint			heir_heap_no,	/*!< in: heap_no of
						record which inherits */
	ulint			heap_no)	/*!< in: heap_no of record
						from which inherited;
						does NOT reset the locks
						on this record */
{
  const page_id_t id{block->page.id()};
  LockGuard g{lock_sys.rec_hash, id};

  for (lock_t *lock= lock_sys_t::get_first(g.cell(), id, heap_no); lock;
       lock= lock_rec_get_next(heap_no, lock))
     if (!lock->is_insert_intention() && (heap_no == PAGE_HEAP_NO_SUPREMUM ||
                                          !lock->is_record_not_gap()) &&
         !lock_table_has(lock->trx, lock->index->table, LOCK_X))
       lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
                             g.cell(), id, block->page.frame,
                             heir_heap_no, lock->index, lock->trx, false);
}

/*************************************************************//**
Moves the locks of a record to another record and resets the lock bits of
the donating record. */
TRANSACTIONAL_TARGET
static
void
lock_rec_move(
	hash_cell_t&		receiver_cell,	/*!< in: hash table cell */
	const buf_block_t&	receiver,	/*!< in: buffer block containing
						the receiving record */
	const page_id_t		receiver_id,	/*!< in: page identifier */
	const hash_cell_t&	donator_cell,	/*!< in: hash table cell */
	const page_id_t		donator_id,	/*!< in: page identifier of
						the donating record */
	ulint			receiver_heap_no,/*!< in: heap_no of the record
						which gets the locks; there
						must be no lock requests
						on it! */
	ulint			donator_heap_no)/*!< in: heap_no of the record
						which gives the locks */
{
	ut_ad(!lock_sys_t::get_first(receiver_cell,
				     receiver_id, receiver_heap_no));

	for (lock_t *lock = lock_sys_t::get_first(donator_cell, donator_id,
						  donator_heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next(donator_heap_no, lock)) {
		const auto type_mode = lock->type_mode;
		if (type_mode & LOCK_WAIT) {
			ut_ad(lock->trx->lock.wait_lock == lock);
			lock->type_mode &= ~LOCK_WAIT;
		}

		trx_t* lock_trx = lock->trx;
		lock_trx->mutex_lock();
		lock_rec_reset_nth_bit(lock, donator_heap_no);

		/* Note that we FIRST reset the bit, and then set the lock:
		the function works also if donator_id == receiver_id */

		lock_rec_add_to_queue(type_mode, receiver_cell,
				      receiver_id, receiver.page.frame,
				      receiver_heap_no,
				      lock->index, lock_trx, true);
		lock_trx->mutex_unlock();
	}

	ut_ad(!lock_sys_t::get_first(donator_cell, donator_id,
				     donator_heap_no));
}

/** Move all the granted locks to the front of the given lock list.
All the waiting locks will be at the end of the list.
@param[in,out]	lock_list	the given lock list.  */
static
void
lock_move_granted_locks_to_front(
	UT_LIST_BASE_NODE_T(lock_t)&	lock_list)
{
	lock_t*	lock;

	bool seen_waiting_lock = false;

	for (lock = UT_LIST_GET_FIRST(lock_list); lock;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {

		if (!seen_waiting_lock) {
			if (lock->is_waiting()) {
				seen_waiting_lock = true;
			}
			continue;
		}

		ut_ad(seen_waiting_lock);

		if (!lock->is_waiting()) {
			lock_t* prev = UT_LIST_GET_PREV(trx_locks, lock);
			ut_a(prev);
			ut_list_move_to_front(lock_list, lock);
			lock = prev;
		}
	}
}
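
/* Example (illustrative): a list G1, W1, G2, W2, G3 (G = granted,
W = waiting) becomes G3, G2, G1, W1, W2: each granted lock found after
the first waiting lock is hoisted to the front, so all granted locks
end up before all waiting ones. Only this granted-before-waiting
partition matters to the caller, lock_move_reorganize_page(), which
re-enqueues the locks in list order so that waiting requests are
queued after granted ones. */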

/*************************************************************//**
Updates the lock table when we have reorganized a page. NOTE: we copy
also the locks set on the infimum of the page; the infimum may carry
locks if an update of a record is occurring on the page, and its locks
were temporarily stored on the infimum. */
TRANSACTIONAL_TARGET
void
lock_move_reorganize_page(
/*======================*/
	const buf_block_t*	block,	/*!< in: old index page, now
					reorganized */
	const buf_block_t*	oblock)	/*!< in: copy of the old, not
					reorganized page */
{
  mem_heap_t *heap;

  {
    UT_LIST_BASE_NODE_T(lock_t) old_locks;
    UT_LIST_INIT(old_locks, &lock_t::trx_locks);

    const page_id_t id{block->page.id()};
    const auto id_fold= id.fold();
    {
      TMLockGuard g{lock_sys.rec_hash, id};
      if (!lock_sys_t::get_first(g.cell(), id))
        return;
    }

    /* We will modify arbitrary trx->lock.trx_locks.
    Do not bother with a memory transaction; we are going
    to allocate memory and copy a lot of data. */
    LockMutexGuard g{SRW_LOCK_CALL};
    hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id_fold);

    /* Note: Predicate locks for SPATIAL INDEX are not affected by
    page reorganize, because they do not refer to individual record
    heap numbers. */
    lock_t *lock= lock_sys_t::get_first(cell, id);

    if (!lock)
      return;

    heap= mem_heap_create(256);

    /* Copy first all the locks on the page to heap and reset the
    bitmaps in the original locks; chain the copies of the locks
    using the trx_locks field in them. */

    do
    {
      /* Make a copy of the lock */
      lock_t *old_lock= lock_rec_copy(lock, heap);

      UT_LIST_ADD_LAST(old_locks, old_lock);

      /* Reset bitmap of lock */
      lock_rec_bitmap_reset(lock);

      if (lock->is_waiting())
      {
        ut_ad(lock->trx->lock.wait_lock == lock);
        lock->type_mode&= ~LOCK_WAIT;
      }

      lock= lock_rec_get_next_on_page(lock);
    }
    while (lock);

    const ulint comp= page_is_comp(block->page.frame);
    ut_ad(comp == page_is_comp(oblock->page.frame));

    lock_move_granted_locks_to_front(old_locks);

    DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize",
                    ut_list_reverse(old_locks););

    for (lock= UT_LIST_GET_FIRST(old_locks); lock;
         lock= UT_LIST_GET_NEXT(trx_locks, lock))
    {
      /* NOTE: we copy also the locks set on the infimum and
      supremum of the page; the infimum may carry locks if an
      update of a record is occurring on the page, and its locks
      were temporarily stored on the infimum */
      const rec_t *rec1= page_get_infimum_rec(block->page.frame);
      const rec_t *rec2= page_get_infimum_rec(oblock->page.frame);

      /* Set locks according to old locks */
      for (;;)
      {
        ulint old_heap_no;
        ulint new_heap_no;
        ut_d(const rec_t* const orec= rec1);
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));

        if (comp)
        {
          old_heap_no= rec_get_heap_no_new(rec2);
          new_heap_no= rec_get_heap_no_new(rec1);

          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          old_heap_no= rec_get_heap_no_old(rec2);
          new_heap_no= rec_get_heap_no_old(rec1);
          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        /* Clear the bit in old_lock. */
        if (old_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, old_heap_no))
        {
          ut_ad(!page_rec_is_metadata(orec));

          /* NOTE that the old lock bitmap could be too
          small for the new heap number! */
          lock_rec_add_to_queue(lock->type_mode, cell, id, block->page.frame,
                                new_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();

        if (new_heap_no == PAGE_HEAP_NO_SUPREMUM)
        {
           ut_ad(old_heap_no == PAGE_HEAP_NO_SUPREMUM);
           break;
        }
      }

      ut_ad(lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
    }
  }

  mem_heap_free(heap);

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  if (fil_space_t *space= fil_space_t::get(id.space()))
  {
    ut_ad(lock_rec_validate_page(block, space->is_latched()));
    space->release();
  }
#endif
}
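
/* Illustrative example: page reorganization re-inserts the surviving
records in record-list order, so heap numbers can change; a record that
had heap_no 7 in the preserved copy (oblock) might end up with
heap_no 4 afterwards. The parallel walk above pairs each reorganized
record (rec1) with its old counterpart (rec2), clears the bit at
old_heap_no in the copied lock and re-enqueues the lock at new_heap_no;
hence the note that the old bitmap may be too small for a new heap
number. */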

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list end is moved to another page. */
TRANSACTIONAL_TARGET
void
lock_move_rec_list_end(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec)		/*!< in: record on page: this
						is the first record moved */
{
  const ulint comp= page_rec_is_comp(rec);

  ut_ad(block->page.frame == page_align(rec));
  ut_ad(comp == page_is_comp(new_block->page.frame));

  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};
  {
    /* This would likely be too large for a memory transaction. */
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    /* Note: when we move locks from record to record, waiting locks
    and possible granted gap type locks behind them are enqueued in
    the original order, because new elements are inserted to a hash
    table to the end of the hash chain, and lock_rec_add_to_queue
    does not reuse locks if there are waiters in the queue. */
    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1= rec;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      if (comp)
      {
        if (page_offset(rec1) == PAGE_NEW_INFIMUM)
          rec1= page_rec_get_next_low(rec1, TRUE);
        rec2= page_rec_get_next_low(new_block->page.frame + PAGE_NEW_INFIMUM,
                                    TRUE);
      }
      else
      {
        if (page_offset(rec1) == PAGE_OLD_INFIMUM)
          rec1= page_rec_get_next_low(rec1, FALSE);
        rec2= page_rec_get_next_low(new_block->page.frame + PAGE_OLD_INFIMUM,
                                    FALSE);
      }

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */
      for (;;)
      {
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));
        ut_d(const rec_t* const orec= rec1);

        ulint rec1_heap_no;
        ulint rec2_heap_no;

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM)
            break;

          rec2_heap_no= rec_get_heap_no_new(rec2);
          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);

          if (rec1_heap_no == PAGE_HEAP_NO_SUPREMUM)
            break;
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(rec_get_data_size_old(rec1) == rec_get_data_size_old(rec2));
          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec1)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          ut_ad(!page_rec_is_metadata(orec));

          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
                                new_block->page.frame,
                                rec2_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();
      }
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  if (fil_space_t *space= fil_space_t::get(id.space()))
  {
    const bool is_latched{space->is_latched()};
    ut_ad(lock_rec_validate_page(block, is_latched));
    ut_ad(lock_rec_validate_page(new_block, is_latched));
    space->release();
  }
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page if a record
list start is moved to another page. */
TRANSACTIONAL_TARGET
void
lock_move_rec_list_start(
/*=====================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	const rec_t*		rec,		/*!< in: record on page:
						this is the first
						record NOT copied */
	const rec_t*		old_end)	/*!< in: old
						previous-to-last
						record on new_page
						before the records
						were copied */
{
  const ulint comp= page_rec_is_comp(rec);

  ut_ad(block->page.frame == page_align(rec));
  ut_ad(comp == page_is_comp(new_block->page.frame));
  ut_ad(new_block->page.frame == page_align(old_end));
  ut_ad(!page_rec_is_metadata(rec));
  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};

  {
    /* This would likely be too large for a memory transaction. */
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      if (comp)
      {
        rec1= page_rec_get_next_low(block->page.frame + PAGE_NEW_INFIMUM,
                                    TRUE);
        rec2= page_rec_get_next_low(old_end, TRUE);
      }
      else
      {
        rec1= page_rec_get_next_low(block->page.frame + PAGE_OLD_INFIMUM,
                                    FALSE);
        rec2= page_rec_get_next_low(old_end, FALSE);
      }

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */

      while (rec1 != rec)
      {
        ut_ad(page_rec_is_metadata(rec1) == page_rec_is_metadata(rec2));
        ut_d(const rec_t* const prev= rec1);

        ulint rec1_heap_no;
        ulint rec2_heap_no;

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          rec2_heap_no= rec_get_heap_no_new(rec2);

          rec1= page_rec_get_next_low(rec1, TRUE);
          rec2= page_rec_get_next_low(rec2, TRUE);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));

          rec1= page_rec_get_next_low(rec1, FALSE);
          rec2= page_rec_get_next_low(rec2, FALSE);
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          ut_ad(!page_rec_is_metadata(prev));

          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
                                new_block->page.frame,
                                rec2_heap_no, lock->index, lock_trx, true);
        }

        lock_trx->mutex_unlock();
      }

#ifdef UNIV_DEBUG
      if (page_rec_is_supremum(rec))
        for (auto i= lock_rec_get_n_bits(lock); --i > PAGE_HEAP_NO_USER_LOW; )
          ut_ad(!lock_rec_get_nth_bit(lock, i));
#endif /* UNIV_DEBUG */
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  ut_ad(lock_rec_validate_page(block));
#endif
}

/*************************************************************//**
Moves the explicit locks on user records to another page when R-tree
(SPATIAL INDEX) records are moved. */
TRANSACTIONAL_TARGET
void
lock_rtr_move_rec_list(
/*===================*/
	const buf_block_t*	new_block,	/*!< in: index page to
						move to */
	const buf_block_t*	block,		/*!< in: index page */
	rtr_rec_move_t*		rec_move,       /*!< in: recording records
						moved */
	ulint			num_move)       /*!< in: num of rec to move */
{
  if (!num_move)
    return;

  const ulint comp= page_rec_is_comp(rec_move[0].old_rec);

  ut_ad(block->page.frame == page_align(rec_move[0].old_rec));
  ut_ad(new_block->page.frame == page_align(rec_move[0].new_rec));
  ut_ad(comp == page_rec_is_comp(rec_move[0].new_rec));
  const page_id_t id{block->page.id()};
  const page_id_t new_id{new_block->page.id()};

  {
    /* This would likely be too large for a memory transaction. */
    LockMultiGuard g{lock_sys.rec_hash, id, new_id};

    for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
         lock= lock_rec_get_next_on_page(lock))
    {
      const rec_t *rec1;
      const rec_t *rec2;
      const auto type_mode= lock->type_mode;

      /* Copy lock requests on user records to new page and
      reset the lock bits on the old */

      for (ulint moved= 0; moved < num_move; moved++)
      {
        ulint rec1_heap_no;
        ulint rec2_heap_no;

        rec1= rec_move[moved].old_rec;
        rec2= rec_move[moved].new_rec;
        ut_ad(!page_rec_is_metadata(rec1));
        ut_ad(!page_rec_is_metadata(rec2));

        if (comp)
        {
          rec1_heap_no= rec_get_heap_no_new(rec1);
          rec2_heap_no= rec_get_heap_no_new(rec2);
        }
        else
        {
          rec1_heap_no= rec_get_heap_no_old(rec1);
          rec2_heap_no= rec_get_heap_no_old(rec2);

          ut_ad(!memcmp(rec1, rec2, rec_get_data_size_old(rec2)));
        }

        trx_t *lock_trx= lock->trx;
        lock_trx->mutex_lock();

        if (rec1_heap_no < lock->un_member.rec_lock.n_bits &&
            lock_rec_reset_nth_bit(lock, rec1_heap_no))
        {
          if (type_mode & LOCK_WAIT)
          {
            ut_ad(lock_trx->lock.wait_lock == lock);
            lock->type_mode&= ~LOCK_WAIT;
          }

          lock_rec_add_to_queue(type_mode, g.cell2(), new_id,
                                new_block->page.frame,
                                rec2_heap_no, lock->index, lock_trx, true);

          rec_move[moved].moved= true;
        }

        lock_trx->mutex_unlock();
      }
    }
  }

#ifdef UNIV_DEBUG_LOCK_VALIDATE
  ut_ad(lock_rec_validate_page(block));
#endif
}

/*************************************************************//**
Updates the lock table when a page is split to the right. */
void
lock_update_split_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
  const ulint h= lock_get_min_heap_no(right_block);
  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};

  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, l, r};

  /* Move the locks on the supremum of the left page to the supremum
  of the right page */

  lock_rec_move(g.cell2(), *right_block, r, g.cell1(), l,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);

  /* Inherit the locks to the supremum of the left page from the
  successor of the infimum on the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->page.frame,
                          PAGE_HEAP_NO_SUPREMUM, h);
}
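
/* Illustrative sketch of a right split: before the split, page L holds
records A, B, C, D plus its supremum; afterwards L holds A, B and the
new right page R holds C, D. Locks that had been attached to L's
supremum are moved to R's supremum (they protect the gap at the end of
the index subtree, which now ends on R), and L's supremum inherits gap
locks from C, the first user record of R (heap number h above), so the
gap between B and C stays protected on both sides of the new page
boundary. */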

#ifdef UNIV_DEBUG
static void lock_assert_no_spatial(const page_id_t id)
{
  const auto id_fold= id.fold();
  auto cell= lock_sys.prdt_page_hash.cell_get(id_fold);
  auto latch= lock_sys_t::hash_table::latch(cell);
  latch->acquire();
  /* There should exist no page lock on this page;
  otherwise, the merge would have been blocked. */
  ut_ad(!lock_sys_t::get_first(*cell, id));
  latch->release();
  cell= lock_sys.prdt_hash.cell_get(id_fold);
  latch= lock_sys_t::hash_table::latch(cell);
  latch->acquire();
  ut_ad(!lock_sys_t::get_first(*cell, id));
  latch->release();
}
#endif

/*************************************************************//**
Updates the lock table when a page is merged to the right. */
void
lock_update_merge_right(
/*====================*/
	const buf_block_t*	right_block,	/*!< in: right page to
						which merged */
	const rec_t*		orig_succ,	/*!< in: original
						successor of infimum
						on the right page
						before merge */
	const buf_block_t*	left_block)	/*!< in: merged index
						page which will be
						discarded */
{
  ut_ad(!page_rec_is_metadata(orig_succ));

  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};
  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, l, r};

  /* Inherit the locks from the supremum of the left page to the
  original successor of infimum on the right page, to which the left
  page was merged */
  lock_rec_inherit_to_gap(g.cell2(), r, g.cell1(), l, right_block->page.frame,
                          page_rec_get_heap_no(orig_succ),
                          PAGE_HEAP_NO_SUPREMUM);

  /* Reset the locks on the supremum of the left page, releasing
  waiting transactions */
  lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(l, g.cell1(), lock_sys.rec_hash);

  ut_d(lock_assert_no_spatial(l));
}

/** Update locks when the root page is copied to another in
btr_root_raise_and_insert(). Note that we leave lock structs on the
root page, even though they do not make sense on other than leaf
pages: the reason is that in a pessimistic update the infimum record
of the root page will act as a dummy carrier of the locks of the record
to be updated. */
void lock_update_root_raise(const buf_block_t &block, const page_id_t root)
{
  const page_id_t id{block.page.id()};
  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, id, root};
  /* Move the locks on the supremum of the root to the supremum of block */
  lock_rec_move(g.cell1(), block, id, g.cell2(), root,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
}

/** Update the lock table when a page is copied to another.
@param new_block  the target page
@param old        old page (not index root page) */
void lock_update_copy_and_discard(const buf_block_t &new_block, page_id_t old)
{
  const page_id_t id{new_block.page.id()};
  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, id, old};
  /* Move the locks on the supremum of the old page to the supremum of new */
  lock_rec_move(g.cell1(), new_block, id, g.cell2(), old,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(old, g.cell2(), lock_sys.rec_hash);
}

/*************************************************************//**
Updates the lock table when a page is split to the left. */
void
lock_update_split_left(
/*===================*/
	const buf_block_t*	right_block,	/*!< in: right page */
	const buf_block_t*	left_block)	/*!< in: left page */
{
  ulint h= lock_get_min_heap_no(right_block);
  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};
  LockMultiGuard g{lock_sys.rec_hash, l, r};
  /* Inherit the locks to the supremum of the left page from the
  successor of the infimum on the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->page.frame,
                          PAGE_HEAP_NO_SUPREMUM, h);
}

/** Update the lock table when a page is merged to the left.
@param left      left page
@param orig_pred original predecessor of supremum on the left page before merge
@param right     merged, to-be-discarded right page */
void lock_update_merge_left(const buf_block_t& left, const rec_t *orig_pred,
                            const page_id_t right)
{
  ut_ad(left.page.frame == page_align(orig_pred));

  const page_id_t l{left.page.id()};

  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, l, right};
  const rec_t *left_next_rec= page_rec_get_next_const(orig_pred);

  if (!page_rec_is_supremum(left_next_rec))
  {
    /* Inherit the locks on the supremum of the left page to the
    first record which was moved from the right page */
    lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left.page.frame,
                            page_rec_get_heap_no(left_next_rec),
                            PAGE_HEAP_NO_SUPREMUM);

    /* Reset the locks on the supremum of the left page,
    releasing waiting transactions */
    lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
  }

  /* Move the locks from the supremum of the right page to the supremum
  of the left page */
  lock_rec_move(g.cell1(), left, l, g.cell2(), right,
                PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
  lock_rec_free_all_from_discard_page(right, g.cell2(), lock_sys.rec_hash);

  /* There should exist no page lock on the right page;
  otherwise, the merge would have been blocked. */
  ut_d(lock_assert_no_spatial(right));
}

/*************************************************************//**
Resets the original locks on heir and replaces them with gap type locks
inherited from rec. */
void
lock_rec_reset_and_inherit_gap_locks(
/*=================================*/
	const buf_block_t&	heir_block,	/*!< in: block containing the
						record which inherits */
	const page_id_t		donor,		/*!< in: page containing the
						record from which inherited;
						does NOT reset the locks on
						this record */
	ulint			heir_heap_no,	/*!< in: heap_no of the
						inheriting record */
	ulint			heap_no)	/*!< in: heap_no of the
						donating record */
{
  const page_id_t heir{heir_block.page.id()};
  /* This is a rare operation and likely too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, heir, donor};
  lock_rec_reset_and_release_wait(g.cell1(), heir, heir_heap_no);
  lock_rec_inherit_to_gap(g.cell1(), heir, g.cell2(), donor,
                          heir_block.page.frame, heir_heap_no, heap_no);
}

/*************************************************************//**
Updates the lock table when a page is discarded. */
void
lock_update_discard(
/*================*/
	const buf_block_t*	heir_block,	/*!< in: index page
						which will inherit the locks */
	ulint			heir_heap_no,	/*!< in: heap_no of the record
						which will inherit the locks */
	const buf_block_t*	block)		/*!< in: index page
						which will be discarded */
{
	const page_t*	page = block->page.frame;
	const rec_t*	rec;
	ulint		heap_no;
	const page_id_t	heir(heir_block->page.id());
	const page_id_t	page_id(block->page.id());
	/* This would likely be too large for a memory transaction. */
	LockMultiGuard	g{lock_sys.rec_hash, heir, page_id};

	if (lock_sys_t::get_first(g.cell2(), page_id)) {
		ut_d(lock_assert_no_spatial(page_id));
		/* Inherit all the locks on the page to the record and
		reset all the locks on the page */

		if (page_is_comp(page)) {
			rec = page + PAGE_NEW_INFIMUM;

			do {
				heap_no = rec_get_heap_no_new(rec);

				lock_rec_inherit_to_gap(g.cell1(), heir,
							g.cell2(), page_id,
							heir_block->page.frame,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					g.cell2(), page_id, heap_no);

				rec = page + rec_get_next_offs(rec, TRUE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		} else {
			rec = page + PAGE_OLD_INFIMUM;

			do {
				heap_no = rec_get_heap_no_old(rec);

				lock_rec_inherit_to_gap(g.cell1(), heir,
							g.cell2(), page_id,
							heir_block->page.frame,
							heir_heap_no, heap_no);

				lock_rec_reset_and_release_wait(
					g.cell2(), page_id, heap_no);

				rec = page + rec_get_next_offs(rec, FALSE);
			} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
		}

		lock_rec_free_all_from_discard_page(page_id, g.cell2(),
						    lock_sys.rec_hash);
	} else {
		const auto fold = page_id.fold();
		auto cell = lock_sys.prdt_hash.cell_get(fold);
		auto latch = lock_sys_t::hash_table::latch(cell);
		latch->acquire();
		lock_rec_free_all_from_discard_page(page_id, *cell,
						    lock_sys.prdt_hash);
		latch->release();
		cell = lock_sys.prdt_page_hash.cell_get(fold);
		latch = lock_sys_t::hash_table::latch(cell);
		latch->acquire();
		lock_rec_free_all_from_discard_page(page_id, *cell,
						    lock_sys.prdt_page_hash);
		latch->release();
	}
}

/*************************************************************//**
Updates the lock table when a new user record is inserted. */
void
lock_update_insert(
/*===============*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: the inserted record */
{
	ulint	receiver_heap_no;
	ulint	donator_heap_no;

	ut_ad(block->page.frame == page_align(rec));
	ut_ad(!page_rec_is_metadata(rec));

	/* Inherit the gap-locking locks for rec, in gap mode, from the next
	record */

	if (page_rec_is_comp(rec)) {
		receiver_heap_no = rec_get_heap_no_new(rec);
		donator_heap_no = rec_get_heap_no_new(
			page_rec_get_next_low(rec, TRUE));
	} else {
		receiver_heap_no = rec_get_heap_no_old(rec);
		donator_heap_no = rec_get_heap_no_old(
			page_rec_get_next_low(rec, FALSE));
	}

	lock_rec_inherit_to_gap_if_gap_lock(
		block, receiver_heap_no, donator_heap_no);
}

/*************************************************************//**
Updates the lock table when a record is removed. */
void
lock_update_delete(
/*===============*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: the record to be removed */
{
	const page_t*	page = block->page.frame;
	ulint		heap_no;
	ulint		next_heap_no;

	ut_ad(page == page_align(rec));
	ut_ad(!page_rec_is_metadata(rec));

	if (page_is_comp(page)) {
		heap_no = rec_get_heap_no_new(rec);
		next_heap_no = rec_get_heap_no_new(page
						   + rec_get_next_offs(rec,
								       TRUE));
	} else {
		heap_no = rec_get_heap_no_old(rec);
		next_heap_no = rec_get_heap_no_old(page
						   + rec_get_next_offs(rec,
								       FALSE));
	}

	const page_id_t id{block->page.id()};
	LockGuard g{lock_sys.rec_hash, id};

	/* Let the next record inherit the locks from rec, in gap mode */

	lock_rec_inherit_to_gap(g.cell(), id, g.cell(), id, block->page.frame,
				next_heap_no, heap_no);

	/* Reset the lock bits on rec and release waiting transactions */
	lock_rec_reset_and_release_wait(g.cell(), id, heap_no);
}

/*********************************************************************//**
Stores on the page infimum record the explicit locks of another record.
This function is used to store the lock state of a record when it is
updated and the size of the record changes in the update. The record
is moved in such an update, perhaps to another page. The infimum record
acts as a dummy carrier record, taking care of lock releases while the
actual record is being moved. */
void
lock_rec_store_on_page_infimum(
/*===========================*/
	const buf_block_t*	block,	/*!< in: buffer block containing rec */
	const rec_t*		rec)	/*!< in: record whose lock state
					is stored on the infimum
					record of the same page; lock
					bits are reset on the
					record */
{
  const ulint heap_no= page_rec_get_heap_no(rec);

  ut_ad(block->page.frame == page_align(rec));
  const page_id_t id{block->page.id()};

  LockGuard g{lock_sys.rec_hash, id};
  lock_rec_move(g.cell(), *block, id, g.cell(), id,
                PAGE_HEAP_NO_INFIMUM, heap_no);
}

/** Restore the explicit lock requests on a single record, where the
state was stored on the infimum of a page.
@param block   buffer block containing rec
@param rec     record whose lock state is restored
@param donator page (rec is not necessarily on this page)
whose infimum stored the lock state; lock bits are reset on the infimum */
void lock_rec_restore_from_page_infimum(const buf_block_t &block,
					const rec_t *rec, page_id_t donator)
{
  const ulint heap_no= page_rec_get_heap_no(rec);
  const page_id_t id{block.page.id()};
  LockMultiGuard g{lock_sys.rec_hash, id, donator};
  lock_rec_move(g.cell1(), block, id, g.cell2(), donator, heap_no,
                PAGE_HEAP_NO_INFIMUM);
}

/*========================= TABLE LOCKS ==============================*/

/**
Create a table lock, without checking for deadlocks or lock compatibility.
@param table      table on which the lock is created
@param type_mode  lock type and mode
@param trx        transaction
@param c_lock     conflicting lock
@return the created lock object */
lock_t *lock_table_create(dict_table_t *table, unsigned type_mode, trx_t *trx,
                          lock_t *c_lock)
{
	lock_t*		lock;

	lock_sys.assert_locked(*table);
	ut_ad(trx->mutex_is_owner());
	ut_ad(!trx->is_wsrep() || lock_sys.is_writer());
	ut_ad(trx->state == TRX_STATE_ACTIVE || trx->is_recovered);
	ut_ad(!trx->is_autocommit_non_locking());

	/* During CREATE TABLE, we will write to newly created FTS_*_CONFIG
	on which no lock has been created yet. */
	ut_ad(!trx->dict_operation_lock_mode
	      || (strstr(table->name.m_name, "/FTS_")
		  && strstr(table->name.m_name, "_CONFIG") + sizeof("_CONFIG")
		  == table->name.m_name + strlen(table->name.m_name) + 1));

	switch (LOCK_MODE_MASK & type_mode) {
	case LOCK_AUTO_INC:
		++table->n_waiting_or_granted_auto_inc_locks;
		/* For AUTOINC locking we reuse the lock instance only if
		there is no wait involved else we allocate the waiting lock
		from the transaction lock heap. */
		if (type_mode == LOCK_AUTO_INC) {
			lock = table->autoinc_lock;

			ut_ad(!table->autoinc_trx);
			table->autoinc_trx = trx;

			ib_vector_push(trx->autoinc_locks, &lock);
			goto allocated;
		}

		break;
	case LOCK_X:
	case LOCK_S:
		++table->n_lock_x_or_s;
		break;
	}

	lock = trx->lock.table_cached < array_elements(trx->lock.table_pool)
		? &trx->lock.table_pool[trx->lock.table_cached++]
		: static_cast<lock_t*>(
			mem_heap_alloc(trx->lock.lock_heap, sizeof *lock));

allocated:
	lock->type_mode = ib_uint32_t(type_mode | LOCK_TABLE);
	lock->trx = trx;

	lock->un_member.tab_lock.table = table;

	ut_ad(table->get_ref_count() > 0 || !table->can_be_evicted);

	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);

	ut_list_append(table->locks, lock, TableLockGetNode());

	if (type_mode & LOCK_WAIT) {
		if (trx->lock.wait_trx) {
			ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx);
			ut_ad(trx->lock.wait_lock);
			ut_ad((*trx->lock.wait_lock).trx == trx);
		} else {
			ut_ad(c_lock);
			trx->lock.wait_trx = c_lock->trx;
			ut_ad(!trx->lock.wait_lock);
		}
		trx->lock.wait_lock = lock;
	}

	lock->trx->lock.table_locks.push_back(lock);

	MONITOR_INC(MONITOR_TABLELOCK_CREATED);
	MONITOR_INC(MONITOR_NUM_TABLELOCK);

	return(lock);
}
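
/* Note on the AUTO-INC fast path in lock_table_create(): a granted
LOCK_AUTO_INC reuses the pre-allocated table->autoinc_lock object and
is additionally pushed onto trx->autoinc_locks, so that
lock_table_remove_low() can release AUTO-INC locks in reverse order
of acquisition.  Waiting AUTO-INC requests and all other table locks
are allocated from trx->lock.table_pool or the transaction lock
heap. */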

/*************************************************************//**
Pops autoinc lock requests from the transaction's autoinc_locks. We
handle the case where there are gaps in the array and they need to
be popped off the stack. */
UNIV_INLINE
void
lock_table_pop_autoinc_locks(
/*=========================*/
	trx_t*	trx)	/*!< in/out: transaction that owns the AUTOINC locks */
{
	ut_ad(!ib_vector_is_empty(trx->autoinc_locks));

	/* Skip any gaps, gaps are NULL lock entries in the
	trx->autoinc_locks vector. */

	do {
		ib_vector_pop(trx->autoinc_locks);

		if (ib_vector_is_empty(trx->autoinc_locks)) {
			return;
		}

	} while (*(lock_t**) ib_vector_get_last(trx->autoinc_locks) == NULL);
}

/*************************************************************//**
Removes an autoinc lock request from the transaction's autoinc_locks. */
UNIV_INLINE
void
lock_table_remove_autoinc_lock(
/*===========================*/
	lock_t*	lock,	/*!< in: table lock */
	trx_t*	trx)	/*!< in/out: transaction that owns the lock */
{
	ut_ad(lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE));
	lock_sys.assert_locked(*lock->un_member.tab_lock.table);
	ut_ad(trx->mutex_is_owner());

	auto s = ib_vector_size(trx->autoinc_locks);
	ut_ad(s);

	/* With stored functions and procedures the user may drop
	a table within the same "statement". This special case has
	to be handled by deleting only those AUTOINC locks that were
	held by the table being dropped. */

	lock_t*	autoinc_lock = *static_cast<lock_t**>(
		ib_vector_get(trx->autoinc_locks, --s));

	/* This is the default fast case. */

	if (autoinc_lock == lock) {
		lock_table_pop_autoinc_locks(trx);
	} else {
		/* The last element should never be NULL */
		ut_a(autoinc_lock != NULL);

		/* Handle freeing the locks from within the stack. */

		while (s) {
			autoinc_lock = *static_cast<lock_t**>(
				ib_vector_get(trx->autoinc_locks, --s));

			if (autoinc_lock == lock) {
				void*	null_var = NULL;
				ib_vector_set(trx->autoinc_locks, s, &null_var);
				return;
			}
		}

		/* Must find the autoinc lock. */
		ut_error;
	}
}
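
/* Note: removing an AUTO-INC lock from the middle of
trx->autoinc_locks (the stored-routine case above) leaves a NULL
entry instead of shifting the remaining elements;
lock_table_pop_autoinc_locks() skips such NULL "gaps" when popping
locks off the top of the stack. */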

/*************************************************************//**
Removes a table lock request from the queue and the trx list of locks;
this is a low-level function which does NOT check if waiting requests
can now be granted. */
UNIV_INLINE
const dict_table_t*
lock_table_remove_low(
/*==================*/
	lock_t*	lock)	/*!< in/out: table lock */
{
	ut_ad(lock->is_table());

	trx_t*		trx;
	dict_table_t*	table;

	trx = lock->trx;
	table = lock->un_member.tab_lock.table;
	lock_sys.assert_locked(*table);
	ut_ad(trx->mutex_is_owner());

	/* Remove the table from the transaction's AUTOINC vector, if
	the lock that is being released is an AUTOINC lock. */
	switch (lock->mode()) {
	case LOCK_AUTO_INC:
		ut_ad((table->autoinc_trx == trx) == !lock->is_waiting());

		if (table->autoinc_trx == trx) {
			table->autoinc_trx = NULL;
			/* The locks must be freed in the reverse order from
			the one in which they were acquired. This is to avoid
			traversing the AUTOINC lock vector unnecessarily.

			We only store locks that were granted in the
			trx->autoinc_locks vector (see lock_table_create()
			and lock_grant()). */
			lock_table_remove_autoinc_lock(lock, trx);
		}

		ut_ad(table->n_waiting_or_granted_auto_inc_locks);
		--table->n_waiting_or_granted_auto_inc_locks;
		break;
	case LOCK_X:
	case LOCK_S:
		ut_ad(table->n_lock_x_or_s);
		--table->n_lock_x_or_s;
		break;
	default:
		break;
	}

	UT_LIST_REMOVE(trx->lock.trx_locks, lock);
	ut_list_remove(table->locks, lock, TableLockGetNode());

	MONITOR_INC(MONITOR_TABLELOCK_REMOVED);
	MONITOR_DEC(MONITOR_NUM_TABLELOCK);
	return table;
}

/*********************************************************************//**
Enqueues a waiting request for a table lock which cannot be granted
immediately. Checks for deadlocks.
@retval	DB_LOCK_WAIT	if the waiting lock was enqueued
@retval	DB_DEADLOCK	if this transaction was chosen as the victim */
static
dberr_t
lock_table_enqueue_waiting(
/*=======================*/
	unsigned	mode,	/*!< in: lock mode this transaction is
				requesting */
	dict_table_t*	table,	/*!< in/out: table */
	que_thr_t*	thr,	/*!< in: query thread */
	lock_t*		c_lock)	/*!< in: conflicting lock or NULL */
{
	lock_sys.assert_locked(*table);
	ut_ad(!srv_read_only_mode);

	trx_t* trx = thr_get_trx(thr);
	ut_ad(trx->mutex_is_owner());
	ut_ad(!trx->dict_operation_lock_mode);

#ifdef WITH_WSREP
	if (trx->is_wsrep() && trx->lock.was_chosen_as_deadlock_victim) {
		return(DB_DEADLOCK);
	}
#endif /* WITH_WSREP */

	/* Enqueue the lock request that will wait to be granted */
	lock_table_create(table, mode | LOCK_WAIT, trx, c_lock);

	trx->lock.wait_thr = thr;
	trx->lock.clear_deadlock_victim();

	MONITOR_INC(MONITOR_TABLELOCK_WAIT);
	return(DB_LOCK_WAIT);
}

/*********************************************************************//**
Checks if other transactions have an incompatible mode lock request in
the lock queue.
@return lock or NULL */
UNIV_INLINE
lock_t*
lock_table_other_has_incompatible(
/*==============================*/
	const trx_t*		trx,	/*!< in: transaction, or NULL if all
					transactions should be included */
	ulint			wait,	/*!< in: LOCK_WAIT if also
					waiting locks are taken into
					account, or 0 if not */
	const dict_table_t*	table,	/*!< in: table */
	lock_mode		mode)	/*!< in: lock mode */
{
	lock_sys.assert_locked(*table);

	static_assert(LOCK_IS == 0, "compatibility");
	static_assert(LOCK_IX == 1, "compatibility");

	if (UNIV_LIKELY(mode <= LOCK_IX && !table->n_lock_x_or_s)) {
		return(NULL);
	}

	for (lock_t* lock = UT_LIST_GET_LAST(table->locks);
	     lock;
	     lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock)) {

		trx_t* lock_trx = lock->trx;

		if (lock_trx != trx
		    && !lock_mode_compatible(lock->mode(), mode)
		    && (wait || !lock->is_waiting())) {
			return(lock);
		}
	}

	return(NULL);
}

/** Acquire or enqueue a table lock */
static dberr_t lock_table_low(dict_table_t *table, lock_mode mode,
                              que_thr_t *thr, trx_t *trx)
{
  DBUG_EXECUTE_IF("innodb_table_deadlock", return DB_DEADLOCK;);
  lock_t *wait_for=
    lock_table_other_has_incompatible(trx, LOCK_WAIT, table, mode);
  dberr_t err= DB_SUCCESS;

  trx->mutex_lock();

  if (wait_for)
    err= lock_table_enqueue_waiting(mode, table, thr, wait_for);
  else
    lock_table_create(table, mode, trx, nullptr);

  trx->mutex_unlock();

  return err;
}
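
/* Latching sketch for lock_table_low(): callers hold lock_sys.latch
(shared mode in lock_table(), exclusive mode in lock_table_wsrep()
below) and, in the shared-mode path, the table's lock_mutex, while
trx->mutex is acquired here only around the create or enqueue
step. */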

#ifdef WITH_WSREP
/** Acquire or enqueue a table lock in Galera replication mode. */
ATTRIBUTE_NOINLINE
static dberr_t lock_table_wsrep(dict_table_t *table, lock_mode mode,
                                que_thr_t *thr, trx_t *trx)
{
  LockMutexGuard g{SRW_LOCK_CALL};
  return lock_table_low(table, mode, thr, trx);
}
#endif

/*********************************************************************//**
Locks the specified database table in the mode given. If the lock cannot
be granted immediately, the query thread is put to wait.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_table(
/*=======*/
	dict_table_t*	table,	/*!< in/out: database table
				in dictionary cache */
	lock_mode	mode,	/*!< in: lock mode */
	que_thr_t*	thr)	/*!< in: query thread */
{
	trx_t*		trx;

	if (table->is_temporary()) {
		return DB_SUCCESS;
	}

	trx = thr_get_trx(thr);

	/* Look for equal or stronger locks the same trx already
	has on the table. No need to acquire LockMutexGuard here
	because only this transaction can add/access table locks
	to/from trx_t::table_locks. */

	if (lock_table_has(trx, table, mode) || srv_read_only_mode) {
		return(DB_SUCCESS);
	}

	/* Read only transactions can write to temp tables, we don't want
	to promote them to RW transactions. Their updates cannot be visible
	to other transactions. Therefore we can keep them out
	of the read views. */

	if ((mode == LOCK_IX || mode == LOCK_X)
	    && !trx->read_only
	    && trx->rsegs.m_redo.rseg == 0) {

		trx_set_rw_mode(trx);
	}

#ifdef WITH_WSREP
	if (trx->is_wsrep()) {
		return lock_table_wsrep(table, mode, thr, trx);
	}
#endif
	lock_sys.rd_lock(SRW_LOCK_CALL);
	table->lock_mutex_lock();
	dberr_t err = lock_table_low(table, mode, thr, trx);
	table->lock_mutex_unlock();
	lock_sys.rd_unlock();

	return err;
}

/** Create a table lock object for a resurrected transaction.
@param table    table to be X-locked
@param trx      transaction
@param mode     LOCK_X or LOCK_IX */
void lock_table_resurrect(dict_table_t *table, trx_t *trx, lock_mode mode)
{
  ut_ad(trx->is_recovered);
  ut_ad(mode == LOCK_X || mode == LOCK_IX);

  if (lock_table_has(trx, table, mode))
    return;

  {
    /* This is executed at server startup while no connections
    are allowed. Do not bother with lock elision. */
    LockMutexGuard g{SRW_LOCK_CALL};
    ut_ad(!lock_table_other_has_incompatible(trx, LOCK_WAIT, table, mode));

    trx->mutex_lock();
    lock_table_create(table, mode, trx);
  }
  trx->mutex_unlock();
}

/** Find a lock that a waiting table lock request still has to wait for. */
static const lock_t *lock_table_has_to_wait_in_queue(const lock_t *wait_lock)
{
  ut_ad(wait_lock->is_waiting());
  ut_ad(wait_lock->is_table());

  dict_table_t *table= wait_lock->un_member.tab_lock.table;
  lock_sys.assert_locked(*table);

  static_assert(LOCK_IS == 0, "compatibility");
  static_assert(LOCK_IX == 1, "compatibility");

  if (UNIV_LIKELY(wait_lock->mode() <= LOCK_IX && !table->n_lock_x_or_s))
    return nullptr;

  for (const lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock != wait_lock;
       lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
    if (lock_has_to_wait(wait_lock, lock))
      return lock;

  return nullptr;
}
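
/* Note: the early return above relies on the mode encoding enforced
by the static_asserts: LOCK_IS == 0 and LOCK_IX == 1, so any mode
<= LOCK_IX is an intention lock.  Intention locks can only conflict
with LOCK_S or LOCK_X table locks, which are counted in
table->n_lock_x_or_s; while that counter is zero, the queue need not
be scanned at all. */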

/*************************************************************//**
Removes a table lock request, waiting or granted, from the queue and grants
locks to other transactions in the queue, if they now are entitled to a
lock.
@param[in,out]	in_lock		table lock
@param[in]	owns_wait_mutex	whether lock_sys.wait_mutex is held */
static void lock_table_dequeue(lock_t *in_lock, bool owns_wait_mutex)
{
#ifdef SAFE_MUTEX
	ut_ad(owns_wait_mutex == mysql_mutex_is_owner(&lock_sys.wait_mutex));
#endif
	ut_ad(in_lock->trx->mutex_is_owner());
	lock_t*	lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, in_lock);

	const dict_table_t* table = lock_table_remove_low(in_lock);

	static_assert(LOCK_IS == 0, "compatibility");
	static_assert(LOCK_IX == 1, "compatibility");

	if (UNIV_LIKELY(in_lock->mode() <= LOCK_IX && !table->n_lock_x_or_s)) {
		return;
	}

	bool acquired = false;

	/* Check if waiting locks in the queue can now be granted: grant
	locks if there are no conflicting locks ahead. */

	for (/* No op */;
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {
		if (!lock->is_waiting()) {
			continue;
		}

		if (!owns_wait_mutex) {
			mysql_mutex_lock(&lock_sys.wait_mutex);
			acquired = owns_wait_mutex = true;
		}

		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_table_has_to_wait_in_queue(lock)) {
			trx_t* c_trx = c->trx;
			lock->trx->lock.wait_trx = c_trx;
			if (c_trx->lock.wait_trx
			    && innodb_deadlock_detect
			    && Deadlock::to_check.emplace(c_trx).second) {
				Deadlock::to_be_checked = true;
			}
		} else {
			/* Grant the lock */
			ut_ad(in_lock->trx != lock->trx);
			in_lock->trx->mutex_unlock();
			lock_grant(lock);
			in_lock->trx->mutex_lock();
		}
	}

	if (acquired) {
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}
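
/* Note: when a waiter above is found to be still blocked, its
trx->lock.wait_trx is re-pointed at the first remaining conflict, and
if that conflicting transaction is itself waiting, it is queued in
Deadlock::to_check so that lock_sys.deadlock_check() will run later
(see lock_release()). */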

/** Sets a lock on a table based on the given mode.
@param[in]	table	table to lock
@param[in,out]	trx	transaction
@param[in]	mode	LOCK_X or LOCK_S
@return error code or DB_SUCCESS. */
dberr_t
lock_table_for_trx(
	dict_table_t*	table,
	trx_t*		trx,
	enum lock_mode	mode)
{
	mem_heap_t*	heap;
	que_thr_t*	thr;
	dberr_t		err;
	sel_node_t*	node;
	heap = mem_heap_create(512);

	node = sel_node_create(heap);
	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
	thr->graph->state = QUE_FORK_ACTIVE;

	/* We use the select query graph as the dummy graph needed
	in the lock module call */

	thr = static_cast<que_thr_t*>(
		que_fork_get_first_thr(
			static_cast<que_fork_t*>(que_node_get_parent(thr))));

run_again:
	thr->run_node = thr;
	thr->prev_node = thr->common.parent;

	err = lock_table(table, mode, thr);

	trx->error_state = err;

	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
		if (row_mysql_handle_errors(&err, trx, thr, NULL)) {
			goto run_again;
		}
	}

	que_graph_free(thr->graph);
	trx->op_info = "";

	return(err);
}

/** Exclusively lock the data dictionary tables.
@param trx  dictionary transaction
@return error code
@retval DB_SUCCESS on success */
dberr_t lock_sys_tables(trx_t *trx)
{
  dberr_t err;
  if (!(err= lock_table_for_trx(dict_sys.sys_tables, trx, LOCK_X)) &&
      !(err= lock_table_for_trx(dict_sys.sys_columns, trx, LOCK_X)) &&
      !(err= lock_table_for_trx(dict_sys.sys_indexes, trx, LOCK_X)) &&
      !(err= lock_table_for_trx(dict_sys.sys_fields, trx, LOCK_X)))
  {
    if (dict_sys.sys_foreign)
      err= lock_table_for_trx(dict_sys.sys_foreign, trx, LOCK_X);
    if (!err && dict_sys.sys_foreign_cols)
      err= lock_table_for_trx(dict_sys.sys_foreign_cols, trx, LOCK_X);
    if (!err && dict_sys.sys_virtual)
      err= lock_table_for_trx(dict_sys.sys_virtual, trx, LOCK_X);
  }
  return err;
}
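
/* Usage sketch (hypothetical caller): DDL code that rewrites the
data dictionary would invoke lock_sys_tables(trx) and proceed only on
DB_SUCCESS, so that the SYS_* tables are X-locked before any of them
is modified. */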

/*=========================== LOCK RELEASE ==============================*/

/*************************************************************//**
Removes a granted record lock of a transaction from the queue and grants
locks to other transactions waiting in the queue if they now are entitled
to a lock. */
TRANSACTIONAL_TARGET
void
lock_rec_unlock(
/*============*/
	trx_t*			trx,	/*!< in/out: transaction that has
					set a record lock */
	const page_id_t		id,	/*!< in: page containing rec */
	const rec_t*		rec,	/*!< in: record */
	lock_mode		lock_mode)/*!< in: LOCK_S or LOCK_X */
{
	lock_t*		first_lock;
	lock_t*		lock;
	ulint		heap_no;

	ut_ad(trx);
	ut_ad(rec);
	ut_ad(!trx->lock.wait_lock);
	ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
	ut_ad(!page_rec_is_metadata(rec));

	heap_no = page_rec_get_heap_no(rec);

	LockGuard g{lock_sys.rec_hash, id};

	first_lock = lock_sys_t::get_first(g.cell(), id, heap_no);

	/* Find the last lock with the same lock_mode and transaction
	on the record. */

	for (lock = first_lock; lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {
		if (lock->trx == trx && lock->mode() == lock_mode) {
			goto released;
		}
	}

	{
		ib::error	err;
		err << "Unlock row could not find a " << lock_mode
			<< " mode lock on the record. Current statement: ";
		size_t		stmt_len;
		if (const char* stmt = innobase_get_stmt_unsafe(
			    trx->mysql_thd, &stmt_len)) {
			err.write(stmt, stmt_len);
		}
	}

	return;

released:
	ut_a(!lock->is_waiting());
	{
		TMTrxGuard tg{*trx};
		lock_rec_reset_nth_bit(lock, heap_no);
	}

	/* Check if we can now grant waiting lock requests */

	for (lock = first_lock; lock != NULL;
	     lock = lock_rec_get_next(heap_no, lock)) {
		if (!lock->is_waiting()) {
			continue;
		}
		mysql_mutex_lock(&lock_sys.wait_mutex);
		ut_ad(lock->trx->lock.wait_trx);
		ut_ad(lock->trx->lock.wait_lock);

		if (const lock_t* c = lock_rec_has_to_wait_in_queue(g.cell(),
								    lock)) {
			lock->trx->lock.wait_trx = c->trx;
		} else {
			/* Grant the lock */
			ut_ad(trx != lock->trx);
			lock_grant(lock);
		}
		mysql_mutex_unlock(&lock_sys.wait_mutex);
	}
}

/** Release the explicit locks of a committing transaction,
and release possible other transactions waiting because of these locks.
@return whether the operation succeeded */
TRANSACTIONAL_TARGET static bool lock_release_try(trx_t *trx)
{
  /* At this point, trx->lock.trx_locks cannot be modified by other
  threads, because our transaction has been committed.
  See the checks and assertions in lock_rec_create_low() and
  lock_rec_add_to_queue().

  The function lock_table_create() should never be invoked on behalf
  of a transaction running in another thread. Also there, we will
  assert that the current transaction be active. */
  DBUG_ASSERT(trx->state == TRX_STATE_COMMITTED_IN_MEMORY);
  DBUG_ASSERT(!trx->is_referenced());

  bool all_released= true;
restart:
  ulint count= 1000;
  /* We will not attempt hardware lock elision (memory transaction)
  here. Both lock_rec_dequeue_from_page() and lock_table_dequeue()
  would likely lead to a memory transaction due to a system call, to
  wake up a waiting transaction. */
  lock_sys.rd_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  /* Note: Anywhere else, trx->mutex is not held while acquiring
  a lock table latch, but here we are following the opposite order.
  To avoid deadlocks, we only try to acquire the lock table latches
  but not keep waiting for them. */

  for (lock_t *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock; )
  {
    ut_ad(lock->trx == trx);
    lock_t *prev= UT_LIST_GET_PREV(trx_locks, lock);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      ut_ad(lock->mode() != LOCK_X ||
            lock->index->table->id >= DICT_HDR_FIRST_ID ||
            trx->dict_operation);
      auto &lock_hash= lock_sys.hash_get(lock->type_mode);
      auto cell= lock_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
      auto latch= lock_sys_t::hash_table::latch(cell);
      if (!latch->try_acquire())
        all_released= false;
      else
      {
        lock_rec_dequeue_from_page(lock, false);
        latch->release();
      }
    }
    else
    {
      dict_table_t *table= lock->un_member.tab_lock.table;
      ut_ad(!table->is_temporary());
      ut_ad(table->id >= DICT_HDR_FIRST_ID ||
            (lock->mode() != LOCK_IX && lock->mode() != LOCK_X) ||
            trx->dict_operation);
      if (!table->lock_mutex_trylock())
        all_released= false;
      else
      {
        lock_table_dequeue(lock, false);
        table->lock_mutex_unlock();
      }
    }

    lock= all_released ? UT_LIST_GET_LAST(trx->lock.trx_locks) : prev;
    if (!--count)
      break;
  }

  lock_sys.rd_unlock();
  trx->mutex_unlock();
  if (all_released && !count)
    goto restart;
  return all_released;
}
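
/* Note: the batch limit of 1000 locks bounds how long lock_sys.latch
and trx->mutex are held at a time; if the limit was reached with all
latches acquired successfully, lock_release_try() restarts to release
the remaining locks in a fresh batch. */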

/** Release the explicit locks of a committing transaction,
and release possible other transactions waiting because of these locks. */
void lock_release(trx_t *trx)
{
#if defined SAFE_MUTEX && defined UNIV_DEBUG
  std::set<table_id_t> to_evict;
  if (innodb_evict_tables_on_commit_debug &&
      !trx->is_recovered && !trx->dict_operation &&
      !trx->dict_operation_lock_mode)
    for (const auto& p : trx->mod_tables)
      if (!p.first->is_temporary())
        to_evict.emplace(p.first->id);
#endif
  ulint count;

  for (count= 5; count--; )
    if (lock_release_try(trx))
      goto released;

  /* Fall back to acquiring lock_sys.latch in exclusive mode */
restart:
  count= 1000;
  /* There is probably no point to try lock elision here;
  in lock_release_try() it is different. */
  lock_sys.wr_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  while (lock_t *lock= UT_LIST_GET_LAST(trx->lock.trx_locks))
  {
    ut_ad(lock->trx == trx);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      ut_ad(lock->mode() != LOCK_X ||
            lock->index->table->id >= DICT_HDR_FIRST_ID ||
            trx->dict_operation);
      lock_rec_dequeue_from_page(lock, false);
    }
    else
    {
      ut_d(dict_table_t *table= lock->un_member.tab_lock.table);
      ut_ad(!table->is_temporary());
      ut_ad(table->id >= DICT_HDR_FIRST_ID ||
            (lock->mode() != LOCK_IX && lock->mode() != LOCK_X) ||
            trx->dict_operation);
      lock_table_dequeue(lock, false);
    }

    if (!--count)
      break;
  }

  lock_sys.wr_unlock();
  trx->mutex_unlock();
  if (!count)
    goto restart;

released:
  if (UNIV_UNLIKELY(Deadlock::to_be_checked))
  {
    mysql_mutex_lock(&lock_sys.wait_mutex);
    lock_sys.deadlock_check();
    mysql_mutex_unlock(&lock_sys.wait_mutex);
  }

  trx->lock.was_chosen_as_deadlock_victim= false;
  trx->lock.n_rec_locks= 0;

#if defined SAFE_MUTEX && defined UNIV_DEBUG
  if (to_evict.empty())
    return;
  dict_sys.lock(SRW_LOCK_CALL);
  LockMutexGuard g{SRW_LOCK_CALL};
  for (const table_id_t id : to_evict)
    if (dict_table_t *table= dict_sys.find_table(id))
      if (!table->get_ref_count() && !UT_LIST_GET_LEN(table->locks))
        dict_sys.remove(table, true);
  dict_sys.unlock();
#endif
}
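
/* Note: lock_release() makes up to five attempts under a shared
lock_sys.latch (lock_release_try()) and falls back to the exclusive
latch only if some per-cell hash latch or per-table lock_mutex could
not be acquired in any of those attempts. */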

/** Release non-exclusive locks on XA PREPARE,
and wake up possible other transactions waiting because of these locks.
@param trx   transaction in XA PREPARE state
@return whether all locks were released */
static bool lock_release_on_prepare_try(trx_t *trx)
{
  /* At this point, trx->lock.trx_locks can still be modified by other
  threads to convert implicit exclusive locks into explicit ones.

  The function lock_table_create() should never be invoked on behalf
  of a transaction that is running in another thread. Also there, we
  will assert that the current transaction be active. */
  DBUG_ASSERT(trx->state == TRX_STATE_PREPARED);

  bool all_released= true;
  lock_sys.rd_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  /* Note: Normally, trx->mutex is not held while acquiring
  a lock table latch, but here we are following the opposite order.
  To avoid deadlocks, we only try to acquire the lock table latches
  but not keep waiting for them. */

  for (lock_t *prev, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock;
       lock= prev)
  {
    ut_ad(lock->trx == trx);
    prev= UT_LIST_GET_PREV(trx_locks, lock);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      if (lock->mode() == LOCK_X && !lock->is_gap())
        continue;
      auto &lock_hash= lock_sys.hash_get(lock->type_mode);
      auto cell= lock_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
      auto latch= lock_sys_t::hash_table::latch(cell);
      if (latch->try_acquire())
      {
        lock_rec_dequeue_from_page(lock, false);
        latch->release();
      }
      else
        all_released= false;
    }
    else
    {
      dict_table_t *table= lock->un_member.tab_lock.table;
      ut_ad(!table->is_temporary());
      switch (lock->mode()) {
      case LOCK_IS:
      case LOCK_S:
        if (table->lock_mutex_trylock())
        {
          lock_table_dequeue(lock, false);
          table->lock_mutex_unlock();
        }
        else
          all_released= false;
        break;
      case LOCK_IX:
      case LOCK_X:
        ut_ad(table->id >= DICT_HDR_FIRST_ID || trx->dict_operation);
        /* fall through */
      default:
        break;
      }
    }
  }

  lock_sys.rd_unlock();
  trx->mutex_unlock();
  return all_released;
}

/** Release non-exclusive locks on XA PREPARE,
and release possible other transactions waiting because of these locks. */
void lock_release_on_prepare(trx_t *trx)
{
  for (ulint count= 5; count--; )
    if (lock_release_on_prepare_try(trx))
      return;

  LockMutexGuard g{SRW_LOCK_CALL};
  trx->mutex_lock();

  for (lock_t *prev, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock;
       lock= prev)
  {
    ut_ad(lock->trx == trx);
    prev= UT_LIST_GET_PREV(trx_locks, lock);
    if (!lock->is_table())
    {
      ut_ad(!lock->index->table->is_temporary());
      if (lock->mode() != LOCK_X || lock->is_gap())
        lock_rec_dequeue_from_page(lock, false);
    }
    else
    {
      ut_d(dict_table_t *table= lock->un_member.tab_lock.table);
      ut_ad(!table->is_temporary());
      switch (lock->mode()) {
      case LOCK_IS:
      case LOCK_S:
        lock_table_dequeue(lock, false);
        break;
      case LOCK_IX:
      case LOCK_X:
        ut_ad(table->id >= DICT_HDR_FIRST_ID || trx->dict_operation);
        /* fall through */
      default:
        break;
      }
    }
  }

  trx->mutex_unlock();
}
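
/* Note: at XA PREPARE only locks that are not needed to protect the
prepared transaction's modifications are freed: gap locks,
non-exclusive record locks, and LOCK_IS/LOCK_S table locks.
Exclusive record locks on actual records and LOCK_IX/LOCK_X table
locks are retained until the transaction commits or rolls back. */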

/** Release locks on a table whose creation is being rolled back */
ATTRIBUTE_COLD
void lock_release_on_rollback(trx_t *trx, dict_table_t *table)
{
  trx->mod_tables.erase(table);

  /* This is very rarely executed code, in the rare case that a
  CREATE TABLE operation is being rolled back. Theoretically,
  we might try to remove the locks in multiple memory transactions. */
  lock_sys.wr_lock(SRW_LOCK_CALL);
  trx->mutex_lock();

  for (lock_t *next, *lock= UT_LIST_GET_FIRST(table->locks); lock; lock= next)
  {
    next= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock);
    ut_ad(lock->trx == trx);
    UT_LIST_REMOVE(trx->lock.trx_locks, lock);
    ut_list_remove(table->locks, lock, TableLockGetNode());
  }

  for (lock_t *p, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock; lock= p)
  {
    p= UT_LIST_GET_PREV(trx_locks, lock);
    ut_ad(lock->trx == trx);
    if (lock->is_table())
      ut_ad(lock->un_member.tab_lock.table != table);
    else if (lock->index->table == table)
      lock_rec_dequeue_from_page(lock, false);
  }

  lock_sys.wr_unlock();
  trx->mutex_unlock();
}

/*********************************************************************//**
Removes table locks of the transaction on a table to be dropped. */
static
void
lock_trx_table_locks_remove(
/*========================*/
	const lock_t*	lock_to_remove)		/*!< in: lock to remove */
{
	trx_t*		trx = lock_to_remove->trx;

	ut_ad(lock_to_remove->is_table());
	lock_sys.assert_locked(*lock_to_remove->un_member.tab_lock.table);
	ut_ad(trx->mutex_is_owner());

	for (lock_list::iterator it = trx->lock.table_locks.begin(),
             end = trx->lock.table_locks.end(); it != end; ++it) {
		const lock_t*	lock = *it;

		ut_ad(!lock || trx == lock->trx);
		ut_ad(!lock || lock->is_table());
		ut_ad(!lock || lock->un_member.tab_lock.table);

		if (lock == lock_to_remove) {
			*it = NULL;
			return;
		}
	}

	/* Lock must exist in the vector. */
	ut_error;
}

/*===================== VALIDATION AND DEBUGGING ====================*/

/** Print info of a table lock.
@param[in,out]	file	output stream
@param[in]	lock	table lock */
static
void
lock_table_print(FILE* file, const lock_t* lock)
{
	lock_sys.assert_locked();
	ut_a(lock->is_table());

	fputs("TABLE LOCK table ", file);
	ut_print_name(file, lock->trx,
		      lock->un_member.tab_lock.table->name.m_name);
	fprintf(file, " trx id " TRX_ID_FMT, lock->trx->id);

	switch (auto mode = lock->mode()) {
	case LOCK_S:
		fputs(" lock mode S", file);
		break;
	case LOCK_X:
		ut_ad(lock->trx->id != 0);
		fputs(" lock mode X", file);
		break;
	case LOCK_IS:
		fputs(" lock mode IS", file);
		break;
	case LOCK_IX:
		ut_ad(lock->trx->id != 0);
		fputs(" lock mode IX", file);
		break;
	case LOCK_AUTO_INC:
		fputs(" lock mode AUTO-INC", file);
		break;
	default:
		fprintf(file, " unknown lock mode %u", mode);
	}

	if (lock->is_waiting()) {
		fputs(" waiting", file);
	}

	putc('\n', file);
}

/** Pretty-print a record lock.
@param[in,out]	file	output stream
@param[in]	lock	record lock
@param[in,out]	mtr	mini-transaction for accessing the record */
static void lock_rec_print(FILE* file, const lock_t* lock, mtr_t& mtr)
{
	ut_ad(!lock->is_table());

	const page_id_t page_id{lock->un_member.rec_lock.page_id};
	ut_d(lock_sys.hash_get(lock->type_mode).assert_locked(page_id));

	fprintf(file, "RECORD LOCKS space id %u page no %u n bits " ULINTPF
		" index %s of table ",
		page_id.space(), page_id.page_no(),
		lock_rec_get_n_bits(lock),
		lock->index->name());
	ut_print_name(file, lock->trx, lock->index->table->name.m_name);
	fprintf(file, " trx id " TRX_ID_FMT, lock->trx->id);

	switch (lock->mode()) {
	case LOCK_S:
		fputs(" lock mode S", file);
		break;
	case LOCK_X:
		fputs(" lock_mode X", file);
		break;
	default:
		ut_error;
	}

	if (lock->is_gap()) {
		fputs(" locks gap before rec", file);
	}

	if (lock->is_record_not_gap()) {
		fputs(" locks rec but not gap", file);
	}

	if (lock->is_insert_intention()) {
		fputs(" insert intention", file);
	}

	if (lock->is_waiting()) {
		fputs(" waiting", file);
	}

	putc('\n', file);

	mem_heap_t*		heap		= NULL;
	rec_offs		offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*		offsets		= offsets_;
	rec_offs_init(offsets_);

	mtr.start();
	const buf_block_t* block = buf_page_try_get(page_id, &mtr);

	for (ulint i = 0; i < lock_rec_get_n_bits(lock); ++i) {

		if (!lock_rec_get_nth_bit(lock, i)) {
			continue;
		}

		fprintf(file, "Record lock, heap no %lu", (ulong) i);

		if (block) {
			ut_ad(page_is_leaf(block->page.frame));
			const rec_t*	rec;

			rec = page_find_rec_with_heap_no(
				buf_block_get_frame(block), i);
			ut_ad(!page_rec_is_metadata(rec));

			offsets = rec_get_offsets(
				rec, lock->index, offsets,
				lock->index->n_core_fields,
				ULINT_UNDEFINED, &heap);

			putc(' ', file);
			rec_print_new(file, rec, offsets);
		}

		putc('\n', file);
	}

	mtr.commit();

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}

#ifdef UNIV_DEBUG
/* Print the number of lock structs from lock_print_info_summary() only
in non-production builds for performance reasons, see
http://bugs.mysql.com/36942 */
#define PRINT_NUM_OF_LOCK_STRUCTS
#endif /* UNIV_DEBUG */

#ifdef PRINT_NUM_OF_LOCK_STRUCTS
/*********************************************************************//**
Calculates the number of record lock structs in the record lock hash table.
@return number of record locks */
TRANSACTIONAL_TARGET
static ulint lock_get_n_rec_locks()
{
	ulint	n_locks	= 0;
	ulint	i;

	lock_sys.assert_locked();

	for (i = 0; i < lock_sys.rec_hash.n_cells; i++) {
		const lock_t*	lock;

		for (lock = static_cast<const lock_t*>(
			     HASH_GET_FIRST(&lock_sys.rec_hash, i));
		     lock != 0;
		     lock = static_cast<const lock_t*>(
				HASH_GET_NEXT(hash, lock))) {

			n_locks++;
		}
	}

	return(n_locks);
}
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */

/*********************************************************************//**
Prints info of locks for all transactions.
@return FALSE if not able to acquire lock_sys.latch (and display info) */
ibool
lock_print_info_summary(
/*====================*/
	FILE*	file,	/*!< in: file where to print */
	ibool	nowait)	/*!< in: whether to wait for lock_sys.latch */
{
	/* Here, lock elision does not make sense, because
	for the output we are going to invoke system calls,
	which would interrupt a memory transaction. */
	if (!nowait) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	} else if (!lock_sys.wr_lock_try()) {
		fputs("FAIL TO OBTAIN LOCK MUTEX,"
		      " SKIP LOCK INFO PRINTING\n", file);
		return(FALSE);
	}

	if (lock_sys.deadlocks) {
		fputs("------------------------\n"
		      "LATEST DETECTED DEADLOCK\n"
		      "------------------------\n", file);

		if (!srv_read_only_mode) {
			ut_copy_file(file, lock_latest_err_file);
		}
	}

	fputs("------------\n"
	      "TRANSACTIONS\n"
	      "------------\n", file);

	fprintf(file, "Trx id counter " TRX_ID_FMT "\n",
		trx_sys.get_max_trx_id());

	fprintf(file,
		"Purge done for trx's n:o < " TRX_ID_FMT
		" undo n:o < " TRX_ID_FMT " state: %s\n"
		"History list length %u\n",
		purge_sys.tail.trx_no,
		purge_sys.tail.undo_no,
		purge_sys.enabled()
		? (purge_sys.running() ? "running"
		   : purge_sys.paused() ? "stopped" : "running but idle")
		: "disabled",
		trx_sys.history_size());

#ifdef PRINT_NUM_OF_LOCK_STRUCTS
	fprintf(file,
		"Total number of lock structs in row lock hash table %lu\n",
		(ulong) lock_get_n_rec_locks());
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */
	return(TRUE);
}

/** Prints transaction lock wait and MVCC state.
@param[in,out]	file	file where to print
@param[in]	trx	transaction
@param[in]	now	current my_hrtime_coarse() */
void lock_trx_print_wait_and_mvcc_state(FILE *file, const trx_t *trx,
                                        my_hrtime_t now)
{
	fprintf(file, "---");

	trx_print_latched(file, trx, 600);
	trx->read_view.print_limits(file);

	if (const lock_t* wait_lock = trx->lock.wait_lock) {
		const my_hrtime_t suspend_time= trx->lock.suspend_time;
		fprintf(file,
			"------- TRX HAS BEEN WAITING %llu ns"
			" FOR THIS LOCK TO BE GRANTED:\n",
			now.val - suspend_time.val);

		if (!wait_lock->is_table()) {
			mtr_t mtr;
			lock_rec_print(file, wait_lock, mtr);
		} else {
			lock_table_print(file, wait_lock);
		}

		fprintf(file, "------------------\n");
	}
}

/*********************************************************************//**
Prints info of locks for a transaction. */
static
void
lock_trx_print_locks(
/*=================*/
	FILE*		file,		/*!< in/out: File to write */
	const trx_t*	trx)		/*!< in: current transaction */
{
	mtr_t mtr;
	uint32_t i= 0;
	/* Iterate over the transaction's locks. */
	lock_sys.assert_locked();
	for (lock_t *lock = UT_LIST_GET_FIRST(trx->lock.trx_locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
		if (!lock->is_table()) {
			lock_rec_print(file, lock, mtr);
		} else {
			lock_table_print(file, lock);
		}

		if (++i == 10) {

			fprintf(file,
				"10 LOCKS PRINTED FOR THIS TRX:"
				" SUPPRESSING FURTHER PRINTS\n");

			break;
		}
	}
}

/** Functor to display all transactions */
struct lock_print_info
{
  lock_print_info(FILE* file, my_hrtime_t now) :
    file(file), now(now),
    purge_trx(purge_sys.query ? purge_sys.query->trx : nullptr)
  {}

  void operator()(const trx_t &trx) const
  {
    if (UNIV_UNLIKELY(&trx == purge_trx))
      return;
    lock_trx_print_wait_and_mvcc_state(file, &trx, now);

    if (trx.will_lock && srv_print_innodb_lock_monitor)
      lock_trx_print_locks(file, &trx);
  }

  FILE* const file;
  const my_hrtime_t now;
  const trx_t* const purge_trx;
};

/*********************************************************************//**
Prints info of locks for each transaction. This function will release
lock_sys.latch, which the caller must be holding in exclusive mode. */
void
lock_print_info_all_transactions(
/*=============================*/
	FILE*		file)	/*!< in/out: file where to print */
{
	fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n");

	trx_sys.trx_list.for_each(lock_print_info(file, my_hrtime_coarse()));
	lock_sys.wr_unlock();

	ut_d(lock_validate());
}

#ifdef UNIV_DEBUG
/*********************************************************************//**
Find the lock in the trx_t::trx_lock_t::table_locks vector.
@return true if found */
static
bool
lock_trx_table_locks_find(
/*======================*/
	trx_t*		trx,		/*!< in: trx to validate */
	const lock_t*	find_lock)	/*!< in: lock to find */
{
	bool		found = false;

	ut_ad(trx->mutex_is_owner());

	for (lock_list::const_iterator it = trx->lock.table_locks.begin(),
             end = trx->lock.table_locks.end(); it != end; ++it) {

		const lock_t*	lock = *it;

		if (lock == NULL) {

			continue;

		} else if (lock == find_lock) {

			/* Can't be duplicates. */
			ut_a(!found);
			found = true;
		}

		ut_a(trx == lock->trx);
		ut_a(lock->is_table());
		ut_a(lock->un_member.tab_lock.table != NULL);
	}

	return(found);
}

/*********************************************************************//**
Validates the lock queue on a table.
@return TRUE if ok */
static
ibool
lock_table_queue_validate(
/*======================*/
	const dict_table_t*	table)	/*!< in: table */
{
	const lock_t*	lock;

	lock_sys.assert_locked(*table);

	for (lock = UT_LIST_GET_FIRST(table->locks);
	     lock != NULL;
	     lock = UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) {

		/* lock->trx->state cannot change from or to NOT_STARTED
		while we are holding the lock_sys.latch. It may change
		from ACTIVE or PREPARED to PREPARED or COMMITTED. */
		lock->trx->mutex_lock();
		check_trx_state(lock->trx);

		if (lock->trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
		} else if (!lock->is_waiting()) {
			ut_a(!lock_table_other_has_incompatible(
				     lock->trx, 0, table,
				     lock->mode()));
		} else {
			ut_a(lock_table_has_to_wait_in_queue(lock));
		}

		ut_a(lock_trx_table_locks_find(lock->trx, lock));
		lock->trx->mutex_unlock();
	}

	return(TRUE);
}

/*********************************************************************//**
Validates the lock queue on a single record.
@return TRUE if ok */
static
bool
lock_rec_queue_validate(
/*====================*/
	bool			locked_lock_trx_sys,
					/*!< in: if the caller holds
					both the lock_sys.latch and
					trx_sys_t->lock. */
	const page_id_t		id,	/*!< in: page identifier */
	const rec_t*		rec,	/*!< in: record to look at */
	const dict_index_t*	index,	/*!< in: index, or NULL if not known */
	const rec_offs*		offsets)/*!< in: rec_get_offsets(rec, index) */
{
	const lock_t*	lock;
	ulint		heap_no;

	ut_a(rec);
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!index || dict_index_is_clust(index)
	      || !dict_index_is_online_ddl(index));

	heap_no = page_rec_get_heap_no(rec);

	if (!locked_lock_trx_sys) {
		lock_sys.wr_lock(SRW_LOCK_CALL);
	}

	hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id.fold());
	lock_sys.assert_locked(cell);

	if (!page_rec_is_user_rec(rec)) {

		for (lock = lock_sys_t::get_first(cell, id, heap_no);
		     lock != NULL;
		     lock = lock_rec_get_next_const(heap_no, lock)) {

			ut_ad(!index || lock->index == index);

			lock->trx->mutex_lock();
			ut_ad(!lock->trx->read_only
			      || !lock->trx->is_autocommit_non_locking());
			ut_ad(trx_state_eq(lock->trx,
					   TRX_STATE_COMMITTED_IN_MEMORY)
			      || !lock->is_waiting()
			      || lock_rec_has_to_wait_in_queue(cell, lock));
			lock->trx->mutex_unlock();
		}

func_exit:
		if (!locked_lock_trx_sys) {
			lock_sys.wr_unlock();
		}

		return true;
	}

	ut_ad(page_rec_is_leaf(rec));

	const trx_id_t impl_trx_id = index && index->is_primary()
		? lock_clust_rec_some_has_impl(rec, index, offsets)
		: 0;

	if (trx_t *impl_trx = impl_trx_id
	    ? trx_sys.find(current_trx(), impl_trx_id, false)
	    : 0) {
		/* impl_trx could have been committed before we
		acquire its mutex, but not thereafter. */

		impl_trx->mutex_lock();
		ut_ad(impl_trx->state != TRX_STATE_NOT_STARTED);
		if (impl_trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
		} else if (const lock_t* other_lock
			   = lock_rec_other_has_expl_req(
				   LOCK_S, cell, id, true, heap_no,
				   impl_trx)) {
			/* The impl_trx is holding an implicit lock on the
			given record 'rec'. So there cannot be another
			explicit granted lock.  Also, there can be another
			explicit waiting lock only if the impl_trx has an
			explicit granted lock. */

#ifdef WITH_WSREP
			/** Galera record locking rules:
			* If there is no other record lock to the same record, we may grant
			the lock request.
			* If there is other record lock but this requested record lock is
			compatible, we may grant the lock request.
			* If there is other record lock and it is not compatible with
			requested lock, all normal transactions must wait.
			* BF (brute force) additional exceptions :
			** If BF already holds record lock for requested record, we may
			grant new record lock even if there is conflicting record lock(s)
			waiting on a queue.
			** If conflicting transaction holds requested record lock,
			we will cancel this record lock and select conflicting transaction
			for BF abort or kill victim.
			** If conflicting transaction is waiting for requested record lock
			we will cancel this wait and select conflicting transaction
			for BF abort or kill victim.
			** There should not be two BF transactions waiting for same record lock
			*/
			if (other_lock->trx->is_wsrep() && !other_lock->is_waiting()) {
				wsrep_report_bf_lock_wait(impl_trx->mysql_thd, impl_trx->id);
				wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);

				if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
						       cell, id, heap_no,
						       impl_trx)) {
					ib::info() << "WSREP impl BF lock conflict";
				}
			} else
#endif /* WITH_WSREP */
			{
				ut_ad(other_lock->is_waiting());
				ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
						        cell, id, heap_no,
							impl_trx));
			}
		}

		impl_trx->mutex_unlock();
	}

	for (lock = lock_sys_t::get_first(cell, id, heap_no);
	     lock != NULL;
	     lock = lock_rec_get_next_const(heap_no, lock)) {
		ut_ad(!lock->trx->read_only
		      || !lock->trx->is_autocommit_non_locking());
		ut_ad(!page_rec_is_metadata(rec));

		if (index) {
			ut_a(lock->index == index);
		}

		if (lock->is_waiting()) {
			ut_a(lock->is_gap()
			     || lock_rec_has_to_wait_in_queue(cell, lock));
		} else if (!lock->is_gap()) {
			const lock_mode	mode = lock->mode() == LOCK_S
				? LOCK_X : LOCK_S;

			const lock_t*	other_lock
				= lock_rec_other_has_expl_req(
					mode, cell, id, false, heap_no,
					lock->trx);
#ifdef WITH_WSREP
			if (UNIV_UNLIKELY(other_lock && lock->trx->is_wsrep())) {
				/* Only BF transaction may be granted
				lock before other conflicting lock
				request. */
				if (!wsrep_thd_is_BF(lock->trx->mysql_thd, FALSE)
				    && !wsrep_thd_is_BF(other_lock->trx->mysql_thd, FALSE)) {
					/* If no BF, this case is a bug. */
					wsrep_report_bf_lock_wait(lock->trx->mysql_thd, lock->trx->id);
					wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
					ut_error;
				}
			} else
#endif /* WITH_WSREP */
			ut_ad(!other_lock);
		}
	}

	goto func_exit;
}

/** Validate the record lock queues on a page.
@param block    buffer pool block
@param latched  whether the tablespace latch may be held
@return true if ok */
static bool lock_rec_validate_page(const buf_block_t *block, bool latched)
{
	const lock_t*	lock;
	const rec_t*	rec;
	ulint		nth_lock	= 0;
	ulint		nth_bit		= 0;
	ulint		i;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	const page_id_t id{block->page.id()};

	LockGuard g{lock_sys.rec_hash, id};
loop:
	lock = lock_sys_t::get_first(g.cell(), id);

	if (!lock) {
		goto function_exit;
	}

	DBUG_ASSERT(!block->page.is_freed());

	for (i = 0; i < nth_lock; i++) {

		lock = lock_rec_get_next_on_page_const(lock);

		if (!lock) {
			goto function_exit;
		}
	}

	ut_ad(!lock->trx->read_only
	      || !lock->trx->is_autocommit_non_locking());

	/* Only validate the record queues when this thread is not
	holding a tablespace latch. */
	if (!latched)
	for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {

		if (i == PAGE_HEAP_NO_SUPREMUM
		    || lock_rec_get_nth_bit(lock, i)) {

			rec = page_find_rec_with_heap_no(block->page.frame, i);
			ut_a(rec);
			ut_ad(!lock_rec_get_nth_bit(lock, i)
			      || page_rec_is_leaf(rec));
			offsets = rec_get_offsets(rec, lock->index, offsets,
						  lock->index->n_core_fields,
						  ULINT_UNDEFINED, &heap);

			/* If this thread is holding the file space
			latch (fil_space_t::latch), the following
			check WILL break the latching order and may
			cause a deadlock of threads. */

			lock_rec_queue_validate(
				true, id, rec, lock->index, offsets);

			nth_bit = i + 1;

			goto loop;
		}
	}

	nth_bit = 0;
	nth_lock++;

	goto loop;

function_exit:
	if (heap != NULL) {
		mem_heap_free(heap);
	}
	return(TRUE);
}

/*********************************************************************//**
Validate record locks up to a limit.
@return lock at limit or NULL if no more locks in the hash bucket */
static MY_ATTRIBUTE((warn_unused_result))
const lock_t*
lock_rec_validate(
/*==============*/
	ulint		start,		/*!< in: lock_sys.rec_hash
					bucket */
	page_id_t*	limit)		/*!< in/out: upper limit of
					(space, page_no) */
{
	lock_sys.assert_locked();

	for (const lock_t* lock = static_cast<const lock_t*>(
		     HASH_GET_FIRST(&lock_sys.rec_hash, start));
	     lock != NULL;
	     lock = static_cast<const lock_t*>(HASH_GET_NEXT(hash, lock))) {

		ut_ad(!lock->trx->read_only
		      || !lock->trx->is_autocommit_non_locking());
		ut_ad(!lock->is_table());

		page_id_t current(lock->un_member.rec_lock.page_id);

		if (current > *limit) {
			*limit = current + 1;
			return(lock);
		}
	}

	return(0);
}

/*********************************************************************//**
Validate a record lock's block */
static void lock_rec_block_validate(const page_id_t page_id)
{
	/* The lock and the block that it is referring to may be freed at
	this point. We pass BUF_GET_POSSIBLY_FREED to skip a debug check.
	If the lock exists in lock_rec_validate_page() we assert
	block->page.status != FREED. */

	buf_block_t*	block;
	mtr_t		mtr;

	/* Transactional locks should never refer to dropped
	tablespaces, because all DDL operations that would drop or
	discard or rebuild a tablespace do hold an exclusive table
	lock, which would conflict with any locks referring to the
	tablespace from other transactions. */
	if (fil_space_t* space = fil_space_t::get(page_id.space())) {
		dberr_t err = DB_SUCCESS;
		mtr_start(&mtr);

		block = buf_page_get_gen(
			page_id,
			space->zip_size(),
			RW_X_LATCH, NULL,
			BUF_GET_POSSIBLY_FREED,
			&mtr, &err);

		if (err != DB_SUCCESS) {
			ib::error() << "Lock rec block validate failed for tablespace "
				   << space->chain.start->name
				   << page_id << " err " << err;
		}

		ut_ad(!block || block->page.is_freed()
		      || lock_rec_validate_page(block, space->is_latched()));

		mtr_commit(&mtr);

		space->release();
	}
}

static my_bool lock_validate_table_locks(rw_trx_hash_element_t *element, void*)
{
  lock_sys.assert_locked();
  mysql_mutex_lock(&element->mutex);
  if (element->trx)
  {
    check_trx_state(element->trx);
    for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
         lock != NULL;
         lock= UT_LIST_GET_NEXT(trx_locks, lock))
      if (lock->is_table())
        lock_table_queue_validate(lock->un_member.tab_lock.table);
  }
  mysql_mutex_unlock(&element->mutex);
  return 0;
}


/** Validate the transactional locks. */
static void lock_validate()
{
  std::set<page_id_t> pages;
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    /* Validate table locks */
    trx_sys.rw_trx_hash.iterate(lock_validate_table_locks);

    for (ulint i= 0; i < lock_sys.rec_hash.n_cells; i++)
    {
      page_id_t limit{0, 0};
      while (const lock_t *lock= lock_rec_validate(i, &limit))
      {
        if (lock_rec_find_set_bit(lock) == ULINT_UNDEFINED)
          /* The lock bitmap is empty; ignore it. */
          continue;
        pages.insert(lock->un_member.rec_lock.page_id);
      }
    }
  }

  for (page_id_t page_id : pages)
    lock_rec_block_validate(page_id);
}
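
/* Note on the structure of lock_validate(): it deliberately splits the
work into two phases. While holding exclusive lock_sys.latch it only
collects page identifiers, and the expensive per-page validation in
lock_rec_block_validate() then runs without the global latch. A minimal
sketch of the idiom (illustrative only, not part of the build):

  std::set<page_id_t> pages;
  {
    LockMutexGuard g{SRW_LOCK_CALL};  // block all lock activity briefly
    ...                               // gather page ids into "pages"
  }
  for (page_id_t id : pages)
    lock_rec_block_validate(id);      // slow, I/O-bound work, latch-free

This keeps the window during which the whole lock system is frozen as
short as possible. */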
#endif /* UNIV_DEBUG */
/*============ RECORD LOCK CHECKS FOR ROW OPERATIONS ====================*/

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate insert of
a record. If they do, first tests if the query thread should anyway
be suspended for some reason; if not, then puts the transaction and
the query thread to the lock wait state and inserts a waiting request
for a gap x-lock to the lock queue.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
TRANSACTIONAL_TARGET
dberr_t
lock_rec_insert_check_and_lock(
/*===========================*/
	const rec_t*	rec,	/*!< in: record after which to insert */
	buf_block_t*	block,	/*!< in/out: buffer block of rec */
	dict_index_t*	index,	/*!< in: index */
	que_thr_t*	thr,	/*!< in: query thread */
	mtr_t*		mtr,	/*!< in/out: mini-transaction */
	bool*		inherit)/*!< out: set to true if the newly
				inserted record may need to inherit
				LOCK_GAP type locks from the successor
				record */
{
  ut_ad(block->page.frame == page_align(rec));
  ut_ad(mtr->is_named_space(index->table->space));
  ut_ad(page_is_leaf(block->page.frame));
  ut_ad(!index->table->is_temporary());

  dberr_t err= DB_SUCCESS;
  bool inherit_in= *inherit;
  trx_t *trx= thr_get_trx(thr);
  const rec_t *next_rec= page_rec_get_next_const(rec);
  ulint heap_no= page_rec_get_heap_no(next_rec);
  const page_id_t id{block->page.id()};
  ut_ad(!rec_is_metadata(next_rec, *index));

  {
    LockGuard g{lock_sys.rec_hash, id};
    /* Because this code is invoked for a running transaction by
    the thread that is serving the transaction, it is not necessary
    to hold trx->mutex here. */

    /* When inserting a record into an index, the table must be at
    least IX-locked. When we are building an index, we would pass
    BTR_NO_LOCKING_FLAG and skip the locking altogether. */
    ut_ad(lock_table_has(trx, index->table, LOCK_IX));

    *inherit= lock_sys_t::get_first(g.cell(), id, heap_no);

    if (*inherit)
    {
      /* Spatial index does not use GAP lock protection. It uses
      "predicate lock" to protect the "range" */
      if (index->is_spatial())
        return DB_SUCCESS;

      /* If another transaction has an explicit lock request which locks
      the gap, waiting or granted, on the successor, the insert has to wait.

      An exception is the case where the lock by the other transaction
      is a gap type lock which it placed to wait for its turn to insert. We
      do not consider that kind of a lock conflicting with our insert. This
      eliminates an unnecessary deadlock which resulted when 2 transactions
      had to wait for their insert. Both had waiting gap type lock requests
      on the successor, which produced an unnecessary deadlock. */
      const unsigned type_mode= LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION;

      if (lock_t *c_lock= lock_rec_other_has_conflicting(type_mode,
                                                         g.cell(), id,
                                                         heap_no, trx))
      {
        trx->mutex_lock();
        err= lock_rec_enqueue_waiting(c_lock, type_mode, id, block->page.frame,
                                      heap_no, index, thr, nullptr);
        trx->mutex_unlock();
      }
    }
  }

  switch (err) {
  case DB_SUCCESS_LOCKED_REC:
    err = DB_SUCCESS;
    /* fall through */
  case DB_SUCCESS:
    if (!inherit_in || index->is_clust())
      break;
    /* Update the page max trx id field */
    page_update_max_trx_id(block, buf_block_get_page_zip(block), trx->id, mtr);
  default:
    /* We only care about the two return values. */
    break;
  }

#ifdef UNIV_DEBUG
  {
    mem_heap_t *heap= nullptr;
    rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
    const rec_offs *offsets;
    rec_offs_init(offsets_);

    offsets= rec_get_offsets(next_rec, index, offsets_, index->n_core_fields,
                             ULINT_UNDEFINED, &heap);

    ut_ad(lock_rec_queue_validate(false, id, next_rec, index, offsets));

    if (UNIV_LIKELY_NULL(heap))
      mem_heap_free(heap);
  }
#endif /* UNIV_DEBUG */

  return err;
}
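
/* A sketch of how a caller might drive lock_rec_insert_check_and_lock()
(the real callers are the insert paths in btr0cur.cc; the error handling
here is simplified and illustrative only):

  bool inherit= false;
  dberr_t err= lock_rec_insert_check_and_lock(rec, block, index, thr,
                                              mtr, &inherit);
  switch (err) {
  case DB_SUCCESS:
    break;            // no conflict; proceed with the insert
  case DB_LOCK_WAIT:  // a waiting gap x-lock request was enqueued
  default:            // e.g. DB_DEADLOCK
    return err;       // suspend or abort; retry later if applicable
  }
*/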

/*********************************************************************//**
Creates an explicit record lock for a running transaction that currently only
has an implicit lock on the record. The transaction instance must have a
reference count > 0 so that it can't be committed and freed before this
function has completed. */
static
void
lock_rec_convert_impl_to_expl_for_trx(
/*==================================*/
	const page_id_t		id,	/*!< in: page identifier */
	const rec_t*		rec,	/*!< in: user record on page */
	dict_index_t*		index,	/*!< in: index of record */
	trx_t*			trx,	/*!< in/out: active transaction */
	ulint			heap_no)/*!< in: rec heap number to lock */
{
  ut_ad(trx->is_referenced());
  ut_ad(page_rec_is_leaf(rec));
  ut_ad(!rec_is_metadata(rec, *index));

  DEBUG_SYNC_C("before_lock_rec_convert_impl_to_expl_for_trx");
  {
    LockGuard g{lock_sys.rec_hash, id};
    trx->mutex_lock();
    ut_ad(!trx_state_eq(trx, TRX_STATE_NOT_STARTED));

    if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) &&
        !lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, heap_no,
                           trx))
      lock_rec_add_to_queue(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id,
                            page_align(rec), heap_no, index, trx, true);
  }

  trx->mutex_unlock();
  trx->release_reference();

  DEBUG_SYNC_C("after_lock_rec_convert_impl_to_expl_for_trx");
}


#ifdef UNIV_DEBUG
struct lock_rec_other_trx_holds_expl_arg
{
  const ulint heap_no;
  const hash_cell_t &cell;
  const page_id_t id;
  const trx_t &impl_trx;
};


static my_bool lock_rec_other_trx_holds_expl_callback(
  rw_trx_hash_element_t *element,
  lock_rec_other_trx_holds_expl_arg *arg)
{
  mysql_mutex_lock(&element->mutex);
  if (element->trx)
  {
    element->trx->mutex_lock();
    ut_ad(element->trx->state != TRX_STATE_NOT_STARTED);
    lock_t *expl_lock= element->trx->state == TRX_STATE_COMMITTED_IN_MEMORY
      ? nullptr
      : lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP,
                          arg->cell, arg->id, arg->heap_no, element->trx);
    /* Assert that any explicit lock found is held by the transaction
    that holds the implicit lock, i.e. no other transaction holds an
    explicit lock on the record. */
    ut_ad(!expl_lock || expl_lock->trx == &arg->impl_trx);
    element->trx->mutex_unlock();
  }
  mysql_mutex_unlock(&element->mutex);
  return 0;
}


/**
  Checks if some transaction, other than given trx_id, has an explicit
  lock on the given rec.

  FIXME: if the current transaction holds implicit lock from INSERT, a
  subsequent locking read should not convert it to explicit. See also
  MDEV-11215.

  @param      caller_trx  trx of current thread
  @param[in]  trx         trx holding implicit lock on rec
  @param[in]  rec         user record
  @param[in]  id          page identifier
*/
static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx,
                                          const rec_t *rec,
                                          const page_id_t id)
{
  if (trx)
  {
    ut_ad(!page_rec_is_metadata(rec));
    LockGuard g{lock_sys.rec_hash, id};
    ut_ad(trx->is_referenced());
    const trx_state_t state{trx->state};
    ut_ad(state != TRX_STATE_NOT_STARTED);
    if (state == TRX_STATE_COMMITTED_IN_MEMORY)
      /* The transaction was committed before we acquired LockGuard. */
      return;
    lock_rec_other_trx_holds_expl_arg arg=
    { page_rec_get_heap_no(rec), g.cell(), id, *trx };
    trx_sys.rw_trx_hash.iterate(caller_trx,
                                lock_rec_other_trx_holds_expl_callback, &arg);
  }
}
#endif /* UNIV_DEBUG */


/** If an implicit x-lock exists on a record, convert it to an explicit one.

Often, this is called by a transaction that is about to enter a lock wait
due to the lock conflict. Two explicit locks would be created: first the
exclusive lock on behalf of the lock-holder transaction in this function,
and then a wait request on behalf of caller_trx, in the calling function.

This may also be called by the same transaction that is already holding
an implicit exclusive lock on the record. In this case, no explicit lock
should be created.

@param[in,out]	caller_trx	current transaction
@param[in]	id		index tree leaf page identifier
@param[in]	rec		record on the leaf page
@param[in]	index		the index of the record
@param[in]	offsets		rec_get_offsets(rec,index)
@return	whether caller_trx already holds an exclusive lock on rec */
static
bool
lock_rec_convert_impl_to_expl(
	trx_t*			caller_trx,
	page_id_t		id,
	const rec_t*		rec,
	dict_index_t*		index,
	const rec_offs*		offsets)
{
	trx_t*		trx;

	lock_sys.assert_unlocked();
	ut_ad(page_rec_is_user_rec(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if (dict_index_is_clust(index)) {
		trx_id_t	trx_id;

		trx_id = lock_clust_rec_some_has_impl(rec, index, offsets);

		if (trx_id == 0) {
			return false;
		}
		if (UNIV_UNLIKELY(trx_id == caller_trx->id)) {
			return true;
		}

		trx = trx_sys.find(caller_trx, trx_id);
	} else {
		ut_ad(!dict_index_is_online_ddl(index));

		trx = lock_sec_rec_some_has_impl(caller_trx, rec, index,
						 offsets);
		if (trx == caller_trx) {
			trx->release_reference();
			return true;
		}

		ut_d(lock_rec_other_trx_holds_expl(caller_trx, trx, rec, id));
	}

	if (trx) {
		ulint	heap_no = page_rec_get_heap_no(rec);

		ut_ad(trx->is_referenced());

		/* If the transaction is still active and has no
		explicit x-lock set on the record, set one for it.
		trx cannot be committed until the ref count is zero. */

		lock_rec_convert_impl_to_expl_for_trx(
			id, rec, index, trx, heap_no);
	}

	return false;
}
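
/* Decision flow of the conversion above, summarized as a sketch:

  clustered index:  trx_id= DB_TRX_ID of rec  (lock_clust_rec_some_has_impl)
                    trx_id == 0         -> no implicit lock; return false
                    trx_id == caller's  -> caller holds it; return true
                    otherwise           -> trx= trx_sys.find(...)
  secondary index:  trx= lock_sec_rec_some_has_impl(...)  (may check undo)
                    trx == caller_trx   -> return true

The reference acquired by trx_sys.find() or lock_sec_rec_some_has_impl()
pins trx so that it cannot commit before the explicit lock has been
created in lock_rec_convert_impl_to_expl_for_trx(). */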

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate modify (update,
delete mark, or delete unmark) of a clustered index record. If they do,
first tests if the query thread should anyway be suspended for some
reason; if not, then puts the transaction and the query thread to the
lock wait state and inserts a waiting request for a record x-lock to the
lock queue.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_modify_check_and_lock(
/*=================================*/
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: record which should be
					modified */
	dict_index_t*		index,	/*!< in: clustered index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(dict_index_is_clust(index));
	ut_ad(block->page.frame == page_align(rec));

	ut_ad(!rec_is_metadata(rec, *index));
	ut_ad(!index->table->is_temporary());

	heap_no = rec_offs_comp(offsets)
		? rec_get_heap_no_new(rec)
		: rec_get_heap_no_old(rec);

	/* If a transaction has no explicit x-lock set on the record, set one
	for it */

	if (lock_rec_convert_impl_to_expl(thr_get_trx(thr), block->page.id(),
					  rec, index, offsets)) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

	err = lock_rec_lock(true, LOCK_X | LOCK_REC_NOT_GAP,
			    block, heap_no, index, thr);

	ut_ad(lock_rec_queue_validate(false, block->page.id(),
				      rec, index, offsets));

	if (err == DB_SUCCESS_LOCKED_REC) {
		err = DB_SUCCESS;
	}

	return(err);
}

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate modify (delete
mark or delete unmark) of a secondary index record.
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_modify_check_and_lock(
/*===============================*/
	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG
				bit is set, does nothing */
	buf_block_t*	block,	/*!< in/out: buffer block of rec */
	const rec_t*	rec,	/*!< in: record which should be
				modified; NOTE: as this is a secondary
				index, we always have to modify the
				clustered index record first: see the
				comment below */
	dict_index_t*	index,	/*!< in: secondary index */
	que_thr_t*	thr,	/*!< in: query thread
				(can be NULL if BTR_NO_LOCKING_FLAG) */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index) || (flags & BTR_CREATE_FLAG));
	ut_ad(block->page.frame == page_align(rec));
	ut_ad(mtr->is_named_space(index->table->space));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if (flags & BTR_NO_LOCKING_FLAG) {

		return(DB_SUCCESS);
	}
	ut_ad(!index->table->is_temporary());

	heap_no = page_rec_get_heap_no(rec);

#ifdef WITH_WSREP
	trx_t *trx= thr_get_trx(thr);
	/* If a transaction scanning a unique secondary key is a wsrep
	high-priority (brute force) thread, the scan may involve
	GAP-locking in the index. Because such locking also happens
	when applying replication events in high-priority applier
	threads, lock conflicts between two wsrep high-priority
	threads are possible. To avoid this GAP-locking, we mark here
	that this transaction is performing a unique key scan. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep = 3;
#endif /* WITH_WSREP */

	/* Another transaction cannot have an implicit lock on the record,
	because when we come here, we already have modified the clustered
	index record, and this would not have been possible if another active
	transaction had modified this secondary index record. */

	err = lock_rec_lock(true, LOCK_X | LOCK_REC_NOT_GAP,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	if (trx->wsrep == 3) trx->wsrep = 1;
#endif /* WITH_WSREP */

#ifdef UNIV_DEBUG
	{
		mem_heap_t*	heap		= NULL;
		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
		const rec_offs*	offsets;
		rec_offs_init(offsets_);

		offsets = rec_get_offsets(rec, index, offsets_,
					  index->n_core_fields,
					  ULINT_UNDEFINED, &heap);

		ut_ad(lock_rec_queue_validate(
			      false, block->page.id(), rec, index, offsets));

		if (heap != NULL) {
			mem_heap_free(heap);
		}
	}
#endif /* UNIV_DEBUG */

	if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
		/* Update the page max trx id field */
		/* It might not be necessary to do this if
		err == DB_SUCCESS (no new lock created),
		but it should not cost too much performance. */
		page_update_max_trx_id(block,
				       buf_block_get_page_zip(block),
				       thr_get_trx(thr)->id, mtr);
		err = DB_SUCCESS;
	}

	return(err);
}
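
/* The PAGE_MAX_TRX_ID update above is what makes the fast path in
lock_sec_rec_read_check_and_lock() sound: a reader only needs to probe
for implicit locks on a secondary index page when some transaction that
modified the page may still be active. Sketch of the reader-side test
(simplified from the code below):

  if (page_get_max_trx_id(page) >= trx_sys.get_min_trx_id())
    ... // an implicit x-lock may exist; try to convert it to explicit
*/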

/*********************************************************************//**
Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_sec_rec_read_check_and_lock(
/*=============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: secondary index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	dberr_t	err;
	ulint	heap_no;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_online_ddl(index));
	ut_ad(block->page.frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	const page_id_t id{block->page.id()};

	ut_ad(!rec_is_metadata(rec, *index));
	heap_no = page_rec_get_heap_no(rec);

	/* Some transaction may have an implicit x-lock on the record only
	if the max trx id for the page >= min trx id for the trx list or a
	database recovery is running. */

	trx_t *trx = thr_get_trx(thr);
	if (!lock_table_has(trx, index->table, LOCK_X)
	    && !page_rec_is_supremum(rec)
	    && page_get_max_trx_id(block->page.frame)
	    >= trx_sys.get_min_trx_id()
	    && lock_rec_convert_impl_to_expl(trx, id, rec,
					     index, offsets)
	    && gap_mode == LOCK_REC_NOT_GAP) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

#ifdef WITH_WSREP
	/* If a transaction scanning a unique secondary key is a wsrep
	high-priority (brute force) thread, the scan may involve
	GAP-locking in the index. Because such locking also happens
	when applying replication events in high-priority applier
	threads, lock conflicts between two wsrep high-priority
	threads are possible. To avoid this GAP-locking, we mark here
	that this transaction is performing a unique key scan. */
	if (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false))
		trx->wsrep = 3;
#endif /* WITH_WSREP */

	err = lock_rec_lock(false, gap_mode | mode,
			    block, heap_no, index, thr);

#ifdef WITH_WSREP
	if (trx->wsrep == 3) trx->wsrep = 1;
#endif /* WITH_WSREP */

	ut_ad(lock_rec_queue_validate(false, id, rec, index, offsets));

	return(err);
}

/*********************************************************************//**
Checks if locks of other transactions prevent an immediate read, or passing
over by a read cursor, of a clustered index record. If they do, first tests
if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_read_check_and_lock(
/*===============================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: clustered index */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(rec, index) */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	ut_ad(dict_index_is_clust(index));
	ut_ad(block->page.frame == page_align(rec));
	ut_ad(page_rec_is_user_rec(rec) || page_rec_is_supremum(rec));
	ut_ad(gap_mode == LOCK_ORDINARY || gap_mode == LOCK_GAP
	      || gap_mode == LOCK_REC_NOT_GAP);
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(page_rec_is_leaf(rec));
	ut_ad(!rec_is_metadata(rec, *index));

	if ((flags & BTR_NO_LOCKING_FLAG)
	    || srv_read_only_mode
	    || index->table->is_temporary()) {

		return(DB_SUCCESS);
	}

	const page_id_t id{block->page.id()};

	ulint heap_no = page_rec_get_heap_no(rec);

	trx_t *trx = thr_get_trx(thr);
	if (!lock_table_has(trx, index->table, LOCK_X)
	    && heap_no != PAGE_HEAP_NO_SUPREMUM
	    && lock_rec_convert_impl_to_expl(trx, id, rec, index, offsets)
	    && gap_mode == LOCK_REC_NOT_GAP) {
		/* We already hold an implicit exclusive lock. */
		return DB_SUCCESS;
	}

	dberr_t err = lock_rec_lock(false, gap_mode | mode,
				    block, heap_no, index, thr);

	ut_ad(lock_rec_queue_validate(false, id, rec, index, offsets));

	DEBUG_SYNC_C("after_lock_clust_rec_read_check_and_lock");

	return(err);
}
/*********************************************************************//**
Checks if locks of other transactions prevent an immediate read, or passing
over by a read cursor, of a clustered index record. If they do, first tests
if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record. This is an alternative version of
lock_clust_rec_read_check_and_lock() that does not require the parameter
"offsets".
@return DB_SUCCESS, DB_LOCK_WAIT, or DB_DEADLOCK */
dberr_t
lock_clust_rec_read_check_and_lock_alt(
/*===================================*/
	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
					bit is set, does nothing */
	const buf_block_t*	block,	/*!< in: buffer block of rec */
	const rec_t*		rec,	/*!< in: user record or page
					supremum record which should
					be read or passed over by a
					read cursor */
	dict_index_t*		index,	/*!< in: clustered index */
	lock_mode		mode,	/*!< in: mode of the lock which
					the read cursor should set on
					records: LOCK_S or LOCK_X; the
					latter is possible in
					SELECT FOR UPDATE */
	unsigned		gap_mode,/*!< in: LOCK_ORDINARY, LOCK_GAP, or
					LOCK_REC_NOT_GAP */
	que_thr_t*		thr)	/*!< in: query thread */
{
	mem_heap_t*	tmp_heap	= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	dberr_t		err;
	rec_offs_init(offsets_);

	ut_ad(page_rec_is_leaf(rec));
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  ULINT_UNDEFINED, &tmp_heap);
	err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
						 offsets, mode, gap_mode, thr);
	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	if (err == DB_SUCCESS_LOCKED_REC) {
		err = DB_SUCCESS;
	}

	return(err);
}

/*******************************************************************//**
Check if a transaction holds any autoinc locks.
@return TRUE if the transaction holds any AUTOINC locks. */
static
ibool
lock_trx_holds_autoinc_locks(
/*=========================*/
	const trx_t*	trx)		/*!< in: transaction */
{
	ut_a(trx->autoinc_locks != NULL);

	return(!ib_vector_is_empty(trx->autoinc_locks));
}

/** Release all AUTO_INCREMENT locks of the transaction. */
static void lock_release_autoinc_locks(trx_t *trx)
{
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    mysql_mutex_lock(&lock_sys.wait_mutex);
    trx->mutex_lock();
    auto autoinc_locks= trx->autoinc_locks;
    ut_a(autoinc_locks);

    /* We release the locks in the reverse order. This is to avoid
    searching the vector for the element to delete at the lower level.
    See lock_table_remove_low() for details. */
    while (ulint size= ib_vector_size(autoinc_locks))
    {
      lock_t *lock= *static_cast<lock_t**>
        (ib_vector_get(autoinc_locks, size - 1));
      ut_ad(lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE));
      lock_table_dequeue(lock, true);
      lock_trx_table_locks_remove(lock);
    }
  }
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  trx->mutex_unlock();
}
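
/* Popping from the back of trx->autoinc_locks keeps each removal O(1);
lock_table_remove_low() would otherwise have to search the vector for
the element being deleted. The pattern, as a sketch:

  while (ulint size= ib_vector_size(v))
    release(*static_cast<lock_t**>(ib_vector_get(v, size - 1)));

i.e. the most recently acquired AUTO_INC lock is always released first. */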

/** Cancel a waiting lock request and release possibly waiting transactions */
static void lock_cancel_waiting_and_release(lock_t *lock)
{
  lock_sys.assert_locked(*lock);
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  trx_t *trx= lock->trx;
  trx->mutex_lock();
  ut_ad(trx->state == TRX_STATE_ACTIVE);

  if (!lock->is_table())
    lock_rec_dequeue_from_page(lock, true);
  else
  {
    if (lock->type_mode == (LOCK_AUTO_INC | LOCK_TABLE))
    {
      ut_ad(trx->autoinc_locks);
      ib_vector_remove(trx->autoinc_locks, lock);
    }
    lock_table_dequeue(lock, true);
    /* Remove the lock from the table lock vector too. */
    lock_trx_table_locks_remove(lock);
  }

  /* Reset the wait flag and the back pointer to lock in trx. */
  lock_reset_lock_and_trx_wait(lock);

  lock_wait_end(trx);
  trx->mutex_unlock();
}
#ifdef WITH_WSREP
TRANSACTIONAL_TARGET
void lock_sys_t::cancel_lock_wait_for_trx(trx_t *trx)
{
  lock_sys.wr_lock(SRW_LOCK_CALL);
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (lock_t *lock= trx->lock.wait_lock)
  {
    /* check if victim is still waiting */
    if (lock->is_waiting())
      lock_cancel_waiting_and_release(lock);
  }
  lock_sys.wr_unlock();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
}
#endif /* WITH_WSREP */

/** Cancel a waiting lock request.
@tparam check_victim  whether to check for DB_DEADLOCK
@param lock           waiting lock request
@param trx            active transaction
@retval DB_SUCCESS    if no lock existed
@retval DB_DEADLOCK   if trx->lock.was_chosen_as_deadlock_victim was set
@retval DB_LOCK_WAIT  if the lock was canceled */
template<bool check_victim>
dberr_t lock_sys_t::cancel(trx_t *trx, lock_t *lock)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);
  ut_ad(trx->lock.wait_lock == lock);
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  dberr_t err= DB_SUCCESS;
  /* This would be too large for a memory transaction, except in the
  DB_DEADLOCK case, which was already tested in lock_trx_handle_wait(). */
  if (lock->is_table())
  {
    if (!lock_sys.rd_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.rd_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
      lock= trx->lock.wait_lock;
      if (!lock);
      else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        err= DB_DEADLOCK;
      else
        goto resolve_table_lock;
    }
    else
    {
resolve_table_lock:
      dict_table_t *table= lock->un_member.tab_lock.table;
      if (!table->lock_mutex_trylock())
      {
        /* The correct latching order is:
        lock_sys.latch, table->lock_mutex_lock(), lock_sys.wait_mutex.
        Thus, we must release lock_sys.wait_mutex for a blocking wait. */
        mysql_mutex_unlock(&lock_sys.wait_mutex);
        table->lock_mutex_lock();
        mysql_mutex_lock(&lock_sys.wait_mutex);
        lock= trx->lock.wait_lock;
        if (!lock)
          goto retreat;
        else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        {
          err= DB_DEADLOCK;
          goto retreat;
        }
      }
      if (lock->is_waiting())
        lock_cancel_waiting_and_release(lock);
      /* Even if lock->is_waiting() did not hold above, we must return
      DB_LOCK_WAIT, or otherwise optimistic parallel replication could
      occasionally hang. Potentially affected tests:
      rpl.rpl_parallel_optimistic
      rpl.rpl_parallel_optimistic_nobinlog
      rpl.rpl_parallel_optimistic_xa_lsu_off */
      err= DB_LOCK_WAIT;
retreat:
      table->lock_mutex_unlock();
    }
    lock_sys.rd_unlock();
  }
  else
  {
    /* To prevent the record lock from being moved between pages
    during a page split or merge, we must hold exclusive lock_sys.latch. */
    if (!lock_sys.wr_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.wr_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
      lock= trx->lock.wait_lock;
      if (!lock);
      else if (check_victim && trx->lock.was_chosen_as_deadlock_victim)
        err= DB_DEADLOCK;
      else
        goto resolve_record_lock;
    }
    else
    {
resolve_record_lock:
      if (lock->is_waiting())
        lock_cancel_waiting_and_release(lock);
      /* Even if lock->is_waiting() did not hold above, we must return
      DB_LOCK_WAIT, or otherwise optimistic parallel replication could
      occasionally hang. Potentially affected tests:
      rpl.rpl_parallel_optimistic
      rpl.rpl_parallel_optimistic_nobinlog
      rpl.rpl_parallel_optimistic_xa_lsu_off */
      err= DB_LOCK_WAIT;
    }
    lock_sys.wr_unlock();
  }

  return err;
}
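
/* The try-lock/retry idiom used in lock_sys_t::cancel() recurs throughout
this file: acquire a latch opportunistically, and on failure release
lock_sys.wait_mutex (to respect the latching order), block on the latch,
re-acquire wait_mutex, and then revalidate trx->lock.wait_lock, which may
have been granted or canceled in the meantime. As a sketch:

  if (!latch_try_lock())
  {
    mysql_mutex_unlock(&lock_sys.wait_mutex);
    latch_lock();                       // blocking wait, mutex released
    mysql_mutex_lock(&lock_sys.wait_mutex);
    lock= trx->lock.wait_lock;          // must be re-read here
    if (!lock) { /* already granted or canceled by someone else */ }
  }
*/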

/** Cancel a waiting lock request (if any) when killing a transaction */
void lock_sys_t::cancel(trx_t *trx)
{
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (lock_t *lock= trx->lock.wait_lock)
  {
    /* Dictionary transactions must be immune to KILL, because they
    may be executed as part of a multi-transaction DDL operation, such
    as rollback_inplace_alter_table() or ha_innobase::delete_table(). */
    if (!trx->dict_operation)
    {
      trx->error_state= DB_INTERRUPTED;
      cancel<false>(trx, lock);
    }
  }
  lock_sys.deadlock_check();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
}

/*********************************************************************//**
Unlocks AUTO_INC type locks that were possibly reserved by a trx. This
function should be called at the end of an SQL statement, by the
connection thread that owns the transaction (trx->mysql_thd). */
void
lock_unlock_table_autoinc(
/*======================*/
	trx_t*	trx)	/*!< in/out: transaction */
{
	lock_sys.assert_unlocked();
	ut_ad(!trx->mutex_is_owner());
	ut_ad(!trx->lock.wait_lock);

	/* This can be invoked on NOT_STARTED, ACTIVE, PREPARED,
	but not COMMITTED transactions. */

	ut_ad(trx_state_eq(trx, TRX_STATE_NOT_STARTED)
	      || !trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY));

	/* This function is invoked for a running transaction by the
	thread that is serving the transaction. Therefore it is not
	necessary to hold trx->mutex here. */

	if (lock_trx_holds_autoinc_locks(trx)) {
		lock_release_autoinc_locks(trx);
	}
}
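
/* Sketch of where lock_unlock_table_autoinc() fits in statement
execution (call sites live in row0mysql.cc and the handler layer; this
outline is illustrative only):

  row_insert_for_mysql(...);         // may acquire a LOCK_AUTO_INC lock
  ...
  lock_unlock_table_autoinc(trx);    // end of statement: AUTO_INC locks
                                     // are statement-scoped and do not
                                     // follow two-phase locking
*/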

/** Handle a pending lock wait (DB_LOCK_WAIT) in a semi-consistent read
while holding a clustered index leaf page latch.
@param trx           transaction that is or was waiting for a lock
@retval DB_SUCCESS   if the lock was granted
@retval DB_DEADLOCK  if the transaction must be aborted due to a deadlock
@retval DB_LOCK_WAIT if a lock wait would be necessary; the pending
                     lock request was released */
dberr_t lock_trx_handle_wait(trx_t *trx)
{
  if (trx->lock.was_chosen_as_deadlock_victim)
    return DB_DEADLOCK;
  if (!trx->lock.wait_lock)
    return DB_SUCCESS;
  dberr_t err= DB_SUCCESS;
  mysql_mutex_lock(&lock_sys.wait_mutex);
  if (trx->lock.was_chosen_as_deadlock_victim)
    err= DB_DEADLOCK;
  else if (lock_t *wait_lock= trx->lock.wait_lock)
    err= lock_sys_t::cancel<true>(trx, wait_lock);
  lock_sys.deadlock_check();
  mysql_mutex_unlock(&lock_sys.wait_mutex);
  return err;
}
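
/* Sketch of how a semi-consistent read uses lock_trx_handle_wait()
(the actual caller lives in row0sel.cc; the labels are illustrative):

  switch (lock_trx_handle_wait(trx)) {
  case DB_SUCCESS:   break;           // the lock was granted after all
  case DB_LOCK_WAIT: goto skip_row;   // pending request released; the
                                      // row can be re-read later
  default:           goto rollback;   // DB_DEADLOCK
  }
*/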

#ifdef UNIV_DEBUG
/**
  Do an exhaustive check for any locks (table or rec) against the table.

  @param[in]  table  check if there are any locks held on records in this table
                     or on the table itself
*/

static my_bool lock_table_locks_lookup(rw_trx_hash_element_t *element,
                                       const dict_table_t *table)
{
  lock_sys.assert_locked();
  mysql_mutex_lock(&element->mutex);
  if (element->trx)
  {
    element->trx->mutex_lock();
    check_trx_state(element->trx);
    if (element->trx->state != TRX_STATE_COMMITTED_IN_MEMORY)
    {
      for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks);
           lock != NULL;
           lock= UT_LIST_GET_NEXT(trx_locks, lock))
      {
        ut_ad(lock->trx == element->trx);
        if (!lock->is_table())
        {
          ut_ad(lock->index->online_status != ONLINE_INDEX_CREATION ||
                lock->index->is_primary());
          ut_ad(lock->index->table != table);
        }
        else
          ut_ad(lock->un_member.tab_lock.table != table);
      }
    }
    element->trx->mutex_unlock();
  }
  mysql_mutex_unlock(&element->mutex);
  return 0;
}
#endif /* UNIV_DEBUG */

/** Check if there are any locks on a table.
@return true if table has either table or record locks. */
TRANSACTIONAL_TARGET
bool lock_table_has_locks(dict_table_t *table)
{
  if (table->n_rec_locks)
    return true;
  ulint len;
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
  if (xbegin())
  {
    if (table->lock_mutex_is_locked())
      xabort();
    len= UT_LIST_GET_LEN(table->locks);
    xend();
  }
  else
#endif
  {
    table->lock_mutex_lock();
    len= UT_LIST_GET_LEN(table->locks);
    table->lock_mutex_unlock();
  }
  if (len)
    return true;
#ifdef UNIV_DEBUG
  {
    LockMutexGuard g{SRW_LOCK_CALL};
    trx_sys.rw_trx_hash.iterate(lock_table_locks_lookup,
                                const_cast<const dict_table_t*>(table));
  }
#endif /* UNIV_DEBUG */
  return false;
}
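
/* The xbegin()/xend() path above elides table->lock_mutex with a
hardware memory transaction where available: the transaction reads
UT_LIST_GET_LEN(table->locks) and aborts itself if a real holder of the
mutex is detected. The general shape of the pattern, as a sketch:

  if (xbegin())                  // start a hardware transaction
  {
    if (mutex_is_locked())       // a real owner exists: cannot elide
      xabort();
    value= read_shared_state();
    xend();                      // commit without any mutex traffic
  }
  else                           // fallback: take the mutex for real
  {
    lock(); value= read_shared_state(); unlock();
  }
*/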

/*******************************************************************//**
Initialise the table lock list. */
void
lock_table_lock_list_init(
/*======================*/
	table_lock_list_t*	lock_list)	/*!< List to initialise */
{
	UT_LIST_INIT(*lock_list, &lock_table_t::locks);
}

#ifdef UNIV_DEBUG
/*******************************************************************//**
Check if the transaction holds any locks on the sys tables
or its records.
@return the strongest lock found on any sys table or 0 for none */
const lock_t*
lock_trx_has_sys_table_locks(
/*=========================*/
	const trx_t*	trx)	/*!< in: transaction to check */
{
	const lock_t*	strongest_lock = 0;
	lock_mode	strongest = LOCK_NONE;

	LockMutexGuard g{SRW_LOCK_CALL};

	const lock_list::const_iterator end = trx->lock.table_locks.end();
	lock_list::const_iterator it = trx->lock.table_locks.begin();

	/* Find a valid mode. Note: the lock list may be empty. */

	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock != NULL
		    && dict_is_sys_table(lock->un_member.tab_lock.table->id)) {

			strongest = lock->mode();
			ut_ad(strongest != LOCK_NONE);
			strongest_lock = lock;
			break;
		}
	}

	if (strongest == LOCK_NONE) {
		return(NULL);
	}

	for (/* No op */; it != end; ++it) {
		const lock_t*	lock = *it;

		if (lock == NULL) {
			continue;
		}

		ut_ad(trx == lock->trx);
		ut_ad(lock->is_table());
		ut_ad(lock->un_member.tab_lock.table);

		lock_mode mode = lock->mode();

		if (dict_is_sys_table(lock->un_member.tab_lock.table->id)
		    && lock_mode_stronger_or_eq(mode, strongest)) {

			strongest = mode;
			strongest_lock = lock;
		}
	}

	return(strongest_lock);
}

/** Check if the transaction holds an explicit exclusive lock on a record.
@param[in]	trx	transaction
@param[in]	table	table
@param[in]	id	leaf page identifier
@param[in]	heap_no	heap number identifying the record
@return whether an explicit X-lock is held */
bool lock_trx_has_expl_x_lock(const trx_t &trx, const dict_table_t &table,
                              page_id_t id, ulint heap_no)
{
  ut_ad(heap_no > PAGE_HEAP_NO_SUPREMUM);
  ut_ad(lock_table_has(&trx, &table, LOCK_IX));
  if (!lock_table_has(&trx, &table, LOCK_X))
  {
    LockGuard g{lock_sys.rec_hash, id};
    ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
                            g.cell(), id, heap_no, &trx));
  }
  return true;
}
#endif /* UNIV_DEBUG */

namespace Deadlock
{
  /** rewind(3) the file used for storing the latest detected deadlock and
  print a heading message to stderr if printing of all deadlocks to stderr
  is enabled. */
  static void start_print()
  {
    lock_sys.assert_locked();

    rewind(lock_latest_err_file);
    ut_print_timestamp(lock_latest_err_file);

    if (srv_print_all_deadlocks)
      ib::info() << "Transactions deadlock detected,"
                    " dumping detailed information.";
  }

  /** Print a message to the deadlock file and possibly to stderr.
  @param msg message to print */
  static void print(const char *msg)
  {
    fputs(msg, lock_latest_err_file);
    if (srv_print_all_deadlocks)
      ib::info() << msg;
  }

  /** Print transaction data to the deadlock file and possibly to stderr.
  @param trx transaction */
  static void print(const trx_t &trx)
  {
    lock_sys.assert_locked();

    ulint n_rec_locks= trx.lock.n_rec_locks;
    ulint n_trx_locks= UT_LIST_GET_LEN(trx.lock.trx_locks);
    ulint heap_size= mem_heap_get_size(trx.lock.lock_heap);

    trx_print_low(lock_latest_err_file, &trx, 3000,
                  n_rec_locks, n_trx_locks, heap_size);

    if (srv_print_all_deadlocks)
      trx_print_low(stderr, &trx, 3000, n_rec_locks, n_trx_locks, heap_size);
  }

  /** Print lock data to the deadlock file and possibly to stderr.
  @param lock record or table type lock */
  static void print(const lock_t &lock)
  {
    lock_sys.assert_locked();

    if (!lock.is_table())
    {
      mtr_t mtr;
      lock_rec_print(lock_latest_err_file, &lock, mtr);

      if (srv_print_all_deadlocks)
        lock_rec_print(stderr, &lock, mtr);
    }
    else
    {
      lock_table_print(lock_latest_err_file, &lock);

      if (srv_print_all_deadlocks)
        lock_table_print(stderr, &lock);
    }
  }

  ATTRIBUTE_COLD
  /** Report a deadlock (cycle in the waits-for graph).
  @param trx         transaction waiting for a lock in this thread
  @param current_trx whether trx belongs to the current thread
  @return the transaction to be rolled back (unless one was committed already)
  @return nullptr if no deadlock */
  static trx_t *report(trx_t *const trx, bool current_trx)
  {
    mysql_mutex_assert_owner(&lock_sys.wait_mutex);
    ut_ad(xtest() || lock_sys.is_writer() == !current_trx);

    /* Normally, trx should be a direct part of the deadlock
    cycle. However, if innodb_deadlock_detect had been OFF in the
    past, or if current_trx=false, trx may be waiting for a lock that
    is held by a participant of a pre-existing deadlock, without being
    part of the deadlock itself. That is, the path to the deadlock may be
    P-shaped instead of O-shaped, with trx being at the foot of the P.

    We will process the entire path leading to a cycle, and we will
    choose the victim (to be aborted) among the cycle. */

    static const char rollback_msg[]= "*** WE ROLL BACK TRANSACTION (%u)\n";
    char buf[9 + sizeof rollback_msg];

    /* If current_trx=true, trx is owned by this thread, and we can
    safely invoke these without holding trx->mutex or lock_sys.latch.
    If current_trx=false, a concurrent commit is protected by both
    lock_sys.latch and lock_sys.wait_mutex. */
    const undo_no_t trx_weight= TRX_WEIGHT(trx) |
      (trx->mysql_thd &&
#ifdef WITH_WSREP
       (thd_has_edited_nontrans_tables(trx->mysql_thd) ||
        (trx->is_wsrep() && wsrep_thd_is_BF(trx->mysql_thd, false)))
#else
       thd_has_edited_nontrans_tables(trx->mysql_thd)
#endif /* WITH_WSREP */
       ? 1ULL << 63 : 0);

    trx_t *victim= nullptr;
    undo_no_t victim_weight= ~0ULL;
    unsigned victim_pos= 0, trx_pos= 0;

    /* Here, lock elision does not make sense, because
    for the output we are going to invoke system calls,
    which would interrupt a memory transaction. */
    if (current_trx && !lock_sys.wr_lock_try())
    {
      mysql_mutex_unlock(&lock_sys.wait_mutex);
      lock_sys.wr_lock(SRW_LOCK_CALL);
      mysql_mutex_lock(&lock_sys.wait_mutex);
    }

    {
      unsigned l= 0;
      /* Now that we are holding lock_sys.wait_mutex again, check
      whether a cycle still exists. */
      trx_t *cycle= find_cycle(trx);
      if (!cycle)
        goto func_exit; /* One of the transactions was already aborted. */
      for (trx_t *next= cycle;;)
      {
        next= next->lock.wait_trx;
        const undo_no_t next_weight= TRX_WEIGHT(next) |
          (next->mysql_thd &&
#ifdef WITH_WSREP
           (thd_has_edited_nontrans_tables(next->mysql_thd) ||
            (next->is_wsrep() && wsrep_thd_is_BF(next->mysql_thd, false)))
#else
           thd_has_edited_nontrans_tables(next->mysql_thd)
#endif /* WITH_WSREP */
           ? 1ULL << 63 : 0);
        if (next_weight < victim_weight)
        {
          victim_weight= next_weight;
          victim= next;
          victim_pos= l;
        }
        if (next == victim)
          trx_pos= l;
        if (next == cycle)
          break;
      }

      if (trx_pos && trx_weight == victim_weight)
      {
        victim= trx;
        victim_pos= trx_pos;
      }

      /* Finally, display the deadlock */
      switch (const auto r= static_cast<enum report>(innodb_deadlock_report)) {
      case REPORT_OFF:
        break;
      case REPORT_BASIC:
      case REPORT_FULL:
        start_print();
        l= 0;

        for (trx_t *next= cycle;;)
        {
          next= next->lock.wait_trx;
          ut_ad(next);
          ut_ad(next->state == TRX_STATE_ACTIVE);
          const lock_t *wait_lock= next->lock.wait_lock;
          ut_ad(wait_lock);
          snprintf(buf, sizeof buf, "\n*** (%u) TRANSACTION:\n", ++l);
          print(buf);
          print(*next);
          print("*** WAITING FOR THIS LOCK TO BE GRANTED:\n");
          print(*wait_lock);
          if (r == REPORT_BASIC);
          else if (wait_lock->is_table())
          {
            if (const lock_t *lock=
                UT_LIST_GET_FIRST(wait_lock->un_member.tab_lock.table->locks))
            {
              ut_ad(!lock->is_waiting());
              print("*** CONFLICTING WITH:\n");
              do
                print(*lock);
              while ((lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock)) &&
                     !lock->is_waiting());
            }
            else
              ut_ad("no conflicting table lock found" == 0);
          }
          else
          {
            const page_id_t id{wait_lock->un_member.rec_lock.page_id};
            hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
                                 ? lock_sys.prdt_hash : lock_sys.rec_hash).
              cell_get(id.fold());
            if (const lock_t *lock= lock_sys_t::get_first(cell, id))
            {
              const ulint heap_no= lock_rec_find_set_bit(wait_lock);
              if (!lock_rec_get_nth_bit(lock, heap_no))
                lock= lock_rec_get_next_const(heap_no, lock);
              ut_ad(!lock->is_waiting());
              print("*** CONFLICTING WITH:\n");
              do
                print(*lock);
              while ((lock= lock_rec_get_next_const(heap_no, lock)) &&
                     !lock->is_waiting());
            }
            else
              ut_ad("no conflicting record lock found" == 0);
          }
          if (next == cycle)
            break;
        }
        snprintf(buf, sizeof buf, rollback_msg, victim_pos);
        print(buf);
      }

      ut_ad(victim->state == TRX_STATE_ACTIVE);

      victim->lock.was_chosen_as_deadlock_victim= true;
      lock_cancel_waiting_and_release(victim->lock.wait_lock);
#ifdef WITH_WSREP
      if (victim->is_wsrep() && wsrep_thd_is_SR(victim->mysql_thd))
        wsrep_handle_SR_rollback(trx->mysql_thd, victim->mysql_thd);
#endif
    }

func_exit:
    if (current_trx)
      lock_sys.wr_unlock();
    return victim;
  }
}

/** Check if a lock request results in a deadlock.
Resolve a deadlock by choosing a transaction that will be rolled back.
@param trx    transaction requesting a lock
@return whether trx must report DB_DEADLOCK */
static bool Deadlock::check_and_resolve(trx_t *trx)
{
  mysql_mutex_assert_owner(&lock_sys.wait_mutex);

  ut_ad(!trx->mutex_is_owner());
  ut_ad(trx->state == TRX_STATE_ACTIVE);
  ut_ad(!srv_read_only_mode);

  if (!innodb_deadlock_detect)
    return false;

  if (UNIV_LIKELY_NULL(find_cycle(trx)) && report(trx, true) == trx)
    return true;

  if (UNIV_LIKELY(!trx->lock.was_chosen_as_deadlock_victim))
    return false;

  if (lock_t *wait_lock= trx->lock.wait_lock)
    lock_sys_t::cancel<false>(trx, wait_lock);

  lock_sys.deadlock_check();
  return true;
}
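
/* Victim selection in Deadlock::report() walks the waits-for cycle once
and picks the transaction with the smallest weight, where the weight is
TRX_WEIGHT() with bit 63 forced on for transactions that modified
non-transactional tables (or are wsrep BF), making them effectively
never chosen. For example, given a cycle
  T1 (weight 10) -> T2 (weight 3) -> T3 (weight (1ULL << 63) | 2) -> T1
T2 is rolled back: 3 < 10, and T3's high bit makes it the heaviest. */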

/** Check for deadlocks while holding only lock_sys.wait_mutex. */
TRANSACTIONAL_TARGET
void lock_sys_t::deadlock_check()
{
  ut_ad(!is_writer());
  mysql_mutex_assert_owner(&wait_mutex);
  bool acquired= false;
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
  bool elided= false;
#endif

  if (Deadlock::to_be_checked)
  {
    for (;;)
    {
      auto i= Deadlock::to_check.begin();
      if (i == Deadlock::to_check.end())
        break;
      if (acquired);
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
      else if (xbegin())
      {
        if (latch.is_locked_or_waiting())
          xabort();
        acquired= elided= true;
      }
#endif
      else
      {
        acquired= wr_lock_try();
        if (!acquired)
        {
          acquired= true;
          mysql_mutex_unlock(&wait_mutex);
          lock_sys.wr_lock(SRW_LOCK_CALL);
          mysql_mutex_lock(&wait_mutex);
          continue;
        }
      }
      trx_t *trx= *i;
      Deadlock::to_check.erase(i);
      if (Deadlock::find_cycle(trx))
        Deadlock::report(trx, false);
    }
    Deadlock::to_be_checked= false;
  }
  ut_ad(Deadlock::to_check.empty());
#if !defined NO_ELISION && !defined SUX_LOCK_GENERIC
  if (elided)
    return;
#endif
  if (acquired)
    wr_unlock();
}

/** Update the locks when a page is split and merged to two pages,
in defragmentation. */
void lock_update_split_and_merge(
	const buf_block_t* left_block,	/*!< in: left page to which merged */
	const rec_t* orig_pred,		/*!< in: original predecessor of
					supremum on the left page before merge*/
	const buf_block_t* right_block)	/*!< in: right page from which merged */
{
  ut_ad(page_is_leaf(left_block->page.frame));
  ut_ad(page_is_leaf(right_block->page.frame));
  ut_ad(page_align(orig_pred) == left_block->page.frame);

  const page_id_t l{left_block->page.id()};
  const page_id_t r{right_block->page.id()};

  /* This would likely be too large for a memory transaction. */
  LockMultiGuard g{lock_sys.rec_hash, l, r};
  const rec_t *left_next_rec= page_rec_get_next_const(orig_pred);
  ut_ad(!page_rec_is_metadata(left_next_rec));

  /* Inherit the locks on the supremum of the left page to the
  first record which was moved from the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left_block->page.frame,
                          page_rec_get_heap_no(left_next_rec),
                          PAGE_HEAP_NO_SUPREMUM);

  /* Reset the locks on the supremum of the left page,
  releasing waiting transactions */
  lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);

  /* Inherit the locks to the supremum of the left page from the
  successor of the infimum on the right page */
  lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->page.frame,
                          PAGE_HEAP_NO_SUPREMUM,
                          lock_get_min_heap_no(right_block));
}