Commit b419c752 authored by Sergey Vojtovich's avatar Sergey Vojtovich

Applying InnoDB snapshot, fixes BUG#49047.

Detailed revision comments:

r6534 | sunny | 2010-01-29 23:42:49 +0200 (Fri, 29 Jan 2010) | 15 lines
branches/zip: Two changes to fix the problem:

1. First scan the joining transaction's locks and check if no other
transaction is waiting for a lock held by the joining transaction.
If no other transaction is waiting then  no deadlock an occur and
we avoid doing an exhaustive search.

2. Change the direction of the lock traversal from backward to forward.
Previously we traversed backward from the lock that has to wait, the function
to that fetched the previous node was very inefficient resulting in O(n^2)
access to the rec lock list.

Fix Bug #49047 InnoDB deadlock detection is CPU intensive with many locks on a single row.

rb://218
parent ed2765d9
......@@ -526,6 +526,9 @@ struct trx_struct{
/* 0, RW_S_LATCH, or RW_X_LATCH:
the latch mode trx currently holds
on dict_operation_lock */
unsigned deadlock_mark:1;/*!< a mark field used in deadlock
checking algorithm. Always protected
by the kernel_mutex. */
time_t start_time; /*!< time the trx object was created
or the state last time became
TRX_ACTIVE */
......@@ -640,11 +643,6 @@ struct trx_struct{
wait_thrs; /*!< query threads belonging to this
trx that are in the QUE_THR_LOCK_WAIT
state */
ulint deadlock_mark; /*!< a mark field used in deadlock
checking algorithm. This must be
in its own machine word, because
it can be changed by other
threads while holding kernel_mutex. */
/*------------------------------*/
mem_heap_t* lock_heap; /*!< memory heap for the locks of the
transaction */
......
......@@ -401,7 +401,7 @@ lock_deadlock_recursive(
/*====================*/
trx_t* start, /*!< in: recursion starting point */
trx_t* trx, /*!< in: a transaction waiting for a lock */
lock_t* wait_lock, /*!< in: the lock trx is waiting to be granted */
lock_t* wait_lock, /*!< in: lock that is waiting to be granted */
ulint* cost, /*!< in/out: number of calculation steps thus
far: if this exceeds LOCK_MAX_N_STEPS_...
we return LOCK_VICTIM_IS_START */
......@@ -411,7 +411,7 @@ lock_deadlock_recursive(
/*********************************************************************//**
Gets the nth bit of a record lock.
@return TRUE if bit set */
@return TRUE if bit set also if i == ULINT_UNDEFINED return FALSE*/
UNIV_INLINE
ibool
lock_rec_get_nth_bit(
......@@ -1222,7 +1222,7 @@ lock_rec_get_first_on_page(
/*********************************************************************//**
Gets the next explicit lock request on a record.
@return next lock, NULL if none exists */
@return next lock, NULL if none exists or if heap_no == ULINT_UNDEFINED */
UNIV_INLINE
lock_t*
lock_rec_get_next(
......@@ -3311,6 +3311,64 @@ lock_deadlock_occurs(
return(FALSE);
}
/********************************************************************//**
Check that no other transaction is waiting on this transaction's locks.
@return TRUE if some other transaction is waiting for this lock. */
static
ulint
lock_trx_has_no_waiters(
/*====================*/
const trx_t* trx) /*!< in: the transaction to check */
{
const lock_t* lock;
ut_ad(mutex_own(&kernel_mutex));
for (lock = UT_LIST_GET_FIRST(trx->trx_locks);
lock != NULL;
lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
const lock_t* wait_lock = lock;
/* Look for all transactions that could be waiting on this
transaction's locks. For that we need to search forward. */
if (lock_get_type_low(lock) == LOCK_REC) {
ulint heap_no;
/* It's possible for heap_no to be undefined here.
This can happen during lock move from one page to
another when we split. */
heap_no = lock_rec_find_set_bit(lock);
do {
wait_lock = lock_rec_get_next(
heap_no, (lock_t*) wait_lock);
if (wait_lock != NULL
&& lock_has_to_wait(wait_lock, lock)) {
return(FALSE);
}
} while (wait_lock != NULL);
} else {
do {
wait_lock = UT_LIST_GET_NEXT(
un_member.tab_lock.locks, wait_lock);
if (wait_lock != NULL
&& lock_has_to_wait(wait_lock, lock) ) {
return(FALSE);
}
} while (wait_lock != NULL);
}
}
return(TRUE);
}
/********************************************************************//**
Looks recursively for a deadlock.
@return 0 if no deadlock found, LOCK_VICTIM_IS_START if there was a
......@@ -3324,7 +3382,7 @@ lock_deadlock_recursive(
/*====================*/
trx_t* start, /*!< in: recursion starting point */
trx_t* trx, /*!< in: a transaction waiting for a lock */
lock_t* wait_lock, /*!< in: the lock trx is waiting to be granted */
lock_t* wait_lock, /*!< in: lock that is waiting to be granted */
ulint* cost, /*!< in/out: number of calculation steps thus
far: if this exceeds LOCK_MAX_N_STEPS_...
we return LOCK_VICTIM_IS_START */
......@@ -3332,10 +3390,10 @@ lock_deadlock_recursive(
LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we
return LOCK_VICTIM_IS_START */
{
ulint ret;
lock_t* lock;
ulint bit_no = ULINT_UNDEFINED;
trx_t* lock_trx;
ulint ret;
ulint heap_no = ULINT_UNDEFINED;
ut_a(trx);
ut_a(start);
......@@ -3346,32 +3404,54 @@ lock_deadlock_recursive(
/* We have already exhaustively searched the subtree starting
from this trx */
return(0);
} else if (lock_trx_has_no_waiters(trx)) {
/* If no other transaction is waiting for this transaction
to release its locks then no deadlock can occur. */
return(0);
}
*cost = *cost + 1;
lock = wait_lock;
if (lock_get_type_low(wait_lock) == LOCK_REC) {
ulint space;
ulint page_no;
heap_no = lock_rec_find_set_bit(wait_lock);
ut_a(heap_no != ULINT_UNDEFINED);
space = wait_lock->un_member.rec_lock.space;
page_no = wait_lock->un_member.rec_lock.page_no;
lock = lock_rec_get_first_on_page_addr(space, page_no);
bit_no = lock_rec_find_set_bit(wait_lock);
/* Position the iterator on the first matching record lock. */
while (lock != NULL
&& lock != wait_lock
&& !lock_rec_get_nth_bit(lock, heap_no)) {
ut_a(bit_no != ULINT_UNDEFINED);
lock = lock_rec_get_next_on_page(lock);
}
if (lock == wait_lock) {
lock = NULL;
}
ut_ad(lock == NULL || lock_rec_get_nth_bit(lock, heap_no));
} else {
lock = wait_lock;
}
/* Look at the locks ahead of wait_lock in the lock queue */
for (;;) {
if (lock_get_type_low(lock) & LOCK_TABLE) {
/* Get previous table lock. */
if (heap_no == ULINT_UNDEFINED) {
lock = UT_LIST_GET_PREV(un_member.tab_lock.locks,
lock);
} else {
ut_ad(lock_get_type_low(lock) == LOCK_REC);
ut_a(bit_no != ULINT_UNDEFINED);
lock = (lock_t*) lock_rec_get_prev(lock, bit_no);
lock = UT_LIST_GET_PREV(
un_member.tab_lock.locks, lock);
}
if (lock == NULL) {
......@@ -3493,12 +3573,28 @@ lock_deadlock_recursive(
ret = lock_deadlock_recursive(
start, lock_trx,
lock_trx->wait_lock, cost, depth + 1);
if (ret != 0) {
return(ret);
}
}
}
/* Get the next record lock to check. */
if (heap_no != ULINT_UNDEFINED) {
ut_a(lock != NULL);
do {
lock = lock_rec_get_next_on_page(lock);
} while (lock != NULL
&& lock != wait_lock
&& !lock_rec_get_nth_bit(lock, heap_no));
if (lock == wait_lock) {
lock = NULL;
}
}
}/* end of the 'for (;;)'-loop */
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment