Commit 363ff440 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4687], fix some issues surrounding hot indexing and its stress test

git-svn-id: file:///svn/toku/tokudb@44570 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7c7f81dc
...@@ -226,7 +226,7 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash ...@@ -226,7 +226,7 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) { void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) {
ROLLBACK_LOG_NODE pinned_log; ROLLBACK_LOG_NODE pinned_log;
invariant(txn->state == TOKUTXN_LIVE); // #3258 invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions
if (txn_has_current_rollback_log(txn)) { if (txn_has_current_rollback_log(txn)) {
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, txn->roll_info.current_rollback_hash, &pinned_log); toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, txn->roll_info.current_rollback_hash, &pinned_log);
toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->roll_info.num_rollback_nodes - 1); toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->roll_info.num_rollback_nodes - 1);
......
...@@ -385,6 +385,8 @@ void toku_txn_complete_txn(TOKUTXN txn) { ...@@ -385,6 +385,8 @@ void toku_txn_complete_txn(TOKUTXN txn) {
assert(txn->roll_info.spilled_rollback_head.b == ROLLBACK_NONE.b); assert(txn->roll_info.spilled_rollback_head.b == ROLLBACK_NONE.b);
assert(txn->roll_info.spilled_rollback_tail.b == ROLLBACK_NONE.b); assert(txn->roll_info.spilled_rollback_tail.b == ROLLBACK_NONE.b);
assert(txn->roll_info.current_rollback.b == ROLLBACK_NONE.b); assert(txn->roll_info.current_rollback.b == ROLLBACK_NONE.b);
assert(txn->num_pin == 0);
assert(txn->state == TOKUTXN_COMMITTING || txn->state == TOKUTXN_ABORTING);
toku_txn_manager_finish_txn(txn->logger->txn_manager, txn); toku_txn_manager_finish_txn(txn->logger->txn_manager, txn);
// note that here is another place we depend on // note that here is another place we depend on
// this function being called with the multi operation lock // this function being called with the multi operation lock
......
...@@ -200,6 +200,29 @@ put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT * ...@@ -200,6 +200,29 @@ put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *
return 0; return 0;
} }
static int
del_callback(DB *dest_db, DB *src_db, DBT *dest_key, const DBT *src_key, const DBT *src_data) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; src_key = src_key; src_data = src_data;
lazy_assert(src_db != NULL && dest_db != NULL);
switch (dest_key->flags) {
case 0:
dest_key->data = src_data->data;
dest_key->size = src_data->size;
break;
case DB_DBT_REALLOC:
dest_key->data = toku_realloc(dest_key->data, src_data->size);
memcpy(dest_key->data, src_data->data, src_data->size);
dest_key->size = src_data->size;
break;
default:
lazy_assert(0);
}
return 0;
}
static DB_INDEXER *test_indexer = NULL; static DB_INDEXER *test_indexer = NULL;
static DB *test_hotdb = NULL; static DB *test_hotdb = NULL;
...@@ -425,6 +448,7 @@ run_test(char *envdir, char *testname) { ...@@ -425,6 +448,7 @@ run_test(char *envdir, char *testname) {
r = env->set_redzone(env, 0); assert_zero(r); r = env->set_redzone(env, 0); assert_zero(r);
r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r); r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
r = env->open(env, envdir, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); r = env->open(env, envdir, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
DB* hot_db; DB* hot_db;
toku_mutex_t fops_lock; toku_mutex_t fops_lock;
toku_mutex_t hi_lock; toku_mutex_t hi_lock;
u_int32_t gid_count;
u_int8_t hi_gid[DB_GID_SIZE];
static int static int
hi_put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) { hi_put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) {
...@@ -112,12 +115,16 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void ...@@ -112,12 +115,16 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void
toku_free(dest_vals[1].data); toku_free(dest_vals[1].data);
} }
increment_counter(stats_extra, PUTS, i); increment_counter(stats_extra, PUTS, i);
gid_count++;
*(u_int32_t *)hi_gid = gid_count;
int rr = hi_txn->prepare(hi_txn, hi_gid);
CKERR(rr);
if (r || (random() % 2)) { if (r || (random() % 2)) {
int rr = hi_txn->abort(hi_txn); rr = hi_txn->abort(hi_txn);
CKERR(rr); CKERR(rr);
} }
else { else {
int rr = hi_txn->commit(hi_txn, 0); rr = hi_txn->commit(hi_txn, 0);
CKERR(rr); CKERR(rr);
} }
toku_mutex_unlock(&fops_lock); toku_mutex_unlock(&fops_lock);
...@@ -252,6 +259,8 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -252,6 +259,8 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
gid_count = 0;
memset(hi_gid, 0, sizeof(hi_gid));
toku_mutex_init(&hi_lock, NULL); toku_mutex_init(&hi_lock, NULL);
toku_mutex_init(&fops_lock, NULL); toku_mutex_init(&fops_lock, NULL);
hot_db = NULL; hot_db = NULL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment