Many files:

  Fix Bug #3300 : if innodb_locks_unsafe_for_binlog is set, release locks on rows that we do not UPDATE or DELETE
parent f9bc3cb5
......@@ -243,17 +243,27 @@ row_update_for_mysql(
the MySQL format */
row_prebuilt_t* prebuilt); /* in: prebuilt struct in MySQL
handle */
/*************************************************************************
Does an unlock of a row for MySQL. */
This can only be used when srv_locks_unsafe_for_binlog is TRUE. Before
calling this function we must use trx_reset_new_rec_lock_info() and
trx_register_new_rec_lock() to store the information which new record locks
really were set. This function removes a newly set lock under prebuilt->pcur,
and also under prebuilt->clust_pcur. Currently, this is only used and tested
in the case of an UPDATE or a DELETE statement, where the row lock is of the
LOCK_X type.
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set. */
int
row_unlock_for_mysql(
/*=================*/
/* out: error code or DB_SUCCESS */
row_prebuilt_t* prebuilt); /* in: prebuilt struct in MySQL
row_prebuilt_t* prebuilt, /* in: prebuilt struct in MySQL
handle */
ibool has_latches_on_recs);/* TRUE if called so that we have
the latches on the records under pcur
and clust_pcur, and we do not need to
reposition the cursors. */
/*************************************************************************
Creates an query graph node of 'update' type to be used in the MySQL
interface. */
......
......@@ -16,10 +16,39 @@ Created 3/26/1996 Heikki Tuuri
#include "que0types.h"
#include "mem0mem.h"
#include "read0types.h"
#include "dict0types.h"
#include "trx0xa.h"
extern ulint trx_n_mysql_transactions;
/*****************************************************************
Resets the new record lock info in a transaction struct. */
UNIV_INLINE
void
trx_reset_new_rec_lock_info(
/*========================*/
trx_t* trx); /* in: transaction struct */
/*****************************************************************
Registers that we have set a new record lock on an index. This can only be
called twice after calling trx_reset_new_rec_lock_info(), since we only have
space to store 2 indexes! */
UNIV_INLINE
void
trx_register_new_rec_lock(
/*======================*/
trx_t* trx, /* in: transaction struct */
dict_index_t* index); /* in: trx sets a new record lock on this
index*/
/*****************************************************************
Checks if trx has set a new record lock on an index. */
UNIV_INLINE
ibool
trx_new_rec_locks_contain(
/*======================*/
/* out: TRUE if trx has set a new record lock
on index */
trx_t* trx, /* in: transaction struct */
dict_index_t* index); /* in: index */
/************************************************************************
Releases the search latch if trx has reserved it. */
......@@ -495,8 +524,18 @@ struct trx_struct{
lock_t* auto_inc_lock; /* possible auto-inc lock reserved by
the transaction; note that it is also
in the lock list trx_locks */
ibool trx_create_lock;/* this is TRUE if we have created a
new lock for a record accessed */
dict_index_t* new_rec_locks[2];/* these are normally NULL; if
srv_locks_unsafe_for_binlog is TRUE,
in a cursor search, if we set a new
record lock on an index, this is set
to point to the index; this is
used in releasing the locks under the
cursors if we are performing an UPDATE
and we determine after retrieving
the row that it does not need to be
locked; thus, these can be used to
implement a 'mini-rollback' that
releases the latest record locks */
UT_LIST_NODE_T(trx_t)
trx_list; /* list of transactions */
UT_LIST_NODE_T(trx_t)
......
......@@ -39,4 +39,52 @@ trx_start_if_not_started_low(
}
}
/*****************************************************************
Resets the new record lock info in a transaction struct. */
UNIV_INLINE
void
trx_reset_new_rec_lock_info(
/*========================*/
trx_t* trx) /* in: transaction struct */
{
trx->new_rec_locks[0] = NULL;
trx->new_rec_locks[1] = NULL;
}
/*****************************************************************
Registers that we have set a new record lock on an index. This can only be
called twice after calling trx_reset_new_rec_lock_info(), since we only have
space to store 2 indexes! */
UNIV_INLINE
void
trx_register_new_rec_lock(
/*======================*/
trx_t* trx, /* in: transaction struct */
dict_index_t* index) /* in: trx sets a new record lock on this
index*/
{
if (trx->new_rec_locks[0] == NULL) {
trx->new_rec_locks[0] = index;
return;
}
ut_a(trx->new_rec_locks[1] == NULL);
trx->new_rec_locks[1] = index;
}
/*****************************************************************
Checks if trx has set a new record lock on an index. */
UNIV_INLINE
ibool
trx_new_rec_locks_contain(
/*======================*/
/* out: TRUE if trx has set a new record lock
on index */
trx_t* trx, /* in: transaction struct */
dict_index_t* index) /* in: index */
{
return(trx->new_rec_locks[0] == index
|| trx->new_rec_locks[1] == index);
}
......@@ -956,7 +956,7 @@ lock_rec_has_to_wait(
cause waits */
if ((lock_is_on_supremum || (type_mode & LOCK_GAP))
&& !(type_mode & LOCK_INSERT_INTENTION)) {
&& !(type_mode & LOCK_INSERT_INTENTION)) {
/* Gap type locks without LOCK_INSERT_INTENTION flag
do not need to wait for anything. This is because
......@@ -1765,10 +1765,7 @@ lock_rec_create(
lock_rec_set_nth_bit(lock, heap_no);
HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock);
/* Note that we have create a new lock */
trx->trx_create_lock = TRUE;
lock_rec_fold(space, page_no), lock);
if (type_mode & LOCK_WAIT) {
lock_set_lock_and_trx_wait(lock, trx);
......@@ -1945,15 +1942,6 @@ lock_rec_add_to_queue(
if (similar_lock && !somebody_waits && !(type_mode & LOCK_WAIT)) {
/* If the nth bit of a record lock is already set then we
do not set a new lock bit, otherwice we set */
if (lock_rec_get_nth_bit(similar_lock, heap_no)) {
trx->trx_create_lock = FALSE;
} else {
trx->trx_create_lock = TRUE;
}
lock_rec_set_nth_bit(similar_lock, heap_no);
return(similar_lock);
......@@ -2005,11 +1993,14 @@ lock_rec_lock_fast(
lock = lock_rec_get_first_on_page(rec);
trx = thr_get_trx(thr);
trx->trx_create_lock = FALSE;
if (lock == NULL) {
if (!impl) {
lock_rec_create(mode, rec, index, trx);
if (srv_locks_unsafe_for_binlog) {
trx_register_new_rec_lock(trx, index);
}
}
return(TRUE);
......@@ -2021,23 +2012,22 @@ lock_rec_lock_fast(
}
if (lock->trx != trx
|| lock->type_mode != (mode | LOCK_REC)
|| lock_rec_get_n_bits(lock) <= heap_no) {
|| lock->type_mode != (mode | LOCK_REC)
|| lock_rec_get_n_bits(lock) <= heap_no) {
return(FALSE);
}
if (!impl) {
/* If the nth bit of the record lock is already set then we
do not set a new lock bit, otherwise we do set */
/* If the nth bit of a record lock is already set then we
do not set a new lock bit, otherwice we set */
if (lock_rec_get_nth_bit(lock, heap_no)) {
trx->trx_create_lock = FALSE;
} else {
trx->trx_create_lock = TRUE;
if (!lock_rec_get_nth_bit(lock, heap_no)) {
lock_rec_set_nth_bit(lock, heap_no);
if (srv_locks_unsafe_for_binlog) {
trx_register_new_rec_lock(trx, index);
}
}
lock_rec_set_nth_bit(lock, heap_no);
}
return(TRUE);
......@@ -2093,12 +2083,19 @@ lock_rec_lock_slow(
enough already granted on the record, we have to wait. */
err = lock_rec_enqueue_waiting(mode, rec, index, thr);
if (srv_locks_unsafe_for_binlog) {
trx_register_new_rec_lock(trx, index);
}
} else {
if (!impl) {
/* Set the requested lock on the record */
lock_rec_add_to_queue(LOCK_REC | mode, rec, index,
trx);
if (srv_locks_unsafe_for_binlog) {
trx_register_new_rec_lock(trx, index);
}
}
err = DB_SUCCESS;
......@@ -2436,8 +2433,15 @@ lock_rec_inherit_to_gap(
lock = lock_rec_get_first(rec);
/* If srv_locks_unsafe_for_binlog is TRUE, we do not want locks set
by an UPDATE or a DELETE to be inherited as gap type locks. But we
DO want S-locks set by a consistency constraint to be inherited also
then. */
while (lock != NULL) {
if (!lock_rec_get_insert_intention(lock)) {
if (!lock_rec_get_insert_intention(lock)
&& !(srv_locks_unsafe_for_binlog
&& lock_get_mode(lock) == LOCK_X)) {
lock_rec_add_to_queue(LOCK_REC | lock_get_mode(lock)
| LOCK_GAP,
......@@ -3069,7 +3073,7 @@ lock_update_insert(
lock_rec_inherit_to_gap_if_gap_lock(rec, page_rec_get_next(rec));
lock_mutex_exit_kernel();
}
}
/*****************************************************************
Updates the lock table when a record is removed. */
......
......@@ -1429,51 +1429,106 @@ run_again:
}
/*************************************************************************
Does an unlock of a row for MySQL. */
This can only be used when srv_locks_unsafe_for_binlog is TRUE. Before
calling this function we must use trx_reset_new_rec_lock_info() and
trx_register_new_rec_lock() to store the information which new record locks
really were set. This function removes a newly set lock under prebuilt->pcur,
and also under prebuilt->clust_pcur. Currently, this is only used and tested
in the case of an UPDATE or a DELETE statement, where the row lock is of the
LOCK_X type.
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set. */
int
row_unlock_for_mysql(
/*=================*/
/* out: error code or DB_SUCCESS */
row_prebuilt_t* prebuilt) /* in: prebuilt struct in MySQL
row_prebuilt_t* prebuilt, /* in: prebuilt struct in MySQL
handle */
ibool has_latches_on_recs)/* TRUE if called so that we have
the latches on the records under pcur
and clust_pcur, and we do not need to
reposition the cursors. */
{
rec_t* rec;
btr_pcur_t* cur = prebuilt->pcur;
dict_index_t* index;
btr_pcur_t* pcur = prebuilt->pcur;
btr_pcur_t* clust_pcur = prebuilt->clust_pcur;
trx_t* trx = prebuilt->trx;
rec_t* rec;
mtr_t mtr;
ut_ad(prebuilt && trx);
ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
if (!srv_locks_unsafe_for_binlog) {
fprintf(stderr,
"InnoDB: Error: calling row_unlock_for_mysql though\n"
"InnoDB: srv_locks_unsafe_for_binlog is FALSE.\n");
return(DB_SUCCESS);
}
trx->op_info = "unlock_row";
if (srv_locks_unsafe_for_binlog) {
if (trx->trx_create_lock == TRUE) {
mtr_start(&mtr);
index = btr_pcur_get_btr_cur(pcur)->index;
if (index != NULL && trx_new_rec_locks_contain(trx, index)) {
mtr_start(&mtr);
/* Restore a cursor position and find a record */
btr_pcur_restore_position(BTR_SEARCH_LEAF, cur, &mtr);
rec = btr_pcur_get_rec(cur);
/* Restore the cursor position and find the record */
if (!has_latches_on_recs) {
btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, &mtr);
}
if (rec) {
rec = btr_pcur_get_rec(pcur);
lock_rec_reset_and_release_wait(rec);
} else {
fputs("InnoDB: Error: "
"Record for the lock not found\n",
stderr);
mem_analyze_corruption((byte*) trx);
ut_error;
}
mutex_enter(&kernel_mutex);
trx->trx_create_lock = FALSE;
mtr_commit(&mtr);
lock_rec_reset_and_release_wait(rec);
mutex_exit(&kernel_mutex);
mtr_commit(&mtr);
/* If the search was done through the clustered index, then
we have not used clust_pcur at all, and we must NOT try to
reset locks on clust_pcur. The values in clust_pcur may be
garbage! */
if (index->type & DICT_CLUSTERED) {
goto func_exit;
}
}
index = btr_pcur_get_btr_cur(clust_pcur)->index;
if (index != NULL && trx_new_rec_locks_contain(trx, index)) {
mtr_start(&mtr);
/* Restore the cursor position and find the record */
if (!has_latches_on_recs) {
btr_pcur_restore_position(BTR_SEARCH_LEAF, clust_pcur,
&mtr);
}
rec = btr_pcur_get_rec(pcur);
mutex_enter(&kernel_mutex);
lock_rec_reset_and_release_wait(rec);
mutex_exit(&kernel_mutex);
mtr_commit(&mtr);
}
func_exit:
trx->op_info = "";
return(DB_SUCCESS);
......
......@@ -2784,6 +2784,10 @@ sel_restore_position_for_mysql(
process the record the cursor is
now positioned on (i.e. we should
not go to the next record yet) */
ibool* same_user_rec, /* out: TRUE if we were able to restore
the cursor on a user record with the
same ordering prefix in in the
B-tree index */
ulint latch_mode, /* in: latch mode wished in
restoration */
btr_pcur_t* pcur, /* in: cursor whose position
......@@ -2800,6 +2804,8 @@ sel_restore_position_for_mysql(
success = btr_pcur_restore_position(latch_mode, pcur, mtr);
*same_user_rec = success;
if (relative_position == BTR_PCUR_ON) {
if (success) {
return(FALSE);
......@@ -3064,10 +3070,12 @@ row_search_for_mysql(
ulint cnt = 0;
#endif /* UNIV_SEARCH_DEBUG */
ulint next_offs;
ibool same_user_rec;
mtr_t mtr;
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
ut_ad(index && pcur && search_tuple);
......@@ -3138,6 +3146,14 @@ row_search_for_mysql(
trx->search_latch_timeout = BTR_SEA_TIMEOUT;
}
/* Reset the new record lock info if we srv_locks_unsafe_for_binlog
is set. Then we are able to remove the record locks set here on an
individual row. */
if (srv_locks_unsafe_for_binlog) {
trx_reset_new_rec_lock_info(trx);
}
/*-------------------------------------------------------------*/
/* PHASE 1: Try to pop the row from the prefetch cache */
......@@ -3396,8 +3412,9 @@ shortcut_fails_too_big_rec:
clust_index = dict_table_get_first_index(index->table);
if (UNIV_LIKELY(direction != 0)) {
if (!sel_restore_position_for_mysql(BTR_SEARCH_LEAF, pcur,
moves_up, &mtr)) {
if (!sel_restore_position_for_mysql(&same_user_rec,
BTR_SEARCH_LEAF,
pcur, moves_up, &mtr)) {
goto next_rec;
}
......@@ -3659,7 +3676,7 @@ rec_loop:
goto normal_return;
}
}
/* We are ready to look at a possible new index entry in the result
set: the cursor is now placed on a user record */
......@@ -3679,6 +3696,7 @@ rec_loop:
|| srv_locks_unsafe_for_binlog
|| (unique_search && !UNIV_UNLIKELY(rec_get_deleted_flag(
rec, page_rec_is_comp(rec))))) {
goto no_gap_lock;
} else {
lock_type = LOCK_ORDINARY;
......@@ -3701,7 +3719,7 @@ rec_loop:
&& dtuple_get_n_fields_cmp(search_tuple)
== dict_index_get_n_unique(index)
&& 0 == cmp_dtuple_rec(search_tuple, rec, offsets)) {
no_gap_lock:
no_gap_lock:
lock_type = LOCK_REC_NOT_GAP;
}
......@@ -3764,6 +3782,7 @@ rec_loop:
/* Get the clustered index record if needed */
index_rec = rec;
ut_ad(index != clust_index);
goto requires_clust_rec;
}
}
......@@ -3773,6 +3792,15 @@ rec_loop:
/* The record is delete-marked: we can skip it if this is
not a consistent read which might see an earlier version
of a non-clustered index record */
if (srv_locks_unsafe_for_binlog) {
/* No need to keep a lock on a delete-marked record
if we do not want to use next-key locking. */
row_unlock_for_mysql(prebuilt, TRUE);
trx_reset_new_rec_lock_info(trx);
}
goto next_rec;
}
......@@ -3783,7 +3811,8 @@ rec_loop:
index_rec = rec;
if (index != clust_index && prebuilt->need_to_access_clustered) {
requires_clust_rec:
requires_clust_rec:
/* Before and after this "if" block, "offsets" will be
related to "rec", which may be in a secondary index "index" or
the clustered index ("clust_index"). However, after this
......@@ -3816,6 +3845,16 @@ rec_loop:
/* The record is delete marked: we can skip it */
if (srv_locks_unsafe_for_binlog) {
/* No need to keep a lock on a delete-marked
record if we do not want to use next-key
locking. */
row_unlock_for_mysql(prebuilt, TRUE);
trx_reset_new_rec_lock_info(trx);
}
goto next_rec;
}
......@@ -3908,7 +3947,7 @@ got_row:
next_rec:
/*-------------------------------------------------------------*/
/* PHASE 5: Move the cursor to the next index record */
if (UNIV_UNLIKELY(mtr_has_extra_clust_latch)) {
/* We must commit mtr if we are moving to the next
non-clustered index record, because we could break the
......@@ -3921,8 +3960,9 @@ next_rec:
mtr_has_extra_clust_latch = FALSE;
mtr_start(&mtr);
if (sel_restore_position_for_mysql(BTR_SEARCH_LEAF, pcur,
moves_up, &mtr)) {
if (sel_restore_position_for_mysql(&same_user_rec,
BTR_SEARCH_LEAF,
pcur, moves_up, &mtr)) {
#ifdef UNIV_SEARCH_DEBUG
cnt++;
#endif /* UNIV_SEARCH_DEBUG */
......@@ -3976,8 +4016,29 @@ lock_wait_or_error:
thr->lock_state = QUE_THR_LOCK_NOLOCK;
mtr_start(&mtr);
sel_restore_position_for_mysql(BTR_SEARCH_LEAF, pcur,
moves_up, &mtr);
sel_restore_position_for_mysql(&same_user_rec,
BTR_SEARCH_LEAF, pcur,
moves_up, &mtr);
if (srv_locks_unsafe_for_binlog && !same_user_rec) {
/* Since we were not able to restore the cursor
on the same user record, we cannot use
row_unlock_for_mysql() to unlock any records, and
we must thus reset the new rec lock info. Since
in lock0lock.c we have blocked the inheriting of gap
X-locks, we actually do not have any new record locks
set in this case.
Note that if we were able to restore on the 'same'
user record, it is still possible that we were actually
waiting on a delete-marked record, and meanwhile
it was removed by purge and inserted again by some
other user. But that is no problem, because in
rec_loop we will again try to set a lock, and
new_rec_lock_info in trx will be right at the end. */
trx_reset_new_rec_lock_info(trx);
}
mode = pcur->search_mode;
goto rec_loop;
......
......@@ -166,6 +166,8 @@ trx_create(
memset(&trx->xid, 0, sizeof(trx->xid));
trx->xid.formatID = -1;
trx_reset_new_rec_lock_info(trx);
return(trx);
}
......
......@@ -3538,7 +3538,9 @@ ha_innobase::delete_row(
}
/**************************************************************************
Deletes a lock set to a row */
Removes a new lock set on a row. This can be called after a row has been read
in the processing of an UPDATE or a DELETE query, if the option
innodb_locks_unsafe_for_binlog is set. */
void
ha_innobase::unlock_row(void)
......@@ -3556,8 +3558,10 @@ ha_innobase::unlock_row(void)
mem_analyze_corruption((byte *) prebuilt->trx);
ut_error;
}
row_unlock_for_mysql(prebuilt);
if (srv_locks_unsafe_for_binlog) {
row_unlock_for_mysql(prebuilt, FALSE);
}
}
/**********************************************************************
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment