Commit a34734cc authored by Zardosht Kasheff's avatar Zardosht Kasheff

refs #54, improve the performance of hot indexing. This change does two things:

 - gets indexer to run in reverse, that is, start at the end and run to beginning
 - refines locking a bit. An estimate of the position of the hot indexer is stored,
   that is cheap to look at. Threads that use this estimate with a mutex either do
   only a quick comparison or set it to a new value. Threads doing writes (with XXX_multiple calls)
   will check their position with respect to the estimate, and if they see the hot indexer
   is already past where they will modify, they don't grab the more expensive indexer
   lock. For insertion workloads that go to the end of the main dictionary of a table/collection,
   this check should practically always pass.
parent b23b43a0
...@@ -120,8 +120,8 @@ toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE ft_handle, TOKUTXN ...@@ -120,8 +120,8 @@ toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE ft_handle, TOKUTXN
if (result == 0) { if (result == 0) {
// TODO move the leaf mode to the ft cursor constructor // TODO move the leaf mode to the ft cursor constructor
toku_ft_cursor_set_leaf_mode(le_cursor->ft_cursor); toku_ft_cursor_set_leaf_mode(le_cursor->ft_cursor);
le_cursor->neg_infinity = true; le_cursor->neg_infinity = false;
le_cursor->pos_infinity = false; le_cursor->pos_infinity = true;
// zero out the fake DB. this is a rare operation so it's not too slow. // zero out the fake DB. this is a rare operation so it's not too slow.
memset(&le_cursor->fake_db, 0, sizeof(le_cursor->fake_db)); memset(&le_cursor->fake_db, 0, sizeof(le_cursor->fake_db));
} }
...@@ -147,21 +147,21 @@ void toku_le_cursor_close(LE_CURSOR le_cursor) { ...@@ -147,21 +147,21 @@ void toku_le_cursor_close(LE_CURSOR le_cursor) {
int int
toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) { toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) {
int result; int result;
if (le_cursor->pos_infinity) { if (le_cursor->neg_infinity) {
result = DB_NOTFOUND; result = DB_NOTFOUND;
} else { } else {
le_cursor->neg_infinity = false; le_cursor->pos_infinity = false;
// TODO replace this with a non deprecated function. Which? // TODO replace this with a non deprecated function. Which?
result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, getf, getf_v, DB_NEXT); result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, getf, getf_v, DB_PREV);
if (result == DB_NOTFOUND) { if (result == DB_NOTFOUND) {
le_cursor->pos_infinity = true; le_cursor->neg_infinity = true;
} }
} }
return result; return result;
} }
bool bool
toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) { toku_le_cursor_is_key_greater_or_equal(LE_CURSOR le_cursor, const DBT *key) {
bool result; bool result;
if (le_cursor->neg_infinity) { if (le_cursor->neg_infinity) {
result = true; // all keys are greater than -infinity result = true; // all keys are greater than -infinity
...@@ -175,7 +175,7 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) { ...@@ -175,7 +175,7 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) {
// get the current position from the cursor and compare it to the given key. // get the current position from the cursor and compare it to the given key.
DBT *cursor_key = &le_cursor->ft_cursor->key; DBT *cursor_key = &le_cursor->ft_cursor->key;
int r = keycompare(&le_cursor->fake_db, cursor_key, key); int r = keycompare(&le_cursor->fake_db, cursor_key, key);
if (r < 0) { if (r <= 0) {
result = true; // key is right of the cursor key result = true; // key is right of the cursor key
} else { } else {
result = false; // key is at or left of the cursor key result = false; // key is at or left of the cursor key
...@@ -183,3 +183,17 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) { ...@@ -183,3 +183,17 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) {
} }
return result; return result;
} }
void
toku_le_cursor_update_estimate(LE_CURSOR le_cursor, DBT* estimate) {
// don't handle these edge cases, not worth it.
// estimate stays same
if (le_cursor->pos_infinity || le_cursor->neg_infinity) {
return;
}
DBT *cursor_key = &le_cursor->ft_cursor->key;
estimate->data = toku_xrealloc(estimate->data, cursor_key->size);
memcpy(estimate->data, cursor_key->data, cursor_key->size);
estimate->size = cursor_key->size;
estimate->flags = DB_DBT_REALLOC;
}
...@@ -121,6 +121,10 @@ int toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void ...@@ -121,6 +121,10 @@ int toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void
// The LE_CURSOR position is intialized to -infinity. Any key comparision with -infinity returns true. // The LE_CURSOR position is intialized to -infinity. Any key comparision with -infinity returns true.
// When the cursor runs off the right edge of the tree, the LE_CURSOR position is set to +infinity. Any key comparision with +infinity // When the cursor runs off the right edge of the tree, the LE_CURSOR position is set to +infinity. Any key comparision with +infinity
// returns false. // returns false.
bool toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key); bool toku_le_cursor_is_key_greater_or_equal(LE_CURSOR le_cursor, const DBT *key);
// extracts position of le_cursor into estimate. Responsibility of caller to handle
// thread safety. Caller (the indexer), does so by ensuring indexer lock is held
void toku_le_cursor_update_estimate(LE_CURSOR le_cursor, DBT* estimate);
#endif #endif
...@@ -252,7 +252,7 @@ test_provdel(const char *logdir, const char *fname, int n) { ...@@ -252,7 +252,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
assert(le->keylen == sizeof (int)); assert(le->keylen == sizeof (int));
int ii; int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen); memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii); assert((int) toku_htonl(n-i-1) == ii);
} }
assert(i == n); assert(i == n);
......
...@@ -188,9 +188,9 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -188,9 +188,9 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
toku_cachetable_close(&ct); toku_cachetable_close(&ct);
} }
// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at -infinity // test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at +infinity
static void static void
test_neg_infinity(const char *fname, int n) { test_pos_infinity(const char *fname, int n) {
if (verbose) fprintf(stderr, "%s %s %d\n", __FUNCTION__, fname, n); if (verbose) fprintf(stderr, "%s %s %d\n", __FUNCTION__, fname, n);
int error; int error;
...@@ -210,8 +210,8 @@ test_neg_infinity(const char *fname, int n) { ...@@ -210,8 +210,8 @@ test_neg_infinity(const char *fname, int n) {
int k = toku_htonl(i); int k = toku_htonl(i);
DBT key; DBT key;
toku_fill_dbt(&key, &k, sizeof k); toku_fill_dbt(&key, &k, sizeof k);
int right = toku_le_cursor_is_key_greater(cursor, &key); int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key);
assert(right == true); assert(right == false);
} }
toku_le_cursor_close(cursor); toku_le_cursor_close(cursor);
...@@ -222,9 +222,9 @@ test_neg_infinity(const char *fname, int n) { ...@@ -222,9 +222,9 @@ test_neg_infinity(const char *fname, int n) {
toku_cachetable_close(&ct); toku_cachetable_close(&ct);
} }
// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at +infinity // test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at -infinity
static void static void
test_pos_infinity(const char *fname, int n) { test_neg_infinity(const char *fname, int n) {
if (verbose) fprintf(stderr, "%s %s %d\n", __FUNCTION__, fname, n); if (verbose) fprintf(stderr, "%s %s %d\n", __FUNCTION__, fname, n);
int error; int error;
...@@ -246,7 +246,7 @@ test_pos_infinity(const char *fname, int n) { ...@@ -246,7 +246,7 @@ test_pos_infinity(const char *fname, int n) {
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC; toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
int i; int i;
for (i = 0; ; i++) { for (i = n-1; ; i--) {
error = le_cursor_get_next(cursor, &val); error = le_cursor_get_next(cursor, &val);
if (error != 0) if (error != 0)
break; break;
...@@ -259,7 +259,7 @@ test_pos_infinity(const char *fname, int n) { ...@@ -259,7 +259,7 @@ test_pos_infinity(const char *fname, int n) {
assert((int) toku_htonl(i) == ii); assert((int) toku_htonl(i) == ii);
} }
assert(i == n); assert(i == -1);
toku_destroy_dbt(&key); toku_destroy_dbt(&key);
toku_destroy_dbt(&val); toku_destroy_dbt(&val);
...@@ -268,8 +268,8 @@ test_pos_infinity(const char *fname, int n) { ...@@ -268,8 +268,8 @@ test_pos_infinity(const char *fname, int n) {
int k = toku_htonl(i); int k = toku_htonl(i);
DBT key2; DBT key2;
toku_fill_dbt(&key2, &k, sizeof k); toku_fill_dbt(&key2, &k, sizeof k);
int right = toku_le_cursor_is_key_greater(cursor, &key2); int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key2);
assert(right == false); assert(right == true);
} }
toku_le_cursor_close(cursor); toku_le_cursor_close(cursor);
...@@ -315,24 +315,26 @@ test_between(const char *fname, int n) { ...@@ -315,24 +315,26 @@ test_between(const char *fname, int n) {
assert(le->keylen == sizeof (int)); assert(le->keylen == sizeof (int));
int ii; int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen); memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii); // hot indexer runs in reverse, therefore need
// to check n-i-1
assert((int) toku_htonl(n-i-1) == ii);
// test that 0 .. i is not right of the cursor // test 0 .. i-1
for (int j = 0; j <= i; j++) { for (int j = 0; j <= i; j++) {
int k = toku_htonl(j); int k = toku_htonl(n-j-1);
DBT key2; DBT key2;
toku_fill_dbt(&key2, &k, sizeof k); toku_fill_dbt(&key2, &k, sizeof k);
int right = toku_le_cursor_is_key_greater(cursor, &key2); int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key2);
assert(right == false); assert(right == true);
} }
// test that i+1 .. n is left of the cursor // test i .. n
for (int j = i + 1; j <= n; j++) { for (int j = i+1; j < n; j++) {
int k = toku_htonl(j); int k = toku_htonl(n-j-1);
DBT key2; DBT key2;
toku_fill_dbt(&key2, &k, sizeof k); toku_fill_dbt(&key2, &k, sizeof k);
int right = toku_le_cursor_is_key_greater(cursor, &key2); int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key2);
assert(right == true); assert(right == false);
} }
} }
...@@ -373,8 +375,8 @@ test_main (int argc , const char *argv[]) { ...@@ -373,8 +375,8 @@ test_main (int argc , const char *argv[]) {
const int n = 10; const int n = 10;
create_populate_tree(".", "ftfile", n); create_populate_tree(".", "ftfile", n);
test_neg_infinity("ftfile", n);
test_pos_infinity("ftfile", n); test_pos_infinity("ftfile", n);
test_neg_infinity("ftfile", n);
test_between("ftfile", n); test_between("ftfile", n);
return 0; return 0;
......
...@@ -220,7 +220,7 @@ walk_tree(const char *fname, int n) { ...@@ -220,7 +220,7 @@ walk_tree(const char *fname, int n) {
assert(le->keylen == sizeof (int)); assert(le->keylen == sizeof (int));
int ii; int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen); memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii); assert((int) toku_htonl(n-i-1) == ii);
} }
assert(i == n); assert(i == n);
......
...@@ -125,6 +125,8 @@ struct __toku_indexer_internal { ...@@ -125,6 +125,8 @@ struct __toku_indexer_internal {
DB_ENV *env; DB_ENV *env;
DB_TXN *txn; DB_TXN *txn;
toku_mutex_t indexer_lock; toku_mutex_t indexer_lock;
toku_mutex_t indexer_estimate_lock;
DBT position_estimate;
DB *src_db; DB *src_db;
int N; int N;
DB **dest_dbs; /* [N] */ DB **dest_dbs; /* [N] */
......
...@@ -189,6 +189,8 @@ static void ...@@ -189,6 +189,8 @@ static void
free_indexer_resources(DB_INDEXER *indexer) { free_indexer_resources(DB_INDEXER *indexer) {
if ( indexer->i ) { if ( indexer->i ) {
toku_mutex_destroy(&indexer->i->indexer_lock); toku_mutex_destroy(&indexer->i->indexer_lock);
toku_mutex_destroy(&indexer->i->indexer_estimate_lock);
toku_destroy_dbt(&indexer->i->position_estimate);
if ( indexer->i->lec ) { if ( indexer->i->lec ) {
toku_le_cursor_close(indexer->i->lec); toku_le_cursor_close(indexer->i->lec);
} }
...@@ -222,6 +224,49 @@ toku_indexer_unlock(DB_INDEXER* indexer) { ...@@ -222,6 +224,49 @@ toku_indexer_unlock(DB_INDEXER* indexer) {
toku_mutex_unlock(&indexer->i->indexer_lock); toku_mutex_unlock(&indexer->i->indexer_lock);
} }
// a shortcut call
//
// a cheap(er) call to see if a key must be inserted
// into the DB. If true, then we know we have to insert.
// If false, then we don't know, and have to check again
// after grabbing the indexer lock
bool
toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key) {
bool retval = false;
toku_mutex_lock(&indexer->i->indexer_estimate_lock);
// if we have no position estimate, we can't tell, so return false
if (indexer->i->position_estimate.data == NULL) {
retval = false;
}
else {
FT_HANDLE ft_handle = indexer->i->src_db->i->ft_handle;
ft_compare_func keycompare = toku_ft_get_bt_compare(ft_handle);
int r = keycompare(
indexer->i->src_db,
&indexer->i->position_estimate,
key
);
// if key > position_estimate, then we know the indexer cursor
// is past key, and we can safely say that associated values of
// key must be inserted into the indexer's db
if (r < 0) {
retval = true;
}
else {
retval = false;
}
}
toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
return retval;
}
void
toku_indexer_update_estimate(DB_INDEXER* indexer) {
toku_mutex_lock(&indexer->i->indexer_estimate_lock);
toku_le_cursor_update_estimate(indexer->i->lec, &indexer->i->position_estimate);
toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
}
// forward declare the test-only wrapper function for undo-do // forward declare the test-only wrapper function for undo-do
static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule); static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
...@@ -272,6 +317,8 @@ toku_indexer_create_indexer(DB_ENV *env, ...@@ -272,6 +317,8 @@ toku_indexer_create_indexer(DB_ENV *env,
indexer->abort = abort_indexer; indexer->abort = abort_indexer;
toku_mutex_init(&indexer->i->indexer_lock, NULL); toku_mutex_init(&indexer->i->indexer_lock, NULL);
toku_mutex_init(&indexer->i->indexer_estimate_lock, NULL);
toku_init_dbt(&indexer->i->position_estimate);
// //
// create and close a dummy loader to get redirection going for the hot indexer // create and close a dummy loader to get redirection going for the hot indexer
...@@ -356,8 +403,14 @@ toku_indexer_set_error_callback(DB_INDEXER *indexer, ...@@ -356,8 +403,14 @@ toku_indexer_set_error_callback(DB_INDEXER *indexer,
// a key is to the right of the indexer's cursor if it compares // a key is to the right of the indexer's cursor if it compares
// greater than the current le cursor position. // greater than the current le cursor position.
bool bool
toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key) { toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key) {
return toku_le_cursor_is_key_greater(indexer->i->lec, key); // the hot indexer runs from the end to the beginning, it gets the largest keys first
//
// if key is less than indexer's position, then we should NOT insert it because
// the indexer will get to it. If it is greater or equal, that means the indexer
// has already processed the key, and will not get to it, therefore, we need
// to handle it
return toku_le_cursor_is_key_greater_or_equal(indexer->i->lec, key);
} }
// initialize provisional info by allocating enough space to hold provisional // initialize provisional info by allocating enough space to hold provisional
......
...@@ -97,6 +97,8 @@ PATENT RIGHTS GRANT: ...@@ -97,6 +97,8 @@ PATENT RIGHTS GRANT:
void toku_indexer_lock(DB_INDEXER* indexer); void toku_indexer_lock(DB_INDEXER* indexer);
void toku_indexer_unlock(DB_INDEXER* indexer); void toku_indexer_unlock(DB_INDEXER* indexer);
bool toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key);
void toku_indexer_update_estimate(DB_INDEXER* indexer);
// The indexer populates multiple destination db's from the contents of one source db. // The indexer populates multiple destination db's from the contents of one source db.
// While the indexes are being built by the indexer, the application may continue to // While the indexes are being built by the indexer, the application may continue to
...@@ -146,7 +148,7 @@ int toku_indexer_set_error_callback(DB_INDEXER *indexer, ...@@ -146,7 +148,7 @@ int toku_indexer_set_error_callback(DB_INDEXER *indexer,
// Is the key right of the indexer's leaf entry cursor? // Is the key right of the indexer's leaf entry cursor?
// Returns true if right of le_cursor // Returns true if right of le_cursor
// Returns false if left or equal to le_cursor // Returns false if left or equal to le_cursor
bool toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key); bool toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key);
// Get the indexer's source db // Get the indexer's source db
DB *toku_indexer_get_src_db(DB_INDEXER *indexer); DB *toku_indexer_get_src_db(DB_INDEXER *indexer);
......
...@@ -440,7 +440,7 @@ lookup_src_db(uint32_t num_dbs, DB *db_array[], DB *src_db) { ...@@ -440,7 +440,7 @@ lookup_src_db(uint32_t num_dbs, DB *db_array[], DB *src_db) {
} }
static int static int
do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DB *src_db, const DBT *src_key) { do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DB *src_db, const DBT *src_key, bool indexer_shortcut) {
int r = 0; int r = 0;
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
for (uint32_t which_db = 0; r == 0 && which_db < num_dbs; which_db++) { for (uint32_t which_db = 0; r == 0 && which_db < num_dbs; which_db++) {
...@@ -452,7 +452,7 @@ do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], ...@@ -452,7 +452,7 @@ do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[],
// indexers cursor. we have to get the src_db from the indexer and find it in the db_array. // indexers cursor. we have to get the src_db from the indexer and find it in the db_array.
int do_delete = true; int do_delete = true;
DB_INDEXER *indexer = toku_db_get_indexer(db); DB_INDEXER *indexer = toku_db_get_indexer(db);
if (indexer) { // if this db is the index under construction if (indexer && !indexer_shortcut) { // if this db is the index under construction
DB *indexer_src_db = toku_indexer_get_src_db(indexer); DB *indexer_src_db = toku_indexer_get_src_db(indexer);
invariant(indexer_src_db != NULL); invariant(indexer_src_db != NULL);
const DBT *indexer_src_key; const DBT *indexer_src_key;
...@@ -465,7 +465,8 @@ do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], ...@@ -465,7 +465,8 @@ do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[],
invariant(keys[which_src_db].size == 1); invariant(keys[which_src_db].size == 1);
indexer_src_key = &keys[which_src_db].dbts[0]; indexer_src_key = &keys[which_src_db].dbts[0];
} }
do_delete = !toku_indexer_is_key_right_of_le_cursor(indexer, indexer_src_key); do_delete = toku_indexer_should_insert_key(indexer, indexer_src_key);
toku_indexer_update_estimate(indexer);
} }
if (do_delete) { if (do_delete) {
for (uint32_t i = 0; i < keys[which_db].size; i++) { for (uint32_t i = 0; i < keys[which_db].size; i++) {
...@@ -485,7 +486,9 @@ static int ...@@ -485,7 +486,9 @@ static int
get_indexer_if_exists( get_indexer_if_exists(
uint32_t num_dbs, uint32_t num_dbs,
DB **db_array, DB **db_array,
DB_INDEXER** indexerp DB *src_db,
DB_INDEXER** indexerp,
bool *src_db_is_indexer_src
) )
{ {
int r = 0; int r = 0;
...@@ -502,6 +505,13 @@ get_indexer_if_exists( ...@@ -502,6 +505,13 @@ get_indexer_if_exists(
} }
} }
if (r == 0) { if (r == 0) {
if (first_indexer) {
DB* indexer_src_db = toku_indexer_get_src_db(first_indexer);
// we should just make this an invariant
if (src_db == indexer_src_db) {
*src_db_is_indexer_src = true;
}
}
*indexerp = first_indexer; *indexerp = first_indexer;
} }
return r; return r;
...@@ -529,6 +539,9 @@ env_del_multiple( ...@@ -529,6 +539,9 @@ env_del_multiple(
uint32_t lock_flags[num_dbs]; uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs]; uint32_t remaining_flags[num_dbs];
FT_HANDLE brts[num_dbs]; FT_HANDLE brts[num_dbs];
bool indexer_lock_taken = false;
bool src_same = false;
bool indexer_shortcut = false;
if (!txn) { if (!txn) {
r = EINVAL; r = EINVAL;
goto cleanup; goto cleanup;
...@@ -539,7 +552,7 @@ env_del_multiple( ...@@ -539,7 +552,7 @@ env_del_multiple(
} }
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn); HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
r = get_indexer_if_exists(num_dbs, db_array, &indexer); r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
if (r) { if (r) {
goto cleanup; goto cleanup;
} }
...@@ -584,13 +597,23 @@ env_del_multiple( ...@@ -584,13 +597,23 @@ env_del_multiple(
} }
if (indexer) { if (indexer) {
// do a cheap check
if (src_same) {
bool may_insert = toku_indexer_may_insert(indexer, src_key);
if (!may_insert) {
toku_indexer_lock(indexer); toku_indexer_lock(indexer);
indexer_lock_taken = true;
}
else {
indexer_shortcut = true;
}
}
} }
toku_multi_operation_client_lock(); toku_multi_operation_client_lock();
log_del_multiple(txn, src_db, src_key, src_val, num_dbs, brts, del_keys); log_del_multiple(txn, src_db, src_key, src_val, num_dbs, brts, del_keys);
r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key); r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key, indexer_shortcut);
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
if (indexer) { if (indexer_lock_taken) {
toku_indexer_unlock(indexer); toku_indexer_unlock(indexer);
} }
...@@ -612,7 +635,7 @@ log_put_multiple(DB_TXN *txn, DB *src_db, const DBT *src_key, const DBT *src_val ...@@ -612,7 +635,7 @@ log_put_multiple(DB_TXN *txn, DB *src_db, const DBT *src_key, const DBT *src_val
} }
static int static int
do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DBT_ARRAY vals[], DB *src_db, const DBT *src_key) { do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DBT_ARRAY vals[], DB *src_db, const DBT *src_key, bool indexer_shortcut) {
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
for (uint32_t which_db = 0; which_db < num_dbs; which_db++) { for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
DB *db = db_array[which_db]; DB *db = db_array[which_db];
...@@ -624,7 +647,7 @@ do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], ...@@ -624,7 +647,7 @@ do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[],
if (keys[which_db].size > 0) { if (keys[which_db].size > 0) {
bool do_put = true; bool do_put = true;
DB_INDEXER *indexer = toku_db_get_indexer(db); DB_INDEXER *indexer = toku_db_get_indexer(db);
if (indexer) { // if this db is the index under construction if (indexer && !indexer_shortcut) { // if this db is the index under construction
DB *indexer_src_db = toku_indexer_get_src_db(indexer); DB *indexer_src_db = toku_indexer_get_src_db(indexer);
invariant(indexer_src_db != NULL); invariant(indexer_src_db != NULL);
const DBT *indexer_src_key; const DBT *indexer_src_key;
...@@ -637,7 +660,8 @@ do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], ...@@ -637,7 +660,8 @@ do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[],
invariant(keys[which_src_db].size == 1); invariant(keys[which_src_db].size == 1);
indexer_src_key = &keys[which_src_db].dbts[0]; indexer_src_key = &keys[which_src_db].dbts[0];
} }
do_put = !toku_indexer_is_key_right_of_le_cursor(indexer, indexer_src_key); do_put = toku_indexer_should_insert_key(indexer, indexer_src_key);
toku_indexer_update_estimate(indexer);
} }
if (do_put) { if (do_put) {
for (uint32_t i = 0; i < keys[which_db].size; i++) { for (uint32_t i = 0; i < keys[which_db].size; i++) {
...@@ -677,6 +701,9 @@ env_put_multiple_internal( ...@@ -677,6 +701,9 @@ env_put_multiple_internal(
uint32_t lock_flags[num_dbs]; uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs]; uint32_t remaining_flags[num_dbs];
FT_HANDLE brts[num_dbs]; FT_HANDLE brts[num_dbs];
bool indexer_shortcut = false;
bool indexer_lock_taken = false;
bool src_same = false;
if (!txn || !num_dbs) { if (!txn || !num_dbs) {
r = EINVAL; r = EINVAL;
...@@ -688,7 +715,7 @@ env_put_multiple_internal( ...@@ -688,7 +715,7 @@ env_put_multiple_internal(
} }
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn); HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
r = get_indexer_if_exists(num_dbs, db_array, &indexer); r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
if (r) { if (r) {
goto cleanup; goto cleanup;
} }
...@@ -749,13 +776,23 @@ env_put_multiple_internal( ...@@ -749,13 +776,23 @@ env_put_multiple_internal(
} }
if (indexer) { if (indexer) {
// do a cheap check
if (src_same) {
bool may_insert = toku_indexer_may_insert(indexer, src_key);
if (!may_insert) {
toku_indexer_lock(indexer); toku_indexer_lock(indexer);
indexer_lock_taken = true;
}
else {
indexer_shortcut = true;
}
}
} }
toku_multi_operation_client_lock(); toku_multi_operation_client_lock();
log_put_multiple(txn, src_db, src_key, src_val, num_dbs, brts); log_put_multiple(txn, src_db, src_key, src_val, num_dbs, brts);
r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, src_db, src_key); r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, src_db, src_key, indexer_shortcut);
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
if (indexer) { if (indexer_lock_taken) {
toku_indexer_unlock(indexer); toku_indexer_unlock(indexer);
} }
...@@ -787,6 +824,9 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, ...@@ -787,6 +824,9 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
DB_INDEXER* indexer = NULL; DB_INDEXER* indexer = NULL;
bool indexer_shortcut = false;
bool indexer_lock_taken = false;
bool src_same = false;
HANDLE_READ_ONLY_TXN(txn); HANDLE_READ_ONLY_TXN(txn);
DBT_ARRAY old_key_arrays[num_dbs]; DBT_ARRAY old_key_arrays[num_dbs];
DBT_ARRAY new_key_arrays[num_dbs]; DBT_ARRAY new_key_arrays[num_dbs];
...@@ -806,7 +846,7 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, ...@@ -806,7 +846,7 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
} }
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn); HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
r = get_indexer_if_exists(num_dbs, db_array, &indexer); r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
if (r) { if (r) {
goto cleanup; goto cleanup;
} }
...@@ -1008,12 +1048,24 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, ...@@ -1008,12 +1048,24 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
} }
} }
if (indexer) { if (indexer) {
// do a cheap check
if (src_same) {
bool may_insert =
toku_indexer_may_insert(indexer, old_src_key) &&
toku_indexer_may_insert(indexer, new_src_key);
if (!may_insert) {
toku_indexer_lock(indexer); toku_indexer_lock(indexer);
indexer_lock_taken = true;
}
else {
indexer_shortcut = true;
}
}
} }
toku_multi_operation_client_lock(); toku_multi_operation_client_lock();
if (r == 0 && n_del_dbs > 0) { if (r == 0 && n_del_dbs > 0) {
log_del_multiple(txn, src_db, old_src_key, old_src_data, n_del_dbs, del_fts, del_key_arrays); log_del_multiple(txn, src_db, old_src_key, old_src_data, n_del_dbs, del_fts, del_key_arrays);
r = do_del_multiple(txn, n_del_dbs, del_dbs, del_key_arrays, src_db, old_src_key); r = do_del_multiple(txn, n_del_dbs, del_dbs, del_key_arrays, src_db, old_src_key, indexer_shortcut);
} }
if (r == 0 && n_put_dbs > 0) { if (r == 0 && n_put_dbs > 0) {
...@@ -1022,10 +1074,10 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, ...@@ -1022,10 +1074,10 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
// recovery so we don't end up losing data. // recovery so we don't end up losing data.
// So unlike env->put_multiple, we ONLY log a 'put_multiple' log entry. // So unlike env->put_multiple, we ONLY log a 'put_multiple' log entry.
log_put_multiple(txn, src_db, new_src_key, new_src_data, n_put_dbs, put_fts); log_put_multiple(txn, src_db, new_src_key, new_src_data, n_put_dbs, put_fts);
r = do_put_multiple(txn, n_put_dbs, put_dbs, put_key_arrays, put_val_arrays, src_db, new_src_key); r = do_put_multiple(txn, n_put_dbs, put_dbs, put_key_arrays, put_val_arrays, src_db, new_src_key, indexer_shortcut);
} }
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
if (indexer) { if (indexer_lock_taken) {
toku_indexer_unlock(indexer); toku_indexer_unlock(indexer);
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment