Commit 57d20f11 authored by Sergey Vojtovich

MDEV-14529 - InnoDB rw-locks: optimize memory barriers

Remove volatile modifier from lock_word: it's not supposed to be used for
inter-thread communication; use appropriate atomic operations instead.
parent c73e77da
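As background for the change below, here is a minimal standalone sketch of the idea, written with C++11 std::atomic instead of the my_atomic_*/MY_MEMORY_ORDER_* wrappers the commit actually uses (the demo_* names are illustrative, not InnoDB symbols): volatile only stops the compiler from caching the value in a register; it provides neither atomicity nor memory ordering, while an explicit relaxed atomic load states the intent and is well defined under the C/C++ memory model.

#include <atomic>
#include <cstdint>

struct demo_rw_lock_t {
	/* was: volatile int32_t lock_word; */
	std::atomic<int32_t> lock_word;
};

/* Observer that only needs a plain snapshot of the lock word: a relaxed
load is sufficient, and no memory barrier is implied or required. */
inline int32_t demo_get_lock_word(const demo_rw_lock_t& lock)
{
	return lock.lock_word.load(std::memory_order_relaxed);
}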
@@ -571,7 +571,7 @@ struct rw_lock_t
#endif /* UNIV_DEBUG */
{
/** Holds the state of the lock. */
volatile int32_t lock_word;
int32_t lock_word;
/** 1: there are waiters */
volatile uint32_t waiters;
......
@@ -77,7 +77,8 @@ rw_lock_get_writer(
/*===============*/
const rw_lock_t* lock) /*!< in: rw-lock */
{
int32_t lock_word = lock->lock_word;
int32_t lock_word = my_atomic_load32_explicit(const_cast<int32_t*>(&lock->lock_word),
MY_MEMORY_ORDER_RELAXED);
ut_ad(lock_word <= X_LOCK_DECR);
if (lock_word > X_LOCK_HALF_DECR) {
@@ -109,7 +110,8 @@ rw_lock_get_reader_count(
/*=====================*/
const rw_lock_t* lock) /*!< in: rw-lock */
{
int32_t lock_word = lock->lock_word;
int32_t lock_word = my_atomic_load32_explicit(const_cast<int32_t*>(&lock->lock_word),
MY_MEMORY_ORDER_RELAXED);
ut_ad(lock_word <= X_LOCK_DECR);
if (lock_word > X_LOCK_HALF_DECR) {
@@ -145,7 +147,8 @@ rw_lock_get_x_lock_count(
/*=====================*/
const rw_lock_t* lock) /*!< in: rw-lock */
{
int32_t lock_copy = lock->lock_word;
int32_t lock_copy = my_atomic_load32_explicit(const_cast<int32_t*>(&lock->lock_word),
MY_MEMORY_ORDER_RELAXED);
ut_ad(lock_copy <= X_LOCK_DECR);
if (lock_copy == 0 || lock_copy == -X_LOCK_HALF_DECR) {
@@ -178,7 +181,8 @@ rw_lock_get_sx_lock_count(
const rw_lock_t* lock) /*!< in: rw-lock */
{
#ifdef UNIV_DEBUG
int32_t lock_copy = lock->lock_word;
int32_t lock_copy = my_atomic_load32_explicit(const_cast<int32_t*>(&lock->lock_word),
MY_MEMORY_ORDER_RELAXED);
ut_ad(lock_copy <= X_LOCK_DECR);
@@ -197,9 +201,7 @@ rw_lock_get_sx_lock_count(
}
/******************************************************************//**
Two different implementations for decrementing the lock_word of a rw_lock:
one for systems supporting atomic operations, one for others. This does
does not support recusive x-locks: they should be handled by the caller and
Recursive x-locks are not supported: they should be handled by the caller and
need not be atomic since they are performed by the current lock holder.
Returns true if the decrement was made, false if not.
@return true if decr occurs */
@@ -211,13 +213,11 @@ rw_lock_lock_word_decr(
int32_t amount, /*!< in: amount to decrement */
int32_t threshold) /*!< in: threshold of judgement */
{
int32_t local_lock_word;
local_lock_word = my_atomic_load32_explicit(&lock->lock_word,
MY_MEMORY_ORDER_RELAXED);
while (local_lock_word > threshold) {
if (my_atomic_cas32(&lock->lock_word, &local_lock_word,
local_lock_word - amount)) {
int32_t lock_copy = my_atomic_load32_explicit(&lock->lock_word,
MY_MEMORY_ORDER_RELAXED);
while (lock_copy > threshold) {
if (my_atomic_cas32(&lock->lock_word, &lock_copy,
lock_copy - amount)) {
return(true);
}
}
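For reference, the decrement-above-threshold loop above as a self-contained sketch with std::atomic in place of my_atomic_cas32 (illustrative only). It relies on the same property the real loop uses: on failure the compare-and-swap writes the freshly observed value back into its expected argument, so the retry needs no extra load.

#include <atomic>
#include <cstdint>

inline bool demo_lock_word_decr(std::atomic<int32_t>& lock_word,
				int32_t amount, int32_t threshold)
{
	int32_t expected = lock_word.load(std::memory_order_relaxed);
	while (expected > threshold) {
		if (lock_word.compare_exchange_strong(expected,
						      expected - amount)) {
			return true;	/* decrement succeeded */
		}
		/* CAS failed: 'expected' now holds the current value,
		so loop and retry while it is still above the threshold. */
	}
	return false;
}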
@@ -311,23 +311,24 @@ rw_lock_x_lock_func_nowait(
lock->writer_thread = os_thread_get_curr_id();
} else if (os_thread_eq(lock->writer_thread, os_thread_get_curr_id())) {
/* Relock: this lock_word modification is safe since no other
threads can modify (lock, unlock, or reserve) lock_word while
there is an exclusive writer and this is the writer thread. */
if (lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR) {
/* Relock: even though no other thread can modify (lock, unlock
or reserve) lock_word while there is an exclusive writer and
this is the writer thread, we still want concurrent threads to
observe consistent values. */
if (oldval == 0 || oldval == -X_LOCK_HALF_DECR) {
/* There are 1 x-locks */
lock->lock_word -= X_LOCK_DECR;
} else if (lock->lock_word <= -X_LOCK_DECR) {
my_atomic_add32_explicit(&lock->lock_word, -X_LOCK_DECR,
MY_MEMORY_ORDER_RELAXED);
} else if (oldval <= -X_LOCK_DECR) {
/* There are 2 or more x-locks */
lock->lock_word--;
my_atomic_add32_explicit(&lock->lock_word, -1,
MY_MEMORY_ORDER_RELAXED);
/* Watch for too many recursive locks */
ut_ad(oldval < 1);
} else {
/* Failure */
return(FALSE);
}
/* Watch for too many recursive locks */
ut_ad(lock->lock_word < 0);
} else {
/* Failure */
return(FALSE);
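The rewritten comment above is the crux of the patch: even though only the current writer thread modifies lock_word here, other threads read it concurrently, so a plain lock->lock_word -= n would still be a data race under the C/C++ memory model. A hedged sketch of that division of labour (std::atomic stand-ins, hypothetical demo_* names):

#include <atomic>
#include <cstdint>

/* Owning writer thread: relaxed atomic updates are enough, since no
ordering is needed, but the access must still be atomic so concurrent
readers never observe a torn or optimized-away value. */
inline void demo_relock(std::atomic<int32_t>& lock_word, int32_t decr)
{
	lock_word.fetch_sub(decr, std::memory_order_relaxed);
}

/* Any other thread peeking at the word at the same time. */
inline int32_t demo_peek(const std::atomic<int32_t>& lock_word)
{
	return lock_word.load(std::memory_order_relaxed);
}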
@@ -356,8 +357,8 @@ rw_lock_s_unlock_func(
rw_lock_t* lock) /*!< in/out: rw-lock */
{
#ifdef UNIV_DEBUG
int32_t dbg_lock_word = my_atomic_load32_explicit(
&lock->lock_word, MY_MEMORY_ORDER_RELAXED);
int32_t dbg_lock_word = my_atomic_load32_explicit(&lock->lock_word,
MY_MEMORY_ORDER_RELAXED);
ut_ad(dbg_lock_word > -X_LOCK_DECR);
ut_ad(dbg_lock_word != 0);
ut_ad(dbg_lock_word < X_LOCK_DECR);
@@ -392,9 +393,8 @@ rw_lock_x_unlock_func(
#endif /* UNIV_DEBUG */
rw_lock_t* lock) /*!< in/out: rw-lock */
{
int32_t lock_word;
lock_word = my_atomic_load32_explicit(&lock->lock_word,
MY_MEMORY_ORDER_RELAXED);
int32_t lock_word = my_atomic_load32_explicit(&lock->lock_word,
MY_MEMORY_ORDER_RELAXED);
ut_ad(lock_word == 0 || lock_word == -X_LOCK_HALF_DECR
|| lock_word <= -X_LOCK_DECR);
@@ -427,11 +427,13 @@ rw_lock_x_unlock_func(
} else if (lock_word == -X_LOCK_DECR
|| lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR)) {
/* There are 2 x-locks */
lock->lock_word += X_LOCK_DECR;
my_atomic_add32_explicit(&lock->lock_word, X_LOCK_DECR,
MY_MEMORY_ORDER_RELAXED);
} else {
/* There are more than 2 x-locks. */
ut_ad(lock_word < -X_LOCK_DECR);
lock->lock_word += 1;
my_atomic_add32_explicit(&lock->lock_word, 1,
MY_MEMORY_ORDER_RELAXED);
}
ut_ad(rw_lock_validate(lock));
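Reading the relock and unlock branches together gives a small worked example of the recursion accounting (assuming the unlocked value X_LOCK_DECR, as asserted in rw_lock_free_func further down): the first X lock takes lock_word from X_LOCK_DECR to 0, a second recursive X lock subtracts X_LOCK_DECR again (down to -X_LOCK_DECR), and each further recursive X lock subtracts 1. The release path above mirrors this: add 1 per extra recursion while more than two locks remain, add X_LOCK_DECR back when exactly two remain, and (in the branch not shown in this hunk) restore the word on the final release. The -X_LOCK_HALF_DECR variants are the same states with an SX lock also held by the writer.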
@@ -457,8 +459,10 @@ rw_lock_sx_unlock_func(
ut_d(rw_lock_remove_debug_info(lock, pass, RW_LOCK_SX));
if (lock->sx_recursive == 0) {
int32_t lock_word = my_atomic_load32_explicit(&lock->lock_word,
MY_MEMORY_ORDER_RELAXED);
/* Last caller in a possible recursive chain. */
if (lock->lock_word > 0) {
if (lock_word > 0) {
lock->writer_thread = 0;
if (my_atomic_add32(&lock->lock_word, X_LOCK_HALF_DECR) <= 0) {
@@ -475,10 +479,10 @@ rw_lock_sx_unlock_func(
}
} else {
/* still has x-lock */
ut_ad(lock->lock_word == -X_LOCK_HALF_DECR
|| lock->lock_word <= -(X_LOCK_DECR
+ X_LOCK_HALF_DECR));
lock->lock_word += X_LOCK_HALF_DECR;
ut_ad(lock_word == -X_LOCK_HALF_DECR ||
lock_word <= -(X_LOCK_DECR + X_LOCK_HALF_DECR));
my_atomic_add32_explicit(&lock->lock_word, X_LOCK_HALF_DECR,
MY_MEMORY_ORDER_RELAXED);
}
}
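A side effect of the explicit loads, visible above: the word is read once into the local lock_word and that single snapshot drives both the branch and the debug assertion, rather than re-reading the shared field each time. A condensed sketch of the idiom (std::atomic, hypothetical demo_* names; the real function also resets writer_thread and wakes waiters):

#include <atomic>
#include <cassert>
#include <cstdint>

inline void demo_sx_release(std::atomic<int32_t>& word,
			    int32_t decr, int32_t half_decr)
{
	/* One relaxed snapshot; every later decision uses it. */
	const int32_t snapshot = word.load(std::memory_order_relaxed);

	if (snapshot > 0) {
		/* Last SX holder with no X lock on top: this add releases
		the lock; the old-value <= 0 check mirrors the
		my_atomic_add32(...) <= 0 test above. */
		if (word.fetch_add(half_decr) <= 0) {
			/* the real code would wake up waiters here */
		}
	} else {
		/* This thread still holds an X lock: pure bookkeeping,
		relaxed ordering is enough. */
		assert(snapshot == -half_decr
		       || snapshot <= -(decr + half_decr));
		word.fetch_add(half_decr, std::memory_order_relaxed);
	}
}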
......
@@ -599,7 +599,7 @@ sync_array_cell_print(
"\n",
rw_lock_get_reader_count(rwlock),
rwlock->waiters,
rwlock->lock_word,
my_atomic_load32_explicit(&rwlock->lock_word, MY_MEMORY_ORDER_RELAXED),
innobase_basename(rwlock->last_s_file_name),
rwlock->last_s_line,
innobase_basename(rwlock->last_x_file_name),
@@ -1398,7 +1398,8 @@ sync_arr_fill_sys_semphore_waits_table(
//fields[SYS_SEMAPHORE_WAITS_HOLDER_LINE]->set_notnull();
OK(field_store_ulint(fields[SYS_SEMAPHORE_WAITS_READERS], rw_lock_get_reader_count(rwlock)));
OK(field_store_ulint(fields[SYS_SEMAPHORE_WAITS_WAITERS_FLAG], (longlong)rwlock->waiters));
OK(field_store_ulint(fields[SYS_SEMAPHORE_WAITS_LOCK_WORD], (longlong)rwlock->lock_word));
OK(field_store_ulint(fields[SYS_SEMAPHORE_WAITS_LOCK_WORD],
my_atomic_load32_explicit(&rwlock->lock_word, MY_MEMORY_ORDER_RELAXED)));
OK(field_store_string(fields[SYS_SEMAPHORE_WAITS_LAST_READER_FILE], innobase_basename(rwlock->last_s_file_name)));
OK(fields[SYS_SEMAPHORE_WAITS_LAST_READER_LINE]->store(rwlock->last_s_line, true));
fields[SYS_SEMAPHORE_WAITS_LAST_READER_LINE]->set_notnull();
......
@@ -268,7 +268,8 @@ rw_lock_free_func(
rw_lock_t* lock) /*!< in/out: rw-lock */
{
ut_ad(rw_lock_validate(lock));
ut_a(lock->lock_word == X_LOCK_DECR);
ut_a(my_atomic_load32_explicit(&lock->lock_word,
MY_MEMORY_ORDER_RELAXED) == X_LOCK_DECR);
mutex_enter(&rw_lock_list_mutex);
@@ -438,12 +439,10 @@ rw_lock_x_lock_wait_func(
sync_array_t* sync_arr;
uint64_t count_os_wait = 0;
ut_ad(lock->lock_word <= threshold);
ut_ad(my_atomic_load32_explicit(&lock->lock_word, MY_MEMORY_ORDER_RELAXED) <= threshold);
while (lock->lock_word < threshold) {
HMT_low();
HMT_low();
while (my_atomic_load32_explicit(&lock->lock_word, MY_MEMORY_ORDER_RELAXED) < threshold) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
}
@@ -452,7 +451,6 @@ rw_lock_x_lock_wait_func(
i++;
continue;
}
HMT_medium();
/* If there is still a reader, then go to sleep.*/
++n_spins;
@@ -465,7 +463,7 @@ rw_lock_x_lock_wait_func(
i = 0;
/* Check lock_word to ensure wake-up isn't missed.*/
if (lock->lock_word < threshold) {
if (my_atomic_load32_explicit(&lock->lock_word, MY_MEMORY_ORDER_RELAXED) < threshold) {
++count_os_wait;
@@ -488,7 +486,6 @@ rw_lock_x_lock_wait_func(
sync_array_free_cell(sync_arr, cell);
break;
}
HMT_low();
}
HMT_medium();
rw_lock_stats.rw_x_spin_round_count.add(n_spins);
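The wait loop above now polls with explicit relaxed loads rather than a volatile read. A self-contained sketch of the spin-then-sleep shape (std::atomic, with a generic yield standing in for the sync-array wait and without the HMT_low()/HMT_medium() priority hints; demo_* names and spin_rounds are illustrative):

#include <atomic>
#include <cstdint>
#include <thread>

inline void demo_wait_until_at_least(std::atomic<int32_t>& lock_word,
				     int32_t threshold,
				     unsigned spin_rounds)
{
	while (lock_word.load(std::memory_order_relaxed) < threshold) {
		/* Spin a bounded number of rounds on cheap relaxed loads. */
		unsigned i = 0;
		while (i < spin_rounds
		       && lock_word.load(std::memory_order_relaxed) < threshold) {
			++i;	/* the real code calls ut_delay() here */
		}
		/* Re-check before blocking so a wake-up is not missed. */
		if (lock_word.load(std::memory_order_relaxed) < threshold) {
			std::this_thread::yield();	/* stand-in for the
							sync-array OS wait */
		}
	}
}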
@@ -564,14 +561,18 @@ rw_lock_x_lock_low(
file_name, line);
} else {
int32_t lock_word = my_atomic_load32_explicit(&lock->lock_word,
MY_MEMORY_ORDER_RELAXED);
/* At least one X lock by this thread already
exists. Add another. */
if (lock->lock_word == 0
|| lock->lock_word == -X_LOCK_HALF_DECR) {
lock->lock_word -= X_LOCK_DECR;
if (lock_word == 0
|| lock_word == -X_LOCK_HALF_DECR) {
my_atomic_add32_explicit(&lock->lock_word, -X_LOCK_DECR,
MY_MEMORY_ORDER_RELAXED);
} else {
ut_ad(lock->lock_word <= -X_LOCK_DECR);
--lock->lock_word;
ut_ad(lock_word <= -X_LOCK_DECR);
my_atomic_add32_explicit(&lock->lock_word, -1,
MY_MEMORY_ORDER_RELAXED);
}
}
@@ -642,12 +643,17 @@ rw_lock_sx_lock_low(
thread working on this lock and it is safe to
read and write to the lock_word. */
ut_ad((lock->lock_word == 0)
|| ((lock->lock_word <= -X_LOCK_DECR)
&& (lock->lock_word
#ifdef UNIV_DEBUG
int32_t lock_word =
#endif
my_atomic_add32_explicit(&lock->lock_word, -X_LOCK_HALF_DECR,
MY_MEMORY_ORDER_RELAXED);
ut_ad((lock_word == 0)
|| ((lock_word <= -X_LOCK_DECR)
&& (lock_word
> -(X_LOCK_DECR
+ X_LOCK_HALF_DECR))));
lock->lock_word -= X_LOCK_HALF_DECR;
}
} else {
/* Another thread locked before us */
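The hunk above folds the old read-then-assert-then-modify sequence into one atomic add whose return value (the value before the addition, i.e. fetch-and-add semantics) is captured only in debug builds. A sketch of that shape with std::atomic (the DEMO_* constants and DEMO_DEBUG macro are stand-ins for X_LOCK_DECR, X_LOCK_HALF_DECR and UNIV_DEBUG, used purely for illustration):

#include <atomic>
#include <cassert>
#include <cstdint>

static const int32_t DEMO_X_LOCK_DECR = 0x20000000;
static const int32_t DEMO_X_LOCK_HALF_DECR = DEMO_X_LOCK_DECR / 2;

inline void demo_sx_reserve(std::atomic<int32_t>& lock_word)
{
#ifdef DEMO_DEBUG
	const int32_t prev =
#endif
	lock_word.fetch_add(-DEMO_X_LOCK_HALF_DECR, std::memory_order_relaxed);
#ifdef DEMO_DEBUG
	/* fetch_add returned the value *before* the addition, which is
	exactly what the assertion needs; release builds discard it. */
	assert(prev == 0
	       || (prev <= -DEMO_X_LOCK_DECR
		   && prev > -(DEMO_X_LOCK_DECR + DEMO_X_LOCK_HALF_DECR)));
#endif
}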
@@ -709,7 +715,7 @@ rw_lock_x_lock_func(
/* Spin waiting for the lock_word to become free */
HMT_low();
while (i < srv_n_spin_wait_rounds
&& lock->lock_word <= X_LOCK_HALF_DECR) {
&& my_atomic_load32_explicit(&lock->lock_word, MY_MEMORY_ORDER_RELAXED) <= X_LOCK_HALF_DECR) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(
@@ -815,7 +821,7 @@ rw_lock_sx_lock_func(
/* Spin waiting for the lock_word to become free */
while (i < srv_n_spin_wait_rounds
&& lock->lock_word <= X_LOCK_HALF_DECR) {
&& my_atomic_load32_explicit(&lock->lock_word, MY_MEMORY_ORDER_RELAXED) <= X_LOCK_HALF_DECR) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(
@@ -955,15 +961,17 @@ rw_lock_add_debug_info(
rw_lock_debug_mutex_exit();
if (pass == 0 && lock_type != RW_LOCK_X_WAIT) {
int32_t lock_word = my_atomic_load32_explicit(&lock->lock_word,
MY_MEMORY_ORDER_RELAXED);
/* Recursive x while holding SX
(lock_type == RW_LOCK_X && lock_word == -X_LOCK_HALF_DECR)
is treated as not-relock (new lock). */
if ((lock_type == RW_LOCK_X
&& lock->lock_word < -X_LOCK_HALF_DECR)
&& lock_word < -X_LOCK_HALF_DECR)
|| (lock_type == RW_LOCK_SX
&& (lock->lock_word < 0 || lock->sx_recursive == 1))) {
&& (lock_word < 0 || lock->sx_recursive == 1))) {
sync_check_lock_validate(lock);
sync_check_lock_granted(lock);
@@ -1154,7 +1162,7 @@ rw_lock_list_print_info(
count++;
if (lock->lock_word != X_LOCK_DECR) {
if (my_atomic_load32_explicit(const_cast<int32_t*>(&lock->lock_word), MY_MEMORY_ORDER_RELAXED) != X_LOCK_DECR) {
fprintf(file, "RW-LOCK: %p ", (void*) lock);
......