Commit 4709a172 authored by heikki@donna.mysql.fi's avatar heikki@donna.mysql.fi

srv0srv.h Merging work for 3.23.37

os0thread.c	Set created thread priority to QUERY_PRIOR if specified
row0mysql.c	Drop table removes locks on the table, error handling changed
row0ins.c	Insert now always sets a shared lock on a duplicate or suspected duplicate record
lock0lock.h	Can remove locks from a table to be dropped
lock0lock.c	Can remove locks from a table to be dropped
srv0srv.c	Try to do log flush every second
srv0start.c	No need for a slash at the end of dir
trx0trx.h	Added an IGNORE option field to trx handle
trx0trx.c	Added an IGNORE option field to trx handle
ha_innobase.h	Added last_query_id field to handle
ha_innobase.cc	Fix drop table hang, roll-forward etc.
parent 54b9d367
sasha@mysql.sashanet.com
monty@donna.mysql.fi
heikki@donna.mysql.fi
......@@ -396,6 +396,14 @@ lock_release_off_kernel(
/*====================*/
trx_t* trx); /* in: transaction */
/*************************************************************************
Resets all locks, both table and record locks, on a table to be dropped.
No lock is allowed to be a wait lock. */
void
lock_reset_all_on_table(
/*====================*/
dict_table_t* table); /* in: table to be dropped */
/*************************************************************************
Calculates the fold value of a page file address: used in inserting or
searching for a lock in the hash table. */
UNIV_INLINE
......
......@@ -48,6 +48,9 @@ extern dulint srv_archive_recovery_limit_lsn;
extern ulint srv_lock_wait_timeout;
extern ibool srv_set_thread_priorities;
extern int srv_query_thread_priority;
/*-------------------------------------------*/
extern ulint srv_n_spin_wait_rounds;
extern ulint srv_spin_wait_delay;
......
......@@ -282,6 +282,10 @@ struct trx_struct{
ulint n_mysql_tables_in_use; /* number of Innobase tables
used in the processing of the current
SQL statement in MySQL */
ibool ignore_duplicates_in_insert;
/* in an insert roll back only insert
of the latest row in case
of a duplicate key error */
UT_LIST_NODE_T(trx_t)
trx_list; /* list of transactions */
/*------------------------------*/
......
......@@ -700,24 +700,24 @@ lock_mode_stronger_or_eq(
ulint mode1, /* in: lock mode */
ulint mode2) /* in: lock mode */
{
ut_ad((mode1 == LOCK_X) || (mode1 == LOCK_S) || (mode1 == LOCK_IX)
|| (mode1 == LOCK_IS));
ut_ad((mode2 == LOCK_X) || (mode2 == LOCK_S) || (mode2 == LOCK_IX)
|| (mode2 == LOCK_IS));
ut_ad(mode1 == LOCK_X || mode1 == LOCK_S || mode1 == LOCK_IX
|| mode1 == LOCK_IS);
ut_ad(mode2 == LOCK_X || mode2 == LOCK_S || mode2 == LOCK_IX
|| mode2 == LOCK_IS);
if (mode1 == LOCK_X) {
return(TRUE);
} else if ((mode1 == LOCK_S)
&& ((mode2 == LOCK_S) || (mode2 == LOCK_IS))) {
} else if (mode1 == LOCK_S
&& (mode2 == LOCK_S || mode2 == LOCK_IS)) {
return(TRUE);
} else if ((mode1 == LOCK_IS) && (mode2 == LOCK_IS)) {
} else if (mode1 == LOCK_IS && mode2 == LOCK_IS) {
return(TRUE);
} else if ((mode1 == LOCK_IX) && ((mode2 == LOCK_IX)
|| (mode2 == LOCK_IS))) {
} else if (mode1 == LOCK_IX && (mode2 == LOCK_IX
|| mode2 == LOCK_IS)) {
return(TRUE);
}
......@@ -734,12 +734,12 @@ lock_mode_compatible(
ulint mode1, /* in: lock mode */
ulint mode2) /* in: lock mode */
{
ut_ad((mode1 == LOCK_X) || (mode1 == LOCK_S) || (mode1 == LOCK_IX)
|| (mode1 == LOCK_IS));
ut_ad((mode2 == LOCK_X) || (mode2 == LOCK_S) || (mode2 == LOCK_IX)
|| (mode2 == LOCK_IS));
ut_ad(mode1 == LOCK_X || mode1 == LOCK_S || mode1 == LOCK_IX
|| mode1 == LOCK_IS);
ut_ad(mode2 == LOCK_X || mode2 == LOCK_S || mode2 == LOCK_IX
|| mode2 == LOCK_IS);
if ((mode1 == LOCK_S) && ((mode2 == LOCK_IS) || (mode2 == LOCK_S))) {
if (mode1 == LOCK_S && (mode2 == LOCK_IS || mode2 == LOCK_S)) {
return(TRUE);
......@@ -747,13 +747,13 @@ lock_mode_compatible(
return(FALSE);
} else if ((mode1 == LOCK_IS) && ((mode2 == LOCK_IS)
|| (mode2 == LOCK_IX)
|| (mode2 == LOCK_S))) {
} else if (mode1 == LOCK_IS && (mode2 == LOCK_IS
|| mode2 == LOCK_IX
|| mode2 == LOCK_S)) {
return(TRUE);
} else if ((mode1 == LOCK_IX) && ((mode2 == LOCK_IS)
|| (mode2 == LOCK_IX))) {
} else if (mode1 == LOCK_IX && (mode2 == LOCK_IS
|| mode2 == LOCK_IX)) {
return(TRUE);
}
......@@ -769,7 +769,7 @@ lock_get_confl_mode(
/* out: conflicting basic lock mode */
ulint mode) /* in: LOCK_S or LOCK_X */
{
ut_ad((mode == LOCK_X) || (mode == LOCK_S));
ut_ad(mode == LOCK_X || mode == LOCK_S);
if (mode == LOCK_S) {
......@@ -792,7 +792,7 @@ lock_has_to_wait(
lock_t* lock2) /* in: another lock; NOTE that it is assumed that this
has a lock bit set on the same record as in lock1 */
{
if ((lock1->trx != lock2->trx)
if (lock1->trx != lock2->trx
&& !lock_mode_compatible(lock_get_mode(lock1),
lock_get_mode(lock2))) {
return(TRUE);
......@@ -1228,8 +1228,8 @@ lock_table_has(
while (lock != NULL) {
if ((lock->trx == trx)
&& (lock_mode_stronger_or_eq(lock_get_mode(lock), mode))) {
if (lock->trx == trx
&& lock_mode_stronger_or_eq(lock_get_mode(lock), mode)) {
/* The same trx already has locked the table in
a mode stronger or equal to the mode given */
......@@ -1267,7 +1267,7 @@ lock_rec_has_expl(
lock = lock_rec_get_first(rec);
while (lock) {
if ((lock->trx == trx)
if (lock->trx == trx
&& lock_mode_stronger_or_eq(lock_get_mode(lock), mode)
&& !lock_get_wait(lock)
&& !(lock_rec_get_gap(lock)
......@@ -1306,7 +1306,7 @@ lock_rec_other_has_expl_req(
lock = lock_rec_get_first(rec);
while (lock) {
if ((lock->trx != trx)
if (lock->trx != trx
&& (gap ||
!(lock_rec_get_gap(lock) || page_rec_is_supremum(rec)))
&& (wait || !lock_get_wait(lock))
......@@ -1344,9 +1344,9 @@ lock_rec_find_similar_on_page(
lock = lock_rec_get_first_on_page(rec);
while (lock != NULL) {
if ((lock->trx == trx)
&& (lock->type_mode == type_mode)
&& (lock_rec_get_n_bits(lock) > heap_no)) {
if (lock->trx == trx
&& lock->type_mode == type_mode
&& lock_rec_get_n_bits(lock) > heap_no) {
return(lock);
}
......@@ -1653,9 +1653,9 @@ lock_rec_lock_fast(
return(FALSE);
}
if ((lock->trx != thr_get_trx(thr))
|| (lock->type_mode != (mode | LOCK_REC))
|| (lock_rec_get_n_bits(lock) <= heap_no)) {
if (lock->trx != thr_get_trx(thr)
|| lock->type_mode != (mode | LOCK_REC)
|| lock_rec_get_n_bits(lock) <= heap_no) {
return(FALSE);
}
......@@ -1836,7 +1836,7 @@ lock_rec_cancel(
{
ut_ad(mutex_own(&kernel_mutex));
/* Reset the bit in lock bitmap */
/* Reset the bit (there can be only one set bit) in the lock bitmap */
lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));
/* Reset the wait flag and the back pointer to lock in trx */
......@@ -1885,7 +1885,6 @@ lock_rec_dequeue_from_page(
lock = lock_rec_get_first_on_page_addr(space, page_no);
while (lock != NULL) {
if (lock_get_wait(lock)
&& !lock_rec_has_to_wait_in_queue(lock)) {
......@@ -1897,6 +1896,33 @@ lock_rec_dequeue_from_page(
}
}
/*****************************************************************
Removes a record lock request, waiting or granted, from the queue. */
static
void
lock_rec_discard(
/*=============*/
lock_t* in_lock)/* in: record lock object: all record locks which
are contained in this lock object are removed */
{
ulint space;
ulint page_no;
trx_t* trx;
ut_ad(mutex_own(&kernel_mutex));
ut_ad(lock_get_type(in_lock) == LOCK_REC);
trx = in_lock->trx;
space = in_lock->un_member.rec_lock.space;
page_no = in_lock->un_member.rec_lock.page_no;
HASH_DELETE(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), in_lock);
UT_LIST_REMOVE(trx_locks, trx->trx_locks, in_lock);
}
/*****************************************************************
Removes record lock objects set on an index page which is discarded. This
function does not move locks, or check for waiting locks, therefore the
......@@ -1911,7 +1937,6 @@ lock_rec_free_all_from_discard_page(
ulint page_no;
lock_t* lock;
lock_t* next_lock;
trx_t* trx;
ut_ad(mutex_own(&kernel_mutex));
......@@ -1926,11 +1951,7 @@ lock_rec_free_all_from_discard_page(
next_lock = lock_rec_get_next_on_page(lock);
HASH_DELETE(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock);
trx = lock->trx;
UT_LIST_REMOVE(trx_locks, trx->trx_locks, lock);
lock_rec_discard(lock);
lock = next_lock;
}
......@@ -1957,7 +1978,6 @@ lock_rec_reset_and_release_wait(
lock = lock_rec_get_first(rec);
while (lock != NULL) {
if (lock_get_wait(lock)) {
lock_rec_cancel(lock);
} else {
......@@ -3088,6 +3108,69 @@ lock_release_off_kernel(
mem_heap_empty(trx->lock_heap);
}
/*************************************************************************
Resets all record and table locks of a transaction on a table to be dropped.
No lock is allowed to be a wait lock. */
static
void
lock_reset_all_on_table_for_trx(
/*============================*/
dict_table_t* table, /* in: table to be dropped */
trx_t* trx) /* in: a transaction */
{
lock_t* lock;
lock_t* prev_lock;
ut_ad(mutex_own(&kernel_mutex));
lock = UT_LIST_GET_LAST(trx->trx_locks);
while (lock != NULL) {
prev_lock = UT_LIST_GET_PREV(trx_locks, lock);
if (lock_get_type(lock) == LOCK_REC
&& lock->index->table == table) {
ut_a(!lock_get_wait(lock));
lock_rec_discard(lock);
} else if (lock_get_type(lock) == LOCK_TABLE
&& lock->un_member.tab_lock.table == table) {
ut_a(!lock_get_wait(lock));
lock_table_remove_low(lock);
}
lock = prev_lock;
}
}
/*************************************************************************
Resets all locks, both table and record locks, on a table to be dropped.
No lock is allowed to be a wait lock. */
void
lock_reset_all_on_table(
/*====================*/
dict_table_t* table) /* in: table to be dropped */
{
lock_t* lock;
mutex_enter(&kernel_mutex);
lock = UT_LIST_GET_FIRST(table->locks);
while (lock) {
ut_a(!lock_get_wait(lock));
lock_reset_all_on_table_for_trx(table, lock->trx);
lock = UT_LIST_GET_FIRST(table->locks);
}
mutex_exit(&kernel_mutex);
}
/*===================== VALIDATION AND DEBUGGING ====================*/
/*************************************************************************
......@@ -3414,7 +3497,7 @@ lock_rec_queue_validate(
return(TRUE);
}
if (index && index->type & DICT_CLUSTERED) {
if (index && (index->type & DICT_CLUSTERED)) {
impl_trx = lock_clust_rec_some_has_impl(rec, index);
......@@ -3525,12 +3608,12 @@ lock_rec_validate_page(
}
ut_a(trx_in_trx_list(lock->trx));
ut_a(((lock->trx)->conc_state == TRX_ACTIVE)
|| ((lock->trx)->conc_state == TRX_COMMITTED_IN_MEMORY));
ut_a(lock->trx->conc_state == TRX_ACTIVE
|| lock->trx->conc_state == TRX_COMMITTED_IN_MEMORY);
for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {
if ((i == 1) || lock_rec_get_nth_bit(lock, i)) {
if (i == 1 || lock_rec_get_nth_bit(lock, i)) {
index = lock->index;
rec = page_find_rec_with_heap_no(page, i);
......@@ -3894,9 +3977,9 @@ lock_sec_rec_read_check_and_lock(
lock_mutex_enter_kernel();
ut_ad((mode != LOCK_X)
ut_ad(mode != LOCK_X
|| lock_table_has(thr_get_trx(thr), index->table, LOCK_IX));
ut_ad((mode != LOCK_S)
ut_ad(mode != LOCK_S
|| lock_table_has(thr_get_trx(thr), index->table, LOCK_IS));
/* Some transaction may have an implicit x-lock on the record only
......@@ -3956,9 +4039,9 @@ lock_clust_rec_read_check_and_lock(
lock_mutex_enter_kernel();
ut_ad((mode != LOCK_X)
ut_ad(mode != LOCK_X
|| lock_table_has(thr_get_trx(thr), index->table, LOCK_IX));
ut_ad((mode != LOCK_S)
ut_ad(mode != LOCK_S
|| lock_table_has(thr_get_trx(thr), index->table, LOCK_IS));
if (!page_rec_is_supremum(rec)) {
......
......@@ -16,6 +16,8 @@ Created 9/8/1995 Heikki Tuuri
#include <windows.h>
#endif
#include "srv0srv.h"
/*********************************************************************
Returns the thread identifier of current thread. */
......@@ -70,6 +72,15 @@ os_thread_create(
thread_id);
ut_a(thread);
if (srv_set_thread_priorities) {
/* Set created thread priority the same as a normal query
in MYSQL: we try to prevent starvation of threads by
assigning same priority QUERY_PRIOR to all */
ut_a(SetThreadPriority(thread, srv_query_thread_priority));
}
return(thread);
#else
int ret;
......@@ -82,6 +93,11 @@ os_thread_create(
pthread_attr_destroy(&attr);
if (srv_set_thread_priorities) {
my_pthread_setprio(pthread, srv_query_thread_priority);
}
return(pthread);
#endif
}
......
......@@ -284,15 +284,15 @@ ibool
row_ins_dupl_error_with_rec(
/*========================*/
/* out: TRUE if error */
rec_t* rec, /* in: user record */
rec_t* rec, /* in: user record; NOTE that we assume
that the caller already has a record lock on
the record! */
dtuple_t* entry, /* in: entry to insert */
dict_index_t* index, /* in: index */
trx_t* trx) /* in: inserting transaction */
dict_index_t* index) /* in: index */
{
ulint matched_fields;
ulint matched_bytes;
ulint n_unique;
trx_t* impl_trx;
n_unique = dict_index_get_n_unique(index);
......@@ -311,46 +311,55 @@ row_ins_dupl_error_with_rec(
return(TRUE);
}
/* If we get here, the record has its delete mark set. It is still
a unique key violation if the transaction which set the delete mark
is currently active and is not trx itself. We check if some
transaction has an implicit x-lock on the record. */
return(FALSE);
}
mutex_enter(&kernel_mutex);
/*************************************************************************
Sets a shared lock on a record. Used in locking possible duplicate key
records. */
static
ulint
row_ins_set_shared_rec_lock(
/*========================*/
/* out: DB_SUCCESS or error code */
rec_t* rec, /* in: record */
dict_index_t* index, /* in: index */
que_thr_t* thr) /* in: query thread */
{
ulint err;
if (index->type & DICT_CLUSTERED) {
impl_trx = lock_clust_rec_some_has_impl(rec, index);
err = lock_clust_rec_read_check_and_lock(0, rec, index, LOCK_S,
thr);
} else {
impl_trx = lock_sec_rec_some_has_impl_off_kernel(rec, index);
}
mutex_exit(&kernel_mutex);
if (impl_trx && impl_trx != trx) {
return(TRUE);
err = lock_sec_rec_read_check_and_lock(0, rec, index, LOCK_S,
thr);
}
return(FALSE);
return(err);
}
/*******************************************************************
Scans a unique non-clustered index at a given index entry to determine
whether a uniqueness violation has occurred for the key value of the entry. */
whether a uniqueness violation has occurred for the key value of the entry.
Set shared locks on possible duplicate records. */
static
ulint
row_ins_scan_sec_index_for_duplicate(
/*=================================*/
/* out: DB_SUCCESS or DB_DUPLICATE_KEY */
/* out: DB_SUCCESS, DB_DUPLICATE_KEY, or
DB_LOCK_WAIT */
dict_index_t* index, /* in: non-clustered unique index */
dtuple_t* entry, /* in: index entry */
trx_t* trx) /* in: inserting transaction */
que_thr_t* thr) /* in: query thread */
{
ulint dupl_count = 0;
int cmp;
ulint n_fields_cmp;
rec_t* rec;
btr_pcur_t pcur;
trx_t* trx = thr_get_trx(thr);
ulint err = DB_SUCCESS;
ibool moved;
mtr_t mtr;
mtr_start(&mtr);
......@@ -361,32 +370,45 @@ row_ins_scan_sec_index_for_duplicate(
dtuple_set_n_fields_cmp(entry, dict_index_get_n_unique(index));
btr_pcur_open_on_user_rec(index, entry, PAGE_CUR_GE,
BTR_SEARCH_LEAF, &pcur, &mtr);
btr_pcur_open(index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
/* Scan index records and check that there are no duplicates */
/* Scan index records and check if there is a duplicate */
for (;;) {
if (btr_pcur_is_after_last_in_tree(&pcur, &mtr)) {
rec = btr_pcur_get_rec(&pcur);
if (rec == page_get_infimum_rec(buf_frame_align(rec))) {
goto next_rec;
}
/* Try to place a lock on the index record */
err = row_ins_set_shared_rec_lock(rec, index, thr);
if (err != DB_SUCCESS) {
break;
}
rec = btr_pcur_get_rec(&pcur);
if (rec == page_get_supremum_rec(buf_frame_align(rec))) {
goto next_rec;
}
cmp = cmp_dtuple_rec(entry, rec);
if (cmp == 0) {
if (row_ins_dupl_error_with_rec(rec, entry, index,
trx)) {
dupl_count++;
if (dupl_count > 1) {
/* printf(
"Duplicate key in index %s\n",
if (row_ins_dupl_error_with_rec(rec, entry, index)) {
/* printf("Duplicate key in index %s\n",
index->name);
dtuple_print(entry); */
}
err = DB_DUPLICATE_KEY;
trx->error_info = index;
break;
}
}
......@@ -395,8 +417,12 @@ row_ins_scan_sec_index_for_duplicate(
}
ut_a(cmp == 0);
next_rec:
moved = btr_pcur_move_to_next(&pcur, &mtr);
btr_pcur_move_to_next_user_rec(&pcur, &mtr);
if (!moved) {
break;
}
}
mtr_commit(&mtr);
......@@ -404,44 +430,35 @@ row_ins_scan_sec_index_for_duplicate(
/* Restore old value */
dtuple_set_n_fields_cmp(entry, n_fields_cmp);
ut_a(dupl_count >= 1);
if (dupl_count > 1) {
trx->error_info = index;
return(DB_DUPLICATE_KEY);
}
return(DB_SUCCESS);
return(err);
}
/*******************************************************************
Tries to check if a unique key violation error would occur at an index entry
insert. */
Checks if a unique key violation error would occur at an index entry
insert. Sets shared locks on possible duplicate records. Works only
for a clustered index! */
static
ulint
row_ins_duplicate_error(
/*====================*/
/* out: DB_SUCCESS if no error
DB_DUPLICATE_KEY if error,
DB_STRONG_FAIL if this is a non-clustered
index record and we cannot determine yet
if there will be an error: in this last
case we must call
row_ins_scan_sec_index_for_duplicate
AFTER the insertion of the record! */
row_ins_duplicate_error_in_clust(
/*=============================*/
/* out: DB_SUCCESS if no error,
DB_DUPLICATE_KEY if error, DB_LOCK_WAIT if we
have to wait for a lock on a possible
duplicate record */
btr_cur_t* cursor, /* in: B-tree cursor */
dtuple_t* entry, /* in: entry to insert */
trx_t* trx, /* in: inserting transaction */
mtr_t* mtr, /* in: mtr */
rec_t** dupl_rec)/* out: record with which duplicate error */
que_thr_t* thr, /* in: query thread */
mtr_t* mtr) /* in: mtr */
{
ulint err;
rec_t* rec;
page_t* page;
ulint n_unique;
trx_t* trx = thr_get_trx(thr);
UT_NOT_USED(mtr);
ut_a(cursor->index->type & DICT_CLUSTERED);
ut_ad(cursor->index->type & DICT_UNIQUE);
/* NOTE: For unique non-clustered indexes there may be any number
......@@ -466,9 +483,20 @@ row_ins_duplicate_error(
if (rec != page_get_infimum_rec(page)) {
/* We set a lock on the possible duplicate: this
is needed in logical logging of MySQL to make
sure that in roll-forward we get the same duplicate
errors as in original execution */
err = row_ins_set_shared_rec_lock(rec, cursor->index,
thr);
if (err != DB_SUCCESS) {
return(err);
}
if (row_ins_dupl_error_with_rec(rec, entry,
cursor->index, trx)) {
*dupl_rec = rec;
cursor->index)) {
trx->error_info = cursor->index;
return(DB_DUPLICATE_KEY);
......@@ -483,9 +511,15 @@ row_ins_duplicate_error(
if (rec != page_get_supremum_rec(page)) {
err = row_ins_set_shared_rec_lock(rec, cursor->index,
thr);
if (err != DB_SUCCESS) {
return(err);
}
if (row_ins_dupl_error_with_rec(rec, entry,
cursor->index, trx)) {
*dupl_rec = rec;
cursor->index)) {
trx->error_info = cursor->index;
return(DB_DUPLICATE_KEY);
......@@ -496,15 +530,7 @@ row_ins_duplicate_error(
/* This should never happen */
}
if (cursor->index->type & DICT_CLUSTERED) {
return(DB_SUCCESS);
}
/* It was a non-clustered index: we must scan the index after the
insertion to be sure if there will be duplicate key error */
return(DB_STRONG_FAIL);
}
/*******************************************************************
......@@ -574,18 +600,15 @@ row_ins_index_entry_low(
que_thr_t* thr) /* in: query thread */
{
btr_cur_t cursor;
ulint dupl = DB_SUCCESS;
ulint modify;
rec_t* dummy_rec;
rec_t* rec;
rec_t* dupl_rec; /* Note that this may be undefined
for a non-clustered index even if
there is a duplicate key */
ulint err;
ulint n_unique;
mtr_t mtr;
log_free_check();
mtr_start(&mtr);
cursor.thr = thr;
......@@ -611,19 +634,37 @@ row_ins_index_entry_low(
if (index->type & DICT_UNIQUE && (cursor.up_match >= n_unique
|| cursor.low_match >= n_unique)) {
dupl = row_ins_duplicate_error(&cursor, entry,
thr_get_trx(thr), &mtr, &dupl_rec);
if (dupl == DB_DUPLICATE_KEY) {
if (index->type & DICT_CLUSTERED) {
/* Note that the following may return also
DB_LOCK_WAIT */
/* printf("Duplicate key in index %s lm %lu\n",
cursor->index->name, cursor->low_match);
rec_print(rec);
dtuple_print(entry); */
err = row_ins_duplicate_error_in_clust(&cursor,
entry, thr, &mtr);
if (err != DB_SUCCESS) {
err = dupl;
goto function_exit;
}
} else {
mtr_commit(&mtr);
err = row_ins_scan_sec_index_for_duplicate(index,
entry, thr);
mtr_start(&mtr);
if (err != DB_SUCCESS) {
goto function_exit;
}
/* We did not find a duplicate and we have now
locked with s-locks the necessary records to
prevent any insertion of a duplicate by another
transaction. Let us now reposition the cursor and
continue the insertion. */
btr_cur_search_to_nth_level(index, 0, entry,
PAGE_CUR_LE, mode | BTR_INSERT,
&cursor, 0, &mtr);
}
}
modify = row_ins_must_modify(&cursor);
......@@ -659,19 +700,6 @@ row_ins_index_entry_low(
function_exit:
mtr_commit(&mtr);
if (err == DB_SUCCESS && dupl == DB_STRONG_FAIL) {
/* We were not able to determine before the insertion
whether there will be a duplicate key error: do the check
now */
err = row_ins_scan_sec_index_for_duplicate(index, entry,
thr_get_trx(thr));
}
ut_ad(err != DB_DUPLICATE_KEY || index->type & DICT_CLUSTERED
|| DB_DUPLICATE_KEY ==
row_ins_scan_sec_index_for_duplicate(index, entry,
thr_get_trx(thr)));
return(err);
}
......
......@@ -170,12 +170,7 @@ row_mysql_handle_errors(
trx_general_rollback_for_mysql(trx, TRUE, savept);
}
} else if (err == DB_TOO_BIG_RECORD) {
if (savept) {
/* Roll back the latest, possibly incomplete
insertion or update */
trx_general_rollback_for_mysql(trx, TRUE, savept);
}
/* MySQL will roll back the latest SQL statement */
} else if (err == DB_LOCK_WAIT) {
timeout_expired = srv_suspend_mysql_thread(thr);
......@@ -193,19 +188,19 @@ row_mysql_handle_errors(
return(TRUE);
} else if (err == DB_DEADLOCK) {
/* Roll back the whole transaction */
trx_general_rollback_for_mysql(trx, FALSE, NULL);
/* MySQL will roll back the latest SQL statement */
} else if (err == DB_OUT_OF_FILE_SPACE) {
/* MySQL will roll back the latest SQL statement */
/* Roll back the whole transaction */
trx_general_rollback_for_mysql(trx, FALSE, NULL);
} else if (err == DB_MUST_GET_MORE_FILE_SPACE) {
ut_a(0); /* TODO: print something to MySQL error log */
fprintf(stderr,
"InnoDB: The database cannot continue operation because of\n"
"InnoDB: lack of space. You must add a new data file to\n"
"InnoDB: my.cnf and restart the database.\n");
exit(1);
} else {
ut_a(0);
}
......@@ -919,7 +914,7 @@ row_drop_table_for_mysql(
char* str2;
ulint len;
char buf[10000];
retry:
ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
ut_a(name != NULL);
......@@ -997,20 +992,14 @@ row_drop_table_for_mysql(
goto funct_exit;
}
/* Check if there are any locks on the table: if yes, it cannot
be dropped: we have to wait for the locks to be released */
if (lock_is_on_table(table)) {
err = DB_TABLE_IS_BEING_USED;
/* Remove any locks there are on the table or its records */
goto funct_exit;
}
lock_reset_all_on_table(table);
/* TODO: check that MySQL prevents users from accessing the table
after this function row_drop_table_for_mysql has been called:
otherwise anyone with an open handle to the table could, for example,
come to read the table! */
come to read the table! Monty said that it prevents. */
trx->dict_operation = TRUE;
trx->table_id = table->id;
......@@ -1041,12 +1030,6 @@ row_drop_table_for_mysql(
que_graph_free(graph);
if (err == DB_TABLE_IS_BEING_USED) {
os_thread_sleep(200000);
goto retry;
}
return((int) err);
}
......
......@@ -87,6 +87,9 @@ ibool srv_archive_recovery = 0;
dulint srv_archive_recovery_limit_lsn;
ulint srv_lock_wait_timeout = 1024 * 1024 * 1024;
ibool srv_set_thread_priorities = TRUE;
int srv_query_thread_priority = 0;
/*-------------------------------------------*/
ulint srv_n_spin_wait_rounds = 20;
ulint srv_spin_wait_delay = 5;
......@@ -1837,6 +1840,8 @@ srv_master_thread(
ulint n_pages_flushed;
ulint n_bytes_archived;
ulint i;
time_t last_flush_time;
time_t current_time;
UT_NOT_USED(arg);
......@@ -1861,6 +1866,12 @@ srv_master_thread(
for (i = 0; i < 10; i++) {
os_thread_sleep(1000000);
/* We flush the log once in a second even if no commit
is issued or the we have specified in my.cnf no flush
at transaction commit */
log_flush_up_to(ut_dulint_max, LOG_WAIT_ONE_GROUP);
if (srv_activity_count == old_activity_count) {
if (srv_print_thread_releases) {
......@@ -1877,10 +1888,19 @@ srv_master_thread(
n_pages_purged = 1;
last_flush_time = time(NULL);
while (n_pages_purged) {
n_pages_purged = trx_purge();
/* TODO: replace this by a check if we are running
out of file space! */
n_pages_purged = trx_purge();
current_time = time(NULL);
if (difftime(current_time, last_flush_time) > 1) {
log_flush_up_to(ut_dulint_max, LOG_WAIT_ONE_GROUP);
last_flush_time = current_time;
}
}
background_loop:
......
......@@ -113,6 +113,65 @@ io_handler_thread(
#endif
}
#ifdef __WIN__
#define SRV_PATH_SEPARATOR "\\"
#else
#define SRV_PATH_SEPARATOR "/"
#endif
/*************************************************************************
Normalizes a directory path for Windows: converts slashes to backslashes. */
static
void
srv_normalize_path_for_win(
/*=======================*/
char* str) /* in/out: null-terminated character string */
{
#ifdef __WIN__
ulint i;
for (i = 0; i < ut_strlen(str); i++) {
if (str[i] == '/') {
str[i] = '\\';
}
}
#endif
}
/*************************************************************************
Adds a slash or a backslash to the end of a string if it is missing. */
static
char*
srv_add_path_separator_if_needed(
/*=============================*/
/* out, own: string which has the separator */
char* str) /* in: null-terminated character string */
{
char* out_str;
if (ut_strlen(str) == 0) {
out_str = ut_malloc(2);
sprintf(out_str, "%s", SRV_PATH_SEPARATOR);
return(out_str);
}
if (str[ut_strlen(str) - 1] == SRV_PATH_SEPARATOR[0]) {
out_str = ut_malloc(ut_strlen(str) + 1);
sprintf(out_str, "%s", str);
return(out_str);
}
out_str = ut_malloc(ut_strlen(str) + 2);
sprintf(out_str, "%s%s", str, SRV_PATH_SEPARATOR);
return(out_str);
}
/*************************************************************************
Creates or opens the log files. */
static
......@@ -137,6 +196,10 @@ open_or_create_log_file(
*log_file_created = FALSE;
srv_normalize_path_for_win(srv_log_group_home_dirs[k]);
srv_log_group_home_dirs[k] = srv_add_path_separator_if_needed(
srv_log_group_home_dirs[k]);
sprintf(name, "%s%s%lu", srv_log_group_home_dirs[k], "ib_logfile", i);
files[i] = os_file_create(name, OS_FILE_CREATE, OS_FILE_NORMAL, &ret);
......@@ -258,7 +321,11 @@ open_or_create_data_files(
*create_new_db = FALSE;
srv_normalize_path_for_win(srv_data_home);
srv_data_home = srv_add_path_separator_if_needed(srv_data_home);
for (i = 0; i < srv_n_data_files; i++) {
srv_normalize_path_for_win(srv_data_file_names[i]);
sprintf(name, "%s%s", srv_data_home, srv_data_file_names[i]);
......@@ -525,6 +592,14 @@ innobase_start_or_create_for_mysql(void)
os_thread_create(io_handler_thread, n + i, thread_ids + i);
}
if (0 != ut_strcmp(srv_log_group_home_dirs[0], srv_arch_dir)) {
fprintf(stderr,
"InnoDB: Error: you must set the log group home dir in my.cnf the\n"
"InnoDB: same as log arch dir.\n");
return(DB_ERROR);
}
err = open_or_create_data_files(&create_new_db,
&min_flushed_lsn, &min_arch_log_no,
&max_flushed_lsn, &max_arch_log_no,
......@@ -536,6 +611,9 @@ innobase_start_or_create_for_mysql(void)
return((int) err);
}
srv_normalize_path_for_win(srv_arch_dir);
srv_arch_dir = srv_add_path_separator_if_needed(srv_arch_dir);
for (k = 0; k < srv_n_log_groups; k++) {
for (i = 0; i < srv_n_log_files; i++) {
......
......@@ -64,6 +64,8 @@ trx_create(
trx->n_mysql_tables_in_use = 0;
trx->ignore_duplicates_in_insert = FALSE;
mutex_create(&(trx->undo_mutex));
mutex_set_level(&(trx->undo_mutex), SYNC_TRX_UNDO);
......
......@@ -35,20 +35,21 @@ Innobase */
#define MAX_ULONG_BIT ((ulong) 1 << (sizeof(ulong)*8-1))
/* The following must be declared here so that we can handle SAFE_MUTEX */
pthread_mutex_t innobase_mutex;
#include "ha_innobase.h"
/* We must declare this here because we undef SAFE_MUTEX below */
pthread_mutex_t innobase_mutex;
/* Store MySQL definition of 'byte': in Linux it is char while Innobase
uses unsigned char */
typedef byte mysql_byte;
#define INSIDE_HA_INNOBASE_CC
#ifdef SAFE_MUTEX
#undef pthread_mutex_t
#endif
#define INSIDE_HA_INNOBASE_CC
/* Include necessary Innobase headers */
extern "C" {
#include "../innobase/include/univ.i"
......@@ -97,6 +98,8 @@ ulong innobase_active_counter = 0;
char* innobase_home = NULL;
char innodb_dummy_stmt_trx_handle = 'D';
static HASH innobase_open_tables;
static mysql_byte* innobase_get_key(INNOBASE_SHARE *share,uint *length,
......@@ -198,12 +201,13 @@ check_trx_exists(
thd->transaction.all.innobase_tid = trx;
/* The execution of a single SQL statement is denoted by
a 'transaction' handle which is a NULL pointer: Innobase
a 'transaction' handle which is a dummy pointer: Innobase
remembers internally where the latest SQL statement
started, and if error handling requires rolling back the
latest statement, Innobase does a rollback to a savepoint. */
thd->transaction.stmt.innobase_tid = NULL;
thd->transaction.stmt.innobase_tid =
(void*)&innodb_dummy_stmt_trx_handle;
}
return(trx);
......@@ -272,8 +276,12 @@ innobase_parse_data_file_paths_and_sizes(void)
size = strtoul(str, &endp, 10);
str = endp;
if (*str != 'M') {
if ((*str != 'M') && (*str != 'G')) {
size = size / (1024 * 1024);
} else if (*str == 'G') {
size = size * 1024;
str++;
} else {
str++;
}
......@@ -318,8 +326,12 @@ innobase_parse_data_file_paths_and_sizes(void)
size = strtoul(str, &endp, 10);
str = endp;
if (*str != 'M') {
if ((*str != 'M') && (*str != 'G')) {
size = size / (1024 * 1024);
} else if (*str == 'G') {
size = size * 1024;
str++;
} else {
str++;
}
......@@ -419,6 +431,13 @@ innobase_init(void)
DBUG_ENTER("innobase_init");
if (specialflag & SPECIAL_NO_PRIOR) {
srv_set_thread_priorities = FALSE;
} else {
srv_set_thread_priorities = TRUE;
srv_query_thread_priority = QUERY_PRIOR;
}
/* Use current_dir if no paths are set */
current_dir[0]=FN_CURLIB;
current_dir[1]=FN_LIBCHAR;
......@@ -557,8 +576,9 @@ innobase_commit(
trx = check_trx_exists(thd);
if (trx_handle) {
if (trx_handle != (void*)&innodb_dummy_stmt_trx_handle) {
trx_commit_for_mysql(trx);
trx_mark_sql_stat_end(trx);
} else {
trx_mark_sql_stat_end(trx);
}
......@@ -585,9 +605,7 @@ innobase_rollback(
/* out: 0 or error number */
THD* thd, /* in: handle to the MySQL thread of the user
whose transaction should be rolled back */
void* trx_handle)/* in: Innobase trx handle or NULL: NULL means
that the current SQL statement should be rolled
back */
void* trx_handle)/* in: Innobase trx handle or a dummy stmt handle */
{
int error = 0;
trx_t* trx;
......@@ -597,10 +615,11 @@ innobase_rollback(
trx = check_trx_exists(thd);
if (trx_handle) {
if (trx_handle != (void*)&innodb_dummy_stmt_trx_handle) {
error = trx_rollback_for_mysql(trx);
} else {
error = trx_rollback_last_sql_stat_for_mysql(trx);
trx_mark_sql_stat_end(trx);
}
DBUG_RETURN(convert_error_code_to_mysql(error));
......@@ -618,7 +637,8 @@ innobase_close_connection(
whose transaction should be rolled back */
{
if (NULL != thd->transaction.all.innobase_tid) {
trx_rollback_for_mysql((trx_t*)
(thd->transaction.all.innobase_tid));
trx_free_for_mysql((trx_t*)
(thd->transaction.all.innobase_tid));
}
......@@ -726,6 +746,8 @@ ha_innobase::open(
user_thd = NULL;
last_query_id = (ulong)-1;
if (!(share=get_share(name)))
DBUG_RETURN(1);
......@@ -1229,6 +1251,11 @@ ha_innobase::write_row(
update_timestamp(record + table->time_stamp - 1);
}
if (last_query_id != user_thd->query_id) {
prebuilt->sql_stat_start = TRUE;
last_query_id = user_thd->query_id;
}
if (table->next_number_field && record == table->record[0]) {
/* Set the 'in_update_remember_pos' flag to FALSE to
make sure all columns are fetched in the select done by
......@@ -1255,8 +1282,17 @@ ha_innobase::write_row(
build_template(prebuilt, NULL, table, ROW_MYSQL_WHOLE_ROW);
}
if (user_thd->lex.sql_command == SQLCOM_INSERT
&& user_thd->lex.duplicates == DUP_IGNORE) {
prebuilt->trx->ignore_duplicates_in_insert = TRUE;
} else {
prebuilt->trx->ignore_duplicates_in_insert = FALSE;
}
error = row_insert_for_mysql((byte*) record, prebuilt);
prebuilt->trx->ignore_duplicates_in_insert = FALSE;
error = convert_error_code_to_mysql(error);
/* Tell Innobase server that there might be work for
......@@ -1441,6 +1477,11 @@ ha_innobase::update_row(
DBUG_ENTER("ha_innobase::update_row");
if (last_query_id != user_thd->query_id) {
prebuilt->sql_stat_start = TRUE;
last_query_id = user_thd->query_id;
}
if (prebuilt->upd_node) {
uvect = prebuilt->upd_node->update;
} else {
......@@ -1485,6 +1526,11 @@ ha_innobase::delete_row(
DBUG_ENTER("ha_innobase::delete_row");
if (last_query_id != user_thd->query_id) {
prebuilt->sql_stat_start = TRUE;
last_query_id = user_thd->query_id;
}
if (!prebuilt->upd_node) {
row_get_prebuilt_update_vector(prebuilt);
}
......@@ -1590,6 +1636,11 @@ ha_innobase::index_read(
DBUG_ENTER("index_read");
statistic_increment(ha_read_key_count, &LOCK_status);
if (last_query_id != user_thd->query_id) {
prebuilt->sql_stat_start = TRUE;
last_query_id = user_thd->query_id;
}
index = prebuilt->index;
/* Note that if the select is used for an update, we always
......@@ -2622,7 +2673,6 @@ ha_innobase::update_table_comment(
return(str);
}
/****************************************************************************
Handling the shared INNOBASE_SHARE structure that is needed to provide table
locking.
......@@ -2697,12 +2747,18 @@ ha_innobase::store_lock(
{
row_prebuilt_t* prebuilt = (row_prebuilt_t*) innobase_prebuilt;
if (lock_type == TL_READ_WITH_SHARED_LOCKS) {
/* This is a SELECT ... IN SHARE MODE */
if (lock_type == TL_READ_WITH_SHARED_LOCKS ||
lock_type == TL_READ_NO_INSERT) {
/* This is a SELECT ... IN SHARE MODE, or
we are doing a complex SQL statement like
INSERT INTO ... SELECT ... and the logical logging
requires the use of a locking read */
prebuilt->select_lock_type = LOCK_S;
} else {
/* We set possible LOCK_X value in external_lock, not yet
here even if this would be SELECT ... FOR UPDATE */
prebuilt->select_lock_type = LOCK_NONE;
}
......
......@@ -41,6 +41,8 @@ class ha_innobase: public handler
THD* user_thd; /* the thread handle of the user
currently using the handle; this is
set in external_lock function */
ulong last_query_id; /* the latest query id where the
handle was used */
THR_LOCK_DATA lock;
INNOBASE_SHARE *share;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment