Commit fba83125 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

fixes #5961 pass a callback to the ydb layer when escalation occurs for a...

fixes #5961 pass a callback to the ydb layer when escalation occurs for a txnid, providing the locktree and range_buffer describing how to update the txn's internal data structures so they don't grow out of control for very long lived write transactions


git-svn-id: file:///svn/toku/tokudb@52941 c7de825b-a66e-492c-adef-691d508d4ae1
parent a1d664f0
......@@ -507,6 +507,25 @@ static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
return num_extracted;
}
// Store each newly escalated lock in a range buffer for appropriate txnid.
// We'll rebuild the locktree by iterating over these ranges, and then we
// can pass back each txnid/buffer pair individually through a callback
// to notify higher layers that locks have changed.
struct txnid_range_buffer {
TXNID txnid;
range_buffer buffer;
static int find_by_txnid(const struct txnid_range_buffer &other_buffer, const TXNID &txnid) {
if (txnid < other_buffer.txnid) {
return -1;
} else if (other_buffer.txnid == txnid) {
return 0;
} else {
return 1;
}
}
};
// escalate the locks in the locktree by merging adjacent
// locks that have the same txnid into one larger lock.
//
......@@ -514,9 +533,9 @@ static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
// approach works well. if there are many txnids and each
// has locks in a random/alternating order, then this does
// not work so well.
void locktree::escalate(void) {
GrowableArray<row_lock> escalated_locks;
escalated_locks.init();
void locktree::escalate(manager::lt_escalate_cb after_escalate_callback, void *after_escalate_callback_extra) {
omt<struct txnid_range_buffer, struct txnid_range_buffer *> range_buffers;
range_buffers.create();
// prepare and acquire a locked keyrange on the entire locktree
concurrent_tree::locked_keyrange lkr;
......@@ -533,7 +552,7 @@ void locktree::escalate(void) {
// extract and remove batches of row locks from the locktree
int num_extracted;
static const int num_row_locks_per_batch = 128;
const int num_row_locks_per_batch = 128;
row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);
// we always remove the "first" n because we are removing n
......@@ -553,18 +572,33 @@ void locktree::escalate(void) {
next_txnid_index++;
}
// create a range which dominates the ranges between
// the current index and the next txnid's index (excle).
keyrange merged_range;
merged_range.create(
extracted_buf[current_index].range.get_left_key(),
extracted_buf[next_txnid_index - 1].range.get_right_key());
// Create an escalated range for the current txnid that dominates
// each range between the current indext and the next txnid's index.
const TXNID current_txnid = extracted_buf[current_index].txnid;
const DBT *escalated_left_key = extracted_buf[current_index].range.get_left_key();
const DBT *escalated_right_key = extracted_buf[next_txnid_index - 1].range.get_right_key();
// Try to find a range buffer for the current txnid. Create one if it doesn't exist.
// Then, append the new escalated range to the buffer.
uint32_t idx;
struct txnid_range_buffer new_range_buffer;
struct txnid_range_buffer *existing_range_buffer;
int r = range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
current_txnid,
&existing_range_buffer,
&idx
);
if (r == DB_NOTFOUND) {
new_range_buffer.txnid = current_txnid;
new_range_buffer.buffer.create();
new_range_buffer.buffer.append(escalated_left_key, escalated_right_key);
range_buffers.insert_at(new_range_buffer, idx);
} else {
invariant_zero(r);
invariant(existing_range_buffer->txnid == current_txnid);
existing_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
}
// save the new lock, continue from the next txnid's index
row_lock merged_row_lock;
merged_row_lock.range.create_copy(merged_range);
merged_row_lock.txnid = extracted_buf[current_index].txnid;
escalated_locks.push(merged_row_lock);
current_index = next_txnid_index;
}
......@@ -575,17 +609,35 @@ void locktree::escalate(void) {
}
toku_free(extracted_buf);
// we should have extracted every lock from the old rangetree.
// now it is time to repopulate it with the escalated locks.
// Rebuild the locktree from each range in each range buffer,
// then notify higher layers that the txnid's locks have changed.
invariant(m_rangetree->is_empty());
size_t new_num_locks = escalated_locks.get_size();
for (size_t i = 0; i < new_num_locks; i++) {
row_lock lock = escalated_locks.fetch_unchecked(i);
const size_t num_range_buffers = range_buffers.size();
for (size_t i = 0; i < num_range_buffers; i++) {
struct txnid_range_buffer *current_range_buffer;
int r = range_buffers.fetch(i, &current_range_buffer);
invariant_zero(r);
const TXNID current_txnid = current_range_buffer->txnid;
range_buffer::iterator iter;
range_buffer::iterator::record rec;
iter.create(&current_range_buffer->buffer);
while (iter.current(&rec)) {
keyrange range;
range.create(rec.get_left_key(), rec.get_right_key());
row_lock lock = { .range = range, .txnid = current_txnid };
insert_row_lock_into_tree(&lkr, lock, m_mem_tracker);
lock.range.destroy();
iter.next();
}
// Notify higher layers that locks have changed for the current txnid
if (after_escalate_callback) {
after_escalate_callback(current_txnid, this, current_range_buffer->buffer, after_escalate_callback_extra);
}
current_range_buffer->buffer.destroy();
}
range_buffers.destroy();
escalated_locks.deinit();
lkr.release();
}
......@@ -605,7 +657,7 @@ void locktree::set_descriptor(DESCRIPTOR desc) {
m_cmp->set_descriptor(desc);
}
int locktree::compare(locktree *lt) {
int locktree::compare(const locktree *lt) {
if (m_dict_id.dictid < lt->m_dict_id.dictid) {
return -1;
} else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
......
......@@ -75,7 +75,7 @@ class locktree {
void set_descriptor(DESCRIPTOR desc);
int compare(locktree *lt);
int compare(const locktree *lt);
// The locktree stores some data for lock requests. It doesn't have to know
// how they work or even what a lock request object looks like.
......@@ -101,10 +101,11 @@ class locktree {
public:
typedef void (*lt_create_cb)(locktree *lt, void *extra);
typedef void (*lt_destroy_cb)(locktree *lt);
typedef void (*lt_escalate_cb)(TXNID txnid, const locktree *lt, const range_buffer &buffer, void *extra);
// note: create_cb is called just after a locktree is first created.
// destroy_cb is called just before a locktree is destroyed.
void create(lt_create_cb create_cb, lt_destroy_cb destroy_cb);
void create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb, void *extra);
void destroy(void);
......@@ -189,6 +190,8 @@ class locktree {
// the create and destroy callbacks for the locktrees
lt_create_cb m_lt_create_callback;
lt_destroy_cb m_lt_destroy_callback;
lt_escalate_cb m_lt_escalate_callback;
void *m_lt_escalate_callback_extra;
omt<locktree *> m_locktree_map;
......@@ -408,7 +411,7 @@ class locktree {
int try_acquire_lock(bool is_write_request, TXNID txnid,
const DBT *left_key, const DBT *right_key, txnid_set *conflicts);
void escalate();
void escalate(manager::lt_escalate_cb after_escalate_callback, void *extra);
friend class locktree_unit_test;
friend class manager_unit_test;
......
......@@ -12,7 +12,7 @@
namespace toku {
void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb) {
void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) {
m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY;
m_current_lock_memory = 0;
m_lock_wait_time_ms = DEFAULT_LOCK_WAIT_TIME;
......@@ -21,6 +21,9 @@ void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb)
m_locktree_map.create();
m_lt_create_callback = create_cb;
m_lt_destroy_callback = destroy_cb;
m_lt_escalate_callback = escalate_cb;
m_lt_escalate_callback_extra = escalate_extra;
ZERO_STRUCT(m_mutex);
toku_mutex_init(&m_mutex, nullptr);
}
......@@ -218,7 +221,7 @@ void locktree::manager::run_escalation(void) {
locktree *lt;
int r = m_locktree_map.fetch(i, &lt);
invariant_zero(r);
lt->escalate();
lt->escalate(m_lt_escalate_callback, m_lt_escalate_callback_extra);
}
}
......
......@@ -17,7 +17,10 @@ namespace toku {
// be stored, iterated over, and then destroyed all at once.
class range_buffer {
private:
// Private in spirit: We fail POD asserts when we try to store range_buffers in an omt.
// So make it all public, but don't touch.
public:
//private:
// the key range buffer is a bunch of records in a row.
// each record has the following header, followed by the
......@@ -109,7 +112,7 @@ class range_buffer {
void destroy(void);
private:
//private:
char *m_buf;
size_t m_buf_size;
size_t m_buf_current;
......
......@@ -16,7 +16,7 @@ void lock_request_unit_test::test_start_deadlock(void) {
// something short
const uint64_t lock_wait_time = 10;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 };
lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr);
......
......@@ -18,7 +18,7 @@ void lock_request_unit_test::test_start_pending(void) {
// bogus, just has to be something.
const uint64_t lock_wait_time = 0;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 };
lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr);
......
......@@ -21,7 +21,7 @@ namespace toku {
// test read lock conflicts when write locks exist
void locktree_unit_test::test_conflicts(void) {
locktree::manager mgr;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr);
......
......@@ -11,7 +11,7 @@ namespace toku {
// test simple create and destroy of the locktree
void locktree_unit_test::test_create_destroy(void) {
locktree::manager mgr;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr);
......
......@@ -11,7 +11,7 @@ namespace toku {
// test that ranges with infinite endpoints work
void locktree_unit_test::test_infinity(void) {
locktree::manager mgr;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr);
......
......@@ -23,7 +23,7 @@ static int my_compare_dbts(DB *db, const DBT *a, const DBT *b) {
// test that get/set userdata works, and that get_manager() works
void locktree_unit_test::test_misc(void) {
locktree::manager mgr;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, my_compare_dbts, nullptr);
......
......@@ -14,7 +14,7 @@ namespace toku {
// or write locks are consolidated by overlapping relocks.
void locktree_unit_test::test_overlapping_relock(void) {
locktree::manager mgr;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr);
......
......@@ -11,7 +11,7 @@ namespace toku {
// test simple, non-overlapping read locks and then write locks
void locktree_unit_test::test_simple_lock(void) {
locktree::manager mgr;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr);
......
......@@ -14,7 +14,7 @@ namespace toku {
// or write locks are consolidated by overlapping relocks.
void locktree_unit_test::test_single_txnid_optimization(void) {
locktree::manager mgr;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr);
......
......@@ -14,7 +14,10 @@ void manager_unit_test::test_create_destroy(void) {
(locktree::manager::lt_create_cb) (long) 1;
locktree::manager::lt_destroy_cb destroy_callback =
(locktree::manager::lt_destroy_cb) (long) 2;
mgr.create(create_callback, destroy_callback);
locktree::manager::lt_escalate_cb escalate_callback =
(locktree::manager::lt_escalate_cb) (long) 3;
void *extra = (void *) (long) 4;
mgr.create(create_callback, destroy_callback, escalate_callback, extra);
invariant(mgr.m_max_lock_memory == locktree::manager::DEFAULT_MAX_LOCK_MEMORY);
invariant(mgr.m_current_lock_memory == 0);
......@@ -23,6 +26,8 @@ void manager_unit_test::test_create_destroy(void) {
invariant(mgr.m_locktree_map.size() == 0);
invariant(mgr.m_lt_create_callback == create_callback);
invariant(mgr.m_lt_destroy_callback == destroy_callback);
invariant(mgr.m_lt_escalate_callback == escalate_callback);
invariant(mgr.m_lt_escalate_callback_extra == extra);
mgr.mutex_lock();
mgr.mutex_unlock();
......
......@@ -10,7 +10,7 @@ namespace toku {
void manager_unit_test::test_lt_map(void) {
locktree::manager mgr;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
locktree aa;
locktree bb;
......
......@@ -11,7 +11,7 @@ namespace toku {
void manager_unit_test::test_params(void) {
int r;
locktree::manager mgr;
mgr.create(nullptr, nullptr);
mgr.create(nullptr, nullptr, nullptr, nullptr);
uint64_t new_max_lock_memory = 15307752356;
r = mgr.set_max_lock_memory(new_max_lock_memory);
......
......@@ -23,7 +23,7 @@ static void destroy_cb(locktree *lt) {
void manager_unit_test::test_reference_release_lt(void) {
locktree::manager mgr;
mgr.create(create_cb, destroy_cb);
mgr.create(create_cb, destroy_cb, nullptr, nullptr);
DICTIONARY_ID a = { 0 };
DICTIONARY_ID b = { 1 };
......
......@@ -2199,7 +2199,10 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
assert_zero(r);
assert(result->i->logger);
result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback);
// Create the locktree manager, passing in the create/destroy/escalate callbacks.
// The extra parameter for escalation is simply a pointer to this environment.
// The escalate callback will need it to translate txnids to DB_TXNs
result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result);
r = toku_omt_create(&result->i->open_dbs);
toku_mutex_init(&result->i->open_dbs_lock, NULL);
......
......@@ -27,14 +27,14 @@ static DB_TXN *txn_oldest_ancester(DB_TXN* txn) {
}
int find_key_ranges_by_lt(const txn_lt_key_ranges &ranges,
toku::locktree *const &find_lt);
const toku::locktree *const &find_lt);
int find_key_ranges_by_lt(const txn_lt_key_ranges &ranges,
toku::locktree *const &find_lt) {
const toku::locktree *const &find_lt) {
return ranges.lt->compare(find_lt);
}
static void db_txn_note_row_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key) {
toku::locktree *lt = db->i->lt;
const toku::locktree *lt = db->i->lt;
toku_mutex_lock(&db_txn_struct_i(txn)->txn_mutex);
......@@ -44,16 +44,16 @@ static void db_txn_note_row_lock(DB *db, DB_TXN *txn, const DBT *left_key, const
// if this txn has not yet already referenced this
// locktree, then add it to this txn's locktree map
int r = map->find_zero<toku::locktree *, find_key_ranges_by_lt>(lt, &ranges, &idx);
int r = map->find_zero<const toku::locktree *, find_key_ranges_by_lt>(lt, &ranges, &idx);
if (r == DB_NOTFOUND) {
ranges.lt = lt;
ranges.lt = db->i->lt;
XMALLOC(ranges.buffer);
ranges.buffer->create();
map->insert_at(ranges, idx);
// let the manager know we're referencing this lt
toku::locktree::manager *ltm = &txn->mgrp->i->ltm;
ltm->reference_lt(lt);
ltm->reference_lt(ranges.lt);
} else {
invariant_zero(r);
}
......@@ -64,6 +64,71 @@ static void db_txn_note_row_lock(DB *db, DB_TXN *txn, const DBT *left_key, const
toku_mutex_unlock(&db_txn_struct_i(txn)->txn_mutex);
}
void toku_db_txn_escalate_callback(TXNID txnid, const toku::locktree *lt, const toku::range_buffer &buffer, void *extra) {
DB_ENV *CAST_FROM_VOIDP(env, extra);
// Get the TOKUTXN and DB_TXN for this txnid from the environment's txn manager.
// Only the parent id is used in the search.
TOKUTXN ttxn;
TXNID_PAIR txnid_pair = { .parent_id64 = txnid, .child_id64 = 0 };
TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
toku_txn_manager_suspend(txn_manager);
toku_txn_manager_id2txn_unlocked(txn_manager, txnid_pair, &ttxn);
// We are still holding the txn manager lock. If we couldn't find the txn,
// then we lost a race with a committing transaction that got removed
// from the txn manager before it released its locktree locks. In this
// case we do nothing - that transaction has or is just about to release
// its locks and be gone, so there's not point in updating its lt_map
// with the new escalated ranges. It will go about releasing the old
// locks it thinks it had, and will succeed as if nothing happened.
//
// If we did find the transaction, then it has not yet been removed
// from the manager and therefore has not yet released its locks.
// We must be able to find and replace the range buffer associated
// with this locktree. This is impotant, otherwise it can grow out of
// control (ticket 5961).
if (ttxn != nullptr) {
DB_TXN *txn = toku_txn_get_container_db_txn(ttxn);
// One subtle point is that if the transaction is still live, it is impossible
// to deadlock on the txn mutex, even though we are holding the locktree's root
// mutex and release locks takes them in the opposite order.
//
// Proof: releasing locks takes the txn mutex and then acquires the locktree's
// root mutex, escalation takes the root mutex and possibly takes the txn mutex.
// releasing locks implies the txn is not live, and a non-live txn implies we
// will not need to take the txn mutex, so the deadlock is avoided.
toku_mutex_lock(&db_txn_struct_i(txn)->txn_mutex);
// We should be able to find this locktree. It was just escalated, and we had locks.
uint32_t idx;
txn_lt_key_ranges ranges;
toku::omt<txn_lt_key_ranges> *map = &db_txn_struct_i(txn)->lt_map;
int r = map->find_zero<const toku::locktree *, find_key_ranges_by_lt>(lt, &ranges, &idx);
invariant_zero(r);
// Destroy the old range buffer, create a new one, and insert the new ranges.
//
// We could theoretically steal the memory from the caller instead of copying
// it, but it's simpler to have a callback API that doesn't transfer memory ownership.
ranges.buffer->destroy();
ranges.buffer->create();
toku::range_buffer::iterator iter;
toku::range_buffer::iterator::record rec;
iter.create(&buffer);
while (iter.current(&rec)) {
ranges.buffer->append(rec.get_left_key(), rec.get_right_key());
iter.next();
}
toku_mutex_unlock(&db_txn_struct_i(txn)->txn_mutex);
}
toku_txn_manager_resume(txn_manager);
}
// Get a range lock.
// Return when the range lock is acquired or the default lock tree timeout has expired.
......
......@@ -11,6 +11,10 @@
#include <locktree/lock_request.h>
// Expose the escalate callback to ydb.cc,
// so it can pass the function pointer to the locktree
void toku_db_txn_escalate_callback(TXNID txnid, const toku::locktree *lt, const toku::range_buffer &buffer, void *extra);
int toku_db_get_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key,
toku::lock_request::type lock_type);
......
......@@ -25,6 +25,11 @@ toku_txn_id64(DB_TXN * txn) {
static void
toku_txn_release_locks(DB_TXN *txn) {
// Prevent access to the locktree map while releasing.
// It is possible for lock escalation to attempt to
// modify this data structure while the txn commits.
toku_mutex_lock(&db_txn_struct_i(txn)->txn_mutex);
size_t num_ranges = db_txn_struct_i(txn)->lt_map.size();
for (size_t i = 0; i < num_ranges; i++) {
txn_lt_key_ranges ranges;
......@@ -32,6 +37,8 @@ toku_txn_release_locks(DB_TXN *txn) {
invariant_zero(r);
toku_db_release_lt_key_ranges(txn, &ranges);
}
toku_mutex_unlock(&db_txn_struct_i(txn)->txn_mutex);
}
static void
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment