Commit e6206072 authored by marko's avatar marko

Implement semi-consistent read to reduce lock conflicts at the cost

of breaking serializability.  (Bug #3300)

ha_innobase::unlock_row(): reset the "did semi consistent read" flag

ha_innobase::was_semi_consistent_read(),
ha_innobase::try_semi_consistent_read(): new methods

row_prebuilt_t, row_create_prebuilt(): add field row_read_type for
keeping track of semi-consistent reads

row_vers_build_for_semi_consistent_read(),
row_sel_build_committed_vers_for_mysql(): new functions

row_search_for_mysql(): implement semi-consistent reads
parent 65caba82
......@@ -3828,9 +3828,9 @@ ha_innobase::delete_row(
}
/**************************************************************************
Removes a new lock set on a row. This can be called after a row has been read
in the processing of an UPDATE or a DELETE query, if the option
innodb_locks_unsafe_for_binlog is set. */
Removes a new lock set on a row, if it was not read optimistically. This can
be called after a row has been read in the processing of an UPDATE or a DELETE
query, if the option innodb_locks_unsafe_for_binlog is set. */
void
ha_innobase::unlock_row(void)
......@@ -3840,7 +3840,7 @@ ha_innobase::unlock_row(void)
DBUG_ENTER("ha_innobase::unlock_row");
if (last_query_id != user_thd->query_id) {
if (UNIV_UNLIKELY(last_query_id != user_thd->query_id)) {
ut_print_timestamp(stderr);
sql_print_error("last_query_id is %lu != user_thd_query_id is "
"%lu", (ulong) last_query_id,
......@@ -3849,8 +3849,44 @@ ha_innobase::unlock_row(void)
ut_error;
}
if (srv_locks_unsafe_for_binlog) {
switch (prebuilt->row_read_type) {
case ROW_READ_WITH_LOCKS:
if (!srv_locks_unsafe_for_binlog) {
break;
}
/* fall through */
case ROW_READ_TRY_SEMI_CONSISTENT:
row_unlock_for_mysql(prebuilt, FALSE);
break;
case ROW_READ_DID_SEMI_CONSISTENT:
prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
break;
}
DBUG_VOID_RETURN;
}
/* See handler.h and row0mysql.h for docs on this function. */
bool
ha_innobase::was_semi_consistent_read(void)
/*=======================================*/
{
row_prebuilt_t* prebuilt = (row_prebuilt_t*) innobase_prebuilt;
return(prebuilt->row_read_type == ROW_READ_DID_SEMI_CONSISTENT);
}
/* See handler.h and row0mysql.h for docs on this function. */
void
ha_innobase::try_semi_consistent_read(bool yes)
/*===========================================*/
{
row_prebuilt_t* prebuilt = (row_prebuilt_t*) innobase_prebuilt;
if (yes && srv_locks_unsafe_for_binlog) {
prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
} else {
prebuilt->row_read_type = ROW_READ_WITH_LOCKS;
}
}
......
......@@ -119,6 +119,8 @@ class ha_innobase: public handler
int write_row(byte * buf);
int update_row(const byte * old_data, byte * new_data);
int delete_row(const byte * buf);
bool was_semi_consistent_read();
void try_semi_consistent_read(bool yes);
void unlock_row();
int index_init(uint index, bool sorted);
......
......@@ -612,6 +612,31 @@ struct row_prebuilt_struct {
that was decided in ha_innodb.cc,
::store_lock(), ::external_lock(),
etc. */
ulint row_read_type; /* ROW_READ_WITH_LOCKS if row locks
should be the obtained for records
under an UPDATE or DELETE cursor.
If innodb_locks_unsafe_for_binlog
is TRUE, this can be set to
ROW_READ_TRY_SEMI_CONSISTENT, so that
if the row under an UPDATE or DELETE
cursor was locked by another
transaction, InnoDB will resort
to reading the last committed value
('semi-consistent read'). Then,
this field will be set to
ROW_READ_DID_SEMI_CONSISTENT to
indicate that. If the row does not
match the WHERE condition, MySQL will
invoke handler::unlock_row() to
clear the flag back to
ROW_READ_TRY_SEMI_CONSISTENT and
to simply skip the row. If
the row matches, the next call to
row_search_for_mysql() will lock
the row.
This eliminates lock waits in some
cases; note that this breaks
serializability. */
ulint mysql_prefix_len;/* byte offset of the end of
the last requested column */
ulint mysql_row_len; /* length in bytes of a row in the
......@@ -657,6 +682,10 @@ struct row_prebuilt_struct {
#define ROW_RETRIEVE_PRIMARY_KEY 1
#define ROW_RETRIEVE_ALL_COLS 2
/* Values for row_read_type */
#define ROW_READ_WITH_LOCKS 0
#define ROW_READ_TRY_SEMI_CONSISTENT 1
#define ROW_READ_DID_SEMI_CONSISTENT 2
#ifndef UNIV_NONINL
#include "row0mysql.ic"
......
......@@ -92,6 +92,32 @@ row_vers_build_for_consistent_read(
record does not exist in the view, that is,
it was freshly inserted afterwards */
/*********************************************************************
Constructs the last committed version of a clustered index record,
which should be seen by a semi-consistent read. */
ulint
row_vers_build_for_semi_consistent_read(
/*====================================*/
/* out: DB_SUCCESS or DB_MISSING_HISTORY */
rec_t* rec, /* in: record in a clustered index; the
caller must have a latch on the page; this
latch locks the top of the stack of versions
of this records */
mtr_t* mtr, /* in: mtr holding the latch on rec */
dict_index_t* index, /* in: the clustered index */
ulint** offsets,/* in/out: offsets returned by
rec_get_offsets(rec, index) */
mem_heap_t** offset_heap,/* in/out: memory heap from which
the offsets are allocated */
mem_heap_t* in_heap,/* in: memory heap from which the memory for
old_vers is allocated; memory for possible
intermediate versions is allocated and freed
locally within the function */
rec_t** old_vers);/* out, own: rec, old version, or NULL if the
record does not exist in the view, that is,
it was freshly inserted afterwards */
#ifndef UNIV_NONINL
#include "row0vers.ic"
......
......@@ -626,6 +626,8 @@ row_create_prebuilt(
prebuilt->select_lock_type = LOCK_NONE;
prebuilt->stored_select_lock_type = 99999999;
prebuilt->row_read_type = ROW_READ_WITH_LOCKS;
prebuilt->sel_graph = NULL;
prebuilt->search_tuple = dtuple_create(heap,
......
......@@ -535,6 +535,41 @@ row_sel_build_prev_vers(
return(err);
}
/*************************************************************************
Builds the last committed version of a clustered index record for a
semi-consistent read. */
static
ulint
row_sel_build_committed_vers_for_mysql(
/*===================================*/
/* out: DB_SUCCESS or error code */
dict_index_t* clust_index, /* in: clustered index */
row_prebuilt_t* prebuilt, /* in: prebuilt struct */
rec_t* rec, /* in: record in a clustered index */
ulint** offsets, /* in/out: offsets returned by
rec_get_offsets(rec, clust_index) */
mem_heap_t** offset_heap, /* in/out: memory heap from which
the offsets are allocated */
rec_t** old_vers, /* out: old version, or NULL if the
record does not exist in the view:
i.e., it was freshly inserted
afterwards */
mtr_t* mtr) /* in: mtr */
{
ulint err;
if (prebuilt->old_vers_heap) {
mem_heap_empty(prebuilt->old_vers_heap);
} else {
prebuilt->old_vers_heap = mem_heap_create(200);
}
err = row_vers_build_for_semi_consistent_read(rec, mtr, clust_index,
offsets, offset_heap,
prebuilt->old_vers_heap, old_vers);
return(err);
}
/*************************************************************************
Tests the conditions which determine when the index segment we are searching
through has been exhausted. */
......@@ -3066,7 +3101,6 @@ row_search_for_mysql(
rec_t* rec;
rec_t* result_rec;
rec_t* clust_rec;
rec_t* old_vers;
ulint err = DB_SUCCESS;
ibool unique_search = FALSE;
ibool unique_search_from_clust_index = FALSE;
......@@ -3077,6 +3111,11 @@ row_search_for_mysql(
locking SELECT, and the isolation
level is <= TRX_ISO_READ_COMMITTED,
then this is set to FALSE */
ibool did_semi_consistent_read = FALSE;
/* if the returned record was locked
and we did a semi-consistent read
(fetch the newest committed version),
then this is set to TRUE */
#ifdef UNIV_SEARCH_DEBUG
ulint cnt = 0;
#endif /* UNIV_SEARCH_DEBUG */
......@@ -3163,7 +3202,7 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
trx->search_latch_timeout = BTR_SEA_TIMEOUT;
}
/* Reset the new record lock info if we srv_locks_unsafe_for_binlog
/* Reset the new record lock info if srv_locks_unsafe_for_binlog
is set. Then we are able to remove the record locks set here on an
individual row. */
......@@ -3431,9 +3470,28 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
clust_index = dict_table_get_first_index(index->table);
if (UNIV_LIKELY(direction != 0)) {
if (!sel_restore_position_for_mysql(&same_user_rec,
BTR_SEARCH_LEAF,
pcur, moves_up, &mtr)) {
ibool need_to_process = sel_restore_position_for_mysql(
&same_user_rec, BTR_SEARCH_LEAF,
pcur, moves_up, &mtr);
if (UNIV_UNLIKELY(need_to_process)) {
if (UNIV_UNLIKELY(prebuilt->row_read_type
== ROW_READ_DID_SEMI_CONSISTENT)) {
/* We did a semi-consistent read,
but the record was removed in
the meantime. */
prebuilt->row_read_type
= ROW_READ_TRY_SEMI_CONSISTENT;
}
} else if (UNIV_LIKELY(prebuilt->row_read_type
!= ROW_READ_DID_SEMI_CONSISTENT)) {
/* The cursor was positioned on the record
that we returned previously. If we need
to repeat a semi-consistent read as a
pessimistic locking read, the record
cannot be skipped. */
goto next_rec;
}
......@@ -3751,10 +3809,67 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
prebuilt->select_lock_type,
lock_type, thr);
switch (err) {
rec_t* old_vers;
case DB_SUCCESS:
break;
case DB_LOCK_WAIT:
if (UNIV_LIKELY(prebuilt->row_read_type
!= ROW_READ_TRY_SEMI_CONSISTENT)
|| index != clust_index) {
goto lock_wait_or_error;
}
/* The following call returns 'offsets'
associated with 'old_vers' */
err = row_sel_build_committed_vers_for_mysql(
clust_index, prebuilt, rec,
&offsets, &heap,
&old_vers, &mtr);
if (err != DB_SUCCESS) {
goto lock_wait_or_error;
}
mutex_enter(&kernel_mutex);
if (trx->was_chosen_as_deadlock_victim) {
mutex_exit(&kernel_mutex);
goto lock_wait_or_error;
}
if (UNIV_LIKELY(trx->wait_lock != NULL)) {
lock_cancel_waiting_and_release(
trx->wait_lock);
trx_reset_new_rec_lock_info(trx);
} else {
mutex_exit(&kernel_mutex);
/* The lock was granted while we were
searching for the last committed version.
Do a normal locking read. */
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
err = DB_SUCCESS;
break;
}
mutex_exit(&kernel_mutex);
if (old_vers == NULL) {
/* The row was not yet committed */
goto next_rec;
}
did_semi_consistent_read = TRUE;
rec = old_vers;
break;
default:
goto lock_wait_or_error;
}
} else {
/* This is a non-locking consistent read: if necessary, fetch
a previous version of the record */
......@@ -3775,6 +3890,7 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
&& !lock_clust_rec_cons_read_sees(rec, index,
offsets, trx->read_view)) {
rec_t* old_vers;
/* The following call returns 'offsets'
associated with 'old_vers' */
err = row_sel_build_prev_vers_for_mysql(
......@@ -3821,14 +3937,13 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
/* The record is delete-marked: we can skip it */
if (srv_locks_unsafe_for_binlog
&& prebuilt->select_lock_type != LOCK_NONE) {
&& prebuilt->select_lock_type != LOCK_NONE
&& !did_semi_consistent_read) {
/* No need to keep a lock on a delete-marked record
if we do not want to use next-key locking. */
row_unlock_for_mysql(prebuilt, TRUE);
trx_reset_new_rec_lock_info(trx);
}
goto next_rec;
......@@ -3882,8 +3997,6 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
locking. */
row_unlock_for_mysql(prebuilt, TRUE);
trx_reset_new_rec_lock_info(trx);
}
goto next_rec;
......@@ -3990,6 +4103,19 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
goto normal_return;
next_rec:
/* Reset the old and new "did semi-consistent read" flags. */
if (UNIV_UNLIKELY(prebuilt->row_read_type
== ROW_READ_DID_SEMI_CONSISTENT)) {
prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
}
did_semi_consistent_read = FALSE;
if (UNIV_UNLIKELY(srv_locks_unsafe_for_binlog)
&& prebuilt->select_lock_type != LOCK_NONE) {
trx_reset_new_rec_lock_info(trx);
}
/*-------------------------------------------------------------*/
/* PHASE 5: Move the cursor to the next index record */
......@@ -4042,6 +4168,13 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
goto rec_loop;
lock_wait_or_error:
/* Reset the old and new "did semi-consistent read" flags. */
if (UNIV_UNLIKELY(prebuilt->row_read_type
== ROW_READ_DID_SEMI_CONSISTENT)) {
prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
}
did_semi_consistent_read = FALSE;
/*-------------------------------------------------------------*/
btr_pcur_store_position(pcur, &mtr);
......@@ -4126,6 +4259,20 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
/* Set or reset the "did semi-consistent read" flag on return.
The flag did_semi_consistent_read is set if and only if
the record being returned was fetched with a semi-consistent read. */
ut_ad(prebuilt->row_read_type != ROW_READ_WITH_LOCKS
|| !did_semi_consistent_read);
if (UNIV_UNLIKELY(prebuilt->row_read_type != ROW_READ_WITH_LOCKS)) {
if (UNIV_UNLIKELY(did_semi_consistent_read)) {
prebuilt->row_read_type = ROW_READ_DID_SEMI_CONSISTENT;
} else {
prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
}
}
return(err);
}
......
......@@ -490,3 +490,141 @@ row_vers_build_for_consistent_read(
return(err);
}
/*********************************************************************
Constructs the last committed version of a clustered index record,
which should be seen by a semi-consistent read. */
ulint
row_vers_build_for_semi_consistent_read(
/*====================================*/
/* out: DB_SUCCESS or DB_MISSING_HISTORY */
rec_t* rec, /* in: record in a clustered index; the
caller must have a latch on the page; this
latch locks the top of the stack of versions
of this records */
mtr_t* mtr, /* in: mtr holding the latch on rec */
dict_index_t* index, /* in: the clustered index */
ulint** offsets,/* in/out: offsets returned by
rec_get_offsets(rec, index) */
mem_heap_t** offset_heap,/* in/out: memory heap from which
the offsets are allocated */
mem_heap_t* in_heap,/* in: memory heap from which the memory for
old_vers is allocated; memory for possible
intermediate versions is allocated and freed
locally within the function */
rec_t** old_vers)/* out, own: rec, old version, or NULL if the
record does not exist in the view, that is,
it was freshly inserted afterwards */
{
rec_t* version;
mem_heap_t* heap = NULL;
byte* buf;
ulint err;
dulint rec_trx_id;
ut_ad(index->type & DICT_CLUSTERED);
ut_ad(mtr_memo_contains(mtr, buf_block_align(rec), MTR_MEMO_PAGE_X_FIX)
|| mtr_memo_contains(mtr, buf_block_align(rec),
MTR_MEMO_PAGE_S_FIX));
#ifdef UNIV_SYNC_DEBUG
ut_ad(!rw_lock_own(&(purge_sys->latch), RW_LOCK_SHARED));
#endif /* UNIV_SYNC_DEBUG */
ut_ad(rec_offs_validate(rec, index, *offsets));
rw_lock_s_lock(&(purge_sys->latch));
/* The S-latch on purge_sys prevents the purge view from
changing. Thus, if we have an uncommitted transaction at
this point, then purge cannot remove its undo log even if
the transaction could commit now. */
version = rec;
for (;;) {
trx_t* version_trx;
mem_heap_t* heap2;
rec_t* prev_version;
dulint version_trx_id;
version_trx_id = row_get_rec_trx_id(
version, index, *offsets);
if (rec == version) {
rec_trx_id = version_trx_id;
}
mutex_enter(&kernel_mutex);
version_trx = trx_get_on_id(version_trx_id);
mutex_exit(&kernel_mutex);
if (!version_trx
|| version_trx->conc_state == TRX_NOT_STARTED
|| version_trx->conc_state == TRX_COMMITTED_IN_MEMORY) {
/* We found a version that belongs to a
committed transaction: return it. */
if (rec == version) {
*old_vers = rec;
err = DB_SUCCESS;
break;
}
/* We assume that a rolled-back transaction stays in
TRX_ACTIVE state until all the changes have been
rolled back and the transaction is removed from
the global list of transactions. */
if (!ut_dulint_cmp(rec_trx_id, version_trx_id)) {
/* The transaction was committed while
we searched for earlier versions.
Return the current version as a
semi-consistent read. */
version = rec;
*offsets = rec_get_offsets(version,
index, *offsets,
ULINT_UNDEFINED, offset_heap);
}
buf = mem_heap_alloc(in_heap, rec_offs_size(*offsets));
*old_vers = rec_copy(buf, version, *offsets);
rec_offs_make_valid(*old_vers, index, *offsets);
err = DB_SUCCESS;
break;
}
heap2 = heap;
heap = mem_heap_create(1024);
err = trx_undo_prev_version_build(rec, mtr, version, index,
*offsets, heap, &prev_version);
if (heap2) {
mem_heap_free(heap2); /* free version */
}
if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
break;
}
if (prev_version == NULL) {
/* It was a freshly inserted version */
*old_vers = NULL;
err = DB_SUCCESS;
break;
}
version = prev_version;
*offsets = rec_get_offsets(version, index, *offsets,
ULINT_UNDEFINED, offset_heap);
}/* for (;;) */
if (heap) {
mem_heap_free(heap);
}
rw_lock_s_unlock(&(purge_sys->latch));
return(err);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment