Commit bbee1fec authored by Rich Prohaska's avatar Rich Prohaska

#56 run lock escalation on a background thread

parent 64cdcbc2
...@@ -310,8 +310,6 @@ public: ...@@ -310,8 +310,6 @@ public:
void *extra); void *extra);
int iterate_pending_lock_requests(lock_request_iterate_callback cb, void *extra); int iterate_pending_lock_requests(lock_request_iterate_callback cb, void *extra);
void escalation_wait(uint64_t t);
private: private:
static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024; static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024;
static const uint64_t DEFAULT_LOCK_WAIT_TIME = 0; static const uint64_t DEFAULT_LOCK_WAIT_TIME = 0;
...@@ -321,15 +319,6 @@ public: ...@@ -321,15 +319,6 @@ public:
uint64_t m_current_lock_memory; uint64_t m_current_lock_memory;
memory_tracker m_mem_tracker; memory_tracker m_mem_tracker;
// statistics about lock escalation.
uint64_t m_escalation_count;
tokutime_t m_escalation_time;
uint64_t m_escalation_latest_result;
uint64_t m_wait_escalation_count;
uint64_t m_wait_escalation_time;
uint64_t m_long_wait_escalation_count;
uint64_t m_long_wait_escalation_time;
struct lt_counters m_lt_counters; struct lt_counters m_lt_counters;
// lock wait time for blocking row locks, in ms // lock wait time for blocking row locks, in ms
...@@ -367,12 +356,40 @@ public: ...@@ -367,12 +356,40 @@ public:
void locktree_map_remove(locktree *lt); void locktree_map_remove(locktree *lt);
// effect: Runs escalation on all locktrees. // effect: Runs escalation on all locktrees.
// requires: Manager's mutex is held
void run_escalation(void); void run_escalation(void);
static int find_by_dict_id(locktree *const &lt, const DICTIONARY_ID &dict_id); static int find_by_dict_id(locktree *const &lt, const DICTIONARY_ID &dict_id);
void escalator_init(void);
void escalator_destroy(void);
// effect: Add time t to the escalator's wait time statistics
void add_escalator_wait_time(uint64_t t);
// effect: escalate's the locks in each locktree
// requires: manager's mutex is held
void escalate_all_locktrees(void);
// statistics about lock escalation.
uint64_t m_escalation_count;
tokutime_t m_escalation_time;
uint64_t m_escalation_latest_result;
uint64_t m_wait_escalation_count;
uint64_t m_wait_escalation_time;
uint64_t m_long_wait_escalation_count;
uint64_t m_long_wait_escalation_time;
toku_mutex_t m_escalator_mutex;
toku_cond_t m_escalator_work; // signal the escalator to run
toku_cond_t m_escalator_done; // signal that escalation is done
bool m_escalator_killed;
toku_pthread_t m_escalator_id;
friend class manager_unit_test; friend class manager_unit_test;
public:
void escalator_work(void);
}; };
ENSURE_POD(manager); ENSURE_POD(manager);
......
...@@ -102,13 +102,7 @@ namespace toku { ...@@ -102,13 +102,7 @@ namespace toku {
void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) { void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) {
m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY; m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY;
m_current_lock_memory = 0; m_current_lock_memory = 0;
m_escalation_count = 0; escalator_init();
m_escalation_time = 0;
m_wait_escalation_count = 0;
m_wait_escalation_time = 0;
m_long_wait_escalation_count = 0;
m_long_wait_escalation_time = 0;
m_escalation_latest_result = 0;
m_lock_wait_time_ms = DEFAULT_LOCK_WAIT_TIME; m_lock_wait_time_ms = DEFAULT_LOCK_WAIT_TIME;
m_mem_tracker.set_manager(this); m_mem_tracker.set_manager(this);
...@@ -126,9 +120,11 @@ void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, ...@@ -126,9 +120,11 @@ void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb,
} }
void locktree::manager::destroy(void) { void locktree::manager::destroy(void) {
escalator_destroy();
invariant(m_current_lock_memory == 0); invariant(m_current_lock_memory == 0);
invariant(m_locktree_map.size() == 0); invariant(m_locktree_map.size() == 0);
m_locktree_map.destroy(); m_locktree_map.destroy();
toku_mutex_destroy(&m_mutex);
} }
void locktree::manager::mutex_lock(void) { void locktree::manager::mutex_lock(void) {
...@@ -319,29 +315,31 @@ void locktree::manager::run_escalation_for_test(void) { ...@@ -319,29 +315,31 @@ void locktree::manager::run_escalation_for_test(void) {
run_escalation(); run_escalation();
} }
// effect: escalate's the locks in each locktree
// requires: manager's mutex is held
void locktree::manager::run_escalation(void) { void locktree::manager::run_escalation(void) {
// there are too many row locks in the system and we need to tidy up. uint64_t t0 = toku_current_time_microsec();
// if (1) {
// a simple implementation of escalation does not attempt // run escalation on the background thread
// to reduce the memory foot print of each txn's range buffer. int r;
// doing so would require some layering hackery (or a callback) toku_mutex_lock(&m_escalator_mutex);
// and more complicated locking. for now, just escalate each toku_cond_broadcast(&m_escalator_work);
// locktree individually, in-place. struct timeval tv;
tokutime_t t0 = toku_time_now(); r = gettimeofday(&tv, 0);
size_t num_locktrees = m_locktree_map.size(); assert_zero(r);
for (size_t i = 0; i < num_locktrees; i++) { uint64_t t = tv.tv_sec * 1000000 + tv.tv_usec;
locktree *lt; t += 100000; // 100 milliseconds
int r = m_locktree_map.fetch(i, &lt); toku_timespec_t wakeup_time;
invariant_zero(r); wakeup_time.tv_sec = t / 1000000;
lt->escalate(m_lt_escalate_callback, m_lt_escalate_callback_extra); wakeup_time.tv_nsec = (t % 1000000) * 1000;
r = toku_cond_timedwait(&m_escalator_done, &m_escalator_mutex, &wakeup_time);
toku_mutex_unlock(&m_escalator_mutex);
} else {
// run escalation on this thread
mutex_lock();
escalate_all_locktrees();
mutex_unlock();
} }
tokutime_t t1 = toku_time_now(); uint64_t t1 = toku_current_time_microsec();
add_escalator_wait_time(t1 - t0);
m_escalation_count++;
m_escalation_time += (t1 - t0);
m_escalation_latest_result = m_current_lock_memory;
} }
void locktree::manager::memory_tracker::set_manager(manager *mgr) { void locktree::manager::memory_tracker::set_manager(manager *mgr) {
...@@ -354,17 +352,10 @@ int locktree::manager::memory_tracker::check_current_lock_constraints(void) { ...@@ -354,17 +352,10 @@ int locktree::manager::memory_tracker::check_current_lock_constraints(void) {
// mutex and check again. if we're still out of locks, run escalation. // mutex and check again. if we're still out of locks, run escalation.
// return an error if we're still out of locks after escalation. // return an error if we're still out of locks after escalation.
if (out_of_locks()) { if (out_of_locks()) {
uint64_t t0 = toku_current_time_microsec(); m_mgr->run_escalation();
m_mgr->mutex_lock();
if (out_of_locks()) { if (out_of_locks()) {
m_mgr->run_escalation(); r = TOKUDB_OUT_OF_LOCKS;
if (out_of_locks()) {
r = TOKUDB_OUT_OF_LOCKS;
}
} }
uint64_t t1 = toku_current_time_microsec();
m_mgr->escalation_wait(t1 - t0);
m_mgr->mutex_unlock();
} }
return r; return r;
} }
...@@ -411,13 +402,92 @@ int locktree::manager::iterate_pending_lock_requests( ...@@ -411,13 +402,92 @@ int locktree::manager::iterate_pending_lock_requests(
return r; return r;
} }
void locktree::manager::escalation_wait(uint64_t t) { static void *escalator_thread(void *arg) {
locktree::manager *mgr = reinterpret_cast<locktree::manager*>(arg);
mgr->escalator_work();
return arg;
}
void locktree::manager::escalator_init(void) {
ZERO_STRUCT(m_escalator_mutex);
toku_mutex_init(&m_escalator_mutex, nullptr);
toku_cond_init(&m_escalator_work, nullptr);
toku_cond_init(&m_escalator_done, nullptr);
m_escalator_killed = false;
m_escalation_count = 0;
m_escalation_time = 0;
m_wait_escalation_count = 0;
m_wait_escalation_time = 0;
m_long_wait_escalation_count = 0;
m_long_wait_escalation_time = 0;
m_escalation_latest_result = 0;
int r = toku_pthread_create(&m_escalator_id, nullptr, escalator_thread, this);
assert_zero(r);
}
void locktree::manager::escalator_destroy(void) {
toku_mutex_lock(&m_escalator_mutex);
m_escalator_killed = true;
toku_cond_broadcast(&m_escalator_work);
toku_mutex_unlock(&m_escalator_mutex);
void *ret;
int r = toku_pthread_join(m_escalator_id, &ret);
assert_zero(r);
toku_mutex_destroy(&m_escalator_mutex);
toku_cond_destroy(&m_escalator_work);
toku_cond_destroy(&m_escalator_done);
}
void locktree::manager::escalate_all_locktrees(void) {
// there are too many row locks in the system and we need to tidy up.
//
// a simple implementation of escalation does not attempt
// to reduce the memory foot print of each txn's range buffer.
// doing so would require some layering hackery (or a callback)
// and more complicated locking. for now, just escalate each
// locktree individually, in-place.
tokutime_t t0 = toku_time_now();
size_t num_locktrees = m_locktree_map.size();
for (size_t i = 0; i < num_locktrees; i++) {
locktree *lt;
int r = m_locktree_map.fetch(i, &lt);
invariant_zero(r);
lt->escalate(m_lt_escalate_callback, m_lt_escalate_callback_extra);
}
tokutime_t t1 = toku_time_now();
toku_mutex_lock(&m_escalator_mutex);
m_escalation_count++;
m_escalation_time += (t1 - t0);
m_escalation_latest_result = m_current_lock_memory;
toku_mutex_unlock(&m_escalator_mutex);
}
void locktree::manager::add_escalator_wait_time(uint64_t t) {
toku_mutex_lock(&m_escalator_mutex);
m_wait_escalation_count += 1; m_wait_escalation_count += 1;
m_wait_escalation_time += t; m_wait_escalation_time += t;
if (t >= 1000000) { if (t >= 1000000) {
m_long_wait_escalation_count += 1; m_long_wait_escalation_count += 1;
m_long_wait_escalation_time += t; m_long_wait_escalation_time += t;
} }
toku_mutex_unlock(&m_escalator_mutex);
}
void locktree::manager::escalator_work(void) {
toku_mutex_lock(&m_escalator_mutex);
while (!m_escalator_killed) {
toku_cond_wait(&m_escalator_work, &m_escalator_mutex);
if (!m_escalator_killed) {
toku_mutex_unlock(&m_escalator_mutex);
mutex_lock();
escalate_all_locktrees();
mutex_unlock();
toku_mutex_lock(&m_escalator_mutex);
toku_cond_broadcast(&m_escalator_done);
}
}
toku_mutex_unlock(&m_escalator_mutex);
} }
#define STATUS_INIT(k,c,t,l,inc) TOKUDB_STATUS_INIT(status, k, c, t, "locktree: " l, inc) #define STATUS_INIT(k,c,t,l,inc) TOKUDB_STATUS_INIT(status, k, c, t, "locktree: " l, inc)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment