Commit 2c252ba9 authored by Mikael Ronstrom's avatar Mikael Ronstrom

Google SMP patch

parent 772cdf00
......@@ -333,7 +333,7 @@ btr_cur_search_to_nth_level(
#ifdef UNIV_SEARCH_PERF_STAT
info->n_searches++;
#endif
if (btr_search_latch.writer == RW_LOCK_NOT_LOCKED
if (rw_lock_get_writer(&btr_search_latch) == RW_LOCK_NOT_LOCKED
&& latch_mode <= BTR_MODIFY_LEAF && info->last_hash_succ
&& !estimate
#ifdef PAGE_CUR_LE_OR_EXTENDS
......
......@@ -748,8 +748,8 @@ btr_search_guess_on_hash(
rw_lock_s_lock(&btr_search_latch);
}
ut_ad(btr_search_latch.writer != RW_LOCK_EX);
ut_ad(btr_search_latch.reader_count > 0);
ut_ad(rw_lock_get_writer(&btr_search_latch) != RW_LOCK_EX);
ut_ad(rw_lock_get_reader_count(&btr_search_latch) > 0);
rec = ha_search_and_get_data(btr_search_sys->hash_index, fold);
......
......@@ -1277,8 +1277,8 @@ loop:
if (mode == BUF_GET_NOWAIT) {
if (rw_latch == RW_S_LATCH) {
success = rw_lock_s_lock_func_nowait(&(block->lock),
file, line);
success = rw_lock_s_lock_nowait(&(block->lock),
file, line);
fix_type = MTR_MEMO_PAGE_S_FIX;
} else {
ut_ad(rw_latch == RW_X_LATCH);
......@@ -1403,8 +1403,8 @@ buf_page_optimistic_get_func(
ut_ad(!ibuf_inside() || ibuf_page(block->space, block->offset));
if (rw_latch == RW_S_LATCH) {
success = rw_lock_s_lock_func_nowait(&(block->lock),
file, line);
success = rw_lock_s_lock_nowait(&(block->lock),
file, line);
fix_type = MTR_MEMO_PAGE_S_FIX;
} else {
success = rw_lock_x_lock_func_nowait(&(block->lock),
......@@ -1534,8 +1534,8 @@ buf_page_get_known_nowait(
ut_ad(!ibuf_inside() || (mode == BUF_KEEP_OLD));
if (rw_latch == RW_S_LATCH) {
success = rw_lock_s_lock_func_nowait(&(block->lock),
file, line);
success = rw_lock_s_lock_nowait(&(block->lock),
file, line);
fix_type = MTR_MEMO_PAGE_S_FIX;
} else {
success = rw_lock_x_lock_func_nowait(&(block->lock),
......
......@@ -374,6 +374,10 @@ static SHOW_VAR innodb_status_variables[]= {
(char*) &export_vars.innodb_dblwr_pages_written, SHOW_LONG},
{"dblwr_writes",
(char*) &export_vars.innodb_dblwr_writes, SHOW_LONG},
{"have_atomic_builtins",
(char*) &export_vars.innodb_have_atomic_builtins, SHOW_BOOL},
{"heap_enabled",
(char*) &export_vars.innodb_heap_enabled, SHOW_BOOL},
{"log_waits",
(char*) &export_vars.innodb_log_waits, SHOW_LONG},
{"log_write_requests",
......@@ -6878,6 +6882,7 @@ innodb_mutex_show_status(
{
char buf1[IO_SIZE], buf2[IO_SIZE];
mutex_t* mutex;
rw_lock_t* lock;
#ifdef UNIV_DEBUG
ulint rw_lock_count= 0;
ulint rw_lock_count_spin_loop= 0;
......@@ -6948,6 +6953,31 @@ innodb_mutex_show_status(
mutex_exit_noninline(&mutex_list_mutex);
mutex_enter_noninline(&rw_lock_list_mutex);
lock = UT_LIST_GET_FIRST(rw_lock_list);
while (lock != NULL)
{
if (lock->count_os_wait)
{
buf1len= my_snprintf(buf1, sizeof(buf1), "%s:%lu",
lock->cfile_name, (ulong) lock->cline);
buf2len= my_snprintf(buf2, sizeof(buf2),
"os_waits=%lu", lock->count_os_wait);
if (stat_print(thd, innobase_hton_name,
hton_name_len, buf1, buf1len,
buf2, buf2len)) {
mutex_exit_noninline(&rw_lock_list_mutex);
DBUG_RETURN(1);
}
}
lock = UT_LIST_GET_NEXT(list, lock);
}
mutex_exit_noninline(&rw_lock_list_mutex);
#ifdef UNIV_DEBUG
buf2len= my_snprintf(buf2, sizeof(buf2),
"count=%lu, spin_waits=%lu, spin_rounds=%lu, "
......@@ -6980,6 +7010,7 @@ bool innobase_show_status(handlerton *hton, THD* thd,
return FALSE;
}
}
rw_lock_t* lock;
/****************************************************************************
......
......@@ -513,7 +513,7 @@ buf_block_buf_fix_inc_debug(
{
ibool ret;
ret = rw_lock_s_lock_func_nowait(&(block->debug_latch), file, line);
ret = rw_lock_s_lock_nowait(&(block->debug_latch), file, line);
ut_ad(ret == TRUE);
ut_ad(mutex_own(&block->mutex));
......
......@@ -261,6 +261,29 @@ os_fast_mutex_free(
/*===============*/
os_fast_mutex_t* fast_mutex); /* in: mutex to free */
#ifdef HAVE_GCC_ATOMIC_BUILTINS
/**************************************************************
Atomic compare-and-swap for InnoDB. Currently requires GCC atomic builtins. */
UNIV_INLINE
ibool
os_compare_and_swap(
/*================*/
/* out: true if swapped */
volatile lint* ptr, /* in: pointer to target */
lint oldVal, /* in: value to compare to */
lint newVal); /* in: value to swap in */
/**************************************************************
Atomic increment for InnoDB. Currently requires GCC atomic builtins. */
UNIV_INLINE
lint
os_atomic_increment(
/*================*/
/* out: resulting value */
volatile lint* ptr, /* in: pointer to target */
lint amount); /* in: amount of increment */
#endif /* HAVE_GCC_ATOMIC_BUILTINS */
#ifndef UNIV_NONINL
#include "os0sync.ic"
#endif
......
......@@ -44,3 +44,38 @@ os_fast_mutex_trylock(
#endif
#endif
}
#ifdef HAVE_GCC_ATOMIC_BUILTINS
/**************************************************************
Atomic compare-and-swap for InnoDB. Currently requires GCC atomic builtins. */
UNIV_INLINE
ibool
os_compare_and_swap(
/*================*/
/* out: true if swapped */
volatile lint* ptr, /* in: pointer to target */
lint oldVal, /* in: value to compare to */
lint newVal) /* in: value to swap in */
{
if(__sync_bool_compare_and_swap(ptr, oldVal, newVal)) {
return(TRUE);
}
return(FALSE);
}
/**************************************************************
Atomic increment for InnoDB. Currently requires GCC atomic builtins. */
UNIV_INLINE
lint
os_atomic_increment(
/*================*/
/* out: resulting value */
volatile lint* ptr, /* in: pointer to target */
lint amount) /* in: amount of increment */
{
lint newVal = __sync_add_and_fetch(ptr, amount);
return newVal;
}
#endif /* HAVE_GCC_ATOMIC_BUILTINS */
......@@ -513,6 +513,8 @@ struct export_var_struct{
ulint innodb_buffer_pool_read_ahead_rnd;
ulint innodb_dblwr_pages_written;
ulint innodb_dblwr_writes;
ibool innodb_have_atomic_builtins;
ibool innodb_heap_enabled;
ulint innodb_log_waits;
ulint innodb_log_write_requests;
ulint innodb_log_writes;
......@@ -549,4 +551,3 @@ struct srv_sys_struct{
extern ulint srv_n_threads_active[];
#endif
......@@ -24,6 +24,12 @@ smaller than 30 and the order of the numerical values like below! */
#define RW_X_LATCH 2
#define RW_NO_LATCH 3
/* We decrement lock_word by this amount for each x_lock. It is also the
start value for the lock_word, meaning that it limits the maximum number
of concurrent read locks before the rw_lock breaks. The current value of
0x00100000 allows 1,048,575 concurrent readers and 2047 recursive writers.*/
#define X_LOCK_DECR 0x00100000
typedef struct rw_lock_struct rw_lock_t;
#ifdef UNIV_SYNC_DEBUG
typedef struct rw_lock_debug_struct rw_lock_debug_t;
......@@ -47,14 +53,14 @@ extern ibool rw_lock_debug_waiters; /* This is set to TRUE, if
there may be waiters for the event */
#endif /* UNIV_SYNC_DEBUG */
extern ulint rw_s_system_call_count;
extern ulint rw_s_spin_wait_count;
extern ulint rw_s_exit_count;
extern ulint rw_s_os_wait_count;
extern ulint rw_x_system_call_count;
extern ulint rw_x_spin_wait_count;
extern ulint rw_x_os_wait_count;
extern ulint rw_x_exit_count;
extern ib_longlong rw_s_spin_wait_count;
extern ib_longlong rw_s_spin_round_count;
extern ib_longlong rw_s_exit_count;
extern ib_longlong rw_s_os_wait_count;
extern ib_longlong rw_x_spin_wait_count;
extern ib_longlong rw_x_spin_round_count;
extern ib_longlong rw_x_os_wait_count;
extern ib_longlong rw_x_exit_count;
/**********************************************************************
Creates, or rather, initializes an rw-lock object in a specified memory
......@@ -127,8 +133,8 @@ corresponding function. */
NOTE! The following macros should be used in rw s-locking, not the
corresponding function. */
#define rw_lock_s_lock_nowait(M) rw_lock_s_lock_func_nowait(\
(M), __FILE__, __LINE__)
#define rw_lock_s_lock_nowait(M, F, L) rw_lock_s_lock_low(\
(M), 0, (F), (L))
/**********************************************************************
NOTE! Use the corresponding macro, not directly this function, except if
you supply the file name and line number. Lock an rw-lock in shared mode
......@@ -146,18 +152,6 @@ rw_lock_s_lock_func(
const char* file_name,/* in: file name where lock requested */
ulint line); /* in: line where requested */
/**********************************************************************
NOTE! Use the corresponding macro, not directly this function, except if
you supply the file name and line number. Lock an rw-lock in shared mode
for the current thread if the lock can be acquired immediately. */
UNIV_INLINE
ibool
rw_lock_s_lock_func_nowait(
/*=======================*/
/* out: TRUE if success */
rw_lock_t* lock, /* in: pointer to rw-lock */
const char* file_name,/* in: file name where lock requested */
ulint line); /* in: line where requested */
/**********************************************************************
NOTE! Use the corresponding macro, not directly this function! Lock an
rw-lock in exclusive mode for the current thread if the lock can be
obtained immediately. */
......@@ -341,6 +335,23 @@ ulint
rw_lock_get_reader_count(
/*=====================*/
rw_lock_t* lock);
/**********************************************************************
Decrements lock_word the specified amount if it is greater than 0.
This is used by both s_lock and x_lock operations. */
UNIV_INLINE
ibool
rw_lock_lock_word_decr(
/* out: TRUE if decr occurs */
rw_lock_t* lock, /* in: rw-lock */
ulint amount); /* in: amount to decrement */
/**********************************************************************
Increments lock_word the specified amount and returns new value. */
UNIV_INLINE
lint
rw_lock_lock_word_incr(
/* out: TRUE if decr occurs */
rw_lock_t* lock,
ulint amount); /* in: rw-lock */
#ifdef UNIV_SYNC_DEBUG
/**********************************************************************
Checks if the thread has locked the rw-lock in the specified mode, with
......@@ -417,44 +428,28 @@ Do not use its fields directly! The structure used in the spin lock
implementation of a read-write lock. Several threads may have a shared lock
simultaneously in this lock, but only one writer may have an exclusive lock,
in which case no shared locks are allowed. To prevent starving of a writer
blocked by readers, a writer may queue for the lock by setting the writer
field. Then no new readers are allowed in. */
blocked by readers, a writer may queue for x-lock by decrementing lock_word:
no new readers will be let in while the thread waits for readers to exit. */
struct rw_lock_struct {
os_event_t event; /* Used by sync0arr.c for thread queueing */
#ifdef __WIN__
os_event_t wait_ex_event; /* This windows specific event is
used by the thread which has set the
lock state to RW_LOCK_WAIT_EX. The
rw_lock design guarantees that this
thread will be the next one to proceed
once the current the event gets
signalled. See LEMMA 2 in sync0sync.c */
#endif
ulint reader_count; /* Number of readers who have locked this
lock in the shared mode */
ulint writer; /* This field is set to RW_LOCK_EX if there
is a writer owning the lock (in exclusive
mode), RW_LOCK_WAIT_EX if a writer is
queueing for the lock, and
RW_LOCK_NOT_LOCKED, otherwise. */
os_thread_id_t writer_thread;
/* Thread id of a possible writer thread */
ulint writer_count; /* Number of times the same thread has
recursively locked the lock in the exclusive
mode */
mutex_t mutex; /* The mutex protecting rw_lock_struct */
ulint pass; /* Default value 0. This is set to some
volatile lint lock_word;
/* Holds the state of the lock. */
volatile ulint waiters;/* 1: there are waiters */
volatile ulint pass; /* Default value 0. This is set to some
value != 0 given by the caller of an x-lock
operation, if the x-lock is to be passed to
another thread to unlock (which happens in
asynchronous i/o). */
ulint waiters; /* This ulint is set to 1 if there are
waiters (readers or writers) in the global
wait array, waiting for this rw_lock.
Otherwise, == 0. */
volatile os_thread_id_t writer_thread;
/* Thread id of writer thread */
os_event_t event; /* Used by sync0arr.c for thread queueing */
os_event_t wait_ex_event;
/* Event for next-writer to wait on. A thread
must decrement lock_word before waiting. */
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_t mutex; /* The mutex protecting rw_lock_struct */
#endif /* HAVE_GCC_ATOMIC_BUILTINS */
UT_LIST_NODE_T(rw_lock_t) list;
/* All allocated rw locks are put into a
list */
......@@ -464,7 +459,9 @@ struct rw_lock_struct {
info list of the lock */
ulint level; /* Level in the global latching order. */
#endif /* UNIV_SYNC_DEBUG */
ulint count_os_wait; /* Count of os_waits. May not be accurate */
const char* cfile_name;/* File name where lock created */
/* last s-lock file/line is not guaranteed to be correct */
const char* last_s_file_name;/* File name where last s-locked */
const char* last_x_file_name;/* File name where last x-locked */
ibool writer_is_wait_ex;
......
......@@ -62,40 +62,48 @@ rw_lock_set_waiters(
{
lock->waiters = flag;
}
/**********************************************************************
Returns the write-status of the lock - this function made more sense
with the old rw_lock implementation.
*/
UNIV_INLINE
ulint
rw_lock_get_writer(
/*===============*/
rw_lock_t* lock)
{
return(lock->writer);
}
UNIV_INLINE
void
rw_lock_set_writer(
/*===============*/
rw_lock_t* lock,
ulint flag)
{
lock->writer = flag;
lint lock_word = lock->lock_word;
if(lock_word > 0) {
/* return NOT_LOCKED in s-lock state, like the writer
member of the old lock implementation. */
return RW_LOCK_NOT_LOCKED;
} else if (((-lock_word) % X_LOCK_DECR) == 0) {
return RW_LOCK_EX;
} else {
ut_ad(lock_word > -X_LOCK_DECR);
return RW_LOCK_WAIT_EX;
}
}
UNIV_INLINE
ulint
rw_lock_get_reader_count(
/*=====================*/
rw_lock_t* lock)
{
return(lock->reader_count);
}
UNIV_INLINE
void
rw_lock_set_reader_count(
/*=====================*/
rw_lock_t* lock,
ulint count)
{
lock->reader_count = count;
lint lock_word = lock->lock_word;
if(lock_word > 0) {
/* s-locked, no x-waiters */
return(X_LOCK_DECR - lock_word);
} else if (lock_word < 0 && lock_word > -X_LOCK_DECR) {
/* s-locked, with x-waiters */
return (ulint)(-lock_word);
}
return 0;
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
UNIV_INLINE
mutex_t*
rw_lock_get_mutex(
......@@ -104,6 +112,7 @@ rw_lock_get_mutex(
{
return(&(lock->mutex));
}
#endif
/**********************************************************************
Returns the value of writer_count for the lock. Does not reserve the lock
......@@ -115,7 +124,87 @@ rw_lock_get_x_lock_count(
/* out: value of writer_count */
rw_lock_t* lock) /* in: rw-lock */
{
return(lock->writer_count);
lint lock_copy = lock->lock_word;
/* If there is a reader, lock_word is not divisible by X_LOCK_DECR */
if(lock_copy > 0 || (-lock_copy) % X_LOCK_DECR != 0) {
return 0;
}
return ((-lock_copy) / X_LOCK_DECR) + 1;
}
/**********************************************************************
Two different implementations for decrementing the lock_word of a rw_lock:
one for systems supporting atomic operations, one for others. This does
does not support recusive x-locks: they should be handled by the caller and
need not be atomic since they are performed by the current lock holder.
Returns true if the decrement was made, false if not. */
UNIV_INLINE
ibool
rw_lock_lock_word_decr(
/* out: TRUE if decr occurs */
rw_lock_t* lock, /* in: rw-lock */
ulint amount) /* in: amount of decrement */
{
#ifdef HAVE_GCC_ATOMIC_BUILTINS
lint local_lock_word = lock->lock_word;
while (local_lock_word > 0) {
if(os_compare_and_swap(&(lock->lock_word),
local_lock_word,
local_lock_word - amount)) {
return TRUE;
}
local_lock_word = lock->lock_word;
}
return(FALSE);
#else /* HAVE_GCC_ATOMIC_BUILTINS */
ibool success = FALSE;
mutex_enter(&(lock->mutex));
if(lock->lock_word > 0) {
lock->lock_word -= amount;
success = TRUE;
}
mutex_exit(&(lock->mutex));
return success;
#endif /* HAVE_GCC_ATOMIC_BUILTINS */
}
/**********************************************************************
Two different implementations for incrementing the lock_word of a rw_lock:
one for systems supporting atomic operations, one for others.
Returns the value of lock_word after increment. */
UNIV_INLINE
lint
rw_lock_lock_word_incr(
/* out: lock->lock_word after increment */
rw_lock_t* lock, /* in: rw-lock */
ulint amount) /* in: amount of increment */
{
#ifdef HAVE_GCC_ATOMIC_BUILTINS
return(os_atomic_increment(&(lock->lock_word), amount));
#else /* HAVE_GCC_ATOMIC_BUILTINS */
lint local_lock_word;
mutex_enter(&(lock->mutex));
lock->lock_word += amount;
local_lock_word = lock->lock_word;
mutex_exit(&(lock->mutex));
return local_lock_word;
#endif /* HAVE_GCC_ATOMIC_BUILTINS */
}
/**********************************************************************
......@@ -133,25 +222,21 @@ rw_lock_s_lock_low(
const char* file_name, /* in: file name where lock requested */
ulint line) /* in: line where requested */
{
ut_ad(mutex_own(rw_lock_get_mutex(lock)));
/* Check if the writer field is free */
if (UNIV_LIKELY(lock->writer == RW_LOCK_NOT_LOCKED)) {
/* Set the shared lock by incrementing the reader count */
lock->reader_count++;
/* TODO: study performance of UNIV_LIKELY branch prediction hints. */
if (!rw_lock_lock_word_decr(lock, 1)) {
/* Locking did not succeed */
return(FALSE);
}
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_SHARED, file_name,
line);
rw_lock_add_debug_info(lock, pass, RW_LOCK_SHARED, file_name, line);
#endif
lock->last_s_file_name = file_name;
lock->last_s_line = line;
return(TRUE); /* locking succeeded */
}
/* These debugging values are not set safely: they may be incorrect
or even refer to a line that is invalid for the file name. */
lock->last_s_file_name = file_name;
lock->last_s_line = line;
return(FALSE); /* locking did not succeed */
return(TRUE); /* locking succeeded */
}
/**********************************************************************
......@@ -166,11 +251,10 @@ rw_lock_s_lock_direct(
const char* file_name, /* in: file name where requested */
ulint line) /* in: line where lock requested */
{
ut_ad(lock->writer == RW_LOCK_NOT_LOCKED);
ut_ad(rw_lock_get_reader_count(lock) == 0);
ut_ad(lock->lock_word == X_LOCK_DECR);
/* Set the shared lock by incrementing the reader count */
lock->reader_count++;
/* Indicate there is a new reader by decrementing lock_word */
lock->lock_word--;
lock->last_s_file_name = file_name;
lock->last_s_line = line;
......@@ -193,12 +277,10 @@ rw_lock_x_lock_direct(
ulint line) /* in: line where lock requested */
{
ut_ad(rw_lock_validate(lock));
ut_ad(rw_lock_get_reader_count(lock) == 0);
ut_ad(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED);
ut_ad(lock->lock_word == X_LOCK_DECR);
rw_lock_set_writer(lock, RW_LOCK_EX);
lock->lock_word -= X_LOCK_DECR;
lock->writer_thread = os_thread_get_curr_id();
lock->writer_count++;
lock->pass = 0;
lock->last_x_file_name = file_name;
......@@ -240,15 +322,12 @@ rw_lock_s_lock_func(
ut_ad(!rw_lock_own(lock, RW_LOCK_SHARED)); /* see NOTE above */
#endif /* UNIV_SYNC_DEBUG */
mutex_enter(rw_lock_get_mutex(lock));
if (UNIV_LIKELY(rw_lock_s_lock_low(lock, pass, file_name, line))) {
mutex_exit(rw_lock_get_mutex(lock));
/* TODO: study performance of UNIV_LIKELY branch prediction hints. */
if (rw_lock_s_lock_low(lock, pass, file_name, line)) {
return; /* Success */
} else {
/* Did not succeed, try spin wait */
mutex_exit(rw_lock_get_mutex(lock));
rw_lock_s_lock_spin(lock, pass, file_name, line);
......@@ -258,86 +337,66 @@ rw_lock_s_lock_func(
/**********************************************************************
NOTE! Use the corresponding macro, not directly this function! Lock an
rw-lock in shared mode for the current thread if the lock can be acquired
immediately. */
rw-lock in exclusive mode for the current thread if the lock can be
obtained immediately. */
UNIV_INLINE
ibool
rw_lock_s_lock_func_nowait(
rw_lock_x_lock_func_nowait(
/*=======================*/
/* out: TRUE if success */
rw_lock_t* lock, /* in: pointer to rw-lock */
const char* file_name,/* in: file name where lock requested */
ulint line) /* in: line where requested */
{
ibool success = FALSE;
mutex_enter(rw_lock_get_mutex(lock));
os_thread_id_t curr_thread = os_thread_get_curr_id();
if (lock->writer == RW_LOCK_NOT_LOCKED) {
/* Set the shared lock by incrementing the reader count */
lock->reader_count++;
ibool success;
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, 0, RW_LOCK_SHARED, file_name,
line);
#endif
lock->last_s_file_name = file_name;
lock->last_s_line = line;
#ifdef HAVE_GCC_ATOMIC_BUILTINS
success = os_compare_and_swap(&(lock->lock_word), X_LOCK_DECR, 0);
#else
success = FALSE;
mutex_enter(&(lock->mutex));
if(lock->lock_word == X_LOCK_DECR) {
lock->lock_word = 0;
success = TRUE;
}
mutex_exit(&(lock->mutex));
mutex_exit(rw_lock_get_mutex(lock));
return(success);
}
/**********************************************************************
NOTE! Use the corresponding macro, not directly this function! Lock an
rw-lock in exclusive mode for the current thread if the lock can be
obtained immediately. */
UNIV_INLINE
ibool
rw_lock_x_lock_func_nowait(
/*=======================*/
/* out: TRUE if success */
rw_lock_t* lock, /* in: pointer to rw-lock */
const char* file_name,/* in: file name where lock requested */
ulint line) /* in: line where requested */
{
ibool success = FALSE;
os_thread_id_t curr_thread = os_thread_get_curr_id();
mutex_enter(rw_lock_get_mutex(lock));
if (UNIV_UNLIKELY(rw_lock_get_reader_count(lock) != 0)) {
} else if (UNIV_LIKELY(rw_lock_get_writer(lock)
== RW_LOCK_NOT_LOCKED)) {
rw_lock_set_writer(lock, RW_LOCK_EX);
#endif
if(success) {
lock->writer_thread = curr_thread;
lock->pass = 0;
relock:
lock->writer_count++;
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, 0, RW_LOCK_EX, file_name, line);
#endif
} else if (!(lock->pass) &&
os_thread_eq(lock->writer_thread, curr_thread)) {
/* Must verify pass first: otherwise another thread can
call move_ownership suddenly allowing recursive locks.
and after we have verified our thread_id matches
(though move_ownership has since changed it).*/
lock->last_x_file_name = file_name;
lock->last_x_line = line;
/* Relock: this lock_word modification is safe since no other
threads can modify (lock, unlock, or reserve) lock_word while
there is an exclusive writer and this is the writer thread. */
lock->lock_word -= X_LOCK_DECR;
success = TRUE;
} else if (rw_lock_get_writer(lock) == RW_LOCK_EX
&& lock->pass == 0
&& os_thread_eq(lock->writer_thread, curr_thread)) {
goto relock;
ut_ad(((-lock->lock_word) % X_LOCK_DECR) == 0);
} else {
/* Failure */
return(FALSE);
}
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, 0, RW_LOCK_EX, file_name, line);
#endif
mutex_exit(rw_lock_get_mutex(lock));
lock->last_x_file_name = file_name;
lock->last_x_line = line;
ut_ad(rw_lock_validate(lock));
return(success);
return(TRUE);
}
/**********************************************************************
......@@ -353,39 +412,21 @@ rw_lock_s_unlock_func(
#endif
)
{
mutex_t* mutex = &(lock->mutex);
ibool sg = FALSE;
/* Acquire the mutex protecting the rw-lock fields */
mutex_enter(mutex);
/* Reset the shared lock by decrementing the reader count */
ut_a(lock->reader_count > 0);
lock->reader_count--;
ut_ad((lock->lock_word % X_LOCK_DECR) != 0);
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, pass, RW_LOCK_SHARED);
#endif
/* If there may be waiters and this was the last s-lock,
signal the object */
if (UNIV_UNLIKELY(lock->waiters)
&& lock->reader_count == 0) {
sg = TRUE;
/* Increment lock_word to indicate 1 less reader */
if(rw_lock_lock_word_incr(lock, 1) == 0) {
rw_lock_set_waiters(lock, 0);
}
mutex_exit(mutex);
if (UNIV_UNLIKELY(sg)) {
#ifdef __WIN__
/* wait_ex waiter exists. It may not be asleep, but we signal
anyway. We do not wake other waiters, because they can't
exist without wait_ex waiter and wait_ex waiter goes first.*/
os_event_set(lock->wait_ex_event);
#endif
os_event_set(lock->event);
sync_array_object_signalled(sync_primary_wait_array);
}
ut_ad(rw_lock_validate(lock));
......@@ -404,16 +445,15 @@ rw_lock_s_unlock_direct(
/*====================*/
rw_lock_t* lock) /* in: rw-lock */
{
/* Reset the shared lock by decrementing the reader count */
ut_ad(lock->reader_count > 0);
lock->reader_count--;
ut_ad(lock->lock_word < X_LOCK_DECR);
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, 0, RW_LOCK_SHARED);
#endif
/* Decrease reader count by incrementing lock_word */
lock->lock_word++;
ut_ad(!lock->waiters);
ut_ad(rw_lock_validate(lock));
#ifdef UNIV_SYNC_PERF_STAT
......@@ -434,42 +474,32 @@ rw_lock_x_unlock_func(
#endif
)
{
ibool sg = FALSE;
ut_ad((lock->lock_word % X_LOCK_DECR) == 0);
/* Acquire the mutex protecting the rw-lock fields */
mutex_enter(&(lock->mutex));
/* Reset the exclusive lock if this thread no longer has an x-mode
lock */
ut_ad(lock->writer_count > 0);
lock->writer_count--;
if (lock->writer_count == 0) {
rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
}
/* Must reset writer_thread while we still have the lock.
If we are not the last unlocker, we correct it later in the function,
which is harmless since we still hold the lock. */
/* TODO: are there any risks of a thread id == -1 on any platform? */
os_thread_id_t local_writer_thread = lock->writer_thread;
lock->writer_thread = -1;
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, pass, RW_LOCK_EX);
#endif
/* If there may be waiters, signal the lock */
if (UNIV_UNLIKELY(lock->waiters)
&& lock->writer_count == 0) {
sg = TRUE;
rw_lock_set_waiters(lock, 0);
}
mutex_exit(&(lock->mutex));
if(rw_lock_lock_word_incr(lock, X_LOCK_DECR) == X_LOCK_DECR) {
/* Lock is now free. May have to signal read/write waiters.
We do not need to signal wait_ex waiters, since they cannot
exist when there is a writer. */
if(lock->waiters) {
rw_lock_set_waiters(lock, 0);
os_event_set(lock->event);
sync_array_object_signalled(sync_primary_wait_array);
}
if (UNIV_UNLIKELY(sg)) {
#ifdef __WIN__
os_event_set(lock->wait_ex_event);
#endif
os_event_set(lock->event);
sync_array_object_signalled(sync_primary_wait_array);
} else {
/* We still hold x-lock, so we correct writer_thread. */
lock->writer_thread = local_writer_thread;
}
ut_ad(rw_lock_validate(lock));
......@@ -491,18 +521,14 @@ rw_lock_x_unlock_direct(
/* Reset the exclusive lock if this thread no longer has an x-mode
lock */
ut_ad(lock->writer_count > 0);
lock->writer_count--;
if (lock->writer_count == 0) {
rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
}
ut_ad((lock->lock_word % X_LOCK_DECR) == 0);
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, 0, RW_LOCK_EX);
#endif
lock->lock_word += X_LOCK_DECR;
ut_ad(!lock->waiters);
ut_ad(rw_lock_validate(lock));
......
......@@ -252,7 +252,7 @@ mutex_n_reserved(void);
NOT to be used outside this module except in debugging! Gets the value
of the lock word. */
UNIV_INLINE
ulint
byte
mutex_get_lock_word(
/*================*/
const mutex_t* mutex); /* in: mutex */
......@@ -471,9 +471,13 @@ implementation of a mutual exclusion semaphore. */
struct mutex_struct {
os_event_t event; /* Used by sync0arr.c for the wait queue */
ulint lock_word; /* This ulint is the target of the atomic
test-and-set instruction in Win32 */
#if !defined(_WIN32) || !defined(UNIV_CAN_USE_X86_ASSEMBLER)
byte lock_word; /* This byte is the target of the atomic
test-and-set instruction in Win32 and
x86 32/64 with GCC 4.1.0 or later version */
#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
#else
os_fast_mutex_t
os_fast_mutex; /* In other systems we use this OS mutex
in place of lock_word */
......@@ -526,8 +530,7 @@ to 20 microseconds. */
/* The number of system calls made in this module. Intended for performance
monitoring. */
extern ulint mutex_system_call_count;
extern ulint mutex_exit_count;
extern ib_longlong mutex_exit_count;
#ifdef UNIV_SYNC_DEBUG
/* Latching order checks start when this is set TRUE */
......
......@@ -6,16 +6,6 @@ Mutex, the basic synchronization primitive
Created 9/5/1995 Heikki Tuuri
*******************************************************/
#if defined(not_defined) && defined(__GNUC__) && defined(UNIV_INTEL_X86)
/* %z0: Use the size of operand %0 which in our case is *m to determine
instruction size, it should end up as xchgl. "1" in the input constraint,
says that "in" has to go in the same place as "out".*/
#define TAS(m, in, out) \
asm volatile ("xchg%z0 %2, %0" \
: "=g" (*(m)), "=r" (out) \
: "1" (in)) /* Note: "1" here refers to "=r" (out) */
#endif
/**********************************************************************
Sets the waiters field in a mutex. */
......@@ -59,7 +49,7 @@ mutex_signal_object(
Performs an atomic test-and-set instruction to the lock_word field of a
mutex. */
UNIV_INLINE
ulint
byte
mutex_test_and_set(
/*===============*/
/* out: the previous value of lock_word: 0 or
......@@ -67,18 +57,18 @@ mutex_test_and_set(
mutex_t* mutex) /* in: mutex */
{
#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
ulint res;
ulint* lw; /* assembler code is used to ensure that
byte res;
byte* lw; /* assembler code is used to ensure that
lock_word is loaded from memory */
ut_ad(mutex);
ut_ad(sizeof(ulint) == 4);
ut_ad(sizeof(byte) == 1);
lw = &(mutex->lock_word);
__asm MOV ECX, lw
__asm MOV EDX, 1
__asm XCHG EDX, DWORD PTR [ECX]
__asm MOV res, EDX
__asm XCHG DL, BYTE PTR [ECX]
__asm MOV res, DL
/* The fence below would prevent this thread from
reading the data structure protected by the mutex
......@@ -98,12 +88,8 @@ mutex_test_and_set(
/* mutex_fence(); */
return(res);
#elif defined(not_defined) && defined(__GNUC__) && defined(UNIV_INTEL_X86)
ulint res;
TAS(&mutex->lock_word, 1, res);
return(res);
#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
return __sync_lock_test_and_set(&(mutex->lock_word), 1);
#else
ibool ret;
......@@ -117,7 +103,7 @@ mutex_test_and_set(
mutex->lock_word = 1;
}
return(ret);
return((byte)ret);
#endif
}
......@@ -131,7 +117,7 @@ mutex_reset_lock_word(
mutex_t* mutex) /* in: mutex */
{
#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
ulint* lw; /* assembler code is used to ensure that
byte* lw; /* assembler code is used to ensure that
lock_word is loaded from memory */
ut_ad(mutex);
......@@ -139,11 +125,12 @@ mutex_reset_lock_word(
__asm MOV EDX, 0
__asm MOV ECX, lw
__asm XCHG EDX, DWORD PTR [ECX]
#elif defined(not_defined) && defined(__GNUC__) && defined(UNIV_INTEL_X86)
ulint res;
TAS(&mutex->lock_word, 0, res);
__asm XCHG DL, BYTE PTR [ECX]
#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
/* In theory __sync_lock_release should be used to release the lock.
Unfortunately, it does not work properly alone. The workaround is
that more conservative __sync_lock_test_and_set is used instead. */
__sync_lock_test_and_set(&(mutex->lock_word), 0);
#else
mutex->lock_word = 0;
......@@ -154,12 +141,12 @@ mutex_reset_lock_word(
/**********************************************************************
Gets the value of the lock word. */
UNIV_INLINE
ulint
byte
mutex_get_lock_word(
/*================*/
const mutex_t* mutex) /* in: mutex */
{
const volatile ulint* ptr; /* declared volatile to ensure that
const volatile byte* ptr; /* declared volatile to ensure that
lock_word is loaded from memory */
ut_ad(mutex);
......
......@@ -116,6 +116,9 @@ by one. */
#define UNIV_SET_MEM_TO_ZERO
#endif
/* Use malloc instead of innodb additional memory pool (great with tcmalloc) */
#define UNIV_DISABLE_MEM_POOL
/*
#define UNIV_SQL_DEBUG
#define UNIV_LOG_DEBUG
......
......@@ -329,6 +329,9 @@ mem_area_alloc(
minus MEM_AREA_EXTRA_SIZE */
mem_pool_t* pool) /* in: memory pool */
{
#ifdef UNIV_DISABLE_MEM_POOL
return malloc(size);
#else /* UNIV_DISABLE_MEM_POOL */
mem_area_t* area;
ulint n;
ibool ret;
......@@ -407,6 +410,7 @@ mem_area_alloc(
ut_2_exp(n) - MEM_AREA_EXTRA_SIZE);
return((void*)(MEM_AREA_EXTRA_SIZE + ((byte*)area)));
#endif /* UNIV_DISABLE_MEM_POOL */
}
/************************************************************************
......@@ -459,6 +463,9 @@ mem_area_free(
buffer */
mem_pool_t* pool) /* in: memory pool */
{
#ifdef UNIV_DISABLE_MEM_POOL
free(ptr);
#else /* UNIV_DISABLE_MEM_POOL */
mem_area_t* area;
mem_area_t* buddy;
void* new_ptr;
......@@ -570,6 +577,7 @@ mem_area_free(
mutex_exit(&(pool->mutex));
ut_ad(mem_pool_validate(pool));
#endif /* UNIV_DISABLE_MEM_POOL */
}
/************************************************************************
......
......@@ -1248,7 +1248,7 @@ table_loop:
rw_lock_s_lock(&btr_search_latch);
search_latch_locked = TRUE;
} else if (btr_search_latch.writer_is_wait_ex) {
} else if (rw_lock_get_writer(&btr_search_latch) == RW_LOCK_WAIT_EX) {
/* There is an x-latch request waiting: release the
s-latch for a moment; as an s-latch here is often
......@@ -3327,7 +3327,7 @@ row_search_for_mysql(
/* PHASE 0: Release a possible s-latch we are holding on the
adaptive hash index latch if there is someone waiting behind */
if (UNIV_UNLIKELY(btr_search_latch.writer != RW_LOCK_NOT_LOCKED)
if (UNIV_UNLIKELY(rw_lock_get_writer(&btr_search_latch) != RW_LOCK_NOT_LOCKED)
&& trx->has_search_latch) {
/* There is an x-latch request on the adaptive hash index:
......
......@@ -1834,6 +1834,16 @@ srv_export_innodb_status(void)
export_vars.innodb_buffer_pool_pages_misc = buf_pool->max_size
- UT_LIST_GET_LEN(buf_pool->LRU)
- UT_LIST_GET_LEN(buf_pool->free);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
export_vars.innodb_have_atomic_builtins = 1;
#else
export_vars.innodb_have_atomic_builtins = 0;
#endif
#ifdef UNIV_DISABLE_MEM_POOL
export_vars.innodb_heap_enabled = 0;
#else
export_vars.innodb_heap_enabled = 1;
#endif
export_vars.innodb_page_size = UNIV_PAGE_SIZE;
export_vars.innodb_log_waits = srv_log_waits;
export_vars.innodb_os_log_written = srv_os_log_written;
......
......@@ -1062,6 +1062,16 @@ innobase_start_or_create_for_mysql(void)
return(DB_ERROR);
}
#ifdef UNIV_DISABLE_MEM_POOL
fprintf(stderr,
"InnoDB: The InnoDB memory heap has been disabled.\n");
#endif
#ifdef HAVE_GCC_ATOMIC_BUILTINS
fprintf(stderr,
"InnoDB: Mutex and rw_lock use GCC atomic builtins.\n");
#endif
/* Since InnoDB does not currently clean up all its internal data
structures in MySQL Embedded Server Library server_end(), we
print an error message if someone tries to start up InnoDB a
......
......@@ -295,28 +295,25 @@ sync_array_validate(
}
/***********************************************************************
Puts the cell event in reset state. */
Returns the event that the thread owning the cell waits for. */
static
ib_longlong
sync_cell_event_reset(
/*==================*/
/* out: value of signal_count
at the time of reset. */
ulint type, /* in: lock type mutex/rw_lock */
void* object) /* in: the rw_lock/mutex object */
os_event_t
sync_cell_get_event(
/*================*/
sync_cell_t* cell) /* in: non-empty sync array cell */
{
ulint type = cell->request_type;
if (type == SYNC_MUTEX) {
return(os_event_reset(((mutex_t *) object)->event));
#ifdef __WIN__
return(((mutex_t *) cell->wait_object)->event);
} else if (type == RW_LOCK_WAIT_EX) {
return(os_event_reset(
((rw_lock_t *) object)->wait_ex_event));
#endif
} else {
return(os_event_reset(((rw_lock_t *) object)->event));
return(((rw_lock_t *) cell->wait_object)->wait_ex_event);
} else { /* RW_LOCK_SHARED and RW_LOCK_EX wait on the same event */
return(((rw_lock_t *) cell->wait_object)->event);
}
}
/**********************************************************************
Reserves a wait array cell for waiting for an object.
The event of the cell is reset to nonsignalled state. */
......@@ -332,6 +329,7 @@ sync_array_reserve_cell(
ulint* index) /* out: index of the reserved cell */
{
sync_cell_t* cell;
os_event_t event;
ulint i;
ut_a(object);
......@@ -370,8 +368,8 @@ sync_array_reserve_cell(
/* Make sure the event is reset and also store
the value of signal_count at which the event
was reset. */
cell->signal_count = sync_cell_event_reset(type,
object);
event = sync_cell_get_event(cell);
cell->signal_count = os_event_reset(event);
cell->reservation_time = time(NULL);
......@@ -411,19 +409,7 @@ sync_array_wait_event(
ut_a(!cell->waiting);
ut_ad(os_thread_get_curr_id() == cell->thread);
if (cell->request_type == SYNC_MUTEX) {
event = ((mutex_t*) cell->wait_object)->event;
#ifdef __WIN__
/* On windows if the thread about to wait is the one which
has set the state of the rw_lock to RW_LOCK_WAIT_EX, then
it waits on a special event i.e.: wait_ex_event. */
} else if (cell->request_type == RW_LOCK_WAIT_EX) {
event = ((rw_lock_t*) cell->wait_object)->wait_ex_event;
#endif
} else {
event = ((rw_lock_t*) cell->wait_object)->event;
}
event = sync_cell_get_event(cell);
cell->waiting = TRUE;
#ifdef UNIV_SYNC_DEBUG
......@@ -462,6 +448,7 @@ sync_array_cell_print(
mutex_t* mutex;
rw_lock_t* rwlock;
ulint type;
ulint writer;
type = cell->request_type;
......@@ -491,9 +478,7 @@ sync_array_cell_print(
(ulong) mutex->waiters);
} else if (type == RW_LOCK_EX
#ifdef __WIN__
|| type == RW_LOCK_WAIT_EX
#endif
|| type == RW_LOCK_SHARED) {
fputs(type == RW_LOCK_EX ? "X-lock on" : "S-lock on", file);
......@@ -504,22 +489,25 @@ sync_array_cell_print(
" RW-latch at %p created in file %s line %lu\n",
(void*) rwlock, rwlock->cfile_name,
(ulong) rwlock->cline);
if (rwlock->writer != RW_LOCK_NOT_LOCKED) {
writer = rw_lock_get_writer(rwlock);
if (writer != RW_LOCK_NOT_LOCKED) {
fprintf(file,
"a writer (thread id %lu) has"
" reserved it in mode %s",
(ulong) os_thread_pf(rwlock->writer_thread),
rwlock->writer == RW_LOCK_EX
writer == RW_LOCK_EX
? " exclusive\n"
: " wait exclusive\n");
}
fprintf(file,
"number of readers %lu, waiters flag %lu\n"
"number of readers %lu, waiters flag %lu, "
"lock_word: %ld\n"
"Last time read locked in file %s line %lu\n"
"Last time write locked in file %s line %lu\n",
(ulong) rwlock->reader_count,
(ulong) rw_lock_get_reader_count(rwlock),
(ulong) rwlock->waiters,
rwlock->lock_word,
rwlock->last_s_file_name,
(ulong) rwlock->last_s_line,
rwlock->last_x_file_name,
......@@ -778,28 +766,30 @@ sync_arr_cell_can_wake_up(
return(TRUE);
}
} else if (cell->request_type == RW_LOCK_EX
|| cell->request_type == RW_LOCK_WAIT_EX) {
} else if (cell->request_type == RW_LOCK_EX) {
lock = cell->wait_object;
if (rw_lock_get_reader_count(lock) == 0
&& rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
/* X_LOCK_DECR is the unlocked state */
if (lock->lock_word == X_LOCK_DECR) {
return(TRUE);
}
if (rw_lock_get_reader_count(lock) == 0
&& rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX
&& os_thread_eq(lock->writer_thread, cell->thread)) {
} else if (cell->request_type == RW_LOCK_WAIT_EX) {
lock = cell->wait_object;
/* lock_word == 0 means all readers have left */
if (lock->lock_word == 0) {
return(TRUE);
}
} else if (cell->request_type == RW_LOCK_SHARED) {
lock = cell->wait_object;
if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
/* lock_word > 0 means no writer or reserved writer */
if (lock->lock_word > 0) {
return(TRUE);
}
......@@ -844,11 +834,15 @@ sync_array_object_signalled(
/*========================*/
sync_array_t* arr) /* in: wait array */
{
#ifdef HAVE_GCC_ATOMIC_BUILTINS
__sync_fetch_and_add(&(arr->sg_count),1);
#else
sync_array_enter(arr);
arr->sg_count++;
sync_array_exit(arr);
#endif
}
/**************************************************************************
......@@ -868,6 +862,7 @@ sync_arr_wake_threads_if_sema_free(void)
sync_cell_t* cell;
ulint count;
ulint i;
os_event_t event;
sync_array_enter(arr);
......@@ -877,36 +872,20 @@ sync_arr_wake_threads_if_sema_free(void)
while (count < arr->n_reserved) {
cell = sync_array_get_nth_cell(arr, i);
i++;
if (cell->wait_object != NULL) {
if (cell->wait_object == NULL) {
continue;
}
count++;
if (sync_arr_cell_can_wake_up(cell)) {
if (cell->request_type == SYNC_MUTEX) {
mutex_t* mutex;
event = sync_cell_get_event(cell);
mutex = cell->wait_object;
os_event_set(mutex->event);
#ifdef __WIN__
} else if (cell->request_type
== RW_LOCK_WAIT_EX) {
rw_lock_t* lock;
lock = cell->wait_object;
os_event_set(lock->wait_ex_event);
#endif
} else {
rw_lock_t* lock;
lock = cell->wait_object;
os_event_set(lock->event);
}
}
os_event_set(event);
}
i++;
}
sync_array_exit(arr);
......@@ -1026,4 +1005,3 @@ sync_array_print_info(
sync_array_exit(arr);
}
......@@ -15,35 +15,111 @@ Created 9/11/1995 Heikki Tuuri
#include "mem0mem.h"
#include "srv0srv.h"
/* number of system calls made during shared latching */
ulint rw_s_system_call_count = 0;
/*
IMPLEMENTATION OF THE RW_LOCK
=============================
The status of a rw_lock is held in lock_word. The initial value of lock_word is
X_LOCK_DECR. lock_word is decremented by 1 for each s-lock and by X_LOCK_DECR
for each x-lock. This describes the lock state for each value of lock_word:
lock_word == X_LOCK_DECR: Unlocked.
0 < lock_word < X_LOCK_DECR: Read locked, no waiting writers.
(X_LOCK_DECR - lock_word) is the
number of readers that hold the lock.
lock_word == 0: Write locked
-X_LOCK_DECR < lock_word < 0: Read locked, with a waiting writer.
(-lock_word) is the number of readers
that hold the lock.
lock_word <= -X_LOCK_DECR: Recursively write locked. lock_word has been
decremented by X_LOCK_DECR once for each lock,
so the number of locks is:
((-lock_word) / X_LOCK_DECR) + 1
When lock_word <= -X_LOCK_DECR, we also know that lock_word % X_LOCK_DECR == 0:
other values of lock_word are invalid.
The lock_word is always read and updated atomically and consistently, so that
it always represents the state of the lock, and the state of the lock changes
with a single atomic operation. This lock_word holds all of the information
that a thread needs in order to determine if it is eligible to gain the lock
or if it must spin or sleep. The one exception to this is that writer_thread
must be verified before recursive write locks: to solve this scenario, we make
writer_thread readable by all threads, but only writeable by the x-lock holder.
The other members of the lock obey the following rules to remain consistent:
pass: This is only set to 1 to prevent recursive x-locks. It must
be set as specified by x_lock caller after the lock_word
indicates that the thread holds the lock, but before that
thread resumes execution. It must be reset to 0 during the
final x_unlock, but before the lock_word status is updated.
When an x_lock or move_ownership call wishes to change
pass, it must first update the writer_thread appropriately.
writer_thread: Must be set to the writers thread_id after the lock_word
indicates that the thread holds the lock, but before that
thread resumes execution. It must be reset to -1 during the
final x_unlock, but before the lock_word status is updated.
This ensures that when the lock_word indicates that an x_lock
is held, the only legitimate values for writer_thread are -1
(x_lock function hasn't completed) or the writer's thread_id.
waiters: May be set to 1 anytime, but to avoid unnecessary wake-up
signals, it should only be set to 1 when there are threads
waiting on event. Must be 1 when a writer starts waiting to
ensure the current x-locking thread sends a wake-up signal
during unlock. May only be reset to 0 immediately before a
a wake-up signal is sent to event.
event: Threads wait on event for read or writer lock when another
thread has an x-lock or an x-lock reservation (wait_ex). A
thread may only wait on event after performing the following
actions in order:
(1) Record the counter value of event (with os_event_reset).
(2) Set waiters to 1.
(3) Verify lock_word <= 0.
(1) must come before (2) to ensure signal is not missed.
(2) must come before (3) to ensure a signal is sent.
These restrictions force the above ordering.
Immediately before sending the wake-up signal, we should:
(1) Verify lock_word == X_LOCK_DECR (unlocked)
(2) Reset waiters to 0.
wait_ex_event: A thread may only wait on the wait_ex_event after it has
performed the following actions in order:
(1) Decrement lock_word by X_LOCK_DECR.
(2) Record counter value of wait_ex_event (os_event_reset,
called from sync_array_reserve_cell).
(3) Verify that lock_word < 0.
(1) must come first to ensures no other threads become reader
or next writer, and notifies unlocker that signal must be sent.
(2) must come before (3) to ensure the signal is not missed.
These restrictions force the above ordering.
Immediately before sending the wake-up signal, we should:
Verify lock_word == 0 (waiting thread holds x_lock)
*/
/* number of spin waits on rw-latches,
resulted during shared (read) locks */
ulint rw_s_spin_wait_count = 0;
ib_longlong rw_s_spin_wait_count = 0;
ib_longlong rw_s_spin_round_count = 0;
/* number of OS waits on rw-latches,
resulted during shared (read) locks */
ulint rw_s_os_wait_count = 0;
ib_longlong rw_s_os_wait_count = 0;
/* number of unlocks (that unlock shared locks),
set only when UNIV_SYNC_PERF_STAT is defined */
ulint rw_s_exit_count = 0;
/* number of system calls made during exclusive latching */
ulint rw_x_system_call_count = 0;
ib_longlong rw_s_exit_count = 0;
/* number of spin waits on rw-latches,
resulted during exclusive (write) locks */
ulint rw_x_spin_wait_count = 0;
ib_longlong rw_x_spin_wait_count = 0;
ib_longlong rw_x_spin_round_count = 0;
/* number of OS waits on rw-latches,
resulted during exclusive (write) locks */
ulint rw_x_os_wait_count = 0;
ib_longlong rw_x_os_wait_count = 0;
/* number of unlocks (that unlock exclusive locks),
set only when UNIV_SYNC_PERF_STAT is defined */
ulint rw_x_exit_count = 0;
ib_longlong rw_x_exit_count = 0;
/* The global list of rw-locks */
rw_lock_list_t rw_lock_list;
......@@ -119,6 +195,7 @@ rw_lock_create_func(
/* If this is the very first time a synchronization object is
created, then the following call initializes the sync system. */
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_create(rw_lock_get_mutex(lock), SYNC_NO_ORDER_CHECK);
lock->mutex.cfile_name = cfile_name;
......@@ -129,12 +206,12 @@ rw_lock_create_func(
lock->mutex.mutex_type = 1;
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
rw_lock_set_waiters(lock, 0);
rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
lock->writer_count = 0;
rw_lock_set_reader_count(lock, 0);
#endif /* HAVE_GCC_ATOMIC_BUILTINS */
lock->writer_is_wait_ex = FALSE;
lock->lock_word = X_LOCK_DECR;
rw_lock_set_waiters(lock, 0);
lock->writer_thread = -1;
lock->pass = 0;
#ifdef UNIV_SYNC_DEBUG
UT_LIST_INIT(lock->debug_list);
......@@ -147,15 +224,13 @@ rw_lock_create_func(
lock->cfile_name = cfile_name;
lock->cline = (unsigned int) cline;
lock->count_os_wait = 0;
lock->last_s_file_name = "not yet reserved";
lock->last_x_file_name = "not yet reserved";
lock->last_s_line = 0;
lock->last_x_line = 0;
lock->event = os_event_create(NULL);
#ifdef __WIN__
lock->wait_ex_event = os_event_create(NULL);
#endif
mutex_enter(&rw_lock_list_mutex);
......@@ -180,20 +255,19 @@ rw_lock_free(
rw_lock_t* lock) /* in: rw-lock */
{
ut_ad(rw_lock_validate(lock));
ut_a(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED);
ut_a(lock->lock_word == X_LOCK_DECR);
ut_a(rw_lock_get_waiters(lock) == 0);
ut_a(rw_lock_get_reader_count(lock) == 0);
lock->magic_n = 0;
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_free(rw_lock_get_mutex(lock));
#endif /* HAVE_GCC_ATOMIC_BUILTINS */
mutex_enter(&rw_lock_list_mutex);
os_event_free(lock->event);
#ifdef __WIN__
os_event_free(lock->wait_ex_event);
#endif
if (UT_LIST_GET_PREV(list, lock)) {
ut_a(UT_LIST_GET_PREV(list, lock)->magic_n == RW_LOCK_MAGIC_N);
......@@ -219,19 +293,12 @@ rw_lock_validate(
{
ut_a(lock);
mutex_enter(rw_lock_get_mutex(lock));
ulint waiters = rw_lock_get_waiters(lock);
lint lock_word = lock->lock_word;
ut_a(lock->magic_n == RW_LOCK_MAGIC_N);
ut_a((rw_lock_get_reader_count(lock) == 0)
|| (rw_lock_get_writer(lock) != RW_LOCK_EX));
ut_a((rw_lock_get_writer(lock) == RW_LOCK_EX)
|| (rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX)
|| (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED));
ut_a((rw_lock_get_waiters(lock) == 0)
|| (rw_lock_get_waiters(lock) == 1));
ut_a((lock->writer != RW_LOCK_EX) || (lock->writer_count > 0));
mutex_exit(rw_lock_get_mutex(lock));
ut_a(waiters == 0 || waiters == 1);
ut_a(lock_word > -X_LOCK_DECR ||(-lock_word) % X_LOCK_DECR == 0);
return(TRUE);
}
......@@ -253,18 +320,15 @@ rw_lock_s_lock_spin(
ulint line) /* in: line where requested */
{
ulint index; /* index of the reserved wait cell */
ulint i; /* spin round count */
ulint i = 0; /* spin round count */
ut_ad(rw_lock_validate(lock));
rw_s_spin_wait_count++; /* Count calls to this function */
lock_loop:
rw_s_spin_wait_count++;
/* Spin waiting for the writer field to become free */
i = 0;
while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED
&& i < SYNC_SPIN_ROUNDS) {
while (i < SYNC_SPIN_ROUNDS && lock->lock_word <= 0) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
}
......@@ -285,28 +349,32 @@ lock_loop:
lock->cfile_name, (ulong) lock->cline, (ulong) i);
}
mutex_enter(rw_lock_get_mutex(lock));
/* We try once again to obtain the lock */
if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
mutex_exit(rw_lock_get_mutex(lock));
rw_s_spin_round_count += i;
return; /* Success */
} else {
/* If we get here, locking did not succeed, we may
suspend the thread to wait in the wait array */
rw_s_system_call_count++;
if (i < SYNC_SPIN_ROUNDS) {
goto lock_loop;
}
rw_s_spin_round_count += i;
sync_array_reserve_cell(sync_primary_wait_array,
lock, RW_LOCK_SHARED,
file_name, line,
&index);
/* Set waiters before checking lock_word to ensure wake-up
signal is sent. This may lead to some unnecessary signals. */
rw_lock_set_waiters(lock, 1);
mutex_exit(rw_lock_get_mutex(lock));
if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
sync_array_free_cell(sync_primary_wait_array, index);
return; /* Success */
}
if (srv_print_latch_waits) {
fprintf(stderr,
......@@ -317,11 +385,13 @@ lock_loop:
(ulong) lock->cline);
}
rw_s_system_call_count++;
/* these stats may not be accurate */
lock->count_os_wait++;
rw_s_os_wait_count++;
sync_array_wait_event(sync_primary_wait_array, index);
i = 0;
goto lock_loop;
}
}
......@@ -343,113 +413,149 @@ rw_lock_x_lock_move_ownership(
{
ut_ad(rw_lock_is_locked(lock, RW_LOCK_EX));
#ifdef HAVE_GCC_ATOMIC_BUILTINS
os_thread_id_t local_writer_thread = lock->writer_thread;
os_thread_id_t new_writer_thread = os_thread_get_curr_id();
while (TRUE) {
if (local_writer_thread != -1) {
if(os_compare_and_swap(
&(lock->writer_thread),
local_writer_thread,
new_writer_thread)) {
break;
}
}
local_writer_thread = lock->writer_thread;
}
lock->pass = 0;
#else /* HAVE_GCC_ATOMIC_BUILTINS */
mutex_enter(&(lock->mutex));
lock->writer_thread = os_thread_get_curr_id();
lock->pass = 0;
mutex_exit(&(lock->mutex));
#endif /* HAVE_GCC_ATOMIC_BUILTINS */
}
/**********************************************************************
Low-level function for acquiring an exclusive lock. */
Function for the next writer to call. Waits for readers to exit.
The caller must have already decremented lock_word by X_LOCK_DECR.*/
UNIV_INLINE
ulint
rw_lock_x_lock_low(
/*===============*/
/* out: RW_LOCK_NOT_LOCKED if did
not succeed, RW_LOCK_EX if success,
RW_LOCK_WAIT_EX, if got wait reservation */
void
rw_lock_x_lock_wait(
/*================*/
rw_lock_t* lock, /* in: pointer to rw-lock */
#ifdef UNIV_SYNC_DEBUG
ulint pass, /* in: pass value; != 0, if the lock will
be passed to another thread to unlock */
#endif
const char* file_name,/* in: file name where lock requested */
ulint line) /* in: line where requested */
{
ut_ad(mutex_own(rw_lock_get_mutex(lock)));
if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
ulint index;
ulint i = 0;
if (rw_lock_get_reader_count(lock) == 0) {
ut_ad(lock->lock_word <= 0);
rw_lock_set_writer(lock, RW_LOCK_EX);
lock->writer_thread = os_thread_get_curr_id();
lock->writer_count++;
lock->pass = pass;
while (lock->lock_word < 0) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
}
if(i < SYNC_SPIN_ROUNDS) {
i++;
continue;
}
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
file_name, line);
#endif
lock->last_x_file_name = file_name;
lock->last_x_line = (unsigned int) line;
/* If there is still a reader, then go to sleep.*/
rw_x_spin_round_count += i;
i = 0;
sync_array_reserve_cell(sync_primary_wait_array,
lock,
RW_LOCK_WAIT_EX,
file_name, line,
&index);
/* Check lock_word to ensure wake-up isn't missed.*/
if(lock->lock_word < 0) {
/* Locking succeeded, we may return */
return(RW_LOCK_EX);
} else {
/* There are readers, we have to wait */
rw_lock_set_writer(lock, RW_LOCK_WAIT_EX);
lock->writer_thread = os_thread_get_curr_id();
lock->pass = pass;
lock->writer_is_wait_ex = TRUE;
/* these stats may not be accurate */
lock->count_os_wait++;
rw_x_os_wait_count++;
/* Add debug info as it is needed to detect possible
deadlock. We must add info for WAIT_EX thread for
deadlock detection to work properly. */
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_WAIT_EX,
file_name, line);
#endif
return(RW_LOCK_WAIT_EX);
}
} else if ((rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX)
&& os_thread_eq(lock->writer_thread,
os_thread_get_curr_id())) {
if (rw_lock_get_reader_count(lock) == 0) {
rw_lock_set_writer(lock, RW_LOCK_EX);
lock->writer_count++;
lock->pass = pass;
lock->writer_is_wait_ex = FALSE;
sync_array_wait_event(sync_primary_wait_array,
index);
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, pass, RW_LOCK_WAIT_EX);
rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
file_name, line);
rw_lock_remove_debug_info(lock, pass,
RW_LOCK_WAIT_EX);
#endif
lock->last_x_file_name = file_name;
lock->last_x_line = (unsigned int) line;
/* Locking succeeded, we may return */
return(RW_LOCK_EX);
/* It is possible to wake when lock_word < 0.
We must pass the while-loop check to proceed.*/
} else {
sync_array_free_cell(sync_primary_wait_array,
index);
}
}
rw_x_spin_round_count += i;
}
return(RW_LOCK_WAIT_EX);
} else if ((rw_lock_get_writer(lock) == RW_LOCK_EX)
&& os_thread_eq(lock->writer_thread,
os_thread_get_curr_id())
&& (lock->pass == 0)
&& (pass == 0)) {
/**********************************************************************
Low-level function for acquiring an exclusive lock. */
UNIV_INLINE
ibool
rw_lock_x_lock_low(
/*===============*/
/* out: RW_LOCK_NOT_LOCKED if did
not succeed, RW_LOCK_EX if success. */
rw_lock_t* lock, /* in: pointer to rw-lock */
ulint pass, /* in: pass value; != 0, if the lock will
be passed to another thread to unlock */
const char* file_name,/* in: file name where lock requested */
ulint line) /* in: line where requested */
{
os_thread_id_t curr_thread = os_thread_get_curr_id();
ut_ad(curr_thread != -1); /* We use -1 as the unlocked value. */
lock->writer_count++;
if(rw_lock_lock_word_decr(lock, X_LOCK_DECR)) {
ut_ad(lock->writer_thread == -1);
/* Decrement occurred: we are writer or next-writer. */
lock->writer_thread = curr_thread;
lock->pass = pass;
rw_lock_x_lock_wait(lock,
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_EX, file_name,
line);
pass,
#endif
file_name, line);
lock->last_x_file_name = file_name;
lock->last_x_line = (unsigned int) line;
/* Locking succeeded, we may return */
return(RW_LOCK_EX);
} else {
/* Decrement failed: relock or failed lock */
/* Must verify pass first: otherwise another thread can
call move_ownership suddenly allowing recursive locks.
and after we have verified our thread_id matches
(though move_ownership has since changed it).*/
if(!pass && !(lock->pass) &&
os_thread_eq(lock->writer_thread, curr_thread)) {
/* Relock */
lock->lock_word -= X_LOCK_DECR;
} else {
/* Another thread locked before us */
return(FALSE);
}
}
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
file_name, line);
#endif
lock->last_x_file_name = file_name;
lock->last_x_line = (unsigned int) line;
/* Locking did not succeed */
return(RW_LOCK_NOT_LOCKED);
return(TRUE);
}
/**********************************************************************
......@@ -472,47 +578,30 @@ rw_lock_x_lock_func(
ulint line) /* in: line where requested */
{
ulint index; /* index of the reserved wait cell */
ulint state; /* lock state acquired */
ulint i; /* spin round count */
ibool spinning = FALSE;
ut_ad(rw_lock_validate(lock));
lock_loop:
/* Acquire the mutex protecting the rw-lock fields */
mutex_enter_fast(&(lock->mutex));
state = rw_lock_x_lock_low(lock, pass, file_name, line);
i = 0;
mutex_exit(&(lock->mutex));
lock_loop:
if (state == RW_LOCK_EX) {
if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
rw_x_spin_round_count += i;
return; /* Locking succeeded */
} else if (state == RW_LOCK_NOT_LOCKED) {
/* Spin waiting for the writer field to become free */
i = 0;
while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED
&& i < SYNC_SPIN_ROUNDS) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0,
srv_spin_wait_delay));
}
} else {
i++;
}
if (i == SYNC_SPIN_ROUNDS) {
os_thread_yield();
if (!spinning) {
spinning = TRUE;
rw_x_spin_wait_count++;
}
} else if (state == RW_LOCK_WAIT_EX) {
/* Spin waiting for the reader count field to become zero */
i = 0;
while (rw_lock_get_reader_count(lock) != 0
&& i < SYNC_SPIN_ROUNDS) {
/* Spin waiting for the lock_word to become free */
while (i < SYNC_SPIN_ROUNDS
&& lock->lock_word <= 0) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0,
srv_spin_wait_delay));
......@@ -522,12 +611,13 @@ lock_loop:
}
if (i == SYNC_SPIN_ROUNDS) {
os_thread_yield();
} else {
goto lock_loop;
}
} else {
i = 0; /* Eliminate a compiler warning */
ut_error;
}
rw_x_spin_round_count += i;
if (srv_print_latch_waits) {
fprintf(stderr,
"Thread %lu spin wait rw-x-lock at %p"
......@@ -536,39 +626,20 @@ lock_loop:
lock->cfile_name, (ulong) lock->cline, (ulong) i);
}
rw_x_spin_wait_count++;
/* We try once again to obtain the lock. Acquire the mutex protecting
the rw-lock fields */
mutex_enter(rw_lock_get_mutex(lock));
state = rw_lock_x_lock_low(lock, pass, file_name, line);
if (state == RW_LOCK_EX) {
mutex_exit(rw_lock_get_mutex(lock));
return; /* Locking succeeded */
}
rw_x_system_call_count++;
sync_array_reserve_cell(sync_primary_wait_array,
lock,
#ifdef __WIN__
/* On windows RW_LOCK_WAIT_EX signifies
that this thread should wait on the
special wait_ex_event. */
(state == RW_LOCK_WAIT_EX)
? RW_LOCK_WAIT_EX :
#endif
RW_LOCK_EX,
file_name, line,
&index);
/* Waiters must be set before checking lock_word, to ensure signal
is sent. This could lead to a few unnecessary wake-up signals. */
rw_lock_set_waiters(lock, 1);
mutex_exit(rw_lock_get_mutex(lock));
if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
sync_array_free_cell(sync_primary_wait_array, index);
return; /* Locking succeeded */
}
if (srv_print_latch_waits) {
fprintf(stderr,
......@@ -578,11 +649,13 @@ lock_loop:
lock->cfile_name, (ulong) lock->cline);
}
rw_x_system_call_count++;
/* these stats may not be accurate */
lock->count_os_wait++;
rw_x_os_wait_count++;
sync_array_wait_event(sync_primary_wait_array, index);
i = 0;
goto lock_loop;
}
......@@ -730,7 +803,7 @@ rw_lock_own(
ut_ad(lock);
ut_ad(rw_lock_validate(lock));
mutex_enter(&(lock->mutex));
rw_lock_debug_mutex_enter();
info = UT_LIST_GET_FIRST(lock->debug_list);
......@@ -740,7 +813,7 @@ rw_lock_own(
&& (info->pass == 0)
&& (info->lock_type == lock_type)) {
mutex_exit(&(lock->mutex));
rw_lock_debug_mutex_exit();
/* Found! */
return(TRUE);
......@@ -748,7 +821,7 @@ rw_lock_own(
info = UT_LIST_GET_NEXT(list, info);
}
mutex_exit(&(lock->mutex));
rw_lock_debug_mutex_exit();
return(FALSE);
}
......@@ -770,22 +843,18 @@ rw_lock_is_locked(
ut_ad(lock);
ut_ad(rw_lock_validate(lock));
mutex_enter(&(lock->mutex));
if (lock_type == RW_LOCK_SHARED) {
if (lock->reader_count > 0) {
if (rw_lock_get_reader_count(lock) > 0) {
ret = TRUE;
}
} else if (lock_type == RW_LOCK_EX) {
if (lock->writer == RW_LOCK_EX) {
if (rw_lock_get_writer(lock) == RW_LOCK_EX) {
ret = TRUE;
}
} else {
ut_error;
}
mutex_exit(&(lock->mutex));
return(ret);
}
......@@ -814,11 +883,10 @@ rw_lock_list_print_info(
count++;
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(&(lock->mutex));
if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
|| (rw_lock_get_reader_count(lock) != 0)
|| (rw_lock_get_waiters(lock) != 0)) {
#endif
if (lock->lock_word != X_LOCK_DECR) {
fprintf(file, "RW-LOCK: %p ", (void*) lock);
......@@ -834,8 +902,10 @@ rw_lock_list_print_info(
info = UT_LIST_GET_NEXT(list, info);
}
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
#endif
lock = UT_LIST_GET_NEXT(list, lock);
}
......@@ -858,9 +928,10 @@ rw_lock_print(
"RW-LATCH INFO\n"
"RW-LATCH: %p ", (void*) lock);
if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
|| (rw_lock_get_reader_count(lock) != 0)
|| (rw_lock_get_waiters(lock) != 0)) {
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(&(lock->mutex));
#endif
if (lock->lock_word != X_LOCK_DECR) {
if (rw_lock_get_waiters(lock)) {
fputs(" Waiters for the lock exist\n", stderr);
......@@ -874,6 +945,9 @@ rw_lock_print(
info = UT_LIST_GET_NEXT(list, info);
}
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
#endif
}
/*************************************************************************
......@@ -922,14 +996,11 @@ rw_lock_n_locked(void)
lock = UT_LIST_GET_FIRST(rw_lock_list);
while (lock != NULL) {
mutex_enter(rw_lock_get_mutex(lock));
if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
|| (rw_lock_get_reader_count(lock) != 0)) {
if (lock->lock_word != X_LOCK_DECR) {
count++;
}
mutex_exit(rw_lock_get_mutex(lock));
lock = UT_LIST_GET_NEXT(list, lock);
}
......
......@@ -138,18 +138,13 @@ Therefore, this thread is guaranteed to catch the os_set_event()
signalled unconditionally at the release of the lock.
Q.E.D. */
/* The number of system calls made in this module. Intended for performance
monitoring. */
ulint mutex_system_call_count = 0;
/* Number of spin waits on mutexes: for performance monitoring */
/* round=one iteration of a spin loop */
ulint mutex_spin_round_count = 0;
ulint mutex_spin_wait_count = 0;
ulint mutex_os_wait_count = 0;
ulint mutex_exit_count = 0;
ib_longlong mutex_spin_round_count = 0;
ib_longlong mutex_spin_wait_count = 0;
ib_longlong mutex_os_wait_count = 0;
ib_longlong mutex_exit_count = 0;
/* The global array of wait cells for implementation of the database's own
mutexes and read-write locks */
......@@ -243,6 +238,8 @@ mutex_create_func(
{
#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
mutex_reset_lock_word(mutex);
#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
mutex_reset_lock_word(mutex);
#else
os_fast_mutex_init(&(mutex->os_fast_mutex));
mutex->lock_word = 0;
......@@ -333,7 +330,9 @@ mutex_free(
os_event_free(mutex->event);
#if !defined(_WIN32) || !defined(UNIV_CAN_USE_X86_ASSEMBLER)
#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
#else
os_fast_mutex_free(&(mutex->os_fast_mutex));
#endif
/* If we free the mutex protecting the mutex list (freeing is
......@@ -450,6 +449,12 @@ mutex_spin_wait(
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
ut_ad(mutex);
/* This update is not thread safe, but we don't mind if the count
isn't exact. Moved out of ifdef that follows because we are willing
to sacrifice the cost of counting this as the data is valuable.
Count the number of calls to mutex_spin_wait. */
mutex_spin_wait_count++;
mutex_loop:
i = 0;
......@@ -462,7 +467,6 @@ mutex_loop:
spin_loop:
#if defined UNIV_DEBUG && !defined UNIV_HOTBACKUP
mutex_spin_wait_count++;
mutex->count_spin_loop++;
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
......@@ -527,8 +531,6 @@ spin_loop:
sync_array_reserve_cell(sync_primary_wait_array, mutex,
SYNC_MUTEX, file_name, line, &index);
mutex_system_call_count++;
/* The memory order of the array reservation and the change in the
waiters field is important: when we suspend a thread, we first
reserve the cell and then set waiters field to 1. When threads are
......@@ -575,7 +577,6 @@ spin_loop:
mutex->cfile_name, (ulong) mutex->cline, (ulong) i);
#endif
mutex_system_call_count++;
mutex_os_wait_count++;
#ifndef UNIV_HOTBACKUP
......@@ -1377,21 +1378,31 @@ sync_print_wait_info(
FILE* file) /* in: file where to print */
{
#ifdef UNIV_SYNC_DEBUG
fprintf(file, "Mutex exits %lu, rws exits %lu, rwx exits %lu\n",
fprintf(file, "Mutex exits %llu, rws exits %llu, rwx exits %llu\n",
mutex_exit_count, rw_s_exit_count, rw_x_exit_count);
#endif
fprintf(file,
"Mutex spin waits %lu, rounds %lu, OS waits %lu\n"
"RW-shared spins %lu, OS waits %lu;"
" RW-excl spins %lu, OS waits %lu\n",
(ulong) mutex_spin_wait_count,
(ulong) mutex_spin_round_count,
(ulong) mutex_os_wait_count,
(ulong) rw_s_spin_wait_count,
(ulong) rw_s_os_wait_count,
(ulong) rw_x_spin_wait_count,
(ulong) rw_x_os_wait_count);
"Mutex spin waits %llu, rounds %llu, OS waits %llu\n"
"RW-shared spins %llu, OS waits %llu;"
" RW-excl spins %llu, OS waits %llu\n",
mutex_spin_wait_count,
mutex_spin_round_count,
mutex_os_wait_count,
rw_s_spin_wait_count,
rw_s_os_wait_count,
rw_x_spin_wait_count,
rw_x_os_wait_count);
fprintf(file,
"Spin rounds per wait: %.2f mutex, %.2f RW-shared, "
"%.2f RW-excl\n",
(double) mutex_spin_round_count /
(mutex_spin_wait_count ? mutex_spin_wait_count : 1),
(double) rw_s_spin_round_count /
(rw_s_spin_wait_count ? rw_s_spin_wait_count : 1),
(double) rw_x_spin_round_count /
(rw_x_spin_wait_count ? rw_x_spin_wait_count : 1));
}
/***********************************************************************
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment