Commit a73eedbf authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-26467 Unnecessary compare-and-swap loop in srw_mutex

srw_mutex::wait_and_lock(): In the spin loop, we will try to poll
for non-conflicting lock word state by reads, avoiding any writes.
We invoke explicit std::atomic_thread_fence(std::memory_order_acquire)
before returning. The individual operations on the lock word
can use memory_order_relaxed.

srw_mutex::lock: Document that the value for a single writer is
HOLDER+1 instead of HOLDER.

srw_mutex::wr_lock_try(), srw_mutex::wr_unlock(): Adjust the value
of the lock word of a single writer from HOLDER to HOLDER+1.
parent 7730dd39
......@@ -36,7 +36,8 @@ class srw_mutex final
/** Futex-based mutex */
class srw_mutex final
{
/** The lock word, containing HOLDER and a count of waiters */
/** The lock word, containing HOLDER + 1 if the lock is being held,
plus the number of waiters */
std::atomic<uint32_t> lock;
/** Identifies that the lock is being held */
static constexpr uint32_t HOLDER= 1U << 31;
......@@ -62,7 +63,7 @@ class srw_mutex final
bool wr_lock_try()
{
uint32_t lk= 0;
return lock.compare_exchange_strong(lk, HOLDER,
return lock.compare_exchange_strong(lk, HOLDER + 1,
std::memory_order_acquire,
std::memory_order_relaxed);
}
......@@ -70,8 +71,8 @@ class srw_mutex final
void wr_lock() { if (!wr_lock_try()) wait_and_lock(); }
void wr_unlock()
{
const uint32_t lk= lock.fetch_and(~HOLDER, std::memory_order_release);
if (lk != HOLDER)
const uint32_t lk= lock.fetch_sub(HOLDER + 1, std::memory_order_release);
if (lk != HOLDER + 1)
{
DBUG_ASSERT(lk & HOLDER);
wake();
......
......@@ -233,33 +233,39 @@ void ssux_lock_low::wake() { SRW_FUTEX(&readers, WAKE, 1); }
void srw_mutex::wait_and_lock()
{
uint32_t lk= 1 + lock.fetch_add(1, std::memory_order_relaxed);
for (auto spin= srv_n_spin_wait_rounds; spin; spin--)
for (auto spin= srv_n_spin_wait_rounds;;)
{
lk&= ~HOLDER;
DBUG_ASSERT(lk);
while (!lock.compare_exchange_weak(lk, HOLDER | (lk - 1),
std::memory_order_acquire,
std::memory_order_relaxed))
DBUG_ASSERT(~HOLDER & lk);
if (lk & HOLDER)
goto occupied;
return;
occupied:
lk= lock.load(std::memory_order_relaxed);
else
{
lk= lock.fetch_or(HOLDER, std::memory_order_relaxed);
if (!(lk & HOLDER))
goto acquired;
}
ut_delay(srv_spin_wait_delay);
if (!--spin)
break;
}
for (;;)
for (;; wait(lk))
{
if (lk & HOLDER)
{
lk= lock.load(std::memory_order_relaxed);
while (!(lk & HOLDER))
if (lk & HOLDER)
continue;
}
lk= lock.fetch_or(HOLDER, std::memory_order_relaxed);
if (!(lk & HOLDER))
{
acquired:
DBUG_ASSERT(lk);
if (lock.compare_exchange_weak(lk, HOLDER | (lk - 1),
std::memory_order_acquire,
std::memory_order_relaxed))
std::atomic_thread_fence(std::memory_order_acquire);
return;
}
DBUG_ASSERT(lk > HOLDER);
wait(lk);
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment