Commit a9cf7ad0 authored by heikki@hundin.mysql.fi's avatar heikki@hundin.mysql.fi

Merge heikki@bk-internal.mysql.com:/home/bk/mysql-5.0

into hundin.mysql.fi:/home/heikki/mysql-5.0
parents a834b692 44c0f57d
...@@ -243,17 +243,27 @@ row_update_for_mysql( ...@@ -243,17 +243,27 @@ row_update_for_mysql(
the MySQL format */ the MySQL format */
row_prebuilt_t* prebuilt); /* in: prebuilt struct in MySQL row_prebuilt_t* prebuilt); /* in: prebuilt struct in MySQL
handle */ handle */
/************************************************************************* /*************************************************************************
Does an unlock of a row for MySQL. */ This can only be used when srv_locks_unsafe_for_binlog is TRUE. Before
calling this function we must use trx_reset_new_rec_lock_info() and
trx_register_new_rec_lock() to store the information which new record locks
really were set. This function removes a newly set lock under prebuilt->pcur,
and also under prebuilt->clust_pcur. Currently, this is only used and tested
in the case of an UPDATE or a DELETE statement, where the row lock is of the
LOCK_X type.
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set. */
int int
row_unlock_for_mysql( row_unlock_for_mysql(
/*=================*/ /*=================*/
/* out: error code or DB_SUCCESS */ /* out: error code or DB_SUCCESS */
row_prebuilt_t* prebuilt); /* in: prebuilt struct in MySQL row_prebuilt_t* prebuilt, /* in: prebuilt struct in MySQL
handle */ handle */
ibool has_latches_on_recs);/* TRUE if called so that we have
the latches on the records under pcur
and clust_pcur, and we do not need to
reposition the cursors. */
/************************************************************************* /*************************************************************************
Creates an query graph node of 'update' type to be used in the MySQL Creates an query graph node of 'update' type to be used in the MySQL
interface. */ interface. */
......
...@@ -16,10 +16,39 @@ Created 3/26/1996 Heikki Tuuri ...@@ -16,10 +16,39 @@ Created 3/26/1996 Heikki Tuuri
#include "que0types.h" #include "que0types.h"
#include "mem0mem.h" #include "mem0mem.h"
#include "read0types.h" #include "read0types.h"
#include "dict0types.h"
#include "trx0xa.h" #include "trx0xa.h"
extern ulint trx_n_mysql_transactions; extern ulint trx_n_mysql_transactions;
/*****************************************************************
Resets the new record lock info in a transaction struct. */
UNIV_INLINE
void
trx_reset_new_rec_lock_info(
/*========================*/
trx_t* trx); /* in: transaction struct */
/*****************************************************************
Registers that we have set a new record lock on an index. We only have space
to store 2 indexes! If this is called to store more than 2 indexes after
trx_reset_new_rec_lock_info(), then this function does nothing. */
UNIV_INLINE
void
trx_register_new_rec_lock(
/*======================*/
trx_t* trx, /* in: transaction struct */
dict_index_t* index); /* in: trx sets a new record lock on this
index */
/*****************************************************************
Checks if trx has set a new record lock on an index. */
UNIV_INLINE
ibool
trx_new_rec_locks_contain(
/*======================*/
/* out: TRUE if trx has set a new record lock
on index */
trx_t* trx, /* in: transaction struct */
dict_index_t* index); /* in: index */
/************************************************************************ /************************************************************************
Releases the search latch if trx has reserved it. */ Releases the search latch if trx has reserved it. */
...@@ -495,8 +524,18 @@ struct trx_struct{ ...@@ -495,8 +524,18 @@ struct trx_struct{
lock_t* auto_inc_lock; /* possible auto-inc lock reserved by lock_t* auto_inc_lock; /* possible auto-inc lock reserved by
the transaction; note that it is also the transaction; note that it is also
in the lock list trx_locks */ in the lock list trx_locks */
ibool trx_create_lock;/* this is TRUE if we have created a dict_index_t* new_rec_locks[2];/* these are normally NULL; if
new lock for a record accessed */ srv_locks_unsafe_for_binlog is TRUE,
in a cursor search, if we set a new
record lock on an index, this is set
to point to the index; this is
used in releasing the locks under the
cursors if we are performing an UPDATE
and we determine after retrieving
the row that it does not need to be
locked; thus, these can be used to
implement a 'mini-rollback' that
releases the latest record locks */
UT_LIST_NODE_T(trx_t) UT_LIST_NODE_T(trx_t)
trx_list; /* list of transactions */ trx_list; /* list of transactions */
UT_LIST_NODE_T(trx_t) UT_LIST_NODE_T(trx_t)
......
...@@ -39,4 +39,60 @@ trx_start_if_not_started_low( ...@@ -39,4 +39,60 @@ trx_start_if_not_started_low(
} }
} }
/*****************************************************************
Resets the new record lock info in a transaction struct. */
UNIV_INLINE
void
trx_reset_new_rec_lock_info(
/*========================*/
trx_t* trx) /* in: transaction struct */
{
trx->new_rec_locks[0] = NULL;
trx->new_rec_locks[1] = NULL;
}
/*****************************************************************
Registers that we have set a new record lock on an index. We only have space
to store 2 indexes! If this is called to store more than 2 indexes after
trx_reset_new_rec_lock_info(), then this function does nothing. */
UNIV_INLINE
void
trx_register_new_rec_lock(
/*======================*/
trx_t* trx, /* in: transaction struct */
dict_index_t* index) /* in: trx sets a new record lock on this
index */
{
if (trx->new_rec_locks[0] == NULL) {
trx->new_rec_locks[0] = index;
return;
}
if (trx->new_rec_locks[0] == index) {
return;
}
if (trx->new_rec_locks[1] != NULL) {
return;
}
trx->new_rec_locks[1] = index;
}
/*****************************************************************
Checks if trx has set a new record lock on an index. */
UNIV_INLINE
ibool
trx_new_rec_locks_contain(
/*======================*/
/* out: TRUE if trx has set a new record lock
on index */
trx_t* trx, /* in: transaction struct */
dict_index_t* index) /* in: index */
{
return(trx->new_rec_locks[0] == index
|| trx->new_rec_locks[1] == index);
}
...@@ -1766,9 +1766,6 @@ lock_rec_create( ...@@ -1766,9 +1766,6 @@ lock_rec_create(
HASH_INSERT(lock_t, hash, lock_sys->rec_hash, HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock); lock_rec_fold(space, page_no), lock);
/* Note that we have create a new lock */
trx->trx_create_lock = TRUE;
if (type_mode & LOCK_WAIT) { if (type_mode & LOCK_WAIT) {
lock_set_lock_and_trx_wait(lock, trx); lock_set_lock_and_trx_wait(lock, trx);
...@@ -1945,15 +1942,6 @@ lock_rec_add_to_queue( ...@@ -1945,15 +1942,6 @@ lock_rec_add_to_queue(
if (similar_lock && !somebody_waits && !(type_mode & LOCK_WAIT)) { if (similar_lock && !somebody_waits && !(type_mode & LOCK_WAIT)) {
/* If the nth bit of a record lock is already set then we
do not set a new lock bit, otherwice we set */
if (lock_rec_get_nth_bit(similar_lock, heap_no)) {
trx->trx_create_lock = FALSE;
} else {
trx->trx_create_lock = TRUE;
}
lock_rec_set_nth_bit(similar_lock, heap_no); lock_rec_set_nth_bit(similar_lock, heap_no);
return(similar_lock); return(similar_lock);
...@@ -2005,11 +1993,14 @@ lock_rec_lock_fast( ...@@ -2005,11 +1993,14 @@ lock_rec_lock_fast(
lock = lock_rec_get_first_on_page(rec); lock = lock_rec_get_first_on_page(rec);
trx = thr_get_trx(thr); trx = thr_get_trx(thr);
trx->trx_create_lock = FALSE;
if (lock == NULL) { if (lock == NULL) {
if (!impl) { if (!impl) {
lock_rec_create(mode, rec, index, trx); lock_rec_create(mode, rec, index, trx);
if (srv_locks_unsafe_for_binlog) {
trx_register_new_rec_lock(trx, index);
}
} }
return(TRUE); return(TRUE);
...@@ -2023,21 +2014,20 @@ lock_rec_lock_fast( ...@@ -2023,21 +2014,20 @@ lock_rec_lock_fast(
if (lock->trx != trx if (lock->trx != trx
|| lock->type_mode != (mode | LOCK_REC) || lock->type_mode != (mode | LOCK_REC)
|| lock_rec_get_n_bits(lock) <= heap_no) { || lock_rec_get_n_bits(lock) <= heap_no) {
return(FALSE); return(FALSE);
} }
if (!impl) { if (!impl) {
/* If the nth bit of the record lock is already set then we
do not set a new lock bit, otherwise we do set */
/* If the nth bit of a record lock is already set then we if (!lock_rec_get_nth_bit(lock, heap_no)) {
do not set a new lock bit, otherwice we set */
if (lock_rec_get_nth_bit(lock, heap_no)) {
trx->trx_create_lock = FALSE;
} else {
trx->trx_create_lock = TRUE;
}
lock_rec_set_nth_bit(lock, heap_no); lock_rec_set_nth_bit(lock, heap_no);
if (srv_locks_unsafe_for_binlog) {
trx_register_new_rec_lock(trx, index);
}
}
} }
return(TRUE); return(TRUE);
...@@ -2093,12 +2083,19 @@ lock_rec_lock_slow( ...@@ -2093,12 +2083,19 @@ lock_rec_lock_slow(
enough already granted on the record, we have to wait. */ enough already granted on the record, we have to wait. */
err = lock_rec_enqueue_waiting(mode, rec, index, thr); err = lock_rec_enqueue_waiting(mode, rec, index, thr);
if (srv_locks_unsafe_for_binlog) {
trx_register_new_rec_lock(trx, index);
}
} else { } else {
if (!impl) { if (!impl) {
/* Set the requested lock on the record */ /* Set the requested lock on the record */
lock_rec_add_to_queue(LOCK_REC | mode, rec, index, lock_rec_add_to_queue(LOCK_REC | mode, rec, index,
trx); trx);
if (srv_locks_unsafe_for_binlog) {
trx_register_new_rec_lock(trx, index);
}
} }
err = DB_SUCCESS; err = DB_SUCCESS;
...@@ -2436,8 +2433,15 @@ lock_rec_inherit_to_gap( ...@@ -2436,8 +2433,15 @@ lock_rec_inherit_to_gap(
lock = lock_rec_get_first(rec); lock = lock_rec_get_first(rec);
/* If srv_locks_unsafe_for_binlog is TRUE, we do not want locks set
by an UPDATE or a DELETE to be inherited as gap type locks. But we
DO want S-locks set by a consistency constraint to be inherited also
then. */
while (lock != NULL) { while (lock != NULL) {
if (!lock_rec_get_insert_intention(lock)) { if (!lock_rec_get_insert_intention(lock)
&& !(srv_locks_unsafe_for_binlog
&& lock_get_mode(lock) == LOCK_X)) {
lock_rec_add_to_queue(LOCK_REC | lock_get_mode(lock) lock_rec_add_to_queue(LOCK_REC | lock_get_mode(lock)
| LOCK_GAP, | LOCK_GAP,
......
...@@ -1429,51 +1429,106 @@ run_again: ...@@ -1429,51 +1429,106 @@ run_again:
} }
/************************************************************************* /*************************************************************************
Does an unlock of a row for MySQL. */ This can only be used when srv_locks_unsafe_for_binlog is TRUE. Before
calling this function we must use trx_reset_new_rec_lock_info() and
trx_register_new_rec_lock() to store the information which new record locks
really were set. This function removes a newly set lock under prebuilt->pcur,
and also under prebuilt->clust_pcur. Currently, this is only used and tested
in the case of an UPDATE or a DELETE statement, where the row lock is of the
LOCK_X type.
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set. */
int int
row_unlock_for_mysql( row_unlock_for_mysql(
/*=================*/ /*=================*/
/* out: error code or DB_SUCCESS */ /* out: error code or DB_SUCCESS */
row_prebuilt_t* prebuilt) /* in: prebuilt struct in MySQL row_prebuilt_t* prebuilt, /* in: prebuilt struct in MySQL
handle */ handle */
ibool has_latches_on_recs)/* TRUE if called so that we have
the latches on the records under pcur
and clust_pcur, and we do not need to
reposition the cursors. */
{ {
rec_t* rec; dict_index_t* index;
btr_pcur_t* cur = prebuilt->pcur; btr_pcur_t* pcur = prebuilt->pcur;
btr_pcur_t* clust_pcur = prebuilt->clust_pcur;
trx_t* trx = prebuilt->trx; trx_t* trx = prebuilt->trx;
rec_t* rec;
mtr_t mtr; mtr_t mtr;
ut_ad(prebuilt && trx); ut_ad(prebuilt && trx);
ut_ad(trx->mysql_thread_id == os_thread_get_curr_id()); ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
if (!srv_locks_unsafe_for_binlog) {
fprintf(stderr,
"InnoDB: Error: calling row_unlock_for_mysql though\n"
"InnoDB: srv_locks_unsafe_for_binlog is FALSE.\n");
return(DB_SUCCESS);
}
trx->op_info = "unlock_row"; trx->op_info = "unlock_row";
if (srv_locks_unsafe_for_binlog) { index = btr_pcur_get_btr_cur(pcur)->index;
if (trx->trx_create_lock == TRUE) {
if (index != NULL && trx_new_rec_locks_contain(trx, index)) {
mtr_start(&mtr); mtr_start(&mtr);
/* Restore a cursor position and find a record */ /* Restore the cursor position and find the record */
btr_pcur_restore_position(BTR_SEARCH_LEAF, cur, &mtr);
rec = btr_pcur_get_rec(cur); if (!has_latches_on_recs) {
btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, &mtr);
}
if (rec) { rec = btr_pcur_get_rec(pcur);
mutex_enter(&kernel_mutex);
lock_rec_reset_and_release_wait(rec); lock_rec_reset_and_release_wait(rec);
} else {
fputs("InnoDB: Error: "
"Record for the lock not found\n",
stderr);
mem_analyze_corruption((byte*) trx);
ut_error;
}
trx->trx_create_lock = FALSE; mutex_exit(&kernel_mutex);
mtr_commit(&mtr); mtr_commit(&mtr);
/* If the search was done through the clustered index, then
we have not used clust_pcur at all, and we must NOT try to
reset locks on clust_pcur. The values in clust_pcur may be
garbage! */
if (index->type & DICT_CLUSTERED) {
goto func_exit;
}
} }
index = btr_pcur_get_btr_cur(clust_pcur)->index;
if (index != NULL && trx_new_rec_locks_contain(trx, index)) {
mtr_start(&mtr);
/* Restore the cursor position and find the record */
if (!has_latches_on_recs) {
btr_pcur_restore_position(BTR_SEARCH_LEAF, clust_pcur,
&mtr);
}
rec = btr_pcur_get_rec(clust_pcur);
mutex_enter(&kernel_mutex);
lock_rec_reset_and_release_wait(rec);
mutex_exit(&kernel_mutex);
mtr_commit(&mtr);
} }
func_exit:
trx->op_info = ""; trx->op_info = "";
return(DB_SUCCESS); return(DB_SUCCESS);
......
...@@ -2784,6 +2784,10 @@ sel_restore_position_for_mysql( ...@@ -2784,6 +2784,10 @@ sel_restore_position_for_mysql(
process the record the cursor is process the record the cursor is
now positioned on (i.e. we should now positioned on (i.e. we should
not go to the next record yet) */ not go to the next record yet) */
ibool* same_user_rec, /* out: TRUE if we were able to restore
the cursor on a user record with the
same ordering prefix in in the
B-tree index */
ulint latch_mode, /* in: latch mode wished in ulint latch_mode, /* in: latch mode wished in
restoration */ restoration */
btr_pcur_t* pcur, /* in: cursor whose position btr_pcur_t* pcur, /* in: cursor whose position
...@@ -2800,6 +2804,8 @@ sel_restore_position_for_mysql( ...@@ -2800,6 +2804,8 @@ sel_restore_position_for_mysql(
success = btr_pcur_restore_position(latch_mode, pcur, mtr); success = btr_pcur_restore_position(latch_mode, pcur, mtr);
*same_user_rec = success;
if (relative_position == BTR_PCUR_ON) { if (relative_position == BTR_PCUR_ON) {
if (success) { if (success) {
return(FALSE); return(FALSE);
...@@ -3064,10 +3070,12 @@ row_search_for_mysql( ...@@ -3064,10 +3070,12 @@ row_search_for_mysql(
ulint cnt = 0; ulint cnt = 0;
#endif /* UNIV_SEARCH_DEBUG */ #endif /* UNIV_SEARCH_DEBUG */
ulint next_offs; ulint next_offs;
ibool same_user_rec;
mtr_t mtr; mtr_t mtr;
mem_heap_t* heap = NULL; mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_; ulint* offsets = offsets_;
*offsets_ = (sizeof offsets_) / sizeof *offsets_; *offsets_ = (sizeof offsets_) / sizeof *offsets_;
ut_ad(index && pcur && search_tuple); ut_ad(index && pcur && search_tuple);
...@@ -3138,6 +3146,14 @@ row_search_for_mysql( ...@@ -3138,6 +3146,14 @@ row_search_for_mysql(
trx->search_latch_timeout = BTR_SEA_TIMEOUT; trx->search_latch_timeout = BTR_SEA_TIMEOUT;
} }
/* Reset the new record lock info if we srv_locks_unsafe_for_binlog
is set. Then we are able to remove the record locks set here on an
individual row. */
if (srv_locks_unsafe_for_binlog) {
trx_reset_new_rec_lock_info(trx);
}
/*-------------------------------------------------------------*/ /*-------------------------------------------------------------*/
/* PHASE 1: Try to pop the row from the prefetch cache */ /* PHASE 1: Try to pop the row from the prefetch cache */
...@@ -3396,8 +3412,9 @@ shortcut_fails_too_big_rec: ...@@ -3396,8 +3412,9 @@ shortcut_fails_too_big_rec:
clust_index = dict_table_get_first_index(index->table); clust_index = dict_table_get_first_index(index->table);
if (UNIV_LIKELY(direction != 0)) { if (UNIV_LIKELY(direction != 0)) {
if (!sel_restore_position_for_mysql(BTR_SEARCH_LEAF, pcur, if (!sel_restore_position_for_mysql(&same_user_rec,
moves_up, &mtr)) { BTR_SEARCH_LEAF,
pcur, moves_up, &mtr)) {
goto next_rec; goto next_rec;
} }
...@@ -3679,6 +3696,7 @@ rec_loop: ...@@ -3679,6 +3696,7 @@ rec_loop:
|| srv_locks_unsafe_for_binlog || srv_locks_unsafe_for_binlog
|| (unique_search && !UNIV_UNLIKELY(rec_get_deleted_flag( || (unique_search && !UNIV_UNLIKELY(rec_get_deleted_flag(
rec, page_rec_is_comp(rec))))) { rec, page_rec_is_comp(rec))))) {
goto no_gap_lock; goto no_gap_lock;
} else { } else {
lock_type = LOCK_ORDINARY; lock_type = LOCK_ORDINARY;
...@@ -3701,7 +3719,7 @@ rec_loop: ...@@ -3701,7 +3719,7 @@ rec_loop:
&& dtuple_get_n_fields_cmp(search_tuple) && dtuple_get_n_fields_cmp(search_tuple)
== dict_index_get_n_unique(index) == dict_index_get_n_unique(index)
&& 0 == cmp_dtuple_rec(search_tuple, rec, offsets)) { && 0 == cmp_dtuple_rec(search_tuple, rec, offsets)) {
no_gap_lock: no_gap_lock:
lock_type = LOCK_REC_NOT_GAP; lock_type = LOCK_REC_NOT_GAP;
} }
...@@ -3764,6 +3782,7 @@ rec_loop: ...@@ -3764,6 +3782,7 @@ rec_loop:
/* Get the clustered index record if needed */ /* Get the clustered index record if needed */
index_rec = rec; index_rec = rec;
ut_ad(index != clust_index); ut_ad(index != clust_index);
goto requires_clust_rec; goto requires_clust_rec;
} }
} }
...@@ -3774,6 +3793,15 @@ rec_loop: ...@@ -3774,6 +3793,15 @@ rec_loop:
not a consistent read which might see an earlier version not a consistent read which might see an earlier version
of a non-clustered index record */ of a non-clustered index record */
if (srv_locks_unsafe_for_binlog) {
/* No need to keep a lock on a delete-marked record
if we do not want to use next-key locking. */
row_unlock_for_mysql(prebuilt, TRUE);
trx_reset_new_rec_lock_info(trx);
}
goto next_rec; goto next_rec;
} }
...@@ -3783,7 +3811,8 @@ rec_loop: ...@@ -3783,7 +3811,8 @@ rec_loop:
index_rec = rec; index_rec = rec;
if (index != clust_index && prebuilt->need_to_access_clustered) { if (index != clust_index && prebuilt->need_to_access_clustered) {
requires_clust_rec:
requires_clust_rec:
/* Before and after this "if" block, "offsets" will be /* Before and after this "if" block, "offsets" will be
related to "rec", which may be in a secondary index "index" or related to "rec", which may be in a secondary index "index" or
the clustered index ("clust_index"). However, after this the clustered index ("clust_index"). However, after this
...@@ -3816,6 +3845,16 @@ rec_loop: ...@@ -3816,6 +3845,16 @@ rec_loop:
/* The record is delete marked: we can skip it */ /* The record is delete marked: we can skip it */
if (srv_locks_unsafe_for_binlog) {
/* No need to keep a lock on a delete-marked
record if we do not want to use next-key
locking. */
row_unlock_for_mysql(prebuilt, TRUE);
trx_reset_new_rec_lock_info(trx);
}
goto next_rec; goto next_rec;
} }
...@@ -3921,8 +3960,9 @@ next_rec: ...@@ -3921,8 +3960,9 @@ next_rec:
mtr_has_extra_clust_latch = FALSE; mtr_has_extra_clust_latch = FALSE;
mtr_start(&mtr); mtr_start(&mtr);
if (sel_restore_position_for_mysql(BTR_SEARCH_LEAF, pcur, if (sel_restore_position_for_mysql(&same_user_rec,
moves_up, &mtr)) { BTR_SEARCH_LEAF,
pcur, moves_up, &mtr)) {
#ifdef UNIV_SEARCH_DEBUG #ifdef UNIV_SEARCH_DEBUG
cnt++; cnt++;
#endif /* UNIV_SEARCH_DEBUG */ #endif /* UNIV_SEARCH_DEBUG */
...@@ -3976,8 +4016,29 @@ lock_wait_or_error: ...@@ -3976,8 +4016,29 @@ lock_wait_or_error:
thr->lock_state = QUE_THR_LOCK_NOLOCK; thr->lock_state = QUE_THR_LOCK_NOLOCK;
mtr_start(&mtr); mtr_start(&mtr);
sel_restore_position_for_mysql(BTR_SEARCH_LEAF, pcur, sel_restore_position_for_mysql(&same_user_rec,
BTR_SEARCH_LEAF, pcur,
moves_up, &mtr); moves_up, &mtr);
if (srv_locks_unsafe_for_binlog && !same_user_rec) {
/* Since we were not able to restore the cursor
on the same user record, we cannot use
row_unlock_for_mysql() to unlock any records, and
we must thus reset the new rec lock info. Since
in lock0lock.c we have blocked the inheriting of gap
X-locks, we actually do not have any new record locks
set in this case.
Note that if we were able to restore on the 'same'
user record, it is still possible that we were actually
waiting on a delete-marked record, and meanwhile
it was removed by purge and inserted again by some
other user. But that is no problem, because in
rec_loop we will again try to set a lock, and
new_rec_lock_info in trx will be right at the end. */
trx_reset_new_rec_lock_info(trx);
}
mode = pcur->search_mode; mode = pcur->search_mode;
goto rec_loop; goto rec_loop;
......
...@@ -166,6 +166,8 @@ trx_create( ...@@ -166,6 +166,8 @@ trx_create(
memset(&trx->xid, 0, sizeof(trx->xid)); memset(&trx->xid, 0, sizeof(trx->xid));
trx->xid.formatID = -1; trx->xid.formatID = -1;
trx_reset_new_rec_lock_info(trx);
return(trx); return(trx);
} }
......
...@@ -1496,8 +1496,8 @@ innobase_start_trx_and_assign_read_view( ...@@ -1496,8 +1496,8 @@ innobase_start_trx_and_assign_read_view(
/********************************************************************* /*********************************************************************
Commits a transaction in an InnoDB database or marks an SQL statement Commits a transaction in an InnoDB database or marks an SQL statement
ended. */ ended. */
static
static int int
innobase_commit( innobase_commit(
/*============*/ /*============*/
/* out: 0 */ /* out: 0 */
...@@ -3538,7 +3538,9 @@ ha_innobase::delete_row( ...@@ -3538,7 +3538,9 @@ ha_innobase::delete_row(
} }
/************************************************************************** /**************************************************************************
Deletes a lock set to a row */ Removes a new lock set on a row. This can be called after a row has been read
in the processing of an UPDATE or a DELETE query, if the option
innodb_locks_unsafe_for_binlog is set. */
void void
ha_innobase::unlock_row(void) ha_innobase::unlock_row(void)
...@@ -3557,7 +3559,9 @@ ha_innobase::unlock_row(void) ...@@ -3557,7 +3559,9 @@ ha_innobase::unlock_row(void)
ut_error; ut_error;
} }
row_unlock_for_mysql(prebuilt); if (srv_locks_unsafe_for_binlog) {
row_unlock_for_mysql(prebuilt, FALSE);
}
} }
/********************************************************************** /**********************************************************************
...@@ -5991,6 +5995,7 @@ ha_innobase::external_lock( ...@@ -5991,6 +5995,7 @@ ha_innobase::external_lock(
reads. */ reads. */
prebuilt->select_lock_type = LOCK_S; prebuilt->select_lock_type = LOCK_S;
prebuilt->stored_select_lock_type = LOCK_S;
} }
/* Starting from 4.1.9, no InnoDB table lock is taken in LOCK /* Starting from 4.1.9, no InnoDB table lock is taken in LOCK
...@@ -6030,7 +6035,6 @@ ha_innobase::external_lock( ...@@ -6030,7 +6035,6 @@ ha_innobase::external_lock(
trx->n_mysql_tables_in_use--; trx->n_mysql_tables_in_use--;
prebuilt->mysql_has_locked = FALSE; prebuilt->mysql_has_locked = FALSE;
/* If the MySQL lock count drops to zero we know that the current SQL /* If the MySQL lock count drops to zero we know that the current SQL
statement has ended */ statement has ended */
...@@ -6563,12 +6567,14 @@ the value of the auto-inc counter. */ ...@@ -6563,12 +6567,14 @@ the value of the auto-inc counter. */
int int
ha_innobase::innobase_read_and_init_auto_inc( ha_innobase::innobase_read_and_init_auto_inc(
/*=========================================*/ /*=========================================*/
/* out: 0 or error code: deadlock or /* out: 0 or error code: deadlock or lock wait
lock wait timeout */ timeout */
longlong* ret) /* out: auto-inc value */ longlong* ret) /* out: auto-inc value */
{ {
row_prebuilt_t* prebuilt = (row_prebuilt_t*) innobase_prebuilt; row_prebuilt_t* prebuilt = (row_prebuilt_t*) innobase_prebuilt;
longlong auto_inc; longlong auto_inc;
ulint old_select_lock_type;
ibool trx_was_not_started = FALSE;
int error; int error;
ut_a(prebuilt); ut_a(prebuilt);
...@@ -6576,6 +6582,10 @@ ha_innobase::innobase_read_and_init_auto_inc( ...@@ -6576,6 +6582,10 @@ ha_innobase::innobase_read_and_init_auto_inc(
(trx_t*) current_thd->ha_data[innobase_hton.slot]); (trx_t*) current_thd->ha_data[innobase_hton.slot]);
ut_a(prebuilt->table); ut_a(prebuilt->table);
if (prebuilt->trx->conc_state == TRX_NOT_STARTED) {
trx_was_not_started = TRUE;
}
/* In case MySQL calls this in the middle of a SELECT query, release /* In case MySQL calls this in the middle of a SELECT query, release
possible adaptive hash latch to avoid deadlocks of threads */ possible adaptive hash latch to avoid deadlocks of threads */
...@@ -6587,7 +6597,9 @@ ha_innobase::innobase_read_and_init_auto_inc( ...@@ -6587,7 +6597,9 @@ ha_innobase::innobase_read_and_init_auto_inc(
/* Already initialized */ /* Already initialized */
*ret = auto_inc; *ret = auto_inc;
return(0); error = 0;
goto func_exit_early;
} }
error = row_lock_table_autoinc_for_mysql(prebuilt); error = row_lock_table_autoinc_for_mysql(prebuilt);
...@@ -6595,7 +6607,7 @@ ha_innobase::innobase_read_and_init_auto_inc( ...@@ -6595,7 +6607,7 @@ ha_innobase::innobase_read_and_init_auto_inc(
if (error != DB_SUCCESS) { if (error != DB_SUCCESS) {
error = convert_error_code_to_mysql(error, user_thd); error = convert_error_code_to_mysql(error, user_thd);
goto func_exit; goto func_exit_early;
} }
/* Check again if someone has initialized the counter meanwhile */ /* Check again if someone has initialized the counter meanwhile */
...@@ -6604,30 +6616,37 @@ ha_innobase::innobase_read_and_init_auto_inc( ...@@ -6604,30 +6616,37 @@ ha_innobase::innobase_read_and_init_auto_inc(
if (auto_inc != 0) { if (auto_inc != 0) {
*ret = auto_inc; *ret = auto_inc;
return(0); error = 0;
goto func_exit_early;
} }
(void) extra(HA_EXTRA_KEYREAD); (void) extra(HA_EXTRA_KEYREAD);
index_init(table->s->next_number_index); index_init(table->s->next_number_index);
/* We use an exclusive lock when we read the max key value from the /* Starting from 5.0.9, we use a consistent read to read the auto-inc
auto-increment column index. This is because then build_template will column maximum value. This eliminates the spurious deadlocks caused
advise InnoDB to fetch all columns. In SHOW TABLE STATUS the query by the row X-lock that we previously used. Note the following flaw
id of the auto-increment column is not changed, and previously InnoDB in our algorithm: if some other user meanwhile UPDATEs the auto-inc
did not fetch it, causing SHOW TABLE STATUS to show wrong values column, our consistent read will not return the largest value. We
for the autoinc column. */ accept this flaw, since the deadlocks were a bigger trouble. */
prebuilt->select_lock_type = LOCK_X;
/* Play safe and also give in another way the hint to fetch /* Fetch all the columns in the key */
all columns in the key: */
prebuilt->hint_need_to_fetch_extra_cols = ROW_RETRIEVE_ALL_COLS; prebuilt->hint_need_to_fetch_extra_cols = ROW_RETRIEVE_ALL_COLS;
prebuilt->trx->mysql_n_tables_locked += 1; old_select_lock_type = prebuilt->select_lock_type;
prebuilt->select_lock_type = LOCK_NONE;
/* Eliminate an InnoDB error print that happens when we try to SELECT
from a table when no table has been locked in ::external_lock(). */
prebuilt->trx->n_mysql_tables_in_use++;
error = index_last(table->record[1]); error = index_last(table->record[1]);
prebuilt->trx->n_mysql_tables_in_use--;
prebuilt->select_lock_type = old_select_lock_type;
if (error) { if (error) {
if (error == HA_ERR_END_OF_FILE) { if (error == HA_ERR_END_OF_FILE) {
/* The table was empty, initialize to 1 */ /* The table was empty, initialize to 1 */
...@@ -6635,7 +6654,10 @@ ha_innobase::innobase_read_and_init_auto_inc( ...@@ -6635,7 +6654,10 @@ ha_innobase::innobase_read_and_init_auto_inc(
error = 0; error = 0;
} else { } else {
/* Deadlock or a lock wait timeout */ /* This should not happen in a consistent read */
fprintf(stderr,
"InnoDB: Error: consistent read of auto-inc column returned %lu\n",
(ulong)error);
auto_inc = -1; auto_inc = -1;
goto func_exit; goto func_exit;
...@@ -6655,6 +6677,17 @@ func_exit: ...@@ -6655,6 +6677,17 @@ func_exit:
*ret = auto_inc; *ret = auto_inc;
func_exit_early:
/* Since MySQL does not seem to call autocommit after SHOW TABLE
STATUS (even if we would register the trx here), we must commit our
transaction here if it was started here. This is to eliminate a
dangling transaction. */
if (trx_was_not_started) {
innobase_commit_low(prebuilt->trx);
}
return(error); return(error);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment