Commit 8a94d55e authored by Marko Mäkelä's avatar Marko Mäkelä

Bug#12612184 Race condition after btr_cur_pessimistic_update()

btr_cur_compress_if_useful(), btr_compress(): Add the parameter ibool
adjust. If adjust=TRUE, adjust the cursor position after compressing
the page.

btr_lift_page_up(): Return a pointer to the father page.

BTR_KEEP_POS_FLAG: A new flag for btr_cur_pessimistic_update().

btr_cur_pessimistic_update(): If *big_rec != NULL and flags &
BTR_KEEP_POS_FLAG, keep the cursor positioned on the updated record.
Also, do not release the index tree x-lock if *big_rec != NULL.

btr_cur_mtr_commit_and_start(): Commits and restarts a
mini-transaction so that it will retain an x-lock on index->lock and
the page of the cursor. This is invoked when
btr_cur_pessimistic_update() returns *big_rec != NULL.

In all callers of btr_cur_pessimistic_update() that do not pass
BTR_KEEP_POS_FLAG, assert that *big_rec == NULL.

btr_cur_compress(): Unused function [in the built-in MySQL 5.1], remove.

page_rec_get_nth(): Return the nth record on the page (an inverse
function of page_rec_get_n_recs_before()). Refactored from
page_get_middle_rec().

page_get_middle_rec(): Invoke page_rec_get_nth().

page_cur_insert_rec_zip_reorg(): Make use of the page directory
shortcuts in page_rec_get_nth() instead of scanning the whole list of
records.

row_ins_clust_index_entry_by_modify(): Pass BTR_KEEP_POS_FLAG to
btr_cur_pessimistic_update().

row_ins_index_entry_low(): If row_ins_clust_index_entry_by_modify()
returns a big_rec, invoke btr_cur_mtr_commit_and_start() in order to
commit and start the mini-transaction without releasing the x-locks on
index->lock and the cursor page, and write the big_rec. Releasing the
page latch in mtr_commit() caused a race condition.

row_upd_clust_rec(): Pass BTR_KEEP_POS_FLAG to
btr_cur_pessimistic_update(). If it returns a big_rec, invoke
btr_cur_mtr_commit_and_start() in order to commit and start the
mini-transaction without releasing the x-locks on index->lock and the
cursor page, and write the big_rec. Releasing the page latch in
mtr_commit() caused a race condition.

sync_thread_add_level(): Add the parameter ibool relock. When TRUE,
bypass the latching order rules.

rw_lock_add_debug_info(): For nested X-lock requests, pass relock=TRUE
to sync_thread_add_level().

rb:678 approved by Jimmy Yang
parent 2a48b142
......@@ -1937,7 +1937,7 @@ btr_node_ptr_delete(
ut_a(err == DB_SUCCESS);
if (!compressed) {
btr_cur_compress_if_useful(&cursor, mtr);
btr_cur_compress_if_useful(&cursor, FALSE, mtr);
}
}
......@@ -1945,9 +1945,10 @@ btr_node_ptr_delete(
If page is the only on its level, this function moves its records to the
father page, thus reducing the tree height. */
static
void
page_t*
btr_lift_page_up(
/*=============*/
/* out: father page */
dict_index_t* index, /* in: index tree */
page_t* page, /* in: page which is the only on its level;
must not be empty: use
......@@ -2023,6 +2024,8 @@ btr_lift_page_up(
ibuf_reset_free_bits(index, father_page);
ut_ad(page_validate(father_page, index));
ut_ad(btr_check_node_ptr(index, father_page, mtr));
return(father_page);
}
/*****************************************************************
......@@ -2039,11 +2042,13 @@ enough free extents so that the compression will always succeed if done! */
void
btr_compress(
/*=========*/
btr_cur_t* cursor, /* in: cursor on the page to merge or lift;
the page must not be empty: in record delete
use btr_discard_page if the page would become
empty */
mtr_t* mtr) /* in: mtr */
btr_cur_t* cursor, /* in/out: cursor on the page to merge
or lift; the page must not be empty:
when deleting records, use btr_discard_page()
if the page would become empty */
ibool adjust, /* in: TRUE if should adjust the
cursor position even if compression occurs */
mtr_t* mtr) /* in/out: mini-transaction */
{
dict_index_t* index;
ulint space;
......@@ -2058,6 +2063,7 @@ btr_compress(
rec_t* node_ptr;
ulint data_size;
ulint n_recs;
ulint nth_rec;
ulint max_ins_size;
ulint max_ins_size_reorg;
ulint comp;
......@@ -2065,6 +2071,7 @@ btr_compress(
page = btr_cur_get_page(cursor);
index = btr_cur_get_index(cursor);
comp = page_is_comp(page);
ut_a((ibool)!!comp == dict_table_is_comp(index->table));
ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
......@@ -2086,6 +2093,10 @@ btr_compress(
father_page = buf_frame_align(node_ptr);
ut_a(comp == page_is_comp(father_page));
if (adjust) {
nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
}
/* Decide the page to which we try to merge and which will inherit
the locks */
......@@ -2110,9 +2121,8 @@ btr_compress(
} else {
/* The page is the only one on the level, lift the records
to the father */
btr_lift_page_up(index, page, mtr);
return;
merge_page = btr_lift_page_up(index, page, mtr);
goto func_exit;
}
n_recs = page_get_n_recs(page);
......@@ -2188,6 +2198,10 @@ btr_compress(
index, mtr);
lock_update_merge_left(merge_page, orig_pred, page);
if (adjust) {
nth_rec += page_rec_get_n_recs_before(orig_pred);
}
} else {
orig_succ = page_rec_get_next(
page_get_infimum_rec(merge_page));
......@@ -2208,6 +2222,12 @@ btr_compress(
btr_page_free(index, page, mtr);
ut_ad(btr_check_node_ptr(index, merge_page, mtr));
func_exit:
if (adjust) {
btr_cur_position(index, page_rec_get_nth(merge_page, nth_rec),
cursor);
}
}
/*****************************************************************
......
......@@ -1791,7 +1791,9 @@ btr_cur_pessimistic_update(
/* out: DB_SUCCESS or error code */
ulint flags, /* in: undo logging, locking, and rollback
flags */
btr_cur_t* cursor, /* in: cursor on the record to update */
btr_cur_t* cursor, /* in/out: cursor on the record to update;
cursor may become invalid if *big_rec == NULL
|| !(flags & BTR_KEEP_POS_FLAG) */
big_rec_t** big_rec,/* out: big rec vector whose fields have to
be stored externally by the caller, or NULL */
upd_t* update, /* in: update vector; this is allowed also
......@@ -1926,6 +1928,10 @@ btr_cur_pessimistic_update(
err = DB_TOO_BIG_RECORD;
goto return_after_reservations;
}
ut_ad(index->type & DICT_CLUSTERED);
ut_ad(btr_page_get_level(page, mtr) == 0);
ut_ad(flags & BTR_KEEP_POS_FLAG);
}
page_cursor = btr_cur_get_page_cur(cursor);
......@@ -1952,6 +1958,8 @@ btr_cur_pessimistic_update(
ut_a(rec || optim_err != DB_UNDERFLOW);
if (rec) {
page_cursor->rec = rec;
lock_rec_restore_from_page_infimum(rec, page);
rec_set_field_extern_bits(rec, index,
ext_vect, n_ext_vect, mtr);
......@@ -1965,12 +1973,30 @@ btr_cur_pessimistic_update(
btr_cur_unmark_extern_fields(rec, mtr, offsets);
}
btr_cur_compress_if_useful(cursor, mtr);
btr_cur_compress_if_useful(
cursor,
big_rec_vec != NULL && (flags & BTR_KEEP_POS_FLAG),
mtr);
err = DB_SUCCESS;
goto return_after_reservations;
}
if (big_rec_vec) {
ut_ad(index->type & DICT_CLUSTERED);
ut_ad(btr_page_get_level(page, mtr) == 0);
ut_ad(flags & BTR_KEEP_POS_FLAG);
/* btr_page_split_and_insert() in
btr_cur_pessimistic_insert() invokes
mtr_memo_release(mtr, index->lock, MTR_MEMO_X_LOCK).
We must keep the index->lock when we created a
big_rec, so that row_upd_clust_rec() can store the
big_rec in the same mini-transaction. */
mtr_x_lock(dict_index_get_lock(index), mtr);
}
if (page_cur_is_before_first(page_cursor)) {
/* The record to be updated was positioned as the first user
record on its page */
......@@ -1991,6 +2017,7 @@ btr_cur_pessimistic_update(
ut_a(rec);
ut_a(err == DB_SUCCESS);
ut_a(dummy_big_rec == NULL);
page_cursor->rec = rec;
rec_set_field_extern_bits(rec, index, ext_vect, n_ext_vect, mtr);
offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
......@@ -2025,6 +2052,43 @@ btr_cur_pessimistic_update(
return(err);
}
/*****************************************************************
Commits and restarts a mini-transaction so that it will retain an
x-lock on index->lock and the cursor page. */
void
btr_cur_mtr_commit_and_start(
/*=========================*/
btr_cur_t* cursor, /* in: cursor */
mtr_t* mtr) /* in/out: mini-transaction */
{
buf_block_t* block;
block = buf_block_align(btr_cur_get_rec(cursor));
ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(cursor->index),
MTR_MEMO_X_LOCK));
ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
/* Keep the locks across the mtr_commit(mtr). */
rw_lock_x_lock(dict_index_get_lock(cursor->index));
rw_lock_x_lock(&block->lock);
mutex_enter(&block->mutex);
#ifdef UNIV_SYNC_DEBUG
buf_block_buf_fix_inc_debug(block, __FILE__, __LINE__);
#else
buf_block_buf_fix_inc(block);
#endif
mutex_exit(&block->mutex);
/* Write out the redo log. */
mtr_commit(mtr);
mtr_start(mtr);
/* Reassociate the locks with the mini-transaction.
They will be released on mtr_commit(mtr). */
mtr_memo_push(mtr, dict_index_get_lock(cursor->index),
MTR_MEMO_X_LOCK);
mtr_memo_push(mtr, block, MTR_MEMO_PAGE_X_FIX);
}
/*==================== B-TREE DELETE MARK AND UNMARK ===============*/
/********************************************************************
......@@ -2392,30 +2456,6 @@ btr_cur_del_unmark_for_ibuf(
/*==================== B-TREE RECORD REMOVE =========================*/
/*****************************************************************
Tries to compress a page of the tree on the leaf level. It is assumed
that mtr holds an x-latch on the tree and on the cursor page. To avoid
deadlocks, mtr must also own x-latches to brothers of page, if those
brothers exist. NOTE: it is assumed that the caller has reserved enough
free extents so that the compression will always succeed if done! */
void
btr_cur_compress(
/*=============*/
btr_cur_t* cursor, /* in: cursor on the page to compress;
cursor does not stay valid */
mtr_t* mtr) /* in: mtr */
{
ut_ad(mtr_memo_contains(mtr,
dict_index_get_lock(btr_cur_get_index(cursor)),
MTR_MEMO_X_LOCK));
ut_ad(mtr_memo_contains(mtr, buf_block_align(btr_cur_get_rec(cursor)),
MTR_MEMO_PAGE_X_FIX));
ut_ad(btr_page_get_level(btr_cur_get_page(cursor), mtr) == 0);
btr_compress(cursor, mtr);
}
/*****************************************************************
Tries to compress a page of the tree if it seems useful. It is assumed
that mtr holds an x-latch on the tree and on the cursor page. To avoid
......@@ -2427,10 +2467,12 @@ ibool
btr_cur_compress_if_useful(
/*=======================*/
/* out: TRUE if compression occurred */
btr_cur_t* cursor, /* in: cursor on the page to compress;
cursor does not stay valid if compression
occurs */
mtr_t* mtr) /* in: mtr */
btr_cur_t* cursor, /* in/out: cursor on the page to compress;
cursor does not stay valid if !adjust and
compression occurs */
ibool adjust, /* in: TRUE if should adjust the
cursor position even if compression occurs */
mtr_t* mtr) /* in/out: mini-transaction */
{
ut_ad(mtr_memo_contains(mtr,
dict_index_get_lock(btr_cur_get_index(cursor)),
......@@ -2440,7 +2482,7 @@ btr_cur_compress_if_useful(
if (btr_cur_compress_recommendation(cursor, mtr)) {
btr_compress(cursor, mtr);
btr_compress(cursor, adjust, mtr);
return(TRUE);
}
......@@ -2653,7 +2695,7 @@ btr_cur_pessimistic_delete(
mem_heap_free(heap);
if (ret == FALSE) {
ret = btr_cur_compress_if_useful(cursor, mtr);
ret = btr_cur_compress_if_useful(cursor, FALSE, mtr);
}
if (n_extents > 0) {
......
......@@ -312,11 +312,13 @@ enough free extents so that the compression will always succeed if done! */
void
btr_compress(
/*=========*/
btr_cur_t* cursor, /* in: cursor on the page to merge or lift;
the page must not be empty: in record delete
use btr_discard_page if the page would become
empty */
mtr_t* mtr); /* in: mtr */
btr_cur_t* cursor, /* in/out: cursor on the page to merge
or lift; the page must not be empty:
when deleting records, use btr_discard_page()
if the page would become empty */
ibool adjust, /* in: TRUE if should adjust the
cursor position even if compression occurs */
mtr_t* mtr); /* in/out: mini-transaction */
/*****************************************************************
Discards a page from a B-tree. This is used to remove the last record from
a B-tree page: the whole page must be removed at the same time. This cannot
......
......@@ -23,6 +23,9 @@ Created 10/16/1994 Heikki Tuuri
#define BTR_NO_LOCKING_FLAG 2 /* do no record lock checking */
#define BTR_KEEP_SYS_FLAG 4 /* sys fields will be found from the
update vector or inserted entry */
#define BTR_KEEP_POS_FLAG 8 /* btr_cur_pessimistic_update()
must keep cursor position when
moving columns to big_rec */
#define BTR_CUR_ADAPT
#define BTR_CUR_HASH_ADAPT
......@@ -237,7 +240,9 @@ btr_cur_pessimistic_update(
/* out: DB_SUCCESS or error code */
ulint flags, /* in: undo logging, locking, and rollback
flags */
btr_cur_t* cursor, /* in: cursor on the record to update */
btr_cur_t* cursor, /* in/out: cursor on the record to update;
cursor may become invalid if *big_rec == NULL
|| !(flags & BTR_KEEP_POS_FLAG) */
big_rec_t** big_rec,/* out: big rec vector whose fields have to
be stored externally by the caller, or NULL */
upd_t* update, /* in: update vector; this is allowed also
......@@ -247,6 +252,15 @@ btr_cur_pessimistic_update(
updates */
que_thr_t* thr, /* in: query thread */
mtr_t* mtr); /* in: mtr */
/*****************************************************************
Commits and restarts a mini-transaction so that it will retain an
x-lock on index->lock and the cursor page. */
void
btr_cur_mtr_commit_and_start(
/*=========================*/
btr_cur_t* cursor, /* in: cursor */
mtr_t* mtr); /* in/out: mini-transaction */
/***************************************************************
Marks a clustered index record deleted. Writes an undo log record to
undo log on this delete marking. Writes in the trx id field the id
......@@ -286,19 +300,6 @@ btr_cur_del_unmark_for_ibuf(
rec_t* rec, /* in: record to delete unmark */
mtr_t* mtr); /* in: mtr */
/*****************************************************************
Tries to compress a page of the tree on the leaf level. It is assumed
that mtr holds an x-latch on the tree and on the cursor page. To avoid
deadlocks, mtr must also own x-latches to brothers of page, if those
brothers exist. NOTE: it is assumed that the caller has reserved enough
free extents so that the compression will always succeed if done! */
void
btr_cur_compress(
/*=============*/
btr_cur_t* cursor, /* in: cursor on the page to compress;
cursor does not stay valid */
mtr_t* mtr); /* in: mtr */
/*****************************************************************
Tries to compress a page of the tree if it seems useful. It is assumed
that mtr holds an x-latch on the tree and on the cursor page. To avoid
deadlocks, mtr must also own x-latches to brothers of page, if those
......@@ -309,10 +310,12 @@ ibool
btr_cur_compress_if_useful(
/*=======================*/
/* out: TRUE if compression occurred */
btr_cur_t* cursor, /* in: cursor on the page to compress;
cursor does not stay valid if compression
occurs */
mtr_t* mtr); /* in: mtr */
btr_cur_t* cursor, /* in/out: cursor on the page to compress;
cursor does not stay valid if !adjust and
compression occurs */
ibool adjust, /* in: TRUE if should adjust the
cursor position even if compression occurs */
mtr_t* mtr); /* in/out: mini-transaction */
/***********************************************************
Removes the record on which the tree cursor is positioned. It is assumed
that the mtr has an x-latch on the page where the cursor is positioned,
......
......@@ -682,6 +682,25 @@ buf_page_address_fold(
/* out: the folded value */
ulint space, /* in: space id */
ulint offset);/* in: offset of the page within space */
#ifdef UNIV_SYNC_DEBUG
/***********************************************************************
Increments the bufferfix count. */
UNIV_INLINE
void
buf_block_buf_fix_inc_debug(
/*========================*/
buf_block_t* block, /* in: block to bufferfix */
const char* file __attribute__ ((unused)), /* in: file name */
ulint line __attribute__ ((unused))); /* in: line */
#else /* UNIV_SYNC_DEBUG */
/***********************************************************************
Increments the bufferfix count. */
UNIV_INLINE
void
buf_block_buf_fix_inc(
/*==================*/
buf_block_t* block); /* in: block to bufferfix */
#endif /* UNIV_SYNC_DEBUG */
/**********************************************************************
Returns the control block of a file page, NULL if not found. */
UNIV_INLINE
......
......@@ -660,6 +660,6 @@ buf_page_dbg_add_level(
ulint level __attribute__((unused))) /* in: latching order
level */
{
sync_thread_add_level(&(buf_block_align(frame)->lock), level);
sync_thread_add_level(&(buf_block_align(frame)->lock), level, FALSE);
}
#endif /* UNIV_SYNC_DEBUG */
......@@ -234,10 +234,21 @@ page_get_supremum_rec(
/*==================*/
/* out: the last record in record list */
page_t* page); /* in: page which must have record(s) */
/****************************************************************
Returns the middle record of record list. If there are an even number
of records in the list, returns the first record of upper half-list. */
/************************************************************//**
Returns the nth record of the record list.
This is the inverse function of page_rec_get_n_recs_before(). */
rec_t*
page_rec_get_nth(
/*=============*/
/* out: nth record */
page_t* page, /* in: page */
ulint nth); /* in: nth record */
/*****************************************************************
Returns the middle record of the records on the page. If there is an
even number of records in the list, returns the first record of the
upper half-list. */
UNIV_INLINE
rec_t*
page_get_middle_rec(
/*================*/
......@@ -280,7 +291,8 @@ page_get_n_recs(
page_t* page); /* in: index page */
/*******************************************************************
Returns the number of records before the given record in chain.
The number includes infimum and supremum records. */
The number includes infimum and supremum records.
This is the inverse function of page_rec_get_nth(). */
ulint
page_rec_get_n_recs_before(
......
......@@ -340,6 +340,22 @@ page_rec_is_infimum(
return(page_rec_is_infimum_low(page_offset(rec)));
}
/*****************************************************************
Returns the middle record of the records on the page. If there is an
even number of records in the list, returns the first record of the
upper half-list. */
UNIV_INLINE
rec_t*
page_get_middle_rec(
/*================*/
/* out: middle record */
page_t* page) /* in: page */
{
ulint middle = (page_get_n_recs(page) + 2) / 2;
return(page_rec_get_nth(page, middle));
}
/*****************************************************************
Compares a data tuple to a physical record. Differs from the function
cmp_dtuple_rec_with_match in the way that the record must reside on an
......
......@@ -198,8 +198,9 @@ void
sync_thread_add_level(
/*==================*/
void* latch, /* in: pointer to a mutex or an rw-lock */
ulint level); /* in: level in the latching order; if
ulint level, /* in: level in the latching order; if
SYNC_LEVEL_VARYING, nothing is done */
ibool relock);/* in: TRUE if re-entering an x-lock */
/**********************************************************************
Removes a latch from the thread level array if it is found there. */
......
......@@ -1194,49 +1194,42 @@ page_dir_balance_slot(
}
/****************************************************************
Returns the middle record of the record list. If there are an even number
of records in the list, returns the first record of the upper half-list. */
Returns the nth record of the record list. */
rec_t*
page_get_middle_rec(
/*================*/
/* out: middle record */
page_t* page) /* in: page */
page_rec_get_nth(
/*=============*/
/* out: nth record */
page_t* page, /* in: page */
ulint nth) /* in: nth record */
{
page_dir_slot_t* slot;
ulint middle;
ulint i;
ulint n_owned;
ulint count;
rec_t* rec;
/* This many records we must leave behind */
middle = (page_get_n_recs(page) + 2) / 2;
count = 0;
ut_ad(nth < UNIV_PAGE_SIZE / (REC_N_NEW_EXTRA_BYTES + 1));
for (i = 0;; i++) {
slot = page_dir_get_nth_slot(page, i);
n_owned = page_dir_slot_get_n_owned(slot);
if (count + n_owned > middle) {
if (n_owned > nth) {
break;
} else {
count += n_owned;
nth -= n_owned;
}
}
ut_ad(i > 0);
slot = page_dir_get_nth_slot(page, i - 1);
rec = page_dir_slot_get_rec(slot);
rec = page_rec_get_next(rec);
/* There are now count records behind rec */
for (i = 0; i < middle - count; i++) {
do {
rec = page_rec_get_next(rec);
}
ut_ad(rec);
} while (nth--);
return(rec);
}
......
......@@ -259,6 +259,7 @@ row_ins_sec_index_entry_by_modify(
err = btr_cur_pessimistic_update(BTR_KEEP_SYS_FLAG, cursor,
&dummy_big_rec, update,
0, thr, mtr);
ut_a(!dummy_big_rec);
}
func_exit:
mem_heap_free(heap);
......@@ -329,8 +330,9 @@ row_ins_clust_index_entry_by_modify(
goto func_exit;
}
err = btr_cur_pessimistic_update(0, cursor, big_rec, update,
0, thr, mtr);
err = btr_cur_pessimistic_update(
BTR_KEEP_POS_FLAG, cursor, big_rec, update,
0, thr, mtr);
}
func_exit:
mem_heap_free(heap);
......@@ -2083,6 +2085,41 @@ row_ins_index_entry_low(
err = row_ins_clust_index_entry_by_modify(
mode, &cursor, &big_rec, entry,
ext_vec, n_ext_vec, thr, &mtr);
if (big_rec) {
ut_a(err == DB_SUCCESS);
/* Write out the externally stored
columns while still x-latching
index->lock and block->lock. We have
to mtr_commit(mtr) first, so that the
redo log will be written in the
correct order. Otherwise, we would run
into trouble on crash recovery if mtr
freed B-tree pages on which some of
the big_rec fields will be written. */
btr_cur_mtr_commit_and_start(&cursor, &mtr);
rec = btr_cur_get_rec(&cursor);
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED,
&heap);
err = btr_store_big_rec_extern_fields(
index, rec, offsets, big_rec, &mtr);
/* If writing big_rec fails (for
example, because of DB_OUT_OF_FILE_SPACE),
the record will be corrupted. Even if
we did not update any externally
stored columns, our update could cause
the record to grow so that a
non-updated column was selected for
external storage. This non-update
would not have been written to the
undo log, and thus the record cannot
be rolled back. */
ut_a(err == DB_SUCCESS);
goto stored_big_rec;
}
} else {
err = row_ins_sec_index_entry_by_modify(
mode, &cursor, entry, thr, &mtr);
......@@ -2119,7 +2156,6 @@ row_ins_index_entry_low(
mtr_commit(&mtr);
if (big_rec) {
rec_t* rec;
mtr_start(&mtr);
btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
......@@ -2130,7 +2166,7 @@ row_ins_index_entry_low(
err = btr_store_big_rec_extern_fields(index, rec,
offsets, big_rec, &mtr);
stored_big_rec:
if (modify) {
dtuple_big_rec_free(big_rec);
} else {
......
......@@ -119,6 +119,7 @@ row_undo_mod_clust_low(
| BTR_KEEP_SYS_FLAG,
btr_cur, &dummy_big_rec, node->update,
node->cmpl_info, thr, mtr);
ut_ad(!dummy_big_rec);
}
return(err);
......@@ -471,6 +472,7 @@ row_undo_mod_del_unmark_sec_and_undo_update(
BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG,
btr_cur, &dummy_big_rec,
update, 0, thr, &mtr);
ut_ad(!dummy_big_rec);
}
mem_heap_free(heap);
......
......@@ -1580,32 +1580,48 @@ row_upd_clust_rec(
ut_ad(!rec_get_deleted_flag(btr_pcur_get_rec(pcur),
dict_table_is_comp(index->table)));
err = btr_cur_pessimistic_update(BTR_NO_LOCKING_FLAG, btr_cur,
&big_rec, node->update,
node->cmpl_info, thr, mtr);
mtr_commit(mtr);
err = btr_cur_pessimistic_update(
BTR_NO_LOCKING_FLAG | BTR_KEEP_POS_FLAG, btr_cur,
&big_rec, node->update, node->cmpl_info, thr, mtr);
if (err == DB_SUCCESS && big_rec) {
if (big_rec) {
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
rec_t* rec;
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
mtr_start(mtr);
ut_a(err == DB_SUCCESS);
/* Write out the externally stored columns while still
x-latching index->lock and block->lock. We have to
mtr_commit(mtr) first, so that the redo log will be
written in the correct order. Otherwise, we would run
into trouble on crash recovery if mtr freed B-tree
pages on which some of the big_rec fields will be
written. */
btr_cur_mtr_commit_and_start(btr_cur, mtr);
ut_a(btr_pcur_restore_position(BTR_MODIFY_TREE, pcur, mtr));
rec = btr_cur_get_rec(btr_cur);
err = btr_store_big_rec_extern_fields(
index, rec,
rec_get_offsets(rec, index, offsets_,
ULINT_UNDEFINED, &heap),
big_rec, mtr);
big_rec, mtr);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
mtr_commit(mtr);
/* If writing big_rec fails (for example, because of
DB_OUT_OF_FILE_SPACE), the record will be corrupted.
Even if we did not update any externally stored
columns, our update could cause the record to grow so
that a non-updated column was selected for external
storage. This non-update would not have been written
to the undo log, and thus the record cannot be rolled
back. */
ut_a(err == DB_SUCCESS);
}
mtr_commit(mtr);
if (big_rec) {
dtuple_big_rec_free(big_rec);
}
......
......@@ -663,7 +663,9 @@ rw_lock_add_debug_info(
rw_lock_debug_mutex_exit();
if ((pass == 0) && (lock_type != RW_LOCK_WAIT_EX)) {
sync_thread_add_level(lock, lock->level);
sync_thread_add_level(lock, lock->level,
lock_type == RW_LOCK_EX
&& lock->writer_count > 1);
}
}
......
......@@ -641,7 +641,7 @@ mutex_set_debug_info(
ut_ad(mutex);
ut_ad(file_name);
sync_thread_add_level(mutex, mutex->level);
sync_thread_add_level(mutex, mutex->level, FALSE);
mutex->file_name = file_name;
mutex->line = line;
......@@ -1011,8 +1011,9 @@ void
sync_thread_add_level(
/*==================*/
void* latch, /* in: pointer to a mutex or an rw-lock */
ulint level) /* in: level in the latching order; if
ulint level, /* in: level in the latching order; if
SYNC_LEVEL_VARYING, nothing is done */
ibool relock) /* in: TRUE if re-entering an x-lock */
{
sync_level_t* array;
sync_level_t* slot;
......@@ -1060,6 +1061,10 @@ sync_thread_add_level(
array = thread_slot->levels;
if (relock) {
goto levels_ok;
}
/* NOTE that there is a problem with _NODE and _LEAF levels: if the
B-tree height changes, then a leaf can change to an internal node
or the other way around. We do not know at present if this can cause
......@@ -1209,6 +1214,7 @@ sync_thread_add_level(
ut_error;
}
levels_ok:
for (i = 0; i < SYNC_THREAD_N_LEVELS; i++) {
slot = sync_thread_levels_get_nth(array, i);
......
2011-06-16 The InnoDB Team
* btr/btr0btr.c, btr/btr0cur.c, include/btr0btr.h, include/btr0cur.h,
include/btr0cur.ic, include/buf0buf.h, include/buf0buf.ic,
include/page0cur.ic, include/page0page.h, include/page0page.ic,
include/sync0rw.ic, include/sync0sync.h, page/page0cur.c,
page/page0page.c, row/row0ins.c, row/row0upd.c,
sync/sync0rw.c, sync/sync0sync.c:
Fix Bug#12612184 Race condition after btr_cur_pessimistic_update()
2011-06-09 The InnoDB Team
* btr/btr0cur.c, include/rem0rec.h, include/rem0rec.ic,
* row/row0row.c, row/row0vers.c, trx/trx0rec.c:
......
/*****************************************************************************
Copyright (c) 1994, 2010, Innobase Oy. All Rights Reserved.
Copyright (c) 1994, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -2272,7 +2272,7 @@ btr_attach_half_pages(
/*==================*/
dict_index_t* index, /*!< in: the index tree */
buf_block_t* block, /*!< in/out: page to be split */
rec_t* split_rec, /*!< in: first record on upper
const rec_t* split_rec, /*!< in: first record on upper
half page */
buf_block_t* new_block, /*!< in/out: the new half page */
ulint direction, /*!< in: FSP_UP or FSP_DOWN */
......@@ -2964,15 +2964,16 @@ btr_node_ptr_delete(
ut_a(err == DB_SUCCESS);
if (!compressed) {
btr_cur_compress_if_useful(&cursor, mtr);
btr_cur_compress_if_useful(&cursor, FALSE, mtr);
}
}
/*************************************************************//**
If page is the only on its level, this function moves its records to the
father page, thus reducing the tree height. */
father page, thus reducing the tree height.
@return father block */
static
void
buf_block_t*
btr_lift_page_up(
/*=============*/
dict_index_t* index, /*!< in: index tree */
......@@ -3089,6 +3090,8 @@ btr_lift_page_up(
}
ut_ad(page_validate(father_page, index));
ut_ad(btr_check_node_ptr(index, father_block, mtr));
return(father_block);
}
/*************************************************************//**
......@@ -3105,11 +3108,13 @@ UNIV_INTERN
ibool
btr_compress(
/*=========*/
btr_cur_t* cursor, /*!< in: cursor on the page to merge or lift;
the page must not be empty: in record delete
use btr_discard_page if the page would become
empty */
mtr_t* mtr) /*!< in: mtr */
btr_cur_t* cursor, /*!< in/out: cursor on the page to merge
or lift; the page must not be empty:
when deleting records, use btr_discard_page()
if the page would become empty */
ibool adjust, /*!< in: TRUE if should adjust the
cursor position even if compression occurs */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
dict_index_t* index;
ulint space;
......@@ -3127,12 +3132,14 @@ btr_compress(
ulint* offsets;
ulint data_size;
ulint n_recs;
ulint nth_rec;
ulint max_ins_size;
ulint max_ins_size_reorg;
block = btr_cur_get_block(cursor);
page = btr_cur_get_page(cursor);
index = btr_cur_get_index(cursor);
ut_a((ibool) !!page_is_comp(page) == dict_table_is_comp(index->table));
ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
......@@ -3153,6 +3160,10 @@ btr_compress(
offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
&father_cursor);
if (adjust) {
nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
}
/* Decide the page to which we try to merge and which will inherit
the locks */
......@@ -3179,9 +3190,9 @@ btr_compress(
} else {
/* The page is the only one on the level, lift the records
to the father */
btr_lift_page_up(index, block, mtr);
mem_heap_free(heap);
return(TRUE);
merge_block = btr_lift_page_up(index, block, mtr);
goto func_exit;
}
n_recs = page_get_n_recs(page);
......@@ -3263,6 +3274,10 @@ btr_compress(
btr_node_ptr_delete(index, block, mtr);
lock_update_merge_left(merge_block, orig_pred, block);
if (adjust) {
nth_rec += page_rec_get_n_recs_before(orig_pred);
}
} else {
rec_t* orig_succ;
#ifdef UNIV_BTR_DEBUG
......@@ -3327,7 +3342,6 @@ btr_compress(
}
btr_blob_dbg_remove(page, index, "btr_compress");
mem_heap_free(heap);
if (!dict_index_is_clust(index) && page_is_leaf(merge_page)) {
/* Update the free bits of the B-tree page in the
......@@ -3379,6 +3393,16 @@ btr_compress(
btr_page_free(index, block, mtr);
ut_ad(btr_check_node_ptr(index, merge_block, mtr));
func_exit:
mem_heap_free(heap);
if (adjust) {
btr_cur_position(
index,
page_rec_get_nth(merge_block->frame, nth_rec),
merge_block, cursor);
}
return(TRUE);
}
......
......@@ -2088,7 +2088,9 @@ btr_cur_pessimistic_update(
/*=======================*/
ulint flags, /*!< in: undo logging, locking, and rollback
flags */
btr_cur_t* cursor, /*!< in: cursor on the record to update */
btr_cur_t* cursor, /*!< in/out: cursor on the record to update;
cursor may become invalid if *big_rec == NULL
|| !(flags & BTR_KEEP_POS_FLAG) */
mem_heap_t** heap, /*!< in/out: pointer to memory heap, or NULL */
big_rec_t** big_rec,/*!< out: big rec vector whose fields have to
be stored externally by the caller, or NULL */
......@@ -2227,7 +2229,7 @@ btr_cur_pessimistic_update(
record to be inserted: we have to remember which fields were such */
ut_ad(!page_is_comp(page) || !rec_get_node_ptr_flag(rec));
offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, heap);
ut_ad(rec_offs_validate(rec, index, offsets));
n_ext += btr_push_update_extern_fields(new_entry, update, *heap);
if (UNIV_LIKELY_NULL(page_zip)) {
......@@ -2250,6 +2252,10 @@ btr_cur_pessimistic_update(
err = DB_TOO_BIG_RECORD;
goto return_after_reservations;
}
ut_ad(page_is_leaf(page));
ut_ad(dict_index_is_clust(index));
ut_ad(flags & BTR_KEEP_POS_FLAG);
}
/* Store state of explicit locks on rec on the page infimum record,
......@@ -2277,6 +2283,8 @@ btr_cur_pessimistic_update(
rec = btr_cur_insert_if_possible(cursor, new_entry, n_ext, mtr);
if (rec) {
page_cursor->rec = rec;
lock_rec_restore_from_page_infimum(btr_cur_get_block(cursor),
rec, block);
......@@ -2290,7 +2298,10 @@ btr_cur_pessimistic_update(
rec, index, offsets, mtr);
}
btr_cur_compress_if_useful(cursor, mtr);
btr_cur_compress_if_useful(
cursor,
big_rec_vec != NULL && (flags & BTR_KEEP_POS_FLAG),
mtr);
if (page_zip && !dict_index_is_clust(index)
&& page_is_leaf(page)) {
......@@ -2310,6 +2321,21 @@ btr_cur_pessimistic_update(
}
}
if (big_rec_vec) {
ut_ad(page_is_leaf(page));
ut_ad(dict_index_is_clust(index));
ut_ad(flags & BTR_KEEP_POS_FLAG);
/* btr_page_split_and_insert() in
btr_cur_pessimistic_insert() invokes
mtr_memo_release(mtr, index->lock, MTR_MEMO_X_LOCK).
We must keep the index->lock when we created a
big_rec, so that row_upd_clust_rec() can store the
big_rec in the same mini-transaction. */
mtr_x_lock(dict_index_get_lock(index), mtr);
}
/* Was the record to be updated positioned as the first user
record on its page? */
was_first = page_cur_is_before_first(page_cursor);
......@@ -2325,6 +2351,7 @@ btr_cur_pessimistic_update(
ut_a(rec);
ut_a(err == DB_SUCCESS);
ut_a(dummy_big_rec == NULL);
page_cursor->rec = rec;
if (dict_index_is_sec_or_ibuf(index)) {
/* Update PAGE_MAX_TRX_ID in the index page header.
......@@ -2383,6 +2410,39 @@ btr_cur_pessimistic_update(
return(err);
}
/**************************************************************//**
Commits and restarts a mini-transaction so that it will retain an
x-lock on index->lock and the cursor page. */
UNIV_INTERN
void
btr_cur_mtr_commit_and_start(
/*=========================*/
btr_cur_t* cursor, /*!< in: cursor */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
buf_block_t* block;
block = btr_cur_get_block(cursor);
ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(cursor->index),
MTR_MEMO_X_LOCK));
ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
/* Keep the locks across the mtr_commit(mtr). */
rw_lock_x_lock(dict_index_get_lock(cursor->index));
rw_lock_x_lock(&block->lock);
mutex_enter(&block->mutex);
buf_block_buf_fix_inc(block, __FILE__, __LINE__);
mutex_exit(&block->mutex);
/* Write out the redo log. */
mtr_commit(mtr);
mtr_start(mtr);
/* Reassociate the locks with the mini-transaction.
They will be released on mtr_commit(mtr). */
mtr_memo_push(mtr, dict_index_get_lock(cursor->index),
MTR_MEMO_X_LOCK);
mtr_memo_push(mtr, block, MTR_MEMO_PAGE_X_FIX);
}
/*==================== B-TREE DELETE MARK AND UNMARK ===============*/
/****************************************************************//**
......@@ -2762,10 +2822,12 @@ UNIV_INTERN
ibool
btr_cur_compress_if_useful(
/*=======================*/
btr_cur_t* cursor, /*!< in: cursor on the page to compress;
cursor does not stay valid if compression
occurs */
mtr_t* mtr) /*!< in: mtr */
btr_cur_t* cursor, /*!< in/out: cursor on the page to compress;
cursor does not stay valid if !adjust and
compression occurs */
ibool adjust, /*!< in: TRUE if should adjust the
cursor position even if compression occurs */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
ut_ad(mtr_memo_contains(mtr,
dict_index_get_lock(btr_cur_get_index(cursor)),
......@@ -2774,7 +2836,7 @@ btr_cur_compress_if_useful(
MTR_MEMO_PAGE_X_FIX));
return(btr_cur_compress_recommendation(cursor, mtr)
&& btr_compress(cursor, mtr));
&& btr_compress(cursor, adjust, mtr));
}
/*******************************************************//**
......@@ -3016,7 +3078,7 @@ btr_cur_pessimistic_delete(
mem_heap_free(heap);
if (ret == FALSE) {
ret = btr_cur_compress_if_useful(cursor, mtr);
ret = btr_cur_compress_if_useful(cursor, FALSE, mtr);
}
if (n_extents > 0) {
......
/*****************************************************************************
Copyright (c) 1994, 2010, Innobase Oy. All Rights Reserved.
Copyright (c) 1994, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -470,11 +470,14 @@ UNIV_INTERN
ibool
btr_compress(
/*=========*/
btr_cur_t* cursor, /*!< in: cursor on the page to merge or lift;
the page must not be empty: in record delete
use btr_discard_page if the page would become
empty */
mtr_t* mtr); /*!< in: mtr */
btr_cur_t* cursor, /*!< in/out: cursor on the page to merge
or lift; the page must not be empty:
when deleting records, use btr_discard_page()
if the page would become empty */
ibool adjust, /*!< in: TRUE if should adjust the
cursor position even if compression occurs */
mtr_t* mtr) /*!< in/out: mini-transaction */
__attribute__((nonnull));
/*************************************************************//**
Discards a page from a B-tree. This is used to remove the last record from
a B-tree page: the whole page must be removed at the same time. This cannot
......
/*****************************************************************************
Copyright (c) 1994, 2010, Innobase Oy. All Rights Reserved.
Copyright (c) 1994, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -36,6 +36,9 @@ Created 10/16/1994 Heikki Tuuri
#define BTR_NO_LOCKING_FLAG 2 /* do no record lock checking */
#define BTR_KEEP_SYS_FLAG 4 /* sys fields will be found from the
update vector or inserted entry */
#define BTR_KEEP_POS_FLAG 8 /* btr_cur_pessimistic_update()
must keep cursor position when
moving columns to big_rec */
#ifndef UNIV_HOTBACKUP
#include "que0types.h"
......@@ -309,7 +312,9 @@ btr_cur_pessimistic_update(
/*=======================*/
ulint flags, /*!< in: undo logging, locking, and rollback
flags */
btr_cur_t* cursor, /*!< in: cursor on the record to update */
btr_cur_t* cursor, /*!< in/out: cursor on the record to update;
cursor may become invalid if *big_rec == NULL
|| !(flags & BTR_KEEP_POS_FLAG) */
mem_heap_t** heap, /*!< in/out: pointer to memory heap, or NULL */
big_rec_t** big_rec,/*!< out: big rec vector whose fields have to
be stored externally by the caller, or NULL */
......@@ -321,6 +326,16 @@ btr_cur_pessimistic_update(
que_thr_t* thr, /*!< in: query thread */
mtr_t* mtr); /*!< in: mtr; must be committed before
latching any further pages */
/*****************************************************************
Commits and restarts a mini-transaction so that it will retain an
x-lock on index->lock and the cursor page. */
UNIV_INTERN
void
btr_cur_mtr_commit_and_start(
/*=========================*/
btr_cur_t* cursor, /*!< in: cursor */
mtr_t* mtr) /*!< in/out: mini-transaction */
__attribute__((nonnull));
/***********************************************************//**
Marks a clustered index record deleted. Writes an undo log record to
undo log on this delete marking. Writes in the trx id field the id
......@@ -376,10 +391,13 @@ UNIV_INTERN
ibool
btr_cur_compress_if_useful(
/*=======================*/
btr_cur_t* cursor, /*!< in: cursor on the page to compress;
btr_cur_t* cursor, /*!< in/out: cursor on the page to compress;
cursor does not stay valid if compression
occurs */
mtr_t* mtr); /*!< in: mtr */
ibool adjust, /*!< in: TRUE if should adjust the
cursor position even if compression occurs */
mtr_t* mtr) /*!< in/out: mini-transaction */
__attribute__((nonnull));
/*******************************************************//**
Removes the record on which the tree cursor is positioned. It is assumed
that the mtr has an x-latch on the page where the cursor is positioned,
......
/*****************************************************************************
Copyright (c) 1994, 2009, Innobase Oy. All Rights Reserved.
Copyright (c) 1994, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -139,7 +139,7 @@ btr_cur_compress_recommendation(
btr_cur_t* cursor, /*!< in: btr cursor */
mtr_t* mtr) /*!< in: mtr */
{
page_t* page;
const page_t* page;
ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
MTR_MEMO_PAGE_X_FIX));
......
/*****************************************************************************
Copyright (c) 1995, 2010, Innobase Oy. All Rights Reserved.
Copyright (c) 1995, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -467,6 +467,31 @@ buf_block_get_modify_clock(
#else /* !UNIV_HOTBACKUP */
# define buf_block_modify_clock_inc(block) ((void) 0)
#endif /* !UNIV_HOTBACKUP */
/*******************************************************************//**
Increments the bufferfix count. */
UNIV_INLINE
void
buf_block_buf_fix_inc_func(
/*=======================*/
#ifdef UNIV_SYNC_DEBUG
const char* file, /*!< in: file name */
ulint line, /*!< in: line */
#endif /* UNIV_SYNC_DEBUG */
buf_block_t* block) /*!< in/out: block to bufferfix */
__attribute__((nonnull));
#ifdef UNIV_SYNC_DEBUG
/** Increments the bufferfix count.
@param b in/out: block to bufferfix
@param f in: file name where requested
@param l in: line number where requested */
# define buf_block_buf_fix_inc(b,f,l) buf_block_buf_fix_inc_func(f,l,b)
#else /* UNIV_SYNC_DEBUG */
/** Increments the bufferfix count.
@param b in/out: block to bufferfix
@param f in: file name where requested
@param l in: line number where requested */
# define buf_block_buf_fix_inc(b,f,l) buf_block_buf_fix_inc_func(b)
#endif /* UNIV_SYNC_DEBUG */
/********************************************************************//**
Calculates a page checksum which is stored to the page when it is written
to a file. Note that we must be careful to calculate the same value
......
/*****************************************************************************
Copyright (c) 1995, 2010, Innobase Oy. All Rights Reserved.
Copyright (c) 1995, 2011, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, Google Inc.
Portions of this file contain modifications contributed and copyrighted by
......@@ -871,19 +871,6 @@ buf_block_buf_fix_inc_func(
block->page.buf_fix_count++;
}
#ifdef UNIV_SYNC_DEBUG
/** Increments the bufferfix count.
@param b in/out: block to bufferfix
@param f in: file name where requested
@param l in: line number where requested */
# define buf_block_buf_fix_inc(b,f,l) buf_block_buf_fix_inc_func(f,l,b)
#else /* UNIV_SYNC_DEBUG */
/** Increments the bufferfix count.
@param b in/out: block to bufferfix
@param f in: file name where requested
@param l in: line number where requested */
# define buf_block_buf_fix_inc(b,f,l) buf_block_buf_fix_inc_func(b)
#endif /* UNIV_SYNC_DEBUG */
/*******************************************************************//**
Decrements the bufferfix count. */
......@@ -1071,7 +1058,7 @@ buf_block_dbg_add_level(
where we have acquired latch */
ulint level) /*!< in: latching order level */
{
sync_thread_add_level(&block->lock, level);
sync_thread_add_level(&block->lock, level, FALSE);
}
#endif /* UNIV_SYNC_DEBUG */
#endif /* !UNIV_HOTBACKUP */
/*****************************************************************************
Copyright (c) 1994, 2009, Innobase Oy. All Rights Reserved.
Copyright (c) 1994, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -27,6 +27,8 @@ Created 10/4/1994 Heikki Tuuri
#include "buf0types.h"
#ifdef UNIV_DEBUG
# include "rem0cmp.h"
/*********************************************************//**
Gets pointer to the page frame where the cursor is positioned.
@return page */
......@@ -268,6 +270,7 @@ page_cur_tuple_insert(
index, rec, offsets, mtr);
}
ut_ad(!rec || !cmp_dtuple_rec(tuple, rec, offsets));
mem_heap_free(heap);
return(rec);
}
......
/*****************************************************************************
Copyright (c) 1994, 2009, Innobase Oy. All Rights Reserved.
Copyright (c) 1994, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -284,16 +284,42 @@ page_get_supremum_offset(
const page_t* page); /*!< in: page which must have record(s) */
#define page_get_infimum_rec(page) ((page) + page_get_infimum_offset(page))
#define page_get_supremum_rec(page) ((page) + page_get_supremum_offset(page))
/************************************************************//**
Returns the middle record of record list. If there are an even number
of records in the list, returns the first record of upper half-list.
@return middle record */
Returns the nth record of the record list.
This is the inverse function of page_rec_get_n_recs_before().
@return nth record */
UNIV_INTERN
const rec_t*
page_rec_get_nth_const(
/*===================*/
const page_t* page, /*!< in: page */
ulint nth) /*!< in: nth record */
__attribute__((nonnull, warn_unused_result));
/************************************************************//**
Returns the nth record of the record list.
This is the inverse function of page_rec_get_n_recs_before().
@return nth record */
UNIV_INLINE
rec_t*
page_rec_get_nth(
/*=============*/
page_t* page, /*< in: page */
ulint nth) /*!< in: nth record */
__attribute__((nonnull, warn_unused_result));
#ifndef UNIV_HOTBACKUP
/************************************************************//**
Returns the middle record of the records on the page. If there is an
even number of records in the list, returns the first record of the
upper half-list.
@return middle record */
UNIV_INLINE
rec_t*
page_get_middle_rec(
/*================*/
page_t* page); /*!< in: page */
#ifndef UNIV_HOTBACKUP
page_t* page) /*!< in: page */
__attribute__((nonnull, warn_unused_result));
/*************************************************************//**
Compares a data tuple to a physical record. Differs from the function
cmp_dtuple_rec_with_match in the way that the record must reside on an
......@@ -348,6 +374,7 @@ page_get_n_recs(
/***************************************************************//**
Returns the number of records before the given record in chain.
The number includes infimum and supremum records.
This is the inverse function of page_rec_get_nth().
@return number of records */
UNIV_INTERN
ulint
......
/*****************************************************************************
Copyright (c) 1994, 2009, Innobase Oy. All Rights Reserved.
Copyright (c) 1994, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -420,7 +420,37 @@ page_rec_is_infimum(
return(page_rec_is_infimum_low(page_offset(rec)));
}
/************************************************************//**
Returns the nth record of the record list.
This is the inverse function of page_rec_get_n_recs_before().
@return nth record */
UNIV_INLINE
rec_t*
page_rec_get_nth(
/*=============*/
page_t* page, /*!< in: page */
ulint nth) /*!< in: nth record */
{
return((rec_t*) page_rec_get_nth_const(page, nth));
}
#ifndef UNIV_HOTBACKUP
/************************************************************//**
Returns the middle record of the records on the page. If there is an
even number of records in the list, returns the first record of the
upper half-list.
@return middle record */
UNIV_INLINE
rec_t*
page_get_middle_rec(
/*================*/
page_t* page) /*!< in: page */
{
ulint middle = (page_get_n_recs(page) + PAGE_HEAP_NO_USER_LOW) / 2;
return(page_rec_get_nth(page, middle));
}
/*************************************************************//**
Compares a data tuple to a physical record. Differs from the function
cmp_dtuple_rec_with_match in the way that the record must reside on an
......
/*****************************************************************************
Copyright (c) 1995, 2009, Innobase Oy. All Rights Reserved.
Copyright (c) 1995, 2011, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, Google Inc.
Portions of this file contain modifications contributed and copyrighted by
......@@ -603,16 +603,16 @@ rw_lock_x_unlock_direct(
ut_ad((lock->lock_word % X_LOCK_DECR) == 0);
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, 0, RW_LOCK_EX);
#endif
if (lock->lock_word == 0) {
lock->recursive = FALSE;
UNIV_MEM_INVALID(&lock->writer_thread,
sizeof lock->writer_thread);
}
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, 0, RW_LOCK_EX);
#endif
lock->lock_word += X_LOCK_DECR;
ut_ad(!lock->waiters);
......
/*****************************************************************************
Copyright (c) 1995, 2010, Innobase Oy. All Rights Reserved.
Copyright (c) 1995, 2011, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, Google Inc.
Portions of this file contain modifications contributed and copyrighted by
......@@ -219,8 +219,10 @@ void
sync_thread_add_level(
/*==================*/
void* latch, /*!< in: pointer to a mutex or an rw-lock */
ulint level); /*!< in: level in the latching order; if
ulint level, /*!< in: level in the latching order; if
SYNC_LEVEL_VARYING, nothing is done */
ibool relock) /*!< in: TRUE if re-entering an x-lock */
__attribute__((nonnull));
/******************************************************************//**
Removes a latch from the thread level array if it is found there.
@return TRUE if found in the array; it is no error if the latch is
......
/*****************************************************************************
Copyright (c) 1994, 2009, Innobase Oy. All Rights Reserved.
Copyright (c) 1994, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -1180,14 +1180,15 @@ page_cur_insert_rec_zip_reorg(
/* Before trying to reorganize the page,
store the number of preceding records on the page. */
pos = page_rec_get_n_recs_before(rec);
ut_ad(pos > 0);
if (page_zip_reorganize(block, index, mtr)) {
/* The page was reorganized: Find rec by seeking to pos,
and update *current_rec. */
rec = page + PAGE_NEW_INFIMUM;
while (--pos) {
rec = page + rec_get_next_offs(rec, TRUE);
if (pos > 1) {
rec = page_rec_get_nth(page, pos - 1);
} else {
rec = page + PAGE_NEW_INFIMUM;
}
*current_rec = rec;
......@@ -1283,6 +1284,12 @@ page_cur_insert_rec_zip(
insert_rec = page_cur_insert_rec_zip_reorg(
current_rec, block, index, insert_rec,
page, page_zip, mtr);
#ifdef UNIV_DEBUG
if (insert_rec) {
rec_offs_make_valid(
insert_rec, index, offsets);
}
#endif /* UNIV_DEBUG */
}
return(insert_rec);
......
/*****************************************************************************
Copyright (c) 1994, 2010, Innobase Oy. All Rights Reserved.
Copyright (c) 1994, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -1487,55 +1487,54 @@ page_dir_balance_slot(
}
}
#ifndef UNIV_HOTBACKUP
/************************************************************//**
Returns the middle record of the record list. If there are an even number
of records in the list, returns the first record of the upper half-list.
@return middle record */
Returns the nth record of the record list.
This is the inverse function of page_rec_get_n_recs_before().
@return nth record */
UNIV_INTERN
rec_t*
page_get_middle_rec(
/*================*/
page_t* page) /*!< in: page */
const rec_t*
page_rec_get_nth_const(
/*===================*/
const page_t* page, /*!< in: page */
ulint nth) /*!< in: nth record */
{
page_dir_slot_t* slot;
ulint middle;
const page_dir_slot_t* slot;
ulint i;
ulint n_owned;
ulint count;
rec_t* rec;
const rec_t* rec;
/* This many records we must leave behind */
middle = (page_get_n_recs(page) + PAGE_HEAP_NO_USER_LOW) / 2;
count = 0;
ut_ad(nth < UNIV_PAGE_SIZE / (REC_N_NEW_EXTRA_BYTES + 1));
for (i = 0;; i++) {
slot = page_dir_get_nth_slot(page, i);
n_owned = page_dir_slot_get_n_owned(slot);
if (count + n_owned > middle) {
if (n_owned > nth) {
break;
} else {
count += n_owned;
nth -= n_owned;
}
}
ut_ad(i > 0);
slot = page_dir_get_nth_slot(page, i - 1);
rec = (rec_t*) page_dir_slot_get_rec(slot);
rec = page_rec_get_next(rec);
/* There are now count records behind rec */
rec = page_dir_slot_get_rec(slot);
for (i = 0; i < middle - count; i++) {
rec = page_rec_get_next(rec);
if (page_is_comp(page)) {
do {
rec = page_rec_get_next_low(rec, TRUE);
ut_ad(rec);
} while (nth--);
} else {
do {
rec = page_rec_get_next_low(rec, FALSE);
ut_ad(rec);
} while (nth--);
}
return(rec);
}
#endif /* !UNIV_HOTBACKUP */
/***************************************************************//**
Returns the number of records before the given record in chain.
......@@ -1597,6 +1596,7 @@ page_rec_get_n_recs_before(
n--;
ut_ad(n >= 0);
ut_ad(n < UNIV_PAGE_SIZE / (REC_N_NEW_EXTRA_BYTES + 1));
return((ulint) n);
}
......
/*****************************************************************************
Copyright (c) 1996, 2010, Innobase Oy. All Rights Reserved.
Copyright (c) 1996, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -345,9 +345,9 @@ row_ins_clust_index_entry_by_modify(
return(DB_LOCK_TABLE_FULL);
}
err = btr_cur_pessimistic_update(0, cursor,
heap, big_rec, update,
0, thr, mtr);
err = btr_cur_pessimistic_update(
BTR_KEEP_POS_FLAG, cursor, heap, big_rec, update,
0, thr, mtr);
}
return(err);
......@@ -1986,6 +1986,7 @@ row_ins_index_entry_low(
ulint modify = 0; /* remove warning */
rec_t* insert_rec;
rec_t* rec;
ulint* offsets;
ulint err;
ulint n_unique;
big_rec_t* big_rec = NULL;
......@@ -2089,6 +2090,42 @@ row_ins_index_entry_low(
err = row_ins_clust_index_entry_by_modify(
mode, &cursor, &heap, &big_rec, entry,
thr, &mtr);
if (big_rec) {
ut_a(err == DB_SUCCESS);
/* Write out the externally stored
columns while still x-latching
index->lock and block->lock. We have
to mtr_commit(mtr) first, so that the
redo log will be written in the
correct order. Otherwise, we would run
into trouble on crash recovery if mtr
freed B-tree pages on which some of
the big_rec fields will be written. */
btr_cur_mtr_commit_and_start(&cursor, &mtr);
rec = btr_cur_get_rec(&cursor);
offsets = rec_get_offsets(
rec, index, NULL,
ULINT_UNDEFINED, &heap);
err = btr_store_big_rec_extern_fields(
index, btr_cur_get_block(&cursor),
rec, offsets, &mtr, FALSE, big_rec);
/* If writing big_rec fails (for
example, because of DB_OUT_OF_FILE_SPACE),
the record will be corrupted. Even if
we did not update any externally
stored columns, our update could cause
the record to grow so that a
non-updated column was selected for
external storage. This non-update
would not have been written to the
undo log, and thus the record cannot
be rolled back. */
ut_a(err == DB_SUCCESS);
goto stored_big_rec;
}
} else {
ut_ad(!n_ext);
err = row_ins_sec_index_entry_by_modify(
......@@ -2117,8 +2154,6 @@ row_ins_index_entry_low(
mtr_commit(&mtr);
if (UNIV_LIKELY_NULL(big_rec)) {
rec_t* rec;
ulint* offsets;
mtr_start(&mtr);
btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
......@@ -2132,6 +2167,7 @@ row_ins_index_entry_low(
index, btr_cur_get_block(&cursor),
rec, offsets, &mtr, FALSE, big_rec);
stored_big_rec:
if (modify) {
dtuple_big_rec_free(big_rec);
} else {
......
/*****************************************************************************
Copyright (c) 1996, 2009, Innobase Oy. All Rights Reserved.
Copyright (c) 1996, 2011, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -1969,28 +1969,43 @@ row_upd_clust_rec(
ut_ad(!rec_get_deleted_flag(btr_pcur_get_rec(pcur),
dict_table_is_comp(index->table)));
err = btr_cur_pessimistic_update(BTR_NO_LOCKING_FLAG, btr_cur,
&heap, &big_rec, node->update,
node->cmpl_info, thr, mtr);
mtr_commit(mtr);
if (err == DB_SUCCESS && big_rec) {
err = btr_cur_pessimistic_update(
BTR_NO_LOCKING_FLAG | BTR_KEEP_POS_FLAG, btr_cur,
&heap, &big_rec, node->update, node->cmpl_info, thr, mtr);
if (big_rec) {
ulint offsets_[REC_OFFS_NORMAL_SIZE];
rec_t* rec;
rec_offs_init(offsets_);
mtr_start(mtr);
ut_a(err == DB_SUCCESS);
/* Write out the externally stored columns while still
x-latching index->lock and block->lock. We have to
mtr_commit(mtr) first, so that the redo log will be
written in the correct order. Otherwise, we would run
into trouble on crash recovery if mtr freed B-tree
pages on which some of the big_rec fields will be
written. */
btr_cur_mtr_commit_and_start(btr_cur, mtr);
ut_a(btr_pcur_restore_position(BTR_MODIFY_TREE, pcur, mtr));
rec = btr_cur_get_rec(btr_cur);
err = btr_store_big_rec_extern_fields(
index, btr_cur_get_block(btr_cur), rec,
rec_get_offsets(rec, index, offsets_,
ULINT_UNDEFINED, &heap),
mtr, TRUE, big_rec);
mtr_commit(mtr);
/* If writing big_rec fails (for example, because of
DB_OUT_OF_FILE_SPACE), the record will be corrupted.
Even if we did not update any externally stored
columns, our update could cause the record to grow so
that a non-updated column was selected for external
storage. This non-update would not have been written
to the undo log, and thus the record cannot be rolled
back. */
ut_a(err == DB_SUCCESS);
}
mtr_commit(mtr);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
......
......@@ -766,7 +766,9 @@ rw_lock_add_debug_info(
rw_lock_debug_mutex_exit();
if ((pass == 0) && (lock_type != RW_LOCK_WAIT_EX)) {
sync_thread_add_level(lock, lock->level);
sync_thread_add_level(lock, lock->level,
lock_type == RW_LOCK_EX
&& lock->lock_word < 0);
}
}
......
/*****************************************************************************
Copyright (c) 1995, 2010, Innobase Oy. All Rights Reserved.
Copyright (c) 1995, 2011, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, Google Inc.
Portions of this file contain modifications contributed and copyrighted by
......@@ -656,7 +656,7 @@ mutex_set_debug_info(
ut_ad(mutex);
ut_ad(file_name);
sync_thread_add_level(mutex, mutex->level);
sync_thread_add_level(mutex, mutex->level, FALSE);
mutex->file_name = file_name;
mutex->line = line;
......@@ -1083,8 +1083,9 @@ void
sync_thread_add_level(
/*==================*/
void* latch, /*!< in: pointer to a mutex or an rw-lock */
ulint level) /*!< in: level in the latching order; if
ulint level, /*!< in: level in the latching order; if
SYNC_LEVEL_VARYING, nothing is done */
ibool relock) /*!< in: TRUE if re-entering an x-lock */
{
sync_level_t* array;
sync_level_t* slot;
......@@ -1132,6 +1133,10 @@ sync_thread_add_level(
array = thread_slot->levels;
if (relock) {
goto levels_ok;
}
/* NOTE that there is a problem with _NODE and _LEAF levels: if the
B-tree height changes, then a leaf can change to an internal node
or the other way around. We do not know at present if this can cause
......@@ -1269,6 +1274,7 @@ sync_thread_add_level(
ut_error;
}
levels_ok:
for (i = 0; i < SYNC_THREAD_N_LEVELS; i++) {
slot = sync_thread_levels_get_nth(array, i);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment