Commit e94961e8 authored by Rich Prohaska's avatar Rich Prohaska

#27 bigtxn with checkpoints passes the bigtxn27 test

parent 949ae3ae
...@@ -183,6 +183,7 @@ static LSN last_completed_checkpoint_lsn; ...@@ -183,6 +183,7 @@ static LSN last_completed_checkpoint_lsn;
static toku_pthread_rwlock_t checkpoint_safe_lock; static toku_pthread_rwlock_t checkpoint_safe_lock;
static toku_pthread_rwlock_t multi_operation_lock; static toku_pthread_rwlock_t multi_operation_lock;
static toku_pthread_rwlock_t big_multi_operation_lock;
static bool initialized = false; // sanity check static bool initialized = false; // sanity check
static volatile bool locked_mo = false; // true when the multi_operation write lock is held (by checkpoint) static volatile bool locked_mo = false; // true when the multi_operation write lock is held (by checkpoint)
...@@ -204,6 +205,7 @@ multi_operation_lock_init(void) { ...@@ -204,6 +205,7 @@ multi_operation_lock_init(void) {
// happen on osx // happen on osx
#endif #endif
toku_pthread_rwlock_init(&multi_operation_lock, &attr); toku_pthread_rwlock_init(&multi_operation_lock, &attr);
toku_pthread_rwlock_init(&big_multi_operation_lock, &attr);
pthread_rwlockattr_destroy(&attr); pthread_rwlockattr_destroy(&attr);
locked_mo = false; locked_mo = false;
} }
...@@ -211,10 +213,12 @@ multi_operation_lock_init(void) { ...@@ -211,10 +213,12 @@ multi_operation_lock_init(void) {
static void static void
multi_operation_lock_destroy(void) { multi_operation_lock_destroy(void) {
toku_pthread_rwlock_destroy(&multi_operation_lock); toku_pthread_rwlock_destroy(&multi_operation_lock);
toku_pthread_rwlock_destroy(&big_multi_operation_lock);
} }
static void static void
multi_operation_checkpoint_lock(void) { multi_operation_checkpoint_lock(void) {
toku_pthread_rwlock_wrlock(&big_multi_operation_lock);
toku_pthread_rwlock_wrlock(&multi_operation_lock); toku_pthread_rwlock_wrlock(&multi_operation_lock);
locked_mo = true; locked_mo = true;
} }
...@@ -223,6 +227,7 @@ static void ...@@ -223,6 +227,7 @@ static void
multi_operation_checkpoint_unlock(void) { multi_operation_checkpoint_unlock(void) {
locked_mo = false; locked_mo = false;
toku_pthread_rwlock_wrunlock(&multi_operation_lock); toku_pthread_rwlock_wrunlock(&multi_operation_lock);
toku_pthread_rwlock_wrunlock(&big_multi_operation_lock);
} }
static void static void
...@@ -264,6 +269,14 @@ toku_multi_operation_client_unlock(void) { ...@@ -264,6 +269,14 @@ toku_multi_operation_client_unlock(void) {
toku_pthread_rwlock_rdunlock(&multi_operation_lock); toku_pthread_rwlock_rdunlock(&multi_operation_lock);
} }
void toku_big_multi_operation_client_lock(void) {
toku_pthread_rwlock_rdlock(&big_multi_operation_lock);
}
void toku_big_multi_operation_client_unlock(void) {
toku_pthread_rwlock_rdunlock(&big_multi_operation_lock);
}
void void
toku_checkpoint_safe_client_lock(void) { toku_checkpoint_safe_client_lock(void) {
if (locked_cs) if (locked_cs)
......
...@@ -135,8 +135,10 @@ void toku_checkpoint_safe_client_unlock(void); ...@@ -135,8 +135,10 @@ void toku_checkpoint_safe_client_unlock(void);
*****/ *****/
void toku_multi_operation_client_lock(void); void toku_multi_operation_client_lock(void);
void toku_big_multi_operation_client_lock(void);
void toku_multi_operation_client_unlock(void); void toku_multi_operation_client_unlock(void);
void toku_big_multi_operation_client_unlock(void);
// Initialize the checkpoint mechanism, must be called before any client operations. // Initialize the checkpoint mechanism, must be called before any client operations.
......
...@@ -786,7 +786,9 @@ void toku_txn_unpin_live_txn(TOKUTXN txn) { ...@@ -786,7 +786,9 @@ void toku_txn_unpin_live_txn(TOKUTXN txn) {
toku_txn_unlock_state(txn); toku_txn_unlock_state(txn);
} }
bool toku_txn_has_spilled_rollback(TOKUTXN txn) {
return txn_has_spilled_rollback_logs(txn);
}
#include <toku_race_tools.h> #include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void); void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void);
......
...@@ -216,4 +216,6 @@ void toku_txn_unlock_state(TOKUTXN txn); ...@@ -216,4 +216,6 @@ void toku_txn_unlock_state(TOKUTXN txn);
void toku_txn_pin_live_txn_unlocked(TOKUTXN txn); void toku_txn_pin_live_txn_unlocked(TOKUTXN txn);
void toku_txn_unpin_live_txn(TOKUTXN txn); void toku_txn_unpin_live_txn(TOKUTXN txn);
bool toku_txn_has_spilled_rollback(TOKUTXN txn);
#endif //TOKUTXN_H #endif //TOKUTXN_H
...@@ -92,8 +92,8 @@ PATENT RIGHTS GRANT: ...@@ -92,8 +92,8 @@ PATENT RIGHTS GRANT:
#include <pthread.h> #include <pthread.h>
// verify that a commit of a big txn does not block the commits of other txn's // verify that a commit of a big txn does not block the commits of other txn's
// commit writer happens before bigtxn commit happens before checkpoint // commit writer (0) happens before bigtxn commit (1) happens before checkpoint (2)
static int state = 0; static int test_state = 0;
static void *checkpoint_thread(void *arg) { static void *checkpoint_thread(void *arg) {
sleep(1); sleep(1);
...@@ -102,7 +102,8 @@ static void *checkpoint_thread(void *arg) { ...@@ -102,7 +102,8 @@ static void *checkpoint_thread(void *arg) {
int r = env->txn_checkpoint(env, 0, 0, 0); int r = env->txn_checkpoint(env, 0, 0, 0);
assert(r == 0); assert(r == 0);
printf("%s done\n", __FUNCTION__); printf("%s done\n", __FUNCTION__);
assert(toku_sync_fetch_and_add(&state, 1) == 2); int old_state = toku_sync_fetch_and_add(&test_state, 1);
assert(old_state == 2);
return arg; return arg;
} }
...@@ -132,18 +133,25 @@ static void *w_thread(void *arg) { ...@@ -132,18 +133,25 @@ static void *w_thread(void *arg) {
r = txn->commit(txn, 0); r = txn->commit(txn, 0);
assert(r == 0); assert(r == 0);
printf("%s done\n", __FUNCTION__); printf("%s done\n", __FUNCTION__);
assert(toku_sync_fetch_and_add(&state, 1) == 0); int old_state = toku_sync_fetch_and_add(&test_state, 1);
assert(old_state == 0);
return arg; return arg;
} }
static void bigtxn_progress(TOKU_TXN_PROGRESS progress, void *extra) { static void bigtxn_progress(TOKU_TXN_PROGRESS progress, void *extra) {
printf("%s %lu %lu %p\n", __FUNCTION__, progress->entries_processed, progress->entries_total, extra); printf("%s %lu %lu %p\n", __FUNCTION__, progress->entries_processed, progress->entries_total, extra);
sleep(10); sleep(1);
} }
int test_main (int argc __attribute__((__unused__)), char *const argv[] __attribute__((__unused__))) { int test_main (int argc, char *const argv[]) {
int r; int r;
int N = 10000; int N = 25000;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--N") == 0 && i+1 < argc) {
N = atoi(argv[++i]);
continue;
}
}
toku_os_recursive_delete(TOKU_TEST_FILENAME); toku_os_recursive_delete(TOKU_TEST_FILENAME);
r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO);
...@@ -153,6 +161,10 @@ int test_main (int argc __attribute__((__unused__)), char *const argv[] __attrib ...@@ -153,6 +161,10 @@ int test_main (int argc __attribute__((__unused__)), char *const argv[] __attrib
r = db_env_create(&env, 0); r = db_env_create(&env, 0);
assert(r == 0); assert(r == 0);
// avoid locktree escalation by picking a big enough lock tree
r = env->set_lk_max_memory(env, 128*1024*1024);
assert(r == 0);
r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0); assert(r == 0);
...@@ -167,27 +179,34 @@ int test_main (int argc __attribute__((__unused__)), char *const argv[] __attrib ...@@ -167,27 +179,34 @@ int test_main (int argc __attribute__((__unused__)), char *const argv[] __attrib
r = env->txn_begin(env, NULL, &bigtxn, 0); r = env->txn_begin(env, NULL, &bigtxn, 0);
assert(r == 0); assert(r == 0);
// use a big key so that the rollback log spills
char k[1024]; memset(k, 0, sizeof k);
char v[8]; memset(v, 0, sizeof v);
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++) {
DBT key = { .data = &i, .size = sizeof i }; memcpy(k, &i, sizeof i);
DBT val = { .data = &i, .size = sizeof i }; memcpy(v, &i, sizeof i);
DBT key = { .data = k, .size = sizeof k };
DBT val = { .data = v, .size = sizeof v };
r = db->put(db, bigtxn, &key, &val, 0); r = db->put(db, bigtxn, &key, &val, 0);
assert(r == 0); assert(r == 0);
if ((i % 10000) == 0) if ((i % 10000) == 0)
printf("put %d\n", i); printf("put %d\n", i);
} }
pthread_t checkpoint_tid; pthread_t checkpoint_tid = 0;
r = pthread_create(&checkpoint_tid, NULL, checkpoint_thread, env); r = pthread_create(&checkpoint_tid, NULL, checkpoint_thread, env);
assert(r == 0); assert(r == 0);
pthread_t w_tid; pthread_t w_tid = 0;
struct writer_arg w_arg = { env, db, N }; struct writer_arg w_arg = { env, db, N };
r = pthread_create(&w_tid, NULL, w_thread, &w_arg); r = pthread_create(&w_tid, NULL, w_thread, &w_arg);
assert(r == 0); assert(r == 0);
r = bigtxn->commit_with_progress(bigtxn, 0, bigtxn_progress, NULL); r = bigtxn->commit_with_progress(bigtxn, 0, bigtxn_progress, NULL);
assert(r == 0); assert(r == 0);
assert(toku_sync_fetch_and_add(&state, 1) == 1); int old_state = toku_sync_fetch_and_add(&test_state, 1);
assert(old_state == 1);
void *ret; void *ret;
r = pthread_join(w_tid, &ret); r = pthread_join(w_tid, &ret);
......
...@@ -136,12 +136,12 @@ toku_txn_destroy(DB_TXN *txn) { ...@@ -136,12 +136,12 @@ toku_txn_destroy(DB_TXN *txn) {
static int static int
toku_txn_commit(DB_TXN * txn, uint32_t flags, toku_txn_commit(DB_TXN * txn, uint32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_mo_lock) { bool release_mo_lock, bool big_txn) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//Recursively kill off children //Recursively kill off children
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, flags, NULL, NULL, false); int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, flags, NULL, NULL, false, false);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n"); env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n");
} }
...@@ -192,8 +192,12 @@ toku_txn_commit(DB_TXN * txn, uint32_t flags, ...@@ -192,8 +192,12 @@ toku_txn_commit(DB_TXN * txn, uint32_t flags,
// begin checkpoint logs these associations, so we must be protect // begin checkpoint logs these associations, so we must be protect
// the changing of these associations with checkpointing // the changing of these associations with checkpointing
if (release_mo_lock) { if (release_mo_lock) {
if (big_txn) {
toku_big_multi_operation_client_unlock();
} else {
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
} }
}
toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync); toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync);
if (flags!=0) { if (flags!=0) {
r = EINVAL; r = EINVAL;
...@@ -218,7 +222,7 @@ toku_txn_abort(DB_TXN * txn, ...@@ -218,7 +222,7 @@ toku_txn_abort(DB_TXN * txn,
//Recursively kill off children (abort or commit are both correct, commit is cheaper) //Recursively kill off children (abort or commit are both correct, commit is cheaper)
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, DB_TXN_NOSYNC, NULL, NULL, false); int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, DB_TXN_NOSYNC, NULL, NULL, false, false);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent abort.\n"); env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent abort.\n");
} }
...@@ -270,7 +274,7 @@ toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { ...@@ -270,7 +274,7 @@ toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
// toku_txn_commit will take the mo_lock if not held and a non-readonly txn is found. // toku_txn_commit will take the mo_lock if not held and a non-readonly txn is found.
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, 0, NULL, NULL, false); int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, 0, NULL, NULL, false, false);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n"); env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n");
} }
...@@ -324,18 +328,24 @@ static int ...@@ -324,18 +328,24 @@ static int
locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags, locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
bool holds_mo_lock = false; bool holds_mo_lock = false;
if (!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) { bool big_txn = false;
// A readonly transaction does no logging, and therefore does not TOKUTXN tokutxn = db_txn_struct_i(txn)->tokutxn;
// need the MO lock. if (!toku_txn_is_read_only(tokutxn)) {
toku_multi_operation_client_lock(); // A readonly transaction does no logging, and therefore does not need the MO lock.
holds_mo_lock = true; holds_mo_lock = true;
if (toku_txn_has_spilled_rollback(tokutxn)) {
big_txn = true;
toku_big_multi_operation_client_lock();
} else {
toku_multi_operation_client_lock();
}
} }
// cannot begin a checkpoint. // cannot begin a checkpoint.
// the multi operation lock is taken the first time we // the multi operation lock is taken the first time we
// see a non-readonly txn in the recursive commit. // see a non-readonly txn in the recursive commit.
// But released in the first-level toku_txn_commit (if taken), // But released in the first-level toku_txn_commit (if taken),
// this way, we don't hold it while we fsync the log. // this way, we don't hold it while we fsync the log.
int r = toku_txn_commit(txn, flags, poll, poll_extra, holds_mo_lock); int r = toku_txn_commit(txn, flags, poll, poll_extra, holds_mo_lock, big_txn);
return r; return r;
} }
...@@ -347,16 +357,26 @@ locked_txn_abort_with_progress(DB_TXN *txn, ...@@ -347,16 +357,26 @@ locked_txn_abort_with_progress(DB_TXN *txn,
// see a non-readonly txn in the abort (or recursive commit). // see a non-readonly txn in the abort (or recursive commit).
// But released here so we don't have to hold additional state. // But released here so we don't have to hold additional state.
bool holds_mo_lock = false; bool holds_mo_lock = false;
if (!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) { bool big_txn = false;
// A readonly transaction does no logging, and therefore does not TOKUTXN tokutxn = db_txn_struct_i(txn)->tokutxn;
// need the MO lock. if (!toku_txn_is_read_only(tokutxn)) {
toku_multi_operation_client_lock(); // A readonly transaction does no logging, and therefore does not need the MO lock.
holds_mo_lock = true; holds_mo_lock = true;
if (toku_txn_has_spilled_rollback(tokutxn)) {
big_txn = true;
toku_big_multi_operation_client_lock();
} else {
toku_multi_operation_client_lock();
}
} }
int r = toku_txn_abort(txn, poll, poll_extra); int r = toku_txn_abort(txn, poll, poll_extra);
if (holds_mo_lock) { if (holds_mo_lock) {
if (big_txn) {
toku_big_multi_operation_client_unlock();
} else {
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
} }
}
return r; return r;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment