From a34734ccbc2bd216df00adbf1aa29e576fadb368 Mon Sep 17 00:00:00 2001
From: Zardosht Kasheff <zardosht@gmail.com>
Date: Wed, 21 Aug 2013 15:10:05 -0400
Subject: [PATCH] refs #54, improve the performance of hot indexing. This
 change does two things:  - gets indexer to run in reverse, that is, start at
 the end and run to beginning  - refines locking a bit. An estimate of the
 position of the hot indexer is stored,    that is cheap to look at. Threads
 that use this estimate with a mutex either do    only a quick comparison or
 set it to a new value. Threads doing writes (with XXX_multiple calls)    will
 check their position with respect to the estimate, and if they see the hot
 indexer    is already past where they will modify, they don't grab the more
 expensive indexer    lock. For insertion workloads that go to the end of the
 main dictionary of a table/collection,    this check should practically
 always pass.

---
 ft/le-cursor.cc               | 30 +++++++++---
 ft/le-cursor.h                |  6 ++-
 ft/tests/le-cursor-provdel.cc |  2 +-
 ft/tests/le-cursor-right.cc   | 44 +++++++++--------
 ft/tests/le-cursor-walk.cc    | 10 ++--
 src/indexer-internal.h        |  4 +-
 src/indexer.cc                | 57 +++++++++++++++++++++-
 src/indexer.h                 |  4 +-
 src/ydb_write.cc              | 92 +++++++++++++++++++++++++++--------
 9 files changed, 189 insertions(+), 60 deletions(-)

diff --git a/ft/le-cursor.cc b/ft/le-cursor.cc
index 35b29a650b7..c57ab49879e 100644
--- a/ft/le-cursor.cc
+++ b/ft/le-cursor.cc
@@ -120,8 +120,8 @@ toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE ft_handle, TOKUTXN
         if (result == 0) {
             // TODO move the leaf mode to the ft cursor constructor
             toku_ft_cursor_set_leaf_mode(le_cursor->ft_cursor);
-            le_cursor->neg_infinity = true;
-            le_cursor->pos_infinity = false;
+            le_cursor->neg_infinity = false;
+            le_cursor->pos_infinity = true;
             // zero out the fake DB. this is a rare operation so it's not too slow.
             memset(&le_cursor->fake_db, 0, sizeof(le_cursor->fake_db));
         }
@@ -147,21 +147,21 @@ void toku_le_cursor_close(LE_CURSOR le_cursor) {
 int 
 toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) {
     int result;
-    if (le_cursor->pos_infinity) {
+    if (le_cursor->neg_infinity) {
         result = DB_NOTFOUND;
     } else {
-        le_cursor->neg_infinity = false;
+        le_cursor->pos_infinity = false;
         // TODO replace this with a non deprecated function. Which?
-        result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, getf, getf_v, DB_NEXT);
+        result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, getf, getf_v, DB_PREV);
         if (result == DB_NOTFOUND) {
-            le_cursor->pos_infinity = true;
+            le_cursor->neg_infinity = true;
         }
     }
     return result;
 }
 
 bool
-toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) {
+toku_le_cursor_is_key_greater_or_equal(LE_CURSOR le_cursor, const DBT *key) {
     bool result;
     if (le_cursor->neg_infinity) {
         result = true;      // all keys are greater than -infinity
@@ -175,7 +175,7 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) {
         // get the current position from the cursor and compare it to the given key.
         DBT *cursor_key = &le_cursor->ft_cursor->key;
         int r = keycompare(&le_cursor->fake_db, cursor_key, key);
-        if (r < 0) {
+        if (r <= 0) {
             result = true;  // key is right of the cursor key
         } else {
             result = false; // key is at or left of the cursor key
@@ -183,3 +183,17 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) {
     }
     return result;
 }
+
+void
+toku_le_cursor_update_estimate(LE_CURSOR le_cursor, DBT* estimate) {
+    // don't handle these edge cases, not worth it.
+    // estimate stays same
+    if (le_cursor->pos_infinity || le_cursor->neg_infinity) {
+        return;
+    }
+    DBT *cursor_key = &le_cursor->ft_cursor->key;
+    estimate->data = toku_xrealloc(estimate->data, cursor_key->size);
+    memcpy(estimate->data, cursor_key->data, cursor_key->size);
+    estimate->size = cursor_key->size;
+    estimate->flags = DB_DBT_REALLOC;
+}
diff --git a/ft/le-cursor.h b/ft/le-cursor.h
index 4ad960762d3..4bd24c47d98 100644
--- a/ft/le-cursor.h
+++ b/ft/le-cursor.h
@@ -121,6 +121,10 @@ int toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void
 // The LE_CURSOR position is intialized to -infinity. Any key comparision with -infinity returns true.
 // When the cursor runs off the right edge of the tree, the LE_CURSOR position is set to +infinity.  Any key comparision with +infinity
 // returns false.
-bool toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key);
+bool toku_le_cursor_is_key_greater_or_equal(LE_CURSOR le_cursor, const DBT *key);
+
+// extracts position of le_cursor into estimate. Responsibility of caller to handle
+// thread safety. Caller (the indexer), does so by ensuring indexer lock is held
+void toku_le_cursor_update_estimate(LE_CURSOR le_cursor, DBT* estimate);
 
 #endif
diff --git a/ft/tests/le-cursor-provdel.cc b/ft/tests/le-cursor-provdel.cc
index c71c49847de..a48d07b7e85 100644
--- a/ft/tests/le-cursor-provdel.cc
+++ b/ft/tests/le-cursor-provdel.cc
@@ -252,7 +252,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
         assert(le->keylen == sizeof (int));
         int ii;
         memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
-        assert((int) toku_htonl(i) == ii);
+        assert((int) toku_htonl(n-i-1) == ii);
     }
     assert(i == n);
 
diff --git a/ft/tests/le-cursor-right.cc b/ft/tests/le-cursor-right.cc
index 3db4cd9d812..6744bfdeeb8 100644
--- a/ft/tests/le-cursor-right.cc
+++ b/ft/tests/le-cursor-right.cc
@@ -188,9 +188,9 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
     toku_cachetable_close(&ct);
 }
 
-// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at -infinity
+// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at +infinity
 static void 
-test_neg_infinity(const char *fname, int n) {
+test_pos_infinity(const char *fname, int n) {
     if (verbose) fprintf(stderr, "%s %s %d\n", __FUNCTION__, fname, n);
     int error;
 
@@ -210,8 +210,8 @@ test_neg_infinity(const char *fname, int n) {
         int k = toku_htonl(i);
         DBT key;
         toku_fill_dbt(&key, &k, sizeof k);
-        int right = toku_le_cursor_is_key_greater(cursor, &key);
-        assert(right == true);
+        int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key);
+        assert(right == false);
     }
         
     toku_le_cursor_close(cursor);
@@ -222,9 +222,9 @@ test_neg_infinity(const char *fname, int n) {
     toku_cachetable_close(&ct);
 }
 
-// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at +infinity
+// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at -infinity
 static void 
-test_pos_infinity(const char *fname, int n) {
+test_neg_infinity(const char *fname, int n) {
     if (verbose) fprintf(stderr, "%s %s %d\n", __FUNCTION__, fname, n);
     int error;
 
@@ -246,7 +246,7 @@ test_pos_infinity(const char *fname, int n) {
     toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
 
     int i;
-    for (i = 0; ; i++) {
+    for (i = n-1; ; i--) {
         error = le_cursor_get_next(cursor, &val);
         if (error != 0) 
             break;
@@ -259,7 +259,7 @@ test_pos_infinity(const char *fname, int n) {
         assert((int) toku_htonl(i) == ii);
 
     }
-    assert(i == n);
+    assert(i == -1);
 
     toku_destroy_dbt(&key);
     toku_destroy_dbt(&val);
@@ -268,8 +268,8 @@ test_pos_infinity(const char *fname, int n) {
         int k = toku_htonl(i);
         DBT key2;
         toku_fill_dbt(&key2, &k, sizeof k);
-        int right = toku_le_cursor_is_key_greater(cursor, &key2);
-        assert(right == false);
+        int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key2);
+        assert(right == true);
     }
 
     toku_le_cursor_close(cursor);
@@ -315,24 +315,26 @@ test_between(const char *fname, int n) {
         assert(le->keylen == sizeof (int));
         int ii;
         memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
-        assert((int) toku_htonl(i) == ii);
+        // hot indexer runs in reverse, therefore need
+        // to check n-i-1
+        assert((int) toku_htonl(n-i-1) == ii);
 
-        // test that 0 .. i is not right of the cursor
+        // test 0 .. i-1 
         for (int j = 0; j <= i; j++) {
-            int k = toku_htonl(j);
+            int k = toku_htonl(n-j-1);
             DBT key2;
             toku_fill_dbt(&key2, &k, sizeof k);
-            int right = toku_le_cursor_is_key_greater(cursor, &key2);
-            assert(right == false);
+            int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key2);
+            assert(right == true);
         }
 
-        // test that i+1 .. n is left of the cursor
-        for (int j = i + 1; j <= n; j++) {
-            int k = toku_htonl(j);
+        // test i .. n
+        for (int j = i+1; j < n; j++) {
+            int k = toku_htonl(n-j-1);
             DBT key2;
             toku_fill_dbt(&key2, &k, sizeof k);
-            int right = toku_le_cursor_is_key_greater(cursor, &key2);
-            assert(right == true);
+            int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key2);
+            assert(right == false);
         }
 
     }
@@ -373,8 +375,8 @@ test_main (int argc , const char *argv[]) {
 
     const int n = 10;
     create_populate_tree(".", "ftfile", n);
-    test_neg_infinity("ftfile", n);
     test_pos_infinity("ftfile", n);
+    test_neg_infinity("ftfile", n);
     test_between("ftfile", n);
 
     return 0;
diff --git a/ft/tests/le-cursor-walk.cc b/ft/tests/le-cursor-walk.cc
index 01104699eae..c0b7866925f 100644
--- a/ft/tests/le-cursor-walk.cc
+++ b/ft/tests/le-cursor-walk.cc
@@ -208,10 +208,10 @@ walk_tree(const char *fname, int n) {
 
     int i;
     for (i = 0; ; i++) {
-	error = TOKUDB_TRY_AGAIN;
-	while (error == TOKUDB_TRY_AGAIN) {
-	    error = le_cursor_get_next(cursor, &val);
-	}
+        error = TOKUDB_TRY_AGAIN;
+        while (error == TOKUDB_TRY_AGAIN) {
+            error = le_cursor_get_next(cursor, &val);
+        }
         if (error != 0) 
             break;
 
@@ -220,7 +220,7 @@ walk_tree(const char *fname, int n) {
         assert(le->keylen == sizeof (int));
         int ii;
         memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
-        assert((int) toku_htonl(i) == ii);
+        assert((int) toku_htonl(n-i-1) == ii);
     }
     assert(i == n);
 
diff --git a/src/indexer-internal.h b/src/indexer-internal.h
index aed95d2dd6d..a6b2270801e 100644
--- a/src/indexer-internal.h
+++ b/src/indexer-internal.h
@@ -123,8 +123,10 @@ struct ule_prov_info {
 
 struct __toku_indexer_internal {
     DB_ENV *env;
-    DB_TXN *txn;    
+    DB_TXN *txn;
     toku_mutex_t indexer_lock;
+    toku_mutex_t indexer_estimate_lock;
+    DBT position_estimate;
     DB *src_db;
     int N;
     DB **dest_dbs; /* [N] */
diff --git a/src/indexer.cc b/src/indexer.cc
index 55a9271d946..c2b5f78eaed 100644
--- a/src/indexer.cc
+++ b/src/indexer.cc
@@ -189,6 +189,8 @@ static void
 free_indexer_resources(DB_INDEXER *indexer) {
     if ( indexer->i ) {        
         toku_mutex_destroy(&indexer->i->indexer_lock);
+        toku_mutex_destroy(&indexer->i->indexer_estimate_lock);
+        toku_destroy_dbt(&indexer->i->position_estimate);
         if ( indexer->i->lec ) {
             toku_le_cursor_close(indexer->i->lec);
         }
@@ -222,6 +224,49 @@ toku_indexer_unlock(DB_INDEXER* indexer) {
     toku_mutex_unlock(&indexer->i->indexer_lock);
 }
 
+// a shortcut call
+//
+// a cheap(er) call to see if a key must be inserted
+// into the DB. If true, then we know we have to insert.
+// If false, then we don't know, and have to check again
+// after grabbing the indexer lock
+bool
+toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key) {
+    bool retval = false;
+    toku_mutex_lock(&indexer->i->indexer_estimate_lock);
+    // if we have no position estimate, we can't tell, so return false
+    if (indexer->i->position_estimate.data == NULL) {
+        retval = false;
+    }
+    else {
+        FT_HANDLE ft_handle = indexer->i->src_db->i->ft_handle;
+        ft_compare_func keycompare = toku_ft_get_bt_compare(ft_handle);        
+        int r = keycompare(
+            indexer->i->src_db, 
+            &indexer->i->position_estimate, 
+            key
+            );
+        // if key > position_estimate, then we know the indexer cursor
+        // is past key, and we can safely say that associated values of 
+        // key must be inserted into the indexer's db
+        if (r  < 0) {
+            retval = true;
+        }
+        else {
+            retval = false;
+        }
+    }
+    toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
+    return retval;
+}
+
+void
+toku_indexer_update_estimate(DB_INDEXER* indexer) {
+    toku_mutex_lock(&indexer->i->indexer_estimate_lock);
+    toku_le_cursor_update_estimate(indexer->i->lec, &indexer->i->position_estimate);
+    toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
+}
+
 // forward declare the test-only wrapper function for undo-do
 static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
 
@@ -272,6 +317,8 @@ toku_indexer_create_indexer(DB_ENV *env,
     indexer->abort                 = abort_indexer;
 
     toku_mutex_init(&indexer->i->indexer_lock, NULL);
+    toku_mutex_init(&indexer->i->indexer_estimate_lock, NULL);
+    toku_init_dbt(&indexer->i->position_estimate);
 
     //
     // create and close a dummy loader to get redirection going for the hot indexer
@@ -356,8 +403,14 @@ toku_indexer_set_error_callback(DB_INDEXER *indexer,
 // a key is to the right of the indexer's cursor if it compares
 // greater than the current le cursor position.
 bool
-toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key) {
-    return toku_le_cursor_is_key_greater(indexer->i->lec, key);
+toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key) {
+    // the hot indexer runs from the end to the beginning, it gets the largest keys first
+    //
+    // if key is less than indexer's position, then we should NOT insert it because
+    // the indexer will get to it. If it is greater or equal, that means the indexer
+    // has already processed the key, and will not get to it, therefore, we need
+    // to handle it
+    return toku_le_cursor_is_key_greater_or_equal(indexer->i->lec, key);
 }
 
 // initialize provisional info by allocating enough space to hold provisional 
diff --git a/src/indexer.h b/src/indexer.h
index 46b302676d4..10543598825 100644
--- a/src/indexer.h
+++ b/src/indexer.h
@@ -97,6 +97,8 @@ PATENT RIGHTS GRANT:
 void toku_indexer_lock(DB_INDEXER* indexer);
 
 void toku_indexer_unlock(DB_INDEXER* indexer);
+bool toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key);
+void toku_indexer_update_estimate(DB_INDEXER* indexer);
 
 // The indexer populates multiple destination db's from the contents of one source db.
 // While the indexes are being built by the indexer, the application may continue to 
@@ -146,7 +148,7 @@ int toku_indexer_set_error_callback(DB_INDEXER *indexer,
 // Is the key right of the indexer's leaf entry cursor?
 // Returns true  if right of le_cursor
 // Returns false if left or equal to le_cursor
-bool toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key);
+bool toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key);
 
 // Get the indexer's source db
 DB *toku_indexer_get_src_db(DB_INDEXER *indexer);
diff --git a/src/ydb_write.cc b/src/ydb_write.cc
index 48d71af2066..08db94c1ca5 100644
--- a/src/ydb_write.cc
+++ b/src/ydb_write.cc
@@ -440,7 +440,7 @@ lookup_src_db(uint32_t num_dbs, DB *db_array[], DB *src_db) {
 }
 
 static int
-do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DB *src_db, const DBT *src_key) {
+do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DB *src_db, const DBT *src_key, bool indexer_shortcut) {
     int r = 0;
     TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
     for (uint32_t which_db = 0; r == 0 && which_db < num_dbs; which_db++) {
@@ -452,7 +452,7 @@ do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[],
         // indexers cursor.  we have to get the src_db from the indexer and find it in the db_array.
         int do_delete = true;
         DB_INDEXER *indexer = toku_db_get_indexer(db);
-        if (indexer) { // if this db is the index under construction
+        if (indexer && !indexer_shortcut) { // if this db is the index under construction
             DB *indexer_src_db = toku_indexer_get_src_db(indexer);
             invariant(indexer_src_db != NULL);
             const DBT *indexer_src_key;
@@ -465,7 +465,8 @@ do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[],
                 invariant(keys[which_src_db].size == 1);
                 indexer_src_key = &keys[which_src_db].dbts[0];
             }
-            do_delete = !toku_indexer_is_key_right_of_le_cursor(indexer, indexer_src_key);
+            do_delete = toku_indexer_should_insert_key(indexer, indexer_src_key);
+            toku_indexer_update_estimate(indexer);
         }
         if (do_delete) {
             for (uint32_t i = 0; i < keys[which_db].size; i++) {
@@ -485,7 +486,9 @@ static int
 get_indexer_if_exists(
     uint32_t num_dbs, 
     DB **db_array, 
-    DB_INDEXER** indexerp
+    DB *src_db,
+    DB_INDEXER** indexerp,
+    bool *src_db_is_indexer_src
     ) 
 {
     int r = 0;
@@ -502,6 +505,13 @@ get_indexer_if_exists(
         }
     }
     if (r == 0) {
+        if (first_indexer) {
+            DB* indexer_src_db = toku_indexer_get_src_db(first_indexer);
+            // we should just make this an invariant
+            if (src_db == indexer_src_db) {
+                *src_db_is_indexer_src = true;
+            }
+        }
         *indexerp = first_indexer;
     }
     return r;
@@ -529,6 +539,9 @@ env_del_multiple(
     uint32_t lock_flags[num_dbs];
     uint32_t remaining_flags[num_dbs];
     FT_HANDLE brts[num_dbs];
+    bool indexer_lock_taken = false;
+    bool src_same = false;
+    bool indexer_shortcut = false;
     if (!txn) {
         r = EINVAL;
         goto cleanup;
@@ -539,7 +552,7 @@ env_del_multiple(
     }
 
     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
-    r = get_indexer_if_exists(num_dbs, db_array, &indexer);
+    r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
     if (r) {
         goto cleanup;
     }
@@ -584,13 +597,23 @@ env_del_multiple(
     }
 
     if (indexer) {
-        toku_indexer_lock(indexer);
+        // do a cheap check
+        if (src_same) {
+            bool may_insert = toku_indexer_may_insert(indexer, src_key);
+            if (!may_insert) {
+                toku_indexer_lock(indexer);
+                indexer_lock_taken = true;
+            }
+            else {
+                indexer_shortcut = true;
+            }
+        }
     }
     toku_multi_operation_client_lock();
     log_del_multiple(txn, src_db, src_key, src_val, num_dbs, brts, del_keys);
-    r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key);
+    r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key, indexer_shortcut);
     toku_multi_operation_client_unlock();
-    if (indexer) {
+    if (indexer_lock_taken) {
         toku_indexer_unlock(indexer);
     }
 
@@ -612,7 +635,7 @@ log_put_multiple(DB_TXN *txn, DB *src_db, const DBT *src_key, const DBT *src_val
 }
 
 static int
-do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DBT_ARRAY vals[], DB *src_db, const DBT *src_key) {
+do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DBT_ARRAY vals[], DB *src_db, const DBT *src_key, bool indexer_shortcut) {
     TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
     for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
         DB *db = db_array[which_db];
@@ -624,7 +647,7 @@ do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[],
         if (keys[which_db].size > 0) {
             bool do_put = true;
             DB_INDEXER *indexer = toku_db_get_indexer(db);
-            if (indexer) { // if this db is the index under construction
+            if (indexer && !indexer_shortcut) { // if this db is the index under construction
                 DB *indexer_src_db = toku_indexer_get_src_db(indexer);
                 invariant(indexer_src_db != NULL);
                 const DBT *indexer_src_key;
@@ -637,7 +660,8 @@ do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[],
                     invariant(keys[which_src_db].size == 1);
                     indexer_src_key = &keys[which_src_db].dbts[0];
                 }
-                do_put = !toku_indexer_is_key_right_of_le_cursor(indexer, indexer_src_key);
+                do_put = toku_indexer_should_insert_key(indexer, indexer_src_key);
+                toku_indexer_update_estimate(indexer);
             }
             if (do_put) {
                 for (uint32_t i = 0; i < keys[which_db].size; i++) {
@@ -677,6 +701,9 @@ env_put_multiple_internal(
     uint32_t lock_flags[num_dbs];
     uint32_t remaining_flags[num_dbs];
     FT_HANDLE brts[num_dbs];
+    bool indexer_shortcut = false;
+    bool indexer_lock_taken = false;
+    bool src_same = false;
 
     if (!txn || !num_dbs) {
         r = EINVAL;
@@ -688,7 +715,7 @@ env_put_multiple_internal(
     }
 
     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
-    r = get_indexer_if_exists(num_dbs, db_array, &indexer);
+    r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
     if (r) {
         goto cleanup;
     }
@@ -749,13 +776,23 @@ env_put_multiple_internal(
     }
     
     if (indexer) {
-        toku_indexer_lock(indexer);
+        // do a cheap check
+        if (src_same) {
+            bool may_insert = toku_indexer_may_insert(indexer, src_key);
+            if (!may_insert) {
+                toku_indexer_lock(indexer);
+                indexer_lock_taken = true;
+            }
+            else {
+                indexer_shortcut = true;
+            }
+        }
     }
     toku_multi_operation_client_lock();
     log_put_multiple(txn, src_db, src_key, src_val, num_dbs, brts);
-    r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, src_db, src_key);
+    r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, src_db, src_key, indexer_shortcut);
     toku_multi_operation_client_unlock();
-    if (indexer) {
+    if (indexer_lock_taken) {
         toku_indexer_unlock(indexer);
     }
 
@@ -787,6 +824,9 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
 
     HANDLE_PANICKED_ENV(env);
     DB_INDEXER* indexer = NULL;
+    bool indexer_shortcut = false;
+    bool indexer_lock_taken = false;
+    bool src_same = false;
     HANDLE_READ_ONLY_TXN(txn);
     DBT_ARRAY old_key_arrays[num_dbs];
     DBT_ARRAY new_key_arrays[num_dbs];
@@ -806,7 +846,7 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
     }
 
     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
-    r = get_indexer_if_exists(num_dbs, db_array, &indexer);
+    r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
     if (r) {
         goto cleanup;
     }
@@ -1008,12 +1048,24 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
             }
         }
         if (indexer) {
-            toku_indexer_lock(indexer);
+            // do a cheap check
+            if (src_same) {
+                bool may_insert =
+                    toku_indexer_may_insert(indexer, old_src_key) &&
+                    toku_indexer_may_insert(indexer, new_src_key);
+                if (!may_insert) {
+                    toku_indexer_lock(indexer);
+                    indexer_lock_taken = true;
+                }
+                else {
+                    indexer_shortcut = true;
+                }
+            }
         }
         toku_multi_operation_client_lock();
         if (r == 0 && n_del_dbs > 0) {
             log_del_multiple(txn, src_db, old_src_key, old_src_data, n_del_dbs, del_fts, del_key_arrays);
-            r = do_del_multiple(txn, n_del_dbs, del_dbs, del_key_arrays, src_db, old_src_key);
+            r = do_del_multiple(txn, n_del_dbs, del_dbs, del_key_arrays, src_db, old_src_key, indexer_shortcut);
         }
 
         if (r == 0 && n_put_dbs > 0) {
@@ -1022,10 +1074,10 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
             // recovery so we don't end up losing data.
             // So unlike env->put_multiple, we ONLY log a 'put_multiple' log entry.
             log_put_multiple(txn, src_db, new_src_key, new_src_data, n_put_dbs, put_fts);
-            r = do_put_multiple(txn, n_put_dbs, put_dbs, put_key_arrays, put_val_arrays, src_db, new_src_key);
+            r = do_put_multiple(txn, n_put_dbs, put_dbs, put_key_arrays, put_val_arrays, src_db, new_src_key, indexer_shortcut);
         }
         toku_multi_operation_client_unlock();
-        if (indexer) {
+        if (indexer_lock_taken) {
             toku_indexer_unlock(indexer);
         }
     }
-- 
2.30.9