From 82a1bd8c7db284a5fc72476a857d067ad9f6a269 Mon Sep 17 00:00:00 2001
From: Yoni Fogel <yoni@tokutek.com>
Date: Mon, 17 Mar 2008 20:39:01 +0000
Subject: [PATCH] addresses #523 implemented lock escalation MINUS flagging of
 messy transactions implemented tests for lock escalation

git-svn-id: file:///svn/tokudb@2902 c7de825b-a66e-492c-adef-691d508d4ae1
---
 src/lock_tree/locktree.c                      | 146 ++++-
 src/lock_tree/locktree.h                      |   3 +
 src/lock_tree/tests/test.h                    |  13 +
 .../tests/test_00060_lock_escalation.c        | 522 ++++++++++++++++++
 4 files changed, 666 insertions(+), 18 deletions(-)
 create mode 100644 src/lock_tree/tests/test_00060_lock_escalation.c

diff --git a/src/lock_tree/locktree.c b/src/lock_tree/locktree.c
index 78381abe118..83e0009c424 100644
--- a/src/lock_tree/locktree.c
+++ b/src/lock_tree/locktree.c
@@ -122,10 +122,11 @@ int toku__lt_point_cmp(const toku_point* x, const toku_point* y) {
                    toku__recreate_DBT(&point_2, y->data_payload, y->data_len));
 }
 
-static inline BOOL toku__lt_fraction_ranges_free(toku_lock_tree* tree, u_int32_t denominator) {
-    assert(tree && tree->num_ranges && denominator);
-    return *tree->num_ranges <=
-            tree->max_ranges - (tree->max_ranges / denominator);
+static inline BOOL toku__lt_percent_ranges_free(toku_lock_tree* tree, 
+                                                u_int32_t percent) {
+    assert(tree && tree->num_ranges && (percent <= 100));
+    u_int64_t max_ranges64= tree->max_ranges;
+    return *tree->num_ranges <= max_ranges64 * (100 - percent) / 100;
 }
 
 /* Functions to update the range count and compare it with the
@@ -283,6 +284,12 @@ static inline int toku__lt_selfwrite(toku_lock_tree* tree, DB_TXN* txn,
 }
 
 
+static inline BOOL toku__dominated(toku_range* query, toku_range* by) {
+    assert(query && by);
+    return (toku__lt_point_cmp(query->left,  by->left) >= 0 &&
+            toku__lt_point_cmp(query->right, by->right) <= 0);
+}
+
 /*
     This function only supports non-overlapping trees.
     Uses the standard definition of dominated from the design document.
@@ -316,8 +323,7 @@ static inline int toku__lt_rt_dominates(toku_lock_tree* tree, toku_range* query,
         return 0;
     }
     assert(numfound == 1);
-    *dominated = (toku__lt_point_cmp(query->left,  buf[0].left) >= 0 &&
-                  toku__lt_point_cmp(query->right, buf[0].right) <= 0);
+    *dominated = toku__dominated(query, &buf[0]);
     return 0;
 }
 
@@ -392,7 +398,8 @@ static inline int toku__lt_meets(toku_lock_tree* tree, toku_range* query,
 
 /* 
     Determines whether 'query' meets 'rt' at txn2 not equal to txn.
-    This function supports all range trees, but queries must be a single point. 
+    This function supports all range trees, but queries must either be a single point,
+    or the range tree is homogenous.
     Uses the standard definition of 'query' meets 'tree' at 'data' from the
     design document.
 */
@@ -400,7 +407,7 @@ static inline int toku__lt_meets_peer(toku_lock_tree* tree, toku_range* query,
                                        toku_range_tree* rt, BOOL is_homogenous,
                                        DB_TXN* self, BOOL* met) {
     assert(tree && query && rt && self && met);
-    assert(query->left == query->right);
+    assert(query->left == query->right || is_homogenous);
 
     const u_int32_t query_size = is_homogenous ? 1 : 2;
     toku_range   buffer[2];
@@ -1071,7 +1078,7 @@ static inline int toku__lt_write_range_conflicts_reads(toku_lock_tree* tree,
     
     while ((forest = toku_rth_next(tree->rth)) != NULL) {
         if (forest->self_read != NULL && forest->hash_key != txn) {
-            r = toku__lt_meets_peer(tree, query, forest->self_read, TRUE, txn,
+            r = toku__lt_meets_peer(tree, query, forest->self_read, TRUE, txn,///
                             &met);
             if (r!=0) { goto cleanup; }
             if (met)  { r = DB_LOCK_NOTGRANTED; goto cleanup; }
@@ -1082,7 +1089,13 @@ static inline int toku__lt_write_range_conflicts_reads(toku_lock_tree* tree,
     return r;
 }
 
-static inline int toku__border_escalation_trivial(toku_lock_tree* tree, toku_range* border_range, BOOL* trivial) {
+/*
+    Tests whether a range from BorderWrite is trivially escalatable.
+    i.e. No read locks from other transactions overlap the range.
+*/
+static inline int toku__border_escalation_trivial(toku_lock_tree* tree, 
+                                                  toku_range* border_range, 
+                                                  BOOL* trivial) {
     assert(tree && border_range && trivial);
     int r = ENOSYS;
 
@@ -1099,23 +1112,111 @@ static inline int toku__border_escalation_trivial(toku_lock_tree* tree, toku_ran
     return r;
 }
 
-static inline int toku__escalate_reads_from_border_range(toku_lock_tree* tree, toku_range* border_range) {
-    assert(tree && border_range);
-    return 0;
+/*  */
+static inline int toku__escalate_writes_from_border_range(toku_lock_tree* tree, 
+                                                          toku_range* border_range) {
+    int r = ENOSYS;
+    if (!tree || !border_range) { r = EINVAL; goto cleanup; }
+    DB_TXN* txn = border_range->data;
+    toku_range_tree* self_write = toku__lt_ifexist_selfwrite(tree, txn);
+    assert(self_write);
+    toku_range query = *border_range;
+    u_int32_t numfound = 0;
+    query.data = NULL;
+
+    /*
+     * Delete all overlapping ranges
+     */
+    r = toku_rt_find(self_write, &query, 0, &tree->buf, &tree->buflen, &numfound);
+    if (r != 0) { goto cleanup; }
+    u_int32_t i;
+    for (i = 0; i < numfound; i++) {
+        r = toku_rt_delete(self_write, &tree->buf[i]);
+        if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
+        /*
+         * Clean up memory that is not referenced by border_range.
+         */
+        if (tree->buf[i].left != tree->buf[i].right &&
+            toku__lt_p_independent(tree->buf[i].left, border_range)) {
+            /* Do not double free if left and right are same point. */
+            toku__p_free(tree, tree->buf[i].left);
+        }
+        if (toku__lt_p_independent(tree->buf[i].right, border_range)) {
+            toku__p_free(tree, tree->buf[i].right);
+        }
+    }
+    
+    /*
+     * Insert border_range into self_write table
+     */
+    r = toku_rt_insert(self_write, border_range);
+    if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
+
+    toku__lt_range_incr(tree, numfound);
+    r = 0;
+cleanup:
+    return r;
 }
-static inline int toku__escalate_writes_from_border_range(toku_lock_tree* tree, toku_range* border_range) {
-    assert(tree && border_range);
-    return 0;
+
+static inline int toku__escalate_reads_from_border_range(toku_lock_tree* tree, 
+                                                         toku_range* border_range) {
+    int r = ENOSYS;
+    if (!tree || !border_range) { r = EINVAL; goto cleanup; }
+    DB_TXN* txn = border_range->data;
+    toku_range_tree* self_read = toku__lt_ifexist_selfread(tree, txn);
+    if (self_read == NULL) { r = 0; goto cleanup; }
+    toku_range query = *border_range;
+    u_int32_t numfound = 0;
+    query.data = NULL;
+
+    /*
+     * Delete all overlapping ranges
+     */
+    r = toku_rt_find(self_read, &query, 0, &tree->buf, &tree->buflen, &numfound);
+    if (r != 0) { goto cleanup; }
+    u_int32_t i;
+    u_int32_t removed = 0;
+    for (i = 0; i < numfound; i++) {
+        if (!toku__dominated(&tree->buf[0], border_range)) { continue; }
+        r = toku_rt_delete(self_read, &tree->buf[i]);
+        if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
+#if !defined(TOKU_RT_NOOVERLAPS)
+        r = toku_rt_delete(tree->mainread, &tree->buf[i]);
+        if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
+#endif /* TOKU_RT_NOOVERLAPS */
+        removed++;
+        /*
+         * Clean up memory that is not referenced by border_range.
+         */
+        if (tree->buf[i].left != tree->buf[i].right &&
+            toku__lt_p_independent(tree->buf[i].left, border_range)) {
+            /* Do not double free if left and right are same point. */
+            toku__p_free(tree, tree->buf[i].left);
+        }
+        if (toku__lt_p_independent(tree->buf[i].right, border_range)) {
+            toku__p_free(tree, tree->buf[i].right);
+        }
+    }
+    
+    toku__lt_range_decr(tree, removed);
+    r = 0;
+cleanup:
+    return r;
 }
 
+
 /*
- * TODO: implement function
+ * For each range in BorderWrite:
+ *     Check to see if range conflicts any read lock held by other transactions
+ *     Replaces all writes that overlap with range
+ *     Deletes all reads dominated by range
  */
 static int toku__do_escalation(toku_lock_tree* tree, BOOL* locks_available) {
     int r = ENOSYS;
     if (!tree || !locks_available)      { r = EINVAL; goto cleanup; }
     if (!tree->lock_escalation_allowed) { r = EDOM;   goto cleanup; }
     toku_range_tree* border       = tree->borderwrite;
+    assert(border);
     toku_range       border_range;
     BOOL             found        = FALSE;
     BOOL             trivial      = FALSE;
@@ -1125,6 +1226,10 @@ static int toku__do_escalation(toku_lock_tree* tree, BOOL* locks_available) {
         r = toku__border_escalation_trivial(tree, &border_range, &trivial);
         if (r!=0)     { goto cleanup; }
         if (!trivial) { continue; }
+        /*
+         * At this point, we determine that escalation is simple,
+         * Attempt escalation
+         */
         r = toku__escalate_writes_from_border_range(tree, &border_range);
         if (r!=0)     { r = toku__lt_panic(tree, r); goto cleanup; }
         r = toku__escalate_reads_from_border_range(tree, &border_range);
@@ -1133,7 +1238,8 @@ static int toku__do_escalation(toku_lock_tree* tree, BOOL* locks_available) {
     r = 0;
     *locks_available = toku__lt_range_test_incr(tree, 0);
     /* Escalation is allowed if 1/10th of the locks (or more) are free. */
-    tree->lock_escalation_allowed = toku__lt_fraction_ranges_free(tree, 10);
+    tree->lock_escalation_allowed = toku__lt_percent_ranges_free(tree,
+                                             TOKU_DISABLE_ESCALATION_THRESHOLD);
 cleanup:
     if (r!=0) {
         if (tree && locks_available) {
@@ -1470,6 +1576,10 @@ int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn) {
     
     toku__lt_range_decr(tree, ranges);
 
+    if (toku__lt_percent_ranges_free(tree, TOKU_ENABLE_ESCALATION_THRESHOLD)) {
+        tree->lock_escalation_allowed = TRUE;
+    }
+
     return 0;
 }
 
diff --git a/src/lock_tree/locktree.h b/src/lock_tree/locktree.h
index 2c2d5dcf59d..4dcf324a333 100644
--- a/src/lock_tree/locktree.h
+++ b/src/lock_tree/locktree.h
@@ -30,6 +30,9 @@ typedef enum {
                                    state */
 } TOKU_LT_ERROR;
 
+#define TOKU_DISABLE_ESCALATION_THRESHOLD 10
+#define TOKU_ENABLE_ESCALATION_THRESHOLD  20
+
 /** Convert error codes into a human-readable error message */
 char* toku_lt_strerror(TOKU_LT_ERROR r /**< Error code */) 
                        __attribute__((const,pure));
diff --git a/src/lock_tree/tests/test.h b/src/lock_tree/tests/test.h
index 3651ed31ea3..8ba02439dcc 100644
--- a/src/lock_tree/tests/test.h
+++ b/src/lock_tree/tests/test.h
@@ -34,6 +34,19 @@ int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2le
     return key1len-key2len;
 }
 
+int intcmp(DB *db __attribute__((__unused__)), const DBT* a, const DBT* b) {
+    int x = *(int*)a->data;
+    int y = *(int*)b->data;
+
+    return x - y;
+}
+
+int charcmp(DB *db __attribute__((__unused__)), const DBT* a, const DBT* b) {
+    int x = *(char*)a->data;
+    int y = *(char*)b->data;
+
+    return x - y;
+}
 
 int dbcmp (DB *db __attribute__((__unused__)), const DBT *a, const DBT*b) {
     return toku_keycompare(a->data, a->size, b->data, b->size);
diff --git a/src/lock_tree/tests/test_00060_lock_escalation.c b/src/lock_tree/tests/test_00060_lock_escalation.c
new file mode 100644
index 00000000000..8534e49bb6f
--- /dev/null
+++ b/src/lock_tree/tests/test_00060_lock_escalation.c
@@ -0,0 +1,522 @@
+/* We are going to test whether create and close properly check their input. */
+
+#include "test.h"
+
+toku_range_tree* toku__lt_ifexist_selfwrite(toku_lock_tree* tree, DB_TXN* txn);
+toku_range_tree* toku__lt_ifexist_selfread(toku_lock_tree* tree, DB_TXN* txn);
+
+int r;
+toku_lock_tree* lt  = NULL;
+DB*             db  = (DB*)1;
+u_int32_t max_locks = 10;
+u_int32_t num_locks = 0;
+BOOL duplicates = FALSE;
+int  nums[10000];
+
+DBT _key_left[2];
+DBT _key_right[2];
+DBT _data_left[2];
+DBT _data_right[2];
+DBT* key_left[2]   ;
+DBT* key_right[2]  ;
+DBT* data_left [2] ;
+DBT* data_right[2] ;
+
+toku_point qleft, qright;
+toku_range query;
+toku_range* buf;
+unsigned buflen;
+unsigned numfound;
+
+void init_query(BOOL dups) {  
+    init_point(&qleft,  lt);
+    init_point(&qright, lt);
+    
+    qleft.key_payload  = (void *) toku_lt_neg_infinity;
+    qright.key_payload = (void *) toku_lt_infinity;
+
+    if (dups) {
+        qleft.data_payload  = qleft.key_payload;
+        qright.data_payload = qright.key_payload;
+    }
+
+    memset(&query,0,sizeof(query));
+    query.left  = &qleft;
+    query.right = &qright;
+}
+
+void setup_tree(BOOL dups) {
+    num_locks = 0;
+    r = toku_lt_create(&lt, db, dups, dbpanic, max_locks, &num_locks, intcmp, charcmp,
+                       toku_malloc, toku_free, toku_realloc);
+    CKERR(r);
+    assert(lt);
+    init_query(dups);
+}
+
+void close_tree(void) {
+    assert(lt);
+    r = toku_lt_close(lt);
+    CKERR(r);
+    lt = NULL;
+}
+
+typedef enum { null = -1, infinite = -2, neg_infinite = -3 } lt_infty;
+
+DBT* set_to_infty(DBT *dbt, lt_infty value) {
+    if (value == infinite) return (DBT*)toku_lt_infinity;
+    if (value == neg_infinite) return (DBT*)toku_lt_neg_infinity;
+    if (value == null) return dbt_init(dbt, NULL, 0);
+    assert(value >= 0);
+    return                    dbt_init(dbt, &nums[value], sizeof(nums[0]));
+}
+
+
+void lt_insert(BOOL dups, int r_expect, char txn, int key_l, int data_l, 
+               int key_r, int data_r, BOOL read_flag) {
+    DBT _key_left;
+    DBT _key_right;
+    DBT _data_left;
+    DBT _data_right;
+    DBT* key_left   = &_key_left;
+    DBT* key_right  = &_key_right;
+    DBT* data_left  = dups ? &_data_left : NULL;
+    DBT* data_right = dups ? &_data_right: NULL;
+
+    key_left  = set_to_infty(key_left,  key_l);
+    key_right = set_to_infty(key_right, key_r);
+    if (dups) {
+        if (key_left != &_key_left) data_left = key_left;
+        else data_left = set_to_infty(data_left,  data_l);
+        if (key_right != &_key_right) data_right = key_right;
+        else data_right = set_to_infty(data_right,  data_r);
+        assert(key_left  && data_left);
+        assert(!read_flag || (key_right && data_right));
+    } else {
+        data_left = data_right = NULL;
+        assert(key_left  && !data_left);
+        assert(!read_flag || (key_right && !data_right));
+    }
+
+    DB_TXN* local_txn = (DB_TXN*) (size_t) txn;
+
+    if (read_flag)
+        r = toku_lt_acquire_range_read_lock(lt, local_txn, key_left,  data_left,
+                                            key_right, data_right);
+    else
+        r = toku_lt_acquire_write_lock(lt, local_txn, key_left,  data_left);
+    CKERR2(r, r_expect);
+}
+
+void lt_insert_read(BOOL dups, int r_expect, char txn, int key_l, int data_l, 
+                    int key_r, int data_r) {
+    lt_insert(dups, r_expect, txn, key_l, data_l, key_r, data_r, TRUE);
+}
+
+void lt_insert_write(BOOL dups, int r_expect, char txn, int key_l, int data_l) {
+    lt_insert(dups, r_expect, txn, key_l, data_l, 0, 0, FALSE);
+}
+
+
+void setup_payload_len(void** payload, u_int32_t* len, int val) {
+    assert(payload && len);
+
+    DBT temp;
+
+    *payload = set_to_infty(&temp, val);
+    
+    if (val < 0) {
+        *len = 0;
+    }
+    else {
+        *len = sizeof(nums[0]);
+        *payload = temp.data;
+    }
+}
+
+void lt_find(BOOL dups, toku_range_tree* rt,
+                        unsigned k, int key_l, int data_l,
+                                    int key_r, int data_r,
+                                    char char_txn) {
+
+    r = toku_rt_find(rt, &query, 0, &buf, &buflen, &numfound);
+    CKERR(r);
+    assert(numfound==k);
+
+    DB_TXN* find_txn = (DB_TXN *) (size_t) char_txn;
+
+    toku_point left, right;
+    init_point(&left, lt);
+    setup_payload_len(&left.key_payload, &left.key_len, key_l);
+    if (dups) {
+        if (key_l < null) left.data_payload = left.key_payload;
+        else setup_payload_len(&left.data_payload, &left.data_len, data_l);
+    }
+    init_point(&right, lt);
+    setup_payload_len(&right.key_payload, &right.key_len, key_r);
+    if (dups) {
+        if (key_r < null) right.data_payload = right.key_payload;
+        else setup_payload_len(&right.data_payload, &right.data_len, data_r);
+    }
+    unsigned i;
+    for (i = 0; i < numfound; i++) {
+        if (toku__lt_point_cmp(buf[i].left,  &left ) == 0 &&
+            toku__lt_point_cmp(buf[i].right, &right) == 0 &&
+            buf[i].data == find_txn) return;
+    }
+    assert(FALSE);  //Crash since we didn't find it.
+}
+
+void lt_unlock(char ctxn) {
+  int r;
+  r = toku_lt_unlock(lt, (DB_TXN *) (size_t) ctxn);
+  CKERR(r);
+}
+              
+void runtest(BOOL dups) {
+    
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_read (dups, 0, 'a', 1, 1, 1, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 1, 1, 1, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_read (dups, 0, 'b', 1, 1, 1, 1);
+    lt_insert_write(dups, DB_LOCK_NOTGRANTED, 'a', 1, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'a', 3, 1);
+    lt_insert_write(dups, 0, 'a', 4, 1);
+    lt_insert_write(dups, 0, 'a', 5, 1);
+    lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 2, 1, 4, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'a', 3, 1);
+    lt_insert_write(dups, 0, 'a', 4, 1);
+    lt_insert_write(dups, 0, 'a', 5, 1);
+    lt_insert_write (dups, DB_LOCK_NOTGRANTED, 'b', 2, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'a', 4, 1);
+    lt_insert_write(dups, 0, 'a', 5, 1);
+    lt_insert_read (dups, 0, 'b', 3, 1, 3, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'a', 4, 1);
+    lt_insert_write(dups, 0, 'a', 5, 1);
+    lt_insert_read (dups, 0, 'b', 3, 1, 3, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'b', 1, 1);
+    lt_insert_write(dups, 0, 'b', 2, 1);
+    lt_insert_write(dups, 0, 'b', 3, 1);
+    lt_insert_write(dups, 0, 'b', 4, 1);
+    lt_insert_write(dups, 0, 'a', 5, 1);
+    lt_insert_write(dups, 0, 'a', 6, 1);
+    lt_insert_write(dups, 0, 'a', 7, 1);
+    lt_insert_write(dups, 0, 'a', 8, 1);
+    lt_insert_write(dups, 0, 'a', 9, 1);
+    lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'b', 1, 1);
+    lt_insert_write(dups, 0, 'b', 2, 1);
+    lt_insert_write(dups, 0, 'b', 3, 1);
+    lt_insert_write(dups, 0, 'b', 4, 1);
+    lt_insert_write(dups, 0, 'b', 5, 1);
+    lt_insert_write(dups, 0, 'b', 6, 1);
+    lt_insert_write(dups, 0, 'b', 7, 1);
+    lt_insert_write(dups, 0, 'b', 8, 1);
+    lt_insert_write(dups, 0, 'b', 9, 1);
+    lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'a', 3, 1);
+    lt_insert_write(dups, 0, 'a', 4, 1);
+    lt_insert_read (dups, 0, 'a', 3, 1, 7, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'b', 1, 1);
+    lt_insert_write(dups, 0, 'b', 2, 1);
+    lt_insert_write(dups, 0, 'b', 3, 1);
+    lt_insert_write(dups, 0, 'b', 4, 1);
+    lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'a', 4, 1);
+    lt_insert_write(dups, 0, 'a', 5, 1);
+    lt_insert_write(dups, 0, 'a', 3, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'b', 4, 1);
+    lt_insert_write(dups, 0, 'b', 5, 1);
+    lt_insert_write(dups, 0, 'a', 3, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 2, 1);
+    lt_insert_write(dups, 0, 'a', 3, 1);
+    lt_insert_write(dups, 0, 'a', 4, 1);
+    lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 3, 1, 3, 1);
+    lt_unlock('a');
+    lt_insert_write(dups, 0, 'b', 3, 1);
+    lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 3, 1);
+    lt_unlock('b');
+    lt_insert_read (dups, 0, 'a', 3, 1, 3, 1);
+    close_tree();
+    /* ********************* */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 3, 1);
+    lt_insert_write(dups, 0, 'b', 2, 1);
+    lt_unlock('b');
+    close_tree();
+    /* ********************* */
+}
+
+
+void run_escalation_test(BOOL dups) {
+    int i = 0;
+/* ******************** */
+/* 1 transaction request 1000 write locks, make sure it succeeds*/
+    setup_tree(dups);
+    assert(lt->lock_escalation_allowed);
+    for (i = 0; i < 1000; i++) {
+        lt_insert_write(dups, 0, 'a', i, i);
+        assert(lt->lock_escalation_allowed);
+    }
+    close_tree();
+/* ******************** */
+/* interleaving transactions,
+   TXN A grabs 1 3 5 7 9 
+   TXN B grabs 2 4 6 8 10
+   make sure lock escalation fails, and that we run out of locks */
+    setup_tree(dups);
+    // this should grab ten locks successfully
+    for (i = 1; i < 10; i+=2) {
+        lt_insert_write(dups, 0, 'a', i, i);
+        lt_insert_write(dups, 0, 'b', i+1, i+1);
+    }
+    lt_insert_write(dups, ENOMEM, 'a', 100, 100);
+    lt_insert_write(dups, ENOMEM, 'b', 100, 100);
+    lt_insert_write(dups, ENOMEM, 'c', 100, 100);
+    close_tree();
+/* ******************** */
+/*
+   test that escalation allowed flag goes from FALSE->TRUE->FALSE
+   TXN A grabs 1 3 5 7 9 
+   TXN B grabs 2 4 6 8 10
+   try to grab another lock, fail, lock escalation should be disabled
+   txn B gets freed
+   lock escalation should be reenabled
+   txn C grabs 60,70,80,90,100
+   lock escalation should work
+*/
+    setup_tree(dups);
+    assert(lt->lock_escalation_allowed);
+    // this should grab ten locks successfully
+    for (i = 1; i < 10; i+=2) {
+        lt_insert_write(dups, 0, 'a', i, i);
+        lt_insert_write(dups, 0, 'b', i+1, i+1);
+    }
+    assert(lt->lock_escalation_allowed);
+    lt_insert_write(dups, ENOMEM, 'a', 100, 100);
+    assert(!lt->lock_escalation_allowed);
+    lt_insert_write(dups, ENOMEM, 'b', 100, 100);
+    assert(!lt->lock_escalation_allowed);
+    lt_insert_write(dups, ENOMEM, 'c', 100, 100);
+    assert(!lt->lock_escalation_allowed);
+    lt_insert_read(dups, ENOMEM, 'a', 100, 100, 100, 100);
+    lt_insert_read(dups, ENOMEM, 'b', 100, 100, 100, 100);
+    lt_insert_read(dups, ENOMEM, 'c', 100, 100, 100, 100);
+    lt_unlock('b');
+    assert(lt->lock_escalation_allowed);
+    for (i = 50; i < 1000; i++) {
+        lt_insert_write(dups, 0, 'c', i, i);
+        assert(lt->lock_escalation_allowed);
+    }
+    close_tree();
+/* ******************** */
+/*
+   txn A grabs 0,1,2,...,8  (9 locks)
+   txn B grabs read lock [5,7]
+   txn C attempts to grab lock, escalation, and lock grab, should fail
+   lock
+*/
+    setup_tree(dups);
+    assert(lt->lock_escalation_allowed);
+    // this should grab ten locks successfully
+    for (i = 0; i < 10; i ++) { 
+        if (i == 2 || i == 5) { continue; }
+        lt_insert_write(dups, 0, 'a', i, i);
+    }
+    lt_insert_read (dups, 0, 'b', 5, 5, 5, 5);
+    lt_insert_read (dups, 0, 'b', 2, 2, 2, 2);
+    lt_insert_write(dups, ENOMEM, 'a', 100, 100);
+    lt_insert_write(dups, ENOMEM, 'b', 100, 100);
+    lt_insert_write(dups, ENOMEM, 'c', 100, 100);
+    lt_insert_read(dups, ENOMEM, 'a', 100, 100, 100, 100);
+    lt_insert_read(dups, ENOMEM, 'b', 100, 100, 100, 100);
+    lt_insert_read(dups, ENOMEM, 'c', 100, 100, 100, 100);
+    lt_unlock('b');
+    assert(lt->lock_escalation_allowed);
+    for (i = 50; i < 1000; i++) {
+        lt_insert_write(dups, 0, 'c', i, i);
+        assert(lt->lock_escalation_allowed);
+    }
+    close_tree();
+/* ******************** */
+#if 0 //Only use when messy transactions are enabled.
+/*
+   txn A grabs 0,1,2,...,8  (9 locks)
+   txn B grabs read lock [5,7]
+   txn C attempts to grab lock, escalation, and lock grab, should fail
+   lock
+*/
+    setup_tree(dups);
+    assert(lt->lock_escalation_allowed);
+    // this should grab ten locks successfully
+    for (i = 0; i < 7; i++) { 
+        lt_insert_write(dups, 0, 'a', i, i);
+    }
+    lt_insert_read (dups, 0, 'b', 5, 5, 6, 6);
+    lt_insert_read (dups, 0, 'b', 2, 2, 3, 3);
+    lt_insert_write(dups, ENOMEM, 'a', 100, 100);
+    lt_insert_write(dups, ENOMEM, 'b', 100, 100);
+    lt_insert_write(dups, ENOMEM, 'c', 100, 100);
+    lt_insert_read(dups, ENOMEM, 'a', 100, 100, 100, 100);
+    lt_insert_read(dups, ENOMEM, 'b', 100, 100, 100, 100);
+    lt_insert_read(dups, ENOMEM, 'c', 100, 100, 100, 100);
+    lt_unlock('b');
+    assert(lt->lock_escalation_allowed);
+    for (i = 50; i < 1000; i++) {
+        lt_insert_write(dups, 0, 'c', i, i);
+        assert(lt->lock_escalation_allowed);
+    }
+    close_tree();
+#endif
+/* ******************** */
+/* escalate on read lock, */
+    setup_tree(dups);
+    for (i = 0; i < 10; i++) {
+        lt_insert_write(dups, 0, 'a', i, i);
+    }
+    lt_insert_read(dups, 0, 'a', 10, 10, 10, 10);
+    close_tree();
+/* ******************** */
+/* escalate on read lock of different transaction. */
+    setup_tree(dups);
+    for (i = 0; i < 10; i++) {
+        lt_insert_write(dups, 0, 'a', i, i);
+    }
+    lt_insert_read(dups, 0, 'b', 10, 10, 10, 10);
+    close_tree();
+/* ******************** */
+/* txn A grabs write lock 0,9
+   txn A grabs read lock 1,2,3,4,5,6,7,8
+   txn B grabs write lock 11, 12, should succeed */
+    setup_tree(dups);
+    for (i = 1; i < 9; i++) {
+        lt_insert_read(dups, 0, 'a', i, i, i, i);
+    }
+    lt_insert_write(dups, 0, 'a', 0, 0);
+    lt_insert_write(dups, 0, 'a', 9, 9);
+    for (i = 50; i < 1000; i++) {
+        lt_insert_write(dups, 0, 'b', i, i);
+        assert(lt->lock_escalation_allowed);
+    }
+    close_tree();
+/* ******************** */
+/* [1-A-5]   [10-B-15]   [20-A-25]  BORDER WRITE
+    [2B]  [6C] [12A]       [22A]    READ LOCKS
+    check that only last borderwrite range is escalated */
+    setup_tree(dups);
+    lt_insert_write(dups, 0, 'a', 1, 1);
+    lt_insert_write(dups, 0, 'a', 5, 5);
+    lt_insert_write(dups, 0, 'b', 10, 10);
+    lt_insert_write(dups, 0, 'b', 15, 15);
+    lt_insert_write(dups, 0, 'a', 20, 20);
+    lt_insert_write(dups, 0, 'a', 23, 23);
+    lt_insert_write(dups, 0, 'a', 25, 25);
+
+    lt_insert_read(dups, 0, 'b', 2, 2, 2, 2);
+    lt_insert_read(dups, 0, 'a', 12, 12, 12, 12);
+    lt_insert_read(dups, 0, 'a', 22, 22, 22, 22);
+    
+    lt_insert_read(dups, 0, 'a', 100, 100, 100, 100);
+
+    lt_insert_write(dups, DB_LOCK_NOTGRANTED, 'b', 24, 24);
+    lt_insert_write(dups, 0, 'a', 14, 14);
+    lt_insert_write(dups, 0, 'b', 4, 4);
+    close_tree();
+/* ******************** */
+}
+
+void init_test(void) {
+    unsigned i;
+    for (i = 0; i < sizeof(nums)/sizeof(nums[0]); i++) nums[i] = i;
+
+    buflen = 64;
+    buf = (toku_range*) toku_malloc(buflen*sizeof(toku_range));
+}
+
+
+
+
+
+int main(int argc, const char *argv[]) {
+    parse_args(argc, argv);
+
+    init_test();
+
+    run_escalation_test(FALSE);
+    run_escalation_test(TRUE);
+
+    return 0;
+}
-- 
2.30.9