Commit f54709e1 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3952 speed up point write lock acquisition closes[t:3952]

git-svn-id: file:///svn/toku/tokudb@35856 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2ae82d8b
...@@ -1834,7 +1834,6 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB* db, TXNID txn, ...@@ -1834,7 +1834,6 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB* db, TXNID txn,
return r; return r;
} }
static int lt_try_acquire_range_write_lock(toku_lock_tree* tree, static int lt_try_acquire_range_write_lock(toku_lock_tree* tree,
DB* db, TXNID txn, DB* db, TXNID txn,
const DBT* key_left, const DBT* key_left,
...@@ -1864,12 +1863,56 @@ static int lt_try_acquire_range_write_lock(toku_lock_tree* tree, ...@@ -1864,12 +1863,56 @@ static int lt_try_acquire_range_write_lock(toku_lock_tree* tree,
r = lt_check_borderwrite_conflict(tree, txn, &query); r = lt_check_borderwrite_conflict(tree, txn, &query);
if (r != 0) if (r != 0)
goto cleanup; goto cleanup;
// insert and consolidate into the local write set if (key_left == key_right) {
toku_range to_insert; // Point write lock.
init_insert(&to_insert, &left, &right, txn); // Need to copy the memory and insert. No merging required in selfwrite.
r = consolidate_writes(tree, &to_insert, txn); // This is a point, and if merging was possible it would have been dominated by selfwrite.
if (r != 0)
goto cleanup; // Insert into selfwrite
toku_range to_insert;
init_insert(&to_insert, &left, &right, txn);
if (!ltm_lock_test_incr(tree->mgr, 0)) {
r = TOKUDB_OUT_OF_LOCKS;
goto cleanup;
}
BOOL dummy = TRUE;
r = lt_alloc_extreme(tree, &to_insert, TRUE, &dummy);
if (r != 0)
goto cleanup;
BOOL free_left = FALSE;
toku_range_tree* selfwrite;
r = lt_selfwrite(tree, txn, &selfwrite);
if (r != 0) {
free_left = TRUE;
goto cleanup_left;
}
assert(selfwrite);
r = toku_rt_insert(selfwrite, &to_insert);
if (r != 0) {
free_left = TRUE;
goto cleanup_left;
}
// Update borderwrite
r = lt_borderwrite_insert(tree, &query, &to_insert);
if (r != 0) {
r = lt_panic(tree, r);
goto cleanup_left;
}
ltm_lock_incr(tree->mgr, 0);
r = 0;
cleanup_left:
if (r != 0)
if (free_left)
p_free(tree, to_insert.ends.left);
} else {
// insert and consolidate into the local write set
toku_range to_insert;
init_insert(&to_insert, &left, &right, txn);
r = consolidate_writes(tree, &to_insert, txn);
if (r != 0)
goto cleanup;
}
cleanup: cleanup:
if (tree) if (tree)
lt_postprocess(tree); lt_postprocess(tree);
...@@ -1881,13 +1924,11 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB* db, TXNID txn, ...@@ -1881,13 +1924,11 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB* db, TXNID txn,
const DBT* key_right) { const DBT* key_right) {
int r = ENOSYS; int r = ENOSYS;
r = lt_try_acquire_range_write_lock(tree, db, txn, r = lt_try_acquire_range_write_lock(tree, db, txn, key_left, key_right);
key_left, key_right);
if (r == TOKUDB_OUT_OF_LOCKS) { if (r == TOKUDB_OUT_OF_LOCKS) {
r = ltm_do_escalation(tree->mgr); r = ltm_do_escalation(tree->mgr);
if (r == 0) { if (r == 0) {
r = lt_try_acquire_range_write_lock(tree, db, txn, r = lt_try_acquire_range_write_lock(tree, db, txn, key_left, key_right);
key_left, key_right);
if (r == 0) { if (r == 0) {
tree->mgr->status.lock_escalation_successes++; tree->mgr->status.lock_escalation_successes++;
} }
...@@ -1911,7 +1952,6 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB* db, TXNID txn, ...@@ -1911,7 +1952,6 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB* db, TXNID txn,
return r; return r;
} }
// toku_lt_acquire_write_lock() used only by test programs
int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB* db, TXNID txn, const DBT* key) { int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB* db, TXNID txn, const DBT* key) {
return toku_lt_acquire_range_write_lock(tree, db, txn, key, key); return toku_lt_acquire_range_write_lock(tree, db, txn, key, key);
} }
......
// benchmark point write locks acquisition rate.
// rate = nrows / time to execute the benchmark.
//
// example: ./benchmark_point_write_locks.tlog --max_locks 1000000 --max_lock_memory 1000000000 --nrows 1000000
#include "test.h"
int main(int argc, const char *argv[]) {
int r;
uint32_t max_locks = 2;
uint64_t max_lock_memory = 4096;
uint64_t nrows = 1;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
if (verbose > 0) verbose++;
continue;
}
if (strcmp(argv[i], "-q") == 0 || strcmp(argv[i], "--quiet") == 0) {
if (verbose > 0) verbose--;
continue;
}
if (strcmp(argv[i], "--max_locks") == 0 && i+1 < argc) {
max_locks = atoi(argv[++i]);
continue;
}
if (strcmp(argv[i], "--max_lock_memory") == 0 && i+1 < argc) {
max_lock_memory = atoi(argv[++i]);
continue;
}
if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
nrows = atoi(argv[++i]);
continue;
}
assert(0);
}
// setup
toku_ltm *ltm = NULL;
r = toku_ltm_create(&ltm, max_locks, max_lock_memory, dbpanic, get_compare_fun_from_db, toku_malloc, toku_free, toku_realloc);
assert(r == 0 && ltm);
toku_lock_tree *lt = NULL;
r = toku_lt_create(&lt, dbpanic, ltm, get_compare_fun_from_db, toku_malloc, toku_free, toku_realloc);
assert(r == 0 && lt);
DB *db_a = (DB *) 2;
TXNID txn_a = 1;
// acquire the locks on keys 0 .. nrows-1
for (uint64_t k = 0; k < nrows; k++) {
DBT key = { .data = &k, .size = sizeof k };
r = toku_lt_acquire_write_lock(lt, db_a, txn_a, &key); assert(r == 0);
}
// release the locks
r = toku_lt_unlock(lt, txn_a); assert(r == 0);
// shutdown
r = toku_lt_close(lt); assert(r == 0);
r = toku_ltm_close(ltm); assert(r == 0);
return 0;
}
...@@ -3274,6 +3274,9 @@ locked_c_getf_set_range_reverse(DBC *c, u_int32_t flag, DBT * key, YDB_CALLBACK_ ...@@ -3274,6 +3274,9 @@ locked_c_getf_set_range_reverse(DBC *c, u_int32_t flag, DBT * key, YDB_CALLBACK_
toku_ydb_lock(); int r = toku_c_getf_set_range_reverse(c, flag, key, f, extra); toku_ydb_unlock(); return r; toku_ydb_lock(); int r = toku_c_getf_set_range_reverse(c, flag, key, f, extra); toku_ydb_unlock(); return r;
} }
// Get a range lock.
// Return when the range lock is acquired or the default lock tree timeout has expired.
// The ydb mutex must be held when called and may be released when waiting in the lock tree.
static int static int
get_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key, toku_lock_type lock_type) { get_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key, toku_lock_type lock_type) {
int r; int r;
...@@ -3289,8 +3292,9 @@ get_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key, t ...@@ -3289,8 +3292,9 @@ get_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key, t
return r; return r;
} }
// Setup and start an asynchronous lock request.
static int static int
get_range_lock_request(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key, toku_lock_type lock_type, toku_lock_request *lock_request) { start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key, toku_lock_type lock_type, toku_lock_request *lock_request) {
int r; int r;
DB_TXN *txn_anc = toku_txn_ancestor(txn); DB_TXN *txn_anc = toku_txn_ancestor(txn);
r = toku_txn_add_lt(txn_anc, db->i->lt); r = toku_txn_add_lt(txn_anc, db->i->lt);
...@@ -3450,7 +3454,7 @@ c_del_callback(DBT const *key, DBT const *val, void *extra) { ...@@ -3450,7 +3454,7 @@ c_del_callback(DBT const *key, DBT const *val, void *extra) {
//Lock: //Lock:
// left(key,val)==right(key,val) == (key, val); // left(key,val)==right(key,val) == (key, val);
r = get_range_lock_request(context->db, context->txn, key, key, LOCK_REQUEST_WRITE, &context->lock_request); r = start_range_lock(context->db, context->txn, key, key, LOCK_REQUEST_WRITE, &context->lock_request);
//Give brt-layer an error (if any) to return from toku_c_getf_current_binding //Give brt-layer an error (if any) to return from toku_c_getf_current_binding
return r; return r;
...@@ -3511,8 +3515,8 @@ c_getf_first_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, ...@@ -3511,8 +3515,8 @@ c_getf_first_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val,
if (context->do_locking) { if (context->do_locking) {
const DBT *left_key = toku_lt_neg_infinity; const DBT *left_key = toku_lt_neg_infinity;
const DBT *right_key = key != NULL ? &found_key : toku_lt_infinity; const DBT *right_key = key != NULL ? &found_key : toku_lt_infinity;
r = get_range_lock_request(context->db, context->txn, left_key, right_key, r = start_range_lock(context->db, context->txn, left_key, right_key,
context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request); context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request);
} else } else
r = 0; r = 0;
...@@ -3564,8 +3568,8 @@ c_getf_last_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v ...@@ -3564,8 +3568,8 @@ c_getf_last_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v
if (context->do_locking) { if (context->do_locking) {
const DBT *left_key = key != NULL ? &found_key : toku_lt_neg_infinity; const DBT *left_key = key != NULL ? &found_key : toku_lt_neg_infinity;
const DBT *right_key = toku_lt_infinity; const DBT *right_key = toku_lt_infinity;
r = get_range_lock_request(context->db, context->txn, left_key, right_key, r = start_range_lock(context->db, context->txn, left_key, right_key,
context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request); context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request);
} else } else
r = 0; r = 0;
...@@ -3625,8 +3629,8 @@ c_getf_next_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v ...@@ -3625,8 +3629,8 @@ c_getf_next_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v
toku_brt_cursor_peek(context->c, &prevkey, &prevval); toku_brt_cursor_peek(context->c, &prevkey, &prevval);
const DBT *left_key = prevkey; const DBT *left_key = prevkey;
const DBT *right_key = key != NULL ? &found_key : toku_lt_infinity; const DBT *right_key = key != NULL ? &found_key : toku_lt_infinity;
r = get_range_lock_request(context->db, context->txn, left_key, right_key, r = start_range_lock(context->db, context->txn, left_key, right_key,
context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request); context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request);
} else } else
r = 0; r = 0;
...@@ -3685,8 +3689,8 @@ c_getf_prev_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v ...@@ -3685,8 +3689,8 @@ c_getf_prev_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v
toku_brt_cursor_peek(context->c, &prevkey, &prevval); toku_brt_cursor_peek(context->c, &prevkey, &prevval);
const DBT *left_key = key != NULL ? &found_key : toku_lt_neg_infinity; const DBT *left_key = key != NULL ? &found_key : toku_lt_neg_infinity;
const DBT *right_key = prevkey; const DBT *right_key = prevkey;
r = get_range_lock_request(context->db, context->txn, left_key, right_key, r = start_range_lock(context->db, context->txn, left_key, right_key,
context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request); context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request);
} else } else
r = 0; r = 0;
...@@ -3793,8 +3797,8 @@ c_getf_set_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, vo ...@@ -3793,8 +3797,8 @@ c_getf_set_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, vo
// left(key,val) = (input_key, -infinity) // left(key,val) = (input_key, -infinity)
// right(key,val) = (input_key, found ? found_val : infinity) // right(key,val) = (input_key, found ? found_val : infinity)
if (context->do_locking) { if (context->do_locking) {
r = get_range_lock_request(context->db, context->txn, super_context->input_key, super_context->input_key, r = start_range_lock(context->db, context->txn, super_context->input_key, super_context->input_key,
context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request); context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request);
} else } else
r = 0; r = 0;
...@@ -3851,8 +3855,8 @@ c_getf_set_range_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec v ...@@ -3851,8 +3855,8 @@ c_getf_set_range_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec v
if (context->do_locking) { if (context->do_locking) {
const DBT *left_key = super_context->input_key; const DBT *left_key = super_context->input_key;
const DBT *right_key = key != NULL ? &found_key : toku_lt_infinity; const DBT *right_key = key != NULL ? &found_key : toku_lt_infinity;
r = get_range_lock_request(context->db, context->txn, left_key, right_key, r = start_range_lock(context->db, context->txn, left_key, right_key,
context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request); context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request);
} else } else
r = 0; r = 0;
...@@ -3909,8 +3913,8 @@ c_getf_set_range_reverse_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, b ...@@ -3909,8 +3913,8 @@ c_getf_set_range_reverse_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, b
if (context->do_locking) { if (context->do_locking) {
const DBT *left_key = key != NULL ? &found_key : toku_lt_neg_infinity; const DBT *left_key = key != NULL ? &found_key : toku_lt_neg_infinity;
const DBT *right_key = super_context->input_key; const DBT *right_key = super_context->input_key;
r = get_range_lock_request(context->db, context->txn, left_key, right_key, r = start_range_lock(context->db, context->txn, left_key, right_key,
context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request); context->is_write_op ? LOCK_REQUEST_WRITE : LOCK_REQUEST_READ, &context->lock_request);
} else } else
r = 0; r = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment