Commit 84f733af authored by Vadim Tkachenko's avatar Vadim Tkachenko

rw_locks applied

parent d6c7789c
......@@ -328,7 +328,17 @@ rw_lock_get_x_lock_count(
Accessor functions for rw lock. */
UNIV_INLINE
ulint
rw_lock_get_waiters(
rw_lock_get_s_waiters(
/*==================*/
rw_lock_t* lock);
UNIV_INLINE
ulint
rw_lock_get_x_waiters(
/*==================*/
rw_lock_t* lock);
UNIV_INLINE
ulint
rw_lock_get_wx_waiters(
/*================*/
rw_lock_t* lock);
UNIV_INLINE
......@@ -412,6 +422,11 @@ rw_lock_debug_print(
rw_lock_debug_t* info); /* in: debug struct */
#endif /* UNIV_SYNC_DEBUG */
#ifdef HAVE_GCC_ATOMIC_BUILTINS
/* This value means NOT_LOCKED */
#define RW_LOCK_BIAS 0x00100000
#endif
/* NOTE! The structure appears here only for the compiler to know its size.
Do not use its fields directly! The structure used in the spin lock
implementation of a read-write lock. Several threads may have a shared lock
......@@ -421,9 +436,9 @@ blocked by readers, a writer may queue for the lock by setting the writer
field. Then no new readers are allowed in. */
struct rw_lock_struct {
os_event_t event; /* Used by sync0arr.c for thread queueing */
#ifdef __WIN__
/* Used by sync0arr.c for thread queueing */
os_event_t s_event; /* Used for s_lock */
os_event_t x_event; /* Used for x_lock */
os_event_t wait_ex_event; /* This windows specific event is
used by the thread which has set the
lock state to RW_LOCK_WAIT_EX. The
......@@ -431,30 +446,34 @@ struct rw_lock_struct {
thread will be the next one to proceed
once the current the event gets
signalled. See LEMMA 2 in sync0sync.c */
#ifdef HAVE_GCC_ATOMIC_BUILTINS
volatile lint lock_word; /* Used by using atomic builtin */
#endif
ulint reader_count; /* Number of readers who have locked this
volatile ulint reader_count; /* Number of readers who have locked this
lock in the shared mode */
ulint writer; /* This field is set to RW_LOCK_EX if there
volatile ulint writer; /* This field is set to RW_LOCK_EX if there
is a writer owning the lock (in exclusive
mode), RW_LOCK_WAIT_EX if a writer is
queueing for the lock, and
RW_LOCK_NOT_LOCKED, otherwise. */
os_thread_id_t writer_thread;
volatile os_thread_id_t writer_thread;
/* Thread id of a possible writer thread */
ulint writer_count; /* Number of times the same thread has
volatile ulint writer_count; /* Number of times the same thread has
recursively locked the lock in the exclusive
mode */
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_t mutex; /* The mutex protecting rw_lock_struct */
#endif
ulint pass; /* Default value 0. This is set to some
value != 0 given by the caller of an x-lock
operation, if the x-lock is to be passed to
another thread to unlock (which happens in
asynchronous i/o). */
ulint waiters; /* This ulint is set to 1 if there are
waiters (readers or writers) in the global
wait array, waiting for this rw_lock.
Otherwise, == 0. */
volatile ulint s_waiters; /* 1: there are waiters (s_lock) */
volatile ulint x_waiters; /* 1: there are waiters (x_lock) */
volatile ulint wait_ex_waiters; /* 1: there are waiters (wait_ex) */
UT_LIST_NODE_T(rw_lock_t) list;
/* All allocated rw locks are put into a
list */
......@@ -467,7 +486,7 @@ struct rw_lock_struct {
const char* cfile_name;/* File name where lock created */
const char* last_s_file_name;/* File name where last s-locked */
const char* last_x_file_name;/* File name where last x-locked */
ibool writer_is_wait_ex;
volatile ibool writer_is_wait_ex;
/* This is TRUE if the writer field is
RW_LOCK_WAIT_EX; this field is located far
from the memory update hotspot fields which
......
......@@ -47,20 +47,52 @@ rw_lock_remove_debug_info(
Accessor functions for rw lock. */
UNIV_INLINE
ulint
rw_lock_get_waiters(
rw_lock_get_s_waiters(
/*================*/
rw_lock_t* lock)
{
return(lock->waiters);
return(lock->s_waiters);
}
UNIV_INLINE
ulint
rw_lock_get_x_waiters(
/*================*/
rw_lock_t* lock)
{
return(lock->x_waiters);
}
UNIV_INLINE
ulint
rw_lock_get_wx_waiters(
/*================*/
rw_lock_t* lock)
{
return(lock->wait_ex_waiters);
}
UNIV_INLINE
void
rw_lock_set_s_waiters(
rw_lock_t* lock,
ulint flag)
{
lock->s_waiters = flag;
}
UNIV_INLINE
void
rw_lock_set_waiters(
rw_lock_set_x_waiters(
rw_lock_t* lock,
ulint flag)
{
lock->x_waiters = flag;
}
UNIV_INLINE
void
rw_lock_set_wx_waiters(
/*================*/
rw_lock_t* lock,
ulint flag)
{
lock->waiters = flag;
lock->wait_ex_waiters = flag;
}
UNIV_INLINE
ulint
......@@ -68,7 +100,19 @@ rw_lock_get_writer(
/*===============*/
rw_lock_t* lock)
{
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if (lock->writer == RW_LOCK_NOT_LOCKED) {
return(RW_LOCK_NOT_LOCKED);
}
if (lock->writer_is_wait_ex) {
return(RW_LOCK_WAIT_EX);
} else {
return(RW_LOCK_EX);
}
#else
return(lock->writer);
#endif
}
UNIV_INLINE
void
......@@ -96,6 +140,7 @@ rw_lock_set_reader_count(
{
lock->reader_count = count;
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
UNIV_INLINE
mutex_t*
rw_lock_get_mutex(
......@@ -104,6 +149,7 @@ rw_lock_get_mutex(
{
return(&(lock->mutex));
}
#endif
/**********************************************************************
Returns the value of writer_count for the lock. Does not reserve the lock
......@@ -133,13 +179,27 @@ rw_lock_s_lock_low(
const char* file_name, /* in: file name where lock requested */
ulint line) /* in: line where requested */
{
#ifndef HAVE_GCC_ATOMIC_BUILTINS
ut_ad(mutex_own(rw_lock_get_mutex(lock)));
#endif
/* Check if the writer field is free */
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if (UNIV_LIKELY(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED)) {
/* try s-lock */
if(__sync_sub_and_fetch(&(lock->lock_word),1) <= 0) {
/* fail */
__sync_fetch_and_add(&(lock->lock_word),1);
return(FALSE); /* locking did not succeed */
}
/* success */
__sync_fetch_and_add(&(lock->reader_count),1);
#else
if (UNIV_LIKELY(lock->writer == RW_LOCK_NOT_LOCKED)) {
/* Set the shared lock by incrementing the reader count */
lock->reader_count++;
#endif
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_SHARED, file_name,
......@@ -166,11 +226,15 @@ rw_lock_s_lock_direct(
const char* file_name, /* in: file name where requested */
ulint line) /* in: line where lock requested */
{
ut_ad(lock->writer == RW_LOCK_NOT_LOCKED);
ut_ad(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED);
ut_ad(rw_lock_get_reader_count(lock) == 0);
/* Set the shared lock by incrementing the reader count */
#ifdef HAVE_GCC_ATOMIC_BUILTINS
__sync_fetch_and_add(&(lock->reader_count),1);
#else
lock->reader_count++;
#endif
lock->last_s_file_name = file_name;
lock->last_s_line = line;
......@@ -198,7 +262,11 @@ rw_lock_x_lock_direct(
rw_lock_set_writer(lock, RW_LOCK_EX);
lock->writer_thread = os_thread_get_curr_id();
#ifdef HAVE_GCC_ATOMIC_BUILTINS
__sync_fetch_and_add(&(lock->writer_count),1);
#else
lock->writer_count++;
#endif
lock->pass = 0;
lock->last_x_file_name = file_name;
......@@ -240,15 +308,21 @@ rw_lock_s_lock_func(
ut_ad(!rw_lock_own(lock, RW_LOCK_SHARED)); /* see NOTE above */
#endif /* UNIV_SYNC_DEBUG */
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(rw_lock_get_mutex(lock));
#endif
if (UNIV_LIKELY(rw_lock_s_lock_low(lock, pass, file_name, line))) {
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(rw_lock_get_mutex(lock));
#endif
return; /* Success */
} else {
/* Did not succeed, try spin wait */
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(rw_lock_get_mutex(lock));
#endif
rw_lock_s_lock_spin(lock, pass, file_name, line);
......@@ -271,11 +345,23 @@ rw_lock_s_lock_func_nowait(
{
ibool success = FALSE;
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
/* try s-lock */
if(__sync_sub_and_fetch(&(lock->lock_word),1) <= 0) {
/* fail */
__sync_fetch_and_add(&(lock->lock_word),1);
return(FALSE); /* locking did not succeed */
}
/* success */
__sync_fetch_and_add(&(lock->reader_count),1);
#else
mutex_enter(rw_lock_get_mutex(lock));
if (lock->writer == RW_LOCK_NOT_LOCKED) {
/* Set the shared lock by incrementing the reader count */
lock->reader_count++;
#endif
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, 0, RW_LOCK_SHARED, file_name,
......@@ -288,7 +374,9 @@ rw_lock_s_lock_func_nowait(
success = TRUE;
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(rw_lock_get_mutex(lock));
#endif
return(success);
}
......@@ -308,6 +396,55 @@ rw_lock_x_lock_func_nowait(
{
ibool success = FALSE;
os_thread_id_t curr_thread = os_thread_get_curr_id();
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if ((lock->lock_word == RW_LOCK_BIAS)
&& rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
/* try x-lock */
if(__sync_sub_and_fetch(&(lock->lock_word),
RW_LOCK_BIAS) == 0) {
/* success */
/* try to lock writer */
if(__sync_lock_test_and_set(&(lock->writer),RW_LOCK_EX)
== RW_LOCK_NOT_LOCKED) {
/* success */
lock->writer_thread = curr_thread;
lock->pass = 0;
lock->writer_is_wait_ex = FALSE;
/* next function may work as memory barrier */
relock:
__sync_fetch_and_add(&(lock->writer_count),1);
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, 0, RW_LOCK_EX, file_name, line);
#endif
lock->last_x_file_name = file_name;
lock->last_x_line = line;
ut_ad(rw_lock_validate(lock));
return(TRUE);
} else {
/* x-unlock */
__sync_fetch_and_add(&(lock->lock_word),
RW_LOCK_BIAS);
}
} else {
/* fail (x-lock) */
__sync_fetch_and_add(&(lock->lock_word),RW_LOCK_BIAS);
}
}
if (lock->pass == 0
&& os_thread_eq(lock->writer_thread, curr_thread)
&& rw_lock_get_writer(lock) == RW_LOCK_EX) {
goto relock;
}
ut_ad(rw_lock_validate(lock));
return(FALSE);
#else
mutex_enter(rw_lock_get_mutex(lock));
if (UNIV_UNLIKELY(rw_lock_get_reader_count(lock) != 0)) {
......@@ -338,6 +475,7 @@ relock:
ut_ad(rw_lock_validate(lock));
return(success);
#endif
}
/**********************************************************************
......@@ -353,16 +491,33 @@ rw_lock_s_unlock_func(
#endif
)
{
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_t* mutex = &(lock->mutex);
ibool sg = FALSE;
#endif
ibool x_sg = FALSE;
ibool wx_sg = FALSE;
#ifdef HAVE_GCC_ATOMIC_BUILTINS
ibool last = FALSE;
#endif
#ifndef HAVE_GCC_ATOMIC_BUILTINS
/* Acquire the mutex protecting the rw-lock fields */
mutex_enter(mutex);
#endif
/* Reset the shared lock by decrementing the reader count */
ut_a(lock->reader_count > 0);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
/* unlock lock_word */
__sync_fetch_and_add(&(lock->lock_word),1);
if(__sync_sub_and_fetch(&(lock->reader_count),1) == 0) {
last = TRUE;
}
#else
lock->reader_count--;
#endif
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, pass, RW_LOCK_SHARED);
......@@ -371,20 +526,36 @@ rw_lock_s_unlock_func(
/* If there may be waiters and this was the last s-lock,
signal the object */
if (UNIV_UNLIKELY(lock->waiters)
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if (UNIV_UNLIKELY(last && lock->wait_ex_waiters)) {
#else
if (UNIV_UNLIKELY(lock->wait_ex_waiters)
&& lock->reader_count == 0) {
sg = TRUE;
#endif
wx_sg = TRUE;
rw_lock_set_waiters(lock, 0);
rw_lock_set_wx_waiters(lock, 0);
}
#ifdef HAVE_GCC_ATOMIC_BUILTINS
else if (UNIV_UNLIKELY(last && lock->x_waiters)) {
#else
else if (UNIV_UNLIKELY(lock->x_waiters)
&& lock->reader_count == 0) {
#endif
x_sg = TRUE;
rw_lock_set_x_waiters(lock, 0);
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(mutex);
#endif
if (UNIV_UNLIKELY(sg)) {
#ifdef __WIN__
if (UNIV_UNLIKELY(wx_sg)) {
os_event_set(lock->wait_ex_event);
#endif
os_event_set(lock->event);
sync_array_object_signalled(sync_primary_wait_array);
} else if (UNIV_UNLIKELY(x_sg)) {
os_event_set(lock->x_event);
sync_array_object_signalled(sync_primary_wait_array);
}
......@@ -408,13 +579,19 @@ rw_lock_s_unlock_direct(
ut_ad(lock->reader_count > 0);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
__sync_sub_and_fetch(&(lock->reader_count),1);
#else
lock->reader_count--;
#endif
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, 0, RW_LOCK_SHARED);
#endif
ut_ad(!lock->waiters);
ut_ad(!lock->s_waiters);
ut_ad(!lock->x_waiters);
ut_ad(!lock->wait_ex_waiters);
ut_ad(rw_lock_validate(lock));
#ifdef UNIV_SYNC_PERF_STAT
rw_s_exit_count++;
......@@ -434,41 +611,81 @@ rw_lock_x_unlock_func(
#endif
)
{
ibool sg = FALSE;
#ifdef HAVE_GCC_ATOMIC_BUILTINS
ibool last = FALSE;
#endif
ibool s_sg = FALSE;
ibool x_sg = FALSE;
#ifndef HAVE_GCC_ATOMIC_BUILTINS
/* Acquire the mutex protecting the rw-lock fields */
mutex_enter(&(lock->mutex));
#endif
/* Reset the exclusive lock if this thread no longer has an x-mode
lock */
ut_ad(lock->writer_count > 0);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if(__sync_sub_and_fetch(&(lock->writer_count),1) == 0) {
last = TRUE;
}
if (last) {
/* unlock lock_word */
__sync_fetch_and_add(&(lock->lock_word),RW_LOCK_BIAS);
/* FIXME: It is a value of bad manners for pthread.
But we shouldn't keep an ID of not-owner. */
lock->writer_thread = -1;
/* atomic operation may be safer about memory order. */
rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
__sync_synchronize();
}
#else
lock->writer_count--;
if (lock->writer_count == 0) {
rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
}
#endif
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, pass, RW_LOCK_EX);
#endif
/* If there may be waiters, signal the lock */
if (UNIV_UNLIKELY(lock->waiters)
&& lock->writer_count == 0) {
sg = TRUE;
rw_lock_set_waiters(lock, 0);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if (last) {
#else
if (lock->writer_count == 0) {
#endif
if(lock->s_waiters){
s_sg = TRUE;
rw_lock_set_s_waiters(lock, 0);
}
if(lock->x_waiters){
x_sg = TRUE;
rw_lock_set_x_waiters(lock, 0);
}
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
#endif
if (UNIV_UNLIKELY(sg)) {
if (UNIV_UNLIKELY(s_sg)) {
os_event_set(lock->s_event);
sync_array_object_signalled(sync_primary_wait_array);
}
if (UNIV_UNLIKELY(x_sg)) {
#ifdef __WIN__
/* I doubt the necessity of it. */
os_event_set(lock->wait_ex_event);
#endif
os_event_set(lock->event);
os_event_set(lock->x_event);
sync_array_object_signalled(sync_primary_wait_array);
}
......@@ -493,9 +710,13 @@ rw_lock_x_unlock_direct(
ut_ad(lock->writer_count > 0);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if(__sync_sub_and_fetch(&(lock->writer_count),1) == 0) {
#else
lock->writer_count--;
if (lock->writer_count == 0) {
#endif
rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
}
......@@ -503,7 +724,9 @@ rw_lock_x_unlock_direct(
rw_lock_remove_debug_info(lock, 0, RW_LOCK_EX);
#endif
ut_ad(!lock->waiters);
ut_ad(!lock->s_waiters);
ut_ad(!lock->x_waiters);
ut_ad(!lock->wait_ex_waiters);
ut_ad(rw_lock_validate(lock));
#ifdef UNIV_SYNC_PERF_STAT
......
......@@ -307,13 +307,13 @@ sync_cell_event_reset(
{
if (type == SYNC_MUTEX) {
return(os_event_reset(((mutex_t *) object)->event));
#ifdef __WIN__
} else if (type == RW_LOCK_WAIT_EX) {
return(os_event_reset(
((rw_lock_t *) object)->wait_ex_event));
#endif
} else {
return(os_event_reset(((rw_lock_t *) object)->event));
} else if (type == RW_LOCK_SHARED) {
return(os_event_reset(((rw_lock_t *) object)->s_event));
} else { /* RW_LOCK_EX */
return(os_event_reset(((rw_lock_t *) object)->x_event));
}
}
......@@ -413,15 +413,12 @@ sync_array_wait_event(
if (cell->request_type == SYNC_MUTEX) {
event = ((mutex_t*) cell->wait_object)->event;
#ifdef __WIN__
/* On windows if the thread about to wait is the one which
has set the state of the rw_lock to RW_LOCK_WAIT_EX, then
it waits on a special event i.e.: wait_ex_event. */
} else if (cell->request_type == RW_LOCK_WAIT_EX) {
event = ((rw_lock_t*) cell->wait_object)->wait_ex_event;
#endif
} else if (cell->request_type == RW_LOCK_SHARED) {
event = ((rw_lock_t*) cell->wait_object)->s_event;
} else {
event = ((rw_lock_t*) cell->wait_object)->event;
event = ((rw_lock_t*) cell->wait_object)->x_event;
}
cell->waiting = TRUE;
......@@ -462,6 +459,7 @@ sync_array_cell_print(
mutex_t* mutex;
rw_lock_t* rwlock;
ulint type;
ulint writer;
type = cell->request_type;
......@@ -491,12 +489,10 @@ sync_array_cell_print(
(ulong) mutex->waiters);
} else if (type == RW_LOCK_EX
#ifdef __WIN__
|| type == RW_LOCK_WAIT_EX
#endif
|| type == RW_LOCK_SHARED) {
fputs(type == RW_LOCK_EX ? "X-lock on" : "S-lock on", file);
fputs(type == RW_LOCK_SHARED ? "S-lock on" : "X-lock on", file);
rwlock = cell->old_wait_rw_lock;
......@@ -504,22 +500,24 @@ sync_array_cell_print(
" RW-latch at %p created in file %s line %lu\n",
(void*) rwlock, rwlock->cfile_name,
(ulong) rwlock->cline);
if (rwlock->writer != RW_LOCK_NOT_LOCKED) {
writer = rw_lock_get_writer(rwlock);
if (writer != RW_LOCK_NOT_LOCKED) {
fprintf(file,
"a writer (thread id %lu) has"
" reserved it in mode %s",
(ulong) os_thread_pf(rwlock->writer_thread),
rwlock->writer == RW_LOCK_EX
writer == RW_LOCK_EX
? " exclusive\n"
: " wait exclusive\n");
}
fprintf(file,
"number of readers %lu, waiters flag %lu\n"
"number of readers %lu, s_waiters flag %lu, x_waiters flag %lu\n"
"Last time read locked in file %s line %lu\n"
"Last time write locked in file %s line %lu\n",
(ulong) rwlock->reader_count,
(ulong) rwlock->waiters,
(ulong) rwlock->s_waiters,
(ulong) (rwlock->x_waiters || rwlock->wait_ex_waiters),
rwlock->last_s_file_name,
(ulong) rwlock->last_s_line,
rwlock->last_x_file_name,
......@@ -844,11 +842,15 @@ sync_array_object_signalled(
/*========================*/
sync_array_t* arr) /* in: wait array */
{
#ifdef HAVE_GCC_ATOMIC_BUILTINS
__sync_fetch_and_add(&(arr->sg_count),1);
#else
sync_array_enter(arr);
arr->sg_count++;
sync_array_exit(arr);
#endif
}
/**************************************************************************
......@@ -889,19 +891,23 @@ sync_arr_wake_threads_if_sema_free(void)
mutex = cell->wait_object;
os_event_set(mutex->event);
#ifdef __WIN__
} else if (cell->request_type
== RW_LOCK_WAIT_EX) {
rw_lock_t* lock;
lock = cell->wait_object;
os_event_set(lock->wait_ex_event);
#endif
} else if (cell->request_type
== RW_LOCK_SHARED) {
rw_lock_t* lock;
lock = cell->wait_object;
os_event_set(lock->s_event);
} else {
rw_lock_t* lock;
lock = cell->wait_object;
os_event_set(lock->event);
os_event_set(lock->x_event);
}
}
}
......
......@@ -119,6 +119,7 @@ rw_lock_create_func(
/* If this is the very first time a synchronization object is
created, then the following call initializes the sync system. */
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_create(rw_lock_get_mutex(lock), SYNC_NO_ORDER_CHECK);
lock->mutex.cfile_name = cfile_name;
......@@ -128,8 +129,14 @@ rw_lock_create_func(
lock->mutex.cmutex_name = cmutex_name;
lock->mutex.mutex_type = 1;
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
#endif /* !HAVE_GCC_ATOMIC_BUILTINS */
rw_lock_set_waiters(lock, 0);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
lock->lock_word = RW_LOCK_BIAS;
#endif
rw_lock_set_s_waiters(lock, 0);
rw_lock_set_x_waiters(lock, 0);
rw_lock_set_wx_waiters(lock, 0);
rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
lock->writer_count = 0;
rw_lock_set_reader_count(lock, 0);
......@@ -151,11 +158,9 @@ rw_lock_create_func(
lock->last_x_file_name = "not yet reserved";
lock->last_s_line = 0;
lock->last_x_line = 0;
lock->event = os_event_create(NULL);
#ifdef __WIN__
lock->s_event = os_event_create(NULL);
lock->x_event = os_event_create(NULL);
lock->wait_ex_event = os_event_create(NULL);
#endif
mutex_enter(&rw_lock_list_mutex);
......@@ -181,19 +186,21 @@ rw_lock_free(
{
ut_ad(rw_lock_validate(lock));
ut_a(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED);
ut_a(rw_lock_get_waiters(lock) == 0);
ut_a(rw_lock_get_s_waiters(lock) == 0);
ut_a(rw_lock_get_x_waiters(lock) == 0);
ut_a(rw_lock_get_wx_waiters(lock) == 0);
ut_a(rw_lock_get_reader_count(lock) == 0);
lock->magic_n = 0;
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_free(rw_lock_get_mutex(lock));
#endif
mutex_enter(&rw_lock_list_mutex);
os_event_free(lock->event);
#ifdef __WIN__
os_event_free(lock->s_event);
os_event_free(lock->x_event);
os_event_free(lock->wait_ex_event);
#endif
if (UT_LIST_GET_PREV(list, lock)) {
ut_a(UT_LIST_GET_PREV(list, lock)->magic_n == RW_LOCK_MAGIC_N);
......@@ -211,6 +218,8 @@ rw_lock_free(
/**********************************************************************
Checks that the rw-lock has been initialized and that there are no
simultaneous shared and exclusive locks. */
/* MEMO: If HAVE_GCC_ATOMIC_BUILTINS, we should use this function statically. */
UNIV_INTERN
ibool
rw_lock_validate(
......@@ -219,7 +228,9 @@ rw_lock_validate(
{
ut_a(lock);
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(rw_lock_get_mutex(lock));
#endif
ut_a(lock->magic_n == RW_LOCK_MAGIC_N);
ut_a((rw_lock_get_reader_count(lock) == 0)
......@@ -227,11 +238,17 @@ rw_lock_validate(
ut_a((rw_lock_get_writer(lock) == RW_LOCK_EX)
|| (rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX)
|| (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED));
ut_a((rw_lock_get_waiters(lock) == 0)
|| (rw_lock_get_waiters(lock) == 1));
ut_a((rw_lock_get_s_waiters(lock) == 0)
|| (rw_lock_get_s_waiters(lock) == 1));
ut_a((rw_lock_get_x_waiters(lock) == 0)
|| (rw_lock_get_x_waiters(lock) == 1));
ut_a((rw_lock_get_wx_waiters(lock) == 0)
|| (rw_lock_get_wx_waiters(lock) == 1));
ut_a((lock->writer != RW_LOCK_EX) || (lock->writer_count > 0));
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(rw_lock_get_mutex(lock));
#endif
return(TRUE);
}
......@@ -258,13 +275,14 @@ rw_lock_s_lock_spin(
ut_ad(rw_lock_validate(lock));
lock_loop:
i = 0;
spin_loop:
rw_s_spin_wait_count++;
/* Spin waiting for the writer field to become free */
i = 0;
while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED
&& i < SYNC_SPIN_ROUNDS) {
while (i < SYNC_SPIN_ROUNDS
&& rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
}
......@@ -285,15 +303,27 @@ rw_lock_s_lock_spin(
lock->cfile_name, (ulong) lock->cline, (ulong) i);
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(rw_lock_get_mutex(lock));
#endif
/* We try once again to obtain the lock */
if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(rw_lock_get_mutex(lock));
#endif
return; /* Success */
} else {
#ifdef HAVE_GCC_ATOMIC_BUILTINS
/* like sync0sync.c doing */
i++;
if (i < SYNC_SPIN_ROUNDS) {
goto spin_loop;
}
#endif
/* If we get here, locking did not succeed, we may
suspend the thread to wait in the wait array */
......@@ -304,9 +334,19 @@ rw_lock_s_lock_spin(
file_name, line,
&index);
rw_lock_set_waiters(lock, 1);
rw_lock_set_s_waiters(lock, 1);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
/* like sync0sync.c doing */
for (i = 0; i < 4; i++) {
if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
sync_array_free_cell(sync_primary_wait_array, index);
return; /* Success */
}
}
#else
mutex_exit(rw_lock_get_mutex(lock));
#endif
if (srv_print_latch_waits) {
fprintf(stderr,
......@@ -343,13 +383,19 @@ rw_lock_x_lock_move_ownership(
{
ut_ad(rw_lock_is_locked(lock, RW_LOCK_EX));
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(&(lock->mutex));
#endif
lock->writer_thread = os_thread_get_curr_id();
lock->pass = 0;
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
#else
__sync_synchronize();
#endif
}
/**********************************************************************
......@@ -367,6 +413,89 @@ rw_lock_x_lock_low(
const char* file_name,/* in: file name where lock requested */
ulint line) /* in: line where requested */
{
#ifdef HAVE_GCC_ATOMIC_BUILTINS
os_thread_id_t curr_thread = os_thread_get_curr_id();
/* try to lock writer */
if(__sync_lock_test_and_set(&(lock->writer),RW_LOCK_EX)
== RW_LOCK_NOT_LOCKED) {
/* success */
/* obtain RW_LOCK_WAIT_EX right */
lock->writer_thread = curr_thread;
lock->pass = pass;
lock->writer_is_wait_ex = TRUE;
/* atomic operation may be safer about memory order. */
__sync_synchronize();
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_WAIT_EX,
file_name, line);
#endif
}
if (!os_thread_eq(lock->writer_thread, curr_thread)) {
return(RW_LOCK_NOT_LOCKED);
}
switch(rw_lock_get_writer(lock)) {
case RW_LOCK_WAIT_EX:
/* have right to try x-lock */
if (lock->lock_word == RW_LOCK_BIAS) {
/* try x-lock */
if(__sync_sub_and_fetch(&(lock->lock_word),
RW_LOCK_BIAS) == 0) {
/* success */
lock->pass = pass;
lock->writer_is_wait_ex = FALSE;
__sync_fetch_and_add(&(lock->writer_count),1);
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, pass, RW_LOCK_WAIT_EX);
rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
file_name, line);
#endif
lock->last_x_file_name = file_name;
lock->last_x_line = line;
/* Locking succeeded, we may return */
return(RW_LOCK_EX);
} else {
/* fail */
__sync_fetch_and_add(&(lock->lock_word),
RW_LOCK_BIAS);
}
}
/* There are readers, we have to wait */
return(RW_LOCK_WAIT_EX);
break;
case RW_LOCK_EX:
/* already have x-lock */
if ((lock->pass == 0)&&(pass == 0)) {
__sync_fetch_and_add(&(lock->writer_count),1);
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_EX, file_name,
line);
#endif
lock->last_x_file_name = file_name;
lock->last_x_line = line;
/* Locking succeeded, we may return */
return(RW_LOCK_EX);
}
return(RW_LOCK_NOT_LOCKED);
break;
default: /* ??? */
return(RW_LOCK_NOT_LOCKED);
}
#else /* HAVE_GCC_ATOMIC_BUILTINS */
ut_ad(mutex_own(rw_lock_get_mutex(lock)));
if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
......@@ -447,6 +576,7 @@ rw_lock_x_lock_low(
/* Locking succeeded, we may return */
return(RW_LOCK_EX);
}
#endif /* HAVE_GCC_ATOMIC_BUILTINS */
/* Locking did not succeed */
return(RW_LOCK_NOT_LOCKED);
......@@ -472,19 +602,33 @@ rw_lock_x_lock_func(
ulint line) /* in: line where requested */
{
ulint index; /* index of the reserved wait cell */
ulint state; /* lock state acquired */
ulint state = RW_LOCK_NOT_LOCKED; /* lock state acquired */
#ifdef HAVE_GCC_ATOMIC_BUILTINS
ulint prev_state = RW_LOCK_NOT_LOCKED;
#endif
ulint i; /* spin round count */
ut_ad(rw_lock_validate(lock));
lock_loop:
i = 0;
#ifdef HAVE_GCC_ATOMIC_BUILTINS
prev_state = state;
#else
/* Acquire the mutex protecting the rw-lock fields */
mutex_enter_fast(&(lock->mutex));
#endif
state = rw_lock_x_lock_low(lock, pass, file_name, line);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if (state != prev_state) i=0; /* if progress, reset counter. */
#else
mutex_exit(&(lock->mutex));
#endif
spin_loop:
if (state == RW_LOCK_EX) {
return; /* Locking succeeded */
......@@ -492,10 +636,9 @@ rw_lock_x_lock_func(
} else if (state == RW_LOCK_NOT_LOCKED) {
/* Spin waiting for the writer field to become free */
i = 0;
while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED
&& i < SYNC_SPIN_ROUNDS) {
while (i < SYNC_SPIN_ROUNDS
&& rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0,
srv_spin_wait_delay));
......@@ -509,9 +652,12 @@ rw_lock_x_lock_func(
} else if (state == RW_LOCK_WAIT_EX) {
/* Spin waiting for the reader count field to become zero */
i = 0;
#ifdef HAVE_GCC_ATOMIC_BUILTINS
while (lock->lock_word != RW_LOCK_BIAS
#else
while (rw_lock_get_reader_count(lock) != 0
#endif
&& i < SYNC_SPIN_ROUNDS) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0,
......@@ -524,7 +670,6 @@ rw_lock_x_lock_func(
os_thread_yield();
}
} else {
i = 0; /* Eliminate a compiler warning */
ut_error;
}
......@@ -541,34 +686,69 @@ rw_lock_x_lock_func(
/* We try once again to obtain the lock. Acquire the mutex protecting
the rw-lock fields */
#ifdef HAVE_GCC_ATOMIC_BUILTINS
prev_state = state;
#else
mutex_enter(rw_lock_get_mutex(lock));
#endif
state = rw_lock_x_lock_low(lock, pass, file_name, line);
#ifdef HAVE_GCC_ATOMIC_BUILTINS
if (state != prev_state) i=0; /* if progress, reset counter. */
#endif
if (state == RW_LOCK_EX) {
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(rw_lock_get_mutex(lock));
#endif
return; /* Locking succeeded */
}
#ifdef HAVE_GCC_ATOMIC_BUILTINS
/* like sync0sync.c doing */
i++;
if (i < SYNC_SPIN_ROUNDS) {
goto spin_loop;
}
#endif
rw_x_system_call_count++;
sync_array_reserve_cell(sync_primary_wait_array,
lock,
#ifdef __WIN__
/* On windows RW_LOCK_WAIT_EX signifies
that this thread should wait on the
special wait_ex_event. */
(state == RW_LOCK_WAIT_EX)
? RW_LOCK_WAIT_EX :
#endif
RW_LOCK_EX,
file_name, line,
&index);
rw_lock_set_waiters(lock, 1);
if (state == RW_LOCK_WAIT_EX) {
rw_lock_set_wx_waiters(lock, 1);
} else {
rw_lock_set_x_waiters(lock, 1);
}
#ifdef HAVE_GCC_ATOMIC_BUILTINS
/* like sync0sync.c doing */
for (i = 0; i < 4; i++) {
prev_state = state;
state = rw_lock_x_lock_low(lock, pass, file_name, line);
if (state == RW_LOCK_EX) {
sync_array_free_cell(sync_primary_wait_array, index);
return; /* Locking succeeded */
}
if (state != prev_state) {
/* retry! */
sync_array_free_cell(sync_primary_wait_array, index);
goto lock_loop;
}
}
#else
mutex_exit(rw_lock_get_mutex(lock));
#endif
if (srv_print_latch_waits) {
fprintf(stderr,
......@@ -730,7 +910,9 @@ rw_lock_own(
ut_ad(lock);
ut_ad(rw_lock_validate(lock));
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(&(lock->mutex));
#endif
info = UT_LIST_GET_FIRST(lock->debug_list);
......@@ -740,7 +922,9 @@ rw_lock_own(
&& (info->pass == 0)
&& (info->lock_type == lock_type)) {
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
#endif
/* Found! */
return(TRUE);
......@@ -748,7 +932,9 @@ rw_lock_own(
info = UT_LIST_GET_NEXT(list, info);
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
#endif
return(FALSE);
}
......@@ -770,21 +956,25 @@ rw_lock_is_locked(
ut_ad(lock);
ut_ad(rw_lock_validate(lock));
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(&(lock->mutex));
#endif
if (lock_type == RW_LOCK_SHARED) {
if (lock->reader_count > 0) {
ret = TRUE;
}
} else if (lock_type == RW_LOCK_EX) {
if (lock->writer == RW_LOCK_EX) {
if (rw_lock_get_writer(lock) == RW_LOCK_EX) {
ret = TRUE;
}
} else {
ut_error;
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
#endif
return(ret);
}
......@@ -814,16 +1004,26 @@ rw_lock_list_print_info(
count++;
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(&(lock->mutex));
#endif
if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
|| (rw_lock_get_reader_count(lock) != 0)
|| (rw_lock_get_waiters(lock) != 0)) {
|| (rw_lock_get_s_waiters(lock) != 0)
|| (rw_lock_get_x_waiters(lock) != 0)
|| (rw_lock_get_wx_waiters(lock) != 0)) {
fprintf(file, "RW-LOCK: %p ", (void*) lock);
if (rw_lock_get_waiters(lock)) {
fputs(" Waiters for the lock exist\n", file);
if (rw_lock_get_s_waiters(lock)) {
fputs(" s_waiters for the lock exist,", file);
}
if (rw_lock_get_x_waiters(lock)) {
fputs(" x_waiters for the lock exist\n", file);
}
if (rw_lock_get_wx_waiters(lock)) {
fputs(" wait_ex_waiters for the lock exist\n", file);
} else {
putc('\n', file);
}
......@@ -835,7 +1035,9 @@ rw_lock_list_print_info(
}
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
#endif
lock = UT_LIST_GET_NEXT(list, lock);
}
......@@ -860,10 +1062,18 @@ rw_lock_print(
if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
|| (rw_lock_get_reader_count(lock) != 0)
|| (rw_lock_get_waiters(lock) != 0)) {
|| (rw_lock_get_s_waiters(lock) != 0)
|| (rw_lock_get_x_waiters(lock) != 0)
|| (rw_lock_get_wx_waiters(lock) != 0)) {
if (rw_lock_get_waiters(lock)) {
fputs(" Waiters for the lock exist\n", stderr);
if (rw_lock_get_s_waiters(lock)) {
fputs(" s_waiters for the lock exist,", stderr);
}
if (rw_lock_get_x_waiters(lock)) {
fputs(" x_waiters for the lock exist\n", stderr);
}
if (rw_lock_get_wx_waiters(lock)) {
fputs(" wait_ex_waiters for the lock exist\n", stderr);
} else {
putc('\n', stderr);
}
......@@ -922,14 +1132,18 @@ rw_lock_n_locked(void)
lock = UT_LIST_GET_FIRST(rw_lock_list);
while (lock != NULL) {
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(rw_lock_get_mutex(lock));
#endif
if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
|| (rw_lock_get_reader_count(lock) != 0)) {
count++;
}
#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(rw_lock_get_mutex(lock));
#endif
lock = UT_LIST_GET_NEXT(list, lock);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment