Commit b2fc97a4 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #350

Bugfix with releasing locks/unlocking a transaction.

DBC->c_get (DB_SET)
and DB->put (DB_YESOVERWRITE) now perform locking
Secondary dbs not included (behavior unknown).

Several tests added for c_get (DB_SET) and DB->put(DB_YESOVERWRITE).

Lock tree only created if INIT_TXN is enabled during env creation.


git-svn-id: file:///svn/tokudb@2191 c7de825b-a66e-492c-adef-691d508d4ae1
parent c1de576b
...@@ -570,7 +570,7 @@ static int __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert, ...@@ -570,7 +570,7 @@ static int __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert,
u_int32_t i; u_int32_t i;
for (i = 0; i < numfound; i++) { for (i = 0; i < numfound; i++) {
if (rt != NULL) { if (rt != NULL) {
r = toku_rt_delete(rt, to_insert); r = toku_rt_delete(rt, &tree->buf[i]);
if (r!=0) return __toku_lt_panic(tree, r); if (r!=0) return __toku_lt_panic(tree, r);
} }
/* /*
...@@ -1207,6 +1207,8 @@ int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn) { ...@@ -1207,6 +1207,8 @@ int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn) {
ranges += toku_rt_get_size(selfwrite); ranges += toku_rt_get_size(selfwrite);
r = __toku_lt_border_delete(tree, selfwrite); r = __toku_lt_border_delete(tree, selfwrite);
if (r!=0) return __toku_lt_panic(tree, r); if (r!=0) return __toku_lt_panic(tree, r);
r = __toku_lt_free_contents(tree, selfwrite, NULL);
if (r!=0) return __toku_lt_panic(tree, r);
} }
if (selfread || selfwrite) toku_rth_delete(tree->rth, txn); if (selfread || selfwrite) toku_rth_delete(tree->rth, txn);
......
...@@ -363,6 +363,7 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -363,6 +363,7 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn,
* Returns: * Returns:
* 0: Success. * 0: Success.
* EINVAL: If (tree == NULL || txn == NULL). * EINVAL: If (tree == NULL || txn == NULL).
* EINVAL: If panicking.
* *** Note that txn == NULL is not supported at this time. * *** Note that txn == NULL is not supported at this time.
*/ */
int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn); int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn);
......
...@@ -42,7 +42,7 @@ endif ...@@ -42,7 +42,7 @@ endif
LIBNAME=libdb.$(LIBEXT) LIBNAME=libdb.$(LIBEXT)
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage # GCOV_FLAGS = -fprofile-arcs -ftest-coverage
CFLAGS += -Wall -Werror $(OPTFLAGS) -g $(GCOV_FLAGS) CFLAGS += -Wall -Werror $(OPTFLAGS) -g3 -ggdb3 $(GCOV_FLAGS)
TDB_CPPFLAGS = -I../../include TDB_CPPFLAGS = -I../../include
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <db.h> #include <db.h>
#include "../../newbrt/brttypes.h"
#ifndef DB_YESOVERWRITE #ifndef DB_YESOVERWRITE
#define DB_YESOVERWRITE 0 #define DB_YESOVERWRITE 0
...@@ -15,6 +16,7 @@ int verbose=0; ...@@ -15,6 +16,7 @@ int verbose=0;
#define CKERR(r) ({ if (r!=0) fprintf(stderr, "%s:%d error %d %s\n", __FILE__, __LINE__, r, db_strerror(r)); assert(r==0); }) #define CKERR(r) ({ if (r!=0) fprintf(stderr, "%s:%d error %d %s\n", __FILE__, __LINE__, r, db_strerror(r)); assert(r==0); })
#define CKERR2(r,r2) ({ if (r!=r2) fprintf(stderr, "%s:%d error %d %s, expected %d\n", __FILE__, __LINE__, r, db_strerror(r), r2); assert(r==r2); }) #define CKERR2(r,r2) ({ if (r!=r2) fprintf(stderr, "%s:%d error %d %s, expected %d\n", __FILE__, __LINE__, r, db_strerror(r), r2); assert(r==r2); })
#define CKERR2s(r,r2,r3) ({ if (r!=r2 && r!=r3) fprintf(stderr, "%s:%d error %d %s, expected %d or %d\n", __FILE__, __LINE__, r, db_strerror(r), r2,r3); assert(r==r2||r==r3); })
void parse_args (int argc, const char *argv[]) { void parse_args (int argc, const char *argv[]) {
const char *argv0=argv[0]; const char *argv0=argv[0];
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include <string.h>
#include <db.h>
#include <assert.h>
#include <errno.h>
#include <sys/stat.h>
#include "test.h"
// DIR is defined in the Makefile
int dbtcmp(DBT *dbt1, DBT *dbt2) {
int r;
r = dbt1->size - dbt2->size; if (r) return r;
return memcmp(dbt1->data, dbt2->data, dbt1->size);
}
DB *db;
DB_TXN* txns[(int)256];
DB_ENV* dbenv;
DBC* cursors[(int)256];
void put(BOOL success, char txn, int _key, int _data) {
assert(txns[(int)txn]);
int r;
DBT key;
DBT data;
r = db->put(db, txns[(int)txn],
dbt_init(&key, &_key, sizeof(int)),
dbt_init(&data, &_data, sizeof(int)),
DB_YESOVERWRITE);
if (success) CKERR(r);
else CKERR2s(r, DB_LOCK_DEADLOCK, DB_LOCK_NOTGRANTED);
}
void cget(BOOL success, BOOL find, char txn, int _key, int _data) {
assert(txns[(int)txn] && cursors[(int)txn]);
int r;
DBT key;
DBT data;
r = cursors[(int)txn]->c_get(cursors[(int)txn],
dbt_init(&key, &_key, sizeof(int)),
dbt_init(&data, &_data, sizeof(int)),
DB_SET);
if (success) {
if (find) CKERR(r);
else CKERR2(r, DB_NOTFOUND);
}
else CKERR2s(r, DB_LOCK_DEADLOCK, DB_LOCK_NOTGRANTED);
}
void init_txn(char name) {
int r;
assert(!txns[(int)name]);
r = dbenv->txn_begin(dbenv, NULL, &txns[(int)name], DB_TXN_NOWAIT);
CKERR(r);
assert(txns[(int)name]);
}
void init_dbc(char name) {
int r;
assert(!cursors[(int)name] && txns[(int)name]);
r = db->cursor(db, txns[(int)name], &cursors[(int)name], 0);
CKERR(r);
assert(cursors[(int)name]);
}
void commit_txn(char name) {
int r;
assert(txns[(int)name] && !cursors[(int)name]);
r = txns[(int)name]->commit(txns[(int)name], 0);
CKERR(r);
txns[(int)name] = NULL;
}
void abort_txn(char name) {
int r;
assert(txns[(int)name] && !cursors[(int)name]);
r = txns[(int)name]->abort(txns[(int)name]);
CKERR(r);
txns[(int)name] = NULL;
}
void close_dbc(char name) {
int r;
assert(cursors[(int)name]);
r = cursors[(int)name]->c_close(cursors[(int)name]);
CKERR(r);
cursors[(int)name] = NULL;
}
void early_commit(char name) {
assert(cursors[(int)name] && txns[(int)name]);
close_dbc(name);
commit_txn(name);
}
void early_abort(char name) {
assert(cursors[(int)name] && txns[(int)name]);
close_dbc(name);
abort_txn(name);
}
void setup_dbs(u_int32_t dup_flags) {
int r;
system("rm -rf " DIR);
mkdir(DIR, 0777);
dbenv = NULL;
db = NULL;
/* Open/create primary */
r = db_env_create(&dbenv, 0);
CKERR(r);
r = dbenv->open(dbenv, DIR, DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL | DB_INIT_TXN, 0600);
CKERR(r);
r = db_create(&db, dbenv, 0);
CKERR(r);
if (dup_flags) {
r = db->set_flags(db, dup_flags);
CKERR(r);
}
r = db->open(db, NULL, "foobar.db", NULL, DB_BTREE, DB_CREATE, 0600);
CKERR(r);
char a;
for (a = 'a'; a <= 'z'; a++) {
init_txn(a);
init_dbc(a);
}
}
void close_dbs(void) {
char a;
for (a = 'a'; a <= 'z'; a++) {
if (cursors[(int)a]) close_dbc(a);
if (txns[(int)a]) commit_txn(a);
}
int r;
r = db->close(db, 0);
CKERR(r);
db = NULL;
r = dbenv->close(dbenv, 0);
CKERR(r);
dbenv = NULL;
}
void test(u_int32_t dup_flags) {
/* ********************************************************************** */
setup_dbs(dup_flags);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
cget(TRUE, FALSE, 'a', 1, 1);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
put(TRUE, 'a', 1, 1);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
cget(TRUE, FALSE, 'a', 1, 1);
cget(TRUE, FALSE, 'a', 2, 1);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
cget(TRUE, FALSE, 'a', 1, 1);
cget(TRUE, FALSE, 'a', 1, 1);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
cget(TRUE, FALSE, 'a', 1, 1);
cget(TRUE, FALSE, 'b', 2, 1);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
cget(TRUE, FALSE, 'a', 1, 1);
cget(TRUE, FALSE, 'b', 1, 1);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
cget(TRUE, FALSE, 'a', 1, 1);
cget(TRUE, FALSE, 'b', 1, 1);
put(FALSE, 'a', 1, 1);
early_commit('b');
put(TRUE, 'a', 1, 1);
cget(TRUE, TRUE, 'a', 1, 1);
cget(TRUE, FALSE, 'a', 2, 1);
cget(FALSE, TRUE, 'c', 1, 1);
early_commit('a');
cget(TRUE, TRUE, 'c', 1, 1);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
cget(TRUE, FALSE, 'a', 1, 1);
cget(TRUE, FALSE, 'b', 1, 1);
put(FALSE, 'a', 1, 1);
early_commit('b');
put(TRUE, 'a', 1, 1);
cget(TRUE, TRUE, 'a', 1, 1);
cget(TRUE, FALSE, 'a', 2, 1);
cget(FALSE, TRUE, 'c', 1, 1);
early_abort('a');
cget(TRUE, FALSE, 'c', 1, 1);
close_dbs();
/* ********************************************************************** */
}
int main() {
test(0);
test(DB_DUP | DB_DUPSORT);
return 0;
}
...@@ -755,22 +755,23 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -755,22 +755,23 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) {
ydb_lock(); int r = toku_env_create(envp, flags); ydb_unlock(); return r; ydb_lock(); int r = toku_env_create(envp, flags); ydb_unlock(); return r;
} }
static void toku_txn_release_locks(DB_TXN* txn) { static int toku_txn_release_locks(DB_TXN* txn) {
assert(txn); assert(txn);
toku_lth* lth = txn->i->lth; toku_lth* lth = txn->i->lth;
assert(lth); assert(lth);
int r; int r;
int r2 = 0;
toku_lth_start_scan(lth); toku_lth_start_scan(lth);
toku_lock_tree* next = toku_lth_next(lth); toku_lock_tree* next = toku_lth_next(lth);
while (next) { while (next) {
r = toku_lt_unlock(next, txn); r = toku_lt_unlock(next, txn);
/* Only NULL parameters can give a non 0 return value. */ if (r!=0 && !r2) r2 = r;
assert(r==0);
next = toku_lth_next(lth); next = toku_lth_next(lth);
} }
toku_lth_close(lth); toku_lth_close(lth);
txn->i->lth = NULL; txn->i->lth = NULL;
return r2;
} }
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) { static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
...@@ -788,11 +789,11 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) { ...@@ -788,11 +789,11 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
toku_free(txn->i->tokutxn); toku_free(txn->i->tokutxn);
} }
// Cleanup */ // Cleanup */
toku_txn_release_locks(txn); int r2 = toku_txn_release_locks(txn);
if (txn->i) if (txn->i)
toku_free(txn->i); toku_free(txn->i);
toku_free(txn); toku_free(txn);
return r; // The txn is no good after the commit. return r ? r : r2; // The txn is no good after the commit.
} }
static u_int32_t toku_txn_id(DB_TXN * txn) { static u_int32_t toku_txn_id(DB_TXN * txn) {
...@@ -1134,15 +1135,34 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) ...@@ -1134,15 +1135,34 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) { static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
DB *db = c->dbp; DB *db = c->dbp;
// DB_TXN *txn = c->i->txn; DB_TXN *txn = c->i->txn;
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
u_int32_t get_flag = get_main_cursor_flag(flag); u_int32_t get_flag = get_main_cursor_flag(flag);
int r; int r;
if (db->i->primary==0) { if (db->i->primary==0) {
r = toku_c_get_noassociate(c, key, data, flag); r = toku_c_get_noassociate(c, key, data, flag);
if (db->i->lt) {
if (r != DB_NOTFOUND && r != 0 && r != DB_KEYEMPTY) return r;
int r2 = 0; int r2 = 0;
switch (get_flag) { switch (get_flag) {
case (DB_SET): {
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, toku_lt_infinity);
}
else {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, data);
}
break;
}
}
if (r2!=0) return r2;
}
}
/* /*
These should be done but were not tested prior to commit. These should be done but were not tested prior to commit.
case (DB_CURRENT): { case (DB_CURRENT): {
...@@ -1250,9 +1270,6 @@ These are not ready and are just notes. ...@@ -1250,9 +1270,6 @@ These are not ready and are just notes.
assert(FALSE); assert(FALSE);
} }
*/ */
}
if (r2!=0) return r2;
}
else { else {
// It's a c_get on a secondary. // It's a c_get on a secondary.
DBT primary_key; DBT primary_key;
...@@ -1818,6 +1835,7 @@ static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt) { ...@@ -1818,6 +1835,7 @@ static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt) {
static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) { static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
// Warning. Should check arguments. Should check return codes on malloc and open and so forth. // Warning. Should check arguments. Should check return codes on malloc and open and so forth.
BOOL transactions = (db->dbenv->i->open_flags & DB_INIT_TXN) != 0;
int openflags = 0; int openflags = 0;
int r; int r;
...@@ -1878,7 +1896,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db ...@@ -1878,7 +1896,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
db->i->open_flags = flags; db->i->open_flags = flags;
db->i->open_mode = mode; db->i->open_mode = mode;
/* TODO: Only create lock tree if necessary! (lock subsystem?) */ if (transactions) {
r = toku_lt_create(&db->i->lt, db, FALSE, r = toku_lt_create(&db->i->lt, db, FALSE,
toku_db_lt_panic, db->dbenv->i->max_locks, toku_db_lt_panic, db->dbenv->i->max_locks,
&db->dbenv->i->num_locks, &db->dbenv->i->num_locks,
...@@ -1887,6 +1905,9 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db ...@@ -1887,6 +1905,9 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
if (r!=0) goto error_cleanup; if (r!=0) goto error_cleanup;
r = toku_lt_set_txn_add_lt_callback(db->i->lt, toku_txn_add_lt); r = toku_lt_set_txn_add_lt_callback(db->i->lt, toku_txn_add_lt);
assert(r==0); assert(r==0);
}
r = toku_brt_open(db->i->brt, db->i->full_fname, fname, dbname, r = toku_brt_open(db->i->brt, db->i->full_fname, fname, dbname,
is_db_create, is_db_excl, is_db_unknown, is_db_create, is_db_excl, is_db_unknown,
...@@ -1896,6 +1917,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db ...@@ -1896,6 +1917,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
if (r != 0) if (r != 0)
goto error_cleanup; goto error_cleanup;
if (db->i->lt) {
unsigned int brtflags; unsigned int brtflags;
BOOL dups; BOOL dups;
/* Whether we have dups is only known starting now. */ /* Whether we have dups is only known starting now. */
...@@ -1904,6 +1926,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db ...@@ -1904,6 +1926,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
r = toku_lt_set_dups(db->i->lt, dups); r = toku_lt_set_dups(db->i->lt, dups);
/* toku_lt_set_dups cannot return an error here. */ /* toku_lt_set_dups cannot return an error here. */
assert(r==0); assert(r==0);
}
return 0; return 0;
...@@ -1966,7 +1989,10 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, ...@@ -1966,7 +1989,10 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
#endif #endif
} }
} }
if (db->i->lt) {
r = toku_lt_acquire_write_lock(db->i->lt, txn, key, data);
if (r!=0) return r;
}
r = toku_brt_insert(db->i->brt, key, data, txn ? txn->i->tokutxn : 0); r = toku_brt_insert(db->i->brt, key, data, txn ? txn->i->tokutxn : 0);
//printf("%s:%d %d=__toku_db_put(...)\n", __FILE__, __LINE__, r); //printf("%s:%d %d=__toku_db_put(...)\n", __FILE__, __LINE__, r);
return r; return r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment