From 040c16ab8b7d5e4192a17a72224e89ff14899cd5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marko=20M=C3=A4kel=C3=A4?= <marko.makela@mariadb.com>
Date: Mon, 19 Apr 2021 17:47:21 +0300
Subject: [PATCH] MDEV-25404: Optimize srw_mutex on Linux, OpenBSD, Windows

On Linux, OpenBSD and Microsoft Windows, srw_mutex was an alias for a
rw-lock while we only need mutex functionality. Let us implement a
futex-based mutex with one bit for HOLDER and 31 bits for counting
waiting requests.

srw_mutex::wr_unlock() can avoid waking up a waiter when no waiting
requests exist. (Previously, we only had a 1-bit rw_lock::WRITER_WAITING
flag that could be wrongly cleared if multiple waiting wr_lock() calls
existed. Now we have no problem with up to 2,147,483,648 conflicting
threads.)

On 64-bit Microsoft Windows, the advantage is that
sizeof(srw_mutex) is 4, while sizeof(SRWLOCK) would be 8.

Reviewed by: Vladislav Vaintroub
---
 storage/innobase/include/srw_lock.h | 52 ++++++++++++++++++++++++++---
 storage/innobase/sync/srw_lock.cc   | 48 ++++++++++++++++++++++++--
 2 files changed, 93 insertions(+), 7 deletions(-)

diff --git a/storage/innobase/include/srw_lock.h b/storage/innobase/include/srw_lock.h
index 50675d2ffb3..1c7646d3b97 100644
--- a/storage/innobase/include/srw_lock.h
+++ b/storage/innobase/include/srw_lock.h
@@ -1,6 +1,6 @@
 /*****************************************************************************
 
-Copyright (c) 2020, MariaDB Corporation.
+Copyright (c) 2020, 2021, MariaDB Corporation.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -25,9 +25,9 @@ this program; if not, write to the Free Software Foundation, Inc.,
 # define SRW_LOCK_DUMMY /* Use dummy implementation for debugging purposes */
 #endif
 
-#if defined SRW_LOCK_DUMMY && !(defined _WIN32)
+#if defined SRW_LOCK_DUMMY
 /** An exclusive-only variant of srw_lock */
-class srw_mutex
+class srw_mutex final
 {
   pthread_mutex_t lock;
 public:
@@ -38,7 +38,51 @@ class srw_mutex
   bool wr_lock_try() { return !pthread_mutex_trylock(&lock); }
 };
 #else
-# define srw_mutex srw_lock_low
+/** Futex-based exclusive mutex; the whole state is one 32-bit lock word */
+class srw_mutex final
+{
+  /** The lock word: HOLDER flag in bit 31, count of waiters in bits 0..30 */
+  std::atomic<uint32_t> lock;
+  /** Flag in the lock word that is set while some thread holds the mutex */
+  static constexpr uint32_t HOLDER= 1U << 31;
+
+  /** Slow path of wr_lock(): register as a waiter and wait until acquired */
+  void wait_and_lock();
+  /** Block while the lock word equals lk (i.e. wait for lock!=lk) */
+  inline void wait(uint32_t lk);
+  /** Wake up one thread that is blocked in wait() */
+  void wake();
+public:
+  /** @return whether the lock word is nonzero (mutex held or waited for) */
+  bool is_locked_or_waiting() const
+  { return lock.load(std::memory_order_relaxed) != 0; }
+  /** @return whether the HOLDER flag is set (some thread holds the mutex) */
+  bool is_locked() const
+  { return (lock.load(std::memory_order_relaxed) & HOLDER) != 0; }
+
+  void init() { DBUG_ASSERT(!is_locked_or_waiting()); } // debug check only
+  void destroy() { DBUG_ASSERT(!is_locked_or_waiting()); } // debug check only
+
+  /** Try to acquire without waiting. @return whether the mutex was acquired */
+  bool wr_lock_try()
+  {
+    uint32_t lk= 0; // succeeds only if not held and no waiters
+    return lock.compare_exchange_strong(lk, HOLDER,
+                                        std::memory_order_acquire,
+                                        std::memory_order_relaxed);
+  }
+
+  void wr_lock() { if (!wr_lock_try()) wait_and_lock(); }
+  void wr_unlock()
+  {
+    const uint32_t lk= lock.fetch_and(~HOLDER, std::memory_order_release);
+    if (lk != HOLDER) // the waiter count is nonzero
+    {
+      DBUG_ASSERT(lk & HOLDER);
+      wake();
+    }
+  }
+};
 #endif
 
 #include "rw_lock.h"
diff --git a/storage/innobase/sync/srw_lock.cc b/storage/innobase/sync/srw_lock.cc
index 223acac665f..12c521cb085 100644
--- a/storage/innobase/sync/srw_lock.cc
+++ b/storage/innobase/sync/srw_lock.cc
@@ -1,6 +1,6 @@
 /*****************************************************************************
 
-Copyright (c) 2020, MariaDB Corporation.
+Copyright (c) 2020, 2021, MariaDB Corporation.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -73,6 +73,10 @@ static_assert(4 == sizeof(rw_lock), "ABI");
 # ifdef _WIN32
 #  include <synchapi.h>
 
+inline void srw_mutex::wait(uint32_t lk) // sleep while lock==lk, no timeout
+{ WaitOnAddress(&lock, &lk, 4, INFINITE); }
+void srw_mutex::wake() { WakeByAddressSingle(&lock); } // wake one waiter
+
 inline void ssux_lock_low::writer_wait(uint32_t l)
 {
   WaitOnAddress(word(), &l, 4, INFINITE);
@@ -86,14 +90,17 @@ inline void ssux_lock_low::readers_wake() { WakeByAddressAll(word()); }
 #   define SRW_FUTEX(a,op,n) \
     syscall(SYS_futex, a, FUTEX_ ## op ## _PRIVATE, n, nullptr, nullptr, 0)
 #  elif defined __OpenBSD__
-#  include <sys/time.h>
-#  include <sys/futex.h>
+#   include <sys/time.h>
+#   include <sys/futex.h>
 #   define SRW_FUTEX(a,op,n) \
     futex((volatile uint32_t*) a, FUTEX_ ## op, n, nullptr, nullptr)
 #  else
 #   error "no futex support"
 #  endif
 
+inline void srw_mutex::wait(uint32_t lk) { SRW_FUTEX(&lock, WAIT, lk); }
+void srw_mutex::wake() { SRW_FUTEX(&lock, WAKE, 1); } // n=1: wake one waiter
+
 inline void ssux_lock_low::writer_wait(uint32_t l)
 {
   SRW_FUTEX(word(), WAIT, l);
@@ -102,6 +109,41 @@ inline void ssux_lock_low::writer_wake() { SRW_FUTEX(word(), WAKE, 1); }
 inline void ssux_lock_low::readers_wake() { SRW_FUTEX(word(), WAKE, INT_MAX); }
 # endif
 # define readers_wait writer_wait
+
+
+void srw_mutex::wait_and_lock() // slow path of wr_lock()
+{
+  uint32_t lk= 1 + lock.fetch_add(1, std::memory_order_relaxed);
+  for (auto spin= srv_n_spin_wait_rounds; spin; spin--)
+  { // we are now counted as a waiter; spin a bounded number of rounds first
+    lk&= ~HOLDER; // expected value for the CAS: not held
+    DBUG_ASSERT(lk); // the count must include at least ourselves
+    while (!lock.compare_exchange_weak(lk, HOLDER | (lk - 1),
+                                       std::memory_order_acquire,
+                                       std::memory_order_relaxed))
+      if (lk & HOLDER) // the failed CAS reloaded lk: another thread holds it
+        goto occupied;
+    return; // acquired: HOLDER set and our waiter count removed
+occupied: // held by another thread; delay, then try the next round
+    ut_delay(srv_spin_wait_delay);
+  }
+
+  for (;;) // spin rounds exhausted: block until the mutex can be acquired
+  {
+    lk= lock.load(std::memory_order_relaxed);
+    while (!(lk & HOLDER)) // not held: try to take it
+    {
+      DBUG_ASSERT(lk);
+      if (lock.compare_exchange_weak(lk, HOLDER | (lk - 1),
+                                     std::memory_order_acquire,
+                                     std::memory_order_relaxed))
+        return;
+    }
+    DBUG_ASSERT(lk > HOLDER); // held, with at least one waiter registered
+    wait(lk); // sleep while the lock word still equals lk
+  }
+}
+
 #endif
 
 /** Wait for a read lock.
-- 
2.30.9