Commit 945b2dda authored by Rich Prohaska's avatar Rich Prohaska

merge db->truncate to main. closes #968

git-svn-id: file:///svn/tokudb@5218 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5398881b
......@@ -2125,6 +2125,94 @@ static u_int32_t get_roothash (BRT brt, int rootnum) {
return rh->fullhash;
}
// open a file for use by the brt. if the file does not exist, create it.
static int brt_open_file(BRT brt, const char *fname, const char *fname_in_env, int is_create, TOKUTXN txn, int *fdp) {
brt = brt;
mode_t mode = 0777;
int r;
int fd = open(fname, O_RDWR, mode);
if (fd==-1) {
r = errno;
if (errno == ENOENT) {
if (!is_create) {
return r;
}
fd = open(fname, O_RDWR | O_CREAT, mode);
if (fd == -1) {
r = errno; return r;
}
r = toku_logger_log_fcreate(txn, fname_in_env, mode);
if (r != 0) {
close(fd); return r;
}
} else
return r;
}
*fdp = fd;
return 0;
}
// allocate and initialize a brt header.
static int brt_alloc_init_header(BRT t, const char *dbname, TOKUTXN txn) {
int r;
assert(t->h == 0);
if ((MALLOC(t->h))==0) {
assert(errno==ENOMEM);
r = ENOMEM;
if (0) { died2: toku_free(t->h); }
t->h=0;
return r;
}
t->h->dirty=1;
if ((MALLOC_N(1, t->h->flags_array))==0) { r = errno; if (0) { died3: toku_free(t->h->flags_array); } goto died2; }
t->h->flags_array[0] = t->flags;
t->h->nodesize=t->nodesize;
t->h->freelist=-1;
t->h->unused_memory=2*t->nodesize;
toku_fifo_create(&t->h->fifo);
t->h->root_put_counter = global_root_put_counter++;
if (dbname) {
t->h->n_named_roots = 1;
if ((MALLOC_N(1, t->h->names))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died4: if (dbname) toku_free(t->h->names); } goto died3; }
if ((MALLOC_N(1, t->h->roots))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died5: if (dbname) toku_free(t->h->roots); } goto died4; }
if ((MALLOC_N(1, t->h->root_hashes))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died6: if (dbname) toku_free(t->h->root_hashes); } goto died5; }
if ((t->h->names[0] = toku_strdup(dbname))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died7: if (dbname) toku_free(t->h->names[0]); } goto died6; }
t->h->roots[0] = t->nodesize;
compute_and_fill_remembered_hash(t, 0);
} else {
MALLOC_N(1, t->h->roots); assert(t->h->roots);
MALLOC_N(1, t->h->root_hashes); assert(t->h->root_hashes);
t->h->roots[0] = t->nodesize;
compute_and_fill_remembered_hash(t, 0);
t->h->n_named_roots = -1;
t->h->names=0;
}
{
LOGGEDBRTHEADER lh = {.size= toku_serialize_brt_header_size(t->h),
.flags = t->flags,
.nodesize = t->h->nodesize,
.freelist = t->h->freelist,
.unused_memory = t->h->unused_memory,
.n_named_roots = t->h->n_named_roots };
if (t->h->n_named_roots>=0) {
lh.u.many.names = t->h->names;
lh.u.many.roots = t->h->roots;
} else {
lh.u.one.root = t->h->roots[0];
}
if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { goto died7; }
}
if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) { goto died7; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
u_int32_t fullhash = toku_cachefile_fullhash_of_header(t->cf);
t->h->fullhash = fullhash;
if ((r=toku_cachetable_put(t->cf, 0, fullhash, t->h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0))) { goto died7; }
return r;
}
int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
/* If dbname is NULL then we setup to hold a single tree. Otherwise we setup an array. */
......@@ -2156,26 +2244,11 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->db = db;
t->txn_that_created = 0; // Uses 0 for no transaction.
{
int fd = open(fname, O_RDWR, 0777);
if (fd==-1) {
r = errno;
if (errno==ENOENT) {
if (!is_create) {
t->database_name=0;
goto died0a;
}
fd = open(fname, O_RDWR | O_CREAT, 0777);
t->txn_that_created = toku_txn_get_txnid(txn);
if (fd==-1) {
r = errno;
t->database_name=0;
goto died0a;
}
r = toku_logger_log_fcreate(txn, fname_in_env, 0777);
if (r!=0) goto died0a;
} else
goto died0a;
}
int fd = -1;
r = brt_open_file(t, fname, fname_in_env, is_create, txn, &fd);
if (r != 0) {
t->database_name = 0; goto died0a;
}
r=toku_cachetable_openfd(&t->cf, cachetable, fd, fname_in_env);
if (r != 0) goto died0a;
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
......@@ -2195,59 +2268,8 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
if (is_create) {
r = toku_read_and_pin_brt_header(t->cf, &t->h);
if (r==-1) {
/* construct a new header. */
if ((MALLOC(t->h))==0) {
assert(errno==ENOMEM);
r = ENOMEM;
if (0) { died2: toku_free(t->h); }
t->h=0;
goto died_after_read_and_pin;
}
t->h->dirty=1;
if ((MALLOC_N(1, t->h->flags_array))==0) { r = errno; if (0) { died3: toku_free(t->h->flags_array); } goto died2; }
t->h->flags_array[0] = t->flags;
t->h->nodesize=t->nodesize;
t->h->freelist=-1;
t->h->unused_memory=2*t->nodesize;
toku_fifo_create(&t->h->fifo);
t->h->root_put_counter = global_root_put_counter++;
if (dbname) {
t->h->n_named_roots = 1;
if ((MALLOC_N(1, t->h->names))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died4: if (dbname) toku_free(t->h->names); } goto died3; }
if ((MALLOC_N(1, t->h->roots))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died5: if (dbname) toku_free(t->h->roots); } goto died4; }
if ((MALLOC_N(1, t->h->root_hashes))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died6: if (dbname) toku_free(t->h->root_hashes); } goto died5; }
if ((t->h->names[0] = toku_strdup(dbname))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died7: if (dbname) toku_free(t->h->names[0]); } goto died6; }
t->h->roots[0] = t->nodesize;
compute_and_fill_remembered_hash(t, 0);
} else {
MALLOC_N(1, t->h->roots); assert(t->h->roots);
MALLOC_N(1, t->h->root_hashes); assert(t->h->root_hashes);
t->h->roots[0] = t->nodesize;
compute_and_fill_remembered_hash(t, 0);
t->h->n_named_roots = -1;
t->h->names=0;
}
{
LOGGEDBRTHEADER lh = {.size= toku_serialize_brt_header_size(t->h),
.flags = t->flags,
.nodesize = t->h->nodesize,
.freelist = t->h->freelist,
.unused_memory = t->h->unused_memory,
.n_named_roots = t->h->n_named_roots };
if (t->h->n_named_roots>=0) {
lh.u.many.names = t->h->names;
lh.u.many.roots = t->h->roots;
} else {
lh.u.one.root = t->h->roots[0];
}
if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { goto died7; }
}
if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) { goto died7; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
u_int32_t fullhash = toku_cachefile_fullhash_of_header(t->cf);
t->h->fullhash = fullhash;
if ((r=toku_cachetable_put(t->cf, 0, fullhash, t->h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0))) { goto died7; }
r = brt_alloc_init_header(t, dbname, txn);
if (r != 0) goto died_after_read_and_pin;
}
else if (r!=0) {
goto died_after_read_and_pin;
......@@ -2320,6 +2342,30 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
return 0;
}
int toku_brt_reopen(BRT brt, const char *fname, const char *fname_in_env, TOKUTXN txn) {
int r;
// create a new file
int fd = -1;
r = brt_open_file(brt, fname, fname_in_env, TRUE, txn, &fd);
if (r != 0) return r;
// set the cachefile
r = toku_cachefile_set_fd(brt->cf, fd, fname_in_env);
assert(r == 0);
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(brt->cf));
// init the tree header
assert(brt->h == 0);
r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
if (r == -1) {
r = brt_alloc_init_header(brt, NULL, txn);
assert(r == 0);
r = toku_unpin_brt_header(brt);
}
return r;
}
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) {
int r;
int r2 = 0;
......@@ -2422,6 +2468,10 @@ int toku_close_brt (BRT brt, TOKULOGGER logger) {
return 0;
}
int toku_brt_flush (BRT brt) {
return toku_cachefile_flush(brt->cf);
}
int toku_brt_debug_mode = 0;//strcmp(key,"hello387")==0;
CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *roothash) {
......@@ -3763,6 +3813,14 @@ int toku_brt_height_of_root(BRT brt, int *height) {
return 0;
}
int toku_brt_get_cursor_count (BRT brt) {
int n = 0;
struct list *list;
for (list = brt->cursors.next; list != &brt->cursors; list = list->next)
n += 1;
return n;
}
struct omt_compressor_state {
struct mempool *new_kvspace;
OMT omt;
......
......@@ -23,6 +23,12 @@ int toku_brt_set_bt_compare(BRT, int (*bt_compare)(DB *, const DBT*, const DBT*)
int toku_brt_set_dup_compare(BRT, int (*dup_compare)(DB *, const DBT*, const DBT*));
int brt_set_cachetable(BRT, CACHETABLE);
int toku_brt_open(BRT, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, CACHETABLE ct, TOKUTXN txn, DB *db);
int toku_brt_reopen (BRT brt, const char *fname, const char *fname_in_env, TOKUTXN txn);
// reopen the tree
// effect: attach the tree to a new file
// returns: 0 if success
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags);
int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN);
......@@ -30,11 +36,21 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v);
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN);
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN); // Delete a pair only if both k and v are equal according to the comparison function.
int toku_close_brt (BRT, TOKULOGGER);
int toku_dump_brt (BRT brt);
void brt_fsync (BRT); /* fsync, but don't clear the caches. */
void brt_fsync (BRT); /* fsync, but don't clear the caches. */
void brt_flush (BRT); /* fsync and clear the caches. */
int toku_brt_get_cursor_count (BRT brt);
// get the number of cursors in the tree
// returns: the number of cursors.
// asserts: the number of cursors >= 0.
int toku_brt_flush (BRT brt);
// effect: the tree's cachefile is flushed
// returns: 0 if success
/* create and initialize a cache table
cachesize is the upper limit on the size of the size of the values in the table
pass 0 if you want the default */
......
......@@ -115,11 +115,21 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf) {
return ENOENT;
}
static FILENUM next_filenum_to_use={0};
static void cachefile_init_filenum(CACHEFILE newcf, int fd, const char *fname, struct fileid fileid) \
{
newcf->filenum.fileid = next_filenum_to_use.fileid++;
newcf->fd = fd;
newcf->fileid = fileid;
newcf->fname = fname ? toku_strdup(fname) : 0;
}
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, const char *fname) {
int r;
CACHEFILE extant;
static FILENUM next_filenum_to_use={0};
struct stat statbuf;
struct fileid fileid;
memset(&fileid, 0, sizeof(fileid));
......@@ -145,14 +155,11 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, const char *fna
}
{
CACHEFILE MALLOC(newcf);
newcf->filenum.fileid = next_filenum_to_use.fileid++;
newcf->cachetable = t;
cachefile_init_filenum(newcf, fd, fname, fileid);
newcf->refcount = 1;
newcf->header_fullhash = toku_cachetable_hash(newcf, 0);
newcf->next = t->cachefiles;
newcf->refcount = 1;
newcf->fd = fd;
newcf->cachetable = t;
newcf->fileid = fileid;
newcf->fname = fname ? toku_strdup(fname) : 0;
t->cachefiles = newcf;
*cf = newcf;
return 0;
......@@ -165,6 +172,31 @@ int toku_cachetable_openf (CACHEFILE *cf, CACHETABLE t, const char *fname, int f
return toku_cachetable_openfd (cf, t, fd, fname);
}
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) {
int r;
struct stat statbuf;
r=fstat(fd, &statbuf);
if (r != 0) {
r=errno; close(fd); return r;
}
close(cf->fd);
cf->fd = -1;
if (cf->fname) {
toku_free(cf->fname);
cf->fname = 0;
}
struct fileid fileid;
memset(&fileid, 0, sizeof fileid);
fileid.st_dev = statbuf.st_dev;
fileid.st_ino = statbuf.st_ino;
cachefile_init_filenum(cf, fd, fname, fileid);
return 0;
}
int toku_cachefile_fd (CACHEFILE cf) {
return cf->fd;
}
static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
if (list==0) return 0;
else if (list==cf) {
......@@ -209,6 +241,10 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
}
}
int toku_cachefile_flush (CACHEFILE cf) {
return cachefile_flush_and_remove(cf);
}
int toku_cachetable_assert_all_unpinned (CACHETABLE t) {
u_int32_t i;
int some_pinned=0;
......@@ -804,9 +840,6 @@ int cachefile_pread (CACHEFILE cf, void *buf, size_t count, off_t offset) {
}
#endif
int toku_cachefile_fd (CACHEFILE cf) {
return cf->fd;
}
/* debug functions */
......
......@@ -64,15 +64,26 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
int toku_cachefile_close (CACHEFILE*, TOKULOGGER);
//int cachefile_flush (CACHEFILE); /* Flush everything related to the VOID* to disk and free all memory. Don't destroy the cachetable. */
void toku_cachefile_refup (CACHEFILE cfp); // Increment the reference count. Use close to decrement it.
int toku_cachefile_flush (CACHEFILE);
// effect: flush everything owned by the cachefile.
// returns: 0 if success
void toku_cachefile_refup (CACHEFILE cfp);
// Increment the reference count. Use close to decrement it.
// Return on success (different from pread and pwrite)
//int cachefile_pwrite (CACHEFILE, const void *buf, size_t count, off_t offset);
//int cachefile_pread (CACHEFILE, void *buf, size_t count, off_t offset);
int toku_cachefile_fd (CACHEFILE);
// get the file descriptor bound to this cachefile
// returns: the file descriptor
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname);
// set the cachefile's fd and fname.
// effect: bind the cachefile to a new fd and fname. the old fd is closed.
// returns: 0 if success
// Useful for debugging
void toku_cachetable_print_state (CACHETABLE ct);
......
......@@ -54,7 +54,7 @@ TDB_TESTS = $(patsubst %.c,%.tdb,$(SRCS))
BDB_DONTRUN = bug627 test_abort1 keyrange keyrange-unflat keyrange-dupsort keyrange-dupsort-unflat manyfiles test938c
BDB_TESTS = $(patsubst %.c,%.bdb,$(filter-out $(patsubst %,%.c,$(BDB_DONTRUN)),$(SRCS)))
TDB_TESTS_THAT_SHOULD_FAIL = test_groupcommit_count test-recover1 test-recover2 test-recover3 test_txn_recover3 test944
TDB_TESTS_THAT_SHOULD_FAIL = test_groupcommit_count test-recover1 test-recover2 test-recover3 test_txn_recover3 test944 test_truncate_txn_abort test_truncate_subdb
TDB_TESTS_THAT_SHOULD_FAIL_LIT = test_log2.recover test_log3.recover test_log4.recover test_log5.recover test_log6.recover test_log7.recover test_log8.recover test_log9.recover test_log10.recover
ALL_TESTS = $(TDB_TESTS) $(BDB_TESTS)
......
// truncate a database without transactions and verify that
// the database is empty after the truncate is performed
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <db.h>
#include "test.h"
int test_truncate(int n) {
int r;
DB_ENV *env;
DB *db;
DBC *cursor;
r = db_env_create(&env, 0); assert(r == 0);
r = env->open(env, ENVDIR, DB_INIT_MPOOL + DB_PRIVATE + DB_CREATE, 0777); assert(r == 0);
int i;
// populate the tree
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
for (i=0; i<n; i++) {
int k = htonl(i); int v = i;
DBT key, val;
r = db->put(db, 0, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_UNKNOWN, 0, 0777); assert(r == 0);
// walk the tree - expect n rows
i = 0;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == n);
// truncate the tree
u_int32_t row_count = 0;
r = db->truncate(db, 0, &row_count, 0); assert(r == 0);
// walk the tree - expect 0 rows
i = 0;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == 0);
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_UNKNOWN, 0, 0777); assert(r == 0);
// walk the tree - expect 0 rows
i = 0;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == 0);
r = db->close(db, 0); assert(r == 0);
r = env->close(env, 0); assert(r == 0);
return 0;
}
int main(int argc, const char *argv[]) {
parse_args(argc, argv);
int nodesize = 1024*1024;
int leafentry = 25;
int n = (nodesize/leafentry) * 2;
system("rm -rf " ENVDIR);
mkdir(ENVDIR, 0777);
int r = test_truncate(n);
return r;
}
// truncate a named database
// verify that the database is empty
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <db.h>
#include "test.h"
int test_truncate_subdb(int n) {
int r;
DB_ENV *env;
DB *db;
DBC *cursor;
r = db_env_create(&env, 0); assert(r == 0);
r = env->open(env, ENVDIR, DB_INIT_MPOOL + DB_PRIVATE + DB_CREATE, 0777); assert(r == 0);
int i;
// populate the tree
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", "a", DB_BTREE, DB_CREATE, 0777); assert(r == 0);
for (i=0; i<1; i++) {
int k = htonl(i); int v = i;
DBT key, val;
r = db->put(db, 0, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", "b", DB_BTREE, DB_CREATE, 0777); assert(r == 0);
for (i=0; i<n; i++) {
int k = htonl(i); int v = i;
DBT key, val;
r = db->put(db, 0, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", "b", DB_UNKNOWN, 0, 0777); assert(r == 0);
// walk the tree - expect n rows
i = 0;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == n);
// truncate the tree
u_int32_t row_count = 0;
r = db->truncate(db, 0, &row_count, 0); assert(r == 0);
// walk the tree - expect 0 rows
i = 0;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == 0);
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", "b", DB_UNKNOWN, 0, 0777); assert(r == 0);
// walk the tree - expect 0 rows
i = 0;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == 0);
r = db->close(db, 0); assert(r == 0);
r = env->close(env, 0); assert(r == 0);
return 0;
}
int main(int argc, const char *argv[]) {
parse_args(argc, argv);
int nodesize = 1024*1024;
int leafentry = 25;
int n = (nodesize/leafentry) * 2;
system("rm -rf " ENVDIR);
mkdir(ENVDIR, 0777);
int r = test_truncate_subdb(n);
return r;
}
// truncate a database within a transaction
// abort the transaction and verify that the database is not empty
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <db.h>
#include "test.h"
int test_truncate_txn_abort(int n) {
int r;
DB_ENV *env;
DB *db;
DBC *cursor;
r = db_env_create(&env, 0); assert(r == 0);
r = env->open(env, ENVDIR, DB_INIT_MPOOL + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, 0777); assert(r == 0);
int i;
// populate the tree
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
for (i=0; i<n; i++) {
int k = htonl(i); int v = i;
DBT key, val;
r = db->put(db, 0, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_UNKNOWN, DB_AUTO_COMMIT, 0777); assert(r == 0);
DB_TXN *txn;
// walk - expect n rows
r = env->txn_begin(env, NULL, &txn, 0); assert(r == 0);
i = 0;
r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == n);
r = txn->commit(txn, 0); assert(r == 0);
// truncate the db
r = env->txn_begin(env, NULL, &txn, 0); assert(r == 0);
u_int32_t row_count = 0;
r = db->truncate(db, txn, &row_count, 0); assert(r == 0);
r = txn->abort(txn); assert(r == 0);
// walk - expect n rows
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
i = 0;
r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == n);
r = txn->commit(txn, 0); assert(r == 0);
r = db->close(db, 0); assert(r == 0);
// walk the tree - expect n rows
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_UNKNOWN, DB_AUTO_COMMIT, 0777); assert(r == 0);
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
i = 0;
r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == n);
r = txn->commit(txn, 0); assert(r == 0);
r = db->close(db, 0); assert(r == 0);
r = env->close(env, 0); assert(r == 0);
return 0;
}
int main(int argc, const char *argv[]) {
parse_args(argc, argv);
int nodesize = 1024*1024;
int leafentry = 25;
int n = (nodesize/leafentry) * 2;
system("rm -rf " ENVDIR);
mkdir(ENVDIR, 0777);
int r = test_truncate_txn_abort(n);
return r;
}
// truncate a database within a transaction
// commit the transaction and verify that the database is empty
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <db.h>
#include "test.h"
int test_truncate_txn_commit(int n) {
int r;
DB_ENV *env;
DB *db;
DBC *cursor;
r = db_env_create(&env, 0); assert(r == 0);
r = env->open(env, ENVDIR, DB_INIT_MPOOL + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, 0777); assert(r == 0);
int i;
// populate the tree
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
for (i=0; i<n; i++) {
int k = htonl(i); int v = i;
DBT key, val;
r = db->put(db, 0, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_UNKNOWN, DB_AUTO_COMMIT, 0777); assert(r == 0);
DB_TXN *txn;
// walk - expect n rows
r = env->txn_begin(env, NULL, &txn, 0); assert(r == 0);
i = 0;
r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == n);
r = txn->commit(txn, 0); assert(r == 0);
// truncate the db
r = env->txn_begin(env, NULL, &txn, 0); assert(r == 0);
u_int32_t row_count = 0;
r = db->truncate(db, txn, &row_count, 0); assert(r == 0);
r = txn->commit(txn, 0); assert(r == 0);
// walk - expect 0 rows
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
i = 0;
r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == 0);
r = txn->commit(txn, 0); assert(r == 0);
r = db->close(db, 0); assert(r == 0);
// walk the tree - expect 0 rows
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_UNKNOWN, DB_AUTO_COMMIT, 0777); assert(r == 0);
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
i = 0;
r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == 0);
r = txn->commit(txn, 0); assert(r == 0);
r = db->close(db, 0); assert(r == 0);
r = env->close(env, 0); assert(r == 0);
return 0;
}
int main(int argc, const char *argv[]) {
parse_args(argc, argv);
int nodesize = 1024*1024;
int leafentry = 25;
int n = (nodesize/leafentry) * 2;
system("rm -rf " ENVDIR);
mkdir(ENVDIR, 0777);
int r = test_truncate_txn_commit(n);
return r;
}
// truncate a database with open cursors
// verify that the truncate returns EINVAL
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <db.h>
#include "test.h"
// try to truncate with cursors active
int test_truncate_with_cursors(int n) {
int r;
DB_ENV *env;
DB *db;
DBC *cursor;
r = db_env_create(&env, 0); assert(r == 0);
r = env->open(env, ENVDIR, DB_INIT_MPOOL + DB_PRIVATE + DB_CREATE, 0777); assert(r == 0);
int i;
// populate the tree
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
for (i=0; i<n; i++) {
int k = htonl(i); int v = i;
DBT key, val;
r = db->put(db, 0, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
}
r = db->close(db, 0); assert(r == 0);
// test1:
// walk the tree - expect n
// truncate
// walk the tree - expect 0
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_UNKNOWN, 0, 0777); assert(r == 0);
i = 0;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
assert(i == n);
// try to truncate with an active cursor
u_int32_t row_count = 0;
r = db->truncate(db, 0, &row_count, 0); assert(r == EINVAL);
r = cursor->c_close(cursor); assert(r == 0);
// ok, now try it
r = db->truncate(db, 0, &row_count, 0); assert(r == 0);
i = 0;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == 0);
r = db->close(db, 0); assert(r == 0);
// test 2: walk the tree - expect 0
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, 0, "test.db", 0, DB_UNKNOWN, 0, 0777); assert(r == 0);
i = 0;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
while (1) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
if (r == DB_NOTFOUND) break;
i++;
}
r = cursor->c_close(cursor); assert(r == 0);
assert(i == 0);
r = db->close(db, 0); assert(r == 0);
r = env->close(env, 0); assert(r == 0);
return 0;
}
int main(int argc, const char *argv[]) {
parse_args(argc, argv);
int nodesize = 1024*1024;
int leafentry = 25;
int n = (nodesize/leafentry) * 2;
system("rm -rf " ENVDIR);
mkdir(ENVDIR, 0777);
int r = test_truncate_with_cursors(n);
return r;
}
......@@ -25,6 +25,7 @@ struct __toku_db_internal {
int freed;
struct db_header *header;
int database_number; // -1 if it is the single unnamed database. Nonnengative number otherwise.
char *fname;
char *full_fname;
char *database_name;
//int fd;
......
......@@ -1149,6 +1149,7 @@ static int toku_db_close(DB * db, u_int32_t flags) {
int is_panicked = toku_env_is_panicked(db->dbenv);
env_unref(db->dbenv);
toku_free(db->i->database_name);
toku_free(db->i->fname);
toku_free(db->i->full_fname);
toku_free(db->i);
toku_free(db);
......@@ -2571,10 +2572,15 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
r = find_db_file(db->dbenv, fname, &db->i->full_fname);
if (r != 0) goto error_cleanup;
// printf("Full name = %s\n", db->i->full_fname);
assert(db->i->fname == 0);
db->i->fname = toku_strdup(fname);
if (db->i->fname == 0) {
r = ENOMEM; goto error_cleanup;
}
assert(db->i->database_name == 0);
db->i->database_name = toku_strdup(dbname ? dbname : "");
if (db->i->database_name == 0) {
r = ENOMEM;
goto error_cleanup;
r = ENOMEM; goto error_cleanup;
}
if (is_db_rdonly)
openflags |= O_RDONLY;
......@@ -2641,6 +2647,10 @@ error_cleanup:
toku_free(db->i->full_fname);
db->i->full_fname = NULL;
}
if(db->i->fname) {
toku_free(db->i->fname);
db->i->fname = NULL;
}
if (db->i->lt) {
toku_lt_remove_ref(db->i->lt);
db->i->lt = NULL;
......@@ -3042,6 +3052,51 @@ int locked_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
return r;
}
// truncate a database
// effect: remove all of the rows from a database
static int toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
int r;
// dont support flags (yet)
if (flags)
return EINVAL;
// dont support cursors
if (toku_brt_get_cursor_count(db->i->brt) > 0)
return EINVAL;
// dont support sub databases (yet)
if (db->i->database_name != 0 && strcmp(db->i->database_name, "") != 0)
return EINVAL;
// dont support associated databases (yet)
if (!list_empty(&db->i->associated))
return EINVAL;
// acquire a table lock
if (txn) {
r = toku_db_pre_acquire_table_lock(db, txn);
if (r != 0)
return r;
}
*row_count = 0;
// flush the cached tree blocks
r = toku_brt_flush(db->i->brt);
if (r != 0)
return r;
// rename the db file and log the rename operation (for now, just unlink the file)
r = unlink(db->i->full_fname);
if (r == -1)
r = errno;
if (r != 0)
return r;
// reopen the new db file
r = toku_brt_reopen(db->i->brt, db->i->full_fname, db->i->fname, txn ? txn->i->tokutxn : NULL_TXN);
return r;
}
static inline int autotxn_db_pget(DB* db, DB_TXN* txn, DBT* key, DBT* pkey,
DBT* data, u_int32_t flags) {
BOOL changed; int r;
......@@ -3132,6 +3187,11 @@ static const DBT* toku_db_dbt_neg_infty(void) {
return toku_lt_neg_infinity;
}
static int locked_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) {
toku_ydb_lock(); int r = toku_db_truncate(db, txn, row_count, flags); toku_ydb_unlock(); return r\
;
}
static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
int r;
......@@ -3185,6 +3245,7 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
SDB(fd);
SDB(pre_acquire_read_lock);
SDB(pre_acquire_table_lock);
SDB(truncate);
#undef SDB
result->dbt_pos_infty = toku_db_dbt_pos_infty;
result->dbt_neg_infty = toku_db_dbt_neg_infty;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment