Commit bc03f282 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3043 merge a new hotindex undo function to main refs[t:3043]

git-svn-id: file:///svn/toku/tokudb@25766 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9bb62a9b
...@@ -10,6 +10,14 @@ ...@@ -10,6 +10,14 @@
#ifndef TOKU_INDEXER_INTERNAL_H #ifndef TOKU_INDEXER_INTERNAL_H
#define TOKU_INDEXER_INTERNAL_H #define TOKU_INDEXER_INTERNAL_H
// the indexer_commit_keys is a set of keys described by a DBT in the keys array.
// the array is a resizeable array with max size "max_keys" and current size "current_keys".
struct indexer_commit_keys {
int max_keys; // max number of keys
int current_keys; // number of valid keys
DBT *keys; // the variable length keys array
};
struct __toku_indexer_internal { struct __toku_indexer_internal {
DB_ENV *env; DB_ENV *env;
DB_TXN *txn; DB_TXN *txn;
...@@ -27,10 +35,15 @@ struct __toku_indexer_internal { ...@@ -27,10 +35,15 @@ struct __toku_indexer_internal {
FILENUM *fnums; /* [N] */ FILENUM *fnums; /* [N] */
FILENUMS filenums; FILENUMS filenums;
// undo state
struct indexer_commit_keys commit_keys; // set of keys to commit
DBT hotkey, hotval; // current hot key and value
DBT previous_insert_key; // previous insert key
// test functions // test functions
int (*undo_do)(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule); int (*undo_do)(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
int (*test_is_xid_live)(DB_INDEXER *indexer, TXNID xid); int (*test_is_xid_live)(DB_INDEXER *indexer, TXNID xid);
int (*test_maybe_lock_provisional_key)(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key); int (*test_lock_key)(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key);
int (*test_delete_provisional)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids); int (*test_delete_provisional)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
int (*test_delete_committed)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids); int (*test_delete_committed)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
int (*test_insert_provisional)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids); int (*test_insert_provisional)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids);
...@@ -41,6 +54,10 @@ struct __toku_indexer_internal { ...@@ -41,6 +54,10 @@ struct __toku_indexer_internal {
int test_only_flags; int test_only_flags;
}; };
void indexer_undo_do_init(DB_INDEXER *indexer);
void indexer_undo_do_destroy(DB_INDEXER *indexer);
int indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule); int indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
#endif #endif
...@@ -25,79 +25,221 @@ ...@@ -25,79 +25,221 @@
#include "indexer-internal.h" #include "indexer-internal.h"
static void
indexer_commit_keys_init(struct indexer_commit_keys *keys) {
keys->max_keys = keys->current_keys = 0;
keys->keys = NULL;
}
static void
indexer_commit_keys_destroy(struct indexer_commit_keys *keys) {
for (int i = 0; i < keys->max_keys; i++)
toku_destroy_dbt(&keys->keys[i]);
toku_free(keys->keys);
}
static int
indexer_commit_keys_max_valid(struct indexer_commit_keys *keys) {
return keys->current_keys;
}
static void
indexer_commit_keys_add(struct indexer_commit_keys *keys, size_t length, void *ptr) {
if (keys->current_keys >= keys->max_keys) {
int new_max_keys = keys->max_keys == 0 ? 8 : keys->max_keys * 2;
keys->keys = (DBT *) toku_realloc(keys->keys, new_max_keys * sizeof (DBT));
resource_assert(keys->keys);
for (int i = keys->current_keys; i < new_max_keys; i++)
toku_init_dbt(&keys->keys[i]);
keys->max_keys = new_max_keys;
}
DBT *key = &keys->keys[keys->current_keys];
if (key->flags == 0)
assert(key->data == NULL);
key->flags = DB_DBT_REALLOC;
toku_dbt_set(length, ptr, key, NULL);
keys->current_keys++;
}
static void
indexer_commit_keys_set_empty(struct indexer_commit_keys *keys) {
keys->current_keys = 0;
}
// internal functions // internal functions
static int indexer_setup_xids(DB_INDEXER *indexer, ULEHANDLE ule, int i, TXNID xid, XIDS *xids_result); static int indexer_setup_xids_committed(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
static int indexer_find_prev(DB_INDEXER *indexer, ULEHANDLE ule, int i); static int indexer_setup_xids_provisional(DB_INDEXER *indexer, ULEHANDLE ule, int trindex, TXNID xid, BOOL xid_is_live, XIDS *xids_result);
static int indexer_find_prev_committed(DB_INDEXER *indexer, ULEHANDLE ule, int i);
static int indexer_find_prev_provisional(DB_INDEXER *indexer, ULEHANDLE ule, int i);
static int indexer_generate_hot_key_val(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, UXRHANDLE uxr, DBT *hotkey, DBT *hotval); static int indexer_generate_hot_key_val(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, UXRHANDLE uxr, DBT *hotkey, DBT *hotval);
static int indexer_brt_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids); static int indexer_brt_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
static int indexer_brt_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids); static int indexer_brt_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
static int indexer_brt_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids); static int indexer_brt_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids);
static int indexer_brt_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids); static int indexer_brt_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids);
static int indexer_brt_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids); static int indexer_brt_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
static int indexer_maybe_lock_provisional_key(DB_INDEXER *indexer, ULEHANDLE ule, int i, DB *hotdb, DBT *key, XIDS xids); static int indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid);
static int indexer_is_xid_live(DB_INDEXER *indexer, TXNID xid); static int indexer_is_xid_live(DB_INDEXER *indexer, TXNID xid);
static int indexer_is_previous_insert_key_valid(DB_INDEXER *indexer);
int void
indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) { indexer_undo_do_init(DB_INDEXER *indexer) {
indexer_commit_keys_init(&indexer->i->commit_keys);
// DBTs for the key and val
toku_init_dbt(&indexer->i->hotkey); indexer->i->hotkey.flags = DB_DBT_REALLOC;
toku_init_dbt(&indexer->i->hotval); indexer->i->hotval.flags = DB_DBT_REALLOC;
// DBT for the previous inserted key
toku_init_dbt(&indexer->i->previous_insert_key); indexer->i->previous_insert_key.flags = DB_DBT_REALLOC;
}
void
indexer_undo_do_destroy(DB_INDEXER *indexer) {
toku_destroy_dbt(&indexer->i->hotkey);
toku_destroy_dbt(&indexer->i->hotval);
toku_destroy_dbt(&indexer->i->previous_insert_key);
indexer_commit_keys_destroy(&indexer->i->commit_keys);
}
static int
indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
int result = 0; int result = 0;
// init the xids to the root xid // init the xids to the root xid
XIDS xids = xids_get_root_xids(); XIDS xids = xids_get_root_xids();
// DBTs for the key and val // scan the committed stack from bottom to top
DBT hotkey; toku_init_dbt(&hotkey); hotkey.flags = DB_DBT_REALLOC; int num_committed = ule_get_num_committed(ule);
DBT hotval; toku_init_dbt(&hotval); hotval.flags = DB_DBT_REALLOC; for (int trindex = 0; trindex < num_committed; trindex++) {
// DBT for the previous inserted key indexer_commit_keys_set_empty(&indexer->i->commit_keys);
DBT previous_insert_key; toku_init_dbt(&previous_insert_key); previous_insert_key.flags = DB_DBT_REALLOC;
// DBTs for the commit keys // get the transaction record
DBT delete_key; toku_init_dbt(&delete_key); delete_key.flags = DB_DBT_REALLOC; UXRHANDLE uxr = ule_get_uxr(ule, trindex);
DBT insert_key; toku_init_dbt(&insert_key); insert_key.flags = DB_DBT_REALLOC;
// scan the transaction stack from bottom to top
int num_uxrs = ule_get_num_committed(ule) + ule_get_num_provisional(ule);
for (int i = 0; i < num_uxrs; i++) {
// get the ith transaction record
UXRHANDLE uxr = ule_get_uxr(ule, i);
invariant(uxr); invariant(uxr);
// setup up the xids // setup up the xids
result = indexer_setup_xids(indexer, ule, i, uxr_get_txnid(uxr), &xids); TXNID this_xid = uxr_get_txnid(uxr);
result = indexer_setup_xids_committed(indexer, this_xid, &xids);
if (result != 0) if (result != 0)
break; break;
// skip placeholders // skip placeholders
if (uxr_is_placeholder(uxr)) { if (uxr_is_placeholder(uxr)) {
invariant(ule_is_provisional(ule, i)); invariant(0); // not allowed
continue; continue;
} }
// init delete and insert commits // undo
BOOL do_delete_commit = FALSE, do_insert_commit = FALSE; int prev_trindex = indexer_find_prev_committed(indexer, ule, trindex);
if (prev_trindex >= 0) {
UXRHANDLE prevuxr = ule_get_uxr(ule, prev_trindex);
if (uxr_is_delete(prevuxr)) {
; // do nothing
} else if (uxr_is_insert(prevuxr)) {
// generate the hot key and val
result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &indexer->i->hotkey, &indexer->i->hotval);
if (result == 0) {
// send the delete message
result = indexer_brt_delete_committed(indexer, hotdb, &indexer->i->hotkey, xids);
if (result == 0)
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
}
} else
invariant(0);
}
if (result != 0)
break;
// do
if (uxr_is_delete(uxr)) {
; // do nothing
} else if (uxr_is_insert(uxr)) {
// generate the hot key and val
result = indexer_generate_hot_key_val(indexer, hotdb, ule, uxr, &indexer->i->hotkey, &indexer->i->hotval);
if (result == 0) {
// send the insert message
result = indexer_brt_insert_committed(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
if (result == 0) {
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
result = toku_dbt_set(indexer->i->hotkey.size, indexer->i->hotkey.data, &indexer->i->previous_insert_key, NULL);
}
}
} else
invariant(0);
// send commit messages if needed
for (int i = 0; result == 0 && i < indexer_commit_keys_max_valid(&indexer->i->commit_keys); i++)
result = indexer_brt_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids);
if (result != 0)
break;
}
xids_destroy(&xids);
return result;
}
static int
indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
int result = 0;
indexer_commit_keys_set_empty(&indexer->i->commit_keys);
// init the xids to the root xid
XIDS xids = xids_get_root_xids();
TXNID outermost_xid;
BOOL outermost_is_live = FALSE;
// scan the provisional stack from bottom to top
int num_committed = ule_get_num_committed(ule);
int num_provisional = ule_get_num_provisional(ule);
for (int trindex = num_committed; trindex < num_committed + num_provisional; trindex++) {
// get the ith transaction record
UXRHANDLE uxr = ule_get_uxr(ule, trindex);
invariant(uxr);
TXNID this_xid = uxr_get_txnid(uxr);
BOOL this_xid_is_live = indexer_is_xid_live(indexer, this_xid);
if (trindex == num_committed) {
outermost_xid = this_xid;
outermost_is_live = this_xid_is_live;
}
// setup up the xids
result = indexer_setup_xids_provisional(indexer, ule, trindex, this_xid, this_xid_is_live, &xids);
if (result != 0)
break;
// skip placeholders
if (uxr_is_placeholder(uxr))
continue;
// undo // undo
int previ = indexer_find_prev(indexer, ule, i); int prev_trindex = indexer_find_prev_provisional(indexer, ule, trindex);
if (previ >= 0) { if (prev_trindex >= 0) {
UXRHANDLE prevuxr = ule_get_uxr(ule, previ); UXRHANDLE prevuxr = ule_get_uxr(ule, prev_trindex);
if (uxr_is_delete(prevuxr)) { if (uxr_is_delete(prevuxr)) {
; // do nothing ; // do nothing
} else if (uxr_is_insert(prevuxr)) { } else if (uxr_is_insert(prevuxr)) {
// generate the hot key and val // generate the hot key and val
result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &hotkey, &hotval); result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &indexer->i->hotkey, &indexer->i->hotval);
if (result == 0) { if (result == 0) {
// send the delete message // send the delete message
if (ule_is_committed(ule, i) || !indexer_is_xid_live(indexer, xids_get_outermost_xid(xids))) { if (!outermost_is_live) {
result = indexer_brt_delete_committed(indexer, hotdb, &hotkey, xids); result = indexer_brt_delete_committed(indexer, hotdb, &indexer->i->hotkey, xids);
if (result == 0) { if (result == 0)
do_delete_commit = TRUE; indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
result = toku_dbt_set(hotkey.size, hotkey.data, &delete_key, NULL);
}
} else { } else {
result = indexer_brt_delete_provisional(indexer, hotdb, &hotkey, xids); result = indexer_brt_delete_provisional(indexer, hotdb, &indexer->i->hotkey, xids);
if (result == 0)
result = indexer_lock_key(indexer, hotdb, &indexer->i->hotkey, outermost_xid);
} }
if (result == 0)
result = indexer_maybe_lock_provisional_key(indexer, ule, i, hotdb, &hotkey, xids);
} }
} else } else
invariant(0); invariant(0);
...@@ -107,78 +249,101 @@ indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) { ...@@ -107,78 +249,101 @@ indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
// do // do
if (uxr_is_delete(uxr)) { if (uxr_is_delete(uxr)) {
result = indexer_maybe_lock_provisional_key(indexer, ule, i, hotdb, &previous_insert_key, xids); if (outermost_is_live && indexer_is_previous_insert_key_valid(indexer))
result = indexer_lock_key(indexer, hotdb, &indexer->i->previous_insert_key, outermost_xid);
} else if (uxr_is_insert(uxr)) { } else if (uxr_is_insert(uxr)) {
// generate the hot key and val // generate the hot key and val
result = indexer_generate_hot_key_val(indexer, hotdb, ule, uxr, &hotkey, &hotval); result = indexer_generate_hot_key_val(indexer, hotdb, ule, uxr, &indexer->i->hotkey, &indexer->i->hotval);
if (result == 0) { if (result == 0) {
// send the insert message // send the insert message
if (ule_is_committed(ule, i) || !indexer_is_xid_live(indexer, xids_get_outermost_xid(xids))) { if (!outermost_is_live) {
result = indexer_brt_insert_committed(indexer, hotdb, &hotkey, &hotval, xids); result = indexer_brt_insert_committed(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
if (result == 0) {
do_insert_commit = TRUE;
result = toku_dbt_set(hotkey.size, hotkey.data, &insert_key, NULL);
}
} else { } else {
result = indexer_brt_insert_provisional(indexer, hotdb, &hotkey, &hotval, xids); result = indexer_brt_insert_provisional(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
} if (result == 0)
if (result == 0) { result = indexer_lock_key(indexer, hotdb, &indexer->i->hotkey, outermost_xid);
result = indexer_maybe_lock_provisional_key(indexer, ule, i, hotdb, &hotkey, xids);
if (result == 0)
result = toku_dbt_set(hotkey.size, hotkey.data, &previous_insert_key, NULL);
} }
if (result == 0)
result = toku_dbt_set(indexer->i->hotkey.size, indexer->i->hotkey.data, &indexer->i->previous_insert_key, NULL);
} }
} else } else
invariant(0); invariant(0);
// send commit messages if needed
if (result == 0 && do_delete_commit)
result = indexer_brt_commit(indexer, hotdb, &delete_key, xids);
if (result == 0 && do_insert_commit)
result = indexer_brt_commit(indexer, hotdb, &insert_key, xids);
if (result != 0) if (result != 0)
break; break;
} }
// cleanup // send commits if the outermost provisional transaction is committed
toku_destroy_dbt(&hotkey); for (int i = 0; result == 0 && i < indexer_commit_keys_max_valid(&indexer->i->commit_keys); i++)
toku_destroy_dbt(&hotval); result = indexer_brt_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids);
toku_destroy_dbt(&delete_key);
toku_destroy_dbt(&insert_key);
toku_destroy_dbt(&previous_insert_key);
xids_destroy(&xids); xids_destroy(&xids);
return result;
}
int
indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
int result;
result = indexer_undo_do_committed(indexer, hotdb, ule);
if (result == 0)
result = indexer_undo_do_provisional(indexer, hotdb, ule);
if ( indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK ) if ( indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK )
result = EINVAL; result = EINVAL;
return result; return result;
} }
// the committed XIDS always = [this_xid]
static int
indexer_setup_xids_committed(DB_INDEXER *UU(indexer), TXNID this_xid, XIDS *xids_result) {
int result = 0;
XIDS old_xids = *xids_result;
XIDS new_xids = xids_get_root_xids();
if (this_xid > 0) {
XIDS child_xids;
result = xids_create_child(new_xids, &child_xids, this_xid);
xids_destroy(&new_xids);
if (result == 0)
new_xids = child_xids;
}
if (result == 0) {
xids_destroy(&old_xids);
*xids_result = new_xids;
}
return result;
}
// the provisional XIDS = XIDS . [this_xid] when this_xid is live or when XIDS is empty
static int static int
indexer_setup_xids(DB_INDEXER *indexer, ULEHANDLE ule, int i, TXNID this_xid, XIDS *xids_result) { indexer_setup_xids_provisional(DB_INDEXER *UU(indexer), ULEHANDLE ule, int trindex, TXNID xid, BOOL xid_is_live, XIDS *xids_result) {
indexer = indexer; invariant(trindex >= ule_get_num_committed(ule));
int result = 0; int result = 0;
XIDS old_xids = *xids_result; XIDS old_xids = *xids_result;
XIDS new_xids; XIDS new_xids;
if (i <= ule_get_num_committed(ule) || xids_get_num_xids(old_xids) == 0) { if (xids_get_num_xids(old_xids) == 0) {
// setup xids = [ root xid, this xid ] // setup xids = [ root xid, xid ]
new_xids = xids_get_root_xids(); new_xids = xids_get_root_xids();
if (this_xid > 0) { if (xid > 0) {
XIDS child_xids; XIDS child_xids;
result = xids_create_child(new_xids, &child_xids, this_xid); result = xids_create_child(new_xids, &child_xids, xid);
xids_destroy(&new_xids); xids_destroy(&new_xids);
if (result == 0) if (result == 0) {
new_xids = child_xids; new_xids = child_xids;
xids_destroy(&old_xids);
*xids_result = new_xids;
}
} }
} else { } else if (xid_is_live) {
// append this_xid to xids // append xid to xids
result = xids_create_child(old_xids, &new_xids, this_xid); result = xids_create_child(old_xids, &new_xids, xid);
} if (result == 0) {
xids_destroy(&old_xids);
if (result == 0) { *xids_result = new_xids;
xids_destroy(&old_xids); }
*xids_result = new_xids;
} }
return result; return result;
...@@ -226,36 +391,30 @@ indexer_is_xid_live(DB_INDEXER *indexer, TXNID xid) { ...@@ -226,36 +391,30 @@ indexer_is_xid_live(DB_INDEXER *indexer, TXNID xid) {
// Take a write lock on the given key for the outermost xid in the xids list. // Take a write lock on the given key for the outermost xid in the xids list.
static int static int
indexer_maybe_lock_provisional_key(DB_INDEXER *indexer, ULEHANDLE ule, int i, DB *hotdb, DBT *key, XIDS xids) { indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid) {
int result = 0; int result = 0;
if (ule_is_provisional(ule, i)) { // TEST
TXNID outermost_live_xid = xids_get_outermost_xid(xids); if (indexer->i->test_lock_key) {
// TEST result = indexer->i->test_lock_key(indexer, outermost_live_xid, hotdb, key);
if (indexer->i->test_maybe_lock_provisional_key) { } else {
result = indexer->i->test_maybe_lock_provisional_key(indexer, outermost_live_xid, hotdb, key); DB_ENV *env = indexer->i->env;
} else { TOKUTXN txn = NULL;
DB_ENV *env = indexer->i->env; result = toku_txnid2txn(env->i->logger, outermost_live_xid, &txn);
TOKUTXN txn = NULL; invariant(result == 0 && txn != NULL);
result = toku_txnid2txn(env->i->logger, outermost_live_xid, &txn); result = toku_grab_write_lock(hotdb, key, txn);
invariant(result == 0);
if (txn != NULL) {
result = toku_grab_write_lock(hotdb, key, txn);
lazy_assert(result == 0);
}
}
} }
return result; return result;
} }
static int static int
indexer_find_prev_committed(ULEHANDLE ule, int i) { indexer_find_prev_committed(DB_INDEXER *UU(indexer), ULEHANDLE ule, int i) {
invariant(i < ule_num_uxrs(ule)); invariant(i < ule_num_uxrs(ule));
int previ = i - 1; int previ = i - 1;
return previ; return previ;
} }
static int static int
indexer_find_prev_provisional(ULEHANDLE ule, int i) { indexer_find_prev_provisional(DB_INDEXER *UU(indexer), ULEHANDLE ule, int i) {
invariant(i < ule_num_uxrs(ule)); invariant(i < ule_num_uxrs(ule));
int previ = i - 1; int previ = i - 1;
while (previ >= 0) { while (previ >= 0) {
...@@ -267,17 +426,6 @@ indexer_find_prev_provisional(ULEHANDLE ule, int i) { ...@@ -267,17 +426,6 @@ indexer_find_prev_provisional(ULEHANDLE ule, int i) {
return previ; return previ;
} }
// find the index of the previous transaction record before the ith position in the stack
static int
indexer_find_prev(DB_INDEXER *UU(indexer), ULEHANDLE ule, int i) {
int result;
if (ule_is_committed(ule, i))
result = indexer_find_prev_committed(ule, i);
else
result = indexer_find_prev_provisional(ule, i);
return result;
}
static TOKUTXN static TOKUTXN
indexer_get_innermost_live_txn(DB_INDEXER *indexer, XIDS xids) { indexer_get_innermost_live_txn(DB_INDEXER *indexer, XIDS xids) {
DB_ENV *env = indexer->i->env; DB_ENV *env = indexer->i->env;
...@@ -378,3 +526,8 @@ indexer_brt_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) { ...@@ -378,3 +526,8 @@ indexer_brt_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) {
} }
return result; return result;
} }
static int
indexer_is_previous_insert_key_valid(DB_INDEXER *indexer) {
return indexer->i->previous_insert_key.size > 0;
}
...@@ -92,6 +92,7 @@ free_indexer_resources(DB_INDEXER *indexer) { ...@@ -92,6 +92,7 @@ free_indexer_resources(DB_INDEXER *indexer) {
toku_free(indexer->i->fnums); toku_free(indexer->i->fnums);
indexer->i->fnums = NULL; indexer->i->fnums = NULL;
} }
indexer_undo_do_destroy(indexer);
indexer_release_refs(indexer); indexer_release_refs(indexer);
// indexer->i // indexer->i
toku_free(indexer->i); toku_free(indexer->i);
...@@ -172,6 +173,7 @@ toku_indexer_create_indexer(DB_ENV *env, ...@@ -172,6 +173,7 @@ toku_indexer_create_indexer(DB_ENV *env,
create_exit: create_exit:
if ( rval == 0 ) { if ( rval == 0 ) {
indexer_undo_do_init(indexer);
indexer_add_refs(indexer); indexer_add_refs(indexer);
*indexerp = indexer; *indexerp = indexer;
......
...@@ -195,14 +195,13 @@ test_is_xid_live(DB_INDEXER *indexer, TXNID xid) { ...@@ -195,14 +195,13 @@ test_is_xid_live(DB_INDEXER *indexer, TXNID xid) {
} }
static int static int
test_maybe_lock_provisional_key(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key) { test_lock_key(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key) {
invariant(indexer == test_indexer); invariant(indexer == test_indexer);
invariant(hotdb == test_hotdb); invariant(hotdb == test_hotdb);
if (test_is_xid_live(indexer, xid)) { invariant(test_is_xid_live(indexer, xid));
printf("lock [%lu] ", xid); printf("lock [%lu] ", xid);
print_dbt(key); print_dbt(key);
printf("\n"); printf("\n");
}
return 0; return 0;
} }
...@@ -360,19 +359,19 @@ read_test(char *testname, ULE ule) { ...@@ -360,19 +359,19 @@ read_test(char *testname, ULE ule) {
ule_add_provisional(ule, &uxr_s); ule_add_provisional(ule, &uxr_s);
continue; continue;
} }
printf("%s???\n", line); fprintf(stderr, "%s???\n", line);
r = EINVAL; r = EINVAL;
} }
toku_free(line); toku_free(line);
fclose(f); fclose(f);
} else { } else {
r = errno; r = errno;
printf("fopen %s errno=%d\n", testname, errno); fprintf(stderr, "fopen %s errno=%d\n", testname, errno);
} }
return r; return r;
} }
static void static int
run_test(char *envdir, char *testname) { run_test(char *envdir, char *testname) {
if (verbose) if (verbose)
printf("%s\n", testname); printf("%s\n", testname);
...@@ -403,7 +402,7 @@ run_test(char *envdir, char *testname) { ...@@ -403,7 +402,7 @@ run_test(char *envdir, char *testname) {
// set test callbacks // set test callbacks
indexer->i->test_is_xid_live = test_is_xid_live; indexer->i->test_is_xid_live = test_is_xid_live;
indexer->i->test_maybe_lock_provisional_key = test_maybe_lock_provisional_key; indexer->i->test_lock_key = test_lock_key;
indexer->i->test_delete_provisional = test_delete_provisional; indexer->i->test_delete_provisional = test_delete_provisional;
indexer->i->test_delete_committed = test_delete_committed; indexer->i->test_delete_committed = test_delete_committed;
indexer->i->test_insert_provisional = test_insert_provisional; indexer->i->test_insert_provisional = test_insert_provisional;
...@@ -419,7 +418,9 @@ run_test(char *envdir, char *testname) { ...@@ -419,7 +418,9 @@ run_test(char *envdir, char *testname) {
ule_init(ule); ule_init(ule);
// read the test // read the test
r = read_test(testname, ule); assert_zero(r); r = read_test(testname, ule);
if (r != 0)
return r;
r = indexer->i->undo_do(indexer, dest_db, ule); assert_zero(r); r = indexer->i->undo_do(indexer, dest_db, ule); assert_zero(r);
...@@ -435,6 +436,8 @@ run_test(char *envdir, char *testname) { ...@@ -435,6 +436,8 @@ run_test(char *envdir, char *testname) {
r = env->close(env, 0); assert_zero(r); r = env->close(env, 0); assert_zero(r);
live_destroy(&live_xids); live_destroy(&live_xids);
return r;
} }
int int
...@@ -457,7 +460,7 @@ test_main(int argc, char * const argv[]) { ...@@ -457,7 +460,7 @@ test_main(int argc, char * const argv[]) {
break; break;
} }
for ( ; i < argc; i++) { for (r = 0 ; r == 0 && i < argc; i++) {
char *testname = argv[i]; char *testname = argv[i];
char envdir[strlen(ENVDIR) + 1 + 32 + 1]; char envdir[strlen(ENVDIR) + 1 + 32 + 1];
sprintf(envdir, "%s.%d", ENVDIR, toku_os_getpid()); sprintf(envdir, "%s.%d", ENVDIR, toku_os_getpid());
...@@ -467,9 +470,9 @@ test_main(int argc, char * const argv[]) { ...@@ -467,9 +470,9 @@ test_main(int argc, char * const argv[]) {
r = system(syscmd); assert_zero(r); r = system(syscmd); assert_zero(r);
r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
run_test(envdir, testname); r = run_test(envdir, testname);
} }
return 0; return r;
} }
key k1
delete committed 0
delete committed 100
delete committed 200
insert_committed [200] v100 k1
commit_any [200] v100
key k1
delete committed 0
delete committed 100
insert committed 200 v100
key k1
delete committed 0
delete committed 100
insert_committed [100] v100 k1
commit_any [100] v100
delete_committed [200] v100
commit_any [200] v100
key k1
delete committed 0
insert committed 100 v100
delete committed 200
insert_committed [0] v0 k1
delete_committed [100] v0
commit_any [100] v0
insert_committed [200] v100 k1
commit_any [200] v100
key k1
insert committed 0 v0
delete committed 100
insert committed 200 v100
insert_committed [0] v10 k1
delete_committed [100] v10
commit_any [100] v10
key k1
insert committed 0 v10
delete committed 100
insert_committed [0] v0 k1
delete_committed [100] v0
insert_committed [100] v100 k1
commit_any [100] v0
commit_any [100] v100
delete_committed [200] v100
commit_any [200] v100
key k1
insert committed 0 v0
insert committed 100 v100
delete committed 200
insert_committed [0] v0 k1
delete_committed [100] v0
insert_committed [100] v100 k1
commit_any [100] v0
commit_any [100] v100
delete_committed [200] v100
insert_committed [200] v200 k1
commit_any [200] v100
commit_any [200] v200
key k1
insert committed 0 v0
insert committed 100 v100
insert committed 200 v200
insert_committed [0] v10 k1
delete_committed [100] v10
insert_committed [100] v20 k1
commit_any [100] v10
commit_any [100] v20
key k1
insert committed 0 v10
insert committed 100 v20
insert_committed [0] v100 k1
delete_committed [200] v100
insert_committed [200] v200 k1
commit_any [200] v100
commit_any [200] v200
delete_committed [300] v200
insert_committed [300] v300 k1
commit_any [300] v200
commit_any [300] v300
key k1
insert committed 0 v100
insert committed 200 v200
insert committed 300 v300
key k1 key k1
delete committed 0 delete committed 0
insert committed 100 v10
insert provisional 300 v18 insert provisional 300 v18
placeholder 301 placeholder 301
insert provisional 302 v20 insert provisional 302 v20
...@@ -7,9 +7,8 @@ delete_committed [200] v20 ...@@ -7,9 +7,8 @@ delete_committed [200] v20
insert_committed [200] v10 k1 insert_committed [200] v10 k1
commit_any [200] v20 commit_any [200] v20
commit_any [200] v10 commit_any [200] v10
delete_committed [300,301] v10 delete_committed [300] v10
insert_committed [300,301] v30 k1 insert_committed [300] v30 k1
commit_any [300,301] v10 delete_committed [300] v30
commit_any [300,301] v30 commit_any [300] v10
delete_committed [300,301,302] v30 commit_any [300] v30
commit_any [300,301,302] v30
key k1
delete committed 0
delete provisional 100
insert_committed [100] v100 k1 insert_committed [100] v100 k1
delete_committed [100] v100
commit_any [100] v100 commit_any [100] v100
key k1
delete committed 0
insert provisional 100 v100
delete provisional 200
insert_committed [0] v10 k1
delete_committed [100] v10
commit_any [100] v10
key k1
insert committed 0 v10
delete provisional 100
insert_committed [0] v10 k1
delete_committed [100] v10
insert_committed [100] v100 k1
commit_any [100] v10
key k1
insert committed 0 v10
insert provisional 100 v100
live 100
key k1
delete committed 0
delete provisional 100
insert_provisional [100] v100 k1
lock [100] v100
delete_provisional [100,200] v100
lock [100] v100
lock [100] v100
live 100 200
key k1
delete committed 0
insert provisional 100 v100
delete provisional 200
insert_committed [0] v10 k1
delete_provisional [100] v10
lock [100] v10
lock [100] v10
live 100
key k1
insert committed 0 v10
delete provisional 100
insert_committed [0] v10 k1
delete_provisional [100] v10
lock [100] v10
insert_provisional [100] v100 k1
lock [100] v100
live 100
key k1
insert committed 0 v10
insert provisional 100 v100
...@@ -4,10 +4,11 @@ ...@@ -4,10 +4,11 @@
tests="" tests=""
verbose=0 verbose=0
valgrind=""
for arg in $* ; do for arg in $* ; do
if [ $arg = "--verbose" ] ; then if [[ $arg =~ "--(.*)=(.*)" ]] ; then
verbose=1 eval ${BASH_REMATCH[1]}=${BASH_REMATCH[2]}
else else
tests="$tests $arg" tests="$tests $arg"
fi fi
...@@ -26,7 +27,7 @@ for t in $tests ; do ...@@ -26,7 +27,7 @@ for t in $tests ; do
fi fi
if [ $verbose != 0 ] ; then echo $testdir $testname $testfile $resultfile; fi if [ $verbose != 0 ] ; then echo $testdir $testname $testfile $resultfile; fi
./hotindexer-undo-do-test.tdb $testdir/$testfile >$testdir/$testname.run $valgrind ./hotindexer-undo-do-test.tdb $testdir/$testfile >$testdir/$testname.run
if [ -f $testdir/$resultfile ] ; then if [ -f $testdir/$resultfile ] ; then
diff -q $testdir/$testname.run $testdir/$resultfile >/dev/null 2>&1 diff -q $testdir/$testname.run $testdir/$resultfile >/dev/null 2>&1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment