Commit 78bfa5e7 authored by Rich Prohaska's avatar Rich Prohaska

#107 use frwlock as checkpoint_safe_lock

parent be83087a
...@@ -96,6 +96,7 @@ PATENT RIGHTS GRANT: ...@@ -96,6 +96,7 @@ PATENT RIGHTS GRANT:
#include "background_job_manager.h" #include "background_job_manager.h"
#include <portability/toku_random.h> #include <portability/toku_random.h>
#include <util/frwlock.h> #include <util/frwlock.h>
#include <util/frwlock.cc>
#include <util/kibbutz.h> #include <util/kibbutz.h>
#include <util/nb_mutex.h> #include <util/nb_mutex.h>
#include <util/partitioned_counter.h> #include <util/partitioned_counter.h>
......
...@@ -136,6 +136,7 @@ PATENT RIGHTS GRANT: ...@@ -136,6 +136,7 @@ PATENT RIGHTS GRANT:
#include "checkpoint.h" #include "checkpoint.h"
#include <portability/toku_atomic.h> #include <portability/toku_atomic.h>
#include <util/status.h> #include <util/status.h>
#include <util/frwlock.h>
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
// Engine status // Engine status
...@@ -187,7 +188,8 @@ toku_checkpoint_get_status(CACHETABLE ct, CHECKPOINT_STATUS statp) { ...@@ -187,7 +188,8 @@ toku_checkpoint_get_status(CACHETABLE ct, CHECKPOINT_STATUS statp) {
static LSN last_completed_checkpoint_lsn; static LSN last_completed_checkpoint_lsn;
static toku_pthread_rwlock_t checkpoint_safe_lock; static toku_mutex_t checkpoint_safe_mutex;
static toku::frwlock checkpoint_safe_lock;
static toku_pthread_rwlock_t multi_operation_lock; static toku_pthread_rwlock_t multi_operation_lock;
static toku_pthread_rwlock_t low_priority_multi_operation_lock; static toku_pthread_rwlock_t low_priority_multi_operation_lock;
...@@ -237,28 +239,33 @@ multi_operation_checkpoint_unlock(void) { ...@@ -237,28 +239,33 @@ multi_operation_checkpoint_unlock(void) {
static void static void
checkpoint_safe_lock_init(void) { checkpoint_safe_lock_init(void) {
toku_pthread_rwlock_init(&checkpoint_safe_lock, NULL); toku_mutex_init(&checkpoint_safe_mutex, NULL);
checkpoint_safe_lock.init(&checkpoint_safe_mutex);
locked_cs = false; locked_cs = false;
} }
static void static void
checkpoint_safe_lock_destroy(void) { checkpoint_safe_lock_destroy(void) {
toku_pthread_rwlock_destroy(&checkpoint_safe_lock); checkpoint_safe_lock.deinit();
toku_mutex_destroy(&checkpoint_safe_mutex);
} }
static void static void
checkpoint_safe_checkpoint_lock(void) { checkpoint_safe_checkpoint_lock(void) {
toku_pthread_rwlock_wrlock(&checkpoint_safe_lock); toku_mutex_lock(&checkpoint_safe_mutex);
checkpoint_safe_lock.write_lock(false);
toku_mutex_unlock(&checkpoint_safe_mutex);
locked_cs = true; locked_cs = true;
} }
static void static void
checkpoint_safe_checkpoint_unlock(void) { checkpoint_safe_checkpoint_unlock(void) {
locked_cs = false; locked_cs = false;
toku_pthread_rwlock_wrunlock(&checkpoint_safe_lock); toku_mutex_lock(&checkpoint_safe_mutex);
checkpoint_safe_lock.write_unlock();
toku_mutex_unlock(&checkpoint_safe_mutex);
} }
// toku_xxx_client_(un)lock() functions are only called from client code, // toku_xxx_client_(un)lock() functions are only called from client code,
// never from checkpoint code, and use the "reader" interface to the lock functions. // never from checkpoint code, and use the "reader" interface to the lock functions.
...@@ -286,18 +293,20 @@ void ...@@ -286,18 +293,20 @@ void
toku_checkpoint_safe_client_lock(void) { toku_checkpoint_safe_client_lock(void) {
if (locked_cs) if (locked_cs)
(void) toku_sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1); (void) toku_sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1);
toku_pthread_rwlock_rdlock(&checkpoint_safe_lock); toku_mutex_lock(&checkpoint_safe_mutex);
checkpoint_safe_lock.read_lock();
toku_mutex_unlock(&checkpoint_safe_mutex);
toku_multi_operation_client_lock(); toku_multi_operation_client_lock();
} }
void void
toku_checkpoint_safe_client_unlock(void) { toku_checkpoint_safe_client_unlock(void) {
toku_pthread_rwlock_rdunlock(&checkpoint_safe_lock); toku_mutex_lock(&checkpoint_safe_mutex);
checkpoint_safe_lock.read_unlock();
toku_mutex_unlock(&checkpoint_safe_mutex);
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
} }
// Initialize the checkpoint mechanism, must be called before any client operations. // Initialize the checkpoint mechanism, must be called before any client operations.
void void
toku_checkpoint_init(void) { toku_checkpoint_init(void) {
......
...@@ -127,6 +127,7 @@ PATENT RIGHTS GRANT: ...@@ -127,6 +127,7 @@ PATENT RIGHTS GRANT:
#include <toku_assert.h> #include <toku_assert.h>
#include <util/rwlock.h> #include <util/rwlock.h>
#include <util/frwlock.h> #include <util/frwlock.h>
#include <util/frwlock.cc>
#include <portability/toku_atomic.h> #include <portability/toku_atomic.h>
#include "toku_fair_rwlock.h" #include "toku_fair_rwlock.h"
......
...@@ -118,11 +118,11 @@ void frwlock::deinit(void) { ...@@ -118,11 +118,11 @@ void frwlock::deinit(void) {
toku_cond_destroy(&m_wait_read); toku_cond_destroy(&m_wait_read);
} }
inline bool frwlock::queue_is_empty(void) const { bool frwlock::queue_is_empty(void) const {
return m_wait_head == nullptr; return m_wait_head == nullptr;
} }
inline void frwlock::enq_item(queue_item *const item) { void frwlock::enq_item(queue_item *const item) {
paranoid_invariant_null(item->next); paranoid_invariant_null(item->next);
if (m_wait_tail != nullptr) { if (m_wait_tail != nullptr) {
m_wait_tail->next = item; m_wait_tail->next = item;
...@@ -133,7 +133,7 @@ inline void frwlock::enq_item(queue_item *const item) { ...@@ -133,7 +133,7 @@ inline void frwlock::enq_item(queue_item *const item) {
m_wait_tail = item; m_wait_tail = item;
} }
inline toku_cond_t *frwlock::deq_item(void) { toku_cond_t *frwlock::deq_item(void) {
paranoid_invariant_notnull(m_wait_head); paranoid_invariant_notnull(m_wait_head);
paranoid_invariant_notnull(m_wait_tail); paranoid_invariant_notnull(m_wait_tail);
queue_item *item = m_wait_head; queue_item *item = m_wait_head;
...@@ -145,7 +145,7 @@ inline toku_cond_t *frwlock::deq_item(void) { ...@@ -145,7 +145,7 @@ inline toku_cond_t *frwlock::deq_item(void) {
} }
// Prerequisite: Holds m_mutex. // Prerequisite: Holds m_mutex.
inline void frwlock::write_lock(bool expensive) { void frwlock::write_lock(bool expensive) {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
if (this->try_write_lock(expensive)) { if (this->try_write_lock(expensive)) {
return; return;
...@@ -178,7 +178,7 @@ inline void frwlock::write_lock(bool expensive) { ...@@ -178,7 +178,7 @@ inline void frwlock::write_lock(bool expensive) {
m_current_writer_expensive = expensive; m_current_writer_expensive = expensive;
} }
inline bool frwlock::try_write_lock(bool expensive) { bool frwlock::try_write_lock(bool expensive) {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
if (m_num_readers > 0 || m_num_writers > 0 || m_num_signaled_readers > 0 || m_num_want_write > 0) { if (m_num_readers > 0 || m_num_writers > 0 || m_num_signaled_readers > 0 || m_num_want_write > 0) {
return false; return false;
...@@ -191,7 +191,7 @@ inline bool frwlock::try_write_lock(bool expensive) { ...@@ -191,7 +191,7 @@ inline bool frwlock::try_write_lock(bool expensive) {
return true; return true;
} }
inline void frwlock::read_lock(void) { void frwlock::read_lock(void) {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
if (m_num_writers > 0 || m_num_want_write > 0) { if (m_num_writers > 0 || m_num_want_write > 0) {
if (!m_wait_read_is_in_queue) { if (!m_wait_read_is_in_queue) {
...@@ -223,7 +223,7 @@ inline void frwlock::read_lock(void) { ...@@ -223,7 +223,7 @@ inline void frwlock::read_lock(void) {
++m_num_readers; ++m_num_readers;
} }
inline bool frwlock::try_read_lock(void) { bool frwlock::try_read_lock(void) {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
if (m_num_writers > 0 || m_num_want_write > 0) { if (m_num_writers > 0 || m_num_want_write > 0) {
return false; return false;
...@@ -235,7 +235,7 @@ inline bool frwlock::try_read_lock(void) { ...@@ -235,7 +235,7 @@ inline bool frwlock::try_read_lock(void) {
return true; return true;
} }
inline void frwlock::maybe_signal_next_writer(void) { void frwlock::maybe_signal_next_writer(void) {
if (m_num_want_write > 0 && m_num_signaled_readers == 0 && m_num_readers == 0) { if (m_num_want_write > 0 && m_num_signaled_readers == 0 && m_num_readers == 0) {
toku_cond_t *cond = this->deq_item(); toku_cond_t *cond = this->deq_item();
paranoid_invariant(cond != &m_wait_read); paranoid_invariant(cond != &m_wait_read);
...@@ -245,7 +245,7 @@ inline void frwlock::maybe_signal_next_writer(void) { ...@@ -245,7 +245,7 @@ inline void frwlock::maybe_signal_next_writer(void) {
} }
} }
inline void frwlock::read_unlock(void) { void frwlock::read_unlock(void) {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
paranoid_invariant(m_num_writers == 0); paranoid_invariant(m_num_writers == 0);
paranoid_invariant(m_num_readers > 0); paranoid_invariant(m_num_readers > 0);
...@@ -253,7 +253,7 @@ inline void frwlock::read_unlock(void) { ...@@ -253,7 +253,7 @@ inline void frwlock::read_unlock(void) {
this->maybe_signal_next_writer(); this->maybe_signal_next_writer();
} }
inline bool frwlock::read_lock_is_expensive(void) { bool frwlock::read_lock_is_expensive(void) {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
if (m_wait_read_is_in_queue) { if (m_wait_read_is_in_queue) {
return m_read_wait_expensive; return m_read_wait_expensive;
...@@ -264,7 +264,7 @@ inline bool frwlock::read_lock_is_expensive(void) { ...@@ -264,7 +264,7 @@ inline bool frwlock::read_lock_is_expensive(void) {
} }
inline void frwlock::maybe_signal_or_broadcast_next(void) { void frwlock::maybe_signal_or_broadcast_next(void) {
paranoid_invariant(m_num_signaled_readers == 0); paranoid_invariant(m_num_signaled_readers == 0);
if (this->queue_is_empty()) { if (this->queue_is_empty()) {
...@@ -289,42 +289,42 @@ inline void frwlock::maybe_signal_or_broadcast_next(void) { ...@@ -289,42 +289,42 @@ inline void frwlock::maybe_signal_or_broadcast_next(void) {
} }
} }
inline void frwlock::write_unlock(void) { void frwlock::write_unlock(void) {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
paranoid_invariant(m_num_writers == 1); paranoid_invariant(m_num_writers == 1);
m_num_writers = 0; m_num_writers = 0;
m_current_writer_expensive = false; m_current_writer_expensive = false;
this->maybe_signal_or_broadcast_next(); this->maybe_signal_or_broadcast_next();
} }
inline bool frwlock::write_lock_is_expensive(void) { bool frwlock::write_lock_is_expensive(void) {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
return (m_num_expensive_want_write > 0) || (m_current_writer_expensive); return (m_num_expensive_want_write > 0) || (m_current_writer_expensive);
} }
inline uint32_t frwlock::users(void) const { uint32_t frwlock::users(void) const {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
return m_num_readers + m_num_writers + m_num_want_read + m_num_want_write; return m_num_readers + m_num_writers + m_num_want_read + m_num_want_write;
} }
inline uint32_t frwlock::blocked_users(void) const { uint32_t frwlock::blocked_users(void) const {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
return m_num_want_read + m_num_want_write; return m_num_want_read + m_num_want_write;
} }
inline uint32_t frwlock::writers(void) const { uint32_t frwlock::writers(void) const {
// this is sometimes called as "assert(lock->writers())" when we // this is sometimes called as "assert(lock->writers())" when we
// assume we have the write lock. if that's the assumption, we may // assume we have the write lock. if that's the assumption, we may
// not own the mutex, so we don't assert_locked here // not own the mutex, so we don't assert_locked here
return m_num_writers; return m_num_writers;
} }
inline uint32_t frwlock::blocked_writers(void) const { uint32_t frwlock::blocked_writers(void) const {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
return m_num_want_write; return m_num_want_write;
} }
inline uint32_t frwlock::readers(void) const { uint32_t frwlock::readers(void) const {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
return m_num_readers; return m_num_readers;
} }
inline uint32_t frwlock::blocked_readers(void) const { uint32_t frwlock::blocked_readers(void) const {
toku_mutex_assert_locked(m_mutex); toku_mutex_assert_locked(m_mutex);
return m_num_want_read; return m_num_want_read;
} }
......
...@@ -106,24 +106,24 @@ public: ...@@ -106,24 +106,24 @@ public:
void init(toku_mutex_t *const mutex); void init(toku_mutex_t *const mutex);
void deinit(void); void deinit(void);
inline void write_lock(bool expensive); void write_lock(bool expensive);
inline bool try_write_lock(bool expensive); bool try_write_lock(bool expensive);
inline void write_unlock(void); void write_unlock(void);
// returns true if acquiring a write lock will be expensive // returns true if acquiring a write lock will be expensive
inline bool write_lock_is_expensive(void); bool write_lock_is_expensive(void);
inline void read_lock(void); void read_lock(void);
inline bool try_read_lock(void); bool try_read_lock(void);
inline void read_unlock(void); void read_unlock(void);
// returns true if acquiring a read lock will be expensive // returns true if acquiring a read lock will be expensive
inline bool read_lock_is_expensive(void); bool read_lock_is_expensive(void);
inline uint32_t users(void) const; uint32_t users(void) const;
inline uint32_t blocked_users(void) const; uint32_t blocked_users(void) const;
inline uint32_t writers(void) const; uint32_t writers(void) const;
inline uint32_t blocked_writers(void) const; uint32_t blocked_writers(void) const;
inline uint32_t readers(void) const; uint32_t readers(void) const;
inline uint32_t blocked_readers(void) const; uint32_t blocked_readers(void) const;
private: private:
struct queue_item { struct queue_item {
...@@ -131,11 +131,11 @@ private: ...@@ -131,11 +131,11 @@ private:
struct queue_item *next; struct queue_item *next;
}; };
inline bool queue_is_empty(void) const; bool queue_is_empty(void) const;
inline void enq_item(queue_item *const item); void enq_item(queue_item *const item);
inline toku_cond_t *deq_item(void); toku_cond_t *deq_item(void);
inline void maybe_signal_or_broadcast_next(void); void maybe_signal_or_broadcast_next(void);
inline void maybe_signal_next_writer(void); void maybe_signal_next_writer(void);
toku_mutex_t *m_mutex; toku_mutex_t *m_mutex;
...@@ -168,6 +168,6 @@ ENSURE_POD(frwlock); ...@@ -168,6 +168,6 @@ ENSURE_POD(frwlock);
} // namespace toku } // namespace toku
// include the implementation here // include the implementation here
#include "frwlock.cc" // #include "frwlock.cc"
#endif // UTIL_FRWLOCK_H #endif // UTIL_FRWLOCK_H
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <unistd.h> #include <unistd.h>
#include <pthread.h> #include <pthread.h>
#include <util/frwlock.h> #include <util/frwlock.h>
#include <util/frwlock.cc>
toku_mutex_t rwlock_mutex; toku_mutex_t rwlock_mutex;
toku::frwlock rwlock; toku::frwlock rwlock;
......
...@@ -100,6 +100,7 @@ PATENT RIGHTS GRANT: ...@@ -100,6 +100,7 @@ PATENT RIGHTS GRANT:
#include <portability/toku_pthread.h> #include <portability/toku_pthread.h>
#include <portability/toku_time.h> #include <portability/toku_time.h>
#include <util/frwlock.h> #include <util/frwlock.h>
#include <util/frwlock.cc>
#include <util/rwlock.h> #include <util/rwlock.h>
#include "rwlock_condvar.h" #include "rwlock_condvar.h"
......
...@@ -127,6 +127,7 @@ PATENT RIGHTS GRANT: ...@@ -127,6 +127,7 @@ PATENT RIGHTS GRANT:
#include <portability/toku_pthread.h> #include <portability/toku_pthread.h>
#include <portability/toku_time.h> #include <portability/toku_time.h>
#include <util/frwlock.h> #include <util/frwlock.h>
#include <util/frwlock.cc>
#include <util/rwlock.h> #include <util/rwlock.h>
#include "rwlock_condvar.h" #include "rwlock_condvar.h"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment