Commit 43b239a0 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-24915 Galera conflict resolution is unnecessarily complex

The fix of MDEV-23328 introduced a background thread for
killing conflicting transactions.
Thanks to the refactoring that was conducted in MDEV-24671,
the high-priority ("brute-force") applier thread can kill the
conflicting transactions itself, before waiting for the
locks to be finally released (after the conflicting transactions
have been rolled back).

This also allows us to remove the hack LockGGuard that had to
be added in MDEV-20612, and remove Galera-related function
parameters from lock creation.
parent 18dc5b01
...@@ -17991,162 +17991,66 @@ static struct st_mysql_storage_engine innobase_storage_engine= ...@@ -17991,162 +17991,66 @@ static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION }; { MYSQL_HANDLERTON_INTERFACE_VERSION };
#ifdef WITH_WSREP #ifdef WITH_WSREP
/** Request a transaction to be killed that holds a conflicting lock.
struct bg_wsrep_kill_trx_arg { @param bf_trx brute force applier transaction
my_thread_id thd_id, bf_thd_id; @param thd_id thd_get_thread_id(victim_trx->mysql_htd)
trx_id_t trx_id, bf_trx_id; @param trx_id victim_trx->id */
bool signal; void lock_wait_wsrep_kill(trx_t *bf_trx, ulong thd_id, trx_id_t trx_id)
};
/** Kill one transaction from a background manager thread
wsrep_innobase_kill_one_trx() is invoked when lock_sys.mutex and trx mutex
are taken, wsrep_thd_bf_abort() cannot be used there as it takes THD mutexes
that must be taken before lock_sys.mutex and trx mutex. That's why
wsrep_innobase_kill_one_trx only posts the killing task to the manager thread
and the actual killing happens asynchronously here.
As no mutexes were held we don't know whether THD or trx pointers are still
valid, so we need to pass thread/trx ids and perform a lookup.
*/
static void bg_wsrep_kill_trx(void *void_arg)
{
bg_wsrep_kill_trx_arg *arg= (bg_wsrep_kill_trx_arg *)void_arg;
THD *thd, *bf_thd;
trx_t *victim_trx;
bool aborting= false;
if ((bf_thd= find_thread_by_id(arg->bf_thd_id)))
wsrep_thd_LOCK(bf_thd);
if ((thd= find_thread_by_id(arg->thd_id)))
wsrep_thd_LOCK(thd);
if (!thd || !bf_thd || !(victim_trx= thd_to_trx(thd)))
goto ret0;
lock_sys.wr_lock(SRW_LOCK_CALL);
mysql_mutex_lock(&lock_sys.wait_mutex);
victim_trx->mutex_lock();
if (victim_trx->id != arg->trx_id ||
victim_trx->state == TRX_STATE_COMMITTED_IN_MEMORY)
{
/* apparently victim trx was meanwhile rolled back. */
goto ret1;
}
DBUG_ASSERT(wsrep_on(bf_thd));
WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
WSREP_DEBUG("Aborter %s trx_id: " TRX_ID_FMT " thread: %ld "
"seqno: %lld client_state: %s client_mode: %s transaction_mode: %s "
"query: %s",
wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal",
arg->bf_trx_id,
thd_get_thread_id(bf_thd),
wsrep_thd_trx_seqno(bf_thd),
wsrep_thd_client_state_str(bf_thd),
wsrep_thd_client_mode_str(bf_thd),
wsrep_thd_transaction_state_str(bf_thd),
wsrep_thd_query(bf_thd));
WSREP_DEBUG("Victim %s trx_id: " TRX_ID_FMT " thread: %ld "
"seqno: %lld client_state: %s client_mode: %s transaction_mode: %s "
"query: %s",
wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
victim_trx->id,
thd_get_thread_id(thd),
wsrep_thd_trx_seqno(thd),
wsrep_thd_client_state_str(thd),
wsrep_thd_client_mode_str(thd),
wsrep_thd_transaction_state_str(thd),
wsrep_thd_query(thd));
/* Mark transaction as a victim for Galera abort */
victim_trx->lock.was_chosen_as_deadlock_victim.fetch_or(2);
if (wsrep_thd_set_wsrep_aborter(bf_thd, thd))
{
WSREP_DEBUG("innodb kill transaction skipped due to wsrep_aborter set");
goto ret1;
}
aborting= true;
ret1:
victim_trx->mutex_unlock();
lock_sys.wr_unlock();
mysql_mutex_unlock(&lock_sys.wait_mutex);
ret0:
if (thd) {
wsrep_thd_UNLOCK(thd);
if (aborting) {
DEBUG_SYNC(bf_thd, "before_wsrep_thd_abort");
wsrep_thd_bf_abort(bf_thd, thd, arg->signal);
}
wsrep_thd_kill_UNLOCK(thd);
}
if (bf_thd) {
wsrep_thd_UNLOCK(bf_thd);
wsrep_thd_kill_UNLOCK(bf_thd);
}
free(arg);
}
/** This function is used to kill one transaction.
This transaction was open on this node (not-yet-committed), and a
conflicting writeset from some other node that was being applied
caused a locking conflict. First committed (from other node)
wins, thus open transaction is rolled back. BF stands for
brute-force: any transaction can get aborted by galera any time
it is necessary.
This conflict can happen only when the replicated writeset (from
other node) is being applied, not when it’s waiting in the queue.
If our local transaction reached its COMMIT and this conflicting
writeset was in the queue, then it should fail the local
certification test instead.
A brute force abort is only triggered by a locking conflict
between a writeset being applied by an applier thread (slave thread)
and an open transaction on the node, not by a Galera writeset
comparison as in the local certification failure.
@param[in] bf_thd Brute force (BF) thread
@param[in,out] victim_trx Vimtim trx to be killed
@param[in] signal Should victim be signaled */
void
wsrep_innobase_kill_one_trx(
THD* bf_thd,
trx_t *victim_trx,
bool signal)
{ {
ut_ad(bf_thd); THD *bf_thd= bf_trx->mysql_thd;
ut_ad(victim_trx);
ut_ad(victim_trx->mutex_is_owner());
DBUG_ENTER("wsrep_innobase_kill_one_trx");
DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort",
{
const char act[]=
"now "
"SIGNAL sync.before_wsrep_thd_abort_reached "
"WAIT_FOR signal.before_wsrep_thd_abort";
DBUG_ASSERT(!debug_sync_set_action(bf_thd,
STRING_WITH_LEN(act)));
};);
trx_t* bf_trx= thd_to_trx(bf_thd);
bg_wsrep_kill_trx_arg *arg = (bg_wsrep_kill_trx_arg*)malloc(sizeof(*arg));
arg->thd_id = thd_get_thread_id(victim_trx->mysql_thd);
arg->trx_id = victim_trx->id;
arg->bf_thd_id = thd_get_thread_id(bf_thd);
arg->bf_trx_id = bf_trx ? bf_trx->id : TRX_ID_MAX;
arg->signal = signal;
mysql_manager_submit(bg_wsrep_kill_trx, arg);
DBUG_VOID_RETURN; if (THD *vthd= find_thread_by_id(thd_id))
{
bool aborting= false;
wsrep_thd_LOCK(vthd);
if (trx_t *vtrx= thd_to_trx(vthd))
{
lock_sys.wr_lock(SRW_LOCK_CALL);
mysql_mutex_lock(&lock_sys.wait_mutex);
vtrx->mutex_lock();
if (vtrx->id == trx_id && vtrx->state == TRX_STATE_ACTIVE)
{
WSREP_LOG_CONFLICT(bf_thd, vthd, TRUE);
WSREP_DEBUG("Aborter BF trx_id: " TRX_ID_FMT " thread: %ld "
"seqno: %lld client_state: %s "
"client_mode: %s transaction_mode: %s query: %s",
bf_trx->id,
thd_get_thread_id(bf_thd),
wsrep_thd_trx_seqno(bf_thd),
wsrep_thd_client_state_str(bf_thd),
wsrep_thd_client_mode_str(bf_thd),
wsrep_thd_transaction_state_str(bf_thd),
wsrep_thd_query(bf_thd));
WSREP_DEBUG("Victim %s trx_id: " TRX_ID_FMT " thread: %ld "
"seqno: %lld client_state: %s "
"client_mode: %s transaction_mode: %s query: %s",
wsrep_thd_is_BF(vthd, false) ? "BF" : "normal",
vtrx->id,
thd_get_thread_id(vthd),
wsrep_thd_trx_seqno(vthd),
wsrep_thd_client_state_str(vthd),
wsrep_thd_client_mode_str(vthd),
wsrep_thd_transaction_state_str(vthd),
wsrep_thd_query(vthd));
/* Mark transaction as a victim for Galera abort */
vtrx->lock.was_chosen_as_deadlock_victim.fetch_or(2);
if (!wsrep_thd_set_wsrep_aborter(bf_thd, vthd))
aborting= true;
else
WSREP_DEBUG("kill transaction skipped due to wsrep_aborter set");
}
lock_sys.wr_unlock();
mysql_mutex_unlock(&lock_sys.wait_mutex);
vtrx->mutex_unlock();
}
wsrep_thd_UNLOCK(vthd);
if (aborting)
{
DEBUG_SYNC(bf_thd, "before_wsrep_thd_abort");
wsrep_thd_bf_abort(bf_thd, vthd, true);
}
wsrep_thd_kill_UNLOCK(vthd);
}
} }
/** This function forces the victim transaction to abort. Aborting the /** This function forces the victim transaction to abort. Aborting the
......
...@@ -209,7 +209,6 @@ innobase_casedn_str( ...@@ -209,7 +209,6 @@ innobase_casedn_str(
char* a); /*!< in/out: string to put in lower case */ char* a); /*!< in/out: string to put in lower case */
#ifdef WITH_WSREP #ifdef WITH_WSREP
void wsrep_innobase_kill_one_trx(THD *bf_thd, trx_t *victim_trx, bool signal);
ulint wsrep_innobase_mysql_sort(int mysql_type, uint charset_number, ulint wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
unsigned char* str, ulint str_length, unsigned char* str, ulint str_length,
unsigned int buf_length); unsigned int buf_length);
......
...@@ -580,7 +580,6 @@ class lock_sys_t ...@@ -580,7 +580,6 @@ class lock_sys_t
{ {
friend struct LockGuard; friend struct LockGuard;
friend struct LockMultiGuard; friend struct LockMultiGuard;
friend struct LockGGuard;
/** Hash table latch */ /** Hash table latch */
struct hash_latch struct hash_latch
...@@ -920,18 +919,6 @@ struct LockGuard ...@@ -920,18 +919,6 @@ struct LockGuard
lock_sys_t::hash_latch *latch; lock_sys_t::hash_latch *latch;
}; };
#ifdef WITH_WSREP
/** lock_sys.latch guard for a page_id_t shard */
struct LockGGuard
{
LockGGuard(lock_sys_t::hash_table &hash, const page_id_t id, bool all);
~LockGGuard();
private:
/** The hash bucket (nullptr if all of them) */
lock_sys_t::hash_latch *latch;
};
#endif
/** lock_sys.latch guard for 2 page_id_t shards */ /** lock_sys.latch guard for 2 page_id_t shards */
struct LockMultiGuard struct LockMultiGuard
{ {
...@@ -952,9 +939,6 @@ lock_t* ...@@ -952,9 +939,6 @@ lock_t*
lock_rec_create( lock_rec_create(
/*============*/ /*============*/
lock_t* c_lock, /*!< conflicting lock */ lock_t* c_lock, /*!< conflicting lock */
#ifdef WITH_WSREP
que_thr_t* thr, /*!< thread owning trx */
#endif
unsigned type_mode,/*!< in: lock mode and wait flag */ unsigned type_mode,/*!< in: lock mode and wait flag */
const buf_block_t* block, /*!< in: buffer block containing const buf_block_t* block, /*!< in: buffer block containing
the record */ the record */
...@@ -984,9 +968,6 @@ without checking for deadlocks or conflicts. ...@@ -984,9 +968,6 @@ without checking for deadlocks or conflicts.
lock_t* lock_t*
lock_rec_create_low( lock_rec_create_low(
lock_t* c_lock, lock_t* c_lock,
#ifdef WITH_WSREP
que_thr_t* thr, /*!< thread owning trx */
#endif
unsigned type_mode, unsigned type_mode,
const page_id_t page_id, const page_id_t page_id,
const page_t* page, const page_t* page,
......
...@@ -61,9 +61,6 @@ lock_t* ...@@ -61,9 +61,6 @@ lock_t*
lock_rec_create( lock_rec_create(
/*============*/ /*============*/
lock_t* c_lock, /*!< conflicting lock */ lock_t* c_lock, /*!< conflicting lock */
#ifdef WITH_WSREP
que_thr_t* thr, /*!< thread owning trx */
#endif
unsigned type_mode,/*!< in: lock mode and wait flag */ unsigned type_mode,/*!< in: lock mode and wait flag */
const buf_block_t* block, /*!< in: buffer block containing const buf_block_t* block, /*!< in: buffer block containing
the record */ the record */
...@@ -77,9 +74,6 @@ lock_rec_create( ...@@ -77,9 +74,6 @@ lock_rec_create(
btr_assert_not_corrupted(block, index); btr_assert_not_corrupted(block, index);
return lock_rec_create_low( return lock_rec_create_low(
c_lock, c_lock,
#ifdef WITH_WSREP
thr,
#endif
type_mode, block->page.id(), block->frame, heap_no, type_mode, block->page.id(), block->frame, heap_no,
index, trx, caller_owns_trx_mutex); index, trx, caller_owns_trx_mutex);
} }
...@@ -49,6 +49,7 @@ Created 5/7/1996 Heikki Tuuri ...@@ -49,6 +49,7 @@ Created 5/7/1996 Heikki Tuuri
#ifdef WITH_WSREP #ifdef WITH_WSREP
#include <mysql/service_wsrep.h> #include <mysql/service_wsrep.h>
#include <debug_sync.h>
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
/** The value of innodb_deadlock_detect */ /** The value of innodb_deadlock_detect */
...@@ -190,36 +191,6 @@ LockGuard::LockGuard(lock_sys_t::hash_table &hash, page_id_t id) ...@@ -190,36 +191,6 @@ LockGuard::LockGuard(lock_sys_t::hash_table &hash, page_id_t id)
latch->acquire(); latch->acquire();
} }
#ifdef WITH_WSREP
LockGGuard::LockGGuard(lock_sys_t::hash_table &hash, page_id_t id, bool all)
{
if (UNIV_UNLIKELY(all))
{
latch= nullptr;
lock_sys.wr_lock(SRW_LOCK_CALL);
}
else
{
const auto id_fold= id.fold();
lock_sys.rd_lock(SRW_LOCK_CALL);
latch= hash.lock_get(id_fold);
latch->acquire();
}
}
LockGGuard::~LockGGuard()
{
if (UNIV_UNLIKELY(!latch))
lock_sys.wr_unlock();
else
{
latch->release();
/* Must be last, to avoid a race with lock_sys_t::hash_table::resize() */
lock_sys.rd_unlock();
}
}
#endif
LockMultiGuard::LockMultiGuard(lock_sys_t::hash_table &hash, LockMultiGuard::LockMultiGuard(lock_sys_t::hash_table &hash,
const page_id_t id1, const page_id_t id2) const page_id_t id1, const page_id_t id2)
{ {
...@@ -916,73 +887,85 @@ lock_rec_other_has_expl_req( ...@@ -916,73 +887,85 @@ lock_rec_other_has_expl_req(
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
#ifdef WITH_WSREP #ifdef WITH_WSREP
ATTRIBUTE_COLD void lock_wait_wsrep_kill(trx_t *bf_trx, ulong thd_id, trx_id_t trx_id);
static void wsrep_kill_victim(const trx_t *trx, const lock_t *lock)
{
lock_sys.assert_locked();
ut_ad(trx->is_wsrep());
ut_ad(lock->trx != trx);
if (!wsrep_thd_is_BF(trx->mysql_thd, FALSE)) { /** Kill the holders of conflicting locks.
return; @param trx brute-force applier transaction running in the current thread */
} ATTRIBUTE_COLD ATTRIBUTE_NOINLINE static void lock_wait_wsrep(trx_t *trx)
{
trx_t* lock_trx = lock->trx; mysql_mutex_assert_owner(&lock_sys.wait_mutex);
lock_trx->mutex_lock(); const lock_t *wait_lock= trx->lock.wait_lock;
if (!wait_lock)
return;
if (lock_trx->state == TRX_STATE_COMMITTED_IN_MEMORY if (!wsrep_thd_is_BF(trx->mysql_thd, false))
|| lock_trx->lock.was_chosen_as_deadlock_victim & 2) { return;
lock_trx->mutex_unlock();
return;
}
my_bool bf_other = wsrep_thd_is_BF(lock_trx->mysql_thd, FALSE); DBUG_ASSERT(wsrep_on(trx->mysql_thd));
mtr_t mtr;
if ((!bf_other) || std::set<trx_t*> victims;
(wsrep_thd_order_before(
trx->mysql_thd, lock_trx->mysql_thd))) {
if (lock_trx->lock.wait_lock) { if (!lock_sys.wr_lock_try())
if (UNIV_UNLIKELY(wsrep_debug)) { {
ib::info() << "WSREP: BF victim waiting\n"; mysql_mutex_unlock(&lock_sys.wait_mutex);
} lock_sys.wr_lock(SRW_LOCK_CALL);
/* cannot release lock, until our lock mysql_mutex_lock(&lock_sys.wait_mutex);
is in the queue*/ wait_lock= trx->lock.wait_lock;
} else { if (!wait_lock)
if (wsrep_log_conflicts) { {
ib::info() << "*** Priority TRANSACTION:"; func_exit:
lock_sys.wr_unlock();
return;
}
}
trx_print_latched(stderr, trx, 3000); if (wait_lock->is_table())
{
dict_table_t *table= wait_lock->un_member.tab_lock.table;
for (lock_t *lock= UT_LIST_GET_FIRST(table->locks); lock;
lock= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock))
if (lock->trx != trx)
victims.emplace(lock->trx);
}
else if (lock_t *lock=
(wait_lock->type_mode & LOCK_PREDICATE
? lock_sys.prdt_hash : lock_sys.rec_hash).
get_first(wait_lock->un_member.rec_lock.page_id))
{
const ulint heap_no= lock_rec_find_set_bit(wait_lock);
if (!lock_rec_get_nth_bit(lock, heap_no))
lock= lock_rec_get_next(heap_no, lock);
do
if (lock->trx != trx)
victims.emplace(lock->trx);
while ((lock= lock_rec_get_next(heap_no, lock)));
}
if (bf_other) { if (victims.empty())
ib::info() << "*** Priority TRANSACTION:"; goto func_exit;
} else {
ib::info() << "*** Victim TRANSACTION:";
}
trx_print_latched(stderr, lock_trx, 3000);
ib::info() << "*** WAITING FOR THIS LOCK TO BE GRANTED:"; std::vector<std::pair<ulong,trx_id_t>> victim_id;
for (trx_t *v : victims)
victim_id.emplace_back(std::pair<ulong,trx_id_t>
{thd_get_thread_id(v->mysql_thd), v->id});
if (!lock->is_table()) { DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort",
lock_rec_print(stderr, lock, mtr); {
} else { const char act[]=
lock_table_print(stderr, lock); "now SIGNAL sync.before_wsrep_thd_abort_reached "
} "WAIT_FOR signal.before_wsrep_thd_abort";
DBUG_ASSERT(!debug_sync_set_action(trx->mysql_thd,
STRING_WITH_LEN(act)));
};);
ib::info() << " SQL1: " lock_sys.wr_unlock();
<< wsrep_thd_query(trx->mysql_thd); mysql_mutex_unlock(&lock_sys.wait_mutex);
ib::info() << " SQL2: "
<< wsrep_thd_query(lock_trx->mysql_thd);
}
wsrep_innobase_kill_one_trx(trx->mysql_thd, for (const auto &v : victim_id)
lock_trx, true); lock_wait_wsrep_kill(trx, v.first, v.second);
}
}
lock->trx->mutex_unlock(); mysql_mutex_lock(&lock_sys.wait_mutex);
} }
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
...@@ -1007,11 +990,6 @@ lock_rec_other_has_conflicting( ...@@ -1007,11 +990,6 @@ lock_rec_other_has_conflicting(
for (lock_t* lock = lock_sys.get_first(lock_sys.rec_hash, id, heap_no); for (lock_t* lock = lock_sys.get_first(lock_sys.rec_hash, id, heap_no);
lock; lock = lock_rec_get_next(heap_no, lock)) { lock; lock = lock_rec_get_next(heap_no, lock)) {
if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) { if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) {
#ifdef WITH_WSREP
if (trx->is_wsrep()) {
wsrep_kill_victim(trx, lock);
}
#endif /* WITH_WSREP */
return(lock); return(lock);
} }
} }
...@@ -1097,37 +1075,6 @@ lock_number_of_tables_locked( ...@@ -1097,37 +1075,6 @@ lock_number_of_tables_locked(
/*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/ /*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/
#ifdef WITH_WSREP
ATTRIBUTE_COLD
static
void
wsrep_print_wait_locks(
/*===================*/
lock_t* c_lock) /* conflicting lock to print */
{
const lock_t *wait_lock = c_lock->trx->lock.wait_lock;
if (wait_lock != c_lock) {
mtr_t mtr;
ib::info() << "WSREP: c_lock != wait lock";
ib::info() << " SQL: "
<< wsrep_thd_query(c_lock->trx->mysql_thd);
if (c_lock->is_table()) {
lock_table_print(stderr, c_lock);
} else {
lock_rec_print(stderr, c_lock, mtr);
}
if (wait_lock->is_table()) {
lock_table_print(stderr, wait_lock);
} else {
lock_rec_print(stderr, wait_lock, mtr);
}
}
}
#endif /* WITH_WSREP */
/** Reset the wait status of a lock. /** Reset the wait status of a lock.
@param[in,out] lock lock that was possibly being waited for */ @param[in,out] lock lock that was possibly being waited for */
static void lock_reset_lock_and_trx_wait(lock_t *lock) static void lock_reset_lock_and_trx_wait(lock_t *lock)
...@@ -1144,142 +1091,6 @@ static void lock_reset_lock_and_trx_wait(lock_t *lock) ...@@ -1144,142 +1091,6 @@ static void lock_reset_lock_and_trx_wait(lock_t *lock)
lock->type_mode&= ~LOCK_WAIT; lock->type_mode&= ~LOCK_WAIT;
} }
#ifdef WITH_WSREP
/** Set the wait status of a lock.
@param[in,out] lock lock that will be waited for
@param[in,out] trx transaction that will wait for the lock
@param[in] ctrx holder of the conflicting lock */
static void lock_set_lock_and_trx_wait(lock_t *lock, trx_t *trx, trx_t *ctrx)
{
ut_ad(lock);
ut_ad(lock->trx == trx);
lock_sys.assert_locked(*lock);
ut_ad(trx->mutex_is_owner());
if (trx->lock.wait_trx)
{
ut_ad(trx->lock.wait_trx == ctrx);
ut_ad(trx->lock.wait_lock != lock);
ut_ad((*trx->lock.wait_lock).trx == trx);
}
else
{
trx->lock.wait_trx= ctrx;
ut_ad(!trx->lock.wait_lock);
}
trx->lock.wait_lock= lock;
lock->type_mode|= LOCK_WAIT;
}
/** Create a waiting record lock in brute-force handling.
@param c_lock conflicting lock
@param thr query thread
@param lock requested lock
@param trx transaction
@return whether the lock can be returned */
ATTRIBUTE_COLD ATTRIBUTE_NOINLINE
static bool lock_rec_create_wsrep(lock_t *c_lock, que_thr_t *thr, lock_t *lock,
trx_t *trx, bool holds_trx_mutex)
{
ut_ad(!lock->is_table());
lock_sys.assert_locked();
lock_t *hash= c_lock->hash;
lock_t *prev= nullptr;
while (hash && wsrep_thd_is_BF(hash->trx->mysql_thd, false) &&
wsrep_thd_order_before(hash->trx->mysql_thd, trx->mysql_thd))
{
prev= hash;
hash= hash->hash;
}
lock->hash= hash;
if (prev)
prev->hash= lock;
else
c_lock->hash= lock;
/*
* delayed conflict resolution '...kill_one_trx' was not called,
* if victim was waiting for some other lock
*/
if (holds_trx_mutex)
trx->mutex_unlock();
mysql_mutex_lock(&lock_sys.wait_mutex);
trx->mutex_lock();
trx_t *ctrx= c_lock->trx;
ctrx->mutex_lock();
const bool ret= ctrx->lock.wait_thr != nullptr;
if (ret)
{
ctrx->lock.was_chosen_as_deadlock_victim= 3;
if (UNIV_UNLIKELY(wsrep_debug))
wsrep_print_wait_locks(c_lock);
lock_set_lock_and_trx_wait(lock, trx, ctrx);
UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
trx->lock.wait_thr= thr;
lock_cancel_waiting_and_release(ctrx->lock.wait_lock);
}
mysql_mutex_unlock(&lock_sys.wait_mutex);
ctrx->mutex_unlock();
if (!holds_trx_mutex)
trx->mutex_unlock();
return ret;
}
/** Create a waiting table lock.
@param c_lock conflicting lock
@param lock requested lock
@param table table
@param trx transaction */
ATTRIBUTE_COLD ATTRIBUTE_NOINLINE
static void lock_table_create_wsrep(lock_t *c_lock, lock_t *lock, dict_table_t *table, trx_t *trx)
{
ut_ad(lock->is_table());
lock_sys.assert_locked();
trx_t *c_trx= c_lock->trx;
if (wsrep_thd_is_BF(trx->mysql_thd, FALSE))
{
ut_list_insert(table->locks, c_lock, lock, TableLockGetNode());
if (UNIV_UNLIKELY(wsrep_debug))
{
wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
wsrep_report_bf_lock_wait(c_trx->mysql_thd, c_trx->id);
}
}
else
ut_list_append(table->locks, lock, TableLockGetNode());
trx->mutex_unlock();
mysql_mutex_lock(&lock_sys.wait_mutex);
trx->mutex_lock();
c_trx->mutex_lock();
if (c_trx->lock.wait_thr)
{
c_trx->lock.was_chosen_as_deadlock_victim= true;
if (UNIV_UNLIKELY(wsrep_debug))
{
wsrep_report_bf_lock_wait(trx->mysql_thd, trx->id);
wsrep_report_bf_lock_wait(c_trx->mysql_thd, c_trx->id);
wsrep_print_wait_locks(c_lock);
}
lock_cancel_waiting_and_release(c_trx->lock.wait_lock);
}
mysql_mutex_unlock(&lock_sys.wait_mutex);
c_trx->mutex_unlock();
}
#endif
/** Create a new record lock and inserts it to the lock queue, /** Create a new record lock and inserts it to the lock queue,
without checking for deadlocks or conflicts. without checking for deadlocks or conflicts.
@param[in] c_lock conflicting lock @param[in] c_lock conflicting lock
...@@ -1294,9 +1105,6 @@ without checking for deadlocks or conflicts. ...@@ -1294,9 +1105,6 @@ without checking for deadlocks or conflicts.
lock_t* lock_t*
lock_rec_create_low( lock_rec_create_low(
lock_t* c_lock, lock_t* c_lock,
#ifdef WITH_WSREP
que_thr_t* thr, /*!< thread owning trx */
#endif
unsigned type_mode, unsigned type_mode,
const page_id_t page_id, const page_id_t page_id,
const page_t* page, const page_t* page,
...@@ -1378,16 +1186,6 @@ lock_rec_create_low( ...@@ -1378,16 +1186,6 @@ lock_rec_create_low(
ut_ad(index->table->get_ref_count() || !index->table->can_be_evicted); ut_ad(index->table->get_ref_count() || !index->table->can_be_evicted);
const auto lock_hash = &lock_sys.hash_get(type_mode); const auto lock_hash = &lock_sys.hash_get(type_mode);
#ifdef WITH_WSREP
if (c_lock && trx->is_wsrep()
&& wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
if (lock_rec_create_wsrep(c_lock, thr, lock, trx,
holds_trx_mutex)) {
/* have to bail out here to avoid lock_set_lock... */
return(lock);
}
} else
#endif /* WITH_WSREP */
HASH_INSERT(lock_t, hash, lock_hash, page_id.fold(), lock); HASH_INSERT(lock_t, hash, lock_hash, page_id.fold(), lock);
if (type_mode & LOCK_WAIT) { if (type_mode & LOCK_WAIT) {
...@@ -1447,7 +1245,6 @@ lock_rec_enqueue_waiting( ...@@ -1447,7 +1245,6 @@ lock_rec_enqueue_waiting(
trx_t* trx = thr_get_trx(thr); trx_t* trx = thr_get_trx(thr);
ut_ad(trx->mutex_is_owner()); ut_ad(trx->mutex_is_owner());
ut_ad(!trx->is_wsrep() || lock_sys.is_writer());
switch (trx_get_dict_operation(trx)) { switch (trx_get_dict_operation(trx)) {
case TRX_DICT_OP_NONE: case TRX_DICT_OP_NONE:
...@@ -1472,9 +1269,6 @@ lock_rec_enqueue_waiting( ...@@ -1472,9 +1269,6 @@ lock_rec_enqueue_waiting(
we already own the trx mutex. */ we already own the trx mutex. */
lock_t* lock = lock_rec_create_low( lock_t* lock = lock_rec_create_low(
c_lock, c_lock,
#ifdef WITH_WSREP
thr,
#endif
type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true); type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true);
if (prdt && type_mode & LOCK_PREDICATE) { if (prdt && type_mode & LOCK_PREDICATE) {
...@@ -1638,9 +1432,6 @@ lock_rec_add_to_queue( ...@@ -1638,9 +1432,6 @@ lock_rec_add_to_queue(
ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx); ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);
lock_rec_create_low(nullptr, lock_rec_create_low(nullptr,
#ifdef WITH_WSREP
nullptr,
#endif
type_mode, id, page, heap_no, index, trx, type_mode, id, page, heap_no, index, trx,
caller_owns_trx_mutex); caller_owns_trx_mutex);
} }
...@@ -1689,8 +1480,7 @@ lock_rec_lock( ...@@ -1689,8 +1480,7 @@ lock_rec_lock(
MONITOR_ATOMIC_INC(MONITOR_NUM_RECLOCK_REQ); MONITOR_ATOMIC_INC(MONITOR_NUM_RECLOCK_REQ);
const page_id_t id{block->page.id()}; const page_id_t id{block->page.id()};
IF_WSREP(LockGGuard g(lock_sys.rec_hash, id, trx->is_wsrep()), LockGuard g{lock_sys.rec_hash, id};
LockGuard g(lock_sys.rec_hash, id));
if (lock_t *lock= lock_sys.rec_hash.get_first(id)) if (lock_t *lock= lock_sys.rec_hash.get_first(id))
{ {
...@@ -1746,9 +1536,6 @@ lock_rec_lock( ...@@ -1746,9 +1536,6 @@ lock_rec_lock(
*/ */
if (!impl) if (!impl)
lock_rec_create_low(nullptr, lock_rec_create_low(nullptr,
#ifdef WITH_WSREP
nullptr,
#endif
mode, id, block->frame, heap_no, index, trx, false); mode, id, block->frame, heap_no, index, trx, false);
return DB_SUCCESS_LOCKED_REC; return DB_SUCCESS_LOCKED_REC;
...@@ -1935,6 +1722,7 @@ dberr_t lock_wait(que_thr_t *thr) ...@@ -1935,6 +1722,7 @@ dberr_t lock_wait(que_thr_t *thr)
if (const lock_t *wait_lock= trx->lock.wait_lock) if (const lock_t *wait_lock= trx->lock.wait_lock)
{ {
const auto type_mode= wait_lock->type_mode; const auto type_mode= wait_lock->type_mode;
IF_WSREP(if (trx->is_wsrep()) lock_wait_wsrep(trx),);
mysql_mutex_unlock(&lock_sys.wait_mutex); mysql_mutex_unlock(&lock_sys.wait_mutex);
if (had_dict_lock) /* Release foreign key check latch */ if (had_dict_lock) /* Release foreign key check latch */
...@@ -1954,7 +1742,6 @@ dberr_t lock_wait(que_thr_t *thr) ...@@ -1954,7 +1742,6 @@ dberr_t lock_wait(que_thr_t *thr)
const bool rpl= !(type_mode & LOCK_AUTO_INC) && trx->mysql_thd && const bool rpl= !(type_mode & LOCK_AUTO_INC) && trx->mysql_thd &&
innodb_deadlock_detect && thd_need_wait_reports(trx->mysql_thd); innodb_deadlock_detect && thd_need_wait_reports(trx->mysql_thd);
#endif #endif
timespec abstime; timespec abstime;
set_timespec_time_nsec(abstime, suspend_time.val * 1000); set_timespec_time_nsec(abstime, suspend_time.val * 1000);
abstime.MY_tv_sec+= innodb_lock_wait_timeout; abstime.MY_tv_sec+= innodb_lock_wait_timeout;
...@@ -3310,11 +3097,6 @@ lock_table_create( ...@@ -3310,11 +3097,6 @@ lock_table_create(
UT_LIST_ADD_LAST(trx->lock.trx_locks, lock); UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
#ifdef WITH_WSREP
if (c_lock && trx->is_wsrep()) {
lock_table_create_wsrep(c_lock, lock, table, trx);
} else
#endif /* WITH_WSREP */
ut_list_append(table->locks, lock, TableLockGetNode()); ut_list_append(table->locks, lock, TableLockGetNode());
if (type_mode & LOCK_WAIT) { if (type_mode & LOCK_WAIT) {
...@@ -3555,19 +3337,6 @@ lock_table_other_has_incompatible( ...@@ -3555,19 +3337,6 @@ lock_table_other_has_incompatible(
if (lock_trx != trx if (lock_trx != trx
&& !lock_mode_compatible(lock->mode(), mode) && !lock_mode_compatible(lock->mode(), mode)
&& (wait || !lock->is_waiting())) { && (wait || !lock->is_waiting())) {
#ifdef WITH_WSREP
if (trx->is_wsrep() && lock_trx->is_wsrep()) {
if (UNIV_UNLIKELY(wsrep_debug)) {
ib::info() << "WSREP: table lock abort for table:"
<< table->name;
ib::info() << " SQL: "
<< wsrep_thd_query(lock->trx->mysql_thd);
}
wsrep_kill_victim(trx, lock);
}
#endif /* WITH_WSREP */
return(lock); return(lock);
} }
} }
...@@ -4907,8 +4676,7 @@ lock_rec_insert_check_and_lock( ...@@ -4907,8 +4676,7 @@ lock_rec_insert_check_and_lock(
ut_ad(!rec_is_metadata(next_rec, *index)); ut_ad(!rec_is_metadata(next_rec, *index));
{ {
IF_WSREP(LockGGuard g(lock_sys.rec_hash, id, trx->is_wsrep()), LockGuard g{lock_sys.rec_hash, id};
LockGuard g(lock_sys.rec_hash, id));
/* Because this code is invoked for a running transaction by /* Because this code is invoked for a running transaction by
the thread that is serving the transaction, it is not necessary the thread that is serving the transaction, it is not necessary
to hold trx->mutex here. */ to hold trx->mutex here. */
......
...@@ -468,9 +468,6 @@ lock_prdt_add_to_queue( ...@@ -468,9 +468,6 @@ lock_prdt_add_to_queue(
ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx); ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx);
lock_t* lock = lock_rec_create(nullptr, lock_t* lock = lock_rec_create(nullptr,
#ifdef WITH_WSREP
nullptr,
#endif
type_mode, block, PRDT_HEAPNO, index, type_mode, block, PRDT_HEAPNO, index,
trx, caller_owns_trx_mutex); trx, caller_owns_trx_mutex);
...@@ -734,9 +731,6 @@ lock_prdt_lock( ...@@ -734,9 +731,6 @@ lock_prdt_lock(
if (lock == NULL) { if (lock == NULL) {
lock = lock_rec_create( lock = lock_rec_create(
NULL, NULL,
#ifdef WITH_WSREP
NULL, /* FIXME: replicate SPATIAL INDEX locks */
#endif
prdt_mode, block, PRDT_HEAPNO, prdt_mode, block, PRDT_HEAPNO,
index, trx, FALSE); index, trx, FALSE);
...@@ -835,9 +829,6 @@ lock_place_prdt_page_lock( ...@@ -835,9 +829,6 @@ lock_place_prdt_page_lock(
if (lock == NULL) { if (lock == NULL) {
lock = lock_rec_create_low( lock = lock_rec_create_low(
NULL, NULL,
#ifdef WITH_WSREP
NULL, /* FIXME: replicate SPATIAL INDEX locks */
#endif
mode, page_id, NULL, PRDT_HEAPNO, mode, page_id, NULL, PRDT_HEAPNO,
index, trx, FALSE); index, trx, FALSE);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment